Salome HOME
Work in progress : workload manager engine test ok
author: Ovidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 20 Apr 2020 13:47:02 +0000 (15:47 +0200)
committer: Ovidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 20 Apr 2020 13:47:02 +0000 (15:47 +0200)
14 files changed:
CMakeLists.txt
src/engine/CMakeLists.txt
src/engine/Executor.cxx
src/engine/Executor.hxx
src/engine/Test/RuntimeForEngineIntegrationTest.cxx
src/engine/Test/RuntimeForEngineTest.cxx
src/workloadmanager/CMakeLists.txt
src/workloadmanager/DefaultAlgorithm.cxx
src/workloadmanager/DefaultAlgorithm.hxx
src/workloadmanager/Task.hxx
src/workloadmanager/Test/TestMain.cxx
src/workloadmanager/WorkloadAlgorithm.hxx
src/workloadmanager/WorkloadManager.cxx
src/workloadmanager/WorkloadManager.hxx

index 06496287630f6031dcd60202953a28019d15c4fd..11eda118895698523f05f1179eeb7ca991be2e89 100644 (file)
@@ -286,7 +286,7 @@ INCLUDE(CMakePackageConfigHelpers)
 # List of targets in this project we want to make visible to the rest of the world.
 # They all have to be INSTALL'd with the option "EXPORT ${PROJECT_NAME}TargetGroup"
 SET(_${PROJECT_NAME}_exposed_targets 
-  YACSBases YACSlibEngine 
+  YACSBases YACSlibEngine YACSlibWorkloadmanager
 )
 
 IF(SALOME_YACS_USE_KERNEL)
index c6e9b85ea5c3f2dc79db23076cb55baf4a537678..d8f4d296dd0037e526435843820fc8fe292c8c4f 100644 (file)
@@ -31,12 +31,14 @@ ADD_DEFINITIONS(
 INCLUDE_DIRECTORIES(
   ${PTHREAD_INCLUDE_DIR}
   ${PROJECT_SOURCE_DIR}/src/bases
+  ${PROJECT_SOURCE_DIR}/src
   )
 
 # libraries to link to
 SET(_link_LIBRARIES
   ${PTHREAD_LIBRARIES}
   YACSBases
+  YACSlibWorkloadmanager
   )
 
 # --- headers ---
index e4a76ec5fbafd8a06cc3909e73152aff7c467207..2795d6a6d5391e7af81ef8c8e8fdf41b0d13ecfc 100644 (file)
@@ -251,12 +251,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
     string tracefile = "traceExec_";
     tracefile += _mainSched->getName();
     _trace.open(tracefile.c_str());
-#ifdef WIN32
-   _start = timeGetTime();
-#else
-    gettimeofday(&_start, NULL);
-#endif
-
+    _start = std::chrono::steady_clock::now();
   } // --- End of critical section
 
   if (debug > 1) _displayDot(graph);
@@ -307,7 +302,6 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
           if (debug > 0) _displayDot(graph);
           DEBTRACE("---");
-          //loadTasks(_tasks);
           loadParallelTasks(_tasks,this);
           if (debug > 1) _displayDot(graph);
           DEBTRACE("---");
@@ -899,12 +893,6 @@ struct threadargs
   Executor *execInst;
 };
 
