Salome HOME
Merge branch 'V8_5_BR'
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2016  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(1000);
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
77
78 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   _numberOfRunningTasks = 0;
91   _numberOfEndedTasks = 0;
92   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
93 }
94
95 Executor::~Executor()
96 {
97   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
98     delete *iter;
99 }
100
101 //! Execute a graph waiting for completion
102 /*!
103  *  \param graph : schema to execute
104  *  \param debug : display the graph with dot if debug == 1
105  *  \param fromScratch : if true the graph is reinitialized
106  *
107  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
108  *  
109  *  Calls Executor::launchTask to execute a selected Task.
110  *
111  *  Completion when graph is finished (Scheduler::isFinished)
112  */
113
114 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
115 {
116   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
117   _mainSched=graph;
118   _root = dynamic_cast<ComposedNode *>(_mainSched);
119   if (!_root) throw Exception("Executor::Run, Internal Error!");
120   bool isMore;
121   int i=0;
122   if(debug>1)_displayDot(graph);
123   if (fromScratch)
124     {
125       graph->init();
126       graph->exUpdateState();
127     }
128   if(debug>1)_displayDot(graph);
129   vector<Task *> tasks;
130   vector<Task *>::iterator iter;
131   _toContinue=true;
132   _execMode = YACS::CONTINUE;
133   _isWaitingEventsFromRunningTasks = false;
134   _numberOfRunningTasks = 0;
135   _runningTasks.clear();
136   _numberOfEndedTasks=0;
137   while(_toContinue)
138     {
139       sleepWhileNoEventsFromAnyRunningTask();
140
141       if(debug>2)_displayDot(graph);
142
143       {//Critical section
144         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
145         tasks=graph->getNextTasks(isMore);
146         graph->selectRunnableTasks(tasks);
147       }//End of critical section
148
149       if(debug>2)_displayDot(graph);
150
151       for(iter=tasks.begin();iter!=tasks.end();iter++)
152         loadTask(*iter,this);
153
154       if(debug>1)_displayDot(graph);
155
156       launchTasks(tasks);
157
158       if(debug>1)_displayDot(graph);
159
160       {//Critical section
161         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
162         _toContinue=!graph->isFinished();
163       }//End of critical section
164       DEBTRACE("_toContinue: " << _toContinue);
165
166       if(debug>0)_displayDot(graph);
167
168       i++;
169     }
170 }
171
172 //! Execute a graph with breakpoints or step by step
173 /*!
174  *  To be launch in a thread (main thread controls the progression).
175  *  \param graph : schema to execute
176  *  \param debug : display the graph with dot if debug >0
177  *  \param fromScratch : if false, state from a previous partial exection is already loaded
178  *
179  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
180  *  
181  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
182  * 
183  *  Calls Executor::launchTask to execute a selected Task
184  *
185  *  Completion when graph is finished (Scheduler::isFinished)
186  *
187  *  States of execution:
188  *  - YACS::NOTYETINITIALIZED
189  *  - YACS::INITIALISED
190  *  - YACS::RUNNING            (to next breakpoint or step)
191  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
192  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
193  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
194  *  - YACS::STOPPED            (stopped by user before end)
195  *
196  *  Modes of Execution:
197  *  - YACS::CONTINUE           (normal run without breakpoints)
198  *  - YACS::STEPBYSTEP         (pause at each loop)
199  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
200  *
201  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
202  *  Step by Step means execution node by node or group of node by group of nodes.
203  *  At a given step, the user decides to launch all the ready nodes or only a subset
204  *  (Caution: some nodes must run in parallel). 
205  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
206  *
207  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
208  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
209  *  - Executor::getCurrentExecMode
210  *  - Executor::getExecutorState
211  *  - Executor::setExecMode             : change the execution mode for next loop
212  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
213  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
214  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
215  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
216  *  - Executor::isNotFinished
217  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
218  *  - Executor::saveState               : dump the current state of execution in an xml file
219  *  - Executor::loadState               : Not yet implemented
220  *  - Executor::getNbOfThreads
221  *  - Executor::displayDot
222  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
223  *
224  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
225  *  - Executor::waitPause
226  *
227  *  TO BE VALIDATED:
228  *  - Pilot may connect to executor during execution, or deconnect.
229  *  - Several Pilots may be connected at the same time (for observation...)
230  * 
231  */
232
233 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
234 {
235   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
236
237   { // --- Critical section
238     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
239     _mainSched = graph;
240     _root = dynamic_cast<ComposedNode *>(_mainSched);
241     if (!_root) throw Exception("Executor::Run, Internal Error!");
242     _executorState = YACS::NOTYETINITIALIZED;
243     sendEvent("executor");
244     _toContinue=true;
245     _isOKToEnd = false;
246     _errorDetected = false;
247     _isWaitingEventsFromRunningTasks = false;
248     _numberOfRunningTasks = 0;
249     _runningTasks.clear();
250     _numberOfEndedTasks = 0;
251     string tracefile = "traceExec_";
252     tracefile += _mainSched->getName();
253     _trace.open(tracefile.c_str());
254 #ifdef WIN32
255    _start = timeGetTime();
256 #else
257     gettimeofday(&_start, NULL);
258 #endif
259
260   } // --- End of critical section
261
262   if (debug > 1) _displayDot(graph);
263
264   if (fromScratch)
265     {
266       try
267         {
268           graph->init();
269           graph->exUpdateState();
270         }
271       catch(Exception& ex)
272         {
273           DEBTRACE("exception: "<< (ex.what()));
274           _executorState = YACS::FINISHED;
275           sendEvent("executor");
276           throw;
277         }
278     }
279   _executorState = YACS::INITIALISED;
280   sendEvent("executor");
281
282   if (debug > 1) _displayDot(graph);
283
284   vector<Task *>::iterator iter;
285   bool isMore;
286   int problemCount=0;
287   int numberAllTasks;
288
289   _executorState = YACS::RUNNING;
290   sendEvent("executor");
291   while (_toContinue)
292     {
293       DEBTRACE("--- executor main loop");
294       sleepWhileNoEventsFromAnyRunningTask();
295       DEBTRACE("--- events...");
296       if (debug > 2) _displayDot(graph);
297       { // --- Critical section
298         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
299         _tasks=graph->getNextTasks(isMore);
300         graph->selectRunnableTasks(_tasks);
301         FilterTasksConsideringContainers(_tasks);
302         numberAllTasks=_numberOfRunningTasks+_tasks.size();
303       } // --- End of critical section
304       if (debug > 2) _displayDot(graph);
305       if (_executorState == YACS::RUNNING)
306         {
307           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
308           if (debug > 0) _displayDot(graph);
309           DEBTRACE("---");
310           //loadTasks(_tasks);
311           loadParallelTasks(_tasks,this);
312           if (debug > 1) _displayDot(graph);
313           DEBTRACE("---");
314           launchTasks(_tasks);
315           DEBTRACE("---");
316         }
317       if (debug > 1) _displayDot(graph);
318       { // --- Critical section
319         DEBTRACE("---");
320         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
321         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
322         if(_numberOfRunningTasks == 0)
323           _toContinue = !graph->isFinished();
324
325         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
326         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
327         DEBTRACE("_toContinue: " << _toContinue);
328         if(_toContinue && numberAllTasks==0)
329           {
330             //Problem : no running tasks and no task to launch ??
331             problemCount++;
332             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
333             //Pause to give a chance to interrupt
334             usleep(1000);
335             if(problemCount > 25)
336               {
337                 // Too much problems encountered : stop execution
338                 _toContinue=false;
339               }
340           }
341
342         if (! _toContinue)
343           {
344             _executorState = YACS::FINISHED;
345             sendEvent("executor");
346             _condForPilot.notify_all();
347           }
348       } // --- End of critical section
349       if (debug > 0) _displayDot(graph);
350       DEBTRACE("_toContinue: " << _toContinue);
351     }
352
353   DEBTRACE("End of main Loop");
354
355   { // --- Critical section
356     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
357     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
358       {
359         DEBTRACE("stop requested: End soon");
360         _executorState = YACS::STOPPED;
361         _toContinue = false;
362         sendEvent("executor");
363       }
364   } // --- End of critical section
365   if ( _dumpOnErrorRequested && _errorDetected)
366     {
367       saveState(_dumpErrorFile);
368     }
369   {
370     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
371     _trace.close();
372   }
373   DEBTRACE("End of RunB thread");  
374 }
375
376 YACS::ExecutionMode Executor::getCurrentExecMode()
377 {
378   _isRunningunderExternalControl=true;
379   return _execMode;
380 }
381
382
383 YACS::ExecutorState Executor::getExecutorState()
384 {
385   _isRunningunderExternalControl=true;
386   return _executorState;
387 }
388
389
390 bool Executor::isNotFinished()
391 {
392   _isRunningunderExternalControl=true;
393   return _toContinue;
394 }
395
396 //! ask to stop execution on the first node found in error
397 /*!
398  * \param dumpRequested   produce a state dump when an error is found
399  * \param xmlFile         name of file used for state dump
400  */
401
402 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
403 {
404   { // --- Critical section
405     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
406     _dumpErrorFile=xmlFile;
407     _stopOnErrorRequested=true;
408     _dumpOnErrorRequested = dumpRequested;
409     if (dumpRequested && xmlFile.empty())
410       throw YACS::Exception("dump on error requested and no filename given for dump");
411     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
412   } // --- End of critical section
413 }
414
415 //! ask to do not stop execution on nodes found in error
416 /*!
417  */
418
419 void Executor::unsetStopOnError()
420 {
421   { // --- Critical section
422     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
423     _stopOnErrorRequested=false;
424   } // --- End of critical section
425 }
426
427 //! Dynamically set the current mode of execution
428 /*!
429  * The mode can be Continue, step by step, or stop before execution of a node
430  * defined in a list of breakpoints.
431  */
432
433 void Executor::setExecMode(YACS::ExecutionMode mode)
434 {
435   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
436   { // --- Critical section
437     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
438     _isRunningunderExternalControl=true;
439     _execMode = mode;
440   } // --- End of critical section
441 }
442
443 //! wake up executor when in pause
444 /*!
445  * When Executor is in state paused or waiting for task completion, the thread
446  * running loop RunB waits on condition _condForStepByStep.
447  * Thread RunB is waken up.
448  * \return true when actually wakes up executor
449  */
450
451 bool Executor::resumeCurrentBreakPoint()
452 {
453   DEBTRACE("Executor::resumeCurrentBreakPoint()");
454   bool ret = false;
455   //bool doDump = false;
456   { // --- Critical section
457     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
458     _isRunningunderExternalControl=true;
459     DEBTRACE("_executorState: " << _executorState);
460     switch (_executorState)
461       {
462       case YACS::WAITINGTASKS:
463       case YACS::PAUSED:
464         {
465           _condForStepByStep.notify_all();
466           _executorState = YACS::RUNNING;
467           sendEvent("executor");
468           ret = true;
469           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
470           break;
471         }
472       case YACS::FINISHED:
473       case YACS::STOPPED:
474         {
475           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
476           DEBTRACE("Graph Execution finished or stopped !");
477           break;
478         }
479       default :
480         {
481           // debug: no easy way to verify if main loop is acutally waiting on condition
482         }
483       }
484     DEBTRACE("---");
485     //if (doDump) saveState(_dumpErrorFile);
486   } // --- End of critical section
487   return ret;
488 }
489
490
491 //! define a list of nodes names as breakpoints in the graph
492
493
494 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
495 {
496   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
497   { // --- Critical section
498     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
499     _isRunningunderExternalControl=true;
500     _listOfBreakPoints = listOfBreakPoints;
501   } // --- End of critical section
502 }
503
504
505 //! Get the list of tasks to load, to define a subset to execute in step by step mode
506 /*!
507  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
508  *  Use Executor::waitPause to wait.
509  */
510 std::list<std::string> Executor::getTasksToLoad()
511 {
512   DEBTRACE("Executor::getTasksToLoad()");
513   list<string> listOfNodesToLoad;
514   listOfNodesToLoad.clear();
515   { // --- Critical section
516     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
517     _isRunningunderExternalControl=true;
518     switch (_executorState)
519       {
520       case YACS::WAITINGTASKS:
521       case YACS::PAUSED:
522         {
523           listOfNodesToLoad = _listOfTasksToLoad;
524           break;
525         }
526       case YACS::NOTYETINITIALIZED:
527       case YACS::INITIALISED:
528       case YACS::RUNNING:
529       case YACS::FINISHED:
530       case YACS::STOPPED:
531       default:
532         {
533           break;
534         }
535       }
536   } // --- End of critical section
537   return listOfNodesToLoad;
538 }
539
540
541 //! Define a subset of task to execute in step by step mode
542 /*!
543  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
544  * in the current step.
545  * If some nodes must run in parallel, they must stay together in the list.
546  */
547
548 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
549 {
550   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
551   bool ret = true;
552   vector<Task *>::iterator iter;
553   vector<Task *> restrictedTasks;
554   { // --- Critical section
555     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
556     _isRunningunderExternalControl=true;
557     switch (_executorState)
558       {
559       case YACS::WAITINGTASKS:
560       case YACS::PAUSED:
561         {
562           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
563             {
564               string readyNode = _mainSched->getTaskName(*iter);
565               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
566                   != listToExecute.end())
567                 {
568                   restrictedTasks.push_back(*iter);
569                   DEBTRACE("node to execute " << readyNode);
570                 }
571             }
572           _tasks.clear();
573           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
574             {
575               _tasks.push_back(*iter);
576             }
577           break;
578         }
579       case YACS::NOTYETINITIALIZED:
580       case YACS::INITIALISED:
581       case YACS::RUNNING:
582       case YACS::FINISHED:
583       case YACS::STOPPED:
584       default:
585         {
586           break;
587         }
588       }
589     } // --- End of critical section
590
591   _tasks.clear();
592   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
593     {
594       _tasks.push_back(*iter);
595     }
596   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
597     {
598       string readyNode = _mainSched->getTaskName(*iter);
599       DEBTRACE("selected node to execute " << readyNode);
600     }
601
602   return ret;
603 }
604
605 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
606 /*!
607  *  Do nothing if execution is finished or in pause.
608  *  Wait first step if Executor is running or in initialization.
609  */
610
611 void Executor::waitPause()
612 {
613   DEBTRACE("Executor::waitPause()" << _executorState);
614   { // --- Critical section
615     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
616     _isRunningunderExternalControl=true;
617     switch (_executorState)
618       {
619       default:
620       case YACS::STOPPED:
621       case YACS::FINISHED:
622       case YACS::WAITINGTASKS:
623       case YACS::PAUSED:
624         {
625           break;
626         }
627       case YACS::NOTYETINITIALIZED:
628       case YACS::INITIALISED:
629       case YACS::RUNNING:
630         {
631           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
632           break;
633         }
634       }
635   } // --- End of critical section
636   DEBTRACE("---");
637 }
638
639 /*!
640  * This method can be called at any time simultaneously during a RunB call.
641  * This method will wait until the executor is locked in a consistent state of a running graph.
642  *
643  * This method is expected to be called in association with resume method.
644  * The returned parameter is expected to be transfered to resume method.
645  */
646 bool Executor::suspendASAP()
647 {
648   // no AutoLocker here. It's not a bug.
649   _mutexForSchedulerUpdate.lock();
650   if(!_toContinue && _executorState==YACS::FINISHED)
651     {// execution is finished
652       _mutexForSchedulerUpdate.unLock();
653       return false;// the executor is no more running
654     }
655   //general case. Leave method with locker in locked status
656   return true;
657 }
658
659 /*!
660  * This method is expected to be called in association with suspendASAP method.
661  * Expected to be called just after suspendASAP with output of resume as input parameter
662  */
663 void Executor::resume(bool suspended)
664 {
665   if(suspended)
666     _mutexForSchedulerUpdate.unLock();
667 }
668
669 //! stops the execution as soon as possible 
670
671 void Executor::stopExecution()
672 {
673   setExecMode(YACS::STEPBYSTEP);
674   //waitPause();
675   _isOKToEnd = true;
676   resumeCurrentBreakPoint();
677 }
678
679 //! save the current state of execution in an xml file
680
681 bool Executor::saveState(const std::string& xmlFile)
682 {
683   DEBTRACE("Executor::saveState() in " << xmlFile);
684   bool result = false;
685   try {
686     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
687     YACS::ENGINE::VisitorSaveState vst(_root);
688     vst.openFileDump(xmlFile.c_str());
689     _root->accept(&vst);
690     vst.closeFileDump();
691     result = true;
692   }
693   catch(Exception& ex) {
694     std::cerr << ex.what() << std::endl;
695   }
696   return result;
697 }
698
699 //! not yet implemented
700
701 bool Executor::loadState()
702 {
703   DEBTRACE("Executor::loadState()");
704   _isRunningunderExternalControl=true;
705   return true;
706 }
707
708
709 static int isfile(const char *filename) 
710 {
711   struct stat buf;
712   if (stat(filename, &buf) != 0)
713     return 0;
714   if (!S_ISREG(buf.st_mode))
715     return 0;
716   return 1;
717 }
718
719 //! Display the graph state as a dot display, public method
720
721 void Executor::displayDot(Scheduler *graph)
722 {
723   _isRunningunderExternalControl=true;
724   _displayDot(graph);
725 }
726
727 //! Display the graph state as a dot display
728 /*!
729  *  \param graph  : the node to display
730  */
731
732 void Executor::_displayDot(Scheduler *graph)
733 {
734    std::ofstream g("titi");
735    ((ComposedNode*)graph)->writeDot(g);
736    g.close();
737    const char displayScript[]="display.sh";
738    if(isfile(displayScript))
739      system("sh display.sh");
740    else
741      system("dot -Tpng titi|display -delay 5");
742 }
743
744 //! Wait reactivation in modes Step By step or with BreakPoints
745 /*!
746  *  Check mode of execution (set by main thread):
747  *  - YACS::CONTINUE        : the graph execution continues.
748  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
749  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
750  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
751  *                            else continue the graph execution.
752  *  \return true if end of executor thread is requested
753  */
754
755 bool Executor::checkBreakPoints()
756 {
757   DEBTRACE("Executor::checkBreakPoints()");
758   vector<Task *>::iterator iter;
759   bool endRequested = false;
760
761   switch (_execMode)
762     {
763     case YACS::CONTINUE:
764       {
765         break;
766       }
767     case YACS::STOPBEFORENODES:
768       {
769         bool stop = false;
770         { // --- Critical section
771           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
772           _tasksSave = _tasks;
773           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
774             {
775               string nodeToLoad = _mainSched->getTaskName(*iter);
776               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
777                   != _listOfBreakPoints.end())
778                 {
779                   stop = true;
780                   break;
781                 }
782             }
783           if (stop)
784             {
785               _listOfTasksToLoad.clear();
786               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
787                 {
788                   string nodeToLoad = _mainSched->getTaskName(*iter);
789                   _listOfTasksToLoad.push_back(nodeToLoad);
790                 }
791               if (getNbOfThreads())
792                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
793               else
794                 _executorState = YACS::PAUSED;
795               sendEvent("executor");
796               _condForPilot.notify_all();
797             }
798           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
799           if (_isOKToEnd) endRequested = true;
800         } // --- End of critical section
801           if (stop) DEBTRACE("wake up from waitResume");
802         break;
803       }
804     default:
805     case YACS::STEPBYSTEP:
806       {
807         { // --- Critical section
808           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
809           _tasksSave = _tasks;
810           _listOfTasksToLoad.clear();
811           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
812             {
813               string nodeToLoad = _mainSched->getTaskName(*iter);
814               _listOfTasksToLoad.push_back(nodeToLoad);
815             }
816           if (getNbOfThreads())
817             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
818           else
819             _executorState = YACS::PAUSED;
820           sendEvent("executor");
821           _condForPilot.notify_all();
822           if (!_isOKToEnd)
823             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
824                           // or, if no pilot, wait until no more running tasks (stop on error)
825           if (_isOKToEnd) endRequested = true;
826         } // --- End of critical section
827         DEBTRACE("wake up from waitResume");
828         break;
829       }
830     }
831   DEBTRACE("endRequested: " << endRequested);
832   return endRequested;
833 }
834
835
836 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
837 /*!
838  *  With the condition Mutex, the mutex is released atomically during the wait.
839  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
840  *  Must be called while mutex is locked.
841  */
842
843 void Executor::waitResume()
844 {
845   DEBTRACE("Executor::waitResume()");
846   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
847   DEBTRACE("---");
848 }
849
850
851 //! Perform loading of a Task.
852 /*!
853  *  \param task  : Task to load
854  */
855
856 void Executor::loadTask(Task *task, const Executor *execInst)
857 {
858   DEBTRACE("Executor::loadTask(Task *task)");
859   if(task->getState() != YACS::TOLOAD)
860     return;
861   traceExec(task, "state:TOLOAD", ComputePlacement(task));
862   {//Critical section
863     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
864     _mainSched->notifyFrom(task,YACS::START,execInst);
865   }//End of critical section
866   try
867     {
868       traceExec(task, "load", ComputePlacement(task));
869       task->load();
870       traceExec(task, "initService", ComputePlacement(task));
871       task->initService();
872     }
873   catch(Exception& ex) 
874     {
875       std::cerr << ex.what() << std::endl;
876       {//Critical section
877         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
878         task->aborted();
879         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
880         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
881       }//End of critical section
882     }
883   catch(...) 
884     {
885       std::cerr << "Load failed" << std::endl;
886       {//Critical section
887         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
888         task->aborted();
889         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
890         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
891       }//End of critical section
892     }
893 }
894
895 struct threadargs
896 {
897   Task *task;
898   Scheduler *sched;
899   Executor *execInst;
900 };
901
902 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
903 {
904   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
905     loadTask(*iter,execInst);
906 }
907
908 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
909 {
910   std::vector<Thread> ths(tasks.size());
911   std::size_t ithread(0);
912   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
913     {
914       DEBTRACE("Executor::loadParallelTasks(Task *task)");
915       struct threadargs *args(new threadargs);
916       args->task = (*iter);
917       args->sched = _mainSched;
918       args->execInst = this;
919       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
920     }
921   for(ithread=0;ithread<tasks.size();ithread++)
922     ths[ithread].join();
923 }
924
925 //! Execute a list of tasks possibly connected through datastream links
926 /*!
927  *  \param tasks  : a list of tasks to execute
928  *
929  */
930 void Executor::launchTasks(const std::vector<Task *>& tasks)
931 {
932   //First phase, make datastream connections
933   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
934     {
935       YACS::StatesForNode state=(*iter)->getState();
936       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
937       try
938         {
939           (*iter)->connectService();
940           traceExec(*iter, "connectService",ComputePlacement(*iter));
941           {//Critical section
942             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
943             (*iter)->connected();
944           }//End of critical section
945         }
946       catch(Exception& ex) 
947         {
948           std::cerr << ex.what() << std::endl;
949           try
950             {
951               (*iter)->disconnectService();
952               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
953             }
954           catch(...) 
955             {
956               // Disconnect has failed
957               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
958             }
959           {//Critical section
960             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
961             (*iter)->aborted();
962             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
963           }//End of critical section
964         }
965       catch(...) 
966         {
967           std::cerr << "Problem in connectService" << std::endl;
968           try
969             {
970               (*iter)->disconnectService();
971               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
972             }
973           catch(...) 
974             {
975               // Disconnect has failed
976               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
977             }
978           {//Critical section
979             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
980             (*iter)->aborted();
981             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
982           }//End of critical section
983         }
984       if((*iter)->getState() == YACS::ERROR)
985         {
986           //try to put all coupled tasks in error
987           std::set<Task*> coupledSet;
988           (*iter)->getCoupledTasks(coupledSet);
989           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
990             {
991               Task* t=*it;
992               if(t == *iter)continue;
993               if(t->getState() == YACS::ERROR)continue;
994               try
995                 {
996                   t->disconnectService();
997                   traceExec(t, "disconnectService",ComputePlacement(*iter));
998                 }
999               catch(...)
1000                 {
1001                   // Disconnect has failed
1002                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
1003                 }
1004               {//Critical section
1005                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1006                 t->aborted();
1007                 _mainSched->notifyFrom(t,YACS::ABORT,this);
1008               }//End of critical section
1009               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1010             }
1011         }
1012       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1013     }
1014
1015   //Second phase, execute each task in a thread
1016   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1017     {
1018       launchTask(*iter);
1019     }
1020 }
1021
1022 //! Execute a Task in a thread
1023 /*!
1024  *  \param task  : Task to execute
1025  *
1026  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1027  *
1028  *  Calls Executor::functionForTaskExecution in Thread
1029  */
1030
1031 void Executor::launchTask(Task *task)
1032 {
1033   DEBTRACE("Executor::launchTask(Task *task)");
1034   struct threadargs *args;
1035   if(task->getState() != YACS::TOACTIVATE)return;
1036
1037   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1038   if(_semThreadCnt == 0)
1039     {
1040       // --- Critical section
1041       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1042       //check if we have enough threads to run
1043       std::set<Task*> tmpSet=_runningTasks;
1044       std::set<Task*>::iterator it = tmpSet.begin();
1045       std::string status="running";
1046       std::set<Task*> coupledSet;
1047       while( it != tmpSet.end() )
1048         {
1049           Task* tt=*it;
1050           coupledSet.clear();
1051           tt->getCoupledTasks(coupledSet);
1052           status="running";
1053           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1054             {
1055               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1056               tmpSet.erase(*iter);
1057             }
1058           if(status=="running")break;
1059           it = tmpSet.begin();
1060         }
1061
1062       if(status=="toactivate")
1063         {
1064           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1065           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1066         }
1067       // --- End of critical section
1068     }
1069
1070   _semForMaxThreads.wait();
1071   _semThreadCnt -= 1;
1072
1073   args= new threadargs;
1074   args->task = task;
1075   args->sched = _mainSched;
1076   args->execInst = this;
1077
1078   traceExec(task, "launch",ComputePlacement(task));
1079
1080   { // --- Critical section
1081     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1082     _numberOfRunningTasks++;
1083     _runningTasks.insert(task);
1084     task->begin(); //change state to ACTIVATED
1085   } // --- End of critical section
1086   Thread(functionForTaskExecution, args, _threadStackSize);
1087 }
1088
1089 //! wait until a running task ends
1090
1091 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1092 {
1093   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1094 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1095   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1096   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1097     {
1098       _isWaitingEventsFromRunningTasks = true;
1099       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1100     }
1101   _numberOfEndedTasks=0;
1102   DEBTRACE("---");
1103 }
1104
1105 //! not implemented
1106
1107 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1108 {
1109   /*_mutexForNbOfConcurrentThreads.lock();
1110   _groupOfAllThreadsCreated.remove(thread);
1111   delete thread;
1112   _mutexForNbOfConcurrentThreads.unlock();*/
1113 }
1114
1115
1116 //! must be used protected by _mutexForSchedulerUpdate!
1117
1118 void Executor::wakeUp()
1119 {
1120   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1121   if (_isWaitingEventsFromRunningTasks)
1122     {
1123       _isWaitingEventsFromRunningTasks = false;
1124       _condForNewTasksToPerform.notify_all();
1125     }
1126   else
1127     _numberOfEndedTasks++;
1128 }
1129
1130 //! number of running tasks
1131
1132 int Executor::getNbOfThreads()
1133 {
1134   int ret;
1135   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1136   _isRunningunderExternalControl=true;
1137   ret = _groupOfAllThreadsCreated.size();
1138   return ret;
1139 }
1140
1141 /*!
1142  * This thread is NOT supposed to be detached !
1143  */
1144 void *Executor::functionForTaskLoad(void *arg)
1145 {
1146   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1147   struct threadargs *args = (struct threadargs *) arg;
1148   Task *task=args->task;
1149   Scheduler *sched=args->sched;
1150   Executor *execInst=args->execInst;
1151   delete args;
1152   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1153   return 0;
1154 }
1155
1156 //! Function to perform execution of a task in a thread
1157 /*!
1158  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1159  *
1160  *  Calls Task::execute
1161  *
1162  *  Calls Task::finished when the task is finished
1163  *
1164  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1165  *
1166  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1167  */
1168
1169 void *Executor::functionForTaskExecution(void *arg)
1170 {
1171   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1172
1173   struct threadargs *args = (struct threadargs *) arg;
1174   Task *task=args->task;
1175   Scheduler *sched=args->sched;
1176   Executor *execInst=args->execInst;
1177   delete args;
1178   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1179
1180   Thread::detach();
1181
1182   // Execute task
1183
1184   if(execInst->getDPLScopeSensitive())
1185     {
1186       Node *node(dynamic_cast<Node *>(task));
1187       ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1188       if(node!=0 && gfn!=0)
1189         node->applyDPLScope(gfn);
1190     }
1191
1192   YACS::Event ev=YACS::FINISH;
1193   try
1194     {
1195       execInst->traceExec(task, "start execution",ComputePlacement(task));
1196       task->execute();
1197       execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1198     }
1199   catch(Exception& ex)
1200     {
1201       std::cerr << "YACS Exception during execute" << std::endl;
1202       std::cerr << ex.what() << std::endl;
1203       ev=YACS::ABORT;
1204       string message = "end execution ABORT, ";
1205       message += ex.what();
1206       execInst->traceExec(task, message,ComputePlacement(task));
1207     }
1208   catch(...) 
1209     {
1210       // Execution has failed
1211       std::cerr << "Execution has failed: unknown reason" << std::endl;
1212       ev=YACS::ABORT;
1213       execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1214     }
1215
1216   // Disconnect task
1217   try
1218     {
1219       DEBTRACE("task->disconnectService()");
1220       task->disconnectService();
1221       execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1222     }
1223   catch(...) 
1224     {
1225       // Disconnect has failed
1226       std::cerr << "disconnect has failed" << std::endl;
1227       ev=YACS::ABORT;
1228       execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1229     }
1230   //
1231
1232   std::string placement(ComputePlacement(task));
1233
1234   // container management for HomogeneousPoolOfContainer
1235
1236   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1237   if(contC)
1238     {
1239       YACS::BASES::AutoLocker<Container> alckCont(contC);
1240       contC->release(task);
1241     }
1242
1243   DEBTRACE("End task->execute()");
1244   { // --- Critical section
1245     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1246     try
1247       {
1248         if (ev == YACS::FINISH) task->finished();
1249         if (ev == YACS::ABORT)
1250           {
1251             execInst->_errorDetected = true;
1252             if (execInst->_stopOnErrorRequested)
1253               {
1254                 execInst->_execMode = YACS::STEPBYSTEP;
1255                 execInst->_isOKToEnd = true;
1256               }
1257             task->aborted();
1258           }
1259         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1260         sched->notifyFrom(task,ev,execInst);
1261       }
1262     catch(Exception& ex)
1263       {
1264         //notify has failed : it is supposed to have set state
1265         //so no need to do anything
1266         std::cerr << "Error during notification" << std::endl;
1267         std::cerr << ex.what() << std::endl;
1268       }
1269     catch(...)
1270       {
1271         //notify has failed : it is supposed to have set state
1272         //so no need to do anything
1273         std::cerr << "Notification failed" << std::endl;
1274       }
1275     execInst->_numberOfRunningTasks--;
1276     execInst->_runningTasks.erase(task);
1277     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1278              << " _execMode: " << execInst->_execMode
1279              << " _executorState: " << execInst->_executorState);
1280     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1281       {
1282         if (execInst->_executorState == YACS::WAITINGTASKS)
1283           {
1284             execInst->_executorState = YACS::PAUSED;
1285             execInst->sendEvent("executor");
1286             execInst->_condForPilot.notify_all();
1287             if (execInst->_errorDetected &&
1288                 execInst->_stopOnErrorRequested &&
1289                 !execInst->_isRunningunderExternalControl)
1290               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1291           }
1292       }
1293     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1294     execInst->_semForMaxThreads.post();
1295     execInst->_semThreadCnt += 1;
1296     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1297     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1298
1299   } // --- End of critical section (change state)
1300
1301   //execInst->notifyEndOfThread(0);
1302   Thread::exit(0);
1303   return 0;
1304 }
1305
1306 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1307 {
1308   string nodeName = _mainSched->getTaskName(task);
1309   Container *cont = task->getContainer();
1310   string containerName = "---";
1311   if (cont)
1312     containerName = cont->getName();
1313
1314 #ifdef WIN32
1315   DWORD now = timeGetTime();
1316   double elapse = (now - _start)/1000.0;
1317 #else
1318   timeval now;
1319   gettimeofday(&now, NULL);
1320   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1321 #endif
1322   {
1323     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1324     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1325     _trace << flush;
1326   }
1327 }
1328
1329 //! emit notification to all observers registered with  the dispatcher 
1330 /*!
1331  * The dispatcher is unique and can be obtained by getDispatcher()
1332  */
1333 void Executor::sendEvent(const std::string& event)
1334 {
1335   Dispatcher* disp=Dispatcher::getDispatcher();
1336   YASSERT(disp);
1337   YASSERT(_root);
1338   disp->dispatch(_root,event);
1339 }
1340
1341 /*!
1342  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1343  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1344  *
1345  * \param [in,out] tsks - list of tasks to be
1346  */
1347 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1348 {
1349   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1350   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1351     {
1352       Task *cur(*it);
1353       if(!cur)
1354         continue;
1355       Container *cont(cur->getContainer());
1356       if(!cont)
1357         {
1358           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1359           continue;
1360         }
1361       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1362       if(!contC)
1363         {
1364           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1365           continue;
1366         }
1367       m[contC].push_back(cur);
1368     }
1369   //
1370   std::vector<Task *> ret;
1371   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1372     {
1373       HomogeneousPoolContainer *curhpc((*it).first);
1374       const std::vector<Task *>& curtsks((*it).second);
1375       if(!curhpc)
1376         {
1377           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1378         }
1379       else
1380         {
1381           // start of critical section for container curhpc
1382           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1383           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1384           std::size_t sz(curhpc->getNumberOfFreePlace());
1385           std::vector<Task *>::const_iterator it2(curtsks.begin());
1386           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1387             {
1388               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1389               ret.push_back(*it2);
1390             }
1391           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1392           //end of critical section
1393         }
1394     }
1395   //
1396   tsks=ret;
1397 }
1398
1399 std::string Executor::ComputePlacement(Task *zeTask)
1400 {
1401   std::string placement("---");
1402   if(!zeTask)
1403     return placement;
1404   if(zeTask->getContainer())
1405     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1406   return placement;
1407 }