SALOME platform Git repositories - modules/yacs.git/blob - src/engine/Executor.cxx
Salome HOME
Work in progress : workload manager step 1
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2019  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(1000);
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
77
78 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   _numberOfRunningTasks = 0;
91   _numberOfEndedTasks = 0;
92   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
93 }
94
95 Executor::~Executor()
96 {
97   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
98     delete *iter;
99 }
100
101 //! Execute a graph waiting for completion
102 /*!
103  *  \param graph : schema to execute
104  *  \param debug : display the graph with dot if debug == 1
105  *  \param fromScratch : if true the graph is reinitialized
106  *
107  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
108  *  
109  *  Calls Executor::launchTask to execute a selected Task.
110  *
111  *  Completion when graph is finished (Scheduler::isFinished)
112  */
113
114 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
115 {
116   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
117   _mainSched=graph;
118   _root = dynamic_cast<ComposedNode *>(_mainSched);
119   if (!_root) throw Exception("Executor::Run, Internal Error!");
120   bool isMore;
121   int i=0;
122   if(debug>1)_displayDot(graph);
123   if (fromScratch)
124     {
125       graph->init();
126       graph->exUpdateState();
127     }
128   if(debug>1)_displayDot(graph);
129   vector<Task *> tasks;
130   vector<Task *>::iterator iter;
131   _toContinue=true;
132   _execMode = YACS::CONTINUE;
133   _isWaitingEventsFromRunningTasks = false;
134   _numberOfRunningTasks = 0;
135   _runningTasks.clear();
136   _numberOfEndedTasks=0;
137   while(_toContinue)
138     {
139       sleepWhileNoEventsFromAnyRunningTask();
140
141       if(debug>2)_displayDot(graph);
142
143       {//Critical section
144         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
145         tasks=graph->getNextTasks(isMore);
146         graph->selectRunnableTasks(tasks);
147       }//End of critical section
148
149       if(debug>2)_displayDot(graph);
150
151       for(iter=tasks.begin();iter!=tasks.end();iter++)
152         loadTask(*iter,this);
153
154       if(debug>1)_displayDot(graph);
155
156       launchTasks(tasks);
157
158       if(debug>1)_displayDot(graph);
159
160       {//Critical section
161         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
162         _toContinue=!graph->isFinished();
163       }//End of critical section
164       DEBTRACE("_toContinue: " << _toContinue);
165
166       if(debug>0)_displayDot(graph);
167
168       i++;
169     }
170 }
171
172 //! Execute a graph with breakpoints or step by step
173 /*!
174  *  To be launch in a thread (main thread controls the progression).
175  *  \param graph : schema to execute
176  *  \param debug : display the graph with dot if debug >0
177  *  \param fromScratch : if false, state from a previous partial exection is already loaded
178  *
179  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
180  *  
181  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
182  * 
183  *  Calls Executor::launchTask to execute a selected Task
184  *
185  *  Completion when graph is finished (Scheduler::isFinished)
186  *
187  *  States of execution:
188  *  - YACS::NOTYETINITIALIZED
189  *  - YACS::INITIALISED
190  *  - YACS::RUNNING            (to next breakpoint or step)
191  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
192  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
193  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
194  *  - YACS::STOPPED            (stopped by user before end)
195  *
196  *  Modes of Execution:
197  *  - YACS::CONTINUE           (normal run without breakpoints)
198  *  - YACS::STEPBYSTEP         (pause at each loop)
199  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
200  *
201  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
202  *  Step by Step means execution node by node or group of node by group of nodes.
203  *  At a given step, the user decides to launch all the ready nodes or only a subset
204  *  (Caution: some nodes must run in parallel). 
205  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
206  *
207  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
208  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
209  *  - Executor::getCurrentExecMode
210  *  - Executor::getExecutorState
211  *  - Executor::setExecMode             : change the execution mode for next loop
212  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
213  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
214  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
215  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
216  *  - Executor::isNotFinished
217  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
218  *  - Executor::saveState               : dump the current state of execution in an xml file
219  *  - Executor::loadState               : Not yet implemented
220  *  - Executor::getNbOfThreads
221  *  - Executor::displayDot
222  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
223  *
224  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
225  *  - Executor::waitPause
226  *
227  *  TO BE VALIDATED:
228  *  - Pilot may connect to executor during execution, or deconnect.
229  *  - Several Pilots may be connected at the same time (for observation...)
230  * 
231  */
232
233 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
234 {
235   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
236
237   { // --- Critical section
238     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
239     _mainSched = graph;
240     _root = dynamic_cast<ComposedNode *>(_mainSched);
241     if (!_root) throw Exception("Executor::Run, Internal Error!");
242     _executorState = YACS::NOTYETINITIALIZED;
243     sendEvent("executor");
244     _toContinue=true;
245     _isOKToEnd = false;
246     _errorDetected = false;
247     _isWaitingEventsFromRunningTasks = false;
248     _numberOfRunningTasks = 0;
249     _runningTasks.clear();
250     _numberOfEndedTasks = 0;
251     string tracefile = "traceExec_";
252     tracefile += _mainSched->getName();
253     _trace.open(tracefile.c_str());
254     _start = std::chrono::steady_clock::now();
255   } // --- End of critical section
256
257   if (debug > 1) _displayDot(graph);
258
259   if (fromScratch)
260     {
261       try
262         {
263           graph->init();
264           graph->exUpdateState();
265         }
266       catch(Exception& ex)
267         {
268           DEBTRACE("exception: "<< (ex.what()));
269           _executorState = YACS::FINISHED;
270           sendEvent("executor");
271           throw;
272         }
273     }
274   _executorState = YACS::INITIALISED;
275   sendEvent("executor");
276
277   if (debug > 1) _displayDot(graph);
278
279   vector<Task *>::iterator iter;
280   bool isMore;
281   int problemCount=0;
282   int numberAllTasks;
283
284   _executorState = YACS::RUNNING;
285   sendEvent("executor");
286   while (_toContinue)
287     {
288       DEBTRACE("--- executor main loop");
289       sleepWhileNoEventsFromAnyRunningTask();
290       DEBTRACE("--- events...");
291       if (debug > 2) _displayDot(graph);
292       { // --- Critical section
293         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
294         _tasks=graph->getNextTasks(isMore);
295         graph->selectRunnableTasks(_tasks);
296         FilterTasksConsideringContainers(_tasks);
297         numberAllTasks=_numberOfRunningTasks+_tasks.size();
298       } // --- End of critical section
299       if (debug > 2) _displayDot(graph);
300       if (_executorState == YACS::RUNNING)
301         {
302           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
303           if (debug > 0) _displayDot(graph);
304           DEBTRACE("---");
305           loadParallelTasks(_tasks,this);
306           if (debug > 1) _displayDot(graph);
307           DEBTRACE("---");
308           launchTasks(_tasks);
309           DEBTRACE("---");
310         }
311       if (debug > 1) _displayDot(graph);
312       { // --- Critical section
313         DEBTRACE("---");
314         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
315         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
316         if(_numberOfRunningTasks == 0)
317           _toContinue = !graph->isFinished();
318
319         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
320         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
321         DEBTRACE("_toContinue: " << _toContinue);
322         if(_toContinue && numberAllTasks==0)
323           {
324             //Problem : no running tasks and no task to launch ??
325             problemCount++;
326             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
327             //Pause to give a chance to interrupt
328             usleep(1000);
329             if(problemCount > 25)
330               {
331                 // Too much problems encountered : stop execution
332                 _toContinue=false;
333               }
334           }
335
336         if (! _toContinue)
337           {
338             _executorState = YACS::FINISHED;
339             sendEvent("executor");
340             _condForPilot.notify_all();
341           }
342       } // --- End of critical section
343       if (debug > 0) _displayDot(graph);
344       DEBTRACE("_toContinue: " << _toContinue);
345     }
346
347   DEBTRACE("End of main Loop");
348
349   { // --- Critical section
350     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
351     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
352       {
353         DEBTRACE("stop requested: End soon");
354         _executorState = YACS::STOPPED;
355         _toContinue = false;
356         sendEvent("executor");
357       }
358   } // --- End of critical section
359   if ( _dumpOnErrorRequested && _errorDetected)
360     {
361       saveState(_dumpErrorFile);
362     }
363   {
364     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
365     _trace.close();
366   }
367   DEBTRACE("End of RunB thread");  
368 }
369
370 YACS::ExecutionMode Executor::getCurrentExecMode()
371 {
372   _isRunningunderExternalControl=true;
373   return _execMode;
374 }
375
376
377 YACS::ExecutorState Executor::getExecutorState()
378 {
379   _isRunningunderExternalControl=true;
380   return _executorState;
381 }
382
383
384 bool Executor::isNotFinished()
385 {
386   _isRunningunderExternalControl=true;
387   return _toContinue;
388 }
389
390 //! ask to stop execution on the first node found in error
391 /*!
392  * \param dumpRequested   produce a state dump when an error is found
393  * \param xmlFile         name of file used for state dump
394  */
395
396 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
397 {
398   { // --- Critical section
399     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
400     _dumpErrorFile=xmlFile;
401     _stopOnErrorRequested=true;
402     _dumpOnErrorRequested = dumpRequested;
403     if (dumpRequested && xmlFile.empty())
404       throw YACS::Exception("dump on error requested and no filename given for dump");
405     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
406   } // --- End of critical section
407 }
408
409 //! ask to do not stop execution on nodes found in error
410 /*!
411  */
412
413 void Executor::unsetStopOnError()
414 {
415   { // --- Critical section
416     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
417     _stopOnErrorRequested=false;
418   } // --- End of critical section
419 }
420
421 //! Dynamically set the current mode of execution
422 /*!
423  * The mode can be Continue, step by step, or stop before execution of a node
424  * defined in a list of breakpoints.
425  */
426
427 void Executor::setExecMode(YACS::ExecutionMode mode)
428 {
429   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
430   { // --- Critical section
431     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
432     _isRunningunderExternalControl=true;
433     _execMode = mode;
434   } // --- End of critical section
435 }
436
437 //! wake up executor when in pause
438 /*!
439  * When Executor is in state paused or waiting for task completion, the thread
440  * running loop RunB waits on condition _condForStepByStep.
441  * Thread RunB is waken up.
442  * \return true when actually wakes up executor
443  */
444
445 bool Executor::resumeCurrentBreakPoint()
446 {
447   DEBTRACE("Executor::resumeCurrentBreakPoint()");
448   bool ret = false;
449   //bool doDump = false;
450   { // --- Critical section
451     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
452     _isRunningunderExternalControl=true;
453     DEBTRACE("_executorState: " << _executorState);
454     switch (_executorState)
455       {
456       case YACS::WAITINGTASKS:
457       case YACS::PAUSED:
458         {
459           _condForStepByStep.notify_all();
460           _executorState = YACS::RUNNING;
461           sendEvent("executor");
462           ret = true;
463           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
464           break;
465         }
466       case YACS::FINISHED:
467       case YACS::STOPPED:
468         {
469           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
470           DEBTRACE("Graph Execution finished or stopped !");
471           break;
472         }
473       default :
474         {
475           // debug: no easy way to verify if main loop is acutally waiting on condition
476         }
477       }
478     DEBTRACE("---");
479     //if (doDump) saveState(_dumpErrorFile);
480   } // --- End of critical section
481   return ret;
482 }
483
484
485 //! define a list of nodes names as breakpoints in the graph
486
487
488 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
489 {
490   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
491   { // --- Critical section
492     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
493     _isRunningunderExternalControl=true;
494     _listOfBreakPoints = listOfBreakPoints;
495   } // --- End of critical section
496 }
497
498
499 //! Get the list of tasks to load, to define a subset to execute in step by step mode
500 /*!
501  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
502  *  Use Executor::waitPause to wait.
503  */
504 std::list<std::string> Executor::getTasksToLoad()
505 {
506   DEBTRACE("Executor::getTasksToLoad()");
507   list<string> listOfNodesToLoad;
508   listOfNodesToLoad.clear();
509   { // --- Critical section
510     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
511     _isRunningunderExternalControl=true;
512     switch (_executorState)
513       {
514       case YACS::WAITINGTASKS:
515       case YACS::PAUSED:
516         {
517           listOfNodesToLoad = _listOfTasksToLoad;
518           break;
519         }
520       case YACS::NOTYETINITIALIZED:
521       case YACS::INITIALISED:
522       case YACS::RUNNING:
523       case YACS::FINISHED:
524       case YACS::STOPPED:
525       default:
526         {
527           break;
528         }
529       }
530   } // --- End of critical section
531   return listOfNodesToLoad;
532 }
533
534
535 //! Define a subset of task to execute in step by step mode
536 /*!
537  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
538  * in the current step.
539  * If some nodes must run in parallel, they must stay together in the list.
540  */
541
542 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
543 {
544   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
545   bool ret = true;
546   vector<Task *>::iterator iter;
547   vector<Task *> restrictedTasks;
548   { // --- Critical section
549     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
550     _isRunningunderExternalControl=true;
551     switch (_executorState)
552       {
553       case YACS::WAITINGTASKS:
554       case YACS::PAUSED:
555         {
556           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
557             {
558               string readyNode = _mainSched->getTaskName(*iter);
559               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
560                   != listToExecute.end())
561                 {
562                   restrictedTasks.push_back(*iter);
563                   DEBTRACE("node to execute " << readyNode);
564                 }
565             }
566           _tasks.clear();
567           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
568             {
569               _tasks.push_back(*iter);
570             }
571           break;
572         }
573       case YACS::NOTYETINITIALIZED:
574       case YACS::INITIALISED:
575       case YACS::RUNNING:
576       case YACS::FINISHED:
577       case YACS::STOPPED:
578       default:
579         {
580           break;
581         }
582       }
583     } // --- End of critical section
584
585   _tasks.clear();
586   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
587     {
588       _tasks.push_back(*iter);
589     }
590   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
591     {
592       string readyNode = _mainSched->getTaskName(*iter);
593       DEBTRACE("selected node to execute " << readyNode);
594     }
595
596   return ret;
597 }
598
599 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
600 /*!
601  *  Do nothing if execution is finished or in pause.
602  *  Wait first step if Executor is running or in initialization.
603  */
604
605 void Executor::waitPause()
606 {
607   DEBTRACE("Executor::waitPause()" << _executorState);
608   { // --- Critical section
609     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
610     _isRunningunderExternalControl=true;
611     switch (_executorState)
612       {
613       default:
614       case YACS::STOPPED:
615       case YACS::FINISHED:
616       case YACS::WAITINGTASKS:
617       case YACS::PAUSED:
618         {
619           break;
620         }
621       case YACS::NOTYETINITIALIZED:
622       case YACS::INITIALISED:
623       case YACS::RUNNING:
624         {
625           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
626           break;
627         }
628       }
629   } // --- End of critical section
630   DEBTRACE("---");
631 }
632
633 /*!
634  * This method can be called at any time simultaneously during a RunB call.
635  * This method will wait until the executor is locked in a consistent state of a running graph.
636  *
637  * This method is expected to be called in association with resume method.
638  * The returned parameter is expected to be transfered to resume method.
639  */
640 bool Executor::suspendASAP()
641 {
642   // no AutoLocker here. It's not a bug.
643   _mutexForSchedulerUpdate.lock();
644   if(!_toContinue && _executorState==YACS::FINISHED)
645     {// execution is finished
646       _mutexForSchedulerUpdate.unLock();
647       return false;// the executor is no more running
648     }
649   //general case. Leave method with locker in locked status
650   return true;
651 }
652
653 /*!
654  * This method is expected to be called in association with suspendASAP method.
655  * Expected to be called just after suspendASAP with output of resume as input parameter
656  */
657 void Executor::resume(bool suspended)
658 {
659   if(suspended)
660     _mutexForSchedulerUpdate.unLock();
661 }
662
663 //! stops the execution as soon as possible 
664
665 void Executor::stopExecution()
666 {
667   setExecMode(YACS::STEPBYSTEP);
668   //waitPause();
669   _isOKToEnd = true;
670   resumeCurrentBreakPoint();
671 }
672
673 //! save the current state of execution in an xml file
674
675 bool Executor::saveState(const std::string& xmlFile)
676 {
677   DEBTRACE("Executor::saveState() in " << xmlFile);
678   bool result = false;
679   try {
680     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
681     YACS::ENGINE::VisitorSaveState vst(_root);
682     vst.openFileDump(xmlFile.c_str());
683     _root->accept(&vst);
684     vst.closeFileDump();
685     result = true;
686   }
687   catch(Exception& ex) {
688     std::cerr << ex.what() << std::endl;
689   }
690   return result;
691 }
692
693 //! not yet implemented
694
695 bool Executor::loadState()
696 {
697   DEBTRACE("Executor::loadState()");
698   _isRunningunderExternalControl=true;
699   return true;
700 }
701
702
//! Return 1 if \a filename exists and is a regular file, 0 otherwise (including when stat fails).
static int isfile(const char *filename)
{
  struct stat info;
  const bool isRegularFile = (stat(filename, &info) == 0) && S_ISREG(info.st_mode);
  return isRegularFile ? 1 : 0;
}
712
713 //! Display the graph state as a dot display, public method
714
715 void Executor::displayDot(Scheduler *graph)
716 {
717   _isRunningunderExternalControl=true;
718   _displayDot(graph);
719 }
720
721 //! Display the graph state as a dot display
722 /*!
723  *  \param graph  : the node to display
724  */
725
726 void Executor::_displayDot(Scheduler *graph)
727 {
728    std::ofstream g("titi");
729    ((ComposedNode*)graph)->writeDot(g);
730    g.close();
731    const char displayScript[]="display.sh";
732    if(isfile(displayScript))
733      system("sh display.sh");
734    else
735      system("dot -Tpng titi|display -delay 5");
736 }
737
738 //! Wait reactivation in modes Step By step or with BreakPoints
739 /*!
740  *  Check mode of execution (set by main thread):
741  *  - YACS::CONTINUE        : the graph execution continues.
742  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
743  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
744  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
745  *                            else continue the graph execution.
746  *  \return true if end of executor thread is requested
747  */
748
749 bool Executor::checkBreakPoints()
750 {
751   DEBTRACE("Executor::checkBreakPoints()");
752   vector<Task *>::iterator iter;
753   bool endRequested = false;
754
755   switch (_execMode)
756     {
757     case YACS::CONTINUE:
758       {
759         break;
760       }
761     case YACS::STOPBEFORENODES:
762       {
763         bool stop = false;
764         { // --- Critical section
765           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
766           _tasksSave = _tasks;
767           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
768             {
769               string nodeToLoad = _mainSched->getTaskName(*iter);
770               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
771                   != _listOfBreakPoints.end())
772                 {
773                   stop = true;
774                   break;
775                 }
776             }
777           if (stop)
778             {
779               _listOfTasksToLoad.clear();
780               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
781                 {
782                   string nodeToLoad = _mainSched->getTaskName(*iter);
783                   _listOfTasksToLoad.push_back(nodeToLoad);
784                 }
785               if (getNbOfThreads())
786                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
787               else
788                 _executorState = YACS::PAUSED;
789               sendEvent("executor");
790               _condForPilot.notify_all();
791             }
792           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
793           if (_isOKToEnd) endRequested = true;
794         } // --- End of critical section
795           if (stop) DEBTRACE("wake up from waitResume");
796         break;
797       }
798     default:
799     case YACS::STEPBYSTEP:
800       {
801         { // --- Critical section
802           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
803           _tasksSave = _tasks;
804           _listOfTasksToLoad.clear();
805           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
806             {
807               string nodeToLoad = _mainSched->getTaskName(*iter);
808               _listOfTasksToLoad.push_back(nodeToLoad);
809             }
810           if (getNbOfThreads())
811             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
812           else
813             _executorState = YACS::PAUSED;
814           sendEvent("executor");
815           _condForPilot.notify_all();
816           if (!_isOKToEnd)
817             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
818                           // or, if no pilot, wait until no more running tasks (stop on error)
819           if (_isOKToEnd) endRequested = true;
820         } // --- End of critical section
821         DEBTRACE("wake up from waitResume");
822         break;
823       }
824     }
825   DEBTRACE("endRequested: " << endRequested);
826   return endRequested;
827 }
828
829
830 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
831 /*!
832  *  With the condition Mutex, the mutex is released atomically during the wait.
833  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
834  *  Must be called while mutex is locked.
835  */
836
837 void Executor::waitResume()
838 {
839   DEBTRACE("Executor::waitResume()");
840   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
841   DEBTRACE("---");
842 }
843
844
845 //! Perform loading of a Task.
846 /*!
847  *  \param task  : Task to load
848  */
849
850 void Executor::loadTask(Task *task, const Executor *execInst)
851 {
852   DEBTRACE("Executor::loadTask(Task *task)");
853   if(task->getState() != YACS::TOLOAD)
854     return;
855   traceExec(task, "state:TOLOAD", ComputePlacement(task));
856   {//Critical section
857     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
858     _mainSched->notifyFrom(task,YACS::START,execInst);
859   }//End of critical section
860   try
861     {
862       traceExec(task, "load", ComputePlacement(task));
863       task->load();
864       traceExec(task, "initService", ComputePlacement(task));
865       task->initService();
866     }
867   catch(Exception& ex) 
868     {
869       std::cerr << ex.what() << std::endl;
870       {//Critical section
871         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
872         task->aborted();
873         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
874         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
875       }//End of critical section
876     }
877   catch(...) 
878     {
879       std::cerr << "Load failed" << std::endl;
880       {//Critical section
881         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
882         task->aborted();
883         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
884         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
885       }//End of critical section
886     }
887 }
888
889 struct threadargs
890 {
891   Task *task;
892   Scheduler *sched;
893   Executor *execInst;
894 };
895
896 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
897 {
898   std::vector<Thread> ths(tasks.size());
899   std::size_t ithread(0);
900   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
901     {
902       DEBTRACE("Executor::loadParallelTasks(Task *task)");
903       struct threadargs *args(new threadargs);
904       args->task = (*iter);
905       args->sched = _mainSched;
906       args->execInst = this;
907       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
908     }
909   for(ithread=0;ithread<tasks.size();ithread++)
910     ths[ithread].join();
911 }
912
913 //! Execute a list of tasks possibly connected through datastream links
914 /*!
915  *  \param tasks  : a list of tasks to execute
916  *
917  */
918 void Executor::launchTasks(const std::vector<Task *>& tasks)
919 {
920   //First phase, make datastream connections
921   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
922     {
923       YACS::StatesForNode state=(*iter)->getState();
924       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
925       try
926         {
927           (*iter)->connectService();
928           traceExec(*iter, "connectService",ComputePlacement(*iter));
929           {//Critical section
930             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
931             (*iter)->connected();
932           }//End of critical section
933         }
934       catch(Exception& ex) 
935         {
936           std::cerr << ex.what() << std::endl;
937           try
938             {
939               (*iter)->disconnectService();
940               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
941             }
942           catch(...) 
943             {
944               // Disconnect has failed
945               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
946             }
947           {//Critical section
948             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
949             (*iter)->aborted();
950             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
951           }//End of critical section
952         }
953       catch(...) 
954         {
955           std::cerr << "Problem in connectService" << std::endl;
956           try
957             {
958               (*iter)->disconnectService();
959               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
960             }
961           catch(...) 
962             {
963               // Disconnect has failed
964               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
965             }
966           {//Critical section
967             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
968             (*iter)->aborted();
969             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
970           }//End of critical section
971         }
972       if((*iter)->getState() == YACS::ERROR)
973         {
974           //try to put all coupled tasks in error
975           std::set<Task*> coupledSet;
976           (*iter)->getCoupledTasks(coupledSet);
977           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
978             {
979               Task* t=*it;
980               if(t == *iter)continue;
981               if(t->getState() == YACS::ERROR)continue;
982               try
983                 {
984                   t->disconnectService();
985                   traceExec(t, "disconnectService",ComputePlacement(*iter));
986                 }
987               catch(...)
988                 {
989                   // Disconnect has failed
990                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
991                 }
992               {//Critical section
993                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
994                 t->aborted();
995                 _mainSched->notifyFrom(t,YACS::ABORT,this);
996               }//End of critical section
997               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
998             }
999         }
1000       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1001     }
1002
1003   //Second phase, execute each task in a thread
1004   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1005     {
1006       launchTask(*iter);
1007     }
1008 }
1009
1010 //! Execute a Task in a thread
1011 /*!
1012  *  \param task  : Task to execute
1013  *
1014  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1015  *
1016  *  Calls Executor::functionForTaskExecution in Thread
1017  */
1018
1019 void Executor::launchTask(Task *task)
1020 {
1021   DEBTRACE("Executor::launchTask(Task *task)");
1022   struct threadargs *args;
1023   if(task->getState() != YACS::TOACTIVATE)return;
1024
1025   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1026   if(_semThreadCnt == 0)
1027     {
1028       // --- Critical section
1029       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1030       //check if we have enough threads to run
1031       std::set<Task*> tmpSet=_runningTasks;
1032       std::set<Task*>::iterator it = tmpSet.begin();
1033       std::string status="running";
1034       std::set<Task*> coupledSet;
1035       while( it != tmpSet.end() )
1036         {
1037           Task* tt=*it;
1038           coupledSet.clear();
1039           tt->getCoupledTasks(coupledSet);
1040           status="running";
1041           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1042             {
1043               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1044               tmpSet.erase(*iter);
1045             }
1046           if(status=="running")break;
1047           it = tmpSet.begin();
1048         }
1049
1050       if(status=="toactivate")
1051         {
1052           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1053           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1054         }
1055       // --- End of critical section
1056     }
1057
1058   _semForMaxThreads.wait();
1059   _semThreadCnt -= 1;
1060
1061   args= new threadargs;
1062   args->task = task;
1063   args->sched = _mainSched;
1064   args->execInst = this;
1065
1066   traceExec(task, "launch",ComputePlacement(task));
1067
1068   { // --- Critical section
1069     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1070     _numberOfRunningTasks++;
1071     _runningTasks.insert(task);
1072     task->begin(); //change state to ACTIVATED
1073   } // --- End of critical section
1074   Thread(functionForTaskExecution, args, _threadStackSize);
1075 }
1076
1077 //! wait until a running task ends
1078
1079 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1080 {
1081   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1082 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1083   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1084   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1085     {
1086       _isWaitingEventsFromRunningTasks = true;
1087       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1088     }
1089   _numberOfEndedTasks=0;
1090   DEBTRACE("---");
1091 }
1092
1093 //! not implemented
1094
1095 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1096 {
1097   /*_mutexForNbOfConcurrentThreads.lock();
1098   _groupOfAllThreadsCreated.remove(thread);
1099   delete thread;
1100   _mutexForNbOfConcurrentThreads.unlock();*/
1101 }
1102
1103
1104 //! must be used protected by _mutexForSchedulerUpdate!
1105
1106 void Executor::wakeUp()
1107 {
1108   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1109   if (_isWaitingEventsFromRunningTasks)
1110     {
1111       _isWaitingEventsFromRunningTasks = false;
1112       _condForNewTasksToPerform.notify_all();
1113     }
1114   else
1115     _numberOfEndedTasks++;
1116 }
1117
1118 //! number of running tasks
1119
1120 int Executor::getNbOfThreads()
1121 {
1122   int ret;
1123   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1124   _isRunningunderExternalControl=true;
1125   ret = _groupOfAllThreadsCreated.size();
1126   return ret;
1127 }
1128
1129 /*!
1130  * This thread is NOT supposed to be detached !
1131  */
1132 void *Executor::functionForTaskLoad(void *arg)
1133 {
1134   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1135   struct threadargs *args = (struct threadargs *) arg;
1136   Task *task=args->task;
1137   Scheduler *sched=args->sched;
1138   Executor *execInst=args->execInst;
1139   delete args;
1140   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1141   return 0;
1142 }
1143
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : 3 elements (a Task, a Scheduler, an Executor), passed as a
 *                heap-allocated threadargs that this function deletes
 *
 *  Detaches the calling thread, then:
 *  - applies the DPL scope of the scheduler to the task when
 *    getDPLScopeSensitive() is true,
 *  - calls Task::execute, then Task::disconnectService. An exception from
 *    either one turns the reported event from YACS::FINISH into YACS::ABORT.
 *  - releases the task from its HomogeneousPoolContainer, if it has one,
 *  - under _mutexForSchedulerUpdate:
 *    - calls Task::finished (FINISH) or Task::aborted (ABORT),
 *    - calls Scheduler::notifyFrom with the event,
 *    - removes the task from the running set,
 *    - may switch the executor to PAUSED,
 *    - posts _semForMaxThreads,
 *    - calls Executor::wakeUp unless the executor is PAUSED.
 *
 *  Executor::notifyEndOfThread is NOT called (the call is commented out).
 *  Thread::exit(0) is called before returning 0.
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  // Take ownership of the arguments prepared by launchTask() and free them.
  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  Thread::detach();

  // Execute task

  if(execInst->getDPLScopeSensitive())
    {
      Node *node(dynamic_cast<Node *>(task));
      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
      if(node!=0 && gfn!=0)
        node->applyDPLScope(gfn);
    }

  // Event reported to the scheduler; downgraded to ABORT by any failure below.
  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Placement is captured before the pool container releases the task; it is
  // used for the final state trace below. Presumably the placement id depends
  // on the allocation — confirm.
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      std::lock_guard<std::mutex> alckCont(contC->getLocker());
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            execInst->_errorDetected = true;
            // Stop-on-error: switch to step-by-step so no new task is launched.
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev,execInst);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            // Last running task ended while the executor waited for running
            // tasks: move to PAUSED and wake the pilot.
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    // Give back the thread slot taken in launchTask().
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  //execInst->notifyEndOfThread(0);
  Thread::exit(0);
  return 0;
}
1293
1294 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1295 {
1296   string nodeName = _mainSched->getTaskName(task);
1297   Container *cont = task->getContainer();
1298   string containerName = "---";
1299   if (cont)
1300     containerName = cont->getName();
1301
1302   std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1303   std::chrono::milliseconds millisec;
1304   millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1305   double elapse = double(millisec.count()) / 1000.0;
1306   {
1307     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1308     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1309     _trace << flush;
1310   }
1311 }
1312
1313 //! emit notification to all observers registered with  the dispatcher 
1314 /*!
1315  * The dispatcher is unique and can be obtained by getDispatcher()
1316  */
1317 void Executor::sendEvent(const std::string& event)
1318 {
1319   Dispatcher* disp=Dispatcher::getDispatcher();
1320   YASSERT(disp);
1321   YASSERT(_root);
1322   disp->dispatch(_root,event);
1323 }
1324
1325 struct HPCCompare
1326 {
1327   bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
1328   {
1329     if(!lhs && !rhs)
1330       return false;
1331     if(!lhs)
1332       return true;
1333     if(!rhs)
1334       return false;
1335     return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
1336   }
1337 };
1338
1339 /*!
1340  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1341  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1342  *
1343  * \param [in,out] tsks - list of tasks to be
1344  */
1345 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1346 {
1347   std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1348   for(auto cur : tsks)
1349     {
1350       if(!cur)
1351         continue;
1352       Container *cont(cur->getContainer());
1353       if(!cont)
1354         {
1355           m[nullptr].push_back(cur);
1356           continue;
1357         }
1358       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1359       if(!contC)
1360         {
1361           m[nullptr].push_back(cur);
1362           continue;
1363         }
1364       m[contC].push_back(cur);
1365     }
1366   //
1367   std::vector<Task *> ret;
1368   for(auto it : m)
1369     {
1370       HomogeneousPoolContainer *curhpc(it.first);
1371       const std::vector<Task *>& curtsks(it.second);
1372       if(!curhpc)
1373         {
1374           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1375         }
1376       else
1377         {
1378           // start of critical section for container curhpc
1379           std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1380           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1381           std::size_t sz(curhpc->getNumberOfFreePlace());
1382           std::vector<Task *>::const_iterator it2(curtsks.begin());
1383           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1384             {
1385               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1386               ret.push_back(*it2);
1387             }
1388           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1389           //end of critical section
1390         }
1391     }
1392   //
1393   tsks=ret;
1394 }
1395
1396 std::string Executor::ComputePlacement(Task *zeTask)
1397 {
1398   std::string placement("---");
1399   if(!zeTask)
1400     return placement;
1401   if(zeTask->getContainer())
1402     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1403   return placement;
1404 }
1405
1406 ///////// NEW EXECUTOR ////////////////////////////////
1407 void Executor::loadTask(Task *task)
1408 {
1409   if(task->getState() != YACS::TOLOAD)
1410     return;
1411   traceExec(task, "state:TOLOAD", ComputePlacement(task));
1412   {//Critical section
1413     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1414     _mainSched->notifyFrom(task,YACS::START,this);
1415   }//End of critical section
1416   try
1417     {
1418       traceExec(task, "load", ComputePlacement(task));
1419       task->load();
1420       traceExec(task, "initService", ComputePlacement(task));
1421       task->initService();
1422     }
1423   catch(Exception& ex) 
1424     {
1425       std::cerr << ex.what() << std::endl;
1426       {//Critical section
1427         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1428         task->aborted();
1429         _mainSched->notifyFrom(task,YACS::ABORT, this);
1430         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1431       }//End of critical section
1432     }
1433   catch(...) 
1434     {
1435       std::cerr << "Load failed" << std::endl;
1436       {//Critical section
1437         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1438         task->aborted();
1439         _mainSched->notifyFrom(task,YACS::ABORT, this);
1440         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1441       }//End of critical section
1442     }
1443 }
1444
1445 void Executor::beginTask(Task *task)
1446 {
1447   // --- Critical section
1448   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1449   _numberOfRunningTasks++;
1450   _runningTasks.insert(task);
1451   // --- End of critical section
1452 }
1453
//! Report the end of a task run through the workload manager (counterpart of beginTask).
/*!
 *  Holds _mutexForSchedulerUpdate for the whole call. Steps:
 *  - marks the task finished (FINISH) or aborted (ABORT). On ABORT it also
 *    records the error and switches to STEPBYSTEP when stop-on-error is
 *    requested.
 *  - notifies the main scheduler,
 *  - removes the task from the running set,
 *  - may move the executor to PAUSED,
 *  - wakes the executor thread unless it is paused.
 *
 *  Unlike functionForTaskExecution, no _semForMaxThreads slot is posted here.
 *
 *  \param task : task that has finished running
 *  \param ev   : YACS::FINISH or YACS::ABORT (NewTask::run passes runTask's result)
 */