-void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
-{
-  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
-    loadTask(*iter,execInst);
-}
-
 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
 {
   std::vector<Thread> ths(tasks.size());
@@ -1311,14 +1299,10 @@ void Executor::traceExec(Task *task, const std::string& message, const std::stri
   if (cont)
     containerName = cont->getName();
 
-#ifdef WIN32
-  DWORD now = timeGetTime();
-  double elapse = (now - _start)/1000.0;
-#else
-  timeval now;
-  gettimeofday(&now, NULL);
-  double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
-#endif
+  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+  std::chrono::milliseconds millisec;
+  millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
+  double elapse = double(millisec.count()) / 1000.0;
   {
     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
@@ -1418,3 +1402,460 @@ std::string Executor::ComputePlacement(Task *zeTask)
     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
   return placement;
 }
+
+///////// NEW EXECUTOR ////////////////////////////////
+void Executor::loadTask(Task *task)
+{
+  if(task->getState() != YACS::TOLOAD)
+    return;
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
+  {//Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched->notifyFrom(task,YACS::START,this);
+  }//End of critical section
+  try
+    {
+      traceExec(task, "load", ComputePlacement(task));
+      task->load();
+      traceExec(task, "initService", ComputePlacement(task));
+      task->initService();
+    }
+  catch(Exception& ex) 
+    {
+      std::cerr << ex.what() << std::endl;
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT, this);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+      }//End of critical section
+    }
+  catch(...) 
+    {
+      std::cerr << "Load failed" << std::endl;
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT, this);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+      }//End of critical section
+    }
+}
+
+void Executor::beginTask(Task *task)
+{
+  // --- Critical section
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  _numberOfRunningTasks++;
+  _runningTasks.insert(task);
+  // --- End of critical section
+}
+
+void Executor::endTask(Task *task, YACS::Event ev)
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  try
+  {
+    if (ev == YACS::FINISH) task->finished();
+    if (ev == YACS::ABORT)
+    {
+      _errorDetected = true;
+      if (_stopOnErrorRequested)
+      {
+        _execMode = YACS::STEPBYSTEP;
+        _isOKToEnd = true;
+      }
+      task->aborted();
+    }
+    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+    _mainSched->notifyFrom(task,ev,this);
+  }
+  catch(Exception& ex)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Error during notification" << std::endl;
+    std::cerr << ex.what() << std::endl;
+  }
+  catch(...)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Notification failed" << std::endl;
+  }
+  _numberOfRunningTasks--;
+  _runningTasks.erase(task);
+  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks 
+            << " _execMode: " << _execMode
+            << " _executorState: " << _executorState);
+  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
+    {
+      if (_executorState == YACS::WAITINGTASKS)
+        {
+          _executorState = YACS::PAUSED;
+          sendEvent("executor");
+          _condForPilot.notify_all();
+          if (_errorDetected &&
+              _stopOnErrorRequested &&
+              !_isRunningunderExternalControl)
+            _condForStepByStep.notify_all(); // exec thread may be on waitResume
+        }
+    }
+  if (_executorState != YACS::PAUSED)
+    wakeUp();
+}
+
+YACS::Event  Executor::runTask(Task *task)
+{
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    task->begin(); //change state to ACTIVATED
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+  if(getDPLScopeSensitive())
+    {
+      Node *node(dynamic_cast<Node *>(task));
+      ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+      if(node!=0 && gfn!=0)
+        node->applyDPLScope(gfn);
+    }
+
+  YACS::Event ev=YACS::FINISH;
+  try
+    {
+      traceExec(task, "start execution",ComputePlacement(task));
+      task->execute();
+      traceExec(task, "end execution OK",ComputePlacement(task));
+    }
+  catch(Exception& ex)
+    {
+      std::cerr << "YACS Exception during execute" << std::endl;
+      std::cerr << ex.what() << std::endl;
+      ev=YACS::ABORT;
+      string message = "end execution ABORT, ";
+      message += ex.what();
+      traceExec(task, message,ComputePlacement(task));
+    }
+  catch(...) 
+    {
+      // Execution has failed
+      std::cerr << "Execution has failed: unknown reason" << std::endl;
+      ev=YACS::ABORT;
+      traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+    }
+
+  // Disconnect task
+  try
+    {
+      DEBTRACE("task->disconnectService()");
+      task->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+  catch(...) 
+    {
+      // Disconnect has failed
+      std::cerr << "disconnect has failed" << std::endl;
+      ev=YACS::ABORT;
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+  //
+
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolOfContainer
+
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+  {
+    std::lock_guard<std::mutex> alckCont(contC->getLocker());
+    contC->release(task);
+  }
+
+  return ev;
+}
+
+void Executor::makeDatastreamConnections(Task *task)
+{
+  YACS::StatesForNode state=task->getState();
+  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
+    return;
+  try
+    {
+      task->connectService();
+      traceExec(task, "connectService",ComputePlacement(task));
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->connected();
+      }//End of critical section
+    }
+  catch(Exception& ex) 
+    {
+      std::cerr << ex.what() << std::endl;
+      try
+        {
+          (task)->disconnectService();
+          traceExec(task, "disconnectService",ComputePlacement(task));
+        }
+      catch(...) 
+        {
+          // Disconnect has failed
+          traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+        }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT,this);
+      }//End of critical section
+    }
+  catch(...) 
+    {
+      std::cerr << "Problem in connectService" << std::endl;
+      try
+        {
+          (task)->disconnectService();
+          traceExec(task, "disconnectService",ComputePlacement(task));
+        }
+      catch(...) 
+        {
+          // Disconnect has failed
+          traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+        }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT,this);
+      }//End of critical section
+    }
+  if(task->getState() == YACS::ERROR)
+    {
+      //try to put all coupled tasks in error
+      std::set<Task*> coupledSet;
+      task->getCoupledTasks(coupledSet);
+      for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
+        {
+          Task* t=*it;
+          if(t == task)continue;
+          if(t->getState() == YACS::ERROR)continue;
+          try
+            {
+              t->disconnectService();
+              traceExec(t, "disconnectService",ComputePlacement(task));
+            }
+          catch(...)
+            {
+              // Disconnect has failed
+              traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
+            }
+          {//Critical section
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+            t->aborted();
+            _mainSched->notifyFrom(t,YACS::ABORT,this);
+          }//End of critical section
+          traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
+        }
+    }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+}
+
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+#include "Runtime.hxx"
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+  Runtime *r(getRuntime());
+  if(!r)
+    throw YACS::Exception("loadResources : no runtime  !");
+  std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
+  int id = 0;
+  for(const std::pair<std::string,int>& res : data)
+  {
+    WorkloadManager::Resource newResource;
+    newResource.name = res.first;
+    newResource.id = id;
+    id++;
+    newResource.nbCores = res.second;
+    wm.addResource(newResource);
+  }
+}
+
+class NewTask : public WorkloadManager::Task
+{
+public:
+  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
+  const WorkloadManager::ContainerType& type()const override;
+  void run(const WorkloadManager::Container& c)override;
+private:
+  WorkloadManager::ContainerType _type;
+  Executor& _executor;
+  YACS::ENGINE::Task * _yacsTask;
+};
+
+NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
+: _type()
+, _executor(executor)
+, _yacsTask(yacsTask)
+{
+  _type.neededCores = 0;
+  _type.id = 0;
+  _type.name = "test";
+}
+
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+  return _type;
+}
+
+void NewTask::run(const WorkloadManager::Container& c)
+{
+  _executor.loadTask(_yacsTask);
+  _executor.makeDatastreamConnections(_yacsTask);
+  YACS::Event ev = _executor.runTask(_yacsTask);
+  _executor.endTask(_yacsTask, ev);
+  delete this; // provisoire
+}
+
+void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
+{
+  DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched = graph;
+    _root = dynamic_cast<ComposedNode *>(_mainSched);
+    if (!_root) throw Exception("Executor::Run, Internal Error!");
+    _executorState = YACS::NOTYETINITIALIZED;
+    sendEvent("executor");
+    _toContinue=true;
+    _isOKToEnd = false;
+    _errorDetected = false;
+    _isWaitingEventsFromRunningTasks = false;
+    _numberOfRunningTasks = 0;
+    _runningTasks.clear();
+    _numberOfEndedTasks = 0;
+    string tracefile = "traceExec_";
+    tracefile += _mainSched->getName();
+    _trace.open(tracefile.c_str());
+    _start = std::chrono::steady_clock::now();
+  } // --- End of critical section
+
+  if (debug > 1) _displayDot(graph);
+
+  if (fromScratch)
+    {
+      try
+        {
+          graph->init();
+          graph->exUpdateState();
+        }
+      catch(Exception& ex)
+        {
+          DEBTRACE("exception: "<< (ex.what()));
+          _executorState = YACS::FINISHED;
+          sendEvent("executor");
+          throw;
+        }
+    }
+  _executorState = YACS::INITIALISED;
+  sendEvent("executor");
+
+  if (debug > 1) _displayDot(graph);
+
+  bool isMore;
+  int problemCount=0;
+  int numberAllTasks;
+
+  _executorState = YACS::RUNNING;
+  sendEvent("executor");
+
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  loadResources(wlm);
+  wlm.start();
+  
+  while (_toContinue)
+    {
+      DEBTRACE("--- executor main loop");
+      sleepWhileNoEventsFromAnyRunningTask();
+      DEBTRACE("--- events...");
+      if (debug > 2) _displayDot(graph);
+      { // --- Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
+        graph->selectRunnableTasks(readyTasks);
+        _tasks.clear();
+        for(Task * t : readyTasks)
+          if(_runningTasks.find(t) == _runningTasks.end())
+            _tasks.push_back(t);
+        numberAllTasks=_numberOfRunningTasks+_tasks.size();
+      } // --- End of critical section
+      if (debug > 2) _displayDot(graph);
+      DEBTRACE("--- events...");
+      if (_executorState == YACS::RUNNING)
+      {
+        if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
+        for(Task * task : _tasks)
+        {
+          beginTask(task);
+          NewTask* newTask = new NewTask(*this, task);
+          wlm.addTask(newTask);
+        }
+      }
+      if (debug > 1) _displayDot(graph);
+      { // --- Critical section
+        DEBTRACE("---");
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
+        _toContinue = !graph->isFinished();
+
+        DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
+        DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
+        DEBTRACE("_toContinue: " << _toContinue);
+        if(_toContinue && numberAllTasks==0)
+        {
+          //Problem : no running tasks and no task to launch ??
+          problemCount++;
+          std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
+          //Pause to give a chance to interrupt
+          usleep(1000);
+          if(problemCount > 25)
+          {
+            // Too much problems encountered : stop execution
+            _toContinue=false;
+          }
+        }
+
+        if (! _toContinue)
+          {
+            _executorState = YACS::FINISHED;
+            sendEvent("executor");
+            _condForPilot.notify_all();
+          }
+      } // --- End of critical section
+      if (debug > 0) _displayDot(graph);
+      DEBTRACE("_toContinue: " << _toContinue);
+    }
+
+  wlm.stop();
+  DEBTRACE("End of main Loop");
+
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
+      {
+        DEBTRACE("stop requested: End soon");
+        _executorState = YACS::STOPPED;
+        _toContinue = false;
+        sendEvent("executor");
+      }
+  } // --- End of critical section
+  if ( _dumpOnErrorRequested && _errorDetected)
+    {
+      saveState(_dumpErrorFile);
+    }
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace.close();
+  }
+  DEBTRACE("End of RunB thread");  
+}
index cb1f7285455d5b3b58e0e626f0b5439f22a6e556..50168ebd9752043d71556d4a871b91b9720162b9 100644 (file)
@@ -37,6 +37,7 @@
 #include <string>
 #include <fstream>
 #include <ctime>
