Salome HOME
First test qui HPsalome container is running.
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2014  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(50);
76 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
77
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92
93 Executor::~Executor()
94 {
95   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96     delete *iter;
97 }
98
99 //! Execute a graph waiting for completion
100 /*!
101  *  \param graph : schema to execute
102  *  \param debug : display the graph with dot if debug == 1
103  *  \param fromScratch : if true the graph is reinitialized
104  *
105  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106  *  
107  *  Calls Executor::launchTask to execute a selected Task.
108  *
109  *  Completion when graph is finished (Scheduler::isFinished)
110  */
111
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
114   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115   _mainSched=graph;
116   _root = dynamic_cast<ComposedNode *>(_mainSched);
117   if (!_root) throw Exception("Executor::Run, Internal Error!");
118   bool isMore;
119   int i=0;
120   if(debug>1)_displayDot(graph);
121   if (fromScratch)
122     {
123       graph->init();
124       graph->exUpdateState();
125     }
126   if(debug>1)_displayDot(graph);
127   vector<Task *> tasks;
128   vector<Task *>::iterator iter;
129   _toContinue=true;
130   _execMode = YACS::CONTINUE;
131   _isWaitingEventsFromRunningTasks = false;
132   _numberOfRunningTasks = 0;
133   _runningTasks.clear();
134   _numberOfEndedTasks=0;
135   while(_toContinue)
136     {
137       sleepWhileNoEventsFromAnyRunningTask();
138
139       if(debug>2)_displayDot(graph);
140
141       {//Critical section
142         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143         tasks=graph->getNextTasks(isMore);
144         graph->selectRunnableTasks(tasks);
145       }//End of critical section
146
147       if(debug>2)_displayDot(graph);
148
149       for(iter=tasks.begin();iter!=tasks.end();iter++)
150         loadTask(*iter);
151
152       if(debug>1)_displayDot(graph);
153
154       launchTasks(tasks);
155
156       if(debug>1)_displayDot(graph);
157
158       {//Critical section
159         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160         _toContinue=!graph->isFinished();
161       }//End of critical section
162       DEBTRACE("_toContinue: " << _toContinue);
163
164       if(debug>0)_displayDot(graph);
165
166       i++;
167     }
168 }
169
170 //! Execute a graph with breakpoints or step by step
171 /*!
172  *  To be launch in a thread (main thread controls the progression).
173  *  \param graph : schema to execute
174  *  \param debug : display the graph with dot if debug >0
175  *  \param fromScratch : if false, state from a previous partial exection is already loaded
176  *
177  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178  *  
179  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
180  * 
181  *  Calls Executor::launchTask to execute a selected Task
182  *
183  *  Completion when graph is finished (Scheduler::isFinished)
184  *
185  *  States of execution:
186  *  - YACS::NOTYETINITIALIZED
187  *  - YACS::INITIALISED
188  *  - YACS::RUNNING            (to next breakpoint or step)
189  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
190  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
191  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
192  *  - YACS::STOPPED            (stopped by user before end)
193  *
194  *  Modes of Execution:
195  *  - YACS::CONTINUE           (normal run without breakpoints)
196  *  - YACS::STEPBYSTEP         (pause at each loop)
197  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
198  *
199  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200  *  Step by Step means execution node by node or group of node by group of nodes.
201  *  At a given step, the user decides to launch all the ready nodes or only a subset
202  *  (Caution: some nodes must run in parallel). 
203  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
204  *
205  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
206  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207  *  - Executor::getCurrentExecMode
208  *  - Executor::getExecutorState
209  *  - Executor::setExecMode             : change the execution mode for next loop
210  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
211  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
212  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
213  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214  *  - Executor::isNotFinished
215  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216  *  - Executor::saveState               : dump the current state of execution in an xml file
217  *  - Executor::loadState               : Not yet implemented
218  *  - Executor::getNbOfThreads
219  *  - Executor::displayDot
220  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
221  *
222  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223  *  - Executor::waitPause
224  *
225  *  TO BE VALIDATED:
226  *  - Pilot may connect to executor during execution, or deconnect.
227  *  - Several Pilots may be connected at the same time (for observation...)
228  * 
229  */
230
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
233   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234
235   { // --- Critical section
236     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
237     _mainSched = graph;
238     _root = dynamic_cast<ComposedNode *>(_mainSched);
239     if (!_root) throw Exception("Executor::Run, Internal Error!");
240     _executorState = YACS::NOTYETINITIALIZED;
241     sendEvent("executor");
242     _toContinue=true;
243     _isOKToEnd = false;
244     _errorDetected = false;
245     _isWaitingEventsFromRunningTasks = false;
246     _numberOfRunningTasks = 0;
247     _runningTasks.clear();
248     _numberOfEndedTasks = 0;
249     string tracefile = "traceExec_";
250     tracefile += _mainSched->getName();
251     _trace.open(tracefile.c_str());
252 #ifdef WIN32
253    _start = timeGetTime();
254 #else
255     gettimeofday(&_start, NULL);
256 #endif
257
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297         _tasks=graph->getNextTasks(isMore);
298         numberAllTasks=_numberOfRunningTasks+_tasks.size();
299         graph->selectRunnableTasks(_tasks);
300         FilterTasksConsideringContainers(_tasks);
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
309             loadTask(*iter);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
367   _trace.close();
368   DEBTRACE("End of RunB thread");  
369 }
370
371 YACS::ExecutionMode Executor::getCurrentExecMode()
372 {
373   _isRunningunderExternalControl=true;
374   return _execMode;
375 }
376
377
378 YACS::ExecutorState Executor::getExecutorState()
379 {
380   _isRunningunderExternalControl=true;
381   return _executorState;
382 }
383
384
385 bool Executor::isNotFinished()
386 {
387   _isRunningunderExternalControl=true;
388   return _toContinue;
389 }
390
391 //! ask to stop execution on the first node found in error
392 /*!
393  * \param dumpRequested   produce a state dump when an error is found
394  * \param xmlFile         name of file used for state dump
395  */
396
397 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
398 {
399   { // --- Critical section
400     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
401     _dumpErrorFile=xmlFile;
402     _stopOnErrorRequested=true;
403     _dumpOnErrorRequested = dumpRequested;
404     if (dumpRequested && xmlFile.empty())
405       throw YACS::Exception("dump on error requested and no filename given for dump");
406     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
407   } // --- End of critical section
408 }
409
410 //! ask to do not stop execution on nodes found in error
411 /*!
412  */
413
414 void Executor::unsetStopOnError()
415 {
416   { // --- Critical section
417     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
418     _stopOnErrorRequested=false;
419   } // --- End of critical section
420 }
421
422 //! Dynamically set the current mode of execution
423 /*!
424  * The mode can be Continue, step by step, or stop before execution of a node
425  * defined in a list of breakpoints.
426  */
427
428 void Executor::setExecMode(YACS::ExecutionMode mode)
429 {
430   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
431   { // --- Critical section
432     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
433     _isRunningunderExternalControl=true;
434     _execMode = mode;
435   } // --- End of critical section
436 }
437
438 //! wake up executor when in pause
439 /*!
440  * When Executor is in state paused or waiting for task completion, the thread
441  * running loop RunB waits on condition _condForStepByStep.
442  * Thread RunB is waken up.
443  * \return true when actually wakes up executor
444  */
445
446 bool Executor::resumeCurrentBreakPoint()
447 {
448   DEBTRACE("Executor::resumeCurrentBreakPoint()");
449   bool ret = false;
450   //bool doDump = false;
451   { // --- Critical section
452     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
453     _isRunningunderExternalControl=true;
454     DEBTRACE("_executorState: " << _executorState);
455     switch (_executorState)
456       {
457       case YACS::WAITINGTASKS:
458       case YACS::PAUSED:
459         {
460           _condForStepByStep.notify_all();
461           _executorState = YACS::RUNNING;
462           sendEvent("executor");
463           ret = true;
464           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
465           break;
466         }
467       case YACS::FINISHED:
468       case YACS::STOPPED:
469         {
470           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
471           DEBTRACE("Graph Execution finished or stopped !");
472           break;
473         }
474       default :
475         {
476           // debug: no easy way to verify if main loop is acutally waiting on condition
477         }
478       }
479     DEBTRACE("---");
480     //if (doDump) saveState(_dumpErrorFile);
481   } // --- End of critical section
482   return ret;
483 }
484
485
486 //! define a list of nodes names as breakpoints in the graph
487
488
489 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
490 {
491   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
492   { // --- Critical section
493     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
494     _isRunningunderExternalControl=true;
495     _listOfBreakPoints = listOfBreakPoints;
496   } // --- End of critical section
497 }
498
499
500 //! Get the list of tasks to load, to define a subset to execute in step by step mode
501 /*!
502  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
503  *  Use Executor::waitPause to wait.
504  */
505 std::list<std::string> Executor::getTasksToLoad()
506 {
507   DEBTRACE("Executor::getTasksToLoad()");
508   list<string> listOfNodesToLoad;
509   listOfNodesToLoad.clear();
510   { // --- Critical section
511     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
512     _isRunningunderExternalControl=true;
513     switch (_executorState)
514       {
515       case YACS::WAITINGTASKS:
516       case YACS::PAUSED:
517         {
518           listOfNodesToLoad = _listOfTasksToLoad;
519           break;
520         }
521       case YACS::NOTYETINITIALIZED:
522       case YACS::INITIALISED:
523       case YACS::RUNNING:
524       case YACS::FINISHED:
525       case YACS::STOPPED:
526       default:
527         {
528           break;
529         }
530       }
531   } // --- End of critical section
532   return listOfNodesToLoad;
533 }
534
535
536 //! Define a subset of task to execute in step by step mode
537 /*!
538  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
539  * in the current step.
540  * If some nodes must run in parallel, they must stay together in the list.
541  */
542
543 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
544 {
545   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
546   bool ret = true;
547   vector<Task *>::iterator iter;
548   vector<Task *> restrictedTasks;
549   { // --- Critical section
550     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
551     _isRunningunderExternalControl=true;
552     switch (_executorState)
553       {
554       case YACS::WAITINGTASKS:
555       case YACS::PAUSED:
556         {
557           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
558             {
559               string readyNode = _mainSched->getTaskName(*iter);
560               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
561                   != listToExecute.end())
562                 {
563                   restrictedTasks.push_back(*iter);
564                   DEBTRACE("node to execute " << readyNode);
565                 }
566             }
567           _tasks.clear();
568           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
569             {
570               _tasks.push_back(*iter);
571             }
572           break;
573         }
574       case YACS::NOTYETINITIALIZED:
575       case YACS::INITIALISED:
576       case YACS::RUNNING:
577       case YACS::FINISHED:
578       case YACS::STOPPED:
579       default:
580         {
581           break;
582         }
583       }
584     } // --- End of critical section
585
586   _tasks.clear();
587   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
588     {
589       _tasks.push_back(*iter);
590     }
591   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
592     {
593       string readyNode = _mainSched->getTaskName(*iter);
594       DEBTRACE("selected node to execute " << readyNode);
595     }
596
597   return ret;
598 }
599
600 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
601 /*!
602  *  Do nothing if execution is finished or in pause.
603  *  Wait first step if Executor is running or in initialization.
604  */
605
606 void Executor::waitPause()
607 {
608   DEBTRACE("Executor::waitPause()" << _executorState);
609   { // --- Critical section
610     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
611     _isRunningunderExternalControl=true;
612     switch (_executorState)
613       {
614       default:
615       case YACS::STOPPED:
616       case YACS::FINISHED:
617       case YACS::WAITINGTASKS:
618       case YACS::PAUSED:
619         {
620           break;
621         }
622       case YACS::NOTYETINITIALIZED:
623       case YACS::INITIALISED:
624       case YACS::RUNNING:
625         {
626           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
627           break;
628         }
629       }
630   } // --- End of critical section
631   DEBTRACE("---");
632 }
633
634 //! stops the execution as soon as possible 
635
636 void Executor::stopExecution()
637 {
638   setExecMode(YACS::STEPBYSTEP);
639   //waitPause();
640   _isOKToEnd = true;
641   resumeCurrentBreakPoint();
642 }
643
644 //! save the current state of execution in an xml file
645
646 bool Executor::saveState(const std::string& xmlFile)
647 {
648   DEBTRACE("Executor::saveState() in " << xmlFile);
649   YACS::ENGINE::VisitorSaveState vst(_root);
650   vst.openFileDump(xmlFile.c_str());
651   _root->accept(&vst);
652   vst.closeFileDump();
653   return true;
654 }
655
656 //! not yet implemented
657
658 bool Executor::loadState()
659 {
660   DEBTRACE("Executor::loadState()");
661   _isRunningunderExternalControl=true;
662   return true;
663 }
664
665
//! Return 1 if \a filename exists and is a regular file, 0 otherwise (missing, directory, device...).
static int isfile(const char *filename) 
{
  struct stat info;
  const bool found = (stat(filename, &info) == 0);
  return (found && S_ISREG(info.st_mode)) ? 1 : 0;
}
675
676 //! Display the graph state as a dot display, public method
677
678 void Executor::displayDot(Scheduler *graph)
679 {
680   _isRunningunderExternalControl=true;
681   _displayDot(graph);
682 }
683
684 //! Display the graph state as a dot display
685 /*!
686  *  \param graph  : the node to display
687  */
688
689 void Executor::_displayDot(Scheduler *graph)
690 {
691    std::ofstream g("titi");
692    ((ComposedNode*)graph)->writeDot(g);
693    g.close();
694    const char displayScript[]="display.sh";
695    if(isfile(displayScript))
696      system("sh display.sh");
697    else
698      system("dot -Tpng titi|display -delay 5");
699 }
700
701 //! Wait reactivation in modes Step By step or with BreakPoints
702 /*!
703  *  Check mode of execution (set by main thread):
704  *  - YACS::CONTINUE        : the graph execution continues.
705  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
706  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
707  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
708  *                            else continue the graph execution.
709  *  \return true if end of executor thread is requested
710  */
711
712 bool Executor::checkBreakPoints()
713 {
714   DEBTRACE("Executor::checkBreakPoints()");
715   vector<Task *>::iterator iter;
716   bool endRequested = false;
717
718   switch (_execMode)
719     {
720     case YACS::CONTINUE:
721       {
722         break;
723       }
724     case YACS::STOPBEFORENODES:
725       {
726         bool stop = false;
727         { // --- Critical section
728           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
729           _tasksSave = _tasks;
730           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
731             {
732               string nodeToLoad = _mainSched->getTaskName(*iter);
733               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
734                   != _listOfBreakPoints.end())
735                 {
736                   stop = true;
737                   break;
738                 }
739             }
740           if (stop)
741             {
742               _listOfTasksToLoad.clear();
743               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
744                 {
745                   string nodeToLoad = _mainSched->getTaskName(*iter);
746                   _listOfTasksToLoad.push_back(nodeToLoad);
747                 }
748               if (getNbOfThreads())
749                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
750               else
751                 _executorState = YACS::PAUSED;
752               sendEvent("executor");
753               _condForPilot.notify_all();
754             }
755           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
756           if (_isOKToEnd) endRequested = true;
757         } // --- End of critical section
758           if (stop) DEBTRACE("wake up from waitResume");
759         break;
760       }
761     default:
762     case YACS::STEPBYSTEP:
763       {
764         { // --- Critical section
765           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
766           _tasksSave = _tasks;
767           _listOfTasksToLoad.clear();
768           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
769             {
770               string nodeToLoad = _mainSched->getTaskName(*iter);
771               _listOfTasksToLoad.push_back(nodeToLoad);
772             }
773           if (getNbOfThreads())
774             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
775           else
776             _executorState = YACS::PAUSED;
777           sendEvent("executor");
778           _condForPilot.notify_all();
779           if (!_isOKToEnd)
780             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
781                           // or, if no pilot, wait until no more running tasks (stop on error)
782           if (_isOKToEnd) endRequested = true;
783         } // --- End of critical section
784         DEBTRACE("wake up from waitResume");
785         break;
786       }
787     }
788   DEBTRACE("endRequested: " << endRequested);
789   return endRequested;
790 }
791
792
793 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
794 /*!
795  *  With the condition Mutex, the mutex is released atomically during the wait.
796  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
797  *  Must be called while mutex is locked.
798  */
799
800 void Executor::waitResume()
801 {
802   DEBTRACE("Executor::waitResume()");
803   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
804   DEBTRACE("---");
805 }
806
807
808 //! Perform loading of a Task.
809 /*!
810  *  \param task  : Task to load
811  */
812
813 void Executor::loadTask(Task *task)
814 {
815   DEBTRACE("Executor::loadTask(Task *task)");
816   if(task->getState() != YACS::TOLOAD)return;
817   traceExec(task, "state:TOLOAD", ComputePlacement(task));
818   {//Critical section
819     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
820     _mainSched->notifyFrom(task,YACS::START);
821   }//End of critical section
822   try
823     {
824       traceExec(task, "load", ComputePlacement(task));
825       task->load();
826       traceExec(task, "initService", ComputePlacement(task));
827       task->initService();
828     }
829   catch(Exception& ex) 
830     {
831       std::cerr << ex.what() << std::endl;
832       {//Critical section
833         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
834         task->aborted();
835         _mainSched->notifyFrom(task,YACS::ABORT);
836         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
837       }//End of critical section
838     }
839   catch(...) 
840     {
841       std::cerr << "Load failed" << std::endl;
842       {//Critical section
843         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
844         task->aborted();
845         _mainSched->notifyFrom(task,YACS::ABORT);
846         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
847       }//End of critical section
848     }
849 }
850
851
852 //! Execute a list of tasks possibly connected through datastream links
853 /*!
854  *  \param tasks  : a list of tasks to execute
855  *
856  */
857 void Executor::launchTasks(std::vector<Task *>& tasks)
858 {
859   vector<Task *>::iterator iter;
860   //First phase, make datastream connections
861   for(iter=tasks.begin();iter!=tasks.end();iter++)
862     {
863       YACS::StatesForNode state=(*iter)->getState();
864       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
865       try
866         {
867           (*iter)->connectService();
868           traceExec(*iter, "connectService",ComputePlacement(*iter));
869           {//Critical section
870             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
871             (*iter)->connected();
872           }//End of critical section
873         }
874       catch(Exception& ex) 
875         {
876           std::cerr << ex.what() << std::endl;
877           try
878             {
879               (*iter)->disconnectService();
880               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
881             }
882           catch(...) 
883             {
884               // Disconnect has failed
885               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
886             }
887           {//Critical section
888             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
889             (*iter)->aborted();
890             _mainSched->notifyFrom(*iter,YACS::ABORT);
891           }//End of critical section
892         }
893       catch(...) 
894         {
895           std::cerr << "Problem in connectService" << std::endl;
896           try
897             {
898               (*iter)->disconnectService();
899               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
900             }
901           catch(...) 
902             {
903               // Disconnect has failed
904               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
905             }
906           {//Critical section
907             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
908             (*iter)->aborted();
909             _mainSched->notifyFrom(*iter,YACS::ABORT);
910           }//End of critical section
911         }
912       if((*iter)->getState() == YACS::ERROR)
913         {
914           //try to put all coupled tasks in error
915           std::set<Task*> coupledSet;
916           (*iter)->getCoupledTasks(coupledSet);
917           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
918             {
919               Task* t=*it;
920               if(t == *iter)continue;
921               if(t->getState() == YACS::ERROR)continue;
922               try
923                 {
924                   t->disconnectService();
925                   traceExec(t, "disconnectService",ComputePlacement(*iter));
926                 }
927               catch(...)
928                 {
929                   // Disconnect has failed
930                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
931                 }
932               {//Critical section
933                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
934                 t->aborted();
935                 _mainSched->notifyFrom(t,YACS::ABORT);
936               }//End of critical section
937               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
938             }
939         }
940       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
941     }
942
943   //Second phase, execute each task in a thread
944   for(iter=tasks.begin();iter!=tasks.end();iter++)
945     {
946       launchTask(*iter);
947     }
948 }
949
950 struct threadargs {
951   Task *task;
952   Scheduler *sched;
953   Executor *execInst;
954 };
955
956 //! Execute a Task in a thread
957 /*!
958  *  \param task  : Task to execute
959  *
960  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
961  *
962  *  Calls Executor::functionForTaskExecution in Thread
963  */
964
965 void Executor::launchTask(Task *task)
966 {
967   DEBTRACE("Executor::launchTask(Task *task)");
968   struct threadargs *args;
969   if(task->getState() != YACS::TOACTIVATE)return;
970
971   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
972   if(_semThreadCnt == 0)
973     {
974       //check if we have enough threads to run
975       std::set<Task*> tmpSet=_runningTasks;
976       std::set<Task*>::iterator it = tmpSet.begin();
977       std::string status="running";
978       std::set<Task*> coupledSet;
979       while( it != tmpSet.end() )
980         {
981           Task* tt=*it;
982           coupledSet.clear();
983           tt->getCoupledTasks(coupledSet);
984           status="running";
985           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
986             {
987               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
988               tmpSet.erase(*iter);
989             }
990           if(status=="running")break;
991           it = tmpSet.begin();
992         }
993
994       if(status=="toactivate")
995         {
996           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
997           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
998         }
999     }
1000
1001   _semForMaxThreads.wait();
1002   _semThreadCnt -= 1;
1003
1004   args= new threadargs;
1005   args->task = task;
1006   args->sched = _mainSched;
1007   args->execInst = this;
1008
1009   traceExec(task, "launch",ComputePlacement(task));
1010
1011   { // --- Critical section
1012     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1013     _numberOfRunningTasks++;
1014     _runningTasks.insert(task);
1015     task->begin(); //change state to ACTIVATED
1016   } // --- End of critical section
1017   Thread(functionForTaskExecution, args, _threadStackSize);
1018 }
1019
1020 //! wait until a running task ends
1021
1022 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1023 {
1024   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1025 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1026   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1027   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1028     {
1029       _isWaitingEventsFromRunningTasks = true;
1030       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1031     }
1032   _numberOfEndedTasks=0;
1033   DEBTRACE("---");
1034 }
1035
1036 //! not implemented
1037
1038 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1039 {
1040   /*_mutexForNbOfConcurrentThreads.lock();
1041   _groupOfAllThreadsCreated.remove(thread);
1042   delete thread;
1043   _mutexForNbOfConcurrentThreads.unlock();*/
1044 }
1045
1046
1047 //! must be used protected by _mutexForSchedulerUpdate!
1048
1049 void Executor::wakeUp()
1050 {
1051   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1052   if (_isWaitingEventsFromRunningTasks)
1053     {
1054       _isWaitingEventsFromRunningTasks = false;
1055       _condForNewTasksToPerform.notify_all();
1056     }
1057   else
1058     _numberOfEndedTasks++;
1059 }
1060
1061 //! number of running tasks
1062
1063 int Executor::getNbOfThreads()
1064 {
1065   int ret;
1066   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1067   _isRunningunderExternalControl=true;
1068   ret = _groupOfAllThreadsCreated.size();
1069   return ret;
1070 }
1071
1072
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : heap-allocated threadargs (a Task, a Scheduler, an Executor),
 *                deleted here once its fields are copied
 *
 *  Steps:
 *   - detaches the current thread;
 *   - calls Task::execute. Any exception turns the end event into YACS::ABORT;
 *   - calls Task::disconnectService. A failure also sets YACS::ABORT;
 *   - if the task's container is a HomogeneousPoolContainer, releases the
 *     task's place in it under the container lock;
 *   - under _mutexForSchedulerUpdate: calls Task::finished (FINISH) or
 *     Task::aborted (ABORT), then Scheduler::notifyFrom with that event.
 *     It then updates the running-task bookkeeping, may switch the executor
 *     to PAUSED, posts _semForMaxThreads and calls Executor::wakeUp unless
 *     the executor is PAUSED.
 *
 *  Executor::notifyEndOfThread is not called (the call is commented out).
 *  \return 0 (after Thread::exit(0))
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  // copy the arguments out, then free the block allocated by launchTask
  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  Thread::detach();

  // Execute task

  // end event reported to the scheduler; downgraded to ABORT on any failure below
  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task (always attempted, whatever the execution outcome)
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Placement is computed before the pool place is released below; presumably
  // the placement id is no longer meaningful afterwards (TODO confirm).
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer

  // NOTE(review): release() is not inside a try block. If it throws, the
  // bookkeeping below (running-task count, semaphore post) is skipped.
  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      YACS::BASES::AutoLocker<Container> alckCont(contC);
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            execInst->_errorDetected = true;
            // stop-on-error: switch to step-by-step mode and allow the run to end
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    // this task no longer counts as running
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        // last running task ended while the executor was waiting for tasks
        // (not in CONTINUE mode): switch to PAUSED and wake the pilot
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    // give the thread slot back (taken in launchTask via _semForMaxThreads.wait)
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  //execInst->notifyEndOfThread(0);
  Thread::exit(0);
  return 0;
}
1214
1215 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1216 {
1217   string nodeName = _mainSched->getTaskName(task);
1218   Container *cont = task->getContainer();
1219   string containerName = "---";
1220   if (cont)
1221     containerName = cont->getName();
1222
1223 #ifdef WIN32
1224   DWORD now = timeGetTime();
1225   double elapse = (now - _start)/1000.0;
1226 #else
1227   timeval now;
1228   gettimeofday(&now, NULL);
1229   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1230 #endif
1231   {
1232     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1233     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1234     _trace << flush;
1235   }
1236 }
1237
1238 //! emit notification to all observers registered with  the dispatcher 
1239 /*!
1240  * The dispatcher is unique and can be obtained by getDispatcher()
1241  */
1242 void Executor::sendEvent(const std::string& event)
1243 {
1244   Dispatcher* disp=Dispatcher::getDispatcher();
1245   YASSERT(disp);
1246   YASSERT(_root);
1247   disp->dispatch(_root,event);
1248 }
1249
1250 /*!
1251  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1252  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1253  *
1254  * \param [in,out] tsks - list of tasks to be
1255  */
1256 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1257 {
1258   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1259   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1260     {
1261       Task *cur(*it);
1262       if(!cur)
1263         continue;
1264       Container *cont(cur->getContainer());
1265       if(!cont)
1266         {
1267           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1268           continue;
1269         }
1270       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1271       if(!contC)
1272         {
1273           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1274           continue;
1275         }
1276       m[contC].push_back(cur);
1277     }
1278   //
1279   std::vector<Task *> ret;
1280   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1281     {
1282       HomogeneousPoolContainer *curhpc((*it).first);
1283       const std::vector<Task *>& curtsks((*it).second);
1284       if(!curhpc)
1285         {
1286           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1287         }
1288       else
1289         {
1290           // start of critical section for container curhpc
1291           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1292           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1293           std::size_t sz(curhpc->getNumberOfFreePlace());
1294           std::vector<Task *>::const_iterator it2(curtsks.begin());
1295           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1296             {
1297               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1298               ret.push_back(*it2);
1299             }
1300           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1301           //end of critical section
1302         }
1303     }
1304   //
1305   tsks=ret;
1306 }
1307
1308 std::string Executor::ComputePlacement(Task *zeTask)
1309 {
1310   std::string placement("---");
1311   if(!zeTask)
1312     return placement;
1313   if(zeTask->getContainer())
1314     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1315   return placement;
1316 }