Salome HOME
Roll back for the keepgoing feature.
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2016  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
#ifdef WIN32
// Windows has no usleep(): map it onto _sleep(), which takes milliseconds
// whereas usleep() takes microseconds. The macro argument is parenthesized so
// that an expression argument such as usleep(a+b) is divided as a whole.
#define usleep(A) _sleep((A)/1000)
// Provide the POSIX file type test macros when the platform headers lack them.
#if !defined(S_ISCHR) || !defined(S_ISREG)
#  ifndef S_IFMT
#    ifdef _S_IFMT
#      define S_IFMT _S_IFMT
#      define S_IFCHR _S_IFCHR
#      define S_IFREG _S_IFREG
#    else
#    ifdef __S_IFMT
#      define S_IFMT __S_IFMT
#      define S_IFCHR __S_IFCHR
#      define S_IFREG __S_IFREG
#    endif
#    endif
#  endif
#  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
#  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
#endif
#endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(1000);
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
77
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92
93 Executor::~Executor()
94 {
95   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96     delete *iter;
97 }
98
99 //! Execute a graph waiting for completion
100 /*!
101  *  \param graph : schema to execute
102  *  \param debug : display the graph with dot if debug == 1
103  *  \param fromScratch : if true the graph is reinitialized
104  *
105  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106  *  
107  *  Calls Executor::launchTask to execute a selected Task.
108  *
109  *  Completion when graph is finished (Scheduler::isFinished)
110  */
111
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
114   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115   _mainSched=graph;
116   _root = dynamic_cast<ComposedNode *>(_mainSched);
117   if (!_root) throw Exception("Executor::Run, Internal Error!");
118   bool isMore;
119   int i=0;
120   if(debug>1)_displayDot(graph);
121   if (fromScratch)
122     {
123       graph->init();
124       graph->exUpdateState();
125     }
126   if(debug>1)_displayDot(graph);
127   vector<Task *> tasks;
128   vector<Task *>::iterator iter;
129   _toContinue=true;
130   _execMode = YACS::CONTINUE;
131   _isWaitingEventsFromRunningTasks = false;
132   _numberOfRunningTasks = 0;
133   _runningTasks.clear();
134   _numberOfEndedTasks=0;
135   while(_toContinue)
136     {
137       sleepWhileNoEventsFromAnyRunningTask();
138
139       if(debug>2)_displayDot(graph);
140
141       {//Critical section
142         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143         tasks=graph->getNextTasks(isMore);
144         graph->selectRunnableTasks(tasks);
145       }//End of critical section
146
147       if(debug>2)_displayDot(graph);
148
149       for(iter=tasks.begin();iter!=tasks.end();iter++)
150         loadTask(*iter,this);
151
152       if(debug>1)_displayDot(graph);
153
154       launchTasks(tasks);
155
156       if(debug>1)_displayDot(graph);
157
158       {//Critical section
159         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160         _toContinue=!graph->isFinished();
161       }//End of critical section
162       DEBTRACE("_toContinue: " << _toContinue);
163
164       if(debug>0)_displayDot(graph);
165
166       i++;
167     }
168 }
169
170 //! Execute a graph with breakpoints or step by step
171 /*!
172  *  To be launch in a thread (main thread controls the progression).
173  *  \param graph : schema to execute
174  *  \param debug : display the graph with dot if debug >0
175  *  \param fromScratch : if false, state from a previous partial exection is already loaded
176  *
177  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178  *  
179  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
180  * 
181  *  Calls Executor::launchTask to execute a selected Task
182  *
183  *  Completion when graph is finished (Scheduler::isFinished)
184  *
185  *  States of execution:
186  *  - YACS::NOTYETINITIALIZED
187  *  - YACS::INITIALISED
188  *  - YACS::RUNNING            (to next breakpoint or step)
189  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
190  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
191  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
192  *  - YACS::STOPPED            (stopped by user before end)
193  *
194  *  Modes of Execution:
195  *  - YACS::CONTINUE           (normal run without breakpoints)
196  *  - YACS::STEPBYSTEP         (pause at each loop)
197  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
198  *
199  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200  *  Step by Step means execution node by node or group of node by group of nodes.
201  *  At a given step, the user decides to launch all the ready nodes or only a subset
202  *  (Caution: some nodes must run in parallel). 
203  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
204  *
205  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
206  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207  *  - Executor::getCurrentExecMode
208  *  - Executor::getExecutorState
209  *  - Executor::setExecMode             : change the execution mode for next loop
210  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
211  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
212  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
213  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214  *  - Executor::isNotFinished
215  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216  *  - Executor::saveState               : dump the current state of execution in an xml file
217  *  - Executor::loadState               : Not yet implemented
218  *  - Executor::getNbOfThreads
219  *  - Executor::displayDot
220  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
221  *
222  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223  *  - Executor::waitPause
224  *
225  *  TO BE VALIDATED:
226  *  - Pilot may connect to executor during execution, or deconnect.
227  *  - Several Pilots may be connected at the same time (for observation...)
228  * 
229  */
230
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
233   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234
235   { // --- Critical section
236     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
237     _mainSched = graph;
238     _root = dynamic_cast<ComposedNode *>(_mainSched);
239     if (!_root) throw Exception("Executor::Run, Internal Error!");
240     _executorState = YACS::NOTYETINITIALIZED;
241     sendEvent("executor");
242     _toContinue=true;
243     _isOKToEnd = false;
244     _errorDetected = false;
245     _isWaitingEventsFromRunningTasks = false;
246     _numberOfRunningTasks = 0;
247     _runningTasks.clear();
248     _numberOfEndedTasks = 0;
249     string tracefile = "traceExec_";
250     tracefile += _mainSched->getName();
251     _trace.open(tracefile.c_str());
252 #ifdef WIN32
253    _start = timeGetTime();
254 #else
255     gettimeofday(&_start, NULL);
256 #endif
257
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297         _tasks=graph->getNextTasks(isMore);
298         graph->selectRunnableTasks(_tasks);
299         FilterTasksConsideringContainers(_tasks);
300         numberAllTasks=_numberOfRunningTasks+_tasks.size();
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           //loadTasks(_tasks);
309           loadParallelTasks(_tasks,this);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
367   {
368     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
369     _trace.close();
370   }
371   DEBTRACE("End of RunB thread");  
372 }
373
374 YACS::ExecutionMode Executor::getCurrentExecMode()
375 {
376   _isRunningunderExternalControl=true;
377   return _execMode;
378 }
379
380
381 YACS::ExecutorState Executor::getExecutorState()
382 {
383   _isRunningunderExternalControl=true;
384   return _executorState;
385 }
386
387
388 bool Executor::isNotFinished()
389 {
390   _isRunningunderExternalControl=true;
391   return _toContinue;
392 }
393
394 //! ask to stop execution on the first node found in error
395 /*!
396  * \param dumpRequested   produce a state dump when an error is found
397  * \param xmlFile         name of file used for state dump
398  */
399
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 {
402   { // --- Critical section
403     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
404     _dumpErrorFile=xmlFile;
405     _stopOnErrorRequested=true;
406     _dumpOnErrorRequested = dumpRequested;
407     if (dumpRequested && xmlFile.empty())
408       throw YACS::Exception("dump on error requested and no filename given for dump");
409     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410   } // --- End of critical section
411 }
412
413 //! ask to do not stop execution on nodes found in error
414 /*!
415  */
416
417 void Executor::unsetStopOnError()
418 {
419   { // --- Critical section
420     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
421     _stopOnErrorRequested=false;
422   } // --- End of critical section
423 }
424
425 //! Dynamically set the current mode of execution
426 /*!
427  * The mode can be Continue, step by step, or stop before execution of a node
428  * defined in a list of breakpoints.
429  */
430
431 void Executor::setExecMode(YACS::ExecutionMode mode)
432 {
433   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434   { // --- Critical section
435     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
436     _isRunningunderExternalControl=true;
437     _execMode = mode;
438   } // --- End of critical section
439 }
440
441 //! wake up executor when in pause
442 /*!
443  * When Executor is in state paused or waiting for task completion, the thread
444  * running loop RunB waits on condition _condForStepByStep.
445  * Thread RunB is waken up.
446  * \return true when actually wakes up executor
447  */
448
449 bool Executor::resumeCurrentBreakPoint()
450 {
451   DEBTRACE("Executor::resumeCurrentBreakPoint()");
452   bool ret = false;
453   //bool doDump = false;
454   { // --- Critical section
455     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
456     _isRunningunderExternalControl=true;
457     DEBTRACE("_executorState: " << _executorState);
458     switch (_executorState)
459       {
460       case YACS::WAITINGTASKS:
461       case YACS::PAUSED:
462         {
463           _condForStepByStep.notify_all();
464           _executorState = YACS::RUNNING;
465           sendEvent("executor");
466           ret = true;
467           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468           break;
469         }
470       case YACS::FINISHED:
471       case YACS::STOPPED:
472         {
473           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474           DEBTRACE("Graph Execution finished or stopped !");
475           break;
476         }
477       default :
478         {
479           // debug: no easy way to verify if main loop is acutally waiting on condition
480         }
481       }
482     DEBTRACE("---");
483     //if (doDump) saveState(_dumpErrorFile);
484   } // --- End of critical section
485   return ret;
486 }
487
488
489 //! define a list of nodes names as breakpoints in the graph
490
491
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 {
494   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495   { // --- Critical section
496     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
497     _isRunningunderExternalControl=true;
498     _listOfBreakPoints = listOfBreakPoints;
499   } // --- End of critical section
500 }
501
502
503 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 /*!
505  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
506  *  Use Executor::waitPause to wait.
507  */
508 std::list<std::string> Executor::getTasksToLoad()
509 {
510   DEBTRACE("Executor::getTasksToLoad()");
511   list<string> listOfNodesToLoad;
512   listOfNodesToLoad.clear();
513   { // --- Critical section
514     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
515     _isRunningunderExternalControl=true;
516     switch (_executorState)
517       {
518       case YACS::WAITINGTASKS:
519       case YACS::PAUSED:
520         {
521           listOfNodesToLoad = _listOfTasksToLoad;
522           break;
523         }
524       case YACS::NOTYETINITIALIZED:
525       case YACS::INITIALISED:
526       case YACS::RUNNING:
527       case YACS::FINISHED:
528       case YACS::STOPPED:
529       default:
530         {
531           break;
532         }
533       }
534   } // --- End of critical section
535   return listOfNodesToLoad;
536 }
537
538
539 //! Define a subset of task to execute in step by step mode
540 /*!
541  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
542  * in the current step.
543  * If some nodes must run in parallel, they must stay together in the list.
544  */
545
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 {
548   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549   bool ret = true;
550   vector<Task *>::iterator iter;
551   vector<Task *> restrictedTasks;
552   { // --- Critical section
553     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
554     _isRunningunderExternalControl=true;
555     switch (_executorState)
556       {
557       case YACS::WAITINGTASKS:
558       case YACS::PAUSED:
559         {
560           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561             {
562               string readyNode = _mainSched->getTaskName(*iter);
563               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564                   != listToExecute.end())
565                 {
566                   restrictedTasks.push_back(*iter);
567                   DEBTRACE("node to execute " << readyNode);
568                 }
569             }
570           _tasks.clear();
571           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572             {
573               _tasks.push_back(*iter);
574             }
575           break;
576         }
577       case YACS::NOTYETINITIALIZED:
578       case YACS::INITIALISED:
579       case YACS::RUNNING:
580       case YACS::FINISHED:
581       case YACS::STOPPED:
582       default:
583         {
584           break;
585         }
586       }
587     } // --- End of critical section
588
589   _tasks.clear();
590   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591     {
592       _tasks.push_back(*iter);
593     }
594   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595     {
596       string readyNode = _mainSched->getTaskName(*iter);
597       DEBTRACE("selected node to execute " << readyNode);
598     }
599
600   return ret;
601 }
602
603 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 /*!
605  *  Do nothing if execution is finished or in pause.
606  *  Wait first step if Executor is running or in initialization.
607  */
608
609 void Executor::waitPause()
610 {
611   DEBTRACE("Executor::waitPause()" << _executorState);
612   { // --- Critical section
613     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
614     _isRunningunderExternalControl=true;
615     switch (_executorState)
616       {
617       default:
618       case YACS::STOPPED:
619       case YACS::FINISHED:
620       case YACS::WAITINGTASKS:
621       case YACS::PAUSED:
622         {
623           break;
624         }
625       case YACS::NOTYETINITIALIZED:
626       case YACS::INITIALISED:
627       case YACS::RUNNING:
628         {
629           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630           break;
631         }
632       }
633   } // --- End of critical section
634   DEBTRACE("---");
635 }
636
637 //! stops the execution as soon as possible 
638
639 void Executor::stopExecution()
640 {
641   setExecMode(YACS::STEPBYSTEP);
642   //waitPause();
643   _isOKToEnd = true;
644   resumeCurrentBreakPoint();
645 }
646
647 //! save the current state of execution in an xml file
648
649 bool Executor::saveState(const std::string& xmlFile)
650 {
651   DEBTRACE("Executor::saveState() in " << xmlFile);
652   bool result = false;
653   try {
654     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
655     YACS::ENGINE::VisitorSaveState vst(_root);
656     vst.openFileDump(xmlFile.c_str());
657     _root->accept(&vst);
658     vst.closeFileDump();
659     result = true;
660   }
661   catch(Exception& ex) {
662     std::cerr << ex.what() << std::endl;
663   }
664   return result;
665 }
666
667 //! not yet implemented
668
669 bool Executor::loadState()
670 {
671   DEBTRACE("Executor::loadState()");
672   _isRunningunderExternalControl=true;
673   return true;
674 }
675
676
//! Tell whether a path designates an existing regular file
/*!
 *  \param filename : path to test
 *  \return 1 if stat() succeeds on the path and reports a regular file, 0 otherwise
 */