+#include <chrono>
 
 namespace YACS
 {
@@ -88,17 +89,18 @@ namespace YACS
       bool _keepGoingOnFail;
       //! specifies if scope DynParaLoop is active or not. False by default.
       bool _DPLScopeSensitive;
-#ifdef WIN32
-         DWORD _start;
-#else
-      timeval _start;
-#endif
+      std::chrono::steady_clock::time_point _start;
     public:
       Executor();
       virtual ~Executor();
       void RunA(Scheduler *graph,int debug=0, bool fromScratch=true);
-      void RunW(Scheduler *graph,int debug=0, bool fromScratch=true) { RunB(graph, debug, fromScratch); }
+      void RunW(Scheduler *graph,int debug=0, bool fromScratch=true)
+      {
+        //RunB(graph, debug, fromScratch);
+        newRun(graph, debug, fromScratch);
+      }
       void RunB(Scheduler *graph,int debug=0, bool fromScratch=true);
+      void newRun(Scheduler *graph,int debug=0, bool fromScratch=true);
       void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; }
       bool getKeepGoingProperty() const { return _keepGoingOnFail; }
       void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=newVal; }
@@ -125,11 +127,17 @@ namespace YACS
       static int _maxThreads;
       static size_t _threadStackSize;
       YACS::BASES::Mutex& getTheMutexForSchedulerUpdate() { return _mutexForSchedulerUpdate; }
