Salome HOME
WIP: workload manager final version of the API
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2019  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include "workloadmanager/WorkloadManager.hxx"
34 #include "workloadmanager/DefaultAlgorithm.hxx"
35
36 #include <iostream>
37 #include <fstream>
38 #include <sys/stat.h>
39 #ifndef WIN32
40 #include <sys/time.h>
41 #include <unistd.h>
42 #endif
43
44 #include <cstdlib>
45 #include <algorithm>
46
47 #ifdef WIN32
48 #define usleep(A) _sleep(A/1000)
49 #if !defined(S_ISCHR) || !defined(S_ISREG)
50 #  ifndef S_IFMT
51 #    ifdef _S_IFMT
52 #      define S_IFMT _S_IFMT
53 #      define S_IFCHR _S_IFCHR
54 #      define S_IFREG _S_IFREG
55 #    else
56 #    ifdef __S_IFMT
57 #      define S_IFMT __S_IFMT
58 #      define S_IFCHR __S_IFCHR
59 #      define S_IFREG __S_IFREG
60 #    endif
61 #    endif
62 #  endif
63 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
64 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
65 #endif
66 #endif
67
68 using namespace YACS::ENGINE;
69 using namespace std;
70
71 using YACS::BASES::Mutex;
72 using YACS::BASES::Thread;
73 using YACS::BASES::Semaphore;
74
75 //#define _DEVDEBUG_
76 #include "YacsTrace.hxx"
77
78 int Executor::_maxThreads(1000);
79 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
80
81 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
82 {
83   _root=0;
84   _toContinue = true;
85   _isOKToEnd = false;
86   _stopOnErrorRequested = false;
87   _dumpOnErrorRequested = false;
88   _errorDetected = false;
89   _isRunningunderExternalControl=false;
90   _executorState = YACS::NOTYETINITIALIZED;
91   _execMode = YACS::CONTINUE;
92   _semThreadCnt = _maxThreads;
93   _numberOfRunningTasks = 0;
94   _numberOfEndedTasks = 0;
95   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
96 }
97
98 Executor::~Executor()
99 {
100   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
101     delete *iter;
102 }
103
104 //! Execute a graph waiting for completion
105 /*!
106  *  \param graph : schema to execute
107  *  \param debug : display the graph with dot if debug == 1
108  *  \param fromScratch : if true the graph is reinitialized
109  *
110  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
111  *  
112  *  Calls Executor::launchTask to execute a selected Task.
113  *
114  *  Completion when graph is finished (Scheduler::isFinished)
115  */
116
117 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
118 {
119   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
120   _mainSched=graph;
121   _root = dynamic_cast<ComposedNode *>(_mainSched);
122   if (!_root) throw Exception("Executor::Run, Internal Error!");
123   bool isMore;
124   int i=0;
125   if(debug>1)_displayDot(graph);
126   if (fromScratch)
127     {
128       graph->init();
129       graph->exUpdateState();
130     }
131   if(debug>1)_displayDot(graph);
132   vector<Task *> tasks;
133   vector<Task *>::iterator iter;
134   _toContinue=true;
135   _execMode = YACS::CONTINUE;
136   _isWaitingEventsFromRunningTasks = false;
137   _numberOfRunningTasks = 0;
138   _runningTasks.clear();
139   _numberOfEndedTasks=0;
140   while(_toContinue)
141     {
142       sleepWhileNoEventsFromAnyRunningTask();
143
144       if(debug>2)_displayDot(graph);
145
146       {//Critical section
147         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
148         tasks=graph->getNextTasks(isMore);
149         graph->selectRunnableTasks(tasks);
150       }//End of critical section
151
152       if(debug>2)_displayDot(graph);
153
154       for(iter=tasks.begin();iter!=tasks.end();iter++)
155         loadTask(*iter,this);
156
157       if(debug>1)_displayDot(graph);
158
159       launchTasks(tasks);
160
161       if(debug>1)_displayDot(graph);
162
163       {//Critical section
164         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
165         _toContinue=!graph->isFinished();
166       }//End of critical section
167       DEBTRACE("_toContinue: " << _toContinue);
168
169       if(debug>0)_displayDot(graph);
170
171       i++;
172     }
173 }
174
//! Execute a graph with breakpoints or step by step
/*!
 *  To be launched in a thread (main thread controls the progression).
 *  \param graph : schema to execute; must be a ComposedNode, otherwise an Exception is thrown
 *  \param debug : display the graph with dot if debug >0 (more intermediate displays if debug >1 or >2)
 *  \param fromScratch : if false, state from a previous partial execution is already loaded
 *
 *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
 *
 *  Calls Executor::checkBreakPoints to verify if a pause is requested
 *
 *  Calls Executor::loadParallelTasks and Executor::launchTasks to execute the selected tasks
 *
 *  Completion when graph is finished (Scheduler::isFinished)
 *
 *  States of execution:
 *  - YACS::NOTYETINITIALIZED
 *  - YACS::INITIALISED
 *  - YACS::RUNNING            (to next breakpoint or step)
 *  - YACS::WAITINGTASKS       (a breakpoint or step has been reached, but there are still running tasks)
 *  - YACS::PAUSED             (a breakpoint or step has been reached, no more running tasks)
 *  - YACS::FINISHED           (no more ready tasks, nor running tasks)
 *  - YACS::STOPPED            (stopped by user before end)
 *
 *  Modes of Execution:
 *  - YACS::CONTINUE           (normal run without breakpoints)
 *  - YACS::STEPBYSTEP         (pause at each loop)
 *  - YACS::STOPBEFORENODES    (pause when a node is reached)
 *
 *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
 *  Step by Step means execution node by node or group of node by group of nodes.
 *  At a given step, the user decides to launch all the ready nodes or only a subset
 *  (Caution: some nodes must run in parallel).
 *  The next event (end of task) may give a new set of ready nodes, and define a new step.
 *
 *  The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
 *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
 *  - Executor::getCurrentExecMode
 *  - Executor::getExecutorState
 *  - Executor::setExecMode             : change the execution mode for next loop
 *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
 *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
 *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
 *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
 *  - Executor::isNotFinished
 *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
 *  - Executor::saveState               : dump the current state of execution in an xml file
 *  - Executor::loadState               : Not yet implemented
 *  - Executor::getNbOfThreads
 *  - Executor::displayDot
 *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
 *
 *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
 *  - Executor::waitPause
 *
 *  TO BE VALIDATED:
 *  - Pilot may connect to executor during execution, or disconnect.
 *  - Several Pilots may be connected at the same time (for observation...)
 */
