Salome HOME
1130df735062aadeef730193f6a9b1033b36314d
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(1000);
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
77
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92
93 Executor::~Executor()
94 {
95   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96     delete *iter;
97 }
98
99 //! Execute a graph waiting for completion
100 /*!
101  *  \param graph : schema to execute
102  *  \param debug : display the graph with dot if debug == 1
103  *  \param fromScratch : if true the graph is reinitialized
104  *
105  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106  *  
107  *  Calls Executor::launchTask to execute a selected Task.
108  *
109  *  Completion when graph is finished (Scheduler::isFinished)
110  */
111
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
114   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115   _mainSched=graph;
116   _root = dynamic_cast<ComposedNode *>(_mainSched);
117   if (!_root) throw Exception("Executor::Run, Internal Error!");
118   bool isMore;
119   int i=0;
120   if(debug>1)_displayDot(graph);
121   if (fromScratch)
122     {
123       graph->init();
124       graph->exUpdateState();
125     }
126   if(debug>1)_displayDot(graph);
127   vector<Task *> tasks;
128   vector<Task *>::iterator iter;
129   _toContinue=true;
130   _execMode = YACS::CONTINUE;
131   _isWaitingEventsFromRunningTasks = false;
132   _numberOfRunningTasks = 0;
133   _runningTasks.clear();
134   _numberOfEndedTasks=0;
135   while(_toContinue)
136     {
137       sleepWhileNoEventsFromAnyRunningTask();
138
139       if(debug>2)_displayDot(graph);
140
141       {//Critical section
142         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143         tasks=graph->getNextTasks(isMore);
144         graph->selectRunnableTasks(tasks);
145       }//End of critical section
146
147       if(debug>2)_displayDot(graph);
148
149       for(iter=tasks.begin();iter!=tasks.end();iter++)
150         loadTask(*iter,this);
151
152       if(debug>1)_displayDot(graph);
153
154       launchTasks(tasks);
155
156       if(debug>1)_displayDot(graph);
157
158       {//Critical section
159         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160         _toContinue=!graph->isFinished();
161       }//End of critical section
162       DEBTRACE("_toContinue: " << _toContinue);
163
164       if(debug>0)_displayDot(graph);
165
166       i++;
167     }
168 }
169
170 //! Execute a graph with breakpoints or step by step
171 /*!
172  *  To be launch in a thread (main thread controls the progression).
173  *  \param graph : schema to execute
174  *  \param debug : display the graph with dot if debug >0
175  *  \param fromScratch : if false, state from a previous partial exection is already loaded
176  *
177  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178  *  
179  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
180  * 
181  *  Calls Executor::launchTask to execute a selected Task
182  *
183  *  Completion when graph is finished (Scheduler::isFinished)
184  *
185  *  States of execution:
186  *  - YACS::NOTYETINITIALIZED
187  *  - YACS::INITIALISED
188  *  - YACS::RUNNING            (to next breakpoint or step)
189  *  - YACS::WAITINGTASKS       (a breakpoint or step as been reached, but there are still running tasks)
190  *  - YACS::PAUSED             (a breakpoint or step as been reached, no more running tasks)
191  *  - YACS::FINISHED           (no more ready tasks, nore running tasks)
192  *  - YACS::STOPPED            (stopped by user before end)
193  *
194  *  Modes of Execution:
195  *  - YACS::CONTINUE           (normal run without breakpoints)
196  *  - YACS::STEPBYSTEP         (pause at each loop)
197  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
198  *
199  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200  *  Step by Step means execution node by node or group of node by group of nodes.
201  *  At a given step, the user decides to launch all the ready nodes or only a subset
202  *  (Caution: some nodes must run in parallel). 
203  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
204  *
205  *  The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
206  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207  *  - Executor::getCurrentExecMode
208  *  - Executor::getExecutorState
209  *  - Executor::setExecMode             : change the execution mode for next loop
210  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
211  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
212  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
213  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214  *  - Executor::isNotFinished
215  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216  *  - Executor::saveState               : dump the current state of execution in an xml file
217  *  - Executor::loadState               : Not yet implemented
218  *  - Executor::getNbOfThreads
219  *  - Executor::displayDot
220  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
221  *
222  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223  *  - Executor::waitPause
224  *
225  *  TO BE VALIDATED:
226  *  - Pilot may connect to executor during execution, or deconnect.
227  *  - Several Pilots may be connected at the same time (for observation...)
228  * 
229  */
230
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
233   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234
235   { // --- Critical section
236     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
237     _mainSched = graph;
238     _root = dynamic_cast<ComposedNode *>(_mainSched);
239     if (!_root) throw Exception("Executor::Run, Internal Error!");
240     _executorState = YACS::NOTYETINITIALIZED;
241     sendEvent("executor");
242     _toContinue=true;
243     _isOKToEnd = false;
244     _errorDetected = false;
245     _isWaitingEventsFromRunningTasks = false;
246     _numberOfRunningTasks = 0;
247     _runningTasks.clear();
248     _numberOfEndedTasks = 0;
249     string tracefile = "traceExec_";
250     tracefile += _mainSched->getName();
251     _trace.open(tracefile.c_str());
252 #ifdef WIN32
253    _start = timeGetTime();
254 #else
255     gettimeofday(&_start, NULL);
256 #endif
257
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297         _tasks=graph->getNextTasks(isMore);
298         graph->selectRunnableTasks(_tasks);
299         FilterTasksConsideringContainers(_tasks);
300         numberAllTasks=_numberOfRunningTasks+_tasks.size();
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           //loadTasks(_tasks);
309           loadParallelTasks(_tasks,this);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
367   {
368     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
369     _trace.close();
370   }
371   DEBTRACE("End of RunB thread");  
372 }
373
374 YACS::ExecutionMode Executor::getCurrentExecMode()
375 {
376   _isRunningunderExternalControl=true;
377   return _execMode;
378 }
379
380
381 YACS::ExecutorState Executor::getExecutorState()
382 {
383   _isRunningunderExternalControl=true;
384   return _executorState;
385 }
386
387
388 bool Executor::isNotFinished()
389 {
390   _isRunningunderExternalControl=true;
391   return _toContinue;
392 }
393
394 //! ask to stop execution on the first node found in error
395 /*!
396  * \param dumpRequested   produce a state dump when an error is found
397  * \param xmlFile         name of file used for state dump
398  */
399
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 {
402   { // --- Critical section
403     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
404     _dumpErrorFile=xmlFile;
405     _stopOnErrorRequested=true;
406     _dumpOnErrorRequested = dumpRequested;
407     if (dumpRequested && xmlFile.empty())
408       throw YACS::Exception("dump on error requested and no filename given for dump");
409     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410   } // --- End of critical section
411 }
412
413 //! ask to do not stop execution on nodes found in error
414 /*!
415  */
416
417 void Executor::unsetStopOnError()
418 {
419   { // --- Critical section
420     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
421     _stopOnErrorRequested=false;
422   } // --- End of critical section
423 }
424
425 //! Dynamically set the current mode of execution
426 /*!
427  * The mode can be Continue, step by step, or stop before execution of a node
428  * defined in a list of breakpoints.
429  */
430
431 void Executor::setExecMode(YACS::ExecutionMode mode)
432 {
433   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434   { // --- Critical section
435     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
436     _isRunningunderExternalControl=true;
437     _execMode = mode;
438   } // --- End of critical section
439 }
440
441 //! wake up executor when in pause
442 /*!
443  * When Executor is in state paused or waiting for task completion, the thread
444  * running loop RunB waits on condition _condForStepByStep.
445  * Thread RunB is waken up.
446  * \return true when actually wakes up executor
447  */
448
449 bool Executor::resumeCurrentBreakPoint()
450 {
451   DEBTRACE("Executor::resumeCurrentBreakPoint()");
452   bool ret = false;
453   //bool doDump = false;
454   { // --- Critical section
455     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
456     _isRunningunderExternalControl=true;
457     DEBTRACE("_executorState: " << _executorState);
458     switch (_executorState)
459       {
460       case YACS::WAITINGTASKS:
461       case YACS::PAUSED:
462         {
463           _condForStepByStep.notify_all();
464           _executorState = YACS::RUNNING;
465           sendEvent("executor");
466           ret = true;
467           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468           break;
469         }
470       case YACS::FINISHED:
471       case YACS::STOPPED:
472         {
473           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474           DEBTRACE("Graph Execution finished or stopped !");
475           break;
476         }
477       default :
478         {
479           // debug: no easy way to verify if main loop is acutally waiting on condition
480         }
481       }
482     DEBTRACE("---");
483     //if (doDump) saveState(_dumpErrorFile);
484   } // --- End of critical section
485   return ret;
486 }
487
488
489 //! define a list of nodes names as breakpoints in the graph
490
491
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 {
494   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495   { // --- Critical section
496     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
497     _isRunningunderExternalControl=true;
498     _listOfBreakPoints = listOfBreakPoints;
499   } // --- End of critical section
500 }
501
502
503 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 /*!
505  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
506  *  Use Executor::waitPause to wait.
507  */
508 std::list<std::string> Executor::getTasksToLoad()
509 {
510   DEBTRACE("Executor::getTasksToLoad()");
511   list<string> listOfNodesToLoad;
512   listOfNodesToLoad.clear();
513   { // --- Critical section
514     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
515     _isRunningunderExternalControl=true;
516     switch (_executorState)
517       {
518       case YACS::WAITINGTASKS:
519       case YACS::PAUSED:
520         {
521           listOfNodesToLoad = _listOfTasksToLoad;
522           break;
523         }
524       case YACS::NOTYETINITIALIZED:
525       case YACS::INITIALISED:
526       case YACS::RUNNING:
527       case YACS::FINISHED:
528       case YACS::STOPPED:
529       default:
530         {
531           break;
532         }
533       }
534   } // --- End of critical section
535   return listOfNodesToLoad;
536 }
537
538
539 //! Define a subset of task to execute in step by step mode
540 /*!
541  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
542  * in the current step.
543  * If some nodes must run in parallel, they must stay together in the list.
544  */
545
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 {
548   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549   bool ret = true;
550   vector<Task *>::iterator iter;
551   vector<Task *> restrictedTasks;
552   { // --- Critical section
553     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
554     _isRunningunderExternalControl=true;
555     switch (_executorState)
556       {
557       case YACS::WAITINGTASKS:
558       case YACS::PAUSED:
559         {
560           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561             {
562               string readyNode = _mainSched->getTaskName(*iter);
563               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564                   != listToExecute.end())
565                 {
566                   restrictedTasks.push_back(*iter);
567                   DEBTRACE("node to execute " << readyNode);
568                 }
569             }
570           _tasks.clear();
571           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572             {
573               _tasks.push_back(*iter);
574             }
575           break;
576         }
577       case YACS::NOTYETINITIALIZED:
578       case YACS::INITIALISED:
579       case YACS::RUNNING:
580       case YACS::FINISHED:
581       case YACS::STOPPED:
582       default:
583         {
584           break;
585         }
586       }
587     } // --- End of critical section
588
589   _tasks.clear();
590   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591     {
592       _tasks.push_back(*iter);
593     }
594   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595     {
596       string readyNode = _mainSched->getTaskName(*iter);
597       DEBTRACE("selected node to execute " << readyNode);
598     }
599
600   return ret;
601 }
602
603 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 /*!
605  *  Do nothing if execution is finished or in pause.
606  *  Wait first step if Executor is running or in initialization.
607  */
608
609 void Executor::waitPause()
610 {
611   DEBTRACE("Executor::waitPause()" << _executorState);
612   { // --- Critical section
613     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
614     _isRunningunderExternalControl=true;
615     switch (_executorState)
616       {
617       default:
618       case YACS::STOPPED:
619       case YACS::FINISHED:
620       case YACS::WAITINGTASKS:
621       case YACS::PAUSED:
622         {
623           break;
624         }
625       case YACS::NOTYETINITIALIZED:
626       case YACS::INITIALISED:
627       case YACS::RUNNING:
628         {
629           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630           break;
631         }
632       }
633   } // --- End of critical section
634   DEBTRACE("---");
635 }
636
637 //! stops the execution as soon as possible 
638
639 void Executor::stopExecution()
640 {
641   setExecMode(YACS::STEPBYSTEP);
642   //waitPause();
643   _isOKToEnd = true;
644   resumeCurrentBreakPoint();
645 }
646
647 //! save the current state of execution in an xml file
648
649 bool Executor::saveState(const std::string& xmlFile)
650 {
651   DEBTRACE("Executor::saveState() in " << xmlFile);
652   bool result = false;
653   try {
654     YACS::ENGINE::VisitorSaveState vst(_root);
655     vst.openFileDump(xmlFile.c_str());
656     _root->accept(&vst);
657     vst.closeFileDump();
658     result = true;
659   }
660   catch(Exception& ex) {
661     std::cerr << ex.what() << std::endl;
662   }
663   return result;
664 }
665
666 //! not yet implemented
667
668 bool Executor::loadState()
669 {
670   DEBTRACE("Executor::loadState()");
671   _isRunningunderExternalControl=true;
672   return true;
673 }
674
675
676 static int isfile(const char *filename) 
677 {
678   struct stat buf;
679   if (stat(filename, &buf) != 0)
680     return 0;
681   if (!S_ISREG(buf.st_mode))
682     return 0;
683   return 1;
684 }
685
686 //! Display the graph state as a dot display, public method
687
688 void Executor::displayDot(Scheduler *graph)
689 {
690   _isRunningunderExternalControl=true;
691   _displayDot(graph);
692 }
693
694 //! Display the graph state as a dot display
695 /*!
696  *  \param graph  : the node to display
697  */
698
699 void Executor::_displayDot(Scheduler *graph)
700 {
701    std::ofstream g("titi");
702    ((ComposedNode*)graph)->writeDot(g);
703    g.close();
704    const char displayScript[]="display.sh";
705    if(isfile(displayScript))
706      system("sh display.sh");
707    else
708      system("dot -Tpng titi|display -delay 5");
709 }
710
711 //! Wait reactivation in modes Step By step or with BreakPoints
712 /*!
713  *  Check mode of execution (set by main thread):
714  *  - YACS::CONTINUE        : the graph execution continues.
715  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
716  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
717  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
718  *                            else continue the graph execution.
719  *  \return true if end of executor thread is requested
720  */
721
722 bool Executor::checkBreakPoints()
723 {
724   DEBTRACE("Executor::checkBreakPoints()");
725   vector<Task *>::iterator iter;
726   bool endRequested = false;
727
728   switch (_execMode)
729     {
730     case YACS::CONTINUE:
731       {
732         break;
733       }
734     case YACS::STOPBEFORENODES:
735       {
736         bool stop = false;
737         { // --- Critical section
738           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
739           _tasksSave = _tasks;
740           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
741             {
742               string nodeToLoad = _mainSched->getTaskName(*iter);
743               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
744                   != _listOfBreakPoints.end())
745                 {
746                   stop = true;
747                   break;
748                 }
749             }
750           if (stop)
751             {
752               _listOfTasksToLoad.clear();
753               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
754                 {
755                   string nodeToLoad = _mainSched->getTaskName(*iter);
756                   _listOfTasksToLoad.push_back(nodeToLoad);
757                 }
758               if (getNbOfThreads())
759                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
760               else
761                 _executorState = YACS::PAUSED;
762               sendEvent("executor");
763               _condForPilot.notify_all();
764             }
765           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
766           if (_isOKToEnd) endRequested = true;
767         } // --- End of critical section
768           if (stop) DEBTRACE("wake up from waitResume");
769         break;
770       }
771     default:
772     case YACS::STEPBYSTEP:
773       {
774         { // --- Critical section
775           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
776           _tasksSave = _tasks;
777           _listOfTasksToLoad.clear();
778           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
779             {
780               string nodeToLoad = _mainSched->getTaskName(*iter);
781               _listOfTasksToLoad.push_back(nodeToLoad);
782             }
783           if (getNbOfThreads())
784             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
785           else
786             _executorState = YACS::PAUSED;
787           sendEvent("executor");
788           _condForPilot.notify_all();
789           if (!_isOKToEnd)
790             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
791                           // or, if no pilot, wait until no more running tasks (stop on error)
792           if (_isOKToEnd) endRequested = true;
793         } // --- End of critical section
794         DEBTRACE("wake up from waitResume");
795         break;
796       }
797     }
798   DEBTRACE("endRequested: " << endRequested);
799   return endRequested;
800 }
801
802
803 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
804 /*!
805  *  With the condition Mutex, the mutex is released atomically during the wait.
806  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
807  *  Must be called while mutex is locked.
808  */
809
810 void Executor::waitResume()
811 {
812   DEBTRACE("Executor::waitResume()");
813   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
814   DEBTRACE("---");
815 }
816
817
818 //! Perform loading of a Task.
819 /*!
820  *  \param task  : Task to load
821  */
822
823 void Executor::loadTask(Task *task, const Executor *execInst)
824 {
825   DEBTRACE("Executor::loadTask(Task *task)");
826   if(task->getState() != YACS::TOLOAD)
827     return;
828   traceExec(task, "state:TOLOAD", ComputePlacement(task));
829   {//Critical section
830     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
831     _mainSched->notifyFrom(task,YACS::START,execInst);
832   }//End of critical section
833   try
834     {
835       traceExec(task, "load", ComputePlacement(task));
836       task->load();
837       traceExec(task, "initService", ComputePlacement(task));
838       task->initService();
839     }
840   catch(Exception& ex) 
841     {
842       std::cerr << ex.what() << std::endl;
843       {//Critical section
844         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
845         task->aborted();
846         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
847         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
848       }//End of critical section
849     }
850   catch(...) 
851     {
852       std::cerr << "Load failed" << std::endl;
853       {//Critical section
854         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
855         task->aborted();
856         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
857         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
858       }//End of critical section
859     }
860 }
861
862 struct threadargs
863 {
864   Task *task;
865   Scheduler *sched;
866   Executor *execInst;
867 };
868
869 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
870 {
871   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
872     loadTask(*iter,execInst);
873 }
874
875 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
876 {
877   std::vector<Thread> ths(tasks.size());
878   std::size_t ithread(0);
879   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
880     {
881       DEBTRACE("Executor::loadParallelTasks(Task *task)");
882       struct threadargs *args(new threadargs);
883       args->task = (*iter);
884       args->sched = _mainSched;
885       args->execInst = this;
886       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
887     }
888   for(ithread=0;ithread<tasks.size();ithread++)
889     ths[ithread].join();
890 }
891
892 //! Execute a list of tasks possibly connected through datastream links
893 /*!
894  *  \param tasks  : a list of tasks to execute
895  *
896  */
897 void Executor::launchTasks(const std::vector<Task *>& tasks)
898 {
899   //First phase, make datastream connections
900   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
901     {
902       YACS::StatesForNode state=(*iter)->getState();
903       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
904       try
905         {
906           (*iter)->connectService();
907           traceExec(*iter, "connectService",ComputePlacement(*iter));
908           {//Critical section
909             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
910             (*iter)->connected();
911           }//End of critical section
912         }
913       catch(Exception& ex) 
914         {
915           std::cerr << ex.what() << std::endl;
916           try
917             {
918               (*iter)->disconnectService();
919               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
920             }
921           catch(...) 
922             {
923               // Disconnect has failed
924               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
925             }
926           {//Critical section
927             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
928             (*iter)->aborted();
929             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
930           }//End of critical section
931         }
932       catch(...) 
933         {
934           std::cerr << "Problem in connectService" << std::endl;
935           try
936             {
937               (*iter)->disconnectService();
938               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
939             }
940           catch(...) 
941             {
942               // Disconnect has failed
943               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
944             }
945           {//Critical section
946             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
947             (*iter)->aborted();
948             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
949           }//End of critical section
950         }
951       if((*iter)->getState() == YACS::ERROR)
952         {
953           //try to put all coupled tasks in error
954           std::set<Task*> coupledSet;
955           (*iter)->getCoupledTasks(coupledSet);
956           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
957             {
958               Task* t=*it;
959               if(t == *iter)continue;
960               if(t->getState() == YACS::ERROR)continue;
961               try
962                 {
963                   t->disconnectService();
964                   traceExec(t, "disconnectService",ComputePlacement(*iter));
965                 }
966               catch(...)
967                 {
968                   // Disconnect has failed
969                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
970                 }
971               {//Critical section
972                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
973                 t->aborted();
974                 _mainSched->notifyFrom(t,YACS::ABORT,this);
975               }//End of critical section
976               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
977             }
978         }
979       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
980     }
981
982   //Second phase, execute each task in a thread
983   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
984     {
985       launchTask(*iter);
986     }
987 }
988
989 //! Execute a Task in a thread
990 /*!
991  *  \param task  : Task to execute
992  *
993  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
994  *
995  *  Calls Executor::functionForTaskExecution in Thread
996  */
997
998 void Executor::launchTask(Task *task)
999 {
1000   DEBTRACE("Executor::launchTask(Task *task)");
1001   struct threadargs *args;
1002   if(task->getState() != YACS::TOACTIVATE)return;
1003
1004   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1005   if(_semThreadCnt == 0)
1006     {
1007       // --- Critical section
1008       YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1009       //check if we have enough threads to run
1010       std::set<Task*> tmpSet=_runningTasks;
1011       std::set<Task*>::iterator it = tmpSet.begin();
1012       std::string status="running";
1013       std::set<Task*> coupledSet;
1014       while( it != tmpSet.end() )
1015         {
1016           Task* tt=*it;
1017           coupledSet.clear();
1018           tt->getCoupledTasks(coupledSet);
1019           status="running";
1020           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1021             {
1022               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1023               tmpSet.erase(*iter);
1024             }
1025           if(status=="running")break;
1026           it = tmpSet.begin();
1027         }
1028
1029       if(status=="toactivate")
1030         {
1031           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1032           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1033         }
1034       // --- End of critical section
1035     }
1036
1037   _semForMaxThreads.wait();
1038   _semThreadCnt -= 1;
1039
1040   args= new threadargs;
1041   args->task = task;
1042   args->sched = _mainSched;
1043   args->execInst = this;
1044
1045   traceExec(task, "launch",ComputePlacement(task));
1046
1047   { // --- Critical section
1048     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1049     _numberOfRunningTasks++;
1050     _runningTasks.insert(task);
1051     task->begin(); //change state to ACTIVATED
1052   } // --- End of critical section
1053   Thread(functionForTaskExecution, args, _threadStackSize);
1054 }
1055
1056 //! wait until a running task ends
1057
1058 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1059 {
1060   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1061 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1062   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1063   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1064     {
1065       _isWaitingEventsFromRunningTasks = true;
1066       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1067     }
1068   _numberOfEndedTasks=0;
1069   DEBTRACE("---");
1070 }
1071
1072 //! not implemented
1073
1074 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1075 {
1076   /*_mutexForNbOfConcurrentThreads.lock();
1077   _groupOfAllThreadsCreated.remove(thread);
1078   delete thread;
1079   _mutexForNbOfConcurrentThreads.unlock();*/
1080 }
1081
1082
1083 //! must be used protected by _mutexForSchedulerUpdate!
1084
1085 void Executor::wakeUp()
1086 {
1087   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1088   if (_isWaitingEventsFromRunningTasks)
1089     {
1090       _isWaitingEventsFromRunningTasks = false;
1091       _condForNewTasksToPerform.notify_all();
1092     }
1093   else
1094     _numberOfEndedTasks++;
1095 }
1096
1097 //! number of running tasks
1098
1099 int Executor::getNbOfThreads()
1100 {
1101   int ret;
1102   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1103   _isRunningunderExternalControl=true;
1104   ret = _groupOfAllThreadsCreated.size();
1105   return ret;
1106 }
1107
1108 /*!
1109  * This thread is NOT supposed to be detached !
1110  */
1111 void *Executor::functionForTaskLoad(void *arg)
1112 {
1113   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1114   struct threadargs *args = (struct threadargs *) arg;
1115   Task *task=args->task;
1116   Scheduler *sched=args->sched;
1117   Executor *execInst=args->execInst;
1118   delete args;
1119   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1120   return 0;
1121 }
1122
1123 //! Function to perform execution of a task in a thread
1124 /*!
1125  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1126  *
1127  *  Calls Task::execute
1128  *
1129  *  Calls Task::finished when the task is finished
1130  *
1131  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1132  *
1133  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1134  */
1135
1136 void *Executor::functionForTaskExecution(void *arg)
1137 {
1138   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1139
1140   struct threadargs *args = (struct threadargs *) arg;
1141   Task *task=args->task;
1142   Scheduler *sched=args->sched;
1143   Executor *execInst=args->execInst;
1144   delete args;
1145   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1146
1147   Thread::detach();
1148
1149   // Execute task
1150
1151   if(execInst->getDPLScopeSensitive())
1152     {
1153       Node *node(dynamic_cast<Node *>(task));
1154       ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1155       if(node!=0 && gfn!=0)
1156         node->applyDPLScope(gfn);
1157     }
1158
1159   YACS::Event ev=YACS::FINISH;
1160   try
1161     {
1162       execInst->traceExec(task, "start execution",ComputePlacement(task));
1163       task->execute();
1164       execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1165     }
1166   catch(Exception& ex)
1167     {
1168       std::cerr << "YACS Exception during execute" << std::endl;
1169       std::cerr << ex.what() << std::endl;
1170       ev=YACS::ABORT;
1171       string message = "end execution ABORT, ";
1172       message += ex.what();
1173       execInst->traceExec(task, message,ComputePlacement(task));
1174     }
1175   catch(...) 
1176     {
1177       // Execution has failed
1178       std::cerr << "Execution has failed: unknown reason" << std::endl;
1179       ev=YACS::ABORT;
1180       execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1181     }
1182
1183   // Disconnect task
1184   try
1185     {
1186       DEBTRACE("task->disconnectService()");
1187       task->disconnectService();
1188       execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1189     }
1190   catch(...) 
1191     {
1192       // Disconnect has failed
1193       std::cerr << "disconnect has failed" << std::endl;
1194       ev=YACS::ABORT;
1195       execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1196     }
1197   //
1198
1199   std::string placement(ComputePlacement(task));
1200
1201   // container management for HomogeneousPoolOfContainer
1202
1203   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1204   if(contC)
1205     {
1206       YACS::BASES::AutoLocker<Container> alckCont(contC);
1207       contC->release(task);
1208     }
1209
1210   DEBTRACE("End task->execute()");
1211   { // --- Critical section
1212     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1213     try
1214       {
1215         if (ev == YACS::FINISH) task->finished();
1216         if (ev == YACS::ABORT)
1217           {
1218             execInst->_errorDetected = true;
1219             if (execInst->_stopOnErrorRequested)
1220               {
1221                 execInst->_execMode = YACS::STEPBYSTEP;
1222                 execInst->_isOKToEnd = true;
1223               }
1224             task->aborted();
1225           }
1226         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1227         sched->notifyFrom(task,ev,execInst);
1228       }
1229     catch(Exception& ex)
1230       {
1231         //notify has failed : it is supposed to have set state
1232         //so no need to do anything
1233         std::cerr << "Error during notification" << std::endl;
1234         std::cerr << ex.what() << std::endl;
1235       }
1236     catch(...)
1237       {
1238         //notify has failed : it is supposed to have set state
1239         //so no need to do anything
1240         std::cerr << "Notification failed" << std::endl;
1241       }
1242     execInst->_numberOfRunningTasks--;
1243     execInst->_runningTasks.erase(task);
1244     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1245              << " _execMode: " << execInst->_execMode
1246              << " _executorState: " << execInst->_executorState);
1247     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1248       {
1249         if (execInst->_executorState == YACS::WAITINGTASKS)
1250           {
1251             execInst->_executorState = YACS::PAUSED;
1252             execInst->sendEvent("executor");
1253             execInst->_condForPilot.notify_all();
1254             if (execInst->_errorDetected &&
1255                 execInst->_stopOnErrorRequested &&
1256                 !execInst->_isRunningunderExternalControl)
1257               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1258           }
1259       }
1260     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1261     execInst->_semForMaxThreads.post();
1262     execInst->_semThreadCnt += 1;
1263     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1264     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1265
1266   } // --- End of critical section (change state)
1267
1268   //execInst->notifyEndOfThread(0);
1269   Thread::exit(0);
1270   return 0;
1271 }
1272
1273 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1274 {
1275   string nodeName = _mainSched->getTaskName(task);
1276   Container *cont = task->getContainer();
1277   string containerName = "---";
1278   if (cont)
1279     containerName = cont->getName();
1280
1281 #ifdef WIN32
1282   DWORD now = timeGetTime();
1283   double elapse = (now - _start)/1000.0;
1284 #else
1285   timeval now;
1286   gettimeofday(&now, NULL);
1287   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1288 #endif
1289   {
1290     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1291     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1292     _trace << flush;
1293   }
1294 }
1295
1296 //! emit notification to all observers registered with  the dispatcher 
1297 /*!
1298  * The dispatcher is unique and can be obtained by getDispatcher()
1299  */
1300 void Executor::sendEvent(const std::string& event)
1301 {
1302   Dispatcher* disp=Dispatcher::getDispatcher();
1303   YASSERT(disp);
1304   YASSERT(_root);
1305   disp->dispatch(_root,event);
1306 }
1307
1308 /*!
1309  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1310  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1311  *
1312  * \param [in,out] tsks - list of tasks to be
1313  */
1314 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1315 {
1316   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1317   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1318     {
1319       Task *cur(*it);
1320       if(!cur)
1321         continue;
1322       Container *cont(cur->getContainer());
1323       if(!cont)
1324         {
1325           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1326           continue;
1327         }
1328       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1329       if(!contC)
1330         {
1331           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1332           continue;
1333         }
1334       m[contC].push_back(cur);
1335     }
1336   //
1337   std::vector<Task *> ret;
1338   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1339     {
1340       HomogeneousPoolContainer *curhpc((*it).first);
1341       const std::vector<Task *>& curtsks((*it).second);
1342       if(!curhpc)
1343         {
1344           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1345         }
1346       else
1347         {
1348           // start of critical section for container curhpc
1349           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1350           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1351           std::size_t sz(curhpc->getNumberOfFreePlace());
1352           std::vector<Task *>::const_iterator it2(curtsks.begin());
1353           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1354             {
1355               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1356               ret.push_back(*it2);
1357             }
1358           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1359           //end of critical section
1360         }
1361     }
1362   //
1363   tsks=ret;
1364 }
1365
1366 std::string Executor::ComputePlacement(Task *zeTask)
1367 {
1368   std::string placement("---");
1369   if(!zeTask)
1370     return placement;
1371   if(zeTask->getContainer())
1372     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1373   return placement;
1374 }