+      ///// new executor !!!!!
+      void loadTask(Task *task);
+      YACS::Event runTask(Task *task);
+      void makeDatastreamConnections(Task *task);
+      void beginTask(Task *task);
+      void endTask(Task *task, YACS::Event ev);
+      ////////////
     protected:
       bool checkBreakPoints();
       void waitResume();
       void loadTask(Task *task, const Executor *execInst);
-      void loadTasks(const std::vector<Task *>& tasks, const Executor *execInst);
       void loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst);
       void launchTasks(const std::vector<Task*>& tasks);
       void launchTask(Task *task);
index bb1ec18756f01cbe8bbccaf33382493321afd86a..159e56f46beec915cb6c1c89026682ef5a73b03c 100644 (file)
@@ -36,7 +36,13 @@ void RuntimeForEngineIntegrationTest::setRuntime()
 
 std::vector< std::pair<std::string,int> > RuntimeForEngineIntegrationTest::getCatalogOfComputeNodes() const
 {
-  throw Exception("RuntimeForEngineIntegrationTest not implemented !");
+  std::vector< std::pair<std::string,int> > result(1);
+  std::pair<std::string,int> localhost;
+  localhost.first = "localhost";
+  localhost.second = 8;
+  result[0] = localhost;
+  return result;
+//  throw Exception("RuntimeForEngineIntegrationTest::getCatalogOfComputeNodes : not implemented !");
 }
 
 ElementaryNode* RuntimeForEngineIntegrationTest::createNode(const std::string& implementation, const std::string& name) throw (YACS::Exception)
index d97035268c29e22178293ab2f6c7bdf3bb07f17e..45eff9e9113f95f493063fee1d3f181d4c65ee6e 100644 (file)
@@ -115,7 +115,13 @@ void RuntimeForEngineTest::setRuntime()
 
 std::vector< std::pair<std::string,int> > RuntimeForEngineTest::getCatalogOfComputeNodes() const
 {
-  throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !");
+  std::vector< std::pair<std::string,int> > result(1);
+  std::pair<std::string,int> localhost;
+  localhost.first = "localhost";
+  localhost.second = 8;
+  result[0] = localhost;
+  return result;
+//  throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !");
 }
 
 ElementaryNode* RuntimeForEngineTest::createNode(const string& implementation, const string& name) throw(YACS::Exception)
index 654ef6f32615a420eb4e543d2ed9128e85c6cc4c..ec3c10d4e5b44a918b74d53c50eecc43c6ef8f00 100644 (file)
@@ -38,7 +38,7 @@ SET (_wlm_headers
 ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources})
 TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES})
 INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS})
-INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS})
+INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS}/workloadmanager)
 
 IF(SALOME_BUILD_TESTS)
   ADD_SUBDIRECTORY(Test)