void Executor::endTask(Task *task, YACS::Event ev)
{
  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
  try
  {
    if (ev == YACS::FINISH) task->finished();
    if (ev == YACS::ABORT)
    {
      _errorDetected = true;
      // Stop-on-error: switch to step-by-step so no new task is launched.
      if (_stopOnErrorRequested)
      {
        _execMode = YACS::STEPBYSTEP;
        _isOKToEnd = true;
      }
      task->aborted();
    }
    // State trace disabled: the placement captured in runTask() is not available here.
    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
    _mainSched->notifyFrom(task,ev,this);
  }
  catch(Exception& ex)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Error during notification" << std::endl;
    std::cerr << ex.what() << std::endl;
  }
  catch(...)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Notification failed" << std::endl;
  }
  _numberOfRunningTasks--;
  _runningTasks.erase(task);
  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks 
            << " _execMode: " << _execMode
            << " _executorState: " << _executorState);
  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
    {
      if (_executorState == YACS::WAITINGTASKS)
        {
          // Last running task ended while the executor waited for running
          // tasks: move to PAUSED and wake the pilot.
          _executorState = YACS::PAUSED;
          sendEvent("executor");
          _condForPilot.notify_all();
          if (_errorDetected &&
              _stopOnErrorRequested &&
              !_isRunningunderExternalControl)
            _condForStepByStep.notify_all(); // exec thread may be on waitResume
        }
    }
  if (_executorState != YACS::PAUSED)
    wakeUp();
}
1507
//! Execute a task synchronously in the calling thread (workload-manager path).
/*!
 *  Steps:
 *  - marks the task ACTIVATED (Task::begin, under _mutexForSchedulerUpdate),
 *  - applies the DPL scope if getDPLScopeSensitive() is true,
 *  - calls Task::execute, then Task::disconnectService,
 *  - releases the task from its HomogeneousPoolContainer, if it has one.
 *
 *  Exceptions from execute or disconnectService are caught and reported
 *  through the returned event.
 *
 *  \param task : task to execute
 *  \return YACS::FINISH on success, YACS::ABORT if execute or disconnectService threw
 */
