Salome HOME
Update copyrights 2014.
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2014  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "Scheduler.hxx"
23 #include "Dispatcher.hxx"
24 #include "Container.hxx"
25 #include "ComponentInstance.hxx"
26
27 #include "VisitorSaveState.hxx"
28 #include "ComposedNode.hxx"
29
30 #include <iostream>
31 #include <fstream>
32 #include <sys/stat.h>
33 #ifndef WIN32
34 #include <sys/time.h>
35 #include <unistd.h>
36 #endif
37
38 #include <cstdlib>
39 #include <algorithm>
40
41 #ifdef WIN32
42 #define usleep(A) _sleep(A/1000)
43 #if !defined(S_ISCHR) || !defined(S_ISREG)
44 #  ifndef S_IFMT
45 #    ifdef _S_IFMT
46 #      define S_IFMT _S_IFMT
47 #      define S_IFCHR _S_IFCHR
48 #      define S_IFREG _S_IFREG
49 #    else
50 #    ifdef __S_IFMT
51 #      define S_IFMT __S_IFMT
52 #      define S_IFCHR __S_IFCHR
53 #      define S_IFREG __S_IFREG
54 #    endif
55 #    endif
56 #  endif
57 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
58 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
59 #endif
60 #endif
61
62 using namespace YACS::ENGINE;
63 using namespace std;
64
65 using YACS::BASES::Mutex;
66 using YACS::BASES::Thread;
67 using YACS::BASES::Semaphore;
68
69 //#define _DEVDEBUG_
70 #include "YacsTrace.hxx"
71
72 int Executor::_maxThreads(50);
73 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
74
75 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
76 {
77   _root=0;
78   _toContinue = true;
79   _isOKToEnd = false;
80   _stopOnErrorRequested = false;
81   _dumpOnErrorRequested = false;
82   _errorDetected = false;
83   _isRunningunderExternalControl=false;
84   _executorState = YACS::NOTYETINITIALIZED;
85   _execMode = YACS::CONTINUE;
86   _semThreadCnt = _maxThreads;
87   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
88 }
89
90 Executor::~Executor()
91 {
92   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
93     delete *iter;
94 }
95
96 //! Execute a graph waiting for completion
97 /*!
98  *  \param graph : schema to execute
99  *  \param debug : display the graph with dot if debug == 1
100  *  \param fromScratch : if true the graph is reinitialized
101  *
102  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
103  *  
104  *  Calls Executor::launchTask to execute a selected Task.
105  *
106  *  Completion when graph is finished (Scheduler::isFinished)
107  */
108
109 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
110 {
111   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
112   _mainSched=graph;
113   _root = dynamic_cast<ComposedNode *>(_mainSched);
114   if (!_root) throw Exception("Executor::Run, Internal Error!");
115   bool isMore;
116   int i=0;
117   if(debug>1)_displayDot(graph);
118   if (fromScratch)
119     {
120       graph->init();
121       graph->exUpdateState();
122     }
123   if(debug>1)_displayDot(graph);
124   vector<Task *> tasks;
125   vector<Task *>::iterator iter;
126   _toContinue=true;
127   _execMode = YACS::CONTINUE;
128   _isWaitingEventsFromRunningTasks = false;
129   _numberOfRunningTasks = 0;
130   _runningTasks.clear();
131   _numberOfEndedTasks=0;
132   while(_toContinue)
133     {
134       sleepWhileNoEventsFromAnyRunningTask();
135
136       if(debug>2)_displayDot(graph);
137
138       {//Critical section
139         _mutexForSchedulerUpdate.lock();
140         tasks=graph->getNextTasks(isMore);
141         graph->selectRunnableTasks(tasks);
142         _mutexForSchedulerUpdate.unlock();
143       }//End of critical section
144
145       if(debug>2)_displayDot(graph);
146
147       for(iter=tasks.begin();iter!=tasks.end();iter++)
148         loadTask(*iter);
149
150       if(debug>1)_displayDot(graph);
151
152       launchTasks(tasks);
153
154       if(debug>1)_displayDot(graph);
155
156       {//Critical section
157         _mutexForSchedulerUpdate.lock();
158         _toContinue=!graph->isFinished();
159         _mutexForSchedulerUpdate.unlock();
160       }//End of critical section
161       DEBTRACE("_toContinue: " << _toContinue);
162
163       if(debug>0)_displayDot(graph);
164
165       i++;
166     }
167 }
168
169 //! Execute a graph with breakpoints or step by step
170 /*!
171  *  To be launch in a thread (main thread controls the progression).
172  *  \param graph : schema to execute
173  *  \param debug : display the graph with dot if debug >0
174  *  \param fromScratch : if false, state from a previous partial exection is already loaded
175  *
176  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
177  *  
178  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
179  * 
180  *  Calls Executor::launchTask to execute a selected Task
181  *
182  *  Completion when graph is finished (Scheduler::isFinished)
183  *
184  *  States of execution:
185  *  - YACS::NOTYETINITIALIZED
186  *  - YACS::INITIALISED
187  *  - YACS::RUNNING            (to next breakpoint or step)
188  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
189  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
190  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
191  *  - YACS::STOPPED            (stopped by user before end)
192  *
193  *  Modes of Execution:
194  *  - YACS::CONTINUE           (normal run without breakpoints)
195  *  - YACS::STEPBYSTEP         (pause at each loop)
196  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
197  *
198  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
199  *  Step by Step means execution node by node or group of node by group of nodes.
200  *  At a given step, the user decides to launch all the ready nodes or only a subset
201  *  (Caution: some nodes must run in parallel). 
202  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
203  *
204  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
205  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
206  *  - Executor::getCurrentExecMode
207  *  - Executor::getExecutorState
208  *  - Executor::setExecMode             : change the execution mode for next loop
209  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
210  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
211  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
212  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
213  *  - Executor::isNotFinished
214  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
215  *  - Executor::saveState               : dump the current state of execution in an xml file
216  *  - Executor::loadState               : Not yet implemented
217  *  - Executor::getNbOfThreads
218  *  - Executor::displayDot
219  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
220  *
221  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
222  *  - Executor::waitPause
223  *
224  *  TO BE VALIDATED:
225  *  - Pilot may connect to executor during execution, or deconnect.
226  *  - Several Pilots may be connected at the same time (for observation...)
227  * 
228  */
229
230 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
231 {
232   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
233
234   { // --- Critical section
235     _mutexForSchedulerUpdate.lock();
236     _mainSched = graph;
237     _root = dynamic_cast<ComposedNode *>(_mainSched);
238     if (!_root) throw Exception("Executor::Run, Internal Error!");
239     _executorState = YACS::NOTYETINITIALIZED;
240     sendEvent("executor");
241     _toContinue=true;
242     _isOKToEnd = false;
243     _errorDetected = false;
244     _isWaitingEventsFromRunningTasks = false;
245     _numberOfRunningTasks = 0;
246     _runningTasks.clear();
247     _numberOfEndedTasks = 0;
248     string tracefile = "traceExec_";
249     tracefile += _mainSched->getName();
250     _trace.open(tracefile.c_str());
251 #ifdef WIN32
252    _start = timeGetTime();
253 #else
254     gettimeofday(&_start, NULL);
255 #endif
256
257     _mutexForSchedulerUpdate.unlock();
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         _mutexForSchedulerUpdate.lock();
297         _tasks=graph->getNextTasks(isMore);
298         numberAllTasks=_numberOfRunningTasks+_tasks.size();
299         graph->selectRunnableTasks(_tasks);
300         _mutexForSchedulerUpdate.unlock();
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
309             loadTask(*iter);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         _mutexForSchedulerUpdate.lock();
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346         _mutexForSchedulerUpdate.unlock();
347       } // --- End of critical section
348       if (debug > 0) _displayDot(graph);
349       DEBTRACE("_toContinue: " << _toContinue);
350     }
351
352   DEBTRACE("End of main Loop");
353
354   { // --- Critical section
355     _mutexForSchedulerUpdate.lock();
356     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
357       {
358         DEBTRACE("stop requested: End soon");
359         _executorState = YACS::STOPPED;
360         _toContinue = false;
361         sendEvent("executor");
362       }
363     _mutexForSchedulerUpdate.unlock();
364   } // --- End of critical section
365   if ( _dumpOnErrorRequested && _errorDetected)
366     {
367       saveState(_dumpErrorFile);
368     }
369   _trace.close();
370   DEBTRACE("End of RunB thread");  
371 }
372
373 YACS::ExecutionMode Executor::getCurrentExecMode()
374 {
375   _isRunningunderExternalControl=true;
376   return _execMode;
377 }
378
379
380 YACS::ExecutorState Executor::getExecutorState()
381 {
382   _isRunningunderExternalControl=true;
383   return _executorState;
384 }
385
386
387 bool Executor::isNotFinished()
388 {
389   _isRunningunderExternalControl=true;
390   return _toContinue;
391 }
392
393 //! ask to stop execution on the first node found in error
394 /*!
395  * \param dumpRequested   produce a state dump when an error is found
396  * \param xmlFile         name of file used for state dump
397  */
398
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
400 {
401   { // --- Critical section
402     _mutexForSchedulerUpdate.lock();
403     _dumpErrorFile=xmlFile;
404     _stopOnErrorRequested=true;
405     _dumpOnErrorRequested = dumpRequested;
406     if (dumpRequested && xmlFile.empty())
407       throw YACS::Exception("dump on error requested and no filename given for dump");
408     _mutexForSchedulerUpdate.unlock();
409     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410   } // --- End of critical section
411 }
412
413 //! ask to do not stop execution on nodes found in error
414 /*!
415  */
416
417 void Executor::unsetStopOnError()
418 {
419   { // --- Critical section
420     _mutexForSchedulerUpdate.lock();
421     _stopOnErrorRequested=false;
422     _mutexForSchedulerUpdate.unlock();
423   } // --- End of critical section
424 }
425
426 //! Dynamically set the current mode of execution
427 /*!
428  * The mode can be Continue, step by step, or stop before execution of a node
429  * defined in a list of breakpoints.
430  */
431
432 void Executor::setExecMode(YACS::ExecutionMode mode)
433 {
434   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
435   { // --- Critical section
436     _mutexForSchedulerUpdate.lock();
437     _isRunningunderExternalControl=true;
438     _execMode = mode;
439     _mutexForSchedulerUpdate.unlock();
440   } // --- End of critical section
441 }
442
443 //! wake up executor when in pause
444 /*!
445  * When Executor is in state paused or waiting for task completion, the thread
446  * running loop RunB waits on condition _condForStepByStep.
447  * Thread RunB is waken up.
448  * \return true when actually wakes up executor
449  */
450
451 bool Executor::resumeCurrentBreakPoint()
452 {
453   DEBTRACE("Executor::resumeCurrentBreakPoint()");
454   bool ret = false;
455   //bool doDump = false;
456   { // --- Critical section
457     _mutexForSchedulerUpdate.lock();
458     _isRunningunderExternalControl=true;
459     DEBTRACE("_executorState: " << _executorState);
460     switch (_executorState)
461       {
462       case YACS::WAITINGTASKS:
463       case YACS::PAUSED:
464         {
465           _condForStepByStep.notify_all();
466           _executorState = YACS::RUNNING;
467           sendEvent("executor");
468           ret = true;
469           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
470           break;
471         }
472       case YACS::FINISHED:
473       case YACS::STOPPED:
474         {
475           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
476           DEBTRACE("Graph Execution finished or stopped !");
477           break;
478         }
479       default :
480         {
481           // debug: no easy way to verify if main loop is acutally waiting on condition
482         }
483       }
484     _mutexForSchedulerUpdate.unlock();
485     DEBTRACE("---");
486     //if (doDump) saveState(_dumpErrorFile);
487   } // --- End of critical section
488   return ret;
489 }
490
491
492 //! define a list of nodes names as breakpoints in the graph
493
494
495 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
496 {
497   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
498   { // --- Critical section
499     _mutexForSchedulerUpdate.lock();
500     _isRunningunderExternalControl=true;
501     _listOfBreakPoints = listOfBreakPoints;
502     _mutexForSchedulerUpdate.unlock();
503   } // --- End of critical section
504 }
505
506
507 //! Get the list of tasks to load, to define a subset to execute in step by step mode
508 /*!
509  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
510  *  Use Executor::waitPause to wait.
511  */
512 std::list<std::string> Executor::getTasksToLoad()
513 {
514   DEBTRACE("Executor::getTasksToLoad()");
515   list<string> listOfNodesToLoad;
516   listOfNodesToLoad.clear();
517   { // --- Critical section
518     _mutexForSchedulerUpdate.lock();
519     _isRunningunderExternalControl=true;
520     switch (_executorState)
521       {
522       case YACS::WAITINGTASKS:
523       case YACS::PAUSED:
524         {
525           listOfNodesToLoad = _listOfTasksToLoad;
526           break;
527         }
528       case YACS::NOTYETINITIALIZED:
529       case YACS::INITIALISED:
530       case YACS::RUNNING:
531       case YACS::FINISHED:
532       case YACS::STOPPED:
533       default:
534         {
535           break;
536         }
537       }
538     _mutexForSchedulerUpdate.unlock();
539   } // --- End of critical section
540   return listOfNodesToLoad;
541 }
542
543
544 //! Define a subset of task to execute in step by step mode
545 /*!
546  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
547  * in the current step.
548  * If some nodes must run in parallel, they must stay together in the list.
549  */
550
551 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
552 {
553   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
554   bool ret = true;
555   vector<Task *>::iterator iter;
556   vector<Task *> restrictedTasks;
557   { // --- Critical section
558     _mutexForSchedulerUpdate.lock();
559     _isRunningunderExternalControl=true;
560     switch (_executorState)
561       {
562       case YACS::WAITINGTASKS:
563       case YACS::PAUSED:
564         {
565           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
566             {
567               string readyNode = _mainSched->getTaskName(*iter);
568               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
569                   != listToExecute.end())
570                 {
571                   restrictedTasks.push_back(*iter);
572                   DEBTRACE("node to execute " << readyNode);
573                 }
574             }
575           _tasks.clear();
576           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
577             {
578               _tasks.push_back(*iter);
579             }
580           break;
581         }
582       case YACS::NOTYETINITIALIZED:
583       case YACS::INITIALISED:
584       case YACS::RUNNING:
585       case YACS::FINISHED:
586       case YACS::STOPPED:
587       default:
588         {
589           break;
590         }
591       }
592     _mutexForSchedulerUpdate.unlock();
593     } // --- End of critical section
594
595   _tasks.clear();
596   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
597     {
598       _tasks.push_back(*iter);
599     }
600   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
601     {
602       string readyNode = _mainSched->getTaskName(*iter);
603       DEBTRACE("selected node to execute " << readyNode);
604     }
605
606   return ret;
607 }
608
609 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
610 /*!
611  *  Do nothing if execution is finished or in pause.
612  *  Wait first step if Executor is running or in initialization.
613  */
614
615 void Executor::waitPause()
616 {
617   DEBTRACE("Executor::waitPause()" << _executorState);
618   { // --- Critical section
619     _mutexForSchedulerUpdate.lock();
620     _isRunningunderExternalControl=true;
621     switch (_executorState)
622       {
623       default:
624       case YACS::STOPPED:
625       case YACS::FINISHED:
626       case YACS::WAITINGTASKS:
627       case YACS::PAUSED:
628         {
629           break;
630         }
631       case YACS::NOTYETINITIALIZED:
632       case YACS::INITIALISED:
633       case YACS::RUNNING:
634         {
635           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
636           break;
637         }
638       }
639     _mutexForSchedulerUpdate.unlock();
640   } // --- End of critical section
641   DEBTRACE("---");
642 }
643
644 //! stops the execution as soon as possible 
645
646 void Executor::stopExecution()
647 {
648   setExecMode(YACS::STEPBYSTEP);
649   //waitPause();
650   _isOKToEnd = true;
651   resumeCurrentBreakPoint();
652 }
653
654 //! save the current state of execution in an xml file
655
656 bool Executor::saveState(const std::string& xmlFile)
657 {
658   DEBTRACE("Executor::saveState() in " << xmlFile);
659   YACS::ENGINE::VisitorSaveState vst(_root);
660   vst.openFileDump(xmlFile.c_str());
661   _root->accept(&vst);
662   vst.closeFileDump();
663   return true;
664 }
665
666 //! not yet implemented
667
668 bool Executor::loadState()
669 {
670   DEBTRACE("Executor::loadState()");
671   _isRunningunderExternalControl=true;
672   return true;
673 }
674
675
676 static int isfile(const char *filename) 
677 {
678   struct stat buf;
679   if (stat(filename, &buf) != 0)
680     return 0;
681   if (!S_ISREG(buf.st_mode))
682     return 0;
683   return 1;
684 }
685
686 //! Display the graph state as a dot display, public method
687
688 void Executor::displayDot(Scheduler *graph)
689 {
690   _isRunningunderExternalControl=true;
691   _displayDot(graph);
692 }
693
694 //! Display the graph state as a dot display
695 /*!
696  *  \param graph  : the node to display
697  */
698
699 void Executor::_displayDot(Scheduler *graph)
700 {
701    std::ofstream g("titi");
702    ((ComposedNode*)graph)->writeDot(g);
703    g.close();
704    const char displayScript[]="display.sh";
705    if(isfile(displayScript))
706      system("sh display.sh");
707    else
708      system("dot -Tpng titi|display -delay 5");
709 }
710
711 //! Wait reactivation in modes Step By step or with BreakPoints
712 /*!
713  *  Check mode of execution (set by main thread):
714  *  - YACS::CONTINUE        : the graph execution continues.
715  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
716  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
717  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
718  *                            else continue the graph execution.
719  *  \return true if end of executor thread is requested
720  */
721
722 bool Executor::checkBreakPoints()
723 {
724   DEBTRACE("Executor::checkBreakPoints()");
725   vector<Task *>::iterator iter;
726   bool endRequested = false;
727
728   switch (_execMode)
729     {
730     case YACS::CONTINUE:
731       {
732         break;
733       }
734     case YACS::STOPBEFORENODES:
735       {
736         bool stop = false;
737         { // --- Critical section
738           _mutexForSchedulerUpdate.lock();
739           _tasksSave = _tasks;
740           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
741             {
742               string nodeToLoad = _mainSched->getTaskName(*iter);
743               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
744                   != _listOfBreakPoints.end())
745                 {
746                   stop = true;
747                   break;
748                 }
749             }
750           if (stop)
751             {
752               _listOfTasksToLoad.clear();
753               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
754                 {
755                   string nodeToLoad = _mainSched->getTaskName(*iter);
756                   _listOfTasksToLoad.push_back(nodeToLoad);
757                 }
758               if (getNbOfThreads())
759                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
760               else
761                 _executorState = YACS::PAUSED;
762               sendEvent("executor");
763               _condForPilot.notify_all();
764             }
765           //_mutexForSchedulerUpdate.unlock(); 
766           //} // --- End of critical section
767           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
768           if (_isOKToEnd) endRequested = true;
769           _mutexForSchedulerUpdate.unlock();
770         } // --- End of critical section
771           if (stop) DEBTRACE("wake up from waitResume");
772         break;
773       }
774     default:
775     case YACS::STEPBYSTEP:
776       {
777         { // --- Critical section
778           _mutexForSchedulerUpdate.lock();
779           _tasksSave = _tasks;
780           _listOfTasksToLoad.clear();
781           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
782             {
783               string nodeToLoad = _mainSched->getTaskName(*iter);
784               _listOfTasksToLoad.push_back(nodeToLoad);
785             }
786           if (getNbOfThreads())
787             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
788           else
789             _executorState = YACS::PAUSED;
790           sendEvent("executor");
791           _condForPilot.notify_all();
792           if (!_isOKToEnd)
793             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
794                           // or, if no pilot, wait until no more running tasks (stop on error)
795           if (_isOKToEnd) endRequested = true;
796           _mutexForSchedulerUpdate.unlock();
797         } // --- End of critical section
798         DEBTRACE("wake up from waitResume");
799         break;
800       }
801     }
802   DEBTRACE("endRequested: " << endRequested);
803   return endRequested;
804 }
805
806
807 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
808 /*!
809  *  With the condition Mutex, the mutex is released atomically during the wait.
810  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
811  *  Must be called while mutex is locked.
812  */
813
814 void Executor::waitResume()
815 {
816   DEBTRACE("Executor::waitResume()");
817   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
818   DEBTRACE("---");
819 }
820
821
822 //! Perform loading of a Task.
823 /*!
824  *  \param task  : Task to load
825  */
826
827 void Executor::loadTask(Task *task)
828 {
829   DEBTRACE("Executor::loadTask(Task *task)");
830   if(task->getState() != YACS::TOLOAD)return;
831   traceExec(task, "state:TOLOAD");
832   {//Critical section
833     _mutexForSchedulerUpdate.lock();
834     _mainSched->notifyFrom(task,YACS::START);
835     _mutexForSchedulerUpdate.unlock();
836   }//End of critical section
837   try
838     {
839       traceExec(task, "load");
840       task->load();
841       traceExec(task, "initService");
842       task->initService();
843     }
844   catch(Exception& ex) 
845     {
846       std::cerr << ex.what() << std::endl;
847       {//Critical section
848         _mutexForSchedulerUpdate.lock();
849         task->aborted();
850         _mainSched->notifyFrom(task,YACS::ABORT);
851         traceExec(task, "state:"+Node::getStateName(task->getState()));
852         _mutexForSchedulerUpdate.unlock();
853       }//End of critical section
854     }
855   catch(...) 
856     {
857       std::cerr << "Load failed" << std::endl;
858       {//Critical section
859         _mutexForSchedulerUpdate.lock();
860         task->aborted();
861         _mainSched->notifyFrom(task,YACS::ABORT);
862         traceExec(task, "state:"+Node::getStateName(task->getState()));
863         _mutexForSchedulerUpdate.unlock();
864       }//End of critical section
865     }
866 }
867
868
869 //! Execute a list of tasks possibly connected through datastream links
870 /*!
871  *  \param tasks  : a list of tasks to execute
872  *
873  */
874 void Executor::launchTasks(std::vector<Task *>& tasks)
875 {
876   vector<Task *>::iterator iter;
877   //First phase, make datastream connections
878   for(iter=tasks.begin();iter!=tasks.end();iter++)
879     {
880       YACS::StatesForNode state=(*iter)->getState();
881       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
882       try
883         {
884           (*iter)->connectService();
885           traceExec(*iter, "connectService");
886           {//Critical section
887             _mutexForSchedulerUpdate.lock();
888             (*iter)->connected();
889             _mutexForSchedulerUpdate.unlock();
890           }//End of critical section
891         }
892       catch(Exception& ex) 
893         {
894           std::cerr << ex.what() << std::endl;
895           try
896             {
897               (*iter)->disconnectService();
898               traceExec(*iter, "disconnectService");
899             }
900           catch(...) 
901             {
902               // Disconnect has failed
903               traceExec(*iter, "disconnectService failed, ABORT");
904             }
905           {//Critical section
906             _mutexForSchedulerUpdate.lock();
907             (*iter)->aborted();
908             _mainSched->notifyFrom(*iter,YACS::ABORT);
909             _mutexForSchedulerUpdate.unlock();
910           }//End of critical section
911         }
912       catch(...) 
913         {
914           std::cerr << "Problem in connectService" << std::endl;
915           try
916             {
917               (*iter)->disconnectService();
918               traceExec(*iter, "disconnectService");
919             }
920           catch(...) 
921             {
922               // Disconnect has failed
923               traceExec(*iter, "disconnectService failed, ABORT");
924             }
925           {//Critical section
926             _mutexForSchedulerUpdate.lock();
927             (*iter)->aborted();
928             _mainSched->notifyFrom(*iter,YACS::ABORT);
929             _mutexForSchedulerUpdate.unlock();
930           }//End of critical section
931         }
932       if((*iter)->getState() == YACS::ERROR)
933         {
934           //try to put all coupled tasks in error
935           std::set<Task*> coupledSet;
936           (*iter)->getCoupledTasks(coupledSet);
937           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
938             {
939               Task* t=*it;
940               if(t == *iter)continue;
941               if(t->getState() == YACS::ERROR)continue;
942               try
943                 {
944                   t->disconnectService();
945                   traceExec(t, "disconnectService");
946                 }
947               catch(...)
948                 {
949                   // Disconnect has failed
950                   traceExec(t, "disconnectService failed, ABORT");
951                 }
952               {//Critical section
953                 _mutexForSchedulerUpdate.lock();
954                 t->aborted();
955                 _mainSched->notifyFrom(t,YACS::ABORT);
956                 _mutexForSchedulerUpdate.unlock();
957               }//End of critical section
958               traceExec(t, "state:"+Node::getStateName(t->getState()));
959             }
960         }
961       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
962     }
963
964   //Second phase, execute each task in a thread
965   for(iter=tasks.begin();iter!=tasks.end();iter++)
966     {
967       launchTask(*iter);
968     }
969 }
970
971 struct threadargs {
972   Task *task;
973   Scheduler *sched;
974   Executor *execInst;
975 };
976
977 //! Execute a Task in a thread
978 /*!
979  *  \param task  : Task to execute
980  *
981  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
982  *
983  *  Calls Executor::functionForTaskExecution in Thread
984  */
985
986 void Executor::launchTask(Task *task)
987 {
988   DEBTRACE("Executor::launchTask(Task *task)");
989   struct threadargs *args;
990   if(task->getState() != YACS::TOACTIVATE)return;
991
992   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
993   if(_semThreadCnt == 0)
994     {
995       //check if we have enough threads to run
996       std::set<Task*> tmpSet=_runningTasks;
997       std::set<Task*>::iterator it = tmpSet.begin();
998       std::string status="running";
999       std::set<Task*> coupledSet;
1000       while( it != tmpSet.end() )
1001         {
1002           Task* tt=*it;
1003           coupledSet.clear();
1004           tt->getCoupledTasks(coupledSet);
1005           status="running";
1006           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1007             {
1008               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1009               tmpSet.erase(*iter);
1010             }
1011           if(status=="running")break;
1012           it = tmpSet.begin();
1013         }
1014
1015       if(status=="toactivate")
1016         {
1017           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1018           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1019         }
1020     }
1021
1022   _semForMaxThreads.wait();
1023   _semThreadCnt -= 1;
1024
1025   args= new threadargs;
1026   args->task = task;
1027   args->sched = _mainSched;
1028   args->execInst = this;
1029
1030   traceExec(task, "launch");
1031
1032   { // --- Critical section
1033     _mutexForSchedulerUpdate.lock();
1034     _numberOfRunningTasks++;
1035     _runningTasks.insert(task);
1036     task->begin(); //change state to ACTIVATED
1037     _mutexForSchedulerUpdate.unlock();
1038   } // --- End of critical section
1039   Thread(functionForTaskExecution, args, _threadStackSize);
1040 }
1041
1042 //! wait until a running task ends
1043
1044 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1045 {
1046   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1047 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1048   _mutexForSchedulerUpdate.lock();
1049   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1050     {
1051       _isWaitingEventsFromRunningTasks = true;
1052       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1053     }
1054   _numberOfEndedTasks=0;
1055   _mutexForSchedulerUpdate.unlock();
1056   DEBTRACE("---");
1057 }
1058
1059 //! not implemented
1060
1061 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1062 {
1063   /*_mutexForNbOfConcurrentThreads.lock();
1064   _groupOfAllThreadsCreated.remove(thread);
1065   delete thread;
1066   _mutexForNbOfConcurrentThreads.unlock();*/
1067 }
1068
1069
1070 //! must be used protected by _mutexForSchedulerUpdate!
1071
1072 void Executor::wakeUp()
1073 {
1074   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1075   if (_isWaitingEventsFromRunningTasks)
1076     {
1077       _isWaitingEventsFromRunningTasks = false;
1078       _condForNewTasksToPerform.notify_all();
1079     }
1080   else
1081     _numberOfEndedTasks++;
1082 }
1083
1084 //! number of running tasks
1085
1086 int Executor::getNbOfThreads()
1087 {
1088   int ret;
1089   _mutexForNbOfConcurrentThreads.lock();
1090   _isRunningunderExternalControl=true;
1091   ret = _groupOfAllThreadsCreated.size();
1092   _mutexForNbOfConcurrentThreads.unlock();
1093   return ret;
1094 }
1095
1096
1097 //! Function to perform execution of a task in a thread
1098 /*!
1099  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1100  *
1101  *  Calls Task::execute
1102  *
1103  *  Calls Task::finished when the task is finished
1104  *
1105  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1106  *
1107  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1108  */
1109
1110 void *Executor::functionForTaskExecution(void *arg)
1111 {
1112   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1113
1114   struct threadargs *args = (struct threadargs *) arg;
1115   Task *task=args->task;
1116   Scheduler *sched=args->sched;
1117   Executor *execInst=args->execInst;
1118   delete args;
1119   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1120
1121   Thread::detach();
1122
1123   // Execute task
1124
1125   YACS::Event ev=YACS::FINISH;
1126   try
1127     {
1128       execInst->traceExec(task, "start execution");
1129       task->execute();
1130       execInst->traceExec(task, "end execution OK");
1131     }
1132   catch(Exception& ex)
1133     {
1134       std::cerr << "YACS Exception during execute" << std::endl;
1135       std::cerr << ex.what() << std::endl;
1136       ev=YACS::ABORT;
1137       string message = "end execution ABORT, ";
1138       message += ex.what();
1139       execInst->traceExec(task, message);
1140     }
1141   catch(...) 
1142     {
1143       // Execution has failed
1144       std::cerr << "Execution has failed: unknown reason" << std::endl;
1145       ev=YACS::ABORT;
1146       execInst->traceExec(task, "end execution ABORT, unknown reason");
1147     }
1148
1149   // Disconnect task
1150   try
1151     {
1152       DEBTRACE("task->disconnectService()");
1153       task->disconnectService();
1154       execInst->traceExec(task, "disconnectService");
1155     }
1156   catch(...) 
1157     {
1158       // Disconnect has failed
1159       std::cerr << "disconnect has failed" << std::endl;
1160       ev=YACS::ABORT;
1161       execInst->traceExec(task, "disconnectService failed, ABORT");
1162     }
1163
1164   DEBTRACE("End task->execute()");
1165   { // --- Critical section
1166     execInst->_mutexForSchedulerUpdate.lock();
1167     try
1168       {
1169         if (ev == YACS::FINISH) task->finished();
1170         if (ev == YACS::ABORT)
1171           {
1172             execInst->_errorDetected = true;
1173             if (execInst->_stopOnErrorRequested)
1174               {
1175                 execInst->_execMode = YACS::STEPBYSTEP;
1176                 execInst->_isOKToEnd = true;
1177               }
1178             task->aborted();
1179           }
1180         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1181         sched->notifyFrom(task,ev);
1182       }
1183     catch(Exception& ex)
1184       {
1185         //notify has failed : it is supposed to have set state
1186         //so no need to do anything
1187         std::cerr << "Error during notification" << std::endl;
1188         std::cerr << ex.what() << std::endl;
1189       }
1190     catch(...)
1191       {
1192         //notify has failed : it is supposed to have set state
1193         //so no need to do anything
1194         std::cerr << "Notification failed" << std::endl;
1195       }
1196     execInst->_numberOfRunningTasks--;
1197     execInst->_runningTasks.erase(task);
1198     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1199              << " _execMode: " << execInst->_execMode
1200              << " _executorState: " << execInst->_executorState);
1201     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1202       {
1203         if (execInst->_executorState == YACS::WAITINGTASKS)
1204           {
1205             execInst->_executorState = YACS::PAUSED;
1206             execInst->sendEvent("executor");
1207             execInst->_condForPilot.notify_all();
1208             if (execInst->_errorDetected &&
1209                 execInst->_stopOnErrorRequested &&
1210                 !execInst->_isRunningunderExternalControl)
1211               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1212           }
1213       }
1214     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1215     execInst->_semForMaxThreads.post();
1216     execInst->_semThreadCnt += 1;
1217     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1218     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1219
1220     execInst->_mutexForSchedulerUpdate.unlock();
1221   } // --- End of critical section (change state)
1222
1223   //execInst->notifyEndOfThread(0);
1224   Thread::exit(0);
1225   return 0;
1226 }
1227
1228 void Executor::traceExec(Task *task, const std::string& message)
1229 {
1230   string nodeName = _mainSched->getTaskName(task);
1231   Container *cont = task->getContainer();
1232   string containerName = "---";
1233   string placement = "---";
1234   if (cont)
1235     {
1236       containerName = cont->getName();
1237       ComponentInstance *compo = task->getComponent();
1238       //if (compo)
1239       placement = cont->getFullPlacementId(compo);
1240     }
1241 #ifdef WIN32
1242   DWORD now = timeGetTime();
1243   double elapse = (now - _start)/1000.0;
1244 #else
1245   timeval now;
1246   gettimeofday(&now, NULL);
1247   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1248 #endif
1249   _mutexForTrace.lock();
1250   _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1251   _trace << flush;
1252   _mutexForTrace.unlock();
1253 }
1254
1255 //! emit notification to all observers registered with  the dispatcher 
1256 /*!
1257  * The dispatcher is unique and can be obtained by getDispatcher()
1258  */
1259 void Executor::sendEvent(const std::string& event)
1260 {
1261   Dispatcher* disp=Dispatcher::getDispatcher();
1262   YASSERT(disp);
1263   YASSERT(_root);
1264   disp->dispatch(_root,event);
1265 }