Salome HOME
6b43fb0ee9eb9e3cbe7ee451198e6e9485229c76
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2023  CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include "WlmTask.hxx"
34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
36
37 #include <iostream>
38 #include <fstream>
39 #include <sys/stat.h>
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <unistd.h>
43 #endif
44
45 #include <cstdlib>
46 #include <algorithm>
47
48 #ifdef WIN32
49 #define usleep(A) _sleep(A/1000)
50 #if !defined(S_ISCHR) || !defined(S_ISREG)
51 #  ifndef S_IFMT
52 #    ifdef _S_IFMT
53 #      define S_IFMT _S_IFMT
54 #      define S_IFCHR _S_IFCHR
55 #      define S_IFREG _S_IFREG
56 #    else
57 #    ifdef __S_IFMT
58 #      define S_IFMT __S_IFMT
59 #      define S_IFCHR __S_IFCHR
60 #      define S_IFREG __S_IFREG
61 #    endif
62 #    endif
63 #  endif
64 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
65 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
66 #endif
67 #endif
68
69 using namespace YACS::ENGINE;
70 using namespace std;
71
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
75
76 //#define _DEVDEBUG_
77 #include "YacsTrace.hxx"
78
79 int Executor::_maxThreads(1000);
80 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
81
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
83 {
84   _root=0;
85   _toContinue = true;
86   _isOKToEnd = false;
87   _stopOnErrorRequested = false;
88   _dumpOnErrorRequested = false;
89   _errorDetected = false;
90   _isRunningunderExternalControl=false;
91   _executorState = YACS::NOTYETINITIALIZED;
92   _execMode = YACS::CONTINUE;
93   _semThreadCnt = _maxThreads;
94   _numberOfRunningTasks = 0;
95   _numberOfEndedTasks = 0;
96   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
97 }
98
99 Executor::~Executor()
100 {
101   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
102     delete *iter;
103 }
104
105 //! Execute a graph waiting for completion
106 /*!
107  *  \param graph : schema to execute
108  *  \param debug : display the graph with dot if debug == 1
109  *  \param fromScratch : if true the graph is reinitialized
110  *
111  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
112  *  
113  *  Calls Executor::launchTask to execute a selected Task.
114  *
115  *  Completion when graph is finished (Scheduler::isFinished)
116  */
117
118 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
119 {
120   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
121   _mainSched=graph;
122   _root = dynamic_cast<ComposedNode *>(_mainSched);
123   if (!_root) throw Exception("Executor::Run, Internal Error!");
124   bool isMore;
125   int i=0;
126   if(debug>1)_displayDot(graph);
127   if (fromScratch)
128     {
129       graph->init();
130       graph->exUpdateState();
131     }
132   if(debug>1)_displayDot(graph);
133   vector<Task *> tasks;
134   vector<Task *>::iterator iter;
135   _toContinue=true;
136   _execMode = YACS::CONTINUE;
137   _isWaitingEventsFromRunningTasks = false;
138   _numberOfRunningTasks = 0;
139   _runningTasks.clear();
140   _numberOfEndedTasks=0;
141   while(_toContinue)
142     {
143       sleepWhileNoEventsFromAnyRunningTask();
144
145       if(debug>2)_displayDot(graph);
146
147       {//Critical section
148         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
149         tasks=graph->getNextTasks(isMore);
150         graph->selectRunnableTasks(tasks);
151       }//End of critical section
152
153       if(debug>2)_displayDot(graph);
154
155       for(iter=tasks.begin();iter!=tasks.end();iter++)
156         loadTask(*iter,this);
157
158       if(debug>1)_displayDot(graph);
159
160       launchTasks(tasks);
161
162       if(debug>1)_displayDot(graph);
163
164       {//Critical section
165         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
166         _toContinue=!graph->isFinished();
167       }//End of critical section
168       DEBTRACE("_toContinue: " << _toContinue);
169
170       if(debug>0)_displayDot(graph);
171
172       i++;
173     }
174 }
175
176 //! Execute a graph with breakpoints or step by step
177 /*!
178  *  To be launch in a thread (main thread controls the progression).
179  *  \param graph : schema to execute
180  *  \param debug : display the graph with dot if debug >0
181  *  \param fromScratch : if false, state from a previous partial exection is already loaded
182  *
183  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
184  *  
185  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
186  * 
187  *  Calls Executor::launchTask to execute a selected Task
188  *
189  *  Completion when graph is finished (Scheduler::isFinished)
190  *
191  *  States of execution:
192  *  - YACS::NOTYETINITIALIZED
193  *  - YACS::INITIALISED
194  *  - YACS::RUNNING            (to next breakpoint or step)
195  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
196  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
197  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
198  *  - YACS::STOPPED            (stopped by user before end)
199  *
200  *  Modes of Execution:
201  *  - YACS::CONTINUE           (normal run without breakpoints)
202  *  - YACS::STEPBYSTEP         (pause at each loop)
203  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
204  *
205  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
206  *  Step by Step means execution node by node or group of node by group of nodes.
207  *  At a given step, the user decides to launch all the ready nodes or only a subset
208  *  (Caution: some nodes must run in parallel). 
209  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
210  *
211  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
212  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
213  *  - Executor::getCurrentExecMode
214  *  - Executor::getExecutorState
215  *  - Executor::setExecMode             : change the execution mode for next loop
216  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
217  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
218  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
219  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
220  *  - Executor::isNotFinished
221  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
222  *  - Executor::saveState               : dump the current state of execution in an xml file
223  *  - Executor::loadState               : Not yet implemented
224  *  - Executor::getNbOfThreads
225  *  - Executor::displayDot
226  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
227  *
228  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
229  *  - Executor::waitPause
230  *
231  *  TO BE VALIDATED:
232  *  - Pilot may connect to executor during execution, or deconnect.
233  *  - Several Pilots may be connected at the same time (for observation...)
234  * 
235  */
236
237 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
238 {
239   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
240
241   { // --- Critical section
242     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
243     _mainSched = graph;
244     _root = dynamic_cast<ComposedNode *>(_mainSched);
245     if (!_root) throw Exception("Executor::Run, Internal Error!");
246     _executorState = YACS::NOTYETINITIALIZED;
247     sendEvent("executor");
248     _toContinue=true;
249     _isOKToEnd = false;
250     _errorDetected = false;
251     _isWaitingEventsFromRunningTasks = false;
252     _numberOfRunningTasks = 0;
253     _runningTasks.clear();
254     _numberOfEndedTasks = 0;
255     string tracefile = "traceExec_";
256     tracefile += _mainSched->getName();
257     _trace.open(tracefile.c_str());
258     _start = std::chrono::steady_clock::now();
259   } // --- End of critical section
260
261   if (debug > 1) _displayDot(graph);
262
263   if (fromScratch)
264     {
265       try
266         {
267           graph->init();
268           graph->exUpdateState();
269         }
270       catch(Exception& ex)
271         {
272           DEBTRACE("exception: "<< (ex.what()));
273           _executorState = YACS::FINISHED;
274           sendEvent("executor");
275           throw;
276         }
277     }
278   _executorState = YACS::INITIALISED;
279   sendEvent("executor");
280
281   if (debug > 1) _displayDot(graph);
282
283   vector<Task *>::iterator iter;
284   bool isMore;
285   int problemCount=0;
286   int numberAllTasks;
287
288   _executorState = YACS::RUNNING;
289   sendEvent("executor");
290   while (_toContinue)
291     {
292       DEBTRACE("--- executor main loop");
293       sleepWhileNoEventsFromAnyRunningTask();
294       DEBTRACE("--- events...");
295       if (debug > 2) _displayDot(graph);
296       { // --- Critical section
297         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
298         _tasks=graph->getNextTasks(isMore);
299         graph->selectRunnableTasks(_tasks);
300         FilterTasksConsideringContainers(_tasks);
301         numberAllTasks=_numberOfRunningTasks+_tasks.size();
302       } // --- End of critical section
303       if (debug > 2) _displayDot(graph);
304       if (_executorState == YACS::RUNNING)
305         {
306           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
307           if (debug > 0) _displayDot(graph);
308           DEBTRACE("---");
309           loadParallelTasks(_tasks,this);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
367   {
368     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
369     _trace.close();
370   }
371   DEBTRACE("End of RunB thread");  
372 }
373
374 YACS::ExecutionMode Executor::getCurrentExecMode()
375 {
376   _isRunningunderExternalControl=true;
377   return _execMode;
378 }
379
380
381 YACS::ExecutorState Executor::getExecutorState()
382 {
383   _isRunningunderExternalControl=true;
384   return _executorState;
385 }
386
387
388 bool Executor::isNotFinished()
389 {
390   _isRunningunderExternalControl=true;
391   return _toContinue;
392 }
393
394 //! ask to stop execution on the first node found in error
395 /*!
396  * \param dumpRequested   produce a state dump when an error is found
397  * \param xmlFile         name of file used for state dump
398  */
399
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 {
402   { // --- Critical section
403     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
404     _dumpErrorFile=xmlFile;
405     _stopOnErrorRequested=true;
406     _dumpOnErrorRequested = dumpRequested;
407     if (dumpRequested && xmlFile.empty())
408       throw YACS::Exception("dump on error requested and no filename given for dump");
409     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410   } // --- End of critical section
411 }
412
413 //! ask to do not stop execution on nodes found in error
414 /*!
415  */
416
417 void Executor::unsetStopOnError()
418 {
419   { // --- Critical section
420     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
421     _stopOnErrorRequested=false;
422   } // --- End of critical section
423 }
424
425 //! Dynamically set the current mode of execution
426 /*!
427  * The mode can be Continue, step by step, or stop before execution of a node
428  * defined in a list of breakpoints.
429  */
430
431 void Executor::setExecMode(YACS::ExecutionMode mode)
432 {
433   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434   { // --- Critical section
435     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
436     _isRunningunderExternalControl=true;
437     _execMode = mode;
438   } // --- End of critical section
439 }
440
441 //! wake up executor when in pause
442 /*!
443  * When Executor is in state paused or waiting for task completion, the thread
444  * running loop RunB waits on condition _condForStepByStep.
445  * Thread RunB is waken up.
446  * \return true when actually wakes up executor
447  */
448
449 bool Executor::resumeCurrentBreakPoint()
450 {
451   DEBTRACE("Executor::resumeCurrentBreakPoint()");
452   bool ret = false;
453   //bool doDump = false;
454   { // --- Critical section
455     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
456     _isRunningunderExternalControl=true;
457     DEBTRACE("_executorState: " << _executorState);
458     switch (_executorState)
459       {
460       case YACS::WAITINGTASKS:
461       case YACS::PAUSED:
462         {
463           _condForStepByStep.notify_all();
464           _executorState = YACS::RUNNING;
465           sendEvent("executor");
466           ret = true;
467           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468           break;
469         }
470       case YACS::FINISHED:
471       case YACS::STOPPED:
472         {
473           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474           DEBTRACE("Graph Execution finished or stopped !");
475           break;
476         }
477       default :
478         {
479           // debug: no easy way to verify if main loop is acutally waiting on condition
480         }
481       }
482     DEBTRACE("---");
483     //if (doDump) saveState(_dumpErrorFile);
484   } // --- End of critical section
485   return ret;
486 }
487
488
489 //! define a list of nodes names as breakpoints in the graph
490
491
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 {
494   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495   { // --- Critical section
496     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
497     _isRunningunderExternalControl=true;
498     _listOfBreakPoints = listOfBreakPoints;
499   } // --- End of critical section
500 }
501
502
503 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 /*!
505  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
506  *  Use Executor::waitPause to wait.
507  */
508 std::list<std::string> Executor::getTasksToLoad()
509 {
510   DEBTRACE("Executor::getTasksToLoad()");
511   list<string> listOfNodesToLoad;
512   listOfNodesToLoad.clear();
513   { // --- Critical section
514     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
515     _isRunningunderExternalControl=true;
516     switch (_executorState)
517       {
518       case YACS::WAITINGTASKS:
519       case YACS::PAUSED:
520         {
521           listOfNodesToLoad = _listOfTasksToLoad;
522           break;
523         }
524       case YACS::NOTYETINITIALIZED:
525       case YACS::INITIALISED:
526       case YACS::RUNNING:
527       case YACS::FINISHED:
528       case YACS::STOPPED:
529       default:
530         {
531           break;
532         }
533       }
534   } // --- End of critical section
535   return listOfNodesToLoad;
536 }
537
538
539 //! Define a subset of task to execute in step by step mode
540 /*!
541  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
542  * in the current step.
543  * If some nodes must run in parallel, they must stay together in the list.
544  */
545
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 {
548   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549   bool ret = true;
550   vector<Task *>::iterator iter;
551   vector<Task *> restrictedTasks;
552   { // --- Critical section
553     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
554     _isRunningunderExternalControl=true;
555     switch (_executorState)
556       {
557       case YACS::WAITINGTASKS:
558       case YACS::PAUSED:
559         {
560           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561             {
562               string readyNode = _mainSched->getTaskName(*iter);
563               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564                   != listToExecute.end())
565                 {
566                   restrictedTasks.push_back(*iter);
567                   DEBTRACE("node to execute " << readyNode);
568                 }
569             }
570           _tasks.clear();
571           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572             {
573               _tasks.push_back(*iter);
574             }
575           break;
576         }
577       case YACS::NOTYETINITIALIZED:
578       case YACS::INITIALISED:
579       case YACS::RUNNING:
580       case YACS::FINISHED:
581       case YACS::STOPPED:
582       default:
583         {
584           break;
585         }
586       }
587     } // --- End of critical section
588
589   _tasks.clear();
590   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591     {
592       _tasks.push_back(*iter);
593     }
594   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595     {
596       string readyNode = _mainSched->getTaskName(*iter);
597       DEBTRACE("selected node to execute " << readyNode);
598     }
599
600   return ret;
601 }
602
603 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 /*!
605  *  Do nothing if execution is finished or in pause.
606  *  Wait first step if Executor is running or in initialization.
607  */
608
609 void Executor::waitPause()
610 {
611   DEBTRACE("Executor::waitPause()" << _executorState);
612   { // --- Critical section
613     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
614     _isRunningunderExternalControl=true;
615     switch (_executorState)
616       {
617       default:
618       case YACS::STOPPED:
619       case YACS::FINISHED:
620       case YACS::WAITINGTASKS:
621       case YACS::PAUSED:
622         {
623           break;
624         }
625       case YACS::NOTYETINITIALIZED:
626       case YACS::INITIALISED:
627       case YACS::RUNNING:
628         {
629           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630           break;
631         }
632       }
633   } // --- End of critical section
634   DEBTRACE("---");
635 }
636
637 /*!
638  * This method can be called at any time simultaneously during a RunB call.
639  * This method will wait until the executor is locked in a consistent state of a running graph.
640  *
641  * This method is expected to be called in association with resume method.
642  * The returned parameter is expected to be transfered to resume method.
643  */
644 bool Executor::suspendASAP()
645 {
646   // no AutoLocker here. It's not a bug.
647   _mutexForSchedulerUpdate.lock();
648   if(!_toContinue && _executorState==YACS::FINISHED)
649     {// execution is finished
650       _mutexForSchedulerUpdate.unLock();
651       return false;// the executor is no more running
652     }
653   //general case. Leave method with locker in locked status
654   return true;
655 }
656
657 /*!
658  * This method is expected to be called in association with suspendASAP method.
659  * Expected to be called just after suspendASAP with output of resume as input parameter
660  */
661 void Executor::resume(bool suspended)
662 {
663   if(suspended)
664     _mutexForSchedulerUpdate.unLock();
665 }
666
667 //! stops the execution as soon as possible 
668
669 void Executor::stopExecution()
670 {
671   setExecMode(YACS::STEPBYSTEP);
672   //waitPause();
673   _isOKToEnd = true;
674   resumeCurrentBreakPoint();
675 }
676
677 //! save the current state of execution in an xml file
678
679 bool Executor::saveState(const std::string& xmlFile)
680 {
681   DEBTRACE("Executor::saveState() in " << xmlFile);
682   bool result = false;
683   try {
684     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
685     YACS::ENGINE::VisitorSaveState vst(_root);
686     vst.openFileDump(xmlFile.c_str());
687     _root->accept(&vst);
688     vst.closeFileDump();
689     result = true;
690   }
691   catch(Exception& ex) {
692     std::cerr << ex.what() << std::endl;
693   }
694   return result;
695 }
696
697 //! not yet implemented
698
699 bool Executor::loadState()
700 {
701   DEBTRACE("Executor::loadState()");
702   _isRunningunderExternalControl=true;
703   return true;
704 }
705
706
707 static int isfile(const char *filename) 
708 {
709   struct stat buf;
710   if (stat(filename, &buf) != 0)
711     return 0;
712   if (!S_ISREG(buf.st_mode))
713     return 0;
714   return 1;
715 }
716
717 //! Display the graph state as a dot display, public method
718
719 void Executor::displayDot(Scheduler *graph)
720 {
721   _isRunningunderExternalControl=true;
722   _displayDot(graph);
723 }
724
725 //! Display the graph state as a dot display
726 /*!
727  *  \param graph  : the node to display
728  */
729
730 void Executor::_displayDot(Scheduler *graph)
731 {
732    std::ofstream g("titi");
733    ((ComposedNode*)graph)->writeDot(g);
734    g.close();
735    const char displayScript[]="display.sh";
736    if(isfile(displayScript))
737      system("sh display.sh");
738    else
739      system("dot -Tpng titi|display -delay 5");
740 }
741
742 //! Wait reactivation in modes Step By step or with BreakPoints
743 /*!
744  *  Check mode of execution (set by main thread):
745  *  - YACS::CONTINUE        : the graph execution continues.
746  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
747  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
748  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
749  *                            else continue the graph execution.
750  *  \return true if end of executor thread is requested
751  */
752
753 bool Executor::checkBreakPoints()
754 {
755   DEBTRACE("Executor::checkBreakPoints()");
756   vector<Task *>::iterator iter;
757   bool endRequested = false;
758
759   switch (_execMode)
760     {
761     case YACS::CONTINUE:
762       {
763         break;
764       }
765     case YACS::STOPBEFORENODES:
766       {
767         bool stop = false;
768         { // --- Critical section
769           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
770           _tasksSave = _tasks;
771           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
772             {
773               string nodeToLoad = _mainSched->getTaskName(*iter);
774               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
775                   != _listOfBreakPoints.end())
776                 {
777                   stop = true;
778                   break;
779                 }
780             }
781           if (stop)
782             {
783               _listOfTasksToLoad.clear();
784               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
785                 {
786                   string nodeToLoad = _mainSched->getTaskName(*iter);
787                   _listOfTasksToLoad.push_back(nodeToLoad);
788                 }
789               if (getNbOfThreads())
790                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
791               else
792                 _executorState = YACS::PAUSED;
793               sendEvent("executor");
794               _condForPilot.notify_all();
795             }
796           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
797           if (_isOKToEnd) endRequested = true;
798         } // --- End of critical section
799           if (stop) DEBTRACE("wake up from waitResume");
800         break;
801       }
802     default:
803     case YACS::STEPBYSTEP:
804       {
805         { // --- Critical section
806           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
807           _tasksSave = _tasks;
808           _listOfTasksToLoad.clear();
809           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
810             {
811               string nodeToLoad = _mainSched->getTaskName(*iter);
812               _listOfTasksToLoad.push_back(nodeToLoad);
813             }
814           if (getNbOfThreads())
815             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
816           else
817             _executorState = YACS::PAUSED;
818           sendEvent("executor");
819           _condForPilot.notify_all();
820           if (!_isOKToEnd)
821             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
822                           // or, if no pilot, wait until no more running tasks (stop on error)
823           if (_isOKToEnd) endRequested = true;
824         } // --- End of critical section
825         DEBTRACE("wake up from waitResume");
826         break;
827       }
828     }
829   DEBTRACE("endRequested: " << endRequested);
830   return endRequested;
831 }
832
833
834 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
835 /*!
836  *  With the condition Mutex, the mutex is released atomically during the wait.
837  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
838  *  Must be called while mutex is locked.
839  */
840
841 void Executor::waitResume()
842 {
843   DEBTRACE("Executor::waitResume()");
844   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
845   DEBTRACE("---");
846 }
847
848
849 //! Perform loading of a Task.
850 /*!
851  *  \param task  : Task to load
852  */
853
854 void Executor::loadTask(Task *task, const Executor *execInst)
855 {
856   DEBTRACE("Executor::loadTask(Task *task)");
857   if(task->getState() != YACS::TOLOAD)
858     return;
859   traceExec(task, "state:TOLOAD", ComputePlacement(task));
860   {//Critical section
861     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
862     _mainSched->notifyFrom(task,YACS::START,execInst);
863   }//End of critical section
864   try
865     {
866       traceExec(task, "load", ComputePlacement(task));
867       task->load();
868       traceExec(task, "initService", ComputePlacement(task));
869       task->initService();
870     }
871   catch(Exception& ex) 
872     {
873       std::cerr << ex.what() << std::endl;
874       {//Critical section
875         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
876         task->aborted();
877         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
878         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
879       }//End of critical section
880     }
881   catch(...) 
882     {
883       std::cerr << "Load failed" << std::endl;
884       {//Critical section
885         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
886         task->aborted();
887         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
888         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
889       }//End of critical section
890     }
891 }
892
893 struct threadargs
894 {
895   Task *task;
896   Scheduler *sched;
897   Executor *execInst;
898 };
899
900 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
901 {
902   std::vector<Thread> ths(tasks.size());
903   std::size_t ithread(0);
904   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
905     {
906       DEBTRACE("Executor::loadParallelTasks(Task *task)");
907       struct threadargs *args(new threadargs);
908       args->task = (*iter);
909       args->sched = _mainSched;
910       args->execInst = this;
911       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
912     }
913   for(ithread=0;ithread<tasks.size();ithread++)
914     ths[ithread].join();
915 }
916
917 //! Execute a list of tasks possibly connected through datastream links
918 /*!
919  *  \param tasks  : a list of tasks to execute
920  *
921  */
922 void Executor::launchTasks(const std::vector<Task *>& tasks)
923 {
924   //First phase, make datastream connections
925   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
926     {
927       YACS::StatesForNode state=(*iter)->getState();
928       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
929       try
930         {
931           (*iter)->connectService();
932           traceExec(*iter, "connectService",ComputePlacement(*iter));
933           {//Critical section
934             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
935             (*iter)->connected();
936           }//End of critical section
937         }
938       catch(Exception& ex) 
939         {
940           std::cerr << ex.what() << std::endl;
941           try
942             {
943               (*iter)->disconnectService();
944               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
945             }
946           catch(...) 
947             {
948               // Disconnect has failed
949               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
950             }
951           {//Critical section
952             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
953             (*iter)->aborted();
954             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
955           }//End of critical section
956         }
957       catch(...) 
958         {
959           std::cerr << "Problem in connectService" << std::endl;
960           try
961             {
962               (*iter)->disconnectService();
963               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
964             }
965           catch(...) 
966             {
967               // Disconnect has failed
968               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
969             }
970           {//Critical section
971             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
972             (*iter)->aborted();
973             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
974           }//End of critical section
975         }
976       if((*iter)->getState() == YACS::ERROR)
977         {
978           //try to put all coupled tasks in error
979           std::set<Task*> coupledSet;
980           (*iter)->getCoupledTasks(coupledSet);
981           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
982             {
983               Task* t=*it;
984               if(t == *iter)continue;
985               if(t->getState() == YACS::ERROR)continue;
986               try
987                 {
988                   t->disconnectService();
989                   traceExec(t, "disconnectService",ComputePlacement(*iter));
990                 }
991               catch(...)
992                 {
993                   // Disconnect has failed
994                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
995                 }
996               {//Critical section
997                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
998                 t->aborted();
999                 _mainSched->notifyFrom(t,YACS::ABORT,this);
1000               }//End of critical section
1001               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1002             }
1003         }
1004       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1005     }
1006
1007   //Second phase, execute each task in a thread
1008   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1009     {
1010       launchTask(*iter);
1011     }
1012 }
1013
1014 //! Execute a Task in a thread
1015 /*!
1016  *  \param task  : Task to execute
1017  *
1018  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1019  *
1020  *  Calls Executor::functionForTaskExecution in Thread
1021  */
1022
1023 void Executor::launchTask(Task *task)
1024 {
1025   DEBTRACE("Executor::launchTask(Task *task)");
1026   struct threadargs *args;
1027   if(task->getState() != YACS::TOACTIVATE)return;
1028
1029   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1030   if(_semThreadCnt == 0)
1031     {
1032       // --- Critical section
1033       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1034       //check if we have enough threads to run
1035       std::set<Task*> tmpSet=_runningTasks;
1036       std::set<Task*>::iterator it = tmpSet.begin();
1037       std::string status="running";
1038       std::set<Task*> coupledSet;
1039       while( it != tmpSet.end() )
1040         {
1041           Task* tt=*it;
1042           coupledSet.clear();
1043           tt->getCoupledTasks(coupledSet);
1044           status="running";
1045           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1046             {
1047               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1048               tmpSet.erase(*iter);
1049             }
1050           if(status=="running")break;
1051           it = tmpSet.begin();
1052         }
1053
1054       if(status=="toactivate")
1055         {
1056           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1057           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1058         }
1059       // --- End of critical section
1060     }
1061
1062   _semForMaxThreads.wait();
1063   _semThreadCnt -= 1;
1064
1065   args= new threadargs;
1066   args->task = task;
1067   args->sched = _mainSched;
1068   args->execInst = this;
1069
1070   traceExec(task, "launch",ComputePlacement(task));
1071
1072   { // --- Critical section
1073     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1074     _numberOfRunningTasks++;
1075     _runningTasks.insert(task);
1076     task->begin(); //change state to ACTIVATED
1077   } // --- End of critical section
1078   Thread(functionForTaskExecution, args, _threadStackSize);
1079 }
1080
1081 //! wait until a running task ends
1082
1083 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1084 {
1085   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1086 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1087   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1088   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1089     {
1090       _isWaitingEventsFromRunningTasks = true;
1091       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1092     }
1093   _numberOfEndedTasks=0;
1094   DEBTRACE("---");
1095 }
1096
1097 //! not implemented
1098
1099 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1100 {
1101   /*_mutexForNbOfConcurrentThreads.lock();
1102   _groupOfAllThreadsCreated.remove(thread);
1103   delete thread;
1104   _mutexForNbOfConcurrentThreads.unlock();*/
1105 }
1106
1107
1108 //! must be used protected by _mutexForSchedulerUpdate!
1109
1110 void Executor::wakeUp()
1111 {
1112   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1113   if (_isWaitingEventsFromRunningTasks)
1114     {
1115       _isWaitingEventsFromRunningTasks = false;
1116       _condForNewTasksToPerform.notify_all();
1117     }
1118   else
1119     _numberOfEndedTasks++;
1120 }
1121
1122 //! number of running tasks
1123
1124 int Executor::getNbOfThreads()
1125 {
1126   int ret;
1127   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1128   _isRunningunderExternalControl=true;
1129   ret = _groupOfAllThreadsCreated.size();
1130   return ret;
1131 }
1132
1133 /*!
1134  * This thread is NOT supposed to be detached !
1135  */
1136 void *Executor::functionForTaskLoad(void *arg)
1137 {
1138   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1139   struct threadargs *args = (struct threadargs *) arg;
1140   Task *task=args->task;
1141   Scheduler *sched=args->sched;
1142   Executor *execInst=args->execInst;
1143   delete args;
1144   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1145   return 0;
1146 }
1147
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : a threadargs allocated by launchTask (a Task, a Scheduler, an Executor);
 *                it is deleted here
 *
 *  The thread detaches itself, applies the DPL scope when the executor asks for it,
 *  then calls Task::execute followed by Task::disconnectService. A failure in either
 *  step turns the resulting event into YACS::ABORT.
 *
 *  If the task runs in a HomogeneousPoolContainer, its slot is released afterwards.
 *
 *  Then, under _mutexForSchedulerUpdate:
 *  - calls Task::finished (YACS::FINISH) or Task::aborted (YACS::ABORT);
 *  - notifies the scheduler with Scheduler::notifyFrom;
 *  - unregisters the task from the running set;
 *  - gives the thread slot back through _semForMaxThreads;
 *  - calls Executor::wakeUp unless the executor is PAUSED.
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  // launchTask does not keep the Thread object: the thread detaches itself
  Thread::detach();

  // Execute task

  if(execInst->getDPLScopeSensitive())
    {
      Node *node(dynamic_cast<Node *>(task));
      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
      if(node!=0 && gfn!=0)
        node->applyDPLScope(gfn);
    }

  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task (attempted even if execution failed)
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Placement is captured before the container slot is released below; it is used
  // for the final state trace line. (presumably the placement id may no longer be
  // available after release — TODO confirm)
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer
  // (slot released under the container's own lock)

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      std::lock_guard<std::mutex> alckCont(contC->getLocker());
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            execInst->_errorDetected = true;
            // stop-on-error: switch to step-by-step so the executor pauses
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev,execInst);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            // last task ended while the executor waited for running tasks: pause and tell the pilot
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    // give the thread slot back: launchTask waits on this semaphore
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  //execInst->notifyEndOfThread(0);
  Thread::exit(0);
  return 0;
}
1297
1298 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1299 {
1300   string nodeName = _mainSched->getTaskName(task);
1301   Container *cont = task->getContainer();
1302   string containerName = "---";
1303   if (cont)
1304     containerName = cont->getName();
1305
1306   std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1307   std::chrono::milliseconds millisec;
1308   millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1309   double elapse = double(millisec.count()) / 1000.0;
1310   {
1311     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1312     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1313     _trace << flush;
1314   }
1315 }
1316
1317 //! emit notification to all observers registered with  the dispatcher 
1318 /*!
1319  * The dispatcher is unique and can be obtained by getDispatcher()
1320  */
1321 void Executor::sendEvent(const std::string& event)
1322 {
1323   Dispatcher* disp=Dispatcher::getDispatcher();
1324   YASSERT(disp);
1325   YASSERT(_root);
1326   disp->dispatch(_root,event);
1327 }
1328
1329 struct HPCCompare
1330 {
1331   bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
1332   {
1333     if(!lhs && !rhs)
1334       return false;
1335     if(!lhs)
1336       return true;
1337     if(!rhs)
1338       return false;
1339     return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
1340   }
1341 };
1342
1343 /*!
1344  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1345  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1346  *
1347  * \param [in,out] tsks - list of tasks to be
1348  */
1349 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1350 {
1351   std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1352   for(auto cur : tsks)
1353     {
1354       if(!cur)
1355         continue;
1356       Container *cont(cur->getContainer());
1357       if(!cont)
1358         {
1359           m[nullptr].push_back(cur);
1360           continue;
1361         }
1362       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1363       if(!contC)
1364         {
1365           m[nullptr].push_back(cur);
1366           continue;
1367         }
1368       m[contC].push_back(cur);
1369     }
1370   //
1371   std::vector<Task *> ret;
1372   for(auto it : m)
1373     {
1374       HomogeneousPoolContainer *curhpc(it.first);
1375       const std::vector<Task *>& curtsks(it.second);
1376       if(!curhpc)
1377         {
1378           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1379         }
1380       else
1381         {
1382           // start of critical section for container curhpc
1383           std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1384           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1385           std::size_t sz(curhpc->getNumberOfFreePlace());
1386           std::vector<Task *>::const_iterator it2(curtsks.begin());
1387           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1388             {
1389               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1390               ret.push_back(*it2);
1391             }
1392           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1393           //end of critical section
1394         }
1395     }
1396   //
1397   tsks=ret;
1398 }
1399
1400 std::string Executor::ComputePlacement(Task *zeTask)
1401 {
1402   std::string placement("---");
1403   if(!zeTask)
1404     return placement;
1405   if(zeTask->getContainer())
1406     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1407   return placement;
1408 }
1409
1410 ///////// NEW EXECUTOR ////////////////////////////////
1411 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
1412 {
1413   if(task->getState() != YACS::TOLOAD)
1414     return;
1415   traceExec(task, "state:TOLOAD", ComputePlacement(task));
1416   {//Critical section
1417     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1418     _mainSched->notifyFrom(task,YACS::START,this);
1419   }//End of critical section
1420   try
1421     {
1422       std::ostringstream container_name;
1423       container_name << runInfo.type.name << "-" << runInfo.index;
1424       task->imposeResource(runInfo.resource.name, container_name.str());
1425       traceExec(task, "load", ComputePlacement(task));
1426       task->load();
1427       traceExec(task, "initService", ComputePlacement(task));
1428       task->initService();
1429     }
1430   catch(Exception& ex)
1431     {
1432       std::cerr << ex.what() << std::endl;
1433       {//Critical section
1434         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1435         task->aborted();
1436         _mainSched->notifyFrom(task,YACS::ABORT, this);
1437         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1438       }//End of critical section
1439     }
1440   catch(...)
1441     {
1442       std::cerr << "Load failed" << std::endl;
1443       {//Critical section
1444         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1445         task->aborted();
1446         _mainSched->notifyFrom(task,YACS::ABORT, this);
1447         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1448       }//End of critical section
1449     }
1450 }
1451
1452 void Executor::beginTask(Task *task)
1453 {
1454   // --- Critical section
1455   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1456   _numberOfRunningTasks++;
1457   _runningTasks.insert(task);
1458   // --- End of critical section
1459 }
1460
1461 void Executor::endTask(Task *task, YACS::Event ev)
1462 {
1463   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1464   try
1465   {
1466     if (ev == YACS::FINISH) task->finished();
1467     if (ev == YACS::ABORT)
1468     {
1469       _errorDetected = true;
1470       if (_stopOnErrorRequested)
1471       {
1472         _execMode = YACS::STEPBYSTEP;
1473         _isOKToEnd = true;
1474       }
1475       task->aborted();
1476     }
1477     //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1478     _mainSched->notifyFrom(task,ev,this);
1479   }
1480   catch(Exception& ex)
1481   {
1482     //notify has failed : it is supposed to have set state
1483     //so no need to do anything
1484     std::cerr << "Error during notification" << std::endl;
1485     std::cerr << ex.what() << std::endl;
1486   }
1487   catch(...)
1488   {
1489     //notify has failed : it is supposed to have set state
1490     //so no need to do anything
1491     std::cerr << "Notification failed" << std::endl;
1492   }
1493   _numberOfRunningTasks--;
1494   _runningTasks.erase(task);
1495   if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1496     {
1497       if (_executorState == YACS::WAITINGTASKS)
1498         {
1499           _executorState = YACS::PAUSED;
1500           sendEvent("executor");
1501           _condForPilot.notify_all();
1502           if (_errorDetected &&
1503               _stopOnErrorRequested &&
1504               !_isRunningunderExternalControl)
1505             _condForStepByStep.notify_all(); // exec thread may be on waitResume
1506         }
1507     }
1508   if (_executorState != YACS::PAUSED)
1509     wakeUp();
1510 }
1511
1512 void Executor::failTask(Task *task, const std::string& message)
1513 {
1514   ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
1515   if(elemNode != nullptr)
1516   {
1517     StateLoader(elemNode, YACS::ERROR);
1518     elemNode->setErrorDetails(message);
1519   }
1520   endTask(task, YACS::ABORT);
1521 }
1522
1523 YACS::Event  Executor::runTask(Task *task)
1524 {
1525   { // --- Critical section
1526     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1527     task->begin(); //change state to ACTIVATED
1528   }
1529   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1530
1531   if(getDPLScopeSensitive())
1532     {
1533       Node *node(dynamic_cast<Node *>(task));
1534       ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1535       if(node!=0 && gfn!=0)
1536         node->applyDPLScope(gfn);
1537     }
1538
1539   YACS::Event ev=YACS::FINISH;
1540   try
1541     {
1542       traceExec(task, "start execution",ComputePlacement(task));
1543       task->execute();
1544       traceExec(task, "end execution OK",ComputePlacement(task));
1545     }
1546   catch(Exception& ex)
1547     {
1548       std::cerr << "YACS Exception during execute" << std::endl;
1549       std::cerr << ex.what() << std::endl;
1550       ev=YACS::ABORT;
1551       string message = "end execution ABORT, ";
1552       message += ex.what();
1553       traceExec(task, message,ComputePlacement(task));
1554     }
1555   catch(...)
1556     {
1557       // Execution has failed
1558       std::cerr << "Execution has failed: unknown reason" << std::endl;
1559       ev=YACS::ABORT;
1560       traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1561     }
1562
1563   // Disconnect task
1564   try
1565     {
1566       DEBTRACE("task->disconnectService()");
1567       task->disconnectService();
1568       traceExec(task, "disconnectService",ComputePlacement(task));
1569     }
1570   catch(...)
1571     {
1572       // Disconnect has failed
1573       std::cerr << "disconnect has failed" << std::endl;
1574       ev=YACS::ABORT;
1575       traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1576     }
1577   //
1578
1579   std::string placement(ComputePlacement(task));
1580
1581   // container management for HomogeneousPoolOfContainer
1582
1583   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1584   if(contC)
1585   {
1586     std::lock_guard<std::mutex> alckCont(contC->getLocker());
1587     contC->release(task);
1588   }
1589
1590   return ev;
1591 }
1592
1593 void Executor::makeDatastreamConnections(Task *task)
1594 {
1595   YACS::StatesForNode state=task->getState();
1596   if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1597     return;
1598   try
1599     {
1600       task->connectService();
1601       traceExec(task, "connectService",ComputePlacement(task));
1602       {//Critical section
1603         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1604         task->connected();
1605       }//End of critical section
1606     }
1607   catch(Exception& ex)
1608     {
1609       std::cerr << ex.what() << std::endl;
1610       try
1611         {
1612           (task)->disconnectService();
1613           traceExec(task, "disconnectService",ComputePlacement(task));
1614         }
1615       catch(...)
1616         {
1617           // Disconnect has failed
1618           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1619         }
1620       {//Critical section
1621         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1622         task->aborted();
1623         _mainSched->notifyFrom(task,YACS::ABORT,this);
1624       }//End of critical section
1625     }
1626   catch(...)
1627     {
1628       std::cerr << "Problem in connectService" << std::endl;
1629       try
1630         {
1631           (task)->disconnectService();
1632           traceExec(task, "disconnectService",ComputePlacement(task));
1633         }
1634       catch(...)
1635         {
1636           // Disconnect has failed
1637           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1638         }
1639       {//Critical section
1640         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1641         task->aborted();
1642         _mainSched->notifyFrom(task,YACS::ABORT,this);
1643       }//End of critical section
1644     }
1645   if(task->getState() == YACS::ERROR)
1646     {
1647       //try to put all coupled tasks in error
1648       std::set<Task*> coupledSet;
1649       task->getCoupledTasks(coupledSet);
1650       for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1651         {
1652           Task* t=*it;
1653           if(t == task)continue;
1654           if(t->getState() == YACS::ERROR)continue;
1655           try
1656             {
1657               t->disconnectService();
1658               traceExec(t, "disconnectService",ComputePlacement(task));
1659             }
1660           catch(...)
1661             {
1662               // Disconnect has failed
1663               traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1664             }
1665           {//Critical section
1666             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1667             t->aborted();
1668             _mainSched->notifyFrom(t,YACS::ABORT,this);
1669           }//End of critical section
1670           traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1671         }
1672     }
1673   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1674 }
1675
1676 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
1677 {
1678   DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1679   { // --- Critical section
1680     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1681     _mainSched = graph;
1682     _root = dynamic_cast<ComposedNode *>(_mainSched);
1683     if (!_root) throw Exception("Executor::Run, Internal Error!");
1684     _executorState = YACS::NOTYETINITIALIZED;
1685     sendEvent("executor");
1686     _toContinue=true;
1687     _isOKToEnd = false;
1688     _errorDetected = false;
1689     _isWaitingEventsFromRunningTasks = false;
1690     _numberOfRunningTasks = 0;
1691     _runningTasks.clear();
1692     _numberOfEndedTasks = 0;
1693     string tracefile = "traceExec_";
1694     tracefile += _mainSched->getName();
1695     _trace.open(tracefile.c_str());
1696     _start = std::chrono::steady_clock::now();
1697   } // --- End of critical section
1698
1699   if (debug > 1) _displayDot(graph);
1700
1701   if (fromScratch)
1702     {
1703       try
1704         {
1705           graph->init();
1706           graph->exUpdateState();
1707         }
1708       catch(Exception& ex)
1709         {
1710           DEBTRACE("exception: "<< (ex.what()));
1711           _executorState = YACS::FINISHED;
1712           sendEvent("executor");
1713           throw;
1714         }
1715     }
1716   _executorState = YACS::INITIALISED;
1717   sendEvent("executor");
1718
1719   if (debug > 1) _displayDot(graph);
1720
1721   bool isMore;
1722   int problemCount=0;
1723   int numberAllTasks;
1724
1725   _executorState = YACS::RUNNING;
1726   sendEvent("executor");
1727
1728   WorkloadManager::DefaultAlgorithm algo;
1729   WorkloadManager::WorkloadManager wlm(algo);
1730   WlmTask::loadResources(wlm);
1731   wlm.start();
1732
1733   while (_toContinue)
1734     {
1735       DEBTRACE("--- executor main loop");
1736       sleepWhileNoEventsFromAnyRunningTask();
1737       DEBTRACE("--- events...");
1738       if (debug > 2) _displayDot(graph);
1739       { // --- Critical section
1740         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1741         std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1742         graph->selectRunnableTasks(readyTasks);
1743         _tasks.clear();
1744         for(Task * t : readyTasks)
1745           if(_runningTasks.find(t) == _runningTasks.end())
1746             _tasks.push_back(t);
1747         // TODO: to be removed
1748         FilterTasksConsideringContainers(_tasks);
1749         numberAllTasks=_numberOfRunningTasks+_tasks.size();
1750       } // --- End of critical section
1751       if (debug > 2) _displayDot(graph);
1752       DEBTRACE("--- events...");
1753       if (_executorState == YACS::RUNNING)
1754       {
1755         if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1756         for(Task * task : _tasks)
1757         {
1758           beginTask(task);
1759           WlmTask* newTask = new WlmTask(*this, task);
1760           wlm.addTask(newTask);
1761         }
1762       }
1763       if (debug > 1) _displayDot(graph);
1764       { // --- Critical section
1765         DEBTRACE("---");
1766         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1767         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1768         _toContinue = !graph->isFinished();
1769
1770         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1771         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1772         DEBTRACE("_toContinue: " << _toContinue);
1773         if(_toContinue && numberAllTasks==0)
1774         {
1775           //Problem : no running tasks and no task to launch ??
1776           problemCount++;
1777           std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1778           //Pause to give a chance to interrupt
1779           usleep(1000);
1780           if(problemCount > 25)
1781           {
1782             // Too much problems encountered : stop execution
1783             _toContinue=false;
1784           }
1785         }
1786
1787         if (! _toContinue)
1788           {
1789             _executorState = YACS::FINISHED;
1790             sendEvent("executor");
1791             _condForPilot.notify_all();
1792           }
1793       } // --- End of critical section
1794       if (debug > 0) _displayDot(graph);
1795       DEBTRACE("_toContinue: " << _toContinue);
1796     }
1797
1798   wlm.stop();
1799   DEBTRACE("End of main Loop");
1800
1801   { // --- Critical section
1802     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1803     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1804       {
1805         DEBTRACE("stop requested: End soon");
1806         _executorState = YACS::STOPPED;
1807         _toContinue = false;
1808         sendEvent("executor");
1809       }
1810   } // --- End of critical section
1811   if ( _dumpOnErrorRequested && _errorDetected)
1812     {
1813       saveState(_dumpErrorFile);
1814     }
1815   {
1816     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1817     _trace.close();
1818   }
1819 }
1820
1821 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1822 {
1823   std::string str_value = graph->getProperty("executor");
1824   if(str_value == "WorkloadManager"
1825      || str_value == "WORKLOADMANAGER"
1826      || str_value == "workloadmanager"
1827      || str_value == "WorkLoadManager")
1828     runWlm(graph, debug, fromScratch);
1829   else
1830     RunB(graph, debug, fromScratch);
1831 }