Salome HOME
First test qui HPsalome container is running.
[modules/yacs.git] / src / engine / Executor.cxx
index 0e3830c8dc5f70bc905fb3362e43c4cbeac5d076..799fe96e77845b0a38e99f4ac719791e25435471 100644 (file)
@@ -23,6 +23,7 @@
 #include "Scheduler.hxx"
 #include "Dispatcher.hxx"
 #include "Container.hxx"
+#include "HomogeneousPoolContainer.hxx"
 #include "ComponentInstance.hxx"
 
 #include "VisitorSaveState.hxx"
@@ -138,7 +139,7 @@ void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
       if(debug>2)_displayDot(graph);
 
       {//Critical section
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         tasks=graph->getNextTasks(isMore);
         graph->selectRunnableTasks(tasks);
       }//End of critical section
@@ -155,7 +156,7 @@ void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
       if(debug>1)_displayDot(graph);
 
       {//Critical section
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         _toContinue=!graph->isFinished();
       }//End of critical section
       DEBTRACE("_toContinue: " << _toContinue);
@@ -232,7 +233,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
 
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _mainSched = graph;
     _root = dynamic_cast<ComposedNode *>(_mainSched);
     if (!_root) throw Exception("Executor::Run, Internal Error!");
@@ -292,10 +293,11 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
       DEBTRACE("--- events...");
       if (debug > 2) _displayDot(graph);
       { // --- Critical section
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         _tasks=graph->getNextTasks(isMore);
         numberAllTasks=_numberOfRunningTasks+_tasks.size();
         graph->selectRunnableTasks(_tasks);
+        FilterTasksConsideringContainers(_tasks);
       } // --- End of critical section
       if (debug > 2) _displayDot(graph);
       if (_executorState == YACS::RUNNING)
