Salome HOME
23d888f4eced12c02e5f4a436a313427fcf4f2af
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2024  CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include "WlmTask.hxx"
34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
36
37 #include <iostream>
38 #include <fstream>
39 #include <sys/stat.h>
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <unistd.h>
43 #endif
44
45 #include <cstdlib>
46 #include <algorithm>
47
#ifdef WIN32
// Emulate POSIX usleep (microseconds) with the MSVC millisecond sleep.
// The argument is parenthesized so that an expression such as usleep(a+b)
// is divided as a whole.
#define usleep(A) _sleep((A)/1000)
// Some Windows toolchains lack the POSIX S_ISCHR/S_ISREG helpers used in this file.
#if !defined(S_ISCHR) || !defined(S_ISREG)
#  ifndef S_IFMT
#    if defined(_S_IFMT)
#      define S_IFMT _S_IFMT
#      define S_IFCHR _S_IFCHR
#      define S_IFREG _S_IFREG
#    elif defined(__S_IFMT)
#      define S_IFMT __S_IFMT
#      define S_IFCHR __S_IFCHR
#      define S_IFREG __S_IFREG
#    endif
#  endif
#  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
#  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
#endif
#endif
68
69 using namespace YACS::ENGINE;
70 using namespace std;
71
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
75
76 //#define _DEVDEBUG_
77 #include "YacsTrace.hxx"
78
79 int Executor::_maxThreads(1000);
80 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
81
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
83 {
84   _root=0;
85   _toContinue = true;
86   _isOKToEnd = false;
87   _stopOnErrorRequested = false;
88   _dumpOnErrorRequested = false;
89   _errorDetected = false;
90   _isRunningunderExternalControl=false;
91   _executorState = YACS::NOTYETINITIALIZED;
92   _execMode = YACS::CONTINUE;
93   _semThreadCnt = _maxThreads;
94   _numberOfRunningTasks = 0;
95   _numberOfEndedTasks = 0;
96   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
97 }
98
99 Executor::~Executor()
100 {
101 }
102
103 //! Execute a graph waiting for completion
104 /*!
105  *  \param graph : schema to execute
106  *  \param debug : display the graph with dot if debug == 1
107  *  \param fromScratch : if true the graph is reinitialized
108  *
109  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
110  *  
111  *  Calls Executor::launchTask to execute a selected Task.
112  *
113  *  Completion when graph is finished (Scheduler::isFinished)
114  */
115
116 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
117 {
118   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
119   _mainSched=graph;
120   _root = dynamic_cast<ComposedNode *>(_mainSched);
121   if (!_root) throw Exception("Executor::Run, Internal Error!");
122   bool isMore;
123   int i=0;
124   if(debug>1)_displayDot(graph);
125   if (fromScratch)
126     {
127       graph->init();
128       graph->exUpdateState();
129     }
130   if(debug>1)_displayDot(graph);
131   vector<Task *> tasks;
132   vector<Task *>::iterator iter;
133   _toContinue=true;
134   _execMode = YACS::CONTINUE;
135   _isWaitingEventsFromRunningTasks = false;
136   _numberOfRunningTasks = 0;
137   _runningTasks.clear();
138   _numberOfEndedTasks=0;
139   while(_toContinue)
140     {
141       sleepWhileNoEventsFromAnyRunningTask();
142
143       if(debug>2)_displayDot(graph);
144
145       {//Critical section
146         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
147         tasks=graph->getNextTasks(isMore);
148         graph->selectRunnableTasks(tasks);
149       }//End of critical section
150
151       if(debug>2)_displayDot(graph);
152
153       for(iter=tasks.begin();iter!=tasks.end();iter++)
154         loadTask(*iter,this);
155
156       if(debug>1)_displayDot(graph);
157
158       launchTasks(tasks);
159
160       if(debug>1)_displayDot(graph);
161
162       {//Critical section
163         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
164         _toContinue=!graph->isFinished();
165       }//End of critical section
166       DEBTRACE("_toContinue: " << _toContinue);
167
168       if(debug>0)_displayDot(graph);
169
170       i++;
171     }
172 }
173
174 //! Execute a graph with breakpoints or step by step
175 /*!
176  *  To be launch in a thread (main thread controls the progression).
177  *  \param graph : schema to execute
178  *  \param debug : display the graph with dot if debug >0
179  *  \param fromScratch : if false, state from a previous partial exection is already loaded
180  *
181  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
182  *  
183  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
184  * 
185  *  Calls Executor::launchTask to execute a selected Task
186  *
187  *  Completion when graph is finished (Scheduler::isFinished)
188  *
189  *  States of execution:
190  *  - YACS::NOTYETINITIALIZED
191  *  - YACS::INITIALISED
192  *  - YACS::RUNNING            (to next breakpoint or step)
193  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
194  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
195  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
196  *  - YACS::STOPPED            (stopped by user before end)
197  *
198  *  Modes of Execution:
199  *  - YACS::CONTINUE           (normal run without breakpoints)
200  *  - YACS::STEPBYSTEP         (pause at each loop)
201  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
202  *
203  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
204  *  Step by Step means execution node by node or group of node by group of nodes.
205  *  At a given step, the user decides to launch all the ready nodes or only a subset
206  *  (Caution: some nodes must run in parallel). 
207  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
208  *
209  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
210  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
211  *  - Executor::getCurrentExecMode
212  *  - Executor::getExecutorState
213  *  - Executor::setExecMode             : change the execution mode for next loop
214  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
215  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
216  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
217  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
218  *  - Executor::isNotFinished
219  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
220  *  - Executor::saveState               : dump the current state of execution in an xml file
221  *  - Executor::loadState               : Not yet implemented
222  *  - Executor::getNbOfThreads
223  *  - Executor::displayDot
224  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
225  *
226  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
227  *  - Executor::waitPause
228  *
229  *  TO BE VALIDATED:
230  *  - Pilot may connect to executor during execution, or deconnect.
231  *  - Several Pilots may be connected at the same time (for observation...)
232  * 
233  */
234
235 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
236 {
237   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
238
239   { // --- Critical section
240     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
241     _mainSched = graph;
242     _root = dynamic_cast<ComposedNode *>(_mainSched);
243     if (!_root) throw Exception("Executor::Run, Internal Error!");
244     _executorState = YACS::NOTYETINITIALIZED;
245     sendEvent("executor");
246     _toContinue=true;
247     _isOKToEnd = false;
248     _errorDetected = false;
249     _isWaitingEventsFromRunningTasks = false;
250     _numberOfRunningTasks = 0;
251     _runningTasks.clear();
252     _numberOfEndedTasks = 0;
253     string tracefile = "traceExec_";
254     tracefile += _mainSched->getName();
255     _trace.open(tracefile.c_str());
256     _start = std::chrono::steady_clock::now();
257   } // --- End of critical section
258
259   if (debug > 1) _displayDot(graph);
260
261   if (fromScratch)
262     {
263       try
264         {
265           graph->init();
266           graph->exUpdateState();
267         }
268       catch(Exception& ex)
269         {
270           DEBTRACE("exception: "<< (ex.what()));
271           _executorState = YACS::FINISHED;
272           sendEvent("executor");
273           throw;
274         }
275     }
276   _executorState = YACS::INITIALISED;
277   sendEvent("executor");
278
279   if (debug > 1) _displayDot(graph);
280
281   vector<Task *>::iterator iter;
282   bool isMore;
283   int problemCount=0;
284   int numberAllTasks;
285
286   _executorState = YACS::RUNNING;
287   sendEvent("executor");
288   while (_toContinue)
289     {
290       DEBTRACE("--- executor main loop");
291       sleepWhileNoEventsFromAnyRunningTask();
292       DEBTRACE("--- events...");
293       if (debug > 2) _displayDot(graph);
294       { // --- Critical section
295         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
296         std::vector<Task *> tasks = graph->getNextTasks(isMore);
297         graph->selectRunnableTasks(tasks);
298         filterTasksConsideringContainers(tasks);
299         _tasks = tasks;
300         numberAllTasks=_numberOfRunningTasks+_tasks.size();
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           loadParallelTasks(_tasks,this);
309           if (debug > 1) _displayDot(graph);
310           DEBTRACE("---");
311           launchTasks(_tasks);
312           DEBTRACE("---");
313         }
314       if (debug > 1) _displayDot(graph);
315       { // --- Critical section
316         DEBTRACE("---");
317         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
318         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319         if(_numberOfRunningTasks == 0)
320           _toContinue = !graph->isFinished();
321
322         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324         DEBTRACE("_toContinue: " << _toContinue);
325         if(_toContinue && numberAllTasks==0)
326           {
327             //Problem : no running tasks and no task to launch ??
328             problemCount++;
329             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330             //Pause to give a chance to interrupt
331             usleep(1000);
332             if(problemCount > 25)
333               {
334                 // Too much problems encountered : stop execution
335                 _toContinue=false;
336               }
337           }
338
339         if (! _toContinue)
340           {
341             _executorState = YACS::FINISHED;
342             sendEvent("executor");
343             _condForPilot.notify_all();
344           }
345       } // --- End of critical section
346       if (debug > 0) _displayDot(graph);
347       DEBTRACE("_toContinue: " << _toContinue);
348     }
349
350   DEBTRACE("End of main Loop");
351
352   { // --- Critical section
353     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
354     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
355       {
356         DEBTRACE("stop requested: End soon");
357         _executorState = YACS::STOPPED;
358         _toContinue = false;
359         sendEvent("executor");
360       }
361   } // --- End of critical section
362   if ( _dumpOnErrorRequested && _errorDetected)
363     {
364       saveState(_dumpErrorFile);
365     }
366   {
367     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
368     _trace.close();
369   }
370   DEBTRACE("End of RunB thread");  
371 }
372
373 YACS::ExecutionMode Executor::getCurrentExecMode()
374 {
375   _isRunningunderExternalControl=true;
376   return _execMode;
377 }
378
379
380 YACS::ExecutorState Executor::getExecutorState()
381 {
382   _isRunningunderExternalControl=true;
383   return _executorState;
384 }
385
386
387 bool Executor::isNotFinished()
388 {
389   _isRunningunderExternalControl=true;
390   return _toContinue;
391 }
392
393 //! ask to stop execution on the first node found in error
394 /*!
395  * \param dumpRequested   produce a state dump when an error is found
396  * \param xmlFile         name of file used for state dump
397  */
398
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
400 {
401   { // --- Critical section
402     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
403     _dumpErrorFile=xmlFile;
404     _stopOnErrorRequested=true;
405     _dumpOnErrorRequested = dumpRequested;
406     if (dumpRequested && xmlFile.empty())
407       throw YACS::Exception("dump on error requested and no filename given for dump");
408     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409   } // --- End of critical section
410 }
411
412 //! ask to do not stop execution on nodes found in error
413 /*!
414  */
415
416 void Executor::unsetStopOnError()
417 {
418   { // --- Critical section
419     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
420     _stopOnErrorRequested=false;
421   } // --- End of critical section
422 }
423
424 //! Dynamically set the current mode of execution
425 /*!
426  * The mode can be Continue, step by step, or stop before execution of a node
427  * defined in a list of breakpoints.
428  */
429
430 void Executor::setExecMode(YACS::ExecutionMode mode)
431 {
432   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433   { // --- Critical section
434     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
435     _isRunningunderExternalControl=true;
436     _execMode = mode;
437   } // --- End of critical section
438 }
439
440 //! wake up executor when in pause
441 /*!
442  * When Executor is in state paused or waiting for task completion, the thread
443  * running loop RunB waits on condition _condForStepByStep.
444  * Thread RunB is waken up.
445  * \return true when actually wakes up executor
446  */
447
448 bool Executor::resumeCurrentBreakPoint()
449 {
450   DEBTRACE("Executor::resumeCurrentBreakPoint()");
451   bool ret = false;
452   //bool doDump = false;
453   { // --- Critical section
454     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
455     _isRunningunderExternalControl=true;
456     DEBTRACE("_executorState: " << _executorState);
457     switch (_executorState)
458       {
459       case YACS::WAITINGTASKS:
460       case YACS::PAUSED:
461         {
462           _condForStepByStep.notify_all();
463           _executorState = YACS::RUNNING;
464           sendEvent("executor");
465           ret = true;
466           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
467           break;
468         }
469       case YACS::FINISHED:
470       case YACS::STOPPED:
471         {
472           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473           DEBTRACE("Graph Execution finished or stopped !");
474           break;
475         }
476       default :
477         {
478           // debug: no easy way to verify if main loop is acutally waiting on condition
479         }
480       }
481     DEBTRACE("---");
482     //if (doDump) saveState(_dumpErrorFile);
483   } // --- End of critical section
484   return ret;
485 }
486
487
488 //! define a list of nodes names as breakpoints in the graph
489
490
491 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
492 {
493   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494   { // --- Critical section
495     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
496     _isRunningunderExternalControl=true;
497     _listOfBreakPoints = listOfBreakPoints;
498   } // --- End of critical section
499 }
500
501
502 //! Get the list of tasks to load, to define a subset to execute in step by step mode
503 /*!
504  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
505  *  Use Executor::waitPause to wait.
506  */
507 std::list<std::string> Executor::getTasksToLoad()
508 {
509   DEBTRACE("Executor::getTasksToLoad()");
510   list<string> listOfNodesToLoad;
511   listOfNodesToLoad.clear();
512   { // --- Critical section
513     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
514     _isRunningunderExternalControl=true;
515     switch (_executorState)
516       {
517       case YACS::WAITINGTASKS:
518       case YACS::PAUSED:
519         {
520           listOfNodesToLoad = _listOfTasksToLoad;
521           break;
522         }
523       case YACS::NOTYETINITIALIZED:
524       case YACS::INITIALISED:
525       case YACS::RUNNING:
526       case YACS::FINISHED:
527       case YACS::STOPPED:
528       default:
529         {
530           break;
531         }
532       }
533   } // --- End of critical section
534   return listOfNodesToLoad;
535 }
536
537
538 //! Define a subset of task to execute in step by step mode
539 /*!
540  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
541  * in the current step.
542  * If some nodes must run in parallel, they must stay together in the list.
543  */
544
545 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
546 {
547   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
548   bool ret = true;
549   vector<Task *>::iterator iter;
550   vector<Task *> restrictedTasks;
551   { // --- Critical section
552     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
553     _isRunningunderExternalControl=true;
554     switch (_executorState)
555       {
556       case YACS::WAITINGTASKS:
557       case YACS::PAUSED:
558         {
559           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
560             {
561               string readyNode = _mainSched->getTaskName(*iter);
562               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563                   != listToExecute.end())
564                 {
565                   restrictedTasks.push_back(*iter);
566                   DEBTRACE("node to execute " << readyNode);
567                 }
568             }
569           _tasks.clear();
570           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
571             {
572               _tasks.push_back(*iter);
573             }
574           break;
575         }
576       case YACS::NOTYETINITIALIZED:
577       case YACS::INITIALISED:
578       case YACS::RUNNING:
579       case YACS::FINISHED:
580       case YACS::STOPPED:
581       default:
582         {
583           break;
584         }
585       }
586     } // --- End of critical section
587
588   _tasks.clear();
589   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
590     {
591       _tasks.push_back(*iter);
592     }
593   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
594     {
595       string readyNode = _mainSched->getTaskName(*iter);
596       DEBTRACE("selected node to execute " << readyNode);
597     }
598
599   return ret;
600 }
601
602 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
603 /*!
604  *  Do nothing if execution is finished or in pause.
605  *  Wait first step if Executor is running or in initialization.
606  */
607
608 void Executor::waitPause()
609 {
610   DEBTRACE("Executor::waitPause()" << _executorState);
611   { // --- Critical section
612     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
613     _isRunningunderExternalControl=true;
614     switch (_executorState)
615       {
616       default:
617       case YACS::STOPPED:
618       case YACS::FINISHED:
619       case YACS::WAITINGTASKS:
620       case YACS::PAUSED:
621         {
622           break;
623         }
624       case YACS::NOTYETINITIALIZED:
625       case YACS::INITIALISED:
626       case YACS::RUNNING:
627         {
628           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
629           break;
630         }
631       }
632   } // --- End of critical section
633   DEBTRACE("---");
634 }
635
636 /*!
637  * This method can be called at any time simultaneously during a RunB call.
638  * This method will wait until the executor is locked in a consistent state of a running graph.
639  *
640  * This method is expected to be called in association with resume method.
641  * The returned parameter is expected to be transfered to resume method.
642  */
643 bool Executor::suspendASAP()
644 {
645   // no AutoLocker here. It's not a bug.
646   _mutexForSchedulerUpdate.lock();
647   if(!_toContinue && _executorState==YACS::FINISHED)
648     {// execution is finished
649       _mutexForSchedulerUpdate.unLock();
650       return false;// the executor is no more running
651     }
652   //general case. Leave method with locker in locked status
653   return true;
654 }
655
656 /*!
657  * This method is expected to be called in association with suspendASAP method.
658  * Expected to be called just after suspendASAP with output of resume as input parameter
659  */
660 void Executor::resume(bool suspended)
661 {
662   if(suspended)
663     _mutexForSchedulerUpdate.unLock();
664 }
665
666 //! stops the execution as soon as possible 
667
668 void Executor::stopExecution()
669 {
670   setExecMode(YACS::STEPBYSTEP);
671   //waitPause();
672   _isOKToEnd = true;
673   resumeCurrentBreakPoint();
674 }
675
676 //! save the current state of execution in an xml file
677
678 bool Executor::saveState(const std::string& xmlFile)
679 {
680   DEBTRACE("Executor::saveState() in " << xmlFile);
681   bool result = false;
682   try {
683     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
684     YACS::ENGINE::VisitorSaveState vst(_root);
685     vst.openFileDump(xmlFile.c_str());
686     _root->accept(&vst);
687     vst.closeFileDump();
688     result = true;
689   }
690   catch(Exception& ex) {
691     std::cerr << ex.what() << std::endl;
692   }
693   return result;
694 }
695
696 //! not yet implemented
697
698 bool Executor::loadState()
699 {
700   DEBTRACE("Executor::loadState()");
701   _isRunningunderExternalControl=true;
702   return true;
703 }
704
705
//! Tell whether a path names an existing regular file.
/*!
 *  \param filename : path to test
 *  \return 1 if stat() succeeds and the entry is a regular file, 0 otherwise
 */