YACS::Event  Executor::runTask(Task *task)
{
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
    task->begin(); //change state to ACTIVATED
  }
  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  if(getDPLScopeSensitive())
    {
      Node *node(dynamic_cast<Node *>(task));
      ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
      if(node!=0 && gfn!=0)
        node->applyDPLScope(gfn);
    }

  // Event returned to the caller; downgraded to ABORT by any failure below.
  YACS::Event ev=YACS::FINISH;
  try
    {
      traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // NOTE(review): placement is computed but not used in this function (the
  // matching state trace in endTask is commented out).
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
  {
    std::lock_guard<std::mutex> alckCont(contC->getLocker());
    contC->release(task);
  }

  return ev;
}
1577
1578 void Executor::makeDatastreamConnections(Task *task)
1579 {
1580   YACS::StatesForNode state=task->getState();
1581   if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1582     return;
1583   try
1584     {
1585       task->connectService();
1586       traceExec(task, "connectService",ComputePlacement(task));
1587       {//Critical section
1588         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1589         task->connected();
1590       }//End of critical section
1591     }
1592   catch(Exception& ex) 
1593     {
1594       std::cerr << ex.what() << std::endl;
1595       try
1596         {
1597           (task)->disconnectService();
1598           traceExec(task, "disconnectService",ComputePlacement(task));
1599         }
1600       catch(...) 
1601         {
1602           // Disconnect has failed
1603           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1604         }
1605       {//Critical section
1606         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1607         task->aborted();
1608         _mainSched->notifyFrom(task,YACS::ABORT,this);
1609       }//End of critical section
1610     }
1611   catch(...) 
1612     {
1613       std::cerr << "Problem in connectService" << std::endl;
1614       try
1615         {
1616           (task)->disconnectService();
1617           traceExec(task, "disconnectService",ComputePlacement(task));
1618         }
1619       catch(...) 
1620         {
1621           // Disconnect has failed
1622           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1623         }
1624       {//Critical section
1625         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1626         task->aborted();
1627         _mainSched->notifyFrom(task,YACS::ABORT,this);
1628       }//End of critical section
1629     }
1630   if(task->getState() == YACS::ERROR)
1631     {
1632       //try to put all coupled tasks in error
1633       std::set<Task*> coupledSet;
1634       task->getCoupledTasks(coupledSet);
1635       for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1636         {
1637           Task* t=*it;
1638           if(t == task)continue;
1639           if(t->getState() == YACS::ERROR)continue;
1640           try
1641             {
1642               t->disconnectService();
1643               traceExec(t, "disconnectService",ComputePlacement(task));
1644             }
1645           catch(...)
1646             {
1647               // Disconnect has failed
1648               traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1649             }
1650           {//Critical section
1651             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1652             t->aborted();
1653             _mainSched->notifyFrom(t,YACS::ABORT,this);
1654           }//End of critical section
1655           traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1656         }
1657     }
1658   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1659 }
1660
1661 #include "workloadmanager/WorkloadManager.hxx"
1662 #include "workloadmanager/DefaultAlgorithm.hxx"
1663 #include "Runtime.hxx"
1664 static
1665 void loadResources(WorkloadManager::WorkloadManager& wm)
1666 {
1667   Runtime *r(getRuntime());
1668   if(!r)
1669     throw YACS::Exception("loadResources : no runtime  !");
1670   std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
1671   int id = 0;
1672   for(const std::pair<std::string,int>& res : data)
1673   {
1674     WorkloadManager::Resource newResource;
1675     newResource.name = res.first;
1676     newResource.id = id;
1677     id++;
1678     newResource.nbCores = res.second;
1679     wm.addResource(newResource);
1680   }
1681 }
1682
1683 class NewTask : public WorkloadManager::Task
1684 {
1685 public:
1686   NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
1687   const WorkloadManager::ContainerType& type()const override;
1688   void run(const WorkloadManager::Container& c)override;
1689 private:
1690   WorkloadManager::ContainerType _type;
1691   Executor& _executor;
1692   YACS::ENGINE::Task * _yacsTask;
1693 };
1694
1695 NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
1696 : _type()
1697 , _executor(executor)
1698 , _yacsTask(yacsTask)
1699 {
1700   _type.neededCores = 0;
1701   _type.id = 0;
1702   _type.name = "test";
1703 }
1704
1705 const WorkloadManager::ContainerType& NewTask::type()const
1706 {
1707   return _type;
1708 }
1709
1710 void NewTask::run(const WorkloadManager::Container& c)
1711 {
1712   _executor.loadTask(_yacsTask);
1713   _executor.makeDatastreamConnections(_yacsTask);
1714   YACS::Event ev = _executor.runTask(_yacsTask);
1715   _executor.endTask(_yacsTask, ev);
1716   delete this; // provisoire
1717 }
1718
1719 void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
1720 {
1721   DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1722   { // --- Critical section
1723     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1724     _mainSched = graph;
1725     _root = dynamic_cast<ComposedNode *>(_mainSched);
1726     if (!_root) throw Exception("Executor::Run, Internal Error!");
1727     _executorState = YACS::NOTYETINITIALIZED;
1728     sendEvent("executor");
1729     _toContinue=true;
1730     _isOKToEnd = false;
1731     _errorDetected = false;
1732     _isWaitingEventsFromRunningTasks = false;
1733     _numberOfRunningTasks = 0;
1734     _runningTasks.clear();
1735     _numberOfEndedTasks = 0;
1736     string tracefile = "traceExec_";
1737     tracefile += _mainSched->getName();
1738     _trace.open(tracefile.c_str());
1739     _start = std::chrono::steady_clock::now();
1740   } // --- End of critical section
1741
1742   if (debug > 1) _displayDot(graph);
1743
1744   if (fromScratch)
1745     {
1746       try
1747         {
1748           graph->init();
1749           graph->exUpdateState();
1750         }
1751       catch(Exception& ex)
1752         {
1753           DEBTRACE("exception: "<< (ex.what()));
1754           _executorState = YACS::FINISHED;
1755           sendEvent("executor");
1756           throw;
1757         }
1758     }
1759   _executorState = YACS::INITIALISED;
1760   sendEvent("executor");
1761
1762   if (debug > 1) _displayDot(graph);
1763
1764   bool isMore;
1765   int problemCount=0;
1766   int numberAllTasks;
1767
1768   _executorState = YACS::RUNNING;
1769   sendEvent("executor");
1770
1771   WorkloadManager::DefaultAlgorithm algo;
1772   WorkloadManager::WorkloadManager wlm(algo);
1773   loadResources(wlm);
1774   wlm.start();
1775   
1776   while (_toContinue)
1777     {
1778       DEBTRACE("--- executor main loop");
1779       sleepWhileNoEventsFromAnyRunningTask();
1780       DEBTRACE("--- events...");
1781       if (debug > 2) _displayDot(graph);
1782       { // --- Critical section
1783         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1784         std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1785         graph->selectRunnableTasks(readyTasks);
1786         _tasks.clear();
1787         for(Task * t : readyTasks)
1788           if(_runningTasks.find(t) == _runningTasks.end())
1789             _tasks.push_back(t);
1790         // TODO: to be removed
1791         FilterTasksConsideringContainers(_tasks);
1792         numberAllTasks=_numberOfRunningTasks+_tasks.size();
1793       } // --- End of critical section
1794       if (debug > 2) _displayDot(graph);
1795       DEBTRACE("--- events...");
1796       if (_executorState == YACS::RUNNING)
1797       {
1798         if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1799         for(Task * task : _tasks)
1800         {
1801           beginTask(task);
1802           NewTask* newTask = new NewTask(*this, task);
1803           wlm.addTask(newTask);
1804         }
1805       }
1806       if (debug > 1) _displayDot(graph);
1807       { // --- Critical section
1808         DEBTRACE("---");
1809         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1810         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1811         _toContinue = !graph->isFinished();
1812
1813         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1814         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1815         DEBTRACE("_toContinue: " << _toContinue);
1816         if(_toContinue && numberAllTasks==0)
1817         {
1818           //Problem : no running tasks and no task to launch ??
1819           problemCount++;
1820           std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1821           //Pause to give a chance to interrupt
1822           usleep(1000);
1823           if(problemCount > 25)
1824           {
1825             // Too much problems encountered : stop execution
1826             _toContinue=false;
1827           }
1828         }
1829
1830         if (! _toContinue)
1831           {
1832             _executorState = YACS::FINISHED;
1833             sendEvent("executor");
1834             _condForPilot.notify_all();
1835           }
1836       } // --- End of critical section
1837       if (debug > 0) _displayDot(graph);
1838       DEBTRACE("_toContinue: " << _toContinue);
1839     }
1840
1841   wlm.stop();
1842   DEBTRACE("End of main Loop");
1843
1844   { // --- Critical section
1845     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1846     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1847       {
1848         DEBTRACE("stop requested: End soon");
1849         _executorState = YACS::STOPPED;
1850         _toContinue = false;
1851         sendEvent("executor");
1852       }
1853   } // --- End of critical section
1854   if ( _dumpOnErrorRequested && _errorDetected)
1855     {
1856       saveState(_dumpErrorFile);
1857     }
1858   {
1859     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1860     _trace.close();
1861   }
1862   DEBTRACE("End of RunB thread");  
1863 }