void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
{
  DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);

  // Attach the schema and reset the per-run bookkeeping under the scheduler mutex.
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
    _mainSched = graph;
    _root = dynamic_cast<ComposedNode *>(_mainSched);
    if (!_root) throw Exception("Executor::Run, Internal Error!");
    _executorState = YACS::NOTYETINITIALIZED;
    sendEvent("executor");
    _toContinue=true;
    _isOKToEnd = false;
    _errorDetected = false;
    _isWaitingEventsFromRunningTasks = false;
    _numberOfRunningTasks = 0;
    _runningTasks.clear();
    _numberOfEndedTasks = 0;
    // Per-run trace file: "traceExec_<schema name>" in the current directory.
    string tracefile = "traceExec_";
    tracefile += _mainSched->getName();
    _trace.open(tracefile.c_str());
    _start = std::chrono::steady_clock::now();
  } // --- End of critical section

  if (debug > 1) _displayDot(graph);

  if (fromScratch)
    {
      try
        {
          graph->init();
          graph->exUpdateState();
        }
      catch(Exception& ex)
        {
          // NOTE(review): on this path the trace file opened above is not closed
          // and _condForPilot is not notified, so a pilot blocked in waitPause
          // may not be woken up — confirm.
          DEBTRACE("exception: "<< (ex.what()));
          _executorState = YACS::FINISHED;
          sendEvent("executor");
          throw;
        }
    }
  // NOTE(review): INITIALISED and RUNNING below are written outside _mutexForSchedulerUpdate.
  _executorState = YACS::INITIALISED;
  sendEvent("executor");

  if (debug > 1) _displayDot(graph);

  vector<Task *>::iterator iter;
  bool isMore;
  int problemCount=0;
  int numberAllTasks;

  _executorState = YACS::RUNNING;
  sendEvent("executor");
  while (_toContinue)
    {
      DEBTRACE("--- executor main loop");
      sleepWhileNoEventsFromAnyRunningTask();
      DEBTRACE("--- events...");
      if (debug > 2) _displayDot(graph);
      // Select the next runnable tasks; numberAllTasks counts running tasks
      // plus the tasks just selected.
      { // --- Critical section
        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
        _tasks=graph->getNextTasks(isMore);
        graph->selectRunnableTasks(_tasks);
        FilterTasksConsideringContainers(_tasks);
        numberAllTasks=_numberOfRunningTasks+_tasks.size();
      } // --- End of critical section
      if (debug > 2) _displayDot(graph);
      // Tasks are only loaded and launched while the state is RUNNING.
      if (_executorState == YACS::RUNNING)
        {
          if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
          if (debug > 0) _displayDot(graph);
          DEBTRACE("---");
          loadParallelTasks(_tasks,this);
          if (debug > 1) _displayDot(graph);
          DEBTRACE("---");
          launchTasks(_tasks);
          DEBTRACE("---");
        }
      if (debug > 1) _displayDot(graph);
      { // --- Critical section
        DEBTRACE("---");
        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
        //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
        if(_numberOfRunningTasks == 0)
          _toContinue = !graph->isFinished();

        DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
        DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
        DEBTRACE("_toContinue: " << _toContinue);
        if(_toContinue && numberAllTasks==0)
          {
            //Problem : no running tasks and no task to launch ??
            // problemCount is never reset, so the limit below applies to the
            // total number of such iterations in this run, not consecutive ones.
            problemCount++;
            std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
            //Pause to give a chance to interrupt
            // NOTE(review): alck still holds _mutexForSchedulerUpdate during this
            // sleep, so other threads cannot update the scheduler meanwhile.
            usleep(1000);
            if(problemCount > 25)
              {
                // Too much problems encountered : stop execution
                _toContinue=false;
              }
          }

        if (! _toContinue)
          {
            _executorState = YACS::FINISHED;
            sendEvent("executor");
            _condForPilot.notify_all();
          }
      } // --- End of critical section
      if (debug > 0) _displayDot(graph);
      DEBTRACE("_toContinue: " << _toContinue);
    }

  DEBTRACE("End of main Loop");

  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
      {
        DEBTRACE("stop requested: End soon");
        _executorState = YACS::STOPPED;
        _toContinue = false;
        sendEvent("executor");
      }
  } // --- End of critical section
  // Optional state dump requested through setStopOnError().
  if ( _dumpOnErrorRequested && _errorDetected)
    {
      saveState(_dumpErrorFile);
    }
  {
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
    _trace.close();
  }
  DEBTRACE("End of RunB thread");  
}
372
373 YACS::ExecutionMode Executor::getCurrentExecMode()
374 {
375   _isRunningunderExternalControl=true;
376   return _execMode;
377 }
378
379
380 YACS::ExecutorState Executor::getExecutorState()
381 {
382   _isRunningunderExternalControl=true;
383   return _executorState;
384 }
385
386
387 bool Executor::isNotFinished()
388 {
389   _isRunningunderExternalControl=true;
390   return _toContinue;
391 }
392
393 //! ask to stop execution on the first node found in error
394 /*!
395  * \param dumpRequested   produce a state dump when an error is found
396  * \param xmlFile         name of file used for state dump
397  */
398
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
400 {
401   { // --- Critical section
402     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
403     _dumpErrorFile=xmlFile;
404     _stopOnErrorRequested=true;
405     _dumpOnErrorRequested = dumpRequested;
406     if (dumpRequested && xmlFile.empty())
407       throw YACS::Exception("dump on error requested and no filename given for dump");
408     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409   } // --- End of critical section
410 }
411
412 //! ask to do not stop execution on nodes found in error
413 /*!
414  */
415
416 void Executor::unsetStopOnError()
417 {
418   { // --- Critical section
419     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
420     _stopOnErrorRequested=false;
421   } // --- End of critical section
422 }
423
424 //! Dynamically set the current mode of execution
425 /*!
426  * The mode can be Continue, step by step, or stop before execution of a node
427  * defined in a list of breakpoints.
428  */
429
430 void Executor::setExecMode(YACS::ExecutionMode mode)
431 {
432   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433   { // --- Critical section
434     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
435     _isRunningunderExternalControl=true;
436     _execMode = mode;
437   } // --- End of critical section
438 }
439
440 //! wake up executor when in pause
441 /*!
442  * When Executor is in state paused or waiting for task completion, the thread
443  * running loop RunB waits on condition _condForStepByStep.
444  * Thread RunB is waken up.
445  * \return true when actually wakes up executor
446  */
447
448 bool Executor::resumeCurrentBreakPoint()
449 {
450   DEBTRACE("Executor::resumeCurrentBreakPoint()");
451   bool ret = false;
452   //bool doDump = false;
453   { // --- Critical section
454     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
455     _isRunningunderExternalControl=true;
456     DEBTRACE("_executorState: " << _executorState);
457     switch (_executorState)
458       {
459       case YACS::WAITINGTASKS:
460       case YACS::PAUSED:
461         {
462           _condForStepByStep.notify_all();
463           _executorState = YACS::RUNNING;
464           sendEvent("executor");
465           ret = true;
466           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
467           break;
468         }
469       case YACS::FINISHED:
470       case YACS::STOPPED:
471         {
472           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473           DEBTRACE("Graph Execution finished or stopped !");
474           break;
475         }
476       default :
477         {
478           // debug: no easy way to verify if main loop is acutally waiting on condition
479         }
480       }
481     DEBTRACE("---");
482     //if (doDump) saveState(_dumpErrorFile);
483   } // --- End of critical section
484   return ret;
485 }
486
487
488 //! define a list of nodes names as breakpoints in the graph
489
490
491 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
492 {
493   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494   { // --- Critical section
495     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
496     _isRunningunderExternalControl=true;
497     _listOfBreakPoints = listOfBreakPoints;
498   } // --- End of critical section
499 }
500
501
502 //! Get the list of tasks to load, to define a subset to execute in step by step mode
503 /*!
504  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
505  *  Use Executor::waitPause to wait.
506  */
507 std::list<std::string> Executor::getTasksToLoad()
508 {
509   DEBTRACE("Executor::getTasksToLoad()");
510   list<string> listOfNodesToLoad;
511   listOfNodesToLoad.clear();
512   { // --- Critical section
513     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
514     _isRunningunderExternalControl=true;
515     switch (_executorState)
516       {
517       case YACS::WAITINGTASKS:
518       case YACS::PAUSED:
519         {
520           listOfNodesToLoad = _listOfTasksToLoad;
521           break;
522         }
523       case YACS::NOTYETINITIALIZED:
524       case YACS::INITIALISED:
525       case YACS::RUNNING:
526       case YACS::FINISHED:
527       case YACS::STOPPED:
528       default:
529         {
530           break;
531         }
532       }
533   } // --- End of critical section
534   return listOfNodesToLoad;
535 }
536
537
538 //! Define a subset of task to execute in step by step mode
539 /*!
540  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
541  * in the current step.
542  * If some nodes must run in parallel, they must stay together in the list.
543  */
544
545 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
546 {
547   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
548   bool ret = true;
549   vector<Task *>::iterator iter;
550   vector<Task *> restrictedTasks;
551   { // --- Critical section
552     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
553     _isRunningunderExternalControl=true;
554     switch (_executorState)
555       {
556       case YACS::WAITINGTASKS:
557       case YACS::PAUSED:
558         {
559           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
560             {
561               string readyNode = _mainSched->getTaskName(*iter);
562               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563                   != listToExecute.end())
564                 {
565                   restrictedTasks.push_back(*iter);
566                   DEBTRACE("node to execute " << readyNode);
567                 }
568             }
569           _tasks.clear();
570           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
571             {
572               _tasks.push_back(*iter);
573             }
574           break;
575         }
576       case YACS::NOTYETINITIALIZED:
577       case YACS::INITIALISED:
578       case YACS::RUNNING:
579       case YACS::FINISHED:
580       case YACS::STOPPED:
581       default:
582         {
583           break;
584         }
585       }
586     } // --- End of critical section
587
588   _tasks.clear();
589   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
590     {
591       _tasks.push_back(*iter);
592     }
593   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
594     {
595       string readyNode = _mainSched->getTaskName(*iter);
596       DEBTRACE("selected node to execute " << readyNode);
597     }
598
599   return ret;
600 }
601
602 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
603 /*!
604  *  Do nothing if execution is finished or in pause.
605  *  Wait first step if Executor is running or in initialization.
606  */
607
608 void Executor::waitPause()
609 {
610   DEBTRACE("Executor::waitPause()" << _executorState);
611   { // --- Critical section
612     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
613     _isRunningunderExternalControl=true;
614     switch (_executorState)
615       {
616       default:
617       case YACS::STOPPED:
618       case YACS::FINISHED:
619       case YACS::WAITINGTASKS:
620       case YACS::PAUSED:
621         {
622           break;
623         }
624       case YACS::NOTYETINITIALIZED:
625       case YACS::INITIALISED:
626       case YACS::RUNNING:
627         {
628           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
629           break;
630         }
631       }
632   } // --- End of critical section
633   DEBTRACE("---");
634 }
635
636 /*!
637  * This method can be called at any time simultaneously during a RunB call.
638  * This method will wait until the executor is locked in a consistent state of a running graph.
639  *
640  * This method is expected to be called in association with resume method.
641  * The returned parameter is expected to be transfered to resume method.
642  */
643 bool Executor::suspendASAP()
644 {
645   // no AutoLocker here. It's not a bug.
646   _mutexForSchedulerUpdate.lock();
647   if(!_toContinue && _executorState==YACS::FINISHED)
648     {// execution is finished
649       _mutexForSchedulerUpdate.unLock();
650       return false;// the executor is no more running
651     }
652   //general case. Leave method with locker in locked status
653   return true;
654 }
655
656 /*!
657  * This method is expected to be called in association with suspendASAP method.
658  * Expected to be called just after suspendASAP with output of resume as input parameter
659  */
660 void Executor::resume(bool suspended)
661 {
662   if(suspended)
663     _mutexForSchedulerUpdate.unLock();
664 }
665
666 //! stops the execution as soon as possible 
667
668 void Executor::stopExecution()
669 {
670   setExecMode(YACS::STEPBYSTEP);
671   //waitPause();
672   _isOKToEnd = true;
673   resumeCurrentBreakPoint();
674 }
675
676 //! save the current state of execution in an xml file
677
678 bool Executor::saveState(const std::string& xmlFile)
679 {
680   DEBTRACE("Executor::saveState() in " << xmlFile);
681   bool result = false;
682   try {
683     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
684     YACS::ENGINE::VisitorSaveState vst(_root);
685     vst.openFileDump(xmlFile.c_str());
686     _root->accept(&vst);
687     vst.closeFileDump();
688     result = true;
689   }
690   catch(Exception& ex) {
691     std::cerr << ex.what() << std::endl;
692   }
693   return result;
694 }
695
696 //! not yet implemented
697
698 bool Executor::loadState()
699 {
700   DEBTRACE("Executor::loadState()");
701   _isRunningunderExternalControl=true;
702   return true;
703 }
704
705
//! Tell whether \a filename names an existing regular file.
/*!
 *  \return 1 when stat() succeeds and reports a regular file; 0 otherwise
 *          (missing path, directory, device, stat error...).
 */
