Salome HOME
copy tag mergefrom_BR_V0_1_CC_Salome_04oct07
[modules/yacs.git] / src / engine / Executor.cxx
1 #include "Executor.hxx"
2 #include "Task.hxx"
3 #include "Scheduler.hxx"
4 #include "Dispatcher.hxx"
5
6 #include "VisitorSaveState.hxx"
7 #include "ComposedNode.hxx"
8
9 #include <iostream>
10 #include <fstream>
11 #include <sys/stat.h>
12 #include <cassert>
13
14 using namespace YACS::ENGINE;
15 using namespace std;
16
17 using YACS::BASES::Mutex;
18 using YACS::BASES::Thread;
19 using YACS::BASES::Semaphore;
20
21 //#define _DEVDEBUG_
22 #include "YacsTrace.hxx"
23
24 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(50)
25 {
26   _root=0;
27   _toContinue = true;
28   _isOKToEnd = false;
29   _stopOnErrorRequested = false;
30   _dumpOnErrorRequested = false;
31   _errorDetected = false;
32   _isRunningunderExternalControl=false;
33   _executorState = YACS::NOTYETINITIALIZED;
34   _execMode = YACS::CONTINUE;
35   _semThreadCnt = 50;
36 }
37
38 Executor::~Executor()
39 {
40   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
41     delete *iter;
42 }
43
44 //! Execute a graph waiting for completion
45 /*!
46  *  \param graph : schema to execute
47  *  \param debug : display the graph with dot if debug == 1
48  *
49  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
50  *  
51  *  Calls Executor::launchTask to execute a selected Task.
52  *
53  *  Completion when graph is finished (Scheduler::isFinished)
54  */
55
56 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
57 {
58   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
59   _mainSched=graph;
60   _root = dynamic_cast<ComposedNode *>(_mainSched);
61   if (!_root) throw Exception("Executor::Run, Internal Error!");
62   bool isMore;
63   int i=0;
64   if(debug>1)_displayDot(graph);
65   if (fromScratch)
66     {
67       graph->init();
68       graph->exUpdateState();
69     }
70   if(debug>1)_displayDot(graph);
71   vector<Task *> tasks;
72   vector<Task *>::iterator iter;
73   _toContinue=true;
74   _execMode = YACS::CONTINUE;
75   _isWaitingEventsFromRunningTasks = false;
76   _numberOfRunningTasks = 0;
77   while(_toContinue)
78     {
79       sleepWhileNoEventsFromAnyRunningTask();
80
81       if(debug>2)_displayDot(graph);
82
83       {//Critical section
84         _mutexForSchedulerUpdate.lock();
85         tasks=graph->getNextTasks(isMore);
86         graph->selectRunnableTasks(tasks);
87         _mutexForSchedulerUpdate.unlock();
88       }//End of critical section
89
90       if(debug>2)_displayDot(graph);
91
92       for(iter=tasks.begin();iter!=tasks.end();iter++)
93         loadTask(*iter);
94
95       if(debug>1)_displayDot(graph);
96
97       launchTasks(tasks);
98
99       if(debug>1)_displayDot(graph);
100
101       {//Critical section
102         _mutexForSchedulerUpdate.lock();
103         _toContinue=!graph->isFinished();
104         _mutexForSchedulerUpdate.unlock();
105       }//End of critical section
106       DEBTRACE("_toContinue: " << _toContinue);
107
108       if(debug>0)_displayDot(graph);
109
110       i++;
111     }
112 }
113
114 //! Execute a graph with breakpoints or step by step
115 /*!
116  *  To be launch in a thread (main thread controls the progression).
117  *  \param graph : schema to execute
118  *  \param debug : display the graph with dot if debug >0
119  *  \param fromscratch : if false, state from a previous partial exection is already loaded
120  *
121  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
122  *  
123  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
124  * 
125  *  Calls Executor::launchTask to execute a selected Task
126  *
127  *  Completion when graph is finished (Scheduler::isFinished)
128  *
129  *  States of execution:
130  *  - YACS::NOTYETINITIALIZED
131  *  - YACS::INITIALISED
132  *  - YACS::RUNNING            (to next breakpoint or step)
133  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
134  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
135  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
136  *  - YACS::STOPPED            (stopped by user before end)
137  *
138  *  Modes of Execution:
139  *  - YACS::CONTINUE           (normal run without breakpoints)
140  *  - YACS::STEPBYSTEP         (pause at each loop)
141  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
142  *
143  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
144  *  Step by Step means execution node by node or group of node by group of nodes.
145  *  At a given step, the user decides to launch all the ready nodes or only a subset
146  *  (Caution: some nodes must run in parallel). 
147  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
148  *
149  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
150  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
151  *  - Executor::getCurrentExecMode
152  *  - Executor::getExecutorState
153  *  - Executor::setExecMode             : change the execution mode for next loop
154  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
155  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
156  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
157  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
158  *  - Executor::isNotFinished
159  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
160  *  - Executor::saveState               : dump the current state of execution in an xml file
161  *  - Executor::loadState               : Not yet implemented
162  *  - Executor::getNbOfThreads
163  *  - Executor::displayDot
164  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
165  *
166  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
167  *  - Executor::waitPause
168  *
169  *  TO BE VALIDATED:
170  *  - Pilot may connect to executor during execution, or deconnect.
171  *  - Several Pilots may be connected at the same time (for observation...)
172  * 
173  */
174
175 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
176 {
177   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
178
179   { // --- Critical section
180     _mutexForSchedulerUpdate.lock();
181     _mainSched = graph;
182     _root = dynamic_cast<ComposedNode *>(_mainSched);
183     if (!_root) throw Exception("Executor::Run, Internal Error!");
184     _executorState = YACS::NOTYETINITIALIZED;
185     sendEvent("executor");
186     _toContinue=true;
187     _isOKToEnd = false;
188     _errorDetected = false;
189     _isWaitingEventsFromRunningTasks = false;
190     _numberOfRunningTasks = 0;
191     string tracefile = "traceExec_";
192     tracefile += _mainSched->getName();
193     _trace.open(tracefile.c_str());
194     _mutexForSchedulerUpdate.unlock();
195   } // --- End of critical section
196
197   if (debug > 1) _displayDot(graph);
198
199   if (fromScratch)
200     {
201       graph->init();
202       graph->exUpdateState();
203     }
204   _executorState = YACS::INITIALISED;
205   sendEvent("executor");
206
207   if (debug > 1) _displayDot(graph);
208
209   vector<Task *>::iterator iter;
210   bool isMore;
211   int problemCount=0;
212   int numberAllTasks;
213
214   _executorState = YACS::RUNNING;
215   sendEvent("executor");
216   while (_toContinue)
217     {
218       DEBTRACE("--- executor main loop");
219       sleepWhileNoEventsFromAnyRunningTask();
220       DEBTRACE("--- events...");
221       if (debug > 2) _displayDot(graph);
222       { // --- Critical section
223         _mutexForSchedulerUpdate.lock();
224         _tasks=graph->getNextTasks(isMore);
225         numberAllTasks=_numberOfRunningTasks+_tasks.size();
226         graph->selectRunnableTasks(_tasks);
227         _mutexForSchedulerUpdate.unlock();
228       } // --- End of critical section
229       if (debug > 2) _displayDot(graph);
230       if (_executorState == YACS::RUNNING)
231         {
232           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
233           if (debug > 0) _displayDot(graph);
234           DEBTRACE("---");
235           for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
236             loadTask(*iter);
237           if (debug > 1) _displayDot(graph);
238           DEBTRACE("---");
239           launchTasks(_tasks);
240           DEBTRACE("---");
241         }
242       if (debug > 1) _displayDot(graph);
243       { // --- Critical section
244         DEBTRACE("---");
245         _mutexForSchedulerUpdate.lock();
246         _toContinue = !graph->isFinished();
247
248         if(_toContinue && numberAllTasks==0)
249           {
250             //Problem : no running tasks and no task to launch ??
251             problemCount++;
252             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
253             //Pause to give a chance to interrupt
254             usleep(1000);
255             if(problemCount > 25)
256               {
257                 // Too much problems encountered : stop execution
258                 _toContinue=false;
259               }
260           }
261
262         if (! _toContinue)
263           {
264             _executorState = YACS::FINISHED;
265             sendEvent("executor");
266             _condForPilot.notify_all();
267           }
268         _mutexForSchedulerUpdate.unlock();
269       } // --- End of critical section
270       if (debug > 0) _displayDot(graph);
271       DEBTRACE("_toContinue: " << _toContinue);
272     }
273
274   DEBTRACE("End of main Loop");
275
276   { // --- Critical section
277     _mutexForSchedulerUpdate.lock();
278     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
279       {
280         DEBTRACE("stop requested: End soon");
281         _executorState = YACS::STOPPED;
282         _toContinue = false;
283         sendEvent("executor");
284       }
285     _mutexForSchedulerUpdate.unlock();
286   } // --- End of critical section
287   if ( _dumpOnErrorRequested && _errorDetected && !_isRunningunderExternalControl)
288     {
289       saveState(_dumpErrorFile);
290     }
291   _trace.close();
292   DEBTRACE("End of RunB thread");  
293 }
294
295 YACS::ExecutionMode Executor::getCurrentExecMode()
296 {
297   _isRunningunderExternalControl=true;
298   return _execMode;
299 }
300
301
302 YACS::ExecutorState Executor::getExecutorState()
303 {
304   _isRunningunderExternalControl=true;
305   return _executorState;
306 }
307
308
309 bool Executor::isNotFinished()
310 {
311   _isRunningunderExternalControl=true;
312   return _toContinue;
313 }
314
315 //! ask to stop execution on the first node found in error
316 /*!
317  * \param dumpRequested   produce a state dump when an error is found
318  * \param xmlFile         name of file used for state dump
319  */
320
321 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
322 {
323   { // --- Critical section
324     _mutexForSchedulerUpdate.lock();
325     _dumpErrorFile=xmlFile;
326     _stopOnErrorRequested=true;
327     _dumpOnErrorRequested = dumpRequested;
328     if (dumpRequested && xmlFile.empty())
329       throw YACS::Exception("dump on error requested and no filename given for dump");
330     _mutexForSchedulerUpdate.unlock();
331   } // --- End of critical section
332 }
333
334 //! Dynamically set the current mode of execution
335 /*!
336  * The mode can be Continue, step by step, or stop before execution of a node
337  * defined in a list of breakpoints.
338  */
339
340 void Executor::setExecMode(YACS::ExecutionMode mode)
341 {
342   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
343   { // --- Critical section
344     _mutexForSchedulerUpdate.lock();
345     _isRunningunderExternalControl=true;
346     _execMode = mode;
347     _mutexForSchedulerUpdate.unlock();
348   } // --- End of critical section
349 }
350
351 //! wake up executor when in pause
352 /*!
353  * When Executor is in state paused or waiting for task completion, the thread
354  * running loop RunB waits on condition _condForStepByStep.
355  * Thread RunB is waken up.
356  * \return true when actually wakes up executor
357  */
358
359 bool Executor::resumeCurrentBreakPoint()
360 {
361   DEBTRACE("Executor::resumeCurrentBreakPoint()");
362   bool ret = false;
363   //bool doDump = false;
364   { // --- Critical section
365     _mutexForSchedulerUpdate.lock();
366     _isRunningunderExternalControl=true;
367     DEBTRACE("_executorState: " << _executorState);
368     switch (_executorState)
369       {
370       case YACS::WAITINGTASKS:
371       case YACS::PAUSED:
372         {
373           _condForStepByStep.notify_all();
374           _executorState = YACS::RUNNING;
375           sendEvent("executor");
376           ret = true;
377           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
378           break;
379         }
380       case YACS::FINISHED:
381       case YACS::STOPPED:
382         {
383           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
384           DEBTRACE("Graph Execution finished or stopped !");
385           break;
386         }
387       default :
388         {
389           // debug: no easy way to verify if main loop is acutally waiting on condition
390         }
391       }
392     _mutexForSchedulerUpdate.unlock();
393     DEBTRACE("---");
394     //if (doDump) saveState(_dumpErrorFile);
395   } // --- End of critical section
396   return ret;
397 }
398
399
400 //! define a list of nodes names as breakpoints in the graph
401
402
403 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
404 {
405   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
406   { // --- Critical section
407     _mutexForSchedulerUpdate.lock();
408     _isRunningunderExternalControl=true;
409     _listOfBreakPoints = listOfBreakPoints;
410     _mutexForSchedulerUpdate.unlock();
411   } // --- End of critical section
412 }
413
414
415 //! Get the list of tasks to load, to define a subset to execute in step by step mode
416 /*!
417  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
418  *  Use Executor::waitPause to wait.
419  */
420 std::list<std::string> Executor::getTasksToLoad()
421 {
422   DEBTRACE("Executor::getTasksToLoad()");
423   list<string> listOfNodesToLoad;
424   listOfNodesToLoad.clear();
425   { // --- Critical section
426     _mutexForSchedulerUpdate.lock();
427     _isRunningunderExternalControl=true;
428     switch (_executorState)
429       {
430       case YACS::WAITINGTASKS:
431       case YACS::PAUSED:
432         {
433           listOfNodesToLoad = _listOfTasksToLoad;
434           break;
435         }
436       case YACS::NOTYETINITIALIZED:
437       case YACS::INITIALISED:
438       case YACS::RUNNING:
439       case YACS::FINISHED:
440       case YACS::STOPPED:
441       default:
442         {
443           break;
444         }
445       }
446     _mutexForSchedulerUpdate.unlock();
447   } // --- End of critical section
448   return listOfNodesToLoad;
449 }
450
451
452 //! Define a subset of task to execute in step by step mode
453 /*!
454  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
455  * in the current step.
456  * If some nodes must run in parallel, they must stay together in the list.
457  */
458
459 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
460 {
461   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
462   bool ret = false;
463   vector<Task *>::iterator iter;
464   vector<Task *> restrictedTasks;
465   { // --- Critical section
466     _mutexForSchedulerUpdate.lock();
467     _isRunningunderExternalControl=true;
468     switch (_executorState)
469       {
470       case YACS::WAITINGTASKS:
471       case YACS::PAUSED:
472         {
473           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
474             {
475               string readyNode = _mainSched->getTaskName(*iter);
476               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
477                   != listToExecute.end())
478                 {
479                   restrictedTasks.push_back(*iter);
480                   DEBTRACE("node to execute " << readyNode);
481                 }
482             }
483           _tasks.clear();
484           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
485             {
486               _tasks.push_back(*iter);
487             }
488           break;
489         }
490       case YACS::NOTYETINITIALIZED:
491       case YACS::INITIALISED:
492       case YACS::RUNNING:
493       case YACS::FINISHED:
494       case YACS::STOPPED:
495       default:
496         {
497           break;
498         }
499       }
500     _mutexForSchedulerUpdate.unlock();
501     } // --- End of critical section
502
503   _tasks.clear();
504   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
505     {
506       _tasks.push_back(*iter);
507     }
508   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
509     {
510       string readyNode = _mainSched->getTaskName(*iter);
511       DEBTRACE("selected node to execute " << readyNode);
512     }
513
514 }
515
516 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
517 /*!
518  *  Do nothing if execution is finished or in pause.
519  *  Wait first step if Executor is running or in initialization.
520  */
521
522 void Executor::waitPause()
523 {
524   DEBTRACE("Executor::waitPause()");
525   { // --- Critical section
526     _mutexForSchedulerUpdate.lock();
527     _isRunningunderExternalControl=true;
528     switch (_executorState)
529       {
530       default:
531       case YACS::STOPPED:
532       case YACS::FINISHED:
533       case YACS::WAITINGTASKS:
534       case YACS::PAUSED:
535         {
536           break;
537         }
538       case YACS::NOTYETINITIALIZED:
539       case YACS::INITIALISED:
540       case YACS::RUNNING:
541         {
542           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
543           break;
544         }
545       }
546     _mutexForSchedulerUpdate.unlock();
547   } // --- End of critical section
548   DEBTRACE("---");
549 }
550
551 //! stops the execution as soon as possible 
552
553 void Executor::stopExecution()
554 {
555   setExecMode(YACS::STEPBYSTEP);
556   //waitPause();
557   _isOKToEnd = true;
558   resumeCurrentBreakPoint();
559 }
560
561 //! save the current state of execution in an xml file
562
563 bool Executor::saveState(const std::string& xmlFile)
564 {
565   DEBTRACE("Executor::saveState() in " << xmlFile);
566   YACS::ENGINE::VisitorSaveState vst(_root);
567   vst.openFileDump(xmlFile.c_str());
568   _root->accept(&vst);
569   vst.closeFileDump();
570 }
571
572 //! not yet implemented
573
574 bool Executor::loadState()
575 {
576   DEBTRACE("Executor::loadState()");
577   _isRunningunderExternalControl=true;
578 }
579
580
581 static int isfile(char *filename) 
582 {
583   struct stat buf;
584   if (stat(filename, &buf) != 0)
585     return 0;
586   if (!S_ISREG(buf.st_mode))
587     return 0;
588   return 1;
589 }
590
591 //! Display the graph state as a dot display, public method
592
593 void Executor::displayDot(Scheduler *graph)
594 {
595   _isRunningunderExternalControl=true;
596   _displayDot(graph);
597 }
598
599 //! Display the graph state as a dot display
600 /*!
601  *  \param graph  : the node to display
602  */
603
604 void Executor::_displayDot(Scheduler *graph)
605 {
606    std::ofstream g("titi");
607    ((ComposedNode*)graph)->writeDot(g);
608    g.close();
609    if(isfile("display.sh"))
610      system("sh display.sh");
611    else
612      system("dot -Tpng titi|display -delay 5");
613 }
614
615 //! Wait reactivation in modes Step By step or with BreakPoints
616 /*!
617  *  Check mode of execution (set by main thread):
618  *  - YACS::CONTINUE        : the graph execution continues.
619  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
620  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
621  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
622  *                            else continue the graph execution.
623  *  \return true if end of executor thread is requested
624  */
625
626 bool Executor::checkBreakPoints()
627 {
628   DEBTRACE("Executor::checkBreakPoints()");
629   vector<Task *>::iterator iter;
630   bool endRequested = false;
631
632   switch (_execMode)
633     {
634     case YACS::CONTINUE:
635       {
636         break;
637       }
638     case YACS::STOPBEFORENODES:
639       {
640         bool stop = false;
641         { // --- Critical section
642           _mutexForSchedulerUpdate.lock();
643           _tasksSave = _tasks;
644           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
645             {
646               string nodeToLoad = _mainSched->getTaskName(*iter);
647               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
648                   != _listOfBreakPoints.end())
649                 {
650                   stop = true;
651                   break;
652                 }
653             }
654           if (stop)
655             {
656               _listOfTasksToLoad.clear();
657               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
658                 {
659                   string nodeToLoad = _mainSched->getTaskName(*iter);
660                   _listOfTasksToLoad.push_back(nodeToLoad);
661                 }
662               if (getNbOfThreads())
663                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
664               else
665                 _executorState = YACS::PAUSED;
666               sendEvent("executor");
667               _condForPilot.notify_all();
668             }
669           //_mutexForSchedulerUpdate.unlock(); 
670           //} // --- End of critical section
671           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
672           if (_isOKToEnd) endRequested = true;
673           _mutexForSchedulerUpdate.unlock();
674         } // --- End of critical section
675           if (stop) DEBTRACE("wake up from waitResume");
676         break;
677       }
678     default:
679     case YACS::STEPBYSTEP:
680       {
681         { // --- Critical section
682           _mutexForSchedulerUpdate.lock();
683           _tasksSave = _tasks;
684           _listOfTasksToLoad.clear();
685           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
686             {
687               string nodeToLoad = _mainSched->getTaskName(*iter);
688               _listOfTasksToLoad.push_back(nodeToLoad);
689             }
690           if (getNbOfThreads())
691             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
692           else
693             _executorState = YACS::PAUSED;
694           sendEvent("executor");
695           _condForPilot.notify_all();
696           if (!_isOKToEnd)
697             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
698                           // or, if no pilot, wait until no more running tasks (stop on error)
699           if (_isOKToEnd) endRequested = true;
700           _mutexForSchedulerUpdate.unlock();
701         } // --- End of critical section
702         DEBTRACE("wake up from waitResume");
703         break;
704       }
705     }
706   DEBTRACE("endRequested: " << endRequested);
707   return endRequested;
708 }
709
710
711 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
712 /*!
713  *  With the condition Mutex, the mutex is released atomically during the wait.
714  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
715  *  Must be called while mutex is locked.
716  */
717
718 void Executor::waitResume()
719 {
720   DEBTRACE("Executor::waitResume()");
721   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
722   DEBTRACE("---");
723 }
724
725
726 //! Perform loading of a Task.
727 /*!
728  *  \param task  : Task to load
729  */
730
731 void Executor::loadTask(Task *task)
732 {
733   DEBTRACE("Executor::loadTask(Task *task)");
734   if(task->getState() != YACS::TOLOAD)return;
735   {//Critical section
736     _mutexForSchedulerUpdate.lock();
737     task->loaded();
738     _mainSched->notifyFrom(task,YACS::START);
739     _mutexForSchedulerUpdate.unlock();
740   }//End of critical section
741   try
742     {
743       task->load();
744     }
745   catch(Exception& ex) 
746     {
747       std::cerr << ex.what() << std::endl;
748       {//Critical section
749         _mutexForSchedulerUpdate.lock();
750         task->aborted();
751         _mainSched->notifyFrom(task,YACS::ABORT);
752         _mutexForSchedulerUpdate.unlock();
753       }//End of critical section
754     }
755   catch(...) 
756     {
757       std::cerr << "Load failed" << std::endl;
758       {//Critical section
759         _mutexForSchedulerUpdate.lock();
760         task->aborted();
761         _mainSched->notifyFrom(task,YACS::ABORT);
762         _mutexForSchedulerUpdate.unlock();
763       }//End of critical section
764     }
765 }
766
767
768 //! Execute a list of tasks possibly connected through datastream links
769 /*!
770  *  \param tasks  : a list of tasks to execute
771  *
772  */
773 void Executor::launchTasks(std::vector<Task *>& tasks)
774 {
775   vector<Task *>::iterator iter;
776   //First phase, initialize the execution
777   for(iter=tasks.begin();iter!=tasks.end();iter++)
778     {
779       if((*iter)->getState() != YACS::TOACTIVATE)continue;
780       try
781         {
782           (*iter)->initService();
783           traceExec(*iter, "initService");
784         }
785       catch(Exception& ex) 
786         {
787           std::cerr << ex.what() << std::endl;
788           {//Critical section
789             _mutexForSchedulerUpdate.lock();
790             (*iter)->aborted();
791             _mainSched->notifyFrom(*iter,YACS::ABORT);
792             _mutexForSchedulerUpdate.unlock();
793           }//End of critical section
794         }
795       catch(...) 
796         {
797           std::cerr << "Problem in initService" << std::endl;
798           {//Critical section
799             _mutexForSchedulerUpdate.lock();
800             (*iter)->aborted();
801             _mainSched->notifyFrom(*iter,YACS::ABORT);
802             _mutexForSchedulerUpdate.unlock();
803           }//End of critical section
804         }
805     }
806   //Second phase, make datastream connections
807   for(iter=tasks.begin();iter!=tasks.end();iter++)
808     {
809       if((*iter)->getState() != YACS::TOACTIVATE)continue;
810       try
811         {
812           (*iter)->connectService();
813           traceExec(*iter, "connectService");
814         }
815       catch(Exception& ex) 
816         {
817           std::cerr << ex.what() << std::endl;
818           {//Critical section
819             _mutexForSchedulerUpdate.lock();
820             (*iter)->aborted();
821             _mainSched->notifyFrom(*iter,YACS::ABORT);
822             _mutexForSchedulerUpdate.unlock();
823           }//End of critical section
824         }
825       catch(...) 
826         {
827           std::cerr << "Problem in connectService" << std::endl;
828           {//Critical section
829             _mutexForSchedulerUpdate.lock();
830             (*iter)->aborted();
831             _mainSched->notifyFrom(*iter,YACS::ABORT);
832             _mutexForSchedulerUpdate.unlock();
833           }//End of critical section
834         }
835     }
836   //Third phase, execute each task in a thread
837   for(iter=tasks.begin();iter!=tasks.end();iter++)
838     {
839       DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
840       _semForMaxThreads.wait();
841       _semThreadCnt -= 1;
842       launchTask(*iter);
843     }
844 }
845
846 //! Execute a Task in a thread
847 /*!
848  *  \param task  : Task to execute
849  *
850  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
851  *
852  *  Calls Executor::functionForTaskExecution in Thread
853  */
854
855 void Executor::launchTask(Task *task)
856 {
857   DEBTRACE("Executor::launchTask(Task *task)");
858   if(task->getState() != YACS::TOACTIVATE)return;
859   void **args=new void *[3];
860   args[0]=(void *)task;
861   args[1]=(void *)_mainSched;
862   args[2]=(void *)this;
863   traceExec(task, "launch");
864
865   { // --- Critical section
866     _mutexForSchedulerUpdate.lock();
867     _numberOfRunningTasks++;
868     task->begin(); //change state to ACTIVATED
869     //no more need : done when loading
870     //_mainSched->notifyFrom(task,YACS::START);
871     _mutexForSchedulerUpdate.unlock();
872   } // --- End of critical section
873   Thread(functionForTaskExecution,args);
874   //functionForTaskExecution(args);
875 }
876
877 //! wait until a running task ends
878
879 void Executor::sleepWhileNoEventsFromAnyRunningTask()
880 {
881   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
882 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
883   _mutexForSchedulerUpdate.lock();
884   if (_numberOfRunningTasks > 0)
885     {
886       _isWaitingEventsFromRunningTasks = true;
887       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
888     }
889   _mutexForSchedulerUpdate.unlock();
890   DEBTRACE("---");
891 }
892
893 //! not implemented
894
895 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
896 {
897   /*_mutexForNbOfConcurrentThreads.lock();
898   _groupOfAllThreadsCreated.remove(thread);
899   delete thread;
900   _mutexForNbOfConcurrentThreads.unlock();*/
901 }
902
903
904 //! must be used protected by _mutexForSchedulerUpdate!
905
906 void Executor::wakeUp()
907 {
908   DEBTRACE("Executor::wakeUp()");
909   if (_isWaitingEventsFromRunningTasks)
910     {
911       _isWaitingEventsFromRunningTasks = false;
912       _condForNewTasksToPerform.notify_all();
913     }
914 }
915
916 //! number of running tasks
917
918 int Executor::getNbOfThreads()
919 {
920   int ret;
921   _mutexForNbOfConcurrentThreads.lock();
922   _isRunningunderExternalControl=true;
923   ret = _groupOfAllThreadsCreated.size();
924   _mutexForNbOfConcurrentThreads.unlock();
925   return ret;
926 }
927
928
929 //! Function to perform execution of a task in a thread
930 /*!
931  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
932  *
933  *  Calls Task::execute
934  *
935  *  Calls Task::finished when the task is finished
936  *
937  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
938  *
939  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
940  */
941
942 void *Executor::functionForTaskExecution(void *arg)
943 {
944   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
945   void **argT=(void **)arg;
946   Task *task=(Task *)argT[0];
947   Scheduler *sched=(Scheduler *)argT[1];
948   Executor *execInst=(Executor *)argT[2];
949   delete [] argT;
950   Thread::detach();
951
952   // Execute task
953
954   YACS::Event ev=YACS::FINISH;
955   try
956     {
957       execInst->traceExec(task, "start execution");
958       task->execute();
959       execInst->traceExec(task, "end execution OK");
960     }
961   catch(Exception& ex)
962     {
963       std::cerr << "YACS Exception during execute" << std::endl;
964       std::cerr << ex.what() << std::endl;
965       ev=YACS::ABORT;
966       string message = "end execution ABORT, ";
967       message += ex.what();
968       execInst->traceExec(task, message);
969     }
970   catch(...) 
971     {
972       // Execution has failed
973       std::cerr << "Execution has failed: unknown reason" << std::endl;
974       ev=YACS::ABORT;
975       execInst->traceExec(task, "end execution ABORT, unknown reason");
976     }
977
978   // Disconnect task
979   try
980     {
981       task->disconnectService();
982       execInst->traceExec(task, "disconnectService");
983     }
984   catch(...) 
985     {
986       // Disconnect has failed
987       std::cerr << "disconnect has failed" << std::endl;
988       ev=YACS::ABORT;
989       execInst->traceExec(task, "disconnectService failed, ABORT");
990     }
991
992   DEBTRACE("End task->execute()");
993   { // --- Critical section
994     execInst->_mutexForSchedulerUpdate.lock();
995     try
996       {
997         if (ev == YACS::FINISH) task->finished();
998         if (ev == YACS::ABORT)
999           {
1000             execInst->_errorDetected = true;
1001             if (execInst->_stopOnErrorRequested)
1002               {
1003                 execInst->_execMode = YACS::STEPBYSTEP;
1004                 if (!execInst->_isRunningunderExternalControl) execInst->_isOKToEnd = true;
1005               }
1006             task->aborted();
1007           }
1008         sched->notifyFrom(task,ev);
1009       }
1010     catch(Exception& ex)
1011       {
1012         //notify has failed : it is supposed to have set state
1013         //so no need to do anything
1014         std::cerr << "Error during notification" << std::endl;
1015         std::cerr << ex.what() << std::endl;
1016       }
1017     catch(...)
1018       {
1019         //notify has failed : it is supposed to have set state
1020         //so no need to do anything
1021         std::cerr << "Notification failed" << std::endl;
1022       }
1023     execInst->_numberOfRunningTasks--;
1024     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1025              << " _execMode: " << execInst->_execMode
1026              << " _executorState: " << execInst->_executorState);
1027     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1028       {
1029         if (execInst->_executorState == YACS::WAITINGTASKS)
1030           {
1031             execInst->_executorState = YACS::PAUSED;
1032             execInst->sendEvent("executor");
1033             execInst->_condForPilot.notify_all();
1034             if (execInst->_errorDetected &&
1035                 execInst->_stopOnErrorRequested &&
1036                 !execInst->_isRunningunderExternalControl)
1037               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1038           }
1039       }
1040     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1041     execInst->_semForMaxThreads.post();
1042     execInst->_semThreadCnt += 1;
1043     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1044     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1045
1046     execInst->_mutexForSchedulerUpdate.unlock();
1047   } // --- End of critical section (change state)
1048
1049   //execInst->notifyEndOfThread(0);
1050   Thread::exit(0);
1051   return 0;
1052 }
1053
1054 void Executor::traceExec(Task *task, const std::string& message)
1055 {
1056   string nodeName = _mainSched->getTaskName(task);
1057   _mutexForTrace.lock();
1058   _trace << nodeName << " " << message << endl;
1059   _trace << flush;
1060   _mutexForTrace.unlock();
1061 }
1062
1063 //! emit notification to all observers registered with  the dispatcher 
1064 /*!
1065  * The dispatcher is unique and can be obtained by getDispatcher()
1066  */
1067 void Executor::sendEvent(const std::string& event)
1068 {
1069   Dispatcher* disp=Dispatcher::getDispatcher();
1070   assert(disp);
1071   assert(_root);
1072   disp->dispatch(_root,event);
1073 }