@@ -313,7 +315,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
       if (debug > 1) _displayDot(graph);
       { // --- Critical section
         DEBTRACE("---");
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
         if(_numberOfRunningTasks == 0)
           _toContinue = !graph->isFinished();
@@ -349,7 +351,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
   DEBTRACE("End of main Loop");
 
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
       {
         DEBTRACE("stop requested: End soon");
@@ -395,7 +397,7 @@ bool Executor::isNotFinished()
 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
 {
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _dumpErrorFile=xmlFile;
     _stopOnErrorRequested=true;
     _dumpOnErrorRequested = dumpRequested;
@@ -412,7 +414,7 @@ void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
 void Executor::unsetStopOnError()
 {
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _stopOnErrorRequested=false;
   } // --- End of critical section
 }
@@ -427,7 +429,7 @@ void Executor::setExecMode(YACS::ExecutionMode mode)
 {
   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     _execMode = mode;
   } // --- End of critical section
@@ -447,7 +449,7 @@ bool Executor::resumeCurrentBreakPoint()
   bool ret = false;
   //bool doDump = false;
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     DEBTRACE("_executorState: " << _executorState);
     switch (_executorState)
@@ -488,7 +490,7 @@ void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
 {
   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     _listOfBreakPoints = listOfBreakPoints;
   } // --- End of critical section
@@ -506,7 +508,7 @@ std::list<std::string> Executor::getTasksToLoad()
   list<string> listOfNodesToLoad;
   listOfNodesToLoad.clear();
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -545,7 +547,7 @@ bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
   vector<Task *>::iterator iter;
   vector<Task *> restrictedTasks;
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -605,7 +607,7 @@ void Executor::waitPause()
 {
   DEBTRACE("Executor::waitPause()" << _executorState);
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -723,7 +725,7 @@ bool Executor::checkBreakPoints()
       {
         bool stop = false;
         { // --- Critical section
-          YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
           _tasksSave = _tasks;
           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
             {
@@ -760,7 +762,7 @@ bool Executor::checkBreakPoints()
     case YACS::STEPBYSTEP:
       {
         { // --- Critical section
-          YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
           _tasksSave = _tasks;
           _listOfTasksToLoad.clear();
           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
@@ -812,36 +814,36 @@ void Executor::loadTask(Task *task)
 {
   DEBTRACE("Executor::loadTask(Task *task)");
   if(task->getState() != YACS::TOLOAD)return;
-  traceExec(task, "state:TOLOAD");
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
   {//Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _mainSched->notifyFrom(task,YACS::START);
   }//End of critical section
   try
     {
-      traceExec(task, "load");
+      traceExec(task, "load", ComputePlacement(task));
       task->load();
-      traceExec(task, "initService");
+      traceExec(task, "initService", ComputePlacement(task));
       task->initService();
     }
   catch(Exception& ex) 
     {
       std::cerr << ex.what() << std::endl;
       {//Critical section
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         task->aborted();
         _mainSched->notifyFrom(task,YACS::ABORT);
-        traceExec(task, "state:"+Node::getStateName(task->getState()));
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
       }//End of critical section
     }
   catch(...) 
     {
       std::cerr << "Load failed" << std::endl;
       {//Critical section
-        YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         task->aborted();
         _mainSched->notifyFrom(task,YACS::ABORT);
-        traceExec(task, "state:"+Node::getStateName(task->getState()));
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
       }//End of critical section
     }
 }
@@ -863,9 +865,9 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
       try
         {
           (*iter)->connectService();
-          traceExec(*iter, "connectService");
+          traceExec(*iter, "connectService",ComputePlacement(*iter));
           {//Critical section
-            YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->connected();
           }//End of critical section
         }
@@ -875,15 +877,15 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
           try
             {
               (*iter)->disconnectService();
-              traceExec(*iter, "disconnectService");
+              traceExec(*iter, "disconnectService",ComputePlacement(*iter));
             }
           catch(...) 
             {
               // Disconnect has failed
-              traceExec(*iter, "disconnectService failed, ABORT");
+              traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
             }
           {//Critical section
-            YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->aborted();
             _mainSched->notifyFrom(*iter,YACS::ABORT);
           }//End of critical section
@@ -894,15 +896,15 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
           try
             {
               (*iter)->disconnectService();
-              traceExec(*iter, "disconnectService");
+              traceExec(*iter, "disconnectService",ComputePlacement(*iter));
             }
           catch(...) 
             {
               // Disconnect has failed
-              traceExec(*iter, "disconnectService failed, ABORT");
+              traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
             }
           {//Critical section
-            YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->aborted();
             _mainSched->notifyFrom(*iter,YACS::ABORT);
           }//End of critical section
@@ -920,22 +922,22 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
               try
                 {
                   t->disconnectService();
-                  traceExec(t, "disconnectService");
+                  traceExec(t, "disconnectService",ComputePlacement(*iter));
                 }
               catch(...)
                 {
                   // Disconnect has failed
-                  traceExec(t, "disconnectService failed, ABORT");
+                  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
                 }
               {//Critical section
-                YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+                YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
                 t->aborted();
                 _mainSched->notifyFrom(t,YACS::ABORT);
               }//End of critical section
-              traceExec(t, "state:"+Node::getStateName(t->getState()));
+              traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
             }
         }
-      traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
+      traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
     }
 
   //Second phase, execute each task in a thread
@@ -1004,10 +1006,10 @@ void Executor::launchTask(Task *task)
   args->sched = _mainSched;
   args->execInst = this;
 
-  traceExec(task, "launch");
+  traceExec(task, "launch",ComputePlacement(task));
 
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _numberOfRunningTasks++;
     _runningTasks.insert(task);
     task->begin(); //change state to ACTIVATED
@@ -1021,7 +1023,7 @@ void Executor::sleepWhileNoEventsFromAnyRunningTask()
 {
   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
-  YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
     {
       _isWaitingEventsFromRunningTasks = true;
@@ -1061,7 +1063,7 @@ void Executor::wakeUp()
 int Executor::getNbOfThreads()
 {
   int ret;
-  YACS::BASES::AutoLocker alck(&_mutexForNbOfConcurrentThreads);
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
   _isRunningunderExternalControl=true;
   ret = _groupOfAllThreadsCreated.size();
   return ret;
@@ -1090,7 +1092,7 @@ void *Executor::functionForTaskExecution(void *arg)
   Scheduler *sched=args->sched;
   Executor *execInst=args->execInst;
   delete args;
-  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
 
   Thread::detach();
 
@@ -1099,9 +1101,9 @@ void *Executor::functionForTaskExecution(void *arg)
   YACS::Event ev=YACS::FINISH;
   try
     {
-      execInst->traceExec(task, "start execution");
+      execInst->traceExec(task, "start execution",ComputePlacement(task));
       task->execute();
-      execInst->traceExec(task, "end execution OK");
+      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
     }
   catch(Exception& ex)
     {
@@ -1110,14 +1112,14 @@ void *Executor::functionForTaskExecution(void *arg)
       ev=YACS::ABORT;
       string message = "end execution ABORT, ";
       message += ex.what();
-      execInst->traceExec(task, message);
+      execInst->traceExec(task, message,ComputePlacement(task));
     }
   catch(...) 
     {
       // Execution has failed
       std::cerr << "Execution has failed: unknown reason" << std::endl;
       ev=YACS::ABORT;
-      execInst->traceExec(task, "end execution ABORT, unknown reason");
+      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
     }
 
   // Disconnect task
@@ -1125,19 +1127,31 @@ void *Executor::functionForTaskExecution(void *arg)
     {
       DEBTRACE("task->disconnectService()");
       task->disconnectService();
-      execInst->traceExec(task, "disconnectService");
+      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
     }
   catch(...) 
     {
       // Disconnect has failed
       std::cerr << "disconnect has failed" << std::endl;
       ev=YACS::ABORT;
-      execInst->traceExec(task, "disconnectService failed, ABORT");
+      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+  //
+
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolOfContainer
+
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+    {
+      YACS::BASES::AutoLocker<Container> alckCont(contC);
+      contC->release(task);
     }
 
   DEBTRACE("End task->execute()");
   { // --- Critical section
-    YACS::BASES::AutoLocker alck(&execInst->_mutexForSchedulerUpdate);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
     try
       {
         if (ev == YACS::FINISH) task->finished();
@@ -1151,7 +1165,7 @@ void *Executor::functionForTaskExecution(void *arg)
               }
             task->aborted();
           }
-        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
         sched->notifyFrom(task,ev);
       }
     catch(Exception& ex)
@@ -1198,20 +1212,14 @@ void *Executor::functionForTaskExecution(void *arg)
   return 0;
 }
 
-void Executor::traceExec(Task *task, const std::string& message)
+void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
 {
   string nodeName = _mainSched->getTaskName(task);
   Container *cont = task->getContainer();
   string containerName = "---";
-  string placement = "---";
   if (cont)
-    {
-      containerName = cont->getName();
-      ComponentInstance *compo = task->getComponent();
-      ServiceNode *taskCast(dynamic_cast<ServiceNode *>(task));
-      if(taskCast)
-        placement = cont->getFullPlacementId(taskCast);
-    }
+    containerName = cont->getName();
+
 #ifdef WIN32
   DWORD now = timeGetTime();
   double elapse = (now - _start)/1000.0;
@@ -1221,7 +1229,7 @@ void Executor::traceExec(Task *task, const std::string& message)
   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
 #endif
   {
-    YACS::BASES::AutoLocker alck(&_mutexForTrace);
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
     _trace << flush;
   }
@@ -1238,3 +1246,71 @@ void Executor::sendEvent(const std::string& event)
   YASSERT(_root);
   disp->dispatch(_root,event);
 }
+
+/*!
+ * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
+ * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
+ *
+ * \param [in,out] tsks - list of tasks to be
+ */
+void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+{
+  std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
+  for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
+    {
+      Task *cur(*it);
+      if(!cur)
+        continue;
+      Container *cont(cur->getContainer());
+      if(!cont)
+        {
+          m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+          continue;
+        }
+      HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
+      if(!contC)
+        {
+          m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+          continue;
+        }
+      m[contC].push_back(cur);
+    }
+  //
+  std::vector<Task *> ret;
+  for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
+    {
+      HomogeneousPoolContainer *curhpc((*it).first);
+      const std::vector<Task *>& curtsks((*it).second);
+      if(!curhpc)
+        {
+          ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+        }
+      else
+        {
+          // start of critical section for container curhpc
+          YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
+          std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
+          std::size_t sz(curhpc->getNumberOfFreePlace());
+          std::vector<Task *>::const_iterator it2(curtsks.begin());
+          for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
+            {
+              vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
+              ret.push_back(*it2);
+            }
+          curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
+          //end of critical section
+        }
+    }
+  //
+  tsks=ret;
+}
+
+std::string Executor::ComputePlacement(Task *zeTask)
+{
+  std::string placement("---");
+  if(!zeTask)
+    return placement;
+  if(zeTask->getContainer())
+    placement=zeTask->getContainer()->getFullPlacementId(zeTask);
+  return placement;
+}