static int isfile(const char *filename)
{
  struct stat info;
  const bool isRegular = (stat(filename, &info) == 0) && S_ISREG(info.st_mode);
  return isRegular ? 1 : 0;
}
715
716 //! Display the graph state as a dot display, public method
717
718 void Executor::displayDot(Scheduler *graph)
719 {
720   _isRunningunderExternalControl=true;
721   _displayDot(graph);
722 }
723
724 //! Display the graph state as a dot display
725 /*!
726  *  \param graph  : the node to display
727  */
728
729 void Executor::_displayDot(Scheduler *graph)
730 {
731    std::ofstream g("titi");
732    ((ComposedNode*)graph)->writeDot(g);
733    g.close();
734    const char displayScript[]="display.sh";
735    if(isfile(displayScript))
736      system("sh display.sh");
737    else
738      system("dot -Tpng titi|display -delay 5");
739 }
740
//! Wait reactivation in modes Step By step or with BreakPoints
/*!
 *  Check mode of execution (set by main thread):
 *  - YACS::CONTINUE        : the graph execution continues.
 *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
 *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
 *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
 *                            else continue the graph execution.
 *  Any other mode value is handled like YACS::STEPBYSTEP.
 *
 *  When a pause is taken:
 *  - the ready tasks (_tasks) are copied to _tasksSave;
 *  - their names are stored in _listOfTasksToLoad, for getTasksToLoad / setStepsToExecute;
 *  - the state becomes WAITINGTASKS if getNbOfThreads() is non-zero, PAUSED otherwise;
 *  - a pilot blocked in waitPause is notified.
 *  The wait is skipped if _isOKToEnd is already set.
 *  \return true if end of executor thread is requested. In STOPBEFORENODES and
 *          STEPBYSTEP modes this is whenever _isOKToEnd is set, even if no
 *          breakpoint was hit.
 */