index d25d18f46d443c2df2d294ef0a866ddd804d1eeb..96e48b4f65c5a645fa3a35a51112ada4c6466aa6 100644 (file)
 #include "Task.hxx"
 #include <stdexcept>
 #include <limits>
+#include <algorithm>
 
 namespace WorkloadManager
 {
 void DefaultAlgorithm::addTask(Task* t)
 {
   // put the tasks which need more cores in front.
-  float newNeedCores = t->type()->neededCores;
+  float newNeedCores = t->type().neededCores;
   if(_waitingTasks.empty())
     _waitingTasks.push_back(t);
-  else if(_waitingTasks.back()->type()->neededCores >= newNeedCores)
+  else if(_waitingTasks.back()->type().neededCores >= newNeedCores)
     _waitingTasks.push_back(t);
   else
   {
     std::list<Task*>::iterator it = _waitingTasks.begin();
-    while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores)
+    while(it != _waitingTasks.end() && (*it)->type().neededCores >= newNeedCores)
       it++;
     _waitingTasks.insert(it, t);
   }
@@ -45,13 +46,9 @@ bool DefaultAlgorithm::empty()const
   return _waitingTasks.empty();
 }
 
-void DefaultAlgorithm::addResource(Resource* r)
+void DefaultAlgorithm::addResource(const Resource& r)
 {
-  // add the resource. The operation is ignored if the resource already exists.
-  _resources.emplace(std::piecewise_construct,
-                     std::forward_as_tuple(r),
-                     std::forward_as_tuple(r)
-                    );
+  _resources.emplace_back(r);
 }
 
 WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
@@ -62,18 +59,19 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
       !result.taskFound && itTask != _waitingTasks.end();
       itTask ++)
   {
-    const ContainerType* ctype = (*itTask)->type();
-    std::map<const Resource *, ResourceLoadInfo>::iterator best_resource;
+    const ContainerType& ctype = (*itTask)->type();
+    std::list<ResourceLoadInfo>::iterator best_resource;
     best_resource = _resources.end();
     float best_cost = std::numeric_limits<float>::max();
+    bool isSupported = false;
     for(auto itResource = _resources.begin();
         itResource != _resources.end();
         itResource++)
-      if(itResource->second.isSupported(ctype))
+      if(itResource->isSupported(ctype))
       {
-        if(itResource->second.isAllocPossible(ctype))
+        if(itResource->isAllocPossible(ctype))
         {
-          float thisCost = itResource->second.cost(ctype);
+          float thisCost = itResource->cost(ctype);
           if( best_cost > thisCost)
           {
             best_cost = thisCost;
@@ -86,9 +84,13 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
       chosenTaskIt = itTask;
       result.task = (*itTask);
       result.taskFound = true;
-      result.worker.resource = best_resource->first;
+      result.worker.resource = best_resource->resource();
       result.worker.type = ctype;
-      result.worker.index = best_resource->second.alloc(ctype);
+      result.worker.index = best_resource->alloc(ctype);
+    }
+    else if(!isSupported)
+    {
+      // TODO: This task can never be run by any available resource.
     }
   }
   if(result.taskFound)
@@ -98,17 +100,19 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
 
 void DefaultAlgorithm::liberate(const LaunchInfo& info)
 {
-  const Resource* r = info.worker.resource;
+  const Resource& r = info.worker.resource;
   unsigned int index = info.worker.index;
-  const ContainerType* ctype = info.worker.type;
-  std::map<const Resource* ,ResourceLoadInfo>::iterator it = _resources.find(r);
-  it->second.free(ctype, index);
+  const ContainerType& ctype = info.worker.type;
+  std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
+                                                       _resources.end(),
+                                                       r);
+  it->free(ctype, index); // we are sure to find it
 }
 
 // ResourceInfoForContainer
 
 DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer
-                                (const Resource * r, const ContainerType* ctype)
+                                (const Resource& r, const ContainerType& ctype)
 : _ctype(ctype)
 , _resource(r)
 , _runningContainers()
@@ -118,7 +122,7 @@ DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer
 
 unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const
 {
-  return float(_resource->nbCores) / _ctype->neededCores;
+  return float(_resource.nbCores) / _ctype.neededCores;
 }
 
 unsigned int  DefaultAlgorithm::ResourceInfoForContainer::alloc()
@@ -151,7 +155,7 @@ bool DefaultAlgorithm::ResourceInfoForContainer::isContainerRunning
 
 // ResourceLoadInfo
 
-DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r)
+DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource& r)
 : _resource(r)
 , _load(0.0)
 , _ctypes()
@@ -159,47 +163,48 @@ DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r)
 }
 
 bool DefaultAlgorithm::ResourceLoadInfo::isSupported
-                                (const ContainerType* ctype)const
+                                (const ContainerType& ctype)const
 {
-  return ctype->neededCores <= _resource->nbCores ;
+  return ctype.neededCores <= _resource.nbCores ;
 }
                                           
 bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible
