Salome HOME
Addition of autolocker class to avoid deadlocks on throw.
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2014  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "ComponentInstance.hxx"
27
28 #include "VisitorSaveState.hxx"
29 #include "ServiceNode.hxx"
30 #include "ComposedNode.hxx"
31
32 #include <iostream>
33 #include <fstream>
34 #include <sys/stat.h>
35 #ifndef WIN32
36 #include <sys/time.h>
37 #include <unistd.h>
38 #endif
39
40 #include <cstdlib>
41 #include <algorithm>
42
43 #ifdef WIN32
44 #define usleep(A) _sleep(A/1000)
45 #if !defined(S_ISCHR) || !defined(S_ISREG)
46 #  ifndef S_IFMT
47 #    ifdef _S_IFMT
48 #      define S_IFMT _S_IFMT
49 #      define S_IFCHR _S_IFCHR
50 #      define S_IFREG _S_IFREG
51 #    else
52 #    ifdef __S_IFMT
53 #      define S_IFMT __S_IFMT
54 #      define S_IFCHR __S_IFCHR
55 #      define S_IFREG __S_IFREG
56 #    endif
57 #    endif
58 #  endif
59 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
60 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
61 #endif
62 #endif
63
64 using namespace YACS::ENGINE;
65 using namespace std;
66
67 using YACS::BASES::Mutex;
68 using YACS::BASES::Thread;
69 using YACS::BASES::Semaphore;
70
71 //#define _DEVDEBUG_
72 #include "YacsTrace.hxx"
73
74 int Executor::_maxThreads(50);
75 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
76
77 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
78 {
79   _root=0;
80   _toContinue = true;
81   _isOKToEnd = false;
82   _stopOnErrorRequested = false;
83   _dumpOnErrorRequested = false;
84   _errorDetected = false;
85   _isRunningunderExternalControl=false;
86   _executorState = YACS::NOTYETINITIALIZED;
87   _execMode = YACS::CONTINUE;
88   _semThreadCnt = _maxThreads;
89   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
90 }
91
92 Executor::~Executor()
93 {
94   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
95     delete *iter;
96 }
97
98 //! Execute a graph waiting for completion
99 /*!
100  *  \param graph : schema to execute
101  *  \param debug : display the graph with dot if debug == 1
102  *  \param fromScratch : if true the graph is reinitialized
103  *
104  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
105  *  
106  *  Calls Executor::launchTask to execute a selected Task.
107  *
108  *  Completion when graph is finished (Scheduler::isFinished)
109  */
110
111 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
112 {
113   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
114   _mainSched=graph;
115   _root = dynamic_cast<ComposedNode *>(_mainSched);
116   if (!_root) throw Exception("Executor::Run, Internal Error!");
117   bool isMore;
118   int i=0;
119   if(debug>1)_displayDot(graph);
120   if (fromScratch)
121     {
122       graph->init();
123       graph->exUpdateState();
124     }
125   if(debug>1)_displayDot(graph);
126   vector<Task *> tasks;
127   vector<Task *>::iterator iter;
128   _toContinue=true;
129   _execMode = YACS::CONTINUE;
130   _isWaitingEventsFromRunningTasks = false;
131   _numberOfRunningTasks = 0;
132   _runningTasks.clear();
133   _numberOfEndedTasks=0;
134   while(_toContinue)
135     {
136       sleepWhileNoEventsFromAnyRunningTask();
137
138       if(debug>2)_displayDot(graph);
139
140       {//Critical section
141         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
142         tasks=graph->getNextTasks(isMore);
143         graph->selectRunnableTasks(tasks);
144       }//End of critical section
145
146       if(debug>2)_displayDot(graph);
147
148       for(iter=tasks.begin();iter!=tasks.end();iter++)
149         loadTask(*iter);
150
151       if(debug>1)_displayDot(graph);
152
153       launchTasks(tasks);
154
155       if(debug>1)_displayDot(graph);
156
157       {//Critical section
158         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
159         _toContinue=!graph->isFinished();
160       }//End of critical section
161       DEBTRACE("_toContinue: " << _toContinue);
162
163       if(debug>0)_displayDot(graph);
164
165       i++;
166     }
167 }
168
169 //! Execute a graph with breakpoints or step by step
170 /*!
171  *  To be launch in a thread (main thread controls the progression).
172  *  \param graph : schema to execute
173  *  \param debug : display the graph with dot if debug >0
174  *  \param fromScratch : if false, state from a previous partial exection is already loaded
175  *
176  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
177  *  
178  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
179  * 
180  *  Calls Executor::launchTask to execute a selected Task
181  *
182  *  Completion when graph is finished (Scheduler::isFinished)
183  *
184  *  States of execution:
185  *  - YACS::NOTYETINITIALIZED
186  *  - YACS::INITIALISED
187  *  - YACS::RUNNING            (to next breakpoint or step)
188  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
189  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
190  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
191  *  - YACS::STOPPED            (stopped by user before end)
192  *
193  *  Modes of Execution:
194  *  - YACS::CONTINUE           (normal run without breakpoints)
195  *  - YACS::STEPBYSTEP         (pause at each loop)
196  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
197  *
198  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
199  *  Step by Step means execution node by node or group of node by group of nodes.
200  *  At a given step, the user decides to launch all the ready nodes or only a subset
201  *  (Caution: some nodes must run in parallel). 
202  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
203  *
204  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
205  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
206  *  - Executor::getCurrentExecMode
207  *  - Executor::getExecutorState
208  *  - Executor::setExecMode             : change the execution mode for next loop
209  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
210  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
211  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
212  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
213  *  - Executor::isNotFinished
214  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
215  *  - Executor::saveState               : dump the current state of execution in an xml file
216  *  - Executor::loadState               : Not yet implemented
217  *  - Executor::getNbOfThreads
218  *  - Executor::displayDot
219  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
220  *
221  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
222  *  - Executor::waitPause
223  *
224  *  TO BE VALIDATED:
225  *  - Pilot may connect to executor during execution, or deconnect.
226  *  - Several Pilots may be connected at the same time (for observation...)
227  * 
228  */
229
230 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
231 {
232   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
233
234   { // --- Critical section
235     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
236     _mainSched = graph;
237     _root = dynamic_cast<ComposedNode *>(_mainSched);
238     if (!_root) throw Exception("Executor::Run, Internal Error!");
239     _executorState = YACS::NOTYETINITIALIZED;
240     sendEvent("executor");
241     _toContinue=true;
242     _isOKToEnd = false;
243     _errorDetected = false;
244     _isWaitingEventsFromRunningTasks = false;
245     _numberOfRunningTasks = 0;
246     _runningTasks.clear();
247     _numberOfEndedTasks = 0;
248     string tracefile = "traceExec_";
249     tracefile += _mainSched->getName();
250     _trace.open(tracefile.c_str());
251 #ifdef WIN32
252    _start = timeGetTime();
253 #else
254     gettimeofday(&_start, NULL);
255 #endif
256
257   } // --- End of critical section
258
259   if (debug > 1) _displayDot(graph);
260
261   if (fromScratch)
262     {
263       try
264         {
265           graph->init();
266           graph->exUpdateState();
267         }
268       catch(Exception& ex)
269         {
270           DEBTRACE("exception: "<< (ex.what()));
271           _executorState = YACS::FINISHED;
272           sendEvent("executor");
273           throw;
274         }
275     }
276   _executorState = YACS::INITIALISED;
277   sendEvent("executor");
278
279   if (debug > 1) _displayDot(graph);
280
281   vector<Task *>::iterator iter;
282   bool isMore;
283   int problemCount=0;
284   int numberAllTasks;
285
286   _executorState = YACS::RUNNING;
287   sendEvent("executor");
288   while (_toContinue)
289     {
290       DEBTRACE("--- executor main loop");
291       sleepWhileNoEventsFromAnyRunningTask();
292       DEBTRACE("--- events...");
293       if (debug > 2) _displayDot(graph);
294       { // --- Critical section
295         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
296         _tasks=graph->getNextTasks(isMore);
297         numberAllTasks=_numberOfRunningTasks+_tasks.size();
298         graph->selectRunnableTasks(_tasks);
299       } // --- End of critical section
300       if (debug > 2) _displayDot(graph);
301       if (_executorState == YACS::RUNNING)
302         {
303           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
304           if (debug > 0) _displayDot(graph);
305           DEBTRACE("---");
306           for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
307             loadTask(*iter);
308           if (debug > 1) _displayDot(graph);
309           DEBTRACE("---");
310           launchTasks(_tasks);
311           DEBTRACE("---");
312         }
313       if (debug > 1) _displayDot(graph);
314       { // --- Critical section
315         DEBTRACE("---");
316         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
317         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
318         if(_numberOfRunningTasks == 0)
319           _toContinue = !graph->isFinished();
320
321         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
322         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
323         DEBTRACE("_toContinue: " << _toContinue);
324         if(_toContinue && numberAllTasks==0)
325           {
326             //Problem : no running tasks and no task to launch ??
327             problemCount++;
328             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
329             //Pause to give a chance to interrupt
330             usleep(1000);
331             if(problemCount > 25)
332               {
333                 // Too much problems encountered : stop execution
334                 _toContinue=false;
335               }
336           }
337
338         if (! _toContinue)
339           {
340             _executorState = YACS::FINISHED;
341             sendEvent("executor");
342             _condForPilot.notify_all();
343           }
344       } // --- End of critical section
345       if (debug > 0) _displayDot(graph);
346       DEBTRACE("_toContinue: " << _toContinue);
347     }
348
349   DEBTRACE("End of main Loop");
350
351   { // --- Critical section
352     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
353     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
354       {
355         DEBTRACE("stop requested: End soon");
356         _executorState = YACS::STOPPED;
357         _toContinue = false;
358         sendEvent("executor");
359       }
360   } // --- End of critical section
361   if ( _dumpOnErrorRequested && _errorDetected)
362     {
363       saveState(_dumpErrorFile);
364     }
365   _trace.close();
366   DEBTRACE("End of RunB thread");  
367 }
368
369 YACS::ExecutionMode Executor::getCurrentExecMode()
370 {
371   _isRunningunderExternalControl=true;
372   return _execMode;
373 }
374
375
376 YACS::ExecutorState Executor::getExecutorState()
377 {
378   _isRunningunderExternalControl=true;
379   return _executorState;
380 }
381
382
383 bool Executor::isNotFinished()
384 {
385   _isRunningunderExternalControl=true;
386   return _toContinue;
387 }
388
389 //! ask to stop execution on the first node found in error
390 /*!
391  * \param dumpRequested   produce a state dump when an error is found
392  * \param xmlFile         name of file used for state dump
393  */
394
395 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
396 {
397   { // --- Critical section
398     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
399     _dumpErrorFile=xmlFile;
400     _stopOnErrorRequested=true;
401     _dumpOnErrorRequested = dumpRequested;
402     if (dumpRequested && xmlFile.empty())
403       throw YACS::Exception("dump on error requested and no filename given for dump");
404     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
405   } // --- End of critical section
406 }
407
408 //! ask to do not stop execution on nodes found in error
409 /*!
410  */
411
412 void Executor::unsetStopOnError()
413 {
414   { // --- Critical section
415     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
416     _stopOnErrorRequested=false;
417   } // --- End of critical section
418 }
419
420 //! Dynamically set the current mode of execution
421 /*!
422  * The mode can be Continue, step by step, or stop before execution of a node
423  * defined in a list of breakpoints.
424  */
425
426 void Executor::setExecMode(YACS::ExecutionMode mode)
427 {
428   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
429   { // --- Critical section
430     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
431     _isRunningunderExternalControl=true;
432     _execMode = mode;
433   } // --- End of critical section
434 }
435
436 //! wake up executor when in pause
437 /*!
438  * When Executor is in state paused or waiting for task completion, the thread
439  * running loop RunB waits on condition _condForStepByStep.
440  * Thread RunB is waken up.
441  * \return true when actually wakes up executor
442  */
443
444 bool Executor::resumeCurrentBreakPoint()
445 {
446   DEBTRACE("Executor::resumeCurrentBreakPoint()");
447   bool ret = false;
448   //bool doDump = false;
449   { // --- Critical section
450     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
451     _isRunningunderExternalControl=true;
452     DEBTRACE("_executorState: " << _executorState);
453     switch (_executorState)
454       {
455       case YACS::WAITINGTASKS:
456       case YACS::PAUSED:
457         {
458           _condForStepByStep.notify_all();
459           _executorState = YACS::RUNNING;
460           sendEvent("executor");
461           ret = true;
462           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
463           break;
464         }
465       case YACS::FINISHED:
466       case YACS::STOPPED:
467         {
468           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
469           DEBTRACE("Graph Execution finished or stopped !");
470           break;
471         }
472       default :
473         {
474           // debug: no easy way to verify if main loop is acutally waiting on condition
475         }
476       }
477     DEBTRACE("---");
478     //if (doDump) saveState(_dumpErrorFile);
479   } // --- End of critical section
480   return ret;
481 }
482
483
484 //! define a list of nodes names as breakpoints in the graph
485
486
487 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
488 {
489   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
490   { // --- Critical section
491     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
492     _isRunningunderExternalControl=true;
493     _listOfBreakPoints = listOfBreakPoints;
494   } // --- End of critical section
495 }
496
497
498 //! Get the list of tasks to load, to define a subset to execute in step by step mode
499 /*!
500  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
501  *  Use Executor::waitPause to wait.
502  */
503 std::list<std::string> Executor::getTasksToLoad()
504 {
505   DEBTRACE("Executor::getTasksToLoad()");
506   list<string> listOfNodesToLoad;
507   listOfNodesToLoad.clear();
508   { // --- Critical section
509     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
510     _isRunningunderExternalControl=true;
511     switch (_executorState)
512       {
513       case YACS::WAITINGTASKS:
514       case YACS::PAUSED:
515         {
516           listOfNodesToLoad = _listOfTasksToLoad;
517           break;
518         }
519       case YACS::NOTYETINITIALIZED:
520       case YACS::INITIALISED:
521       case YACS::RUNNING:
522       case YACS::FINISHED:
523       case YACS::STOPPED:
524       default:
525         {
526           break;
527         }
528       }
529   } // --- End of critical section
530   return listOfNodesToLoad;
531 }
532
533
534 //! Define a subset of task to execute in step by step mode
535 /*!
536  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
537  * in the current step.
538  * If some nodes must run in parallel, they must stay together in the list.
539  */
540
541 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
542 {
543   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
544   bool ret = true;
545   vector<Task *>::iterator iter;
546   vector<Task *> restrictedTasks;
547   { // --- Critical section
548     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
549     _isRunningunderExternalControl=true;
550     switch (_executorState)
551       {
552       case YACS::WAITINGTASKS:
553       case YACS::PAUSED:
554         {
555           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
556             {
557               string readyNode = _mainSched->getTaskName(*iter);
558               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
559                   != listToExecute.end())
560                 {
561                   restrictedTasks.push_back(*iter);
562                   DEBTRACE("node to execute " << readyNode);
563                 }
564             }
565           _tasks.clear();
566           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
567             {
568               _tasks.push_back(*iter);
569             }
570           break;
571         }
572       case YACS::NOTYETINITIALIZED:
573       case YACS::INITIALISED:
574       case YACS::RUNNING:
575       case YACS::FINISHED:
576       case YACS::STOPPED:
577       default:
578         {
579           break;
580         }
581       }
582     } // --- End of critical section
583
584   _tasks.clear();
585   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
586     {
587       _tasks.push_back(*iter);
588     }
589   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
590     {
591       string readyNode = _mainSched->getTaskName(*iter);
592       DEBTRACE("selected node to execute " << readyNode);
593     }
594
595   return ret;
596 }
597
598 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
599 /*!
600  *  Do nothing if execution is finished or in pause.
601  *  Wait first step if Executor is running or in initialization.
602  */
603
604 void Executor::waitPause()
605 {
606   DEBTRACE("Executor::waitPause()" << _executorState);
607   { // --- Critical section
608     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
609     _isRunningunderExternalControl=true;
610     switch (_executorState)
611       {
612       default:
613       case YACS::STOPPED:
614       case YACS::FINISHED:
615       case YACS::WAITINGTASKS:
616       case YACS::PAUSED:
617         {
618           break;
619         }
620       case YACS::NOTYETINITIALIZED:
621       case YACS::INITIALISED:
622       case YACS::RUNNING:
623         {
624           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
625           break;
626         }
627       }
628   } // --- End of critical section
629   DEBTRACE("---");
630 }
631
632 //! stops the execution as soon as possible 
633
634 void Executor::stopExecution()
635 {
636   setExecMode(YACS::STEPBYSTEP);
637   //waitPause();
638   _isOKToEnd = true;
639   resumeCurrentBreakPoint();
640 }
641
642 //! save the current state of execution in an xml file
643
644 bool Executor::saveState(const std::string& xmlFile)
645 {
646   DEBTRACE("Executor::saveState() in " << xmlFile);
647   YACS::ENGINE::VisitorSaveState vst(_root);
648   vst.openFileDump(xmlFile.c_str());
649   _root->accept(&vst);
650   vst.closeFileDump();
651   return true;
652 }
653
654 //! not yet implemented
655
656 bool Executor::loadState()
657 {
658   DEBTRACE("Executor::loadState()");
659   _isRunningunderExternalControl=true;
660   return true;
661 }
662
663
664 static int isfile(const char *filename) 
665 {
666   struct stat buf;
667   if (stat(filename, &buf) != 0)
668     return 0;
669   if (!S_ISREG(buf.st_mode))
670     return 0;
671   return 1;
672 }
673
674 //! Display the graph state as a dot display, public method
675
676 void Executor::displayDot(Scheduler *graph)
677 {
678   _isRunningunderExternalControl=true;
679   _displayDot(graph);
680 }
681
682 //! Display the graph state as a dot display
683 /*!
684  *  \param graph  : the node to display
685  */
686
687 void Executor::_displayDot(Scheduler *graph)
688 {
689    std::ofstream g("titi");
690    ((ComposedNode*)graph)->writeDot(g);
691    g.close();
692    const char displayScript[]="display.sh";
693    if(isfile(displayScript))
694      system("sh display.sh");
695    else
696      system("dot -Tpng titi|display -delay 5");
697 }
698
699 //! Wait reactivation in modes Step By step or with BreakPoints
700 /*!
701  *  Check mode of execution (set by main thread):
702  *  - YACS::CONTINUE        : the graph execution continues.
703  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
704  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
705  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
706  *                            else continue the graph execution.
707  *  \return true if end of executor thread is requested
708  */
709
710 bool Executor::checkBreakPoints()
711 {
712   DEBTRACE("Executor::checkBreakPoints()");
713   vector<Task *>::iterator iter;
714   bool endRequested = false;
715
716   switch (_execMode)
717     {
718     case YACS::CONTINUE:
719       {
720         break;
721       }
722     case YACS::STOPBEFORENODES:
723       {
724         bool stop = false;
725         { // --- Critical section
726           YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
727           _tasksSave = _tasks;
728           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
729             {
730               string nodeToLoad = _mainSched->getTaskName(*iter);
731               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
732                   != _listOfBreakPoints.end())
733                 {
734                   stop = true;
735                   break;
736                 }
737             }
738           if (stop)
739             {
740               _listOfTasksToLoad.clear();
741               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
742                 {
743                   string nodeToLoad = _mainSched->getTaskName(*iter);
744                   _listOfTasksToLoad.push_back(nodeToLoad);
745                 }
746               if (getNbOfThreads())
747                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
748               else
749                 _executorState = YACS::PAUSED;
750               sendEvent("executor");
751               _condForPilot.notify_all();
752             }
753           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
754           if (_isOKToEnd) endRequested = true;
755         } // --- End of critical section
756           if (stop) DEBTRACE("wake up from waitResume");
757         break;
758       }
759     default:
760     case YACS::STEPBYSTEP:
761       {
762         { // --- Critical section
763           YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
764           _tasksSave = _tasks;
765           _listOfTasksToLoad.clear();
766           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
767             {
768               string nodeToLoad = _mainSched->getTaskName(*iter);
769               _listOfTasksToLoad.push_back(nodeToLoad);
770             }
771           if (getNbOfThreads())
772             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
773           else
774             _executorState = YACS::PAUSED;
775           sendEvent("executor");
776           _condForPilot.notify_all();
777           if (!_isOKToEnd)
778             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
779                           // or, if no pilot, wait until no more running tasks (stop on error)
780           if (_isOKToEnd) endRequested = true;
781         } // --- End of critical section
782         DEBTRACE("wake up from waitResume");
783         break;
784       }
785     }
786   DEBTRACE("endRequested: " << endRequested);
787   return endRequested;
788 }
789
790
791 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
792 /*!
793  *  With the condition Mutex, the mutex is released atomically during the wait.
794  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
795  *  Must be called while mutex is locked.
796  */
797
798 void Executor::waitResume()
799 {
800   DEBTRACE("Executor::waitResume()");
801   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
802   DEBTRACE("---");
803 }
804
805
806 //! Perform loading of a Task.
807 /*!
808  *  \param task  : Task to load
809  */
810
811 void Executor::loadTask(Task *task)
812 {
813   DEBTRACE("Executor::loadTask(Task *task)");
814   if(task->getState() != YACS::TOLOAD)return;
815   traceExec(task, "state:TOLOAD");
816   {//Critical section
817     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
818     _mainSched->notifyFrom(task,YACS::START);
819   }//End of critical section
820   try
821     {
822       traceExec(task, "load");
823       task->load();
824       traceExec(task, "initService");
825       task->initService();
826     }
827   catch(Exception& ex) 
828     {
829       std::cerr << ex.what() << std::endl;
830       {//Critical section
831         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
832         task->aborted();
833         _mainSched->notifyFrom(task,YACS::ABORT);
834         traceExec(task, "state:"+Node::getStateName(task->getState()));
835       }//End of critical section
836     }
837   catch(...) 
838     {
839       std::cerr << "Load failed" << std::endl;
840       {//Critical section
841         YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
842         task->aborted();
843         _mainSched->notifyFrom(task,YACS::ABORT);
844         traceExec(task, "state:"+Node::getStateName(task->getState()));
845       }//End of critical section
846     }
847 }
848
849
850 //! Execute a list of tasks possibly connected through datastream links
851 /*!
852  *  \param tasks  : a list of tasks to execute
853  *
854  */
855 void Executor::launchTasks(std::vector<Task *>& tasks)
856 {
857   vector<Task *>::iterator iter;
858   //First phase, make datastream connections
859   for(iter=tasks.begin();iter!=tasks.end();iter++)
860     {
861       YACS::StatesForNode state=(*iter)->getState();
862       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
863       try
864         {
865           (*iter)->connectService();
866           traceExec(*iter, "connectService");
867           {//Critical section
868             YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
869             (*iter)->connected();
870           }//End of critical section
871         }
872       catch(Exception& ex) 
873         {
874           std::cerr << ex.what() << std::endl;
875           try
876             {
877               (*iter)->disconnectService();
878               traceExec(*iter, "disconnectService");
879             }
880           catch(...) 
881             {
882               // Disconnect has failed
883               traceExec(*iter, "disconnectService failed, ABORT");
884             }
885           {//Critical section
886             YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
887             (*iter)->aborted();
888             _mainSched->notifyFrom(*iter,YACS::ABORT);
889           }//End of critical section
890         }
891       catch(...) 
892         {
893           std::cerr << "Problem in connectService" << std::endl;
894           try
895             {
896               (*iter)->disconnectService();
897               traceExec(*iter, "disconnectService");
898             }
899           catch(...) 
900             {
901               // Disconnect has failed
902               traceExec(*iter, "disconnectService failed, ABORT");
903             }
904           {//Critical section
905             YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
906             (*iter)->aborted();
907             _mainSched->notifyFrom(*iter,YACS::ABORT);
908           }//End of critical section
909         }
910       if((*iter)->getState() == YACS::ERROR)
911         {
912           //try to put all coupled tasks in error
913           std::set<Task*> coupledSet;
914           (*iter)->getCoupledTasks(coupledSet);
915           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
916             {
917               Task* t=*it;
918               if(t == *iter)continue;
919               if(t->getState() == YACS::ERROR)continue;
920               try
921                 {
922                   t->disconnectService();
923                   traceExec(t, "disconnectService");
924                 }
925               catch(...)
926                 {
927                   // Disconnect has failed
928                   traceExec(t, "disconnectService failed, ABORT");
929                 }
930               {//Critical section
931                 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
932                 t->aborted();
933                 _mainSched->notifyFrom(t,YACS::ABORT);
934               }//End of critical section
935               traceExec(t, "state:"+Node::getStateName(t->getState()));
936             }
937         }
938       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
939     }
940
941   //Second phase, execute each task in a thread
942   for(iter=tasks.begin();iter!=tasks.end();iter++)
943     {
944       launchTask(*iter);
945     }
946 }
947
948 struct threadargs {
949   Task *task;
950   Scheduler *sched;
951   Executor *execInst;
952 };
953
954 //! Execute a Task in a thread
955 /*!
956  *  \param task  : Task to execute
957  *
958  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
959  *
960  *  Calls Executor::functionForTaskExecution in Thread
961  */
962
963 void Executor::launchTask(Task *task)
964 {
965   DEBTRACE("Executor::launchTask(Task *task)");
966   struct threadargs *args;
967   if(task->getState() != YACS::TOACTIVATE)return;
968
969   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
970   if(_semThreadCnt == 0)
971     {
972       //check if we have enough threads to run
973       std::set<Task*> tmpSet=_runningTasks;
974       std::set<Task*>::iterator it = tmpSet.begin();
975       std::string status="running";
976       std::set<Task*> coupledSet;
977       while( it != tmpSet.end() )
978         {
979           Task* tt=*it;
980           coupledSet.clear();
981           tt->getCoupledTasks(coupledSet);
982           status="running";
983           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
984             {
985               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
986               tmpSet.erase(*iter);
987             }
988           if(status=="running")break;
989           it = tmpSet.begin();
990         }
991
992       if(status=="toactivate")
993         {
994           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
995           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
996         }
997     }
998
999   _semForMaxThreads.wait();
1000   _semThreadCnt -= 1;
1001
1002   args= new threadargs;
1003   args->task = task;
1004   args->sched = _mainSched;
1005   args->execInst = this;
1006
1007   traceExec(task, "launch");
1008
1009   { // --- Critical section
1010     YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
1011     _numberOfRunningTasks++;
1012     _runningTasks.insert(task);
1013     task->begin(); //change state to ACTIVATED
1014   } // --- End of critical section
1015   Thread(functionForTaskExecution, args, _threadStackSize);
1016 }
1017
1018 //! wait until a running task ends
1019
1020 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1021 {
1022   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1023 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1024   YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
1025   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1026     {
1027       _isWaitingEventsFromRunningTasks = true;
1028       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1029     }
1030   _numberOfEndedTasks=0;
1031   DEBTRACE("---");
1032 }
1033
1034 //! not implemented
1035
1036 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1037 {
1038   /*_mutexForNbOfConcurrentThreads.lock();
1039   _groupOfAllThreadsCreated.remove(thread);
1040   delete thread;
1041   _mutexForNbOfConcurrentThreads.unlock();*/
1042 }
1043
1044
1045 //! must be used protected by _mutexForSchedulerUpdate!
1046
1047 void Executor::wakeUp()
1048 {
1049   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1050   if (_isWaitingEventsFromRunningTasks)
1051     {
1052       _isWaitingEventsFromRunningTasks = false;
1053       _condForNewTasksToPerform.notify_all();
1054     }
1055   else
1056     _numberOfEndedTasks++;
1057 }
1058
1059 //! number of running tasks
1060
1061 int Executor::getNbOfThreads()
1062 {
1063   int ret;
1064   YACS::BASES::AutoLocker alck(&_mutexForNbOfConcurrentThreads);
1065   _isRunningunderExternalControl=true;
1066   ret = _groupOfAllThreadsCreated.size();
1067   return ret;
1068 }
1069
1070
1071 //! Function to perform execution of a task in a thread
1072 /*!
1073  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1074  *
1075  *  Calls Task::execute
1076  *
1077  *  Calls Task::finished when the task is finished
1078  *
1079  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1080  *
1081  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1082  */
1083
1084 void *Executor::functionForTaskExecution(void *arg)
1085 {
1086   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1087
1088   struct threadargs *args = (struct threadargs *) arg;
1089   Task *task=args->task;
1090   Scheduler *sched=args->sched;
1091   Executor *execInst=args->execInst;
1092   delete args;
1093   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1094
1095   Thread::detach();
1096
1097   // Execute task
1098
1099   YACS::Event ev=YACS::FINISH;
1100   try
1101     {
1102       execInst->traceExec(task, "start execution");
1103       task->execute();
1104       execInst->traceExec(task, "end execution OK");
1105     }
1106   catch(Exception& ex)
1107     {
1108       std::cerr << "YACS Exception during execute" << std::endl;
1109       std::cerr << ex.what() << std::endl;
1110       ev=YACS::ABORT;
1111       string message = "end execution ABORT, ";
1112       message += ex.what();
1113       execInst->traceExec(task, message);
1114     }
1115   catch(...) 
1116     {
1117       // Execution has failed
1118       std::cerr << "Execution has failed: unknown reason" << std::endl;
1119       ev=YACS::ABORT;
1120       execInst->traceExec(task, "end execution ABORT, unknown reason");
1121     }
1122
1123   // Disconnect task
1124   try
1125     {
1126       DEBTRACE("task->disconnectService()");
1127       task->disconnectService();
1128       execInst->traceExec(task, "disconnectService");
1129     }
1130   catch(...) 
1131     {
1132       // Disconnect has failed
1133       std::cerr << "disconnect has failed" << std::endl;
1134       ev=YACS::ABORT;
1135       execInst->traceExec(task, "disconnectService failed, ABORT");
1136     }
1137
1138   DEBTRACE("End task->execute()");
1139   { // --- Critical section
1140     YACS::BASES::AutoLocker alck(&execInst->_mutexForSchedulerUpdate);
1141     try
1142       {
1143         if (ev == YACS::FINISH) task->finished();
1144         if (ev == YACS::ABORT)
1145           {
1146             execInst->_errorDetected = true;
1147             if (execInst->_stopOnErrorRequested)
1148               {
1149                 execInst->_execMode = YACS::STEPBYSTEP;
1150                 execInst->_isOKToEnd = true;
1151               }
1152             task->aborted();
1153           }
1154         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1155         sched->notifyFrom(task,ev);
1156       }
1157     catch(Exception& ex)
1158       {
1159         //notify has failed : it is supposed to have set state
1160         //so no need to do anything
1161         std::cerr << "Error during notification" << std::endl;
1162         std::cerr << ex.what() << std::endl;
1163       }
1164     catch(...)
1165       {
1166         //notify has failed : it is supposed to have set state
1167         //so no need to do anything
1168         std::cerr << "Notification failed" << std::endl;
1169       }
1170     execInst->_numberOfRunningTasks--;
1171     execInst->_runningTasks.erase(task);
1172     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1173              << " _execMode: " << execInst->_execMode
1174              << " _executorState: " << execInst->_executorState);
1175     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1176       {
1177         if (execInst->_executorState == YACS::WAITINGTASKS)
1178           {
1179             execInst->_executorState = YACS::PAUSED;
1180             execInst->sendEvent("executor");
1181             execInst->_condForPilot.notify_all();
1182             if (execInst->_errorDetected &&
1183                 execInst->_stopOnErrorRequested &&
1184                 !execInst->_isRunningunderExternalControl)
1185               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1186           }
1187       }
1188     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1189     execInst->_semForMaxThreads.post();
1190     execInst->_semThreadCnt += 1;
1191     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1192     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1193
1194   } // --- End of critical section (change state)
1195
1196   //execInst->notifyEndOfThread(0);
1197   Thread::exit(0);
1198   return 0;
1199 }
1200
1201 void Executor::traceExec(Task *task, const std::string& message)
1202 {
1203   string nodeName = _mainSched->getTaskName(task);
1204   Container *cont = task->getContainer();
1205   string containerName = "---";
1206   string placement = "---";
1207   if (cont)
1208     {
1209       containerName = cont->getName();
1210       ComponentInstance *compo = task->getComponent();
1211       ServiceNode *taskCast(dynamic_cast<ServiceNode *>(task));
1212       if(taskCast)
1213         placement = cont->getFullPlacementId(taskCast);
1214     }
1215 #ifdef WIN32
1216   DWORD now = timeGetTime();
1217   double elapse = (now - _start)/1000.0;
1218 #else
1219   timeval now;
1220   gettimeofday(&now, NULL);
1221   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1222 #endif
1223   {
1224     YACS::BASES::AutoLocker alck(&_mutexForTrace);
1225     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1226     _trace << flush;
1227   }
1228 }
1229
1230 //! emit notification to all observers registered with  the dispatcher 
1231 /*!
1232  * The dispatcher is unique and can be obtained by getDispatcher()
1233  */
1234 void Executor::sendEvent(const std::string& event)
1235 {
1236   Dispatcher* disp=Dispatcher::getDispatcher();
1237   YASSERT(disp);
1238   YASSERT(_root);
1239   disp->dispatch(_root,event);
1240 }