bool Executor::checkBreakPoints()
{
  DEBTRACE("Executor::checkBreakPoints()");
  vector<Task *>::iterator iter;
  bool endRequested = false;

  // NOTE(review): _execMode is read here without holding _mutexForSchedulerUpdate.
  switch (_execMode)
    {
    case YACS::CONTINUE:
      {
        break;
      }
    case YACS::STOPBEFORENODES:
      {
        bool stop = false;
        { // --- Critical section
          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
          _tasksSave = _tasks;
          // Pause as soon as one ready task is in the breakpoint list.
          for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
            {
              string nodeToLoad = _mainSched->getTaskName(*iter);
              if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
                  != _listOfBreakPoints.end())
                {
                  stop = true;
                  break;
                }
            }
          if (stop)
            {
              // Publish ALL ready tasks (not only the breakpoint ones) to the pilot.
              _listOfTasksToLoad.clear();
              for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
                {
                  string nodeToLoad = _mainSched->getTaskName(*iter);
                  _listOfTasksToLoad.push_back(nodeToLoad);
                }
              if (getNbOfThreads())
                _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
              else
                _executorState = YACS::PAUSED;
              sendEvent("executor");
              _condForPilot.notify_all();
            }
          if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
          if (_isOKToEnd) endRequested = true;
        } // --- End of critical section
          if (stop) DEBTRACE("wake up from waitResume");
        break;
      }
    default:
    case YACS::STEPBYSTEP:
      {
        { // --- Critical section
          YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
          _tasksSave = _tasks;
          _listOfTasksToLoad.clear();
          for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
            {
              string nodeToLoad = _mainSched->getTaskName(*iter);
              _listOfTasksToLoad.push_back(nodeToLoad);
            }
          if (getNbOfThreads())
            _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
          else
            _executorState = YACS::PAUSED;
          sendEvent("executor");
          _condForPilot.notify_all();
          if (!_isOKToEnd)
            waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
                          // or, if no pilot, wait until no more running tasks (stop on error)
          if (_isOKToEnd) endRequested = true;
        } // --- End of critical section
        DEBTRACE("wake up from waitResume");
        break;
      }
    }
  DEBTRACE("endRequested: " << endRequested);
  return endRequested;
}
831
832
//! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
/*!
 *  With the condition Mutex, the mutex is released atomically during the wait.
 *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
 *  Must be called while _mutexForSchedulerUpdate is locked; callers keep using
 *  the mutex under their AutoLocker after this returns.
 *
 *  NOTE(review): this is a single wait with no state-based predicate loop.
 *  The caller (checkBreakPoints) re-checks _isOKToEnd after waking. Its comments
 *  indicate this condition may also be signalled when no pilot is attached
 *  (stop on error), without a state change. Confirm before adding a loop here.
 */