-                                (const ContainerType* ctype)const
+                                (const ContainerType& ctype)const
 {
-  return ctype->neededCores + _load <= _resource->nbCores;
+  return ctype.neededCores + _load <= _resource.nbCores;
 }
 
 float DefaultAlgorithm::ResourceLoadInfo::cost
-                                (const ContainerType* ctype)const
+                                (const ContainerType& ctype)const
 {
-  return _load * 100.0 / float(_resource->nbCores);
+  return _load * 100.0 / float(_resource.nbCores);
 }
 
 unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc
-                                (const ContainerType* ctype)
+                                (const ContainerType& ctype)
 {
-  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
-  it = _ctypes.find(ctype);
+  std::list<ResourceInfoForContainer>::iterator it = std::find(_ctypes.begin(),
+                                                               _ctypes.end(),
+                                                               ctype);
+  // add the type if not found
   if(it == _ctypes.end())
   {
-    // add the type if not found
-    it = _ctypes.emplace(std::piecewise_construct,
-                         std::forward_as_tuple(ctype),
-                         std::forward_as_tuple(_resource, ctype)
-                        ).first;
+    _ctypes.emplace_back(_resource, ctype);
+    it = _ctypes.end();
+    it--;
   }
-  _load += ctype->neededCores;
-  return it->second.alloc();
+  _load += ctype.neededCores;
+  return it->alloc();
 }
 
 void DefaultAlgorithm::ResourceLoadInfo::free
-                                (const ContainerType* ctype, int index)
+                                (const ContainerType& ctype, int index)
 {
-  _load -= ctype->neededCores;
-  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
-  it = _ctypes.find(ctype);
-  it->second.free(index);
+  _load -= ctype.neededCores;
+  std::list<ResourceInfoForContainer>::iterator it = std::find(_ctypes.begin(),
+                                                               _ctypes.end(),
+                                                               ctype);
+  it->free(index);
 }
 
 }
index d30f50499f673159ef22b07de0b6b3b80c4dd09f..2e58c66c50be69d8f30edb1058e980908cff7897 100644 (file)
@@ -33,7 +33,7 @@ class DefaultAlgorithm : public WorkloadAlgorithm
 {
 public:
   void addTask(Task* t)override;
-  void addResource(Resource* r)override;
+  void addResource(const Resource& r)override;
   LaunchInfo chooseTask()override;
   void liberate(const LaunchInfo& info)override;
   bool empty()const override;
@@ -43,36 +43,46 @@ private:
   class ResourceInfoForContainer
   {
   public:
-    ResourceInfoForContainer(const Resource * r, const ContainerType* ctype);
+    ResourceInfoForContainer(const Resource& r, const ContainerType& ctype);
     unsigned int maxContainers()const;
     unsigned int  alloc();
     void free(unsigned int index);
     unsigned int nbRunningContainers()const;
     bool isContainerRunning(unsigned int index)const;
+    bool operator<(const ResourceInfoForContainer& other)const
+    { return _ctype < other._ctype;}
+    bool operator==(const ContainerType& other)const
+    { return _ctype == other;}
+    const ContainerType& type()const { return _ctype;}
   private:
-    const ContainerType* _ctype;
-    const Resource* _resource;
-    std::set<unsigned int> _runningContainers;
+    ContainerType _ctype;
+    const Resource& _resource; // same ref as ResourceLoadInfo
+    std::set<unsigned int> _runningContainers; // 0 to max possible containers on this resource
     unsigned int _firstFreeContainer;
   };
   
   class ResourceLoadInfo
   {
   public:
-    ResourceLoadInfo(const Resource * r);
-    bool isSupported(const ContainerType* ctype)const;
-    bool isAllocPossible(const ContainerType* ctype)const;
-    float cost(const ContainerType* ctype)const;
-    unsigned int alloc(const ContainerType* ctype);
-    void free(const ContainerType* ctype, int index);
+    ResourceLoadInfo(const Resource& r);
+    bool isSupported(const ContainerType& ctype)const;
+    bool isAllocPossible(const ContainerType& ctype)const;
+    float cost(const ContainerType& ctype)const;
+    unsigned int alloc(const ContainerType& ctype);
+    void free(const ContainerType& ctype, int index);
+    bool operator<(const ResourceLoadInfo& other)const
+    { return _resource < other._resource;}
+    bool operator==(const Resource& other)const
+    { return _resource == other;}
+    const Resource& resource()const { return _resource;}
   private:
-    const Resource* _resource;
+    Resource _resource;
     float _load;
-    std::map<const ContainerType*, ResourceInfoForContainer> _ctypes;
+    std::list<ResourceInfoForContainer> _ctypes;
   };
   
 private:
-  std::map<const Resource *, ResourceLoadInfo> _resources;
+  std::list<ResourceLoadInfo> _resources;
   std::list<Task*> _waitingTasks;
 };
 }