static int isfile(const char *filename)
{
  struct stat info;
  const bool found = (stat(filename, &info) == 0);
  return (found && S_ISREG(info.st_mode)) ? 1 : 0;
}
715
716 //! Display the graph state as a dot display, public method
717
718 void Executor::displayDot(Scheduler *graph)
719 {
720   _isRunningunderExternalControl=true;
721   _displayDot(graph);
722 }
723
724 //! Display the graph state as a dot display
725 /*!
726  *  \param graph  : the node to display
727  */
728
729 void Executor::_displayDot(Scheduler *graph)
730 {
731    std::ofstream g("titi");
732    ((ComposedNode*)graph)->writeDot(g);
733    g.close();
734    const char displayScript[]="display.sh";
735    if(isfile(displayScript))
736      system("sh display.sh");
737    else
738      system("dot -Tpng titi|display -delay 5");
739 }
740
741 //! Wait reactivation in modes Step By step or with BreakPoints
742 /*!
743  *  Check mode of execution (set by main thread):
744  *  - YACS::CONTINUE        : the graph execution continues.
745  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
746  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
747  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
748  *                            else continue the graph execution.
749  *  \return true if end of executor thread is requested
750  */
751
752 bool Executor::checkBreakPoints()
753 {
754   DEBTRACE("Executor::checkBreakPoints()");
755   vector<Task *>::iterator iter;
756   bool endRequested = false;
757
758   switch (_execMode)
759     {
760     case YACS::CONTINUE:
761       {
762         break;
763       }
764     case YACS::STOPBEFORENODES:
765       {
766         bool stop = false;
767         { // --- Critical section
768           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
769           _tasksSave = _tasks;
770           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
771             {
772               string nodeToLoad = _mainSched->getTaskName(*iter);
773               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
774                   != _listOfBreakPoints.end())
775                 {
776                   stop = true;
777                   break;
778                 }
779             }
780           if (stop)
781             {
782               _listOfTasksToLoad.clear();
783               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
784                 {
785                   string nodeToLoad = _mainSched->getTaskName(*iter);
786                   _listOfTasksToLoad.push_back(nodeToLoad);
787                 }
788               if (getNbOfThreads())
789                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
790               else
791                 _executorState = YACS::PAUSED;
792               sendEvent("executor");
793               _condForPilot.notify_all();
794             }
795           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
796           if (_isOKToEnd) endRequested = true;
797         } // --- End of critical section
798           if (stop) DEBTRACE("wake up from waitResume");
799         break;
800       }
801     default:
802     case YACS::STEPBYSTEP:
803       {
804         { // --- Critical section
805           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
806           _tasksSave = _tasks;
807           _listOfTasksToLoad.clear();
808           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
809             {
810               string nodeToLoad = _mainSched->getTaskName(*iter);
811               _listOfTasksToLoad.push_back(nodeToLoad);
812             }
813           if (getNbOfThreads())
814             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
815           else
816             _executorState = YACS::PAUSED;
817           sendEvent("executor");
818           _condForPilot.notify_all();
819           if (!_isOKToEnd)
820             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
821                           // or, if no pilot, wait until no more running tasks (stop on error)
822           if (_isOKToEnd) endRequested = true;
823         } // --- End of critical section
824         DEBTRACE("wake up from waitResume");
825         break;
826       }
827     }
828   DEBTRACE("endRequested: " << endRequested);
829   return endRequested;
830 }
831
832
833 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
834 /*!
835  *  With the condition Mutex, the mutex is released atomically during the wait.
836  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
837  *  Must be called while mutex is locked.
838  */
839
840 void Executor::waitResume()
841 {
842   DEBTRACE("Executor::waitResume()");
843   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
844   DEBTRACE("---");
845 }
846
847
848 //! Perform loading of a Task.
849 /*!
850  *  \param task  : Task to load
851  */
852
853 void Executor::loadTask(Task *task, const Executor *execInst)
854 {
855   DEBTRACE("Executor::loadTask(Task *task)");
856   if(task->getState() != YACS::TOLOAD)
857     return;
858   traceExec(task, "state:TOLOAD", ComputePlacement(task));
859   {//Critical section
860     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
861     _mainSched->notifyFrom(task,YACS::START,execInst);
862   }//End of critical section
863   try
864     {
865       traceExec(task, "load", ComputePlacement(task));
866       task->load();
867       traceExec(task, "initService", ComputePlacement(task));
868       task->initService();
869     }
870   catch(Exception& ex) 
871     {
872       std::cerr << ex.what() << std::endl;
873       {//Critical section
874         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
875         task->aborted();
876         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878       }//End of critical section
879     }
880   catch(...) 
881     {
882       std::cerr << "Load failed" << std::endl;
883       {//Critical section
884         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
885         task->aborted();
886         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888       }//End of critical section
889     }
890 }
891
892 struct threadargs
893 {
894   Task *task;
895   Scheduler *sched;
896   Executor *execInst;
897 };
898
899 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
900 {
901   std::vector<Thread> ths(tasks.size());
902   std::size_t ithread(0);
903   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
904     {
905       DEBTRACE("Executor::loadParallelTasks(Task *task)");
906       struct threadargs *args(new threadargs);
907       args->task = (*iter);
908       args->sched = _mainSched;
909       args->execInst = this;
910       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
911     }
912   for(ithread=0;ithread<tasks.size();ithread++)
913     ths[ithread].join();
914 }
915
916 //! Execute a list of tasks possibly connected through datastream links
917 /*!
918  *  \param tasks  : a list of tasks to execute
919  *
920  */
921 void Executor::launchTasks(const std::vector<Task *>& tasks)
922 {
923   //First phase, make datastream connections
924   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
925     {
926       YACS::StatesForNode state=(*iter)->getState();
927       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
928       try
929         {
930           (*iter)->connectService();
931           traceExec(*iter, "connectService",ComputePlacement(*iter));
932           {//Critical section
933             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
934             (*iter)->connected();
935           }//End of critical section
936         }
937       catch(Exception& ex) 
938         {
939           std::cerr << ex.what() << std::endl;
940           try
941             {
942               (*iter)->disconnectService();
943               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
944             }
945           catch(...) 
946             {
947               // Disconnect has failed
948               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
949             }
950           {//Critical section
951             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
952             (*iter)->aborted();
953             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954           }//End of critical section
955         }
956       catch(...) 
957         {
958           std::cerr << "Problem in connectService" << std::endl;
959           try
960             {
961               (*iter)->disconnectService();
962               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
963             }
964           catch(...) 
965             {
966               // Disconnect has failed
967               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
968             }
969           {//Critical section
970             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
971             (*iter)->aborted();
972             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973           }//End of critical section
974         }
975       if((*iter)->getState() == YACS::ERROR)
976         {
977           //try to put all coupled tasks in error
978           std::set<Task*> coupledSet;
979           (*iter)->getCoupledTasks(coupledSet);
980           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
981             {
982               Task* t=*it;
983               if(t == *iter)continue;
984               if(t->getState() == YACS::ERROR)continue;
985               try
986                 {
987                   t->disconnectService();
988                   traceExec(t, "disconnectService",ComputePlacement(*iter));
989                 }
990               catch(...)
991                 {
992                   // Disconnect has failed
993                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
994                 }
995               {//Critical section
996                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
997                 t->aborted();
998                 _mainSched->notifyFrom(t,YACS::ABORT,this);
999               }//End of critical section
1000               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1001             }
1002         }
1003       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1004     }
1005
1006   //Second phase, execute each task in a thread
1007   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1008     {
1009       launchTask(*iter);
1010     }
1011 }
1012
1013 //! Execute a Task in a thread
1014 /*!
1015  *  \param task  : Task to execute
1016  *
1017  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1018  *
1019  *  Calls Executor::functionForTaskExecution in Thread
1020  */
1021
1022 void Executor::launchTask(Task *task)
1023 {
1024   DEBTRACE("Executor::launchTask(Task *task)");
1025   struct threadargs *args;
1026   if(task->getState() != YACS::TOACTIVATE)return;
1027
1028   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1029   if(_semThreadCnt == 0)
1030     {
1031       // --- Critical section
1032       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1033       //check if we have enough threads to run
1034       std::set<Task*> tmpSet=_runningTasks;
1035       std::set<Task*>::iterator it = tmpSet.begin();
1036       std::string status="running";
1037       std::set<Task*> coupledSet;
1038       while( it != tmpSet.end() )
1039         {
1040           Task* tt=*it;
1041           coupledSet.clear();
1042           tt->getCoupledTasks(coupledSet);
1043           status="running";
1044           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1045             {
1046               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047               tmpSet.erase(*iter);
1048             }
1049           if(status=="running")break;
1050           it = tmpSet.begin();
1051         }
1052
1053       if(status=="toactivate")
1054         {
1055           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1057         }
1058       // --- End of critical section
1059     }
1060
1061   _semForMaxThreads.wait();
1062   _semThreadCnt -= 1;
1063
1064   args= new threadargs;
1065   args->task = task;
1066   args->sched = _mainSched;
1067   args->execInst = this;
1068
1069   traceExec(task, "launch",ComputePlacement(task));
1070
1071   { // --- Critical section
1072     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1073     _numberOfRunningTasks++;
1074     _runningTasks.insert(task);
1075     task->begin(); //change state to ACTIVATED
1076   } // --- End of critical section
1077   Thread(functionForTaskExecution, args, _threadStackSize);
1078 }
1079
1080 //! wait until a running task ends
1081
1082 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1083 {
1084   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1086   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1087   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1088     {
1089       _isWaitingEventsFromRunningTasks = true;
1090       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1091     }
1092   _numberOfEndedTasks=0;
1093   DEBTRACE("---");
1094 }
1095
1096
1097 //! must be used protected by _mutexForSchedulerUpdate!
1098
1099 void Executor::wakeUp()
1100 {
1101   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1102   if (_isWaitingEventsFromRunningTasks)
1103     {
1104       _isWaitingEventsFromRunningTasks = false;
1105       _condForNewTasksToPerform.notify_all();
1106     }
1107   else
1108     _numberOfEndedTasks++;
1109 }
1110
1111 int Executor::getMaxNbOfThreads() const
1112 {
1113   return (int)_maxNbThreads;
1114 }
1115
1116 void Executor::setMaxNbOfThreads(int maxNbThreads)
1117 {
1118   _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
1119 }
1120
1121 int Executor::getNbOfThreads()
1122 {
1123   int ret = 0;
1124   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1125   _isRunningunderExternalControl=true;
1126   return ret;
1127 }
1128
1129 //! number of running tasks
1130 int Executor::getNumberOfRunningTasks()
1131 {
1132   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1133   _isRunningunderExternalControl=true;
1134   return _numberOfRunningTasks;
1135 }
1136
1137 /*!
1138  * This thread is NOT supposed to be detached !
1139  */
1140 void *Executor::functionForTaskLoad(void *arg)
1141 {
1142   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1143   struct threadargs *args = (struct threadargs *) arg;
1144   Task *task=args->task;
1145   Scheduler *sched=args->sched;
1146   Executor *execInst=args->execInst;
1147   delete args;
1148   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1149   return 0;
1150 }
1151
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : 3 elements (a Task, a Scheduler, an Executor) packed in a heap-allocated
 *                threadargs; this function takes ownership and deletes it immediately.
 *
 *  Sequence:
 *  - traces the task state and detaches the current thread;
 *  - applies the DPL scope of the scheduler when getDPLScopeSensitive() is set;
 *  - calls Task::execute; any exception makes the final event YACS::ABORT;
 *  - calls Task::disconnectService; a failure also makes the event YACS::ABORT;
 *  - releases the task from its HomogeneousPoolContainer, if it has one.
 *
 *  Then, under _mutexForSchedulerUpdate:
 *  - calls Task::finished (FINISH) or Task::aborted (ABORT), and
 *    Scheduler::notifyFrom with that event;
 *  - updates the running-task bookkeeping; when the last running task ends in a
 *    WAITINGTASKS state outside CONTINUE mode, the executor becomes PAUSED;
 *  - posts _semForMaxThreads and calls Executor::wakeUp unless the executor is PAUSED.
 *
 *  Exceptions from finishing/notifying are printed on std::cerr and swallowed.
 *  The thread ends with Thread::exit(0).
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  // Copy the arguments and free the struct allocated by launchTask().
  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  // launchTask() only creates a temporary Thread object and keeps no handle on this thread.
  Thread::detach();

  // Execute task

  if(execInst->getDPLScopeSensitive())
    {
      Node *node(dynamic_cast<Node *>(task));
      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
      if(node!=0 && gfn!=0)
        node->applyDPLScope(gfn);
    }

  // Event finally reported to the scheduler; downgraded to ABORT on any failure below.
  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task (also done when the execution failed)
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Placement is computed before the container slot is released below; it is reused
  // for the final state trace.
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer
  // Gives back the place held by this task (presumably the one reserved by
  // filterTasksConsideringContainers() through allocateFor()).

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      std::lock_guard<std::mutex> alckCont(contC->getLocker());
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            execInst->_errorDetected = true;
            // stop-on-error requested: switch to step-by-step mode and set _isOKToEnd
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev,execInst);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        // The executor was waiting for its running tasks to end: it is now paused;
        // notify observers and the pilot.
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    // Free one thread slot for launchTask().
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    // Wake the executor main loop (wakeUp requires the scheduler mutex, held here).
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  Thread::exit(0);
  return 0;
}
1300
1301 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1302 {
1303   string nodeName = _mainSched->getTaskName(task);
1304   Container *cont = task->getContainer();
1305   string containerName = "---";
1306   if (cont)
1307     containerName = cont->getName();
1308
1309   std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1310   std::chrono::milliseconds millisec;
1311   millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1312   double elapse = double(millisec.count()) / 1000.0;
1313   {
1314     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1315     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1316     _trace << flush;
1317   }
1318 }
1319
1320 //! emit notification to all observers registered with  the dispatcher 
1321 /*!
1322  * The dispatcher is unique and can be obtained by getDispatcher()
1323  */
1324 void Executor::sendEvent(const std::string& event)
1325 {
1326   Dispatcher* disp=Dispatcher::getDispatcher();
1327   YASSERT(disp);
1328   YASSERT(_root);
1329   disp->dispatch(_root,event);
1330 }
1331
1332 struct HPCCompare
1333 {
1334   bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
1335   {
1336     if(!lhs && !rhs)
1337       return false;
1338     if(!lhs)
1339       return true;
1340     if(!rhs)
1341       return false;
1342     return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
1343   }
1344 };
1345
1346 /*!
1347  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1348  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1349  *
1350  * \param [in,out] tsks - list of tasks to be
1351  */
1352 void Executor::filterTasksConsideringContainers(std::vector<Task *>& tsks)
1353 {
1354   std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1355   for(auto cur : tsks)
1356     {
1357       if(!cur)
1358         continue;
1359       Container *cont(cur->getContainer());
1360       if(!cont)
1361         {
1362           m[nullptr].push_back(cur);
1363           continue;
1364         }
1365       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1366       if(!contC)
1367         {
1368           m[nullptr].push_back(cur);
1369           continue;
1370         }
1371       m[contC].push_back(cur);
1372     }
1373   //
1374   std::vector<Task *> ret;
1375   for(auto it : m)
1376     {
1377       HomogeneousPoolContainer *curhpc(it.first);
1378       const std::vector<Task *>& curtsks(it.second);
1379       if(!curhpc)
1380         {
1381           std::uint32_t nbThreadsRunning = _runningTasks.size();
1382           std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
1383           std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
1384           std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
1385           DEBTRACE("nb threads running: " << nbThreadsRunning);
1386           DEBTRACE("MaxNbThreads: " << _maxNbThreads);
1387           DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
1388           DEBTRACE("nbOfCandidates: " << nbOfCandidates);
1389           DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
1390           ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
1391         }
1392       else
1393         {
1394           // start of critical section for container curhpc
1395           std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1396           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1397           std::size_t sz(curhpc->getNumberOfFreePlace());
1398           std::vector<Task *>::const_iterator it2(curtsks.begin());
1399           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1400             {
1401               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1402               ret.push_back(*it2);
1403             }
1404           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1405           //end of critical section
1406         }
1407     }
1408   //
1409   tsks=ret;
1410 }
1411
1412 std::string Executor::ComputePlacement(Task *zeTask)
1413 {
1414   std::string placement("---");
1415   if(!zeTask)
1416     return placement;
1417   if(zeTask->getContainer())
1418     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1419   return placement;
1420 }
1421
1422 ///////// NEW EXECUTOR ////////////////////////////////
1423 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
1424 {
1425   if(task->getState() != YACS::TOLOAD)
1426     return;
1427   traceExec(task, "state:TOLOAD", ComputePlacement(task));
1428   {//Critical section
1429     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1430     _mainSched->notifyFrom(task,YACS::START,this);
1431   }//End of critical section
1432   try
1433     {
1434       std::ostringstream container_name;
1435       container_name << runInfo.type.name << "-" << runInfo.index;
1436       task->imposeResource(runInfo.resource.name, container_name.str());
1437       traceExec(task, "load", ComputePlacement(task));
1438       task->load();
1439       traceExec(task, "initService", ComputePlacement(task));
1440       task->initService();
1441     }
1442   catch(Exception& ex)
1443     {
1444       std::cerr << ex.what() << std::endl;
1445       {//Critical section
1446         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1447         task->aborted();
1448         _mainSched->notifyFrom(task,YACS::ABORT, this);
1449         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1450       }//End of critical section
1451     }
1452   catch(...)
1453     {
1454       std::cerr << "Load failed" << std::endl;
1455       {//Critical section
1456         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1457         task->aborted();
1458         _mainSched->notifyFrom(task,YACS::ABORT, this);
1459         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1460       }//End of critical section
1461     }
1462 }
1463
1464 void Executor::beginTask(Task *task)
1465 {
1466   // --- Critical section
1467   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1468   _numberOfRunningTasks++;
1469   _runningTasks.insert(task);
1470   // --- End of critical section
1471 }
1472
//! Finish \a task for the workload-manager executor and update the executor bookkeeping.
/*!
 *  Runs entirely under _mutexForSchedulerUpdate.
 *
 *  \param task : task that has ended
 *  \param ev   : YACS::FINISH calls Task::finished(), YACS::ABORT calls Task::aborted();
 *                the same event is then passed to Scheduler::notifyFrom.
 *
 *  On YACS::ABORT, _errorDetected is set. When stop-on-error was requested, the executor
 *  also switches to STEPBYSTEP mode with _isOKToEnd set.
 *  Exceptions thrown while finishing or notifying are printed on std::cerr and swallowed.
 *  The task is then removed from the running set. If it was the last one, the executor
 *  is not in CONTINUE mode and is WAITINGTASKS, it becomes PAUSED and the pilot (and
 *  possibly the step-by-step waiter) is notified.
 *  Finally wakeUp() is called unless the executor is PAUSED.
 */