void Executor::waitResume()
{
  DEBTRACE("Executor::waitResume()");
  _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
  DEBTRACE("---");
}
846
847
848 //! Perform loading of a Task.
849 /*!
850  *  \param task  : Task to load
851  */
852
853 void Executor::loadTask(Task *task, const Executor *execInst)
854 {
855   DEBTRACE("Executor::loadTask(Task *task)");
856   if(task->getState() != YACS::TOLOAD)
857     return;
858   traceExec(task, "state:TOLOAD", ComputePlacement(task));
859   {//Critical section
860     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
861     _mainSched->notifyFrom(task,YACS::START,execInst);
862   }//End of critical section
863   try
864     {
865       traceExec(task, "load", ComputePlacement(task));
866       task->load();
867       traceExec(task, "initService", ComputePlacement(task));
868       task->initService();
869     }
870   catch(Exception& ex) 
871     {
872       std::cerr << ex.what() << std::endl;
873       {//Critical section
874         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
875         task->aborted();
876         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878       }//End of critical section
879     }
880   catch(...) 
881     {
882       std::cerr << "Load failed" << std::endl;
883       {//Critical section
884         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
885         task->aborted();
886         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888       }//End of critical section
889     }
890 }
891
//! Argument bundle passed (heap-allocated) to a worker thread, e.g. by loadParallelTasks.
//! NOTE(review): ownership of the allocation is presumably taken by the thread
//! function (e.g. functionForTaskLoad) — not visible here, confirm.
struct threadargs
{
  Task *task;          // task the thread works on
  Scheduler *sched;    // scheduler of the running schema (_mainSched in loadParallelTasks)
  Executor *execInst;  // executor that spawned the thread
};
898
899 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
900 {
901   std::vector<Thread> ths(tasks.size());
902   std::size_t ithread(0);
903   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
904     {
905       DEBTRACE("Executor::loadParallelTasks(Task *task)");
906       struct threadargs *args(new threadargs);
907       args->task = (*iter);
908       args->sched = _mainSched;
909       args->execInst = this;
910       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
911     }
912   for(ithread=0;ithread<tasks.size();ithread++)
913     ths[ithread].join();
914 }
915
916 //! Execute a list of tasks possibly connected through datastream links
917 /*!
918  *  \param tasks  : a list of tasks to execute
919  *
920  */
921 void Executor::launchTasks(const std::vector<Task *>& tasks)
922 {
923   //First phase, make datastream connections
924   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
925     {
926       YACS::StatesForNode state=(*iter)->getState();
927       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
928       try
929         {
930           (*iter)->connectService();
931           traceExec(*iter, "connectService",ComputePlacement(*iter));
932           {//Critical section
933             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
934             (*iter)->connected();
935           }//End of critical section
936         }
937       catch(Exception& ex) 
938         {
939           std::cerr << ex.what() << std::endl;
940           try
941             {
942               (*iter)->disconnectService();
943               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
944             }
945           catch(...) 
946             {
947               // Disconnect has failed
948               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
949             }
950           {//Critical section
951             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
952             (*iter)->aborted();
953             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954           }//End of critical section
955         }
956       catch(...) 
957         {
958           std::cerr << "Problem in connectService" << std::endl;
959           try
960             {
961               (*iter)->disconnectService();
962               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
963             }
964           catch(...) 
965             {
966               // Disconnect has failed
967               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
968             }
969           {//Critical section
970             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
971             (*iter)->aborted();
972             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973           }//End of critical section
974         }
975       if((*iter)->getState() == YACS::ERROR)
976         {
977           //try to put all coupled tasks in error
978           std::set<Task*> coupledSet;
979           (*iter)->getCoupledTasks(coupledSet);
980           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
981             {
982               Task* t=*it;
983               if(t == *iter)continue;
984               if(t->getState() == YACS::ERROR)continue;
985               try
986                 {
987                   t->disconnectService();
988                   traceExec(t, "disconnectService",ComputePlacement(*iter));
989                 }
990               catch(...)
991                 {
992                   // Disconnect has failed
993                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
994                 }
995               {//Critical section
996                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
997                 t->aborted();
998                 _mainSched->notifyFrom(t,YACS::ABORT,this);
999               }//End of critical section
1000               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1001             }
1002         }
1003       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1004     }
1005
1006   //Second phase, execute each task in a thread
1007   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1008     {
1009       launchTask(*iter);
1010     }
1011 }
1012
1013 //! Execute a Task in a thread
1014 /*!
1015  *  \param task  : Task to execute
1016  *
1017  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1018  *
1019  *  Calls Executor::functionForTaskExecution in Thread
1020  */
1021
1022 void Executor::launchTask(Task *task)
1023 {
1024   DEBTRACE("Executor::launchTask(Task *task)");
1025   struct threadargs *args;
1026   if(task->getState() != YACS::TOACTIVATE)return;
1027
1028   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1029   if(_semThreadCnt == 0)
1030     {
1031       // --- Critical section
1032       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1033       //check if we have enough threads to run
1034       std::set<Task*> tmpSet=_runningTasks;
1035       std::set<Task*>::iterator it = tmpSet.begin();
1036       std::string status="running";
1037       std::set<Task*> coupledSet;
1038       while( it != tmpSet.end() )
1039         {
1040           Task* tt=*it;
1041           coupledSet.clear();
1042           tt->getCoupledTasks(coupledSet);
1043           status="running";
1044           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1045             {
1046               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047               tmpSet.erase(*iter);
1048             }
1049           if(status=="running")break;
1050           it = tmpSet.begin();
1051         }
1052
1053       if(status=="toactivate")
1054         {
1055           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1057         }
1058       // --- End of critical section
1059     }
1060
1061   _semForMaxThreads.wait();
1062   _semThreadCnt -= 1;
1063
1064   args= new threadargs;
1065   args->task = task;
1066   args->sched = _mainSched;
1067   args->execInst = this;
1068
1069   traceExec(task, "launch",ComputePlacement(task));
1070
1071   { // --- Critical section
1072     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1073     _numberOfRunningTasks++;
1074     _runningTasks.insert(task);
1075     task->begin(); //change state to ACTIVATED
1076   } // --- End of critical section
1077   Thread(functionForTaskExecution, args, _threadStackSize);
1078 }
1079
1080 //! wait until a running task ends
1081
1082 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1083 {
1084   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1086   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1087   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1088     {
1089       _isWaitingEventsFromRunningTasks = true;
1090       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1091     }
1092   _numberOfEndedTasks=0;
1093   DEBTRACE("---");
1094 }
1095
1096 //! not implemented
1097
1098 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1099 {
1100   /*_mutexForNbOfConcurrentThreads.lock();
1101   _groupOfAllThreadsCreated.remove(thread);
1102   delete thread;
1103   _mutexForNbOfConcurrentThreads.unlock();*/
1104 }
1105
1106
1107 //! must be used protected by _mutexForSchedulerUpdate!
1108
1109 void Executor::wakeUp()
1110 {
1111   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1112   if (_isWaitingEventsFromRunningTasks)
1113     {
1114       _isWaitingEventsFromRunningTasks = false;
1115       _condForNewTasksToPerform.notify_all();
1116     }
1117   else
1118     _numberOfEndedTasks++;
1119 }
1120
1121 //! number of running tasks
1122
1123 int Executor::getNbOfThreads()
1124 {
1125   int ret;
1126   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1127   _isRunningunderExternalControl=true;
1128   ret = _groupOfAllThreadsCreated.size();
1129   return ret;
1130 }
1131
1132 /*!
1133  * This thread is NOT supposed to be detached !
1134  */
1135 void *Executor::functionForTaskLoad(void *arg)
1136 {
1137   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1138   struct threadargs *args = (struct threadargs *) arg;
1139   Task *task=args->task;
1140   Scheduler *sched=args->sched;
1141   Executor *execInst=args->execInst;
1142   delete args;
1143   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1144   return 0;
1145 }
1146
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : threadargs (a Task, a Scheduler, an Executor) allocated with new
 *                by launchTask; it is deleted here once its fields have been read
 *
 *  The thread detaches itself, applies the DPL scope when the executor is DPL-scope
 *  sensitive, then calls Task::execute followed by Task::disconnectService. Any
 *  exception from either turns the resulting event into YACS::ABORT.
 *
 *  If the task's container is a HomogeneousPoolContainer, the task is released from it.
 *
 *  Then, under _mutexForSchedulerUpdate:
 *  - calls Task::finished (YACS::FINISH) or Task::aborted (YACS::ABORT; with
 *    stop-on-error requested, the executor also switches to step-by-step mode);
 *  - notifies the scheduler with Scheduler::notifyFrom (exceptions are only logged);
 *  - updates the running-task bookkeeping and, when no task runs any more outside
 *    CONTINUE mode while the executor is waiting for tasks, moves it to PAUSED;
 *  - posts _semForMaxThreads and calls Executor::wakeUp unless the executor is PAUSED.
 *
 *  Executor::notifyEndOfThread is NOT called (the call is commented out).
 *
 *  \return 0 (after Thread::exit)
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  // launchTask does not keep the Thread object, so the thread detaches itself
  Thread::detach();

  // Execute task

  if(execInst->getDPLScopeSensitive())
    {
      Node *node(dynamic_cast<Node *>(task));
      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
      if(node!=0 && gfn!=0)
        node->applyDPLScope(gfn);
    }

  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Computed before contC->release(task) below; used by the final state trace.
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      std::lock_guard<std::mutex> alckCont(contC->getLocker());
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            execInst->_errorDetected = true;
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev,execInst);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    // Give the thread slot back (launchTask waits on this semaphore)
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  //execInst->notifyEndOfThread(0);
  Thread::exit(0);
  return 0;
}
1296
1297 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1298 {
1299   string nodeName = _mainSched->getTaskName(task);
1300   Container *cont = task->getContainer();
1301   string containerName = "---";
1302   if (cont)
1303     containerName = cont->getName();
1304
1305   std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1306   std::chrono::milliseconds millisec;
1307   millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1308   double elapse = double(millisec.count()) / 1000.0;
1309   {
1310     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1311     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1312     _trace << flush;
1313   }
1314 }
1315
//! emit notification to all observers registered with the dispatcher
/*!
 * The dispatcher is unique and can be obtained by getDispatcher().
 * \a event is dispatched with _root (the root ComposedNode of the graph being run)
 * as the emitting node; the dispatcher and _root must both be non-null (YASSERT).
 */
