WIP: workload manager, final version of the API
[modules/yacs.git] / src / engine / Executor.cxx
index 615beff1fd5e17bcc3cd3482522e5f9d2ca67a3c..8af044080cf03e8ceeea95530cb5177606155fcb 100644 (file)
@@ -1,9 +1,9 @@
-// Copyright (C) 2006-2012  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2019  CEA/DEN, EDF R&D
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
 // License as published by the Free Software Foundation; either
-// version 2.1 of the License.
+// version 2.1 of the License, or (at your option) any later version.
 //
 // This library is distributed in the hope that it will be useful,
 // but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 #include "Executor.hxx"
 #include "Task.hxx"
+#include "AutoLocker.hxx"
 #include "Scheduler.hxx"
 #include "Dispatcher.hxx"
 #include "Container.hxx"
+#include "HomogeneousPoolContainer.hxx"
 #include "ComponentInstance.hxx"
 
 #include "VisitorSaveState.hxx"
+#include "ServiceNode.hxx"
 #include "ComposedNode.hxx"
 
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+
 #include <iostream>
 #include <fstream>
 #include <sys/stat.h>
@@ -38,7 +44,7 @@
 #include <cstdlib>
 #include <algorithm>
 
-#ifdef WNT
+#ifdef WIN32
 #define usleep(A) _sleep(A/1000)
 #if !defined(S_ISCHR) || !defined(S_ISREG)
 #  ifndef S_IFMT
@@ -69,9 +75,10 @@ using YACS::BASES::Semaphore;
 //#define _DEVDEBUG_
 #include "YacsTrace.hxx"
 
-int Executor::_maxThreads(50);
+int Executor::_maxThreads(1000);
+size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB (2**18 bytes) because threads launched by YACS are lightweight
 
-Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
+Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
 {
   _root=0;
   _toContinue = true;
@@ -83,6 +90,8 @@ Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
   _executorState = YACS::NOTYETINITIALIZED;
   _execMode = YACS::CONTINUE;
   _semThreadCnt = _maxThreads;
+  _numberOfRunningTasks = 0;
+  _numberOfEndedTasks = 0;
   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
 }
 
@@ -135,16 +144,15 @@ void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
       if(debug>2)_displayDot(graph);
 
       {//Critical section
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         tasks=graph->getNextTasks(isMore);
         graph->selectRunnableTasks(tasks);
-        _mutexForSchedulerUpdate.unlock();
       }//End of critical section
 
       if(debug>2)_displayDot(graph);
 
       for(iter=tasks.begin();iter!=tasks.end();iter++)
-        loadTask(*iter);
+        loadTask(*iter,this);
 
       if(debug>1)_displayDot(graph);
 
@@ -153,9 +161,8 @@ void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
       if(debug>1)_displayDot(graph);
 
       {//Critical section
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         _toContinue=!graph->isFinished();
-        _mutexForSchedulerUpdate.unlock();
       }//End of critical section
       DEBTRACE("_toContinue: " << _toContinue);
 
@@ -231,7 +238,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
 
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _mainSched = graph;
     _root = dynamic_cast<ComposedNode *>(_mainSched);
     if (!_root) throw Exception("Executor::Run, Internal Error!");
@@ -247,13 +254,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
     string tracefile = "traceExec_";
     tracefile += _mainSched->getName();
     _trace.open(tracefile.c_str());
-#ifdef WIN32
-   _start = timeGetTime();
-#else
-    gettimeofday(&_start, NULL);
-#endif
-
-    _mutexForSchedulerUpdate.unlock();
+    _start = std::chrono::steady_clock::now();
   } // --- End of critical section
 
   if (debug > 1) _displayDot(graph);
@@ -292,11 +293,11 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
       DEBTRACE("--- events...");
       if (debug > 2) _displayDot(graph);
       { // --- Critical section
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         _tasks=graph->getNextTasks(isMore);
-        numberAllTasks=_numberOfRunningTasks+_tasks.size();
         graph->selectRunnableTasks(_tasks);
-        _mutexForSchedulerUpdate.unlock();
+        FilterTasksConsideringContainers(_tasks);
+        numberAllTasks=_numberOfRunningTasks+_tasks.size();
       } // --- End of critical section
       if (debug > 2) _displayDot(graph);
       if (_executorState == YACS::RUNNING)
@@ -304,8 +305,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
           if (debug > 0) _displayDot(graph);
           DEBTRACE("---");
-          for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
-            loadTask(*iter);
+          loadParallelTasks(_tasks,this);
           if (debug > 1) _displayDot(graph);
           DEBTRACE("---");
           launchTasks(_tasks);
@@ -314,7 +314,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
       if (debug > 1) _displayDot(graph);
       { // --- Critical section
         DEBTRACE("---");
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         //It is possible that the graph is finished but there are still running tasks (it's an error but we must take it into account)
         if(_numberOfRunningTasks == 0)
           _toContinue = !graph->isFinished();
@@ -342,7 +342,6 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
             sendEvent("executor");
             _condForPilot.notify_all();
           }
-        _mutexForSchedulerUpdate.unlock();
       } // --- End of critical section
       if (debug > 0) _displayDot(graph);
       DEBTRACE("_toContinue: " << _toContinue);
@@ -351,7 +350,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
   DEBTRACE("End of main Loop");
 
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
       {
         DEBTRACE("stop requested: End soon");
@@ -359,13 +358,15 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
         _toContinue = false;
         sendEvent("executor");
       }
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
   if ( _dumpOnErrorRequested && _errorDetected)
     {
       saveState(_dumpErrorFile);
     }
-  _trace.close();
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace.close();
+  }
   DEBTRACE("End of RunB thread");  
 }
 
@@ -398,13 +399,12 @@ bool Executor::isNotFinished()
 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
 {
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _dumpErrorFile=xmlFile;
     _stopOnErrorRequested=true;
     _dumpOnErrorRequested = dumpRequested;
     if (dumpRequested && xmlFile.empty())
       throw YACS::Exception("dump on error requested and no filename given for dump");
-    _mutexForSchedulerUpdate.unlock();
     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
   } // --- End of critical section
 }
@@ -416,9 +416,8 @@ void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
 void Executor::unsetStopOnError()
 {
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _stopOnErrorRequested=false;
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
 }
 
@@ -432,10 +431,9 @@ void Executor::setExecMode(YACS::ExecutionMode mode)
 {
   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     _execMode = mode;
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
 }
 
@@ -453,7 +451,7 @@ bool Executor::resumeCurrentBreakPoint()
   bool ret = false;
   //bool doDump = false;
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     DEBTRACE("_executorState: " << _executorState);
     switch (_executorState)
@@ -480,7 +478,6 @@ bool Executor::resumeCurrentBreakPoint()
           // debug: no easy way to verify if main loop is actually waiting on condition
         }
       }
-    _mutexForSchedulerUpdate.unlock();
     DEBTRACE("---");
     //if (doDump) saveState(_dumpErrorFile);
   } // --- End of critical section
@@ -495,10 +492,9 @@ void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
 {
   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     _listOfBreakPoints = listOfBreakPoints;
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
 }
 
@@ -514,7 +510,7 @@ std::list<std::string> Executor::getTasksToLoad()
   list<string> listOfNodesToLoad;
   listOfNodesToLoad.clear();
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -534,7 +530,6 @@ std::list<std::string> Executor::getTasksToLoad()
           break;
         }
       }
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
   return listOfNodesToLoad;
 }
@@ -554,7 +549,7 @@ bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
   vector<Task *>::iterator iter;
   vector<Task *> restrictedTasks;
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -588,7 +583,6 @@ bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
           break;
         }
       }
-    _mutexForSchedulerUpdate.unlock();
     } // --- End of critical section
 
   _tasks.clear();
@@ -615,7 +609,7 @@ void Executor::waitPause()
 {
   DEBTRACE("Executor::waitPause()" << _executorState);
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _isRunningunderExternalControl=true;
     switch (_executorState)
       {
@@ -635,11 +629,40 @@ void Executor::waitPause()
           break;
         }
       }
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
   DEBTRACE("---");
 }
 
+/*!
+ * This method can be called at any time, concurrently with a RunB call.
+ * It waits until the executor is locked in a consistent state of a running graph.
+ *
+ * This method is expected to be used in association with the resume method:
+ * the value it returns is expected to be passed to resume.
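+ *
+ * A minimal usage sketch (hypothetical pilot code):
+ * \code
+ * bool suspended = executor->suspendASAP(); // false if the execution is already finished
+ * if (suspended)
+ *   {
+ *     // the scheduler mutex is held here: safe to inspect the state of the running graph
+ *   }
+ * executor->resume(suspended); // releases the mutex only if it was taken
+ * \endcode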
+ */
+bool Executor::suspendASAP()
+{
+  // no AutoLocker here. It's not a bug: the mutex must stay locked when this method returns true.
+  _mutexForSchedulerUpdate.lock();
+  if(!_toContinue && _executorState==YACS::FINISHED)
+    {// execution is finished
+      _mutexForSchedulerUpdate.unLock();
+      return false;// the executor is no longer running
+    }
+  // general case: leave the method with the mutex in locked state
+  return true;
+}
+
+/*!
+ * This method is expected to be called in association with the suspendASAP method,
+ * just after it, with the output of suspendASAP as input parameter.
+ */
+void Executor::resume(bool suspended)
+{
+  if(suspended)
+    _mutexForSchedulerUpdate.unLock();
+}
+
 //! stops the execution as soon as possible 
 
 void Executor::stopExecution()
@@ -655,11 +678,19 @@ void Executor::stopExecution()
 bool Executor::saveState(const std::string& xmlFile)
 {
   DEBTRACE("Executor::saveState() in " << xmlFile);
-  YACS::ENGINE::VisitorSaveState vst(_root);
-  vst.openFileDump(xmlFile.c_str());
-  _root->accept(&vst);
-  vst.closeFileDump();
-  return true;
+  bool result = false;
+  try {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    YACS::ENGINE::VisitorSaveState vst(_root);
+    vst.openFileDump(xmlFile.c_str());
+    _root->accept(&vst);
+    vst.closeFileDump();
+    result = true;
+  }
+  catch(Exception& ex) {
+    std::cerr << ex.what() << std::endl;
+  }
+  return result;
 }
 
 //! not yet implemented
@@ -734,7 +765,7 @@ bool Executor::checkBreakPoints()
       {
         bool stop = false;
         { // --- Critical section
-          _mutexForSchedulerUpdate.lock();
+          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
           _tasksSave = _tasks;
           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
             {
@@ -761,11 +792,8 @@ bool Executor::checkBreakPoints()
               sendEvent("executor");
               _condForPilot.notify_all();
             }
-          //_mutexForSchedulerUpdate.unlock(); 
-          //} // --- End of critical section
           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
           if (_isOKToEnd) endRequested = true;
-          _mutexForSchedulerUpdate.unlock();
         } // --- End of critical section
           if (stop) DEBTRACE("wake up from waitResume");
         break;
@@ -774,7 +802,7 @@ bool Executor::checkBreakPoints()
     case YACS::STEPBYSTEP:
       {
         { // --- Critical section
-          _mutexForSchedulerUpdate.lock();
+          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
           _tasksSave = _tasks;
           _listOfTasksToLoad.clear();
           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
@@ -792,7 +820,6 @@ bool Executor::checkBreakPoints()
             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
                           // or, if no pilot, wait until no more running tasks (stop on error)
           if (_isOKToEnd) endRequested = true;
-          _mutexForSchedulerUpdate.unlock();
         } // --- End of critical section
         DEBTRACE("wake up from waitResume");
         break;
@@ -823,69 +850,88 @@ void Executor::waitResume()
  *  \param task  : Task to load
  */
 
-void Executor::loadTask(Task *task)
+void Executor::loadTask(Task *task, const Executor *execInst)
 {
   DEBTRACE("Executor::loadTask(Task *task)");
-  if(task->getState() != YACS::TOLOAD)return;
-  traceExec(task, "state:TOLOAD");
+  if(task->getState() != YACS::TOLOAD)
+    return;
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
   {//Critical section
-    _mutexForSchedulerUpdate.lock();
-    _mainSched->notifyFrom(task,YACS::START);
-    _mutexForSchedulerUpdate.unlock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched->notifyFrom(task,YACS::START,execInst);
   }//End of critical section
   try
     {
-      traceExec(task, "load");
+      traceExec(task, "load", ComputePlacement(task));
       task->load();
-      traceExec(task, "initService");
+      traceExec(task, "initService", ComputePlacement(task));
       task->initService();
     }
   catch(Exception& ex) 
     {
       std::cerr << ex.what() << std::endl;
       {//Critical section
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         task->aborted();
-        _mainSched->notifyFrom(task,YACS::ABORT);
-        traceExec(task, "state:"+Node::getStateName(task->getState()));
-        _mutexForSchedulerUpdate.unlock();
+        _mainSched->notifyFrom(task,YACS::ABORT,execInst);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
       }//End of critical section
     }
   catch(...) 
     {
       std::cerr << "Load failed" << std::endl;
       {//Critical section
-        _mutexForSchedulerUpdate.lock();
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
         task->aborted();
-        _mainSched->notifyFrom(task,YACS::ABORT);
-        traceExec(task, "state:"+Node::getStateName(task->getState()));
-        _mutexForSchedulerUpdate.unlock();
+        _mainSched->notifyFrom(task,YACS::ABORT,execInst);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
       }//End of critical section
     }
 }
 
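+//! Arguments handed over to a task thread; allocated by the caller, deleted by the thread function.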
+struct threadargs
+{
+  Task *task;
+  Scheduler *sched;
+  Executor *execInst;
+};
+
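+//! Load \a tasks in parallel, one lightweight thread per task (hence the reduced default
+//! thread stack size); all load threads are joined before this method returns.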
+void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
+{
+  std::vector<Thread> ths(tasks.size());
+  std::size_t ithread(0);
+  for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++, ithread++)
+    {
+      DEBTRACE("Executor::loadParallelTasks(Task *task)");
+      struct threadargs *args(new threadargs);
+      args->task = (*iter);
+      args->sched = _mainSched;
+      args->execInst = this;
+      ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
+    }
+  for(ithread=0;ithread<tasks.size();ithread++)
+    ths[ithread].join();
+}
 
 //! Execute a list of tasks possibly connected through datastream links
 /*!
  *  \param tasks  : a list of tasks to execute
  *
  */
-void Executor::launchTasks(std::vector<Task *>& tasks)
+void Executor::launchTasks(const std::vector<Task *>& tasks)
 {
-  vector<Task *>::iterator iter;
   //First phase, make datastream connections
-  for(iter=tasks.begin();iter!=tasks.end();iter++)
+  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
     {
       YACS::StatesForNode state=(*iter)->getState();
       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
       try
         {
           (*iter)->connectService();
-          traceExec(*iter, "connectService");
+          traceExec(*iter, "connectService",ComputePlacement(*iter));
           {//Critical section
-            _mutexForSchedulerUpdate.lock();
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->connected();
-            _mutexForSchedulerUpdate.unlock();
           }//End of critical section
         }
       catch(Exception& ex) 
@@ -894,18 +940,17 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
           try
             {
               (*iter)->disconnectService();
-              traceExec(*iter, "disconnectService");
+              traceExec(*iter, "disconnectService",ComputePlacement(*iter));
             }
           catch(...) 
             {
               // Disconnect has failed
-              traceExec(*iter, "disconnectService failed, ABORT");
+              traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
             }
           {//Critical section
-            _mutexForSchedulerUpdate.lock();
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->aborted();
-            _mainSched->notifyFrom(*iter,YACS::ABORT);
-            _mutexForSchedulerUpdate.unlock();
+            _mainSched->notifyFrom(*iter,YACS::ABORT,this);
           }//End of critical section
         }
       catch(...) 
@@ -914,18 +959,17 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
           try
             {
               (*iter)->disconnectService();
-              traceExec(*iter, "disconnectService");
+              traceExec(*iter, "disconnectService",ComputePlacement(*iter));
             }
           catch(...) 
             {
               // Disconnect has failed
-              traceExec(*iter, "disconnectService failed, ABORT");
+              traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
             }
           {//Critical section
-            _mutexForSchedulerUpdate.lock();
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
             (*iter)->aborted();
-            _mainSched->notifyFrom(*iter,YACS::ABORT);
-            _mutexForSchedulerUpdate.unlock();
+            _mainSched->notifyFrom(*iter,YACS::ABORT,this);
           }//End of critical section
         }
       if((*iter)->getState() == YACS::ERROR)
@@ -941,38 +985,31 @@ void Executor::launchTasks(std::vector<Task *>& tasks)
               try
                 {
                   t->disconnectService();
-                  traceExec(t, "disconnectService");
+                  traceExec(t, "disconnectService",ComputePlacement(*iter));
                 }
               catch(...)
                 {
                   // Disconnect has failed
-                  traceExec(t, "disconnectService failed, ABORT");
+                  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
                 }
               {//Critical section
-                _mutexForSchedulerUpdate.lock();
+                YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
                 t->aborted();
-                _mainSched->notifyFrom(t,YACS::ABORT);
-                _mutexForSchedulerUpdate.unlock();
+                _mainSched->notifyFrom(t,YACS::ABORT,this);
               }//End of critical section
-              traceExec(t, "state:"+Node::getStateName(t->getState()));
+              traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
             }
         }
-      traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
+      traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
     }
 
   //Second phase, execute each task in a thread
-  for(iter=tasks.begin();iter!=tasks.end();iter++)
+  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
     {
       launchTask(*iter);
     }
 }
 
-struct threadargs {
-  Task *task;
-  Scheduler *sched;
-  Executor *execInst;
-};
-
 //! Execute a Task in a thread
 /*!
  *  \param task  : Task to execute
@@ -991,6 +1028,8 @@ void Executor::launchTask(Task *task)
   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
   if(_semThreadCnt == 0)
     {
+      // --- Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
       //check if we have enough threads to run
       std::set<Task*> tmpSet=_runningTasks;
       std::set<Task*>::iterator it = tmpSet.begin();
@@ -1016,6 +1055,7 @@ void Executor::launchTask(Task *task)
           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
         }
+      // --- End of critical section
     }
 
   _semForMaxThreads.wait();
@@ -1026,16 +1066,15 @@ void Executor::launchTask(Task *task)
   args->sched = _mainSched;
   args->execInst = this;
 
-  traceExec(task, "launch");
+  traceExec(task, "launch",ComputePlacement(task));
 
   { // --- Critical section
-    _mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
     _numberOfRunningTasks++;
     _runningTasks.insert(task);
     task->begin(); //change state to ACTIVATED
-    _mutexForSchedulerUpdate.unlock();
   } // --- End of critical section
-  Thread(functionForTaskExecution,args);
+  Thread(functionForTaskExecution, args, _threadStackSize);
 }
 
 //! wait until a running task ends
@@ -1044,14 +1083,13 @@ void Executor::sleepWhileNoEventsFromAnyRunningTask()
 {
   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
-  _mutexForSchedulerUpdate.lock();
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
     {
       _isWaitingEventsFromRunningTasks = true;
       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
     }
   _numberOfEndedTasks=0;
-  _mutexForSchedulerUpdate.unlock();
   DEBTRACE("---");
 }
 
@@ -1085,13 +1123,26 @@ void Executor::wakeUp()
 int Executor::getNbOfThreads()
 {
   int ret;
-  _mutexForNbOfConcurrentThreads.lock();
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
   _isRunningunderExternalControl=true;
   ret = _groupOfAllThreadsCreated.size();
-  _mutexForNbOfConcurrentThreads.unlock();
   return ret;
 }
 
+/*!
+ * This thread is NOT supposed to be detached !
+ */
+void *Executor::functionForTaskLoad(void *arg)
+{
+  DEBTRACE("Executor::functionForTaskLoad(void *arg)");
+  struct threadargs *args = (struct threadargs *) arg;
+  Task *task=args->task;
+  Scheduler *sched=args->sched;
+  Executor *execInst=args->execInst;
+  delete args;
+  execInst->loadTask(task,execInst);// this method does not throw - all exceptions are caught!
+  return 0;
+}
 
 //! Function to perform execution of a task in a thread
 /*!
@@ -1115,18 +1166,26 @@ void *Executor::functionForTaskExecution(void *arg)
   Scheduler *sched=args->sched;
   Executor *execInst=args->execInst;
   delete args;
-  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
 
   Thread::detach();
 
   // Execute task
 
+  if(execInst->getDPLScopeSensitive())
+    {
+      Node *node(dynamic_cast<Node *>(task));
+      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
+      if(node!=0 && gfn!=0)
+        node->applyDPLScope(gfn);
+    }
+
   YACS::Event ev=YACS::FINISH;
   try
     {
-      execInst->traceExec(task, "start execution");
+      execInst->traceExec(task, "start execution",ComputePlacement(task));
       task->execute();
-      execInst->traceExec(task, "end execution OK");
+      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
     }
   catch(Exception& ex)
     {
@@ -1135,14 +1194,14 @@ void *Executor::functionForTaskExecution(void *arg)
       ev=YACS::ABORT;
       string message = "end execution ABORT, ";
       message += ex.what();
-      execInst->traceExec(task, message);
+      execInst->traceExec(task, message,ComputePlacement(task));
     }
   catch(...) 
     {
       // Execution has failed
       std::cerr << "Execution has failed: unknown reason" << std::endl;
       ev=YACS::ABORT;
-      execInst->traceExec(task, "end execution ABORT, unknown reason");
+      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
     }
 
   // Disconnect task
@@ -1150,19 +1209,31 @@ void *Executor::functionForTaskExecution(void *arg)
     {
       DEBTRACE("task->disconnectService()");
       task->disconnectService();
-      execInst->traceExec(task, "disconnectService");
+      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
     }
   catch(...) 
     {
       // Disconnect has failed
       std::cerr << "disconnect has failed" << std::endl;
       ev=YACS::ABORT;
-      execInst->traceExec(task, "disconnectService failed, ABORT");
+      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+  //
+
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolContainer
+
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+    {
+      std::lock_guard<std::mutex> alckCont(contC->getLocker());
+      contC->release(task);
     }
 
   DEBTRACE("End task->execute()");
   { // --- Critical section
-    execInst->_mutexForSchedulerUpdate.lock();
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
     try
       {
         if (ev == YACS::FINISH) task->finished();
@@ -1176,8 +1247,8 @@ void *Executor::functionForTaskExecution(void *arg)
               }
             task->aborted();
           }
-        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
-        sched->notifyFrom(task,ev);
+        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+        sched->notifyFrom(task,ev,execInst);
       }
     catch(Exception& ex)
       {
@@ -1216,7 +1287,6 @@ void *Executor::functionForTaskExecution(void *arg)
     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
 
-    execInst->_mutexForSchedulerUpdate.unlock();
   } // --- End of critical section (change state)
 
   //execInst->notifyEndOfThread(0);
@@ -1224,31 +1294,23 @@ void *Executor::functionForTaskExecution(void *arg)
   return 0;
 }
 
-void Executor::traceExec(Task *task, const std::string& message)
+void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
 {
   string nodeName = _mainSched->getTaskName(task);
   Container *cont = task->getContainer();
   string containerName = "---";
-  string placement = "---";
   if (cont)
-    {
-      containerName = cont->getName();
-      ComponentInstance *compo = task->getComponent();
-      //if (compo)
-      placement = cont->getFullPlacementId(compo);
-    }
-#ifdef WIN32
-  DWORD now = timeGetTime();
-  double elapse = (now - _start)/1000.0;
-#else
-  timeval now;
-  gettimeofday(&now, NULL);
-  double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
-#endif
-  _mutexForTrace.lock();
-  _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
-  _trace << flush;
-  _mutexForTrace.unlock();
+    containerName = cont->getName();
+
+  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+  std::chrono::milliseconds millisec;
+  millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
+  double elapse = double(millisec.count()) / 1000.0;
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
+    _trace << flush;
+  }
 }
 
 //! emit notification to all observers registered with  the dispatcher 
@@ -1262,3 +1324,572 @@ void Executor::sendEvent(const std::string& event)
   YASSERT(_root);
   disp->dispatch(_root,event);
 }
+
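+//! Strict weak ordering on HomogeneousPoolContainer pointers, used as the map comparator
+//! below: null pointers sort first, then containers by increasing number of cores per worker.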
+struct HPCCompare
+{
+  bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
+  {
+    if(!lhs && !rhs)
+      return false;
+    if(!lhs)
+      return true;
+    if(!rhs)
+      return false;
+    return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
+  }
+};
+
+/*!
+ * This method takes a list of tasks as input and selects a subset of it, considering only the containers.
+ * If no task has a container instance that is a subclass of HomogeneousPoolContainer, this method leaves \a tsks untouched.
+ * For example, if five tasks share the same HomogeneousPoolContainer and it currently has two free places,
+ * only two of them are kept for this round; the others will be candidates again on a later pass.
+ *
+ * \param [in,out] tsks - list of tasks to be filtered in place
+ */
+void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+{
+  std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
+  for(auto cur : tsks)
+    {
+      if(!cur)
+        continue;
+      Container *cont(cur->getContainer());
+      if(!cont)
+        {
+          m[nullptr].push_back(cur);
+          continue;
+        }
+      HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
+      if(!contC)
+        {
+          m[nullptr].push_back(cur);
+          continue;
+        }
+      m[contC].push_back(cur);
+    }
+  //
+  std::vector<Task *> ret;
+  for(auto it : m)
+    {
+      HomogeneousPoolContainer *curhpc(it.first);
+      const std::vector<Task *>& curtsks(it.second);
+      if(!curhpc)
+        {
+          ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+        }
+      else
+        {
+          // start of critical section for container curhpc
+          std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
+          std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimultaneously;
+          std::size_t sz(curhpc->getNumberOfFreePlace());
+          std::vector<Task *>::const_iterator it2(curtsks.begin());
+          for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
+            {
+              vecOfTaskSharingSameHPContToBeRunSimultaneously.push_back(*it2);
+              ret.push_back(*it2);
+            }
+          curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimultaneously);
+          //end of critical section
+        }
+    }
+  //
+  tsks=ret;
+}
+
+std::string Executor::ComputePlacement(Task *zeTask)
+{
+  std::string placement("---");
+  if(!zeTask)
+    return placement;
+  if(zeTask->getContainer())
+    placement=zeTask->getContainer()->getFullPlacementId(zeTask);
+  return placement;
+}
+
+///////// NEW EXECUTOR ////////////////////////////////
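+//! Load \a task on the resource selected by the workload manager, imposing a container
+//! named "<resource>-<container type>-<index>" built from \a runInfo.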
+void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
+{
+  if(task->getState() != YACS::TOLOAD)
+    return;
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
+  {//Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched->notifyFrom(task,YACS::START,this);
+  }//End of critical section
+  try
+    {
+      std::ostringstream container_name;
+      container_name << runInfo.resource.name << "-"
+                     << runInfo.type.name << "-" << runInfo.index;
+      task->imposeResource(runInfo.resource.name, container_name.str());
+      traceExec(task, "load", ComputePlacement(task));
+      task->load();
+      traceExec(task, "initService", ComputePlacement(task));
+      task->initService();
+    }
+  catch(Exception& ex) 
+    {
+      std::cerr << ex.what() << std::endl;
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT, this);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+      }//End of critical section
+    }
+  catch(...) 
+    {
+      std::cerr << "Load failed" << std::endl;
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT, this);
+        traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+      }//End of critical section
+    }
+}
+
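+//! Register \a task as running; bookkeeping is done under the scheduler mutex.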
+void Executor::beginTask(Task *task)
+{
+  // --- Critical section
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  _numberOfRunningTasks++;
+  _runningTasks.insert(task);
+  // --- End of critical section
+}
+
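+//! Record the end of \a task: update its state according to \a ev, notify the scheduler,
+//! update the running-tasks bookkeeping and wake up the main loop unless the executor is paused.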
+void Executor::endTask(Task *task, YACS::Event ev)
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  try
+  {
+    if (ev == YACS::FINISH) task->finished();
+    if (ev == YACS::ABORT)
+    {
+      _errorDetected = true;
+      if (_stopOnErrorRequested)
+      {
+        _execMode = YACS::STEPBYSTEP;
+        _isOKToEnd = true;
+      }
+      task->aborted();
+    }
+    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+    _mainSched->notifyFrom(task,ev,this);
+  }
+  catch(Exception& ex)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Error during notification" << std::endl;
+    std::cerr << ex.what() << std::endl;
+  }
+  catch(...)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Notification failed" << std::endl;
+  }
+  _numberOfRunningTasks--;
+  _runningTasks.erase(task);
+  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks 
+            << " _execMode: " << _execMode
+            << " _executorState: " << _executorState);
+  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
+    {
+      if (_executorState == YACS::WAITINGTASKS)
+        {
+          _executorState = YACS::PAUSED;
+          sendEvent("executor");
+          _condForPilot.notify_all();
+          if (_errorDetected &&
+              _stopOnErrorRequested &&
+              !_isRunningunderExternalControl)
+            _condForStepByStep.notify_all(); // exec thread may be on waitResume
+        }
+    }
+  if (_executorState != YACS::PAUSED)
+    wakeUp();
+}
+
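+//! Execute \a task in the calling thread: activate and run it, disconnect its service,
+//! release its place in its HomogeneousPoolContainer (if any) and return the resulting event.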
+YACS::Event  Executor::runTask(Task *task)
+{
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    task->begin(); //change state to ACTIVATED
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+  if(getDPLScopeSensitive())
+    {
+      Node *node(dynamic_cast<Node *>(task));
+      ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+      if(node!=0 && gfn!=0)
+        node->applyDPLScope(gfn);
+    }
+
+  YACS::Event ev=YACS::FINISH;
+  try
+    {
+      traceExec(task, "start execution",ComputePlacement(task));
+      task->execute();
+      traceExec(task, "end execution OK",ComputePlacement(task));
+    }
+  catch(Exception& ex)
+    {
+      std::cerr << "YACS Exception during execute" << std::endl;
+      std::cerr << ex.what() << std::endl;
+      ev=YACS::ABORT;
+      string message = "end execution ABORT, ";
+      message += ex.what();
+      traceExec(task, message,ComputePlacement(task));
+    }
+  catch(...) 
+    {
+      // Execution has failed
+      std::cerr << "Execution has failed: unknown reason" << std::endl;
+      ev=YACS::ABORT;
+      traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+    }
+
+  // Disconnect task
+  try
+    {
+      DEBTRACE("task->disconnectService()");
+      task->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+  catch(...) 
+    {
+      // Disconnect has failed
+      std::cerr << "disconnect has failed" << std::endl;
+      ev=YACS::ABORT;
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+  //
+
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolContainer
+
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+  {
+    std::lock_guard<std::mutex> alckCont(contC->getLocker());
+    contC->release(task);
+  }
+
+  return ev;
+}
+
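+//! First phase of task start-up: make the datastream connections of \a task;
+//! on failure, abort the task and all tasks coupled to it.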
+void Executor::makeDatastreamConnections(Task *task)
+{
+  YACS::StatesForNode state=task->getState();
+  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
+    return;
+  try
+    {
+      task->connectService();
+      traceExec(task, "connectService",ComputePlacement(task));
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->connected();
+      }//End of critical section
+    }
+  catch(Exception& ex) 
+    {
+      std::cerr << ex.what() << std::endl;
+      try
+        {
+          (task)->disconnectService();
+          traceExec(task, "disconnectService",ComputePlacement(task));
+        }
+      catch(...) 
+        {
+          // Disconnect has failed
+          traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+        }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT,this);
+      }//End of critical section
+    }
+  catch(...) 
+    {
+      std::cerr << "Problem in connectService" << std::endl;
+      try
+        {
+          (task)->disconnectService();
+          traceExec(task, "disconnectService",ComputePlacement(task));
+        }
+      catch(...) 
+        {
+          // Disconnect has failed
+          traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+        }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        task->aborted();
+        _mainSched->notifyFrom(task,YACS::ABORT,this);
+      }//End of critical section
+    }
+  if(task->getState() == YACS::ERROR)
+    {
+      //try to put all coupled tasks in error
+      std::set<Task*> coupledSet;
+      task->getCoupledTasks(coupledSet);
+      for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
+        {
+          Task* t=*it;
+          if(t == task)continue;
+          if(t->getState() == YACS::ERROR)continue;
+          try
+            {
+              t->disconnectService();
+              traceExec(t, "disconnectService",ComputePlacement(task));
+            }
+          catch(...)
+            {
+              // Disconnect has failed
+              traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
+            }
+          {//Critical section
+            YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+            t->aborted();
+            _mainSched->notifyFrom(t,YACS::ABORT,this);
+          }//End of critical section
+          traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
+        }
+    }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+}
+
+#include "Runtime.hxx"
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+  Runtime *r(getRuntime());
+  if(!r)
+    throw YACS::Exception("loadResources : no runtime  !");
+  std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
+  int id = 0;
+  for(const std::pair<std::string,int>& res : data)
+  {
+    WorkloadManager::Resource newResource;
+    newResource.name = res.first;
+    newResource.id = id;
+    id++;
+    newResource.nbCores = res.second;
+    wm.addResource(newResource);
+  }
+}
+
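+//! Adapter between a YACS ENGINE task and the workload manager: it exposes the container
+//! requirements as a WorkloadManager::ContainerType and runs the YACS task when scheduled.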
+class NewTask : public WorkloadManager::Task
+{
+public:
+  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
+  const WorkloadManager::ContainerType& type()const override;
+  void run(const WorkloadManager::RunInfo& runInfo)override;
+private:
+  WorkloadManager::ContainerType _type;
+  Executor& _executor;
+  YACS::ENGINE::Task * _yacsTask;
+};
+
+NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
+: _type()
+, _executor(executor)
+, _yacsTask(yacsTask)
+{
+  Container * yacsContainer = yacsTask->getContainer();
+  if(yacsContainer != nullptr && yacsTask->canAcceptImposedResource())
+  {
+    _type.ignoreResources = false;
+    _type.name = yacsContainer->getName();
+    std::string nb_procs_str = yacsContainer->getProperty("nb_parallel_procs");
+    float needed_cores = 0.0;
+    if(!nb_procs_str.empty())
+      needed_cores = std::stof(nb_procs_str);
+    _type.neededCores = needed_cores;
+  }
+  else
+  {
+    _type.ignoreResources = true;
+    _type.name = "test";
+    _type.neededCores = 0;
+  }
+  _type.id = 0;
+}
+
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+  return _type;
+}
+
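+//! Called by the workload manager on one of its threads: run the whole YACS task life cycle
+//! (load, datastream connections, execution, end-of-task bookkeeping), then self-delete.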
+void NewTask::run(const WorkloadManager::RunInfo& runInfo)
+{
+  _executor.loadTask(_yacsTask, runInfo);
+  _executor.makeDatastreamConnections(_yacsTask);
+  YACS::Event ev = _executor.runTask(_yacsTask);
+  _executor.endTask(_yacsTask, ev);
+  delete this; // temporary
+}
+
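+//! Main loop of the workload-manager-based executor: collect the ready tasks of \a graph,
+//! submit them to the WorkloadManager and iterate until the graph is finished or stopped.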
+void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
+{
+  DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched = graph;
+    _root = dynamic_cast<ComposedNode *>(_mainSched);
+    if (!_root) throw Exception("Executor::Run, Internal Error!");
+    _executorState = YACS::NOTYETINITIALIZED;
+    sendEvent("executor");
+    _toContinue=true;
+    _isOKToEnd = false;
+    _errorDetected = false;
+    _isWaitingEventsFromRunningTasks = false;
+    _numberOfRunningTasks = 0;
+    _runningTasks.clear();
+    _numberOfEndedTasks = 0;
+    string tracefile = "traceExec_";
+    tracefile += _mainSched->getName();
+    _trace.open(tracefile.c_str());
+    _start = std::chrono::steady_clock::now();
+  } // --- End of critical section
+
+  if (debug > 1) _displayDot(graph);
+
+  if (fromScratch)
+    {
+      try
+        {
+          graph->init();
+          graph->exUpdateState();
+        }
+      catch(Exception& ex)
+        {
+          DEBTRACE("exception: "<< (ex.what()));
+          _executorState = YACS::FINISHED;
+          sendEvent("executor");
+          throw;
+        }
+    }
+  _executorState = YACS::INITIALISED;
+  sendEvent("executor");
+
+  if (debug > 1) _displayDot(graph);
+
+  bool isMore;
+  int problemCount=0;
+  int numberAllTasks;
+
+  _executorState = YACS::RUNNING;
+  sendEvent("executor");
+
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  loadResources(wlm);
+  wlm.start();
+
+  while (_toContinue)
+    {
+      DEBTRACE("--- executor main loop");
+      sleepWhileNoEventsFromAnyRunningTask();
+      DEBTRACE("--- events...");
+      if (debug > 2) _displayDot(graph);
+      { // --- Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
+        graph->selectRunnableTasks(readyTasks);
+        _tasks.clear();
+        for(Task * t : readyTasks)
+          if(_runningTasks.find(t) == _runningTasks.end())
+            _tasks.push_back(t);
+        // TODO: to be removed
+        FilterTasksConsideringContainers(_tasks);
+        numberAllTasks=_numberOfRunningTasks+_tasks.size();
+      } // --- End of critical section
+      if (debug > 2) _displayDot(graph);
+      DEBTRACE("--- events...");
+      if (_executorState == YACS::RUNNING)
+      {
+        if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
+        for(Task * task : _tasks)
+        {
+          beginTask(task);
+          NewTask* newTask = new NewTask(*this, task);
+          wlm.addTask(newTask);
+        }
+      }
+      if (debug > 1) _displayDot(graph);
+      { // --- Critical section
+        DEBTRACE("---");
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        //It is possible that the graph is finished but there are still running tasks (it's an error but we must take it into account)
+        _toContinue = !graph->isFinished();
+
+        DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
+        DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
+        DEBTRACE("_toContinue: " << _toContinue);
+        if(_toContinue && numberAllTasks==0)
+        {
+          //Problem : no running tasks and no task to launch ??
+          problemCount++;
+          std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
+          //Pause to give a chance to interrupt
+          usleep(1000);
+          if(problemCount > 25)
+          {
+            // Too much problems encountered : stop execution
+            _toContinue=false;
+          }
+        }
+
+        if (! _toContinue)
+          {
+            _executorState = YACS::FINISHED;
+            sendEvent("executor");
+            _condForPilot.notify_all();
+          }
+      } // --- End of critical section
+      if (debug > 0) _displayDot(graph);
+      DEBTRACE("_toContinue: " << _toContinue);
+    }
+
+  wlm.stop();
+  DEBTRACE("End of main Loop");
+
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
+      {
+        DEBTRACE("stop requested: End soon");
+        _executorState = YACS::STOPPED;
+        _toContinue = false;
+        sendEvent("executor");
+      }
+  } // --- End of critical section
+  if ( _dumpOnErrorRequested && _errorDetected)
+    {
+      saveState(_dumpErrorFile);
+    }
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace.close();
+  }
+  DEBTRACE("End of RunB thread");  
+}
+
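+//! Entry point choosing the executor implementation from the graph property "executor":
+//! the workload-manager-based loop (newRun) when it is set to "WorkloadManager" (any of the
+//! accepted spellings), the historical loop (RunB) otherwise.
+//! A minimal sketch of enabling the new executor, assuming the graph object exposes
+//! setProperty like other YACS nodes:
+//! \code
+//! proc->setProperty("executor", "WorkloadManager");
+//! executor.RunW(proc, 0, true);
+//! \endcode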
+void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
+{
+  std::string str_value = graph->getProperty("executor");
+  if(str_value == "WorkloadManager"
+     || str_value == "WORKLOADMANAGER"
+     || str_value == "workloadmanager"
+     || str_value == "WorkLoadManager")
+    newRun(graph, debug, fromScratch);
+  else
+    RunB(graph, debug, fromScratch);
+}