index bdada0b900931a0535380c39a2c0b00232ce0d48..3dd7c93d08e67b4617b939a97316f726c5c26b76 100644 (file)
@@ -25,24 +25,38 @@ namespace WorkloadManager
 {
   struct ContainerType
   {
-    float neededCores; // needed by WorkloadManager
-    // parameters for client use, not needed by WorkloadManager:
+    float neededCores = 0.0; // needed by WorkloadManager
+    // parameters for client use, used by WorkloadManager to distinguish objects
     std::string name;
-    int id;
+    int id = 0;
+    bool operator==(const ContainerType& other)const
+    { return id == other.id && name == other.name;}
+    bool operator<(const ContainerType& other)const
+    {
+      return (id < other.id) ||
+             (id == other.id && name < other.name);
+    }
   };
   
   struct Resource
   {
     unsigned int nbCores; // needed by WorkloadManager
-    // parameters for client use, not needed by WorkloadManager:
+    // parameters for client use, used by WorkloadManager to distinguish objects
     std::string name;
     int id;
+    bool operator==(const Resource& other)const
+    { return id == other.id && name == other.name;}
+    bool operator<(const Resource& other)const
+    {
+      return (id < other.id) ||
+             (id == other.id && name < other.name);
+    }
   };
   
   struct Container
   {
-    const ContainerType* type;
-    const Resource* resource;
+    ContainerType type;
+    Resource resource;
     unsigned int index; // worker index on the resource for this type
   };
 
@@ -52,7 +66,8 @@ namespace WorkloadManager
   class Task
   {
   public:
-    virtual const ContainerType* type()const =0;
+    virtual ~Task() {}
+    virtual const ContainerType& type()const =0;
     virtual void run(const Container& c)=0;
   };
 }
index b3b53c9d981339ac7aedaf27d64e0d2ff3574934..a550959518f2c02420839aab2746375d33d74723 100644 (file)
 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 //
 //
-#include <cppunit/CompilerOutputter.h>
-#include <cppunit/TestResult.h>
-#include <cppunit/TestResultCollector.h>
-#include <cppunit/TextTestProgressListener.h>
-#include <cppunit/BriefTestProgressListener.h>
-#include <cppunit/extensions/TestFactoryRegistry.h>
-#include <cppunit/TestRunner.h>
-#include <cppunit/TextTestRunner.h>
+
+#include <cppunit/TestFixture.h>
 #include <stdexcept>
 
 #include <iostream>
@@ -83,12 +77,12 @@ private:
 class MyTask : public WorkloadManager::Task
 {
 public:
-  const WorkloadManager::ContainerType* type()const override {return _type;}
+  const WorkloadManager::ContainerType& type()const override {return *_type;}
   void run(const WorkloadManager::Container& c)override
   {
     _check->check(c, this);
 
-    DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name,
+    DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
               "-", c.index);
     std::this_thread::sleep_for(std::chrono::seconds(_sleep));
     DEBUG_LOG("Finish task ", _id);
@@ -141,7 +135,7 @@ void Checker<size_R, size_T>::check(const WorkloadManager::Container& c,
                                     MyTask* t)
 {
   std::unique_lock<std::mutex> lock(_mutex);
-  int& max = _maxContainersForResource[c.resource->id][c.type->id];
+  int& max = _maxContainersForResource[c.resource.id][c.type.id];
   if( max < c.index)
     max = c.index;
 }
@@ -155,11 +149,13 @@ void Checker<size_R, size_T>::globalCheck()
     for(std::size_t j=0; j < size_T; j++)
     {
       int max = _maxContainersForResource[i][j];
-      DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1);
+      DEBUG_LOG(resources[i].name, ", ", types[j].name,
+                " max simultaneous runs:", max+1);
       CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
       global_max += types[j].neededCores * float(max+1);
     }
-    DEBUG_LOG(resources[i].name, " global: ", global_max);
+    DEBUG_LOG(resources[i].name, " max cores added for every type: ", global_max);
+    // This assertion may be false if there are more resources than needed.
     CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
   }
 }
@@ -184,12 +180,13 @@ public:
 void MyTest::atest()
 {
   constexpr std::size_t resourcesNumber = 2;
-  constexpr std::size_t typesNumber = 2;
+  constexpr std::size_t typesNumber = 3;
   Checker<resourcesNumber, typesNumber> check;
   check.resources[0].nbCores = 10;
   check.resources[1].nbCores = 18;
   check.types[0].neededCores = 4.0;
   check.types[1].neededCores = 1.0;
+  check.types[2].neededCores = 0.0; // tasks to be run with no cost
 
   for(std::size_t i=0; i < resourcesNumber; i ++)
     DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
@@ -198,22 +195,25 @@ void MyTest::atest()
     DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
               " cores.");
 
-  constexpr std::size_t tasksNumber = 100;
+  constexpr std::size_t tasksNumber = 150;
   MyTask tasks[tasksNumber];