void Executor::sendEvent(const std::string& event)
{
  Dispatcher* disp=Dispatcher::getDispatcher();
  YASSERT(disp);
  YASSERT(_root);
  disp->dispatch(_root,event);
}
1327
1328 struct HPCCompare
1329 {
1330   bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
1331   {
1332     if(!lhs && !rhs)
1333       return false;
1334     if(!lhs)
1335       return true;
1336     if(!rhs)
1337       return false;
1338     return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
1339   }
1340 };
1341
1342 /*!
1343  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1344  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1345  *
1346  * \param [in,out] tsks - list of tasks to be
1347  */
1348 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1349 {
1350   std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1351   for(auto cur : tsks)
1352     {
1353       if(!cur)
1354         continue;
1355       Container *cont(cur->getContainer());
1356       if(!cont)
1357         {
1358           m[nullptr].push_back(cur);
1359           continue;
1360         }
1361       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1362       if(!contC)
1363         {
1364           m[nullptr].push_back(cur);
1365           continue;
1366         }
1367       m[contC].push_back(cur);
1368     }
1369   //
1370   std::vector<Task *> ret;
1371   for(auto it : m)
1372     {
1373       HomogeneousPoolContainer *curhpc(it.first);
1374       const std::vector<Task *>& curtsks(it.second);
1375       if(!curhpc)
1376         {
1377           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1378         }
1379       else
1380         {
1381           // start of critical section for container curhpc
1382           std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1383           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1384           std::size_t sz(curhpc->getNumberOfFreePlace());
1385           std::vector<Task *>::const_iterator it2(curtsks.begin());
1386           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1387             {
1388               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1389               ret.push_back(*it2);
1390             }
1391           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1392           //end of critical section
1393         }
1394     }
1395   //
1396   tsks=ret;
1397 }
1398
1399 std::string Executor::ComputePlacement(Task *zeTask)
1400 {
1401   std::string placement("---");
1402   if(!zeTask)
1403     return placement;
1404   if(zeTask->getContainer())
1405     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1406   return placement;
1407 }
1408
1409 ///////// NEW EXECUTOR ////////////////////////////////
1410 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
1411 {
1412   if(task->getState() != YACS::TOLOAD)
1413     return;
1414   traceExec(task, "state:TOLOAD", ComputePlacement(task));
1415   {//Critical section
1416     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1417     _mainSched->notifyFrom(task,YACS::START,this);
1418   }//End of critical section
1419   try
1420     {
1421       std::ostringstream container_name;
1422       container_name << runInfo.resource.name << "-"
1423                      << runInfo.type.name << "-" << runInfo.index;
1424       task->imposeResource(runInfo.resource.name, container_name.str());
1425       traceExec(task, "load", ComputePlacement(task));
1426       task->load();
1427       traceExec(task, "initService", ComputePlacement(task));
1428       task->initService();
1429     }
1430   catch(Exception& ex) 
1431     {
1432       std::cerr << ex.what() << std::endl;
1433       {//Critical section
1434         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1435         task->aborted();
1436         _mainSched->notifyFrom(task,YACS::ABORT, this);
1437         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1438       }//End of critical section
1439     }
1440   catch(...) 
1441     {
1442       std::cerr << "Load failed" << std::endl;
1443       {//Critical section
1444         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1445         task->aborted();
1446         _mainSched->notifyFrom(task,YACS::ABORT, this);
1447         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1448       }//End of critical section
1449     }
1450 }
1451
1452 void Executor::beginTask(Task *task)
1453 {
1454   // --- Critical section
1455   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1456   _numberOfRunningTasks++;
1457   _runningTasks.insert(task);
1458   // --- End of critical section
1459 }
1460
1461 void Executor::endTask(Task *task, YACS::Event ev)
1462 {
1463   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1464   try
1465   {
1466     if (ev == YACS::FINISH) task->finished();
1467     if (ev == YACS::ABORT)
1468     {
1469       _errorDetected = true;
1470       if (_stopOnErrorRequested)
1471       {
1472         _execMode = YACS::STEPBYSTEP;
1473         _isOKToEnd = true;
1474       }
1475       task->aborted();
1476     }
1477     //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1478     _mainSched->notifyFrom(task,ev,this);
1479   }
1480   catch(Exception& ex)
1481   {
1482     //notify has failed : it is supposed to have set state
1483     //so no need to do anything
1484     std::cerr << "Error during notification" << std::endl;
1485     std::cerr << ex.what() << std::endl;
1486   }
1487   catch(...)
1488   {
1489     //notify has failed : it is supposed to have set state
1490     //so no need to do anything
1491     std::cerr << "Notification failed" << std::endl;
1492   }
1493   _numberOfRunningTasks--;
1494   _runningTasks.erase(task);
1495   DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks 
1496             << " _execMode: " << _execMode
1497             << " _executorState: " << _executorState);
1498   if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1499     {
1500       if (_executorState == YACS::WAITINGTASKS)
1501         {
1502           _executorState = YACS::PAUSED;
1503           sendEvent("executor");
1504           _condForPilot.notify_all();
1505           if (_errorDetected &&
1506               _stopOnErrorRequested &&
1507               !_isRunningunderExternalControl)
1508             _condForStepByStep.notify_all(); // exec thread may be on waitResume
1509         }
1510     }
1511   if (_executorState != YACS::PAUSED)
1512     wakeUp();
1513 }
1514
1515 YACS::Event  Executor::runTask(Task *task)
1516 {
1517   { // --- Critical section
1518     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1519     task->begin(); //change state to ACTIVATED
1520   }
1521   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1522
1523   if(getDPLScopeSensitive())
1524     {
1525       Node *node(dynamic_cast<Node *>(task));
1526       ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1527       if(node!=0 && gfn!=0)
1528         node->applyDPLScope(gfn);
1529     }
1530
1531   YACS::Event ev=YACS::FINISH;
1532   try
1533     {
1534       traceExec(task, "start execution",ComputePlacement(task));
1535       task->execute();
1536       traceExec(task, "end execution OK",ComputePlacement(task));
1537     }
1538   catch(Exception& ex)
1539     {
1540       std::cerr << "YACS Exception during execute" << std::endl;
1541       std::cerr << ex.what() << std::endl;
1542       ev=YACS::ABORT;
1543       string message = "end execution ABORT, ";
1544       message += ex.what();
1545       traceExec(task, message,ComputePlacement(task));
1546     }
1547   catch(...) 
1548     {
1549       // Execution has failed
1550       std::cerr << "Execution has failed: unknown reason" << std::endl;
1551       ev=YACS::ABORT;
1552       traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1553     }
1554
1555   // Disconnect task
1556   try
1557     {
1558       DEBTRACE("task->disconnectService()");
1559       task->disconnectService();
1560       traceExec(task, "disconnectService",ComputePlacement(task));
1561     }
1562   catch(...) 
1563     {
1564       // Disconnect has failed
1565       std::cerr << "disconnect has failed" << std::endl;
1566       ev=YACS::ABORT;
1567       traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1568     }
1569   //
1570
1571   std::string placement(ComputePlacement(task));
1572
1573   // container management for HomogeneousPoolOfContainer
1574
1575   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1576   if(contC)
1577   {
1578     std::lock_guard<std::mutex> alckCont(contC->getLocker());
1579     contC->release(task);
1580   }
1581
1582   return ev;
1583 }
1584
1585 void Executor::makeDatastreamConnections(Task *task)
1586 {
1587   YACS::StatesForNode state=task->getState();
1588   if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1589     return;
1590   try
1591     {
1592       task->connectService();
1593       traceExec(task, "connectService",ComputePlacement(task));
1594       {//Critical section
1595         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1596         task->connected();
1597       }//End of critical section
1598     }
1599   catch(Exception& ex) 
1600     {
1601       std::cerr << ex.what() << std::endl;
1602       try
1603         {
1604           (task)->disconnectService();
1605           traceExec(task, "disconnectService",ComputePlacement(task));
1606         }
1607       catch(...) 
1608         {
1609           // Disconnect has failed
1610           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1611         }
1612       {//Critical section
1613         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1614         task->aborted();
1615         _mainSched->notifyFrom(task,YACS::ABORT,this);
1616       }//End of critical section
1617     }
1618   catch(...) 
1619     {
1620       std::cerr << "Problem in connectService" << std::endl;
1621       try
1622         {
1623           (task)->disconnectService();
1624           traceExec(task, "disconnectService",ComputePlacement(task));
1625         }
1626       catch(...) 
1627         {
1628           // Disconnect has failed
1629           traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1630         }
1631       {//Critical section
1632         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1633         task->aborted();
1634         _mainSched->notifyFrom(task,YACS::ABORT,this);
1635       }//End of critical section
1636     }
1637   if(task->getState() == YACS::ERROR)
1638     {
1639       //try to put all coupled tasks in error
1640       std::set<Task*> coupledSet;
1641       task->getCoupledTasks(coupledSet);
1642       for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1643         {
1644           Task* t=*it;
1645           if(t == task)continue;
1646           if(t->getState() == YACS::ERROR)continue;
1647           try
1648             {
1649               t->disconnectService();
1650               traceExec(t, "disconnectService",ComputePlacement(task));
1651             }
1652           catch(...)
1653             {
1654               // Disconnect has failed
1655               traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1656             }
1657           {//Critical section
1658             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1659             t->aborted();
1660             _mainSched->notifyFrom(t,YACS::ABORT,this);
1661           }//End of critical section
1662           traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1663         }
1664     }
1665   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1666 }
1667
1668 #include "Runtime.hxx"
1669 static
1670 void loadResources(WorkloadManager::WorkloadManager& wm)
1671 {
1672   Runtime *r(getRuntime());
1673   if(!r)
1674     throw YACS::Exception("loadResources : no runtime  !");
1675   std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
1676   int id = 0;
1677   for(const std::pair<std::string,int>& res : data)
1678   {
1679     WorkloadManager::Resource newResource;
1680     newResource.name = res.first;
1681     newResource.id = id;
1682     id++;
1683     newResource.nbCores = res.second;
1684     wm.addResource(newResource);
1685   }
1686 }
1687
//! Adapter exposing a YACS task as a WorkloadManager::Task.
/*!
 *  The constructor derives the container type from the YACS container;
 *  run() drives the task through the Executor (load, connect, run, end).
 *  NOTE(review): instances delete themselves at the end of run().
 */