void Executor::endTask(Task *task, YACS::Event ev)
{
  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
  try
  {
    if (ev == YACS::FINISH) task->finished();
    if (ev == YACS::ABORT)
    {
      _errorDetected = true;
      // stop-on-error requested: switch to step-by-step mode and set _isOKToEnd
      if (_stopOnErrorRequested)
      {
        _execMode = YACS::STEPBYSTEP;
        _isOKToEnd = true;
      }
      task->aborted();
    }
    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
    _mainSched->notifyFrom(task,ev,this);
  }
  catch(Exception& ex)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Error during notification" << std::endl;
    std::cerr << ex.what() << std::endl;
  }
  catch(...)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Notification failed" << std::endl;
  }
  _numberOfRunningTasks--;
  _runningTasks.erase(task);
  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
    {
      // The executor was waiting for its running tasks to end: it is now paused;
      // notify observers and the pilot.
      if (_executorState == YACS::WAITINGTASKS)
        {
          _executorState = YACS::PAUSED;
          sendEvent("executor");
          _condForPilot.notify_all();
          if (_errorDetected &&
              _stopOnErrorRequested &&
              !_isRunningunderExternalControl)
            _condForStepByStep.notify_all(); // exec thread may be on waitResume
        }
    }
  // Wake the executor main loop (wakeUp requires the scheduler mutex, held here).
  if (_executorState != YACS::PAUSED)
    wakeUp();
}
1523
1524 void Executor::failTask(Task *task, const std::string& message)
1525 {
1526   ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
1527   if(elemNode != nullptr)
1528   {
1529     StateLoader(elemNode, YACS::ERROR);
1530     elemNode->setErrorDetails(message);
1531   }
1532   endTask(task, YACS::ABORT);
1533 }
1534
1535 YACS::Event  Executor::runTask(Task *task)
1536 {
1537   { // --- Critical section
1538     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1539     task->begin(); //change state to ACTIVATED
1540   }
1541   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1542
1543   if(getDPLScopeSensitive())
1544     {
1545       Node *node(dynamic_cast<Node *>(task));
1546       ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1547       if(node!=0 && gfn!=0)
1548         node->applyDPLScope(gfn);
1549     }
1550
1551   YACS::Event ev=YACS::FINISH;
1552   try
1553     {
1554       traceExec(task, "start execution",ComputePlacement(task));
1555       task->execute();
1556       traceExec(task, "end execution OK",ComputePlacement(task));
1557     }
1558   catch(Exception& ex)
1559     {
1560       std::cerr << "YACS Exception during execute" << std::endl;
1561       std::cerr << ex.what() << std::endl;
1562       ev=YACS::ABORT;
1563       string message = "end execution ABORT, ";
1564       message += ex.what();
1565       traceExec(task, message,ComputePlacement(task));
1566     }
1567   catch(...)
1568     {
1569       // Execution has failed
1570       std::cerr << "Execution has failed: unknown reason" << std::endl;
1571       ev=YACS::ABORT;
1572       traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1573     }
1574
1575   // Disconnect task
1576   try
1577     {
1578       DEBTRACE("task->disconnectService()");
1579       task->disconnectService();
1580       traceExec(task, "disconnectService",ComputePlacement(task));
1581     }
1582   catch(...)
1583     {
1584       // Disconnect has failed
1585       std::cerr << "disconnect has failed" << std::endl;
1586       ev=YACS::ABORT;
1587       traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1588     }
1589   //
1590
1591   std::string placement(ComputePlacement(task));
1592
1593   // container management for HomogeneousPoolOfContainer
1594
1595   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1596   if(contC)
1597   {
1598     std::lock_guard<std::mutex> alckCont(contC->getLocker());
1599     contC->release(task);
1600   }
1601
1602   return ev;
1603 }
1604
1605 void Executor::makeDatastreamConnections(Task *task)
1606 {
1607   YACS::StatesForNode state=task->getState();
1608   if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1609     return;
1610   try
1611     {
1612       task->connectService();
1613       traceExec(task, "connectService",ComputePlacement(task));
1614       {//Critical section
1615         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1616         task->connected();
1617       }//End of critical section
1618     }
1619   catch(Exception& ex)
1620     {
1621       std::cerr << ex.what() << std::endl;
1622       try
1623         {
1624           (task)->disconnectService();
1625           traceExec(task, "disconnectService",ComputePlacement(task));
1626         }
1627       catch(...)
1628         {
1629           // Disconnect has failed
1630           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1631         }
1632       {//Critical section
1633         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1634         task->aborted();
1635         _mainSched->notifyFrom(task,YACS::ABORT,this);
1636       }//End of critical section
1637     }
1638   catch(...)
1639     {
1640       std::cerr << "Problem in connectService" << std::endl;
1641       try
1642         {
1643           (task)->disconnectService();
1644           traceExec(task, "disconnectService",ComputePlacement(task));
1645         }
1646       catch(...)
1647         {
1648           // Disconnect has failed
1649           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1650         }
1651       {//Critical section
1652         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1653         task->aborted();
1654         _mainSched->notifyFrom(task,YACS::ABORT,this);
1655       }//End of critical section
1656     }
1657   if(task->getState() == YACS::ERROR)
1658     {
1659       //try to put all coupled tasks in error
1660       std::set<Task*> coupledSet;
1661       task->getCoupledTasks(coupledSet);
1662       for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1663         {
1664           Task* t=*it;
1665           if(t == task)continue;
1666           if(t->getState() == YACS::ERROR)continue;
1667           try
1668             {
1669               t->disconnectService();
1670               traceExec(t, "disconnectService",ComputePlacement(task));
1671             }
1672           catch(...)
1673             {
1674               // Disconnect has failed
1675               traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1676             }
1677           {//Critical section
1678             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1679             t->aborted();
1680             _mainSched->notifyFrom(t,YACS::ABORT,this);
1681           }//End of critical section
1682           traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1683         }
1684     }
1685   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1686 }
1687
1688 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
1689 {
1690   DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1691   { // --- Critical section
1692     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1693     _mainSched = graph;
1694     _root = dynamic_cast<ComposedNode *>(_mainSched);
1695     if (!_root) throw Exception("Executor::Run, Internal Error!");
1696     _executorState = YACS::NOTYETINITIALIZED;
1697     sendEvent("executor");
1698     _toContinue=true;
1699     _isOKToEnd = false;
1700     _errorDetected = false;
1701     _isWaitingEventsFromRunningTasks = false;
1702     _numberOfRunningTasks = 0;
1703     _runningTasks.clear();
1704     _numberOfEndedTasks = 0;
1705     string tracefile = "traceExec_";
1706     tracefile += _mainSched->getName();
1707     _trace.open(tracefile.c_str());
1708     _start = std::chrono::steady_clock::now();
1709   } // --- End of critical section
1710
1711   if (debug > 1) _displayDot(graph);
1712
1713   if (fromScratch)
1714     {
1715       try
1716         {
1717           graph->init();
1718           graph->exUpdateState();
1719         }
1720       catch(Exception& ex)
1721         {
1722           DEBTRACE("exception: "<< (ex.what()));
1723           _executorState = YACS::FINISHED;
1724           sendEvent("executor");
1725           throw;
1726         }
1727     }
1728   _executorState = YACS::INITIALISED;
1729   sendEvent("executor");
1730
1731   if (debug > 1) _displayDot(graph);
1732
1733   bool isMore;
1734   int problemCount=0;
1735   int numberAllTasks;
1736
1737   _executorState = YACS::RUNNING;
1738   sendEvent("executor");
1739
1740   WorkloadManager::DefaultAlgorithm algo;
1741   WorkloadManager::WorkloadManager wlm(algo);
1742   WlmTask::loadResources(wlm);
1743   wlm.start();
1744
1745   while (_toContinue)
1746     {
1747       DEBTRACE("--- executor main loop");
1748       sleepWhileNoEventsFromAnyRunningTask();
1749       DEBTRACE("--- events...");
1750       if (debug > 2) _displayDot(graph);
1751       { // --- Critical section
1752         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1753         std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1754         graph->selectRunnableTasks(readyTasks);
1755         _tasks.clear();
1756         for(Task * t : readyTasks)
1757           if(_runningTasks.find(t) == _runningTasks.end())
1758             _tasks.push_back(t);
1759         // TODO: to be removed
1760         filterTasksConsideringContainers(_tasks);
1761         numberAllTasks=_numberOfRunningTasks+_tasks.size();
1762       } // --- End of critical section
1763       if (debug > 2) _displayDot(graph);
1764       DEBTRACE("--- events...");
1765       if (_executorState == YACS::RUNNING)
1766       {
1767         if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1768         for(Task * task : _tasks)
1769         {
1770           beginTask(task);
1771           WlmTask* newTask = new WlmTask(*this, task);
1772           wlm.addTask(newTask);
1773         }
1774       }
1775       if (debug > 1) _displayDot(graph);
1776       { // --- Critical section
1777         DEBTRACE("---");
1778         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1779         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1780         _toContinue = !graph->isFinished();
1781
1782         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1783         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1784         DEBTRACE("_toContinue: " << _toContinue);
1785         if(_toContinue && numberAllTasks==0)
1786         {
1787           //Problem : no running tasks and no task to launch ??
1788           problemCount++;
1789           std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1790           //Pause to give a chance to interrupt
1791           usleep(1000);
1792           if(problemCount > 25)
1793           {
1794             // Too much problems encountered : stop execution
1795             _toContinue=false;
1796           }
1797         }
1798
1799         if (! _toContinue)
1800           {
1801             _executorState = YACS::FINISHED;
1802             sendEvent("executor");
1803             _condForPilot.notify_all();
1804           }
1805       } // --- End of critical section
1806       if (debug > 0) _displayDot(graph);
1807       DEBTRACE("_toContinue: " << _toContinue);
1808     }
1809
1810   wlm.stop();
1811   DEBTRACE("End of main Loop");
1812
1813   { // --- Critical section
1814     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1815     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1816       {
1817         DEBTRACE("stop requested: End soon");
1818         _executorState = YACS::STOPPED;
1819         _toContinue = false;
1820         sendEvent("executor");
1821       }
1822   } // --- End of critical section
1823   if ( _dumpOnErrorRequested && _errorDetected)
1824     {
1825       saveState(_dumpErrorFile);
1826     }
1827   {
1828     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1829     _trace.close();
1830   }
1831 }
1832
1833 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1834 {
1835   std::string str_value = graph->getProperty("executor");
1836   if(str_value == "WorkloadManager"
1837      || str_value == "WORKLOADMANAGER"
1838      || str_value == "workloadmanager"
1839      || str_value == "WorkLoadManager")
1840     runWlm(graph, debug, fromScratch);
1841   else
1842     RunB(graph, debug, fromScratch);
1843 }