static int isfile(const char *filename)
{
  struct stat info;
  const bool isRegular = (stat(filename, &info) == 0) && S_ISREG(info.st_mode);
  return isRegular ? 1 : 0;
}
686
687 //! Display the graph state as a dot display, public method
688
689 void Executor::displayDot(Scheduler *graph)
690 {
691   _isRunningunderExternalControl=true;
692   _displayDot(graph);
693 }
694
695 //! Display the graph state as a dot display
696 /*!
697  *  \param graph  : the node to display
698  */
699
700 void Executor::_displayDot(Scheduler *graph)
701 {
702    std::ofstream g("titi");
703    ((ComposedNode*)graph)->writeDot(g);
704    g.close();
705    const char displayScript[]="display.sh";
706    if(isfile(displayScript))
707      system("sh display.sh");
708    else
709      system("dot -Tpng titi|display -delay 5");
710 }
711
712 //! Wait reactivation in modes Step By step or with BreakPoints
713 /*!
714  *  Check mode of execution (set by main thread):
715  *  - YACS::CONTINUE        : the graph execution continues.
716  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
717  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
718  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
719  *                            else continue the graph execution.
720  *  \return true if end of executor thread is requested
721  */
722
723 bool Executor::checkBreakPoints()
724 {
725   DEBTRACE("Executor::checkBreakPoints()");
726   vector<Task *>::iterator iter;
727   bool endRequested = false;
728
729   switch (_execMode)
730     {
731     case YACS::CONTINUE:
732       {
733         break;
734       }
735     case YACS::STOPBEFORENODES:
736       {
737         bool stop = false;
738         { // --- Critical section
739           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
740           _tasksSave = _tasks;
741           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
742             {
743               string nodeToLoad = _mainSched->getTaskName(*iter);
744               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
745                   != _listOfBreakPoints.end())
746                 {
747                   stop = true;
748                   break;
749                 }
750             }
751           if (stop)
752             {
753               _listOfTasksToLoad.clear();
754               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
755                 {
756                   string nodeToLoad = _mainSched->getTaskName(*iter);
757                   _listOfTasksToLoad.push_back(nodeToLoad);
758                 }
759               if (getNbOfThreads())
760                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
761               else
762                 _executorState = YACS::PAUSED;
763               sendEvent("executor");
764               _condForPilot.notify_all();
765             }
766           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
767           if (_isOKToEnd) endRequested = true;
768         } // --- End of critical section
769           if (stop) DEBTRACE("wake up from waitResume");
770         break;
771       }
772     default:
773     case YACS::STEPBYSTEP:
774       {
775         { // --- Critical section
776           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
777           _tasksSave = _tasks;
778           _listOfTasksToLoad.clear();
779           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
780             {
781               string nodeToLoad = _mainSched->getTaskName(*iter);
782               _listOfTasksToLoad.push_back(nodeToLoad);
783             }
784           if (getNbOfThreads())
785             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
786           else
787             _executorState = YACS::PAUSED;
788           sendEvent("executor");
789           _condForPilot.notify_all();
790           if (!_isOKToEnd)
791             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
792                           // or, if no pilot, wait until no more running tasks (stop on error)
793           if (_isOKToEnd) endRequested = true;
794         } // --- End of critical section
795         DEBTRACE("wake up from waitResume");
796         break;
797       }
798     }
799   DEBTRACE("endRequested: " << endRequested);
800   return endRequested;
801 }
802
803
804 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
805 /*!
806  *  With the condition Mutex, the mutex is released atomically during the wait.
807  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
808  *  Must be called while mutex is locked.
809  */
810
811 void Executor::waitResume()
812 {
813   DEBTRACE("Executor::waitResume()");
814   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
815   DEBTRACE("---");
816 }
817
818
819 //! Perform loading of a Task.
820 /*!
821  *  \param task  : Task to load
822  */
823
824 void Executor::loadTask(Task *task, const Executor *execInst)
825 {
826   DEBTRACE("Executor::loadTask(Task *task)");
827   if(task->getState() != YACS::TOLOAD)
828     return;
829   traceExec(task, "state:TOLOAD", ComputePlacement(task));
830   {//Critical section
831     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
832     _mainSched->notifyFrom(task,YACS::START,execInst);
833   }//End of critical section
834   try
835     {
836       traceExec(task, "load", ComputePlacement(task));
837       task->load();
838       traceExec(task, "initService", ComputePlacement(task));
839       task->initService();
840     }
841   catch(Exception& ex) 
842     {
843       std::cerr << ex.what() << std::endl;
844       {//Critical section
845         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
846         task->aborted();
847         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
848         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
849       }//End of critical section
850     }
851   catch(...) 
852     {
853       std::cerr << "Load failed" << std::endl;
854       {//Critical section
855         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
856         task->aborted();
857         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
858         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
859       }//End of critical section
860     }
861 }
862
863 struct threadargs
864 {
865   Task *task;
866   Scheduler *sched;
867   Executor *execInst;
868 };
869
870 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
871 {
872   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
873     loadTask(*iter,execInst);
874 }
875
876 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
877 {
878   std::vector<Thread> ths(tasks.size());
879   std::size_t ithread(0);
880   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
881     {
882       DEBTRACE("Executor::loadParallelTasks(Task *task)");
883       struct threadargs *args(new threadargs);
884       args->task = (*iter);
885       args->sched = _mainSched;
886       args->execInst = this;
887       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
888     }
889   for(ithread=0;ithread<tasks.size();ithread++)
890     ths[ithread].join();
891 }
892
893 //! Execute a list of tasks possibly connected through datastream links
894 /*!
895  *  \param tasks  : a list of tasks to execute
896  *
897  */
898 void Executor::launchTasks(const std::vector<Task *>& tasks)
899 {
900   //First phase, make datastream connections
901   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
902     {
903       YACS::StatesForNode state=(*iter)->getState();
904       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
905       try
906         {
907           (*iter)->connectService();
908           traceExec(*iter, "connectService",ComputePlacement(*iter));
909           {//Critical section
910             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
911             (*iter)->connected();
912           }//End of critical section
913         }
914       catch(Exception& ex) 
915         {
916           std::cerr << ex.what() << std::endl;
917           try
918             {
919               (*iter)->disconnectService();
920               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
921             }
922           catch(...) 
923             {
924               // Disconnect has failed
925               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
926             }
927           {//Critical section
928             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
929             (*iter)->aborted();
930             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
931           }//End of critical section
932         }
933       catch(...) 
934         {
935           std::cerr << "Problem in connectService" << std::endl;
936           try
937             {
938               (*iter)->disconnectService();
939               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
940             }
941           catch(...) 
942             {
943               // Disconnect has failed
944               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
945             }
946           {//Critical section
947             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
948             (*iter)->aborted();
949             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
950           }//End of critical section
951         }
952       if((*iter)->getState() == YACS::ERROR)
953         {
954           //try to put all coupled tasks in error
955           std::set<Task*> coupledSet;
956           (*iter)->getCoupledTasks(coupledSet);
957           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
958             {
959               Task* t=*it;
960               if(t == *iter)continue;
961               if(t->getState() == YACS::ERROR)continue;
962               try
963                 {
964                   t->disconnectService();
965                   traceExec(t, "disconnectService",ComputePlacement(*iter));
966                 }
967               catch(...)
968                 {
969                   // Disconnect has failed
970                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
971                 }
972               {//Critical section
973                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
974                 t->aborted();
975                 _mainSched->notifyFrom(t,YACS::ABORT,this);
976               }//End of critical section
977               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
978             }
979         }
980       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
981     }
982
983   //Second phase, execute each task in a thread
984   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
985     {
986       launchTask(*iter);
987     }
988 }
989
990 //! Execute a Task in a thread
991 /*!
992  *  \param task  : Task to execute
993  *
994  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
995  *
996  *  Calls Executor::functionForTaskExecution in Thread
997  */
998
999 void Executor::launchTask(Task *task)
1000 {
1001   DEBTRACE("Executor::launchTask(Task *task)");
1002   struct threadargs *args;
1003   if(task->getState() != YACS::TOACTIVATE)return;
1004
1005   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1006   if(_semThreadCnt == 0)
1007     {
1008       // --- Critical section
1009       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1010       //check if we have enough threads to run
1011       std::set<Task*> tmpSet=_runningTasks;
1012       std::set<Task*>::iterator it = tmpSet.begin();
1013       std::string status="running";
1014       std::set<Task*> coupledSet;
1015       while( it != tmpSet.end() )
1016         {
1017           Task* tt=*it;
1018           coupledSet.clear();
1019           tt->getCoupledTasks(coupledSet);
1020           status="running";
1021           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1022             {
1023               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1024               tmpSet.erase(*iter);
1025             }
1026           if(status=="running")break;
1027           it = tmpSet.begin();
1028         }
1029
1030       if(status=="toactivate")
1031         {
1032           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1033           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1034         }
1035       // --- End of critical section
1036     }
1037
1038   _semForMaxThreads.wait();
1039   _semThreadCnt -= 1;
1040
1041   args= new threadargs;
1042   args->task = task;
1043   args->sched = _mainSched;
1044   args->execInst = this;
1045
1046   traceExec(task, "launch",ComputePlacement(task));
1047
1048   { // --- Critical section
1049     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1050     _numberOfRunningTasks++;
1051     _runningTasks.insert(task);
1052     task->begin(); //change state to ACTIVATED
1053   } // --- End of critical section
1054   Thread(functionForTaskExecution, args, _threadStackSize);
1055 }
1056
1057 //! wait until a running task ends
1058
1059 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1060 {
1061   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1062 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1063   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1064   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1065     {
1066       _isWaitingEventsFromRunningTasks = true;
1067       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1068     }
1069   _numberOfEndedTasks=0;
1070   DEBTRACE("---");
1071 }
1072
1073 //! not implemented
1074
1075 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1076 {
1077   /*_mutexForNbOfConcurrentThreads.lock();
1078   _groupOfAllThreadsCreated.remove(thread);
1079   delete thread;
1080   _mutexForNbOfConcurrentThreads.unlock();*/
1081 }
1082
1083
1084 //! must be used protected by _mutexForSchedulerUpdate!
1085
1086 void Executor::wakeUp()
1087 {
1088   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1089   if (_isWaitingEventsFromRunningTasks)
1090     {
1091       _isWaitingEventsFromRunningTasks = false;
1092       _condForNewTasksToPerform.notify_all();
1093     }
1094   else
1095     _numberOfEndedTasks++;
1096 }
1097
1098 //! number of running tasks
1099
1100 int Executor::getNbOfThreads()
1101 {
1102   int ret;
1103   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1104   _isRunningunderExternalControl=true;
1105   ret = _groupOfAllThreadsCreated.size();
1106   return ret;
1107 }
1108
1109 /*!
1110  * This thread is NOT supposed to be detached !
1111  */
1112 void *Executor::functionForTaskLoad(void *arg)
1113 {
1114   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1115   struct threadargs *args = (struct threadargs *) arg;
1116   Task *task=args->task;
1117   Scheduler *sched=args->sched;
1118   Executor *execInst=args->execInst;
1119   delete args;
1120   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1121   return 0;
1122 }
1123
1124 //! Function to perform execution of a task in a thread
1125 /*!
1126  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1127  *
1128  *  Calls Task::execute
1129  *
1130  *  Calls Task::finished when the task is finished
1131  *
1132  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1133  *
1134  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1135  */
1136
1137 void *Executor::functionForTaskExecution(void *arg)
1138 {
1139   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1140
1141   struct threadargs *args = (struct threadargs *) arg;
1142   Task *task=args->task;
1143   Scheduler *sched=args->sched;
1144   Executor *execInst=args->execInst;
1145   delete args;
1146   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1147
1148   Thread::detach();
1149
1150   // Execute task
1151
1152   if(execInst->getDPLScopeSensitive())
1153     {
1154       Node *node(dynamic_cast<Node *>(task));
1155       ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1156       if(node!=0 && gfn!=0)
1157         node->applyDPLScope(gfn);
1158     }
1159
1160   YACS::Event ev=YACS::FINISH;
1161   try
1162     {
1163       execInst->traceExec(task, "start execution",ComputePlacement(task));
1164       task->execute();
1165       execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1166     }
1167   catch(Exception& ex)
1168     {
1169       std::cerr << "YACS Exception during execute" << std::endl;
1170       std::cerr << ex.what() << std::endl;
1171       ev=YACS::ABORT;
1172       string message = "end execution ABORT, ";
1173       message += ex.what();
1174       execInst->traceExec(task, message,ComputePlacement(task));
1175     }
1176   catch(...) 
1177     {
1178       // Execution has failed
1179       std::cerr << "Execution has failed: unknown reason" << std::endl;
1180       ev=YACS::ABORT;
1181       execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1182     }
1183
1184   // Disconnect task
1185   try
1186     {
1187       DEBTRACE("task->disconnectService()");
1188       task->disconnectService();
1189       execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1190     }
1191   catch(...) 
1192     {
1193       // Disconnect has failed
1194       std::cerr << "disconnect has failed" << std::endl;
1195       ev=YACS::ABORT;
1196       execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1197     }
1198   //
1199
1200   std::string placement(ComputePlacement(task));
1201
1202   // container management for HomogeneousPoolOfContainer
1203
1204   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1205   if(contC)
1206     {
1207       YACS::BASES::AutoLocker<Container> alckCont(contC);
1208       contC->release(task);
1209     }
1210
1211   DEBTRACE("End task->execute()");
1212   { // --- Critical section
1213     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1214     try
1215       {
1216         if (ev == YACS::FINISH) task->finished();
1217         if (ev == YACS::ABORT)
1218           {
1219             execInst->_errorDetected = true;
1220             if (execInst->_stopOnErrorRequested)
1221               {
1222                 execInst->_execMode = YACS::STEPBYSTEP;
1223                 execInst->_isOKToEnd = true;
1224               }
1225             task->aborted();
1226           }
1227         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1228         sched->notifyFrom(task,ev,execInst);
1229       }
1230     catch(Exception& ex)
1231       {
1232         //notify has failed : it is supposed to have set state
1233         //so no need to do anything
1234         std::cerr << "Error during notification" << std::endl;
1235         std::cerr << ex.what() << std::endl;
1236       }
1237     catch(...)
1238       {
1239         //notify has failed : it is supposed to have set state
1240         //so no need to do anything
1241         std::cerr << "Notification failed" << std::endl;
1242       }
1243     execInst->_numberOfRunningTasks--;
1244     execInst->_runningTasks.erase(task);
1245     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1246              << " _execMode: " << execInst->_execMode
1247              << " _executorState: " << execInst->_executorState);
1248     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1249       {
1250         if (execInst->_executorState == YACS::WAITINGTASKS)
1251           {
1252             execInst->_executorState = YACS::PAUSED;
1253             execInst->sendEvent("executor");
1254             execInst->_condForPilot.notify_all();
1255             if (execInst->_errorDetected &&
1256                 execInst->_stopOnErrorRequested &&
1257                 !execInst->_isRunningunderExternalControl)
1258               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1259           }
1260       }
1261     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1262     execInst->_semForMaxThreads.post();
1263     execInst->_semThreadCnt += 1;
1264     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1265     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1266
1267   } // --- End of critical section (change state)
1268
1269   //execInst->notifyEndOfThread(0);
1270   Thread::exit(0);
1271   return 0;
1272 }
1273
1274 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1275 {
1276   string nodeName = _mainSched->getTaskName(task);
1277   Container *cont = task->getContainer();
1278   string containerName = "---";
1279   if (cont)
1280     containerName = cont->getName();
1281
1282 #ifdef WIN32
1283   DWORD now = timeGetTime();
1284   double elapse = (now - _start)/1000.0;
1285 #else
1286   timeval now;
1287   gettimeofday(&now, NULL);
1288   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1289 #endif
1290   {
1291     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1292     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1293     _trace << flush;
1294   }
1295 }
1296
1297 //! emit notification to all observers registered with  the dispatcher 
1298 /*!
1299  * The dispatcher is unique and can be obtained by getDispatcher()
1300  */
1301 void Executor::sendEvent(const std::string& event)
1302 {
1303   Dispatcher* disp=Dispatcher::getDispatcher();
1304   YASSERT(disp);
1305   YASSERT(_root);
1306   disp->dispatch(_root,event);
1307 }
1308
1309 /*!
1310  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1311  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1312  *
1313  * \param [in,out] tsks - list of tasks to be
1314  */
1315 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1316 {
1317   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1318   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1319     {
1320       Task *cur(*it);
1321       if(!cur)
1322         continue;
1323       Container *cont(cur->getContainer());
1324       if(!cont)
1325         {
1326           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1327           continue;
1328         }
1329       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1330       if(!contC)
1331         {
1332           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1333           continue;
1334         }
1335       m[contC].push_back(cur);
1336     }
1337   //
1338   std::vector<Task *> ret;
1339   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1340     {
1341       HomogeneousPoolContainer *curhpc((*it).first);
1342       const std::vector<Task *>& curtsks((*it).second);
1343       if(!curhpc)
1344         {
1345           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1346         }
1347       else
1348         {
1349           // start of critical section for container curhpc
1350           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1351           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1352           std::size_t sz(curhpc->getNumberOfFreePlace());
1353           std::vector<Task *>::const_iterator it2(curtsks.begin());
1354           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1355             {
1356               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1357               ret.push_back(*it2);
1358             }
1359           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1360           //end of critical section
1361         }
1362     }
1363   //
1364   tsks=ret;
1365 }
1366
1367 std::string Executor::ComputePlacement(Task *zeTask)
1368 {
1369   std::string placement("---");
1370   if(!zeTask)
1371     return placement;
1372   if(zeTask->getContainer())
1373     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1374   return placement;
1375 }