class NewTask : public WorkloadManager::Task
{
public:
  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
  const WorkloadManager::ContainerType& type()const override;
  void run(const WorkloadManager::RunInfo& runInfo)override;
private:
  WorkloadManager::ContainerType _type;   // requirements reported to the workload manager
  Executor& _executor;                    // executor driving the task
  YACS::ENGINE::Task * _yacsTask;         // wrapped YACS task (never deleted by NewTask)
};
1699
1700 NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
1701 : _type()
1702 , _executor(executor)
1703 , _yacsTask(yacsTask)
1704 {
1705   Container * yacsContainer = yacsTask->getContainer();
1706   if(yacsContainer != nullptr && yacsTask->canAcceptImposedResource())
1707   {
1708     _type.ignoreResources = false;
1709     _type.name = yacsContainer->getName();
1710     std::string nb_procs_str = yacsContainer->getProperty("nb_parallel_procs");
1711     float needed_cores = 0.0;
1712     if(!nb_procs_str.empty())
1713       needed_cores = std::stof(nb_procs_str);
1714     _type.neededCores = needed_cores;
1715   }
1716   else
1717   {
1718     _type.ignoreResources = true;
1719     _type.name = "test";
1720     _type.neededCores = 0;
1721   }
1722   _type.id = 0;
1723 }
1724
1725 const WorkloadManager::ContainerType& NewTask::type()const
1726 {
1727   return _type;
1728 }
1729
//! Whole life cycle of the YACS task on the resource given by \a runInfo:
//! load, datastream connections, execution, then end-of-task bookkeeping.
void NewTask::run(const WorkloadManager::RunInfo& runInfo)
{
  _executor.loadTask(_yacsTask, runInfo);
  _executor.makeDatastreamConnections(_yacsTask);
  YACS::Event ev = _executor.runTask(_yacsTask);
  _executor.endTask(_yacsTask, ev);
  delete this; // temporary: self-deletion, must stay the last statement (skipped if an exception escapes above)
}
1738
1739 void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
1740 {
1741   DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1742   { // --- Critical section
1743     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1744     _mainSched = graph;
1745     _root = dynamic_cast<ComposedNode *>(_mainSched);
1746     if (!_root) throw Exception("Executor::Run, Internal Error!");
1747     _executorState = YACS::NOTYETINITIALIZED;
1748     sendEvent("executor");
1749     _toContinue=true;
1750     _isOKToEnd = false;
1751     _errorDetected = false;
1752     _isWaitingEventsFromRunningTasks = false;
1753     _numberOfRunningTasks = 0;
1754     _runningTasks.clear();
1755     _numberOfEndedTasks = 0;
1756     string tracefile = "traceExec_";
1757     tracefile += _mainSched->getName();
1758     _trace.open(tracefile.c_str());
1759     _start = std::chrono::steady_clock::now();
1760   } // --- End of critical section
1761
1762   if (debug > 1) _displayDot(graph);
1763
1764   if (fromScratch)
1765     {
1766       try
1767         {
1768           graph->init();
1769           graph->exUpdateState();
1770         }
1771       catch(Exception& ex)
1772         {
1773           DEBTRACE("exception: "<< (ex.what()));
1774           _executorState = YACS::FINISHED;
1775           sendEvent("executor");
1776           throw;
1777         }
1778     }
1779   _executorState = YACS::INITIALISED;
1780   sendEvent("executor");
1781
1782   if (debug > 1) _displayDot(graph);
1783
1784   bool isMore;
1785   int problemCount=0;
1786   int numberAllTasks;
1787
1788   _executorState = YACS::RUNNING;
1789   sendEvent("executor");
1790
1791   WorkloadManager::DefaultAlgorithm algo;
1792   WorkloadManager::WorkloadManager wlm(algo);
1793   loadResources(wlm);
1794   wlm.start();
1795
1796   while (_toContinue)
1797     {
1798       DEBTRACE("--- executor main loop");
1799       sleepWhileNoEventsFromAnyRunningTask();
1800       DEBTRACE("--- events...");
1801       if (debug > 2) _displayDot(graph);
1802       { // --- Critical section
1803         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1804         std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1805         graph->selectRunnableTasks(readyTasks);
1806         _tasks.clear();
1807         for(Task * t : readyTasks)
1808           if(_runningTasks.find(t) == _runningTasks.end())
1809             _tasks.push_back(t);
1810         // TODO: to be removed
1811         FilterTasksConsideringContainers(_tasks);
1812         numberAllTasks=_numberOfRunningTasks+_tasks.size();
1813       } // --- End of critical section
1814       if (debug > 2) _displayDot(graph);
1815       DEBTRACE("--- events...");
1816       if (_executorState == YACS::RUNNING)
1817       {
1818         if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1819         for(Task * task : _tasks)
1820         {
1821           beginTask(task);
1822           NewTask* newTask = new NewTask(*this, task);
1823           wlm.addTask(newTask);
1824         }
1825       }
1826       if (debug > 1) _displayDot(graph);
1827       { // --- Critical section
1828         DEBTRACE("---");
1829         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1830         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1831         _toContinue = !graph->isFinished();
1832
1833         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1834         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1835         DEBTRACE("_toContinue: " << _toContinue);
1836         if(_toContinue && numberAllTasks==0)
1837         {
1838           //Problem : no running tasks and no task to launch ??
1839           problemCount++;
1840           std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1841           //Pause to give a chance to interrupt
1842           usleep(1000);
1843           if(problemCount > 25)
1844           {
1845             // Too much problems encountered : stop execution
1846             _toContinue=false;
1847           }
1848         }
1849
1850         if (! _toContinue)
1851           {
1852             _executorState = YACS::FINISHED;
1853             sendEvent("executor");
1854             _condForPilot.notify_all();
1855           }
1856       } // --- End of critical section
1857       if (debug > 0) _displayDot(graph);
1858       DEBTRACE("_toContinue: " << _toContinue);
1859     }
1860
1861   wlm.stop();
1862   DEBTRACE("End of main Loop");
1863
1864   { // --- Critical section
1865     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1866     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1867       {
1868         DEBTRACE("stop requested: End soon");
1869         _executorState = YACS::STOPPED;
1870         _toContinue = false;
1871         sendEvent("executor");
1872       }
1873   } // --- End of critical section
1874   if ( _dumpOnErrorRequested && _errorDetected)
1875     {
1876       saveState(_dumpErrorFile);
1877     }
1878   {
1879     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1880     _trace.close();
1881   }
1882   DEBTRACE("End of RunB thread");  
1883 }
1884
1885 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1886 {
1887   std::string str_value = graph->getProperty("executor");
1888   if(str_value == "WorkloadManager"
1889      || str_value == "WORKLOADMANAGER"
1890      || str_value == "workloadmanager"
1891      || str_value == "WorkLoadManager")
1892     newRun(graph, debug, fromScratch);
1893   else
1894     RunB(graph, debug, fromScratch);
1895 }