]> SALOME platform Git repositories - modules/yacs.git/blob - src/engine/Executor.cxx
Salome HOME
615beff1fd5e17bcc3cd3482522e5f9d2ca67a3c
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2012  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "Scheduler.hxx"
23 #include "Dispatcher.hxx"
24 #include "Container.hxx"
25 #include "ComponentInstance.hxx"
26
27 #include "VisitorSaveState.hxx"
28 #include "ComposedNode.hxx"
29
30 #include <iostream>
31 #include <fstream>
32 #include <sys/stat.h>
33 #ifndef WIN32
34 #include <sys/time.h>
35 #include <unistd.h>
36 #endif
37
38 #include <cstdlib>
39 #include <algorithm>
40
41 #ifdef WNT
42 #define usleep(A) _sleep(A/1000)
43 #if !defined(S_ISCHR) || !defined(S_ISREG)
44 #  ifndef S_IFMT
45 #    ifdef _S_IFMT
46 #      define S_IFMT _S_IFMT
47 #      define S_IFCHR _S_IFCHR
48 #      define S_IFREG _S_IFREG
49 #    else
50 #    ifdef __S_IFMT
51 #      define S_IFMT __S_IFMT
52 #      define S_IFCHR __S_IFCHR
53 #      define S_IFREG __S_IFREG
54 #    endif
55 #    endif
56 #  endif
57 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
58 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
59 #endif
60 #endif
61
62 using namespace YACS::ENGINE;
63 using namespace std;
64
65 using YACS::BASES::Mutex;
66 using YACS::BASES::Thread;
67 using YACS::BASES::Semaphore;
68
69 //#define _DEVDEBUG_
70 #include "YacsTrace.hxx"
71
72 int Executor::_maxThreads(50);
73
74 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
75 {
76   _root=0;
77   _toContinue = true;
78   _isOKToEnd = false;
79   _stopOnErrorRequested = false;
80   _dumpOnErrorRequested = false;
81   _errorDetected = false;
82   _isRunningunderExternalControl=false;
83   _executorState = YACS::NOTYETINITIALIZED;
84   _execMode = YACS::CONTINUE;
85   _semThreadCnt = _maxThreads;
86   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
87 }
88
89 Executor::~Executor()
90 {
91   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
92     delete *iter;
93 }
94
95 //! Execute a graph waiting for completion
96 /*!
97  *  \param graph : schema to execute
98  *  \param debug : display the graph with dot if debug == 1
99  *  \param fromScratch : if true the graph is reinitialized
100  *
101  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
102  *  
103  *  Calls Executor::launchTask to execute a selected Task.
104  *
105  *  Completion when graph is finished (Scheduler::isFinished)
106  */
107
108 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
109 {
110   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
111   _mainSched=graph;
112   _root = dynamic_cast<ComposedNode *>(_mainSched);
113   if (!_root) throw Exception("Executor::Run, Internal Error!");
114   bool isMore;
115   int i=0;
116   if(debug>1)_displayDot(graph);
117   if (fromScratch)
118     {
119       graph->init();
120       graph->exUpdateState();
121     }
122   if(debug>1)_displayDot(graph);
123   vector<Task *> tasks;
124   vector<Task *>::iterator iter;
125   _toContinue=true;
126   _execMode = YACS::CONTINUE;
127   _isWaitingEventsFromRunningTasks = false;
128   _numberOfRunningTasks = 0;
129   _runningTasks.clear();
130   _numberOfEndedTasks=0;
131   while(_toContinue)
132     {
133       sleepWhileNoEventsFromAnyRunningTask();
134
135       if(debug>2)_displayDot(graph);
136
137       {//Critical section
138         _mutexForSchedulerUpdate.lock();
139         tasks=graph->getNextTasks(isMore);
140         graph->selectRunnableTasks(tasks);
141         _mutexForSchedulerUpdate.unlock();
142       }//End of critical section
143
144       if(debug>2)_displayDot(graph);
145
146       for(iter=tasks.begin();iter!=tasks.end();iter++)
147         loadTask(*iter);
148
149       if(debug>1)_displayDot(graph);
150
151       launchTasks(tasks);
152
153       if(debug>1)_displayDot(graph);
154
155       {//Critical section
156         _mutexForSchedulerUpdate.lock();
157         _toContinue=!graph->isFinished();
158         _mutexForSchedulerUpdate.unlock();
159       }//End of critical section
160       DEBTRACE("_toContinue: " << _toContinue);
161
162       if(debug>0)_displayDot(graph);
163
164       i++;
165     }
166 }
167
168 //! Execute a graph with breakpoints or step by step
169 /*!
170  *  To be launch in a thread (main thread controls the progression).
171  *  \param graph : schema to execute
172  *  \param debug : display the graph with dot if debug >0
173  *  \param fromScratch : if false, state from a previous partial exection is already loaded
174  *
175  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
176  *  
177  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
178  * 
179  *  Calls Executor::launchTask to execute a selected Task
180  *
181  *  Completion when graph is finished (Scheduler::isFinished)
182  *
183  *  States of execution:
184  *  - YACS::NOTYETINITIALIZED
185  *  - YACS::INITIALISED
186  *  - YACS::RUNNING            (to next breakpoint or step)
187  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
188  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
189  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
190  *  - YACS::STOPPED            (stopped by user before end)
191  *
192  *  Modes of Execution:
193  *  - YACS::CONTINUE           (normal run without breakpoints)
194  *  - YACS::STEPBYSTEP         (pause at each loop)
195  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
196  *
197  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
198  *  Step by Step means execution node by node or group of node by group of nodes.
199  *  At a given step, the user decides to launch all the ready nodes or only a subset
200  *  (Caution: some nodes must run in parallel). 
201  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
202  *
203  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
204  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
205  *  - Executor::getCurrentExecMode
206  *  - Executor::getExecutorState
207  *  - Executor::setExecMode             : change the execution mode for next loop
208  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
209  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
210  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
211  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
212  *  - Executor::isNotFinished
213  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
214  *  - Executor::saveState               : dump the current state of execution in an xml file
215  *  - Executor::loadState               : Not yet implemented
216  *  - Executor::getNbOfThreads
217  *  - Executor::displayDot
218  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
219  *
220  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
221  *  - Executor::waitPause
222  *
223  *  TO BE VALIDATED:
224  *  - Pilot may connect to executor during execution, or deconnect.
225  *  - Several Pilots may be connected at the same time (for observation...)
226  * 
227  */
228
229 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
230 {
231   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
232
233   { // --- Critical section
234     _mutexForSchedulerUpdate.lock();
235     _mainSched = graph;
236     _root = dynamic_cast<ComposedNode *>(_mainSched);
237     if (!_root) throw Exception("Executor::Run, Internal Error!");
238     _executorState = YACS::NOTYETINITIALIZED;
239     sendEvent("executor");
240     _toContinue=true;
241     _isOKToEnd = false;
242     _errorDetected = false;
243     _isWaitingEventsFromRunningTasks = false;
244     _numberOfRunningTasks = 0;
245     _runningTasks.clear();
246     _numberOfEndedTasks = 0;
247     string tracefile = "traceExec_";
248     tracefile += _mainSched->getName();
249     _trace.open(tracefile.c_str());
250 #ifdef WIN32
251    _start = timeGetTime();
252 #else
253     gettimeofday(&_start, NULL);
254 #endif
255
256     _mutexForSchedulerUpdate.unlock();
257   } // --- End of critical section
258
259   if (debug > 1) _displayDot(graph);
260
261   if (fromScratch)
262     {
263       try
264         {
265           graph->init();
266           graph->exUpdateState();
267         }
268       catch(Exception& ex)
269         {
270           DEBTRACE("exception: "<< (ex.what()));
271           _executorState = YACS::FINISHED;
272           sendEvent("executor");
273           throw;
274         }
275     }
276   _executorState = YACS::INITIALISED;
277   sendEvent("executor");
278
279   if (debug > 1) _displayDot(graph);
280
281   vector<Task *>::iterator iter;
282   bool isMore;
283   int problemCount=0;
284   int numberAllTasks;
285
286   _executorState = YACS::RUNNING;
287   sendEvent("executor");
288   while (_toContinue)
289     {
290       DEBTRACE("--- executor main loop");
291       sleepWhileNoEventsFromAnyRunningTask();
292       DEBTRACE("--- events...");
293       if (debug > 2) _displayDot(graph);
294       { // --- Critical section
295         _mutexForSchedulerUpdate.lock();
296         _tasks=graph->getNextTasks(isMore);
297         numberAllTasks=_numberOfRunningTasks+_tasks.size();
298         graph->selectRunnableTasks(_tasks);
299         _mutexForSchedulerUpdate.unlock();
300       } // --- End of critical section
301       if (debug > 2) _displayDot(graph);
302       if (_executorState == YACS::RUNNING)
303         {
304           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
305           if (debug > 0) _displayDot(graph);
306           DEBTRACE("---");
307           for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
308             loadTask(*iter);
309           if (debug > 1) _displayDot(graph);
310           DEBTRACE("---");
311           launchTasks(_tasks);
312           DEBTRACE("---");
313         }
314       if (debug > 1) _displayDot(graph);
315       { // --- Critical section
316         DEBTRACE("---");
317         _mutexForSchedulerUpdate.lock();
318         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319         if(_numberOfRunningTasks == 0)
320           _toContinue = !graph->isFinished();
321
322         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324         DEBTRACE("_toContinue: " << _toContinue);
325         if(_toContinue && numberAllTasks==0)
326           {
327             //Problem : no running tasks and no task to launch ??
328             problemCount++;
329             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330             //Pause to give a chance to interrupt
331             usleep(1000);
332             if(problemCount > 25)
333               {
334                 // Too much problems encountered : stop execution
335                 _toContinue=false;
336               }
337           }
338
339         if (! _toContinue)
340           {
341             _executorState = YACS::FINISHED;
342             sendEvent("executor");
343             _condForPilot.notify_all();
344           }
345         _mutexForSchedulerUpdate.unlock();
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     _mutexForSchedulerUpdate.lock();
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362     _mutexForSchedulerUpdate.unlock();
363   } // --- End of critical section
364   if ( _dumpOnErrorRequested && _errorDetected)
365     {
366       saveState(_dumpErrorFile);
367     }
368   _trace.close();
369   DEBTRACE("End of RunB thread");  
370 }
371
372 YACS::ExecutionMode Executor::getCurrentExecMode()
373 {
374   _isRunningunderExternalControl=true;
375   return _execMode;
376 }
377
378
379 YACS::ExecutorState Executor::getExecutorState()
380 {
381   _isRunningunderExternalControl=true;
382   return _executorState;
383 }
384
385
386 bool Executor::isNotFinished()
387 {
388   _isRunningunderExternalControl=true;
389   return _toContinue;
390 }
391
392 //! ask to stop execution on the first node found in error
393 /*!
394  * \param dumpRequested   produce a state dump when an error is found
395  * \param xmlFile         name of file used for state dump
396  */
397
398 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
399 {
400   { // --- Critical section
401     _mutexForSchedulerUpdate.lock();
402     _dumpErrorFile=xmlFile;
403     _stopOnErrorRequested=true;
404     _dumpOnErrorRequested = dumpRequested;
405     if (dumpRequested && xmlFile.empty())
406       throw YACS::Exception("dump on error requested and no filename given for dump");
407     _mutexForSchedulerUpdate.unlock();
408     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409   } // --- End of critical section
410 }
411
412 //! ask to do not stop execution on nodes found in error
413 /*!
414  */
415
416 void Executor::unsetStopOnError()
417 {
418   { // --- Critical section
419     _mutexForSchedulerUpdate.lock();
420     _stopOnErrorRequested=false;
421     _mutexForSchedulerUpdate.unlock();
422   } // --- End of critical section
423 }
424
425 //! Dynamically set the current mode of execution
426 /*!
427  * The mode can be Continue, step by step, or stop before execution of a node
428  * defined in a list of breakpoints.
429  */
430
431 void Executor::setExecMode(YACS::ExecutionMode mode)
432 {
433   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434   { // --- Critical section
435     _mutexForSchedulerUpdate.lock();
436     _isRunningunderExternalControl=true;
437     _execMode = mode;
438     _mutexForSchedulerUpdate.unlock();
439   } // --- End of critical section
440 }
441
442 //! wake up executor when in pause
443 /*!
444  * When Executor is in state paused or waiting for task completion, the thread
445  * running loop RunB waits on condition _condForStepByStep.
446  * Thread RunB is waken up.
447  * \return true when actually wakes up executor
448  */
449
450 bool Executor::resumeCurrentBreakPoint()
451 {
452   DEBTRACE("Executor::resumeCurrentBreakPoint()");
453   bool ret = false;
454   //bool doDump = false;
455   { // --- Critical section
456     _mutexForSchedulerUpdate.lock();
457     _isRunningunderExternalControl=true;
458     DEBTRACE("_executorState: " << _executorState);
459     switch (_executorState)
460       {
461       case YACS::WAITINGTASKS:
462       case YACS::PAUSED:
463         {
464           _condForStepByStep.notify_all();
465           _executorState = YACS::RUNNING;
466           sendEvent("executor");
467           ret = true;
468           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
469           break;
470         }
471       case YACS::FINISHED:
472       case YACS::STOPPED:
473         {
474           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
475           DEBTRACE("Graph Execution finished or stopped !");
476           break;
477         }
478       default :
479         {
480           // debug: no easy way to verify if main loop is acutally waiting on condition
481         }
482       }
483     _mutexForSchedulerUpdate.unlock();
484     DEBTRACE("---");
485     //if (doDump) saveState(_dumpErrorFile);
486   } // --- End of critical section
487   return ret;
488 }
489
490
491 //! define a list of nodes names as breakpoints in the graph
492
493
494 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
495 {
496   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
497   { // --- Critical section
498     _mutexForSchedulerUpdate.lock();
499     _isRunningunderExternalControl=true;
500     _listOfBreakPoints = listOfBreakPoints;
501     _mutexForSchedulerUpdate.unlock();
502   } // --- End of critical section
503 }
504
505
506 //! Get the list of tasks to load, to define a subset to execute in step by step mode
507 /*!
508  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
509  *  Use Executor::waitPause to wait.
510  */
511 std::list<std::string> Executor::getTasksToLoad()
512 {
513   DEBTRACE("Executor::getTasksToLoad()");
514   list<string> listOfNodesToLoad;
515   listOfNodesToLoad.clear();
516   { // --- Critical section
517     _mutexForSchedulerUpdate.lock();
518     _isRunningunderExternalControl=true;
519     switch (_executorState)
520       {
521       case YACS::WAITINGTASKS:
522       case YACS::PAUSED:
523         {
524           listOfNodesToLoad = _listOfTasksToLoad;
525           break;
526         }
527       case YACS::NOTYETINITIALIZED:
528       case YACS::INITIALISED:
529       case YACS::RUNNING:
530       case YACS::FINISHED:
531       case YACS::STOPPED:
532       default:
533         {
534           break;
535         }
536       }
537     _mutexForSchedulerUpdate.unlock();
538   } // --- End of critical section
539   return listOfNodesToLoad;
540 }
541
542
543 //! Define a subset of task to execute in step by step mode
544 /*!
545  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
546  * in the current step.
547  * If some nodes must run in parallel, they must stay together in the list.
548  */
549
550 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
551 {
552   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
553   bool ret = true;
554   vector<Task *>::iterator iter;
555   vector<Task *> restrictedTasks;
556   { // --- Critical section
557     _mutexForSchedulerUpdate.lock();
558     _isRunningunderExternalControl=true;
559     switch (_executorState)
560       {
561       case YACS::WAITINGTASKS:
562       case YACS::PAUSED:
563         {
564           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
565             {
566               string readyNode = _mainSched->getTaskName(*iter);
567               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
568                   != listToExecute.end())
569                 {
570                   restrictedTasks.push_back(*iter);
571                   DEBTRACE("node to execute " << readyNode);
572                 }
573             }
574           _tasks.clear();
575           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
576             {
577               _tasks.push_back(*iter);
578             }
579           break;
580         }
581       case YACS::NOTYETINITIALIZED:
582       case YACS::INITIALISED:
583       case YACS::RUNNING:
584       case YACS::FINISHED:
585       case YACS::STOPPED:
586       default:
587         {
588           break;
589         }
590       }
591     _mutexForSchedulerUpdate.unlock();
592     } // --- End of critical section
593
594   _tasks.clear();
595   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
596     {
597       _tasks.push_back(*iter);
598     }
599   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
600     {
601       string readyNode = _mainSched->getTaskName(*iter);
602       DEBTRACE("selected node to execute " << readyNode);
603     }
604
605   return ret;
606 }
607
608 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
609 /*!
610  *  Do nothing if execution is finished or in pause.
611  *  Wait first step if Executor is running or in initialization.
612  */
613
614 void Executor::waitPause()
615 {
616   DEBTRACE("Executor::waitPause()" << _executorState);
617   { // --- Critical section
618     _mutexForSchedulerUpdate.lock();
619     _isRunningunderExternalControl=true;
620     switch (_executorState)
621       {
622       default:
623       case YACS::STOPPED:
624       case YACS::FINISHED:
625       case YACS::WAITINGTASKS:
626       case YACS::PAUSED:
627         {
628           break;
629         }
630       case YACS::NOTYETINITIALIZED:
631       case YACS::INITIALISED:
632       case YACS::RUNNING:
633         {
634           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
635           break;
636         }
637       }
638     _mutexForSchedulerUpdate.unlock();
639   } // --- End of critical section
640   DEBTRACE("---");
641 }
642
643 //! stops the execution as soon as possible 
644
645 void Executor::stopExecution()
646 {
647   setExecMode(YACS::STEPBYSTEP);
648   //waitPause();
649   _isOKToEnd = true;
650   resumeCurrentBreakPoint();
651 }
652
653 //! save the current state of execution in an xml file
654
655 bool Executor::saveState(const std::string& xmlFile)
656 {
657   DEBTRACE("Executor::saveState() in " << xmlFile);
658   YACS::ENGINE::VisitorSaveState vst(_root);
659   vst.openFileDump(xmlFile.c_str());
660   _root->accept(&vst);
661   vst.closeFileDump();
662   return true;
663 }
664
665 //! not yet implemented
666
667 bool Executor::loadState()
668 {
669   DEBTRACE("Executor::loadState()");
670   _isRunningunderExternalControl=true;
671   return true;
672 }
673
674
675 static int isfile(const char *filename) 
676 {
677   struct stat buf;
678   if (stat(filename, &buf) != 0)
679     return 0;
680   if (!S_ISREG(buf.st_mode))
681     return 0;
682   return 1;
683 }
684
685 //! Display the graph state as a dot display, public method
686
687 void Executor::displayDot(Scheduler *graph)
688 {
689   _isRunningunderExternalControl=true;
690   _displayDot(graph);
691 }
692
693 //! Display the graph state as a dot display
694 /*!
695  *  \param graph  : the node to display
696  */
697
698 void Executor::_displayDot(Scheduler *graph)
699 {
700    std::ofstream g("titi");
701    ((ComposedNode*)graph)->writeDot(g);
702    g.close();
703    const char displayScript[]="display.sh";
704    if(isfile(displayScript))
705      system("sh display.sh");
706    else
707      system("dot -Tpng titi|display -delay 5");
708 }
709
710 //! Wait reactivation in modes Step By step or with BreakPoints
711 /*!
712  *  Check mode of execution (set by main thread):
713  *  - YACS::CONTINUE        : the graph execution continues.
714  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
715  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
716  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
717  *                            else continue the graph execution.
718  *  \return true if end of executor thread is requested
719  */
720
721 bool Executor::checkBreakPoints()
722 {
723   DEBTRACE("Executor::checkBreakPoints()");
724   vector<Task *>::iterator iter;
725   bool endRequested = false;
726
727   switch (_execMode)
728     {
729     case YACS::CONTINUE:
730       {
731         break;
732       }
733     case YACS::STOPBEFORENODES:
734       {
735         bool stop = false;
736         { // --- Critical section
737           _mutexForSchedulerUpdate.lock();
738           _tasksSave = _tasks;
739           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
740             {
741               string nodeToLoad = _mainSched->getTaskName(*iter);
742               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
743                   != _listOfBreakPoints.end())
744                 {
745                   stop = true;
746                   break;
747                 }
748             }
749           if (stop)
750             {
751               _listOfTasksToLoad.clear();
752               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
753                 {
754                   string nodeToLoad = _mainSched->getTaskName(*iter);
755                   _listOfTasksToLoad.push_back(nodeToLoad);
756                 }
757               if (getNbOfThreads())
758                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
759               else
760                 _executorState = YACS::PAUSED;
761               sendEvent("executor");
762               _condForPilot.notify_all();
763             }
764           //_mutexForSchedulerUpdate.unlock(); 
765           //} // --- End of critical section
766           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
767           if (_isOKToEnd) endRequested = true;
768           _mutexForSchedulerUpdate.unlock();
769         } // --- End of critical section
770           if (stop) DEBTRACE("wake up from waitResume");
771         break;
772       }
773     default:
774     case YACS::STEPBYSTEP:
775       {
776         { // --- Critical section
777           _mutexForSchedulerUpdate.lock();
778           _tasksSave = _tasks;
779           _listOfTasksToLoad.clear();
780           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
781             {
782               string nodeToLoad = _mainSched->getTaskName(*iter);
783               _listOfTasksToLoad.push_back(nodeToLoad);
784             }
785           if (getNbOfThreads())
786             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
787           else
788             _executorState = YACS::PAUSED;
789           sendEvent("executor");
790           _condForPilot.notify_all();
791           if (!_isOKToEnd)
792             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
793                           // or, if no pilot, wait until no more running tasks (stop on error)
794           if (_isOKToEnd) endRequested = true;
795           _mutexForSchedulerUpdate.unlock();
796         } // --- End of critical section
797         DEBTRACE("wake up from waitResume");
798         break;
799       }
800     }
801   DEBTRACE("endRequested: " << endRequested);
802   return endRequested;
803 }
804
805
806 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
807 /*!
808  *  With the condition Mutex, the mutex is released atomically during the wait.
809  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
810  *  Must be called while mutex is locked.
811  */
812
813 void Executor::waitResume()
814 {
815   DEBTRACE("Executor::waitResume()");
816   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
817   DEBTRACE("---");
818 }
819
820
821 //! Perform loading of a Task.
822 /*!
823  *  \param task  : Task to load
824  */
825
826 void Executor::loadTask(Task *task)
827 {
828   DEBTRACE("Executor::loadTask(Task *task)");
829   if(task->getState() != YACS::TOLOAD)return;
830   traceExec(task, "state:TOLOAD");
831   {//Critical section
832     _mutexForSchedulerUpdate.lock();
833     _mainSched->notifyFrom(task,YACS::START);
834     _mutexForSchedulerUpdate.unlock();
835   }//End of critical section
836   try
837     {
838       traceExec(task, "load");
839       task->load();
840       traceExec(task, "initService");
841       task->initService();
842     }
843   catch(Exception& ex) 
844     {
845       std::cerr << ex.what() << std::endl;
846       {//Critical section
847         _mutexForSchedulerUpdate.lock();
848         task->aborted();
849         _mainSched->notifyFrom(task,YACS::ABORT);
850         traceExec(task, "state:"+Node::getStateName(task->getState()));
851         _mutexForSchedulerUpdate.unlock();
852       }//End of critical section
853     }
854   catch(...) 
855     {
856       std::cerr << "Load failed" << std::endl;
857       {//Critical section
858         _mutexForSchedulerUpdate.lock();
859         task->aborted();
860         _mainSched->notifyFrom(task,YACS::ABORT);
861         traceExec(task, "state:"+Node::getStateName(task->getState()));
862         _mutexForSchedulerUpdate.unlock();
863       }//End of critical section
864     }
865 }
866
867
868 //! Execute a list of tasks possibly connected through datastream links
869 /*!
870  *  \param tasks  : a list of tasks to execute
871  *
872  */
873 void Executor::launchTasks(std::vector<Task *>& tasks)
874 {
875   vector<Task *>::iterator iter;
876   //First phase, make datastream connections
877   for(iter=tasks.begin();iter!=tasks.end();iter++)
878     {
879       YACS::StatesForNode state=(*iter)->getState();
880       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
881       try
882         {
883           (*iter)->connectService();
884           traceExec(*iter, "connectService");
885           {//Critical section
886             _mutexForSchedulerUpdate.lock();
887             (*iter)->connected();
888             _mutexForSchedulerUpdate.unlock();
889           }//End of critical section
890         }
891       catch(Exception& ex) 
892         {
893           std::cerr << ex.what() << std::endl;
894           try
895             {
896               (*iter)->disconnectService();
897               traceExec(*iter, "disconnectService");
898             }
899           catch(...) 
900             {
901               // Disconnect has failed
902               traceExec(*iter, "disconnectService failed, ABORT");
903             }
904           {//Critical section
905             _mutexForSchedulerUpdate.lock();
906             (*iter)->aborted();
907             _mainSched->notifyFrom(*iter,YACS::ABORT);
908             _mutexForSchedulerUpdate.unlock();
909           }//End of critical section
910         }
911       catch(...) 
912         {
913           std::cerr << "Problem in connectService" << std::endl;
914           try
915             {
916               (*iter)->disconnectService();
917               traceExec(*iter, "disconnectService");
918             }
919           catch(...) 
920             {
921               // Disconnect has failed
922               traceExec(*iter, "disconnectService failed, ABORT");
923             }
924           {//Critical section
925             _mutexForSchedulerUpdate.lock();
926             (*iter)->aborted();
927             _mainSched->notifyFrom(*iter,YACS::ABORT);
928             _mutexForSchedulerUpdate.unlock();
929           }//End of critical section
930         }
931       if((*iter)->getState() == YACS::ERROR)
932         {
933           //try to put all coupled tasks in error
934           std::set<Task*> coupledSet;
935           (*iter)->getCoupledTasks(coupledSet);
936           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
937             {
938               Task* t=*it;
939               if(t == *iter)continue;
940               if(t->getState() == YACS::ERROR)continue;
941               try
942                 {
943                   t->disconnectService();
944                   traceExec(t, "disconnectService");
945                 }
946               catch(...)
947                 {
948                   // Disconnect has failed
949                   traceExec(t, "disconnectService failed, ABORT");
950                 }
951               {//Critical section
952                 _mutexForSchedulerUpdate.lock();
953                 t->aborted();
954                 _mainSched->notifyFrom(t,YACS::ABORT);
955                 _mutexForSchedulerUpdate.unlock();
956               }//End of critical section
957               traceExec(t, "state:"+Node::getStateName(t->getState()));
958             }
959         }
960       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
961     }
962
963   //Second phase, execute each task in a thread
964   for(iter=tasks.begin();iter!=tasks.end();iter++)
965     {
966       launchTask(*iter);
967     }
968 }
969
970 struct threadargs {
971   Task *task;
972   Scheduler *sched;
973   Executor *execInst;
974 };
975
976 //! Execute a Task in a thread
977 /*!
978  *  \param task  : Task to execute
979  *
980  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
981  *
982  *  Calls Executor::functionForTaskExecution in Thread
983  */
984
985 void Executor::launchTask(Task *task)
986 {
987   DEBTRACE("Executor::launchTask(Task *task)");
988   struct threadargs *args;
989   if(task->getState() != YACS::TOACTIVATE)return;
990
991   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
992   if(_semThreadCnt == 0)
993     {
994       //check if we have enough threads to run
995       std::set<Task*> tmpSet=_runningTasks;
996       std::set<Task*>::iterator it = tmpSet.begin();
997       std::string status="running";
998       std::set<Task*> coupledSet;
999       while( it != tmpSet.end() )
1000         {
1001           Task* tt=*it;
1002           coupledSet.clear();
1003           tt->getCoupledTasks(coupledSet);
1004           status="running";
1005           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1006             {
1007               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1008               tmpSet.erase(*iter);
1009             }
1010           if(status=="running")break;
1011           it = tmpSet.begin();
1012         }
1013
1014       if(status=="toactivate")
1015         {
1016           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1017           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1018         }
1019     }
1020
1021   _semForMaxThreads.wait();
1022   _semThreadCnt -= 1;
1023
1024   args= new threadargs;
1025   args->task = task;
1026   args->sched = _mainSched;
1027   args->execInst = this;
1028
1029   traceExec(task, "launch");
1030
1031   { // --- Critical section
1032     _mutexForSchedulerUpdate.lock();
1033     _numberOfRunningTasks++;
1034     _runningTasks.insert(task);
1035     task->begin(); //change state to ACTIVATED
1036     _mutexForSchedulerUpdate.unlock();
1037   } // --- End of critical section
1038   Thread(functionForTaskExecution,args);
1039 }
1040
1041 //! wait until a running task ends
1042
1043 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1044 {
1045   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1046 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1047   _mutexForSchedulerUpdate.lock();
1048   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1049     {
1050       _isWaitingEventsFromRunningTasks = true;
1051       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1052     }
1053   _numberOfEndedTasks=0;
1054   _mutexForSchedulerUpdate.unlock();
1055   DEBTRACE("---");
1056 }
1057
1058 //! not implemented
1059
1060 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1061 {
1062   /*_mutexForNbOfConcurrentThreads.lock();
1063   _groupOfAllThreadsCreated.remove(thread);
1064   delete thread;
1065   _mutexForNbOfConcurrentThreads.unlock();*/
1066 }
1067
1068
1069 //! must be used protected by _mutexForSchedulerUpdate!
1070
1071 void Executor::wakeUp()
1072 {
1073   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1074   if (_isWaitingEventsFromRunningTasks)
1075     {
1076       _isWaitingEventsFromRunningTasks = false;
1077       _condForNewTasksToPerform.notify_all();
1078     }
1079   else
1080     _numberOfEndedTasks++;
1081 }
1082
1083 //! number of running tasks
1084
1085 int Executor::getNbOfThreads()
1086 {
1087   int ret;
1088   _mutexForNbOfConcurrentThreads.lock();
1089   _isRunningunderExternalControl=true;
1090   ret = _groupOfAllThreadsCreated.size();
1091   _mutexForNbOfConcurrentThreads.unlock();
1092   return ret;
1093 }
1094
1095
1096 //! Function to perform execution of a task in a thread
1097 /*!
1098  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1099  *
1100  *  Calls Task::execute
1101  *
1102  *  Calls Task::finished when the task is finished
1103  *
1104  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1105  *
1106  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1107  */
1108
1109 void *Executor::functionForTaskExecution(void *arg)
1110 {
1111   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1112
1113   struct threadargs *args = (struct threadargs *) arg;
1114   Task *task=args->task;
1115   Scheduler *sched=args->sched;
1116   Executor *execInst=args->execInst;
1117   delete args;
1118   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1119
1120   Thread::detach();
1121
1122   // Execute task
1123
1124   YACS::Event ev=YACS::FINISH;
1125   try
1126     {
1127       execInst->traceExec(task, "start execution");
1128       task->execute();
1129       execInst->traceExec(task, "end execution OK");
1130     }
1131   catch(Exception& ex)
1132     {
1133       std::cerr << "YACS Exception during execute" << std::endl;
1134       std::cerr << ex.what() << std::endl;
1135       ev=YACS::ABORT;
1136       string message = "end execution ABORT, ";
1137       message += ex.what();
1138       execInst->traceExec(task, message);
1139     }
1140   catch(...) 
1141     {
1142       // Execution has failed
1143       std::cerr << "Execution has failed: unknown reason" << std::endl;
1144       ev=YACS::ABORT;
1145       execInst->traceExec(task, "end execution ABORT, unknown reason");
1146     }
1147
1148   // Disconnect task
1149   try
1150     {
1151       DEBTRACE("task->disconnectService()");
1152       task->disconnectService();
1153       execInst->traceExec(task, "disconnectService");
1154     }
1155   catch(...) 
1156     {
1157       // Disconnect has failed
1158       std::cerr << "disconnect has failed" << std::endl;
1159       ev=YACS::ABORT;
1160       execInst->traceExec(task, "disconnectService failed, ABORT");
1161     }
1162
1163   DEBTRACE("End task->execute()");
1164   { // --- Critical section
1165     execInst->_mutexForSchedulerUpdate.lock();
1166     try
1167       {
1168         if (ev == YACS::FINISH) task->finished();
1169         if (ev == YACS::ABORT)
1170           {
1171             execInst->_errorDetected = true;
1172             if (execInst->_stopOnErrorRequested)
1173               {
1174                 execInst->_execMode = YACS::STEPBYSTEP;
1175                 execInst->_isOKToEnd = true;
1176               }
1177             task->aborted();
1178           }
1179         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1180         sched->notifyFrom(task,ev);
1181       }
1182     catch(Exception& ex)
1183       {
1184         //notify has failed : it is supposed to have set state
1185         //so no need to do anything
1186         std::cerr << "Error during notification" << std::endl;
1187         std::cerr << ex.what() << std::endl;
1188       }
1189     catch(...)
1190       {
1191         //notify has failed : it is supposed to have set state
1192         //so no need to do anything
1193         std::cerr << "Notification failed" << std::endl;
1194       }
1195     execInst->_numberOfRunningTasks--;
1196     execInst->_runningTasks.erase(task);
1197     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1198              << " _execMode: " << execInst->_execMode
1199              << " _executorState: " << execInst->_executorState);
1200     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1201       {
1202         if (execInst->_executorState == YACS::WAITINGTASKS)
1203           {
1204             execInst->_executorState = YACS::PAUSED;
1205             execInst->sendEvent("executor");
1206             execInst->_condForPilot.notify_all();
1207             if (execInst->_errorDetected &&
1208                 execInst->_stopOnErrorRequested &&
1209                 !execInst->_isRunningunderExternalControl)
1210               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1211           }
1212       }
1213     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1214     execInst->_semForMaxThreads.post();
1215     execInst->_semThreadCnt += 1;
1216     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1217     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1218
1219     execInst->_mutexForSchedulerUpdate.unlock();
1220   } // --- End of critical section (change state)
1221
1222   //execInst->notifyEndOfThread(0);
1223   Thread::exit(0);
1224   return 0;
1225 }
1226
1227 void Executor::traceExec(Task *task, const std::string& message)
1228 {
1229   string nodeName = _mainSched->getTaskName(task);
1230   Container *cont = task->getContainer();
1231   string containerName = "---";
1232   string placement = "---";
1233   if (cont)
1234     {
1235       containerName = cont->getName();
1236       ComponentInstance *compo = task->getComponent();
1237       //if (compo)
1238       placement = cont->getFullPlacementId(compo);
1239     }
1240 #ifdef WIN32
1241   DWORD now = timeGetTime();
1242   double elapse = (now - _start)/1000.0;
1243 #else
1244   timeval now;
1245   gettimeofday(&now, NULL);
1246   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1247 #endif
1248   _mutexForTrace.lock();
1249   _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1250   _trace << flush;
1251   _mutexForTrace.unlock();
1252 }
1253
1254 //! emit notification to all observers registered with  the dispatcher 
1255 /*!
1256  * The dispatcher is unique and can be obtained by getDispatcher()
1257  */
1258 void Executor::sendEvent(const std::string& event)
1259 {
1260   Dispatcher* disp=Dispatcher::getDispatcher();
1261   YASSERT(disp);
1262   YASSERT(_root);
1263   disp->dispatch(_root,event);
1264 }