-  for(std::size_t i = 0; i < tasksNumber / 2; i++)
-    tasks[i].reset(i, &check.types[0], 2, &check);
-  for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++)
-    tasks[i].reset(i, &check.types[1], 1, &check);
+  for(std::size_t type_id = 0; type_id < typesNumber; type_id++)
+    for(std::size_t j = type_id * tasksNumber / typesNumber;
+        j < (type_id + 1) * tasksNumber / typesNumber;
+        j++)
+        //            id,  ContainerType,       sleep (1|2s)
+        tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
 
   DEBUG_LOG("Number of tasks: ", tasksNumber);
-  DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name);
-  DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ",
-            tasks[tasksNumber / 2].type()->name);
+  for(std::size_t type_id = 0; type_id < typesNumber; type_id++)
+    DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber, 
+              " to ", (type_id + 1) * tasksNumber / typesNumber,
+              " are of type ", check.types[type_id].name);
 
   WorkloadManager::DefaultAlgorithm algo;
   WorkloadManager::WorkloadManager wlm(algo);
   for(std::size_t i=0; i < resourcesNumber; i ++)
-    wlm.addResource(&check.resources[i]);
+    wlm.addResource(check.resources[i]);
 
   // Add 4 core tasks first
   check.reset();
index bad0dcf39118eb642447e25e2062b92537f25bf3..064747faf2d5898e7522c1593e207b66bf3214c5 100644 (file)
@@ -34,7 +34,7 @@ public:
   };
 
   virtual void addTask(Task* t)=0;
-  virtual void addResource(Resource* r)=0;
+  virtual void addResource(const Resource& r)=0;
   virtual LaunchInfo chooseTask()=0;
   virtual void liberate(const LaunchInfo& info)=0;
   virtual bool empty()const =0;
index 5d9f4e3e8fe84724396a33ded1b70296b1d34fc1..f3896ca52cbfd756a028d4220e5ecb3c5c13d927 100644 (file)
@@ -40,7 +40,7 @@ namespace WorkloadManager
     stop();
   }
   
-  void WorkloadManager::addResource(Resource* r)
+  void WorkloadManager::addResource(const Resource& r)
   {
     std::unique_lock<std::mutex> lock(_data_mutex);
     _algo.addResource(r);
@@ -78,8 +78,8 @@ namespace WorkloadManager
     }
     _startCondition.notify_one();
     _endCondition.notify_one();
-    for(std::future<void>& th : _otherThreads)
-      th.wait();
+    for(std::future<void>& th : _otherThreads)
+      th.wait();
   }
 
   void WorkloadManager::runTasks()
@@ -88,7 +88,7 @@ namespace WorkloadManager
     while(!threadStop)
     {
       std::unique_lock<std::mutex> lock(_data_mutex);
-      _startCondition.wait(lock, [this] {return !_algo.empty();});
+      _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
       RunningInfo taskInfo;
       while(chooseTaskToRun(taskInfo))
       {
@@ -118,7 +118,11 @@ namespace WorkloadManager
     while(!threadStop)
     {
       std::unique_lock<std::mutex> lock(_data_mutex);
-      _endCondition.wait(lock, [this] {return !_finishedTasks.empty();});
+      _endCondition.wait(lock, [this]
+                            {
+                              return !_finishedTasks.empty() ||
+                              (_stop && _runningTasks.empty() && _algo.empty());
+                            });
       while(!_finishedTasks.empty())
       {
         RunningInfo taskInfo = _finishedTasks.front();
@@ -136,8 +140,9 @@ namespace WorkloadManager
   {
     // We are already under the lock
     taskInfo.id = _nextIndex;
-    _nextIndex ++;
     taskInfo.info = _algo.chooseTask();
+    if(taskInfo.info.taskFound)
+      _nextIndex ++;
     return taskInfo.info.taskFound;
   }
 
index 264b58920839e2a96c753bed2a6456fa32ee0ade..8c033b956bee96a21a3353bbad271489239cec37 100644 (file)
@@ -33,9 +33,11 @@ namespace WorkloadManager
   {
   public:
     WorkloadManager(WorkloadAlgorithm& algo);
+    WorkloadManager(const WorkloadManager&) = delete;
+    WorkloadManager()=delete;
     ~WorkloadManager();
     void addTask(Task* t);
-    void addResource(Resource* r);
+    void addResource(const Resource& r);
     void start(); //! start execution
     void stop(); //! stop execution
 
@@ -53,7 +55,7 @@ namespace WorkloadManager
     std::condition_variable _startCondition; // start tasks thread notification
     std::condition_variable _endCondition; // end tasks thread notification
     bool _stop;
-    std::list< std::future<void> > _otherThreads;
+    std::vector< std::future<void> > _otherThreads;
     WorkloadAlgorithm& _algo;
 
     void runTasks();