Salome HOME
Copyrights update 2015.
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
75 int Executor::_maxThreads(50);
76 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
77
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92
93 Executor::~Executor()
94 {
95   for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96     delete *iter;
97 }
98
99 //! Execute a graph waiting for completion
100 /*!
101  *  \param graph : schema to execute
102  *  \param debug : display the graph with dot if debug > 0 (higher values display it more often)
103  *  \param fromScratch : if true the graph is reinitialized
104  *
105  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106  *  
107  *  Calls Executor::launchTask to execute a selected Task.
108  *
109  *  Completion when graph is finished (Scheduler::isFinished)
110  */
111
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
114   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115   _mainSched=graph;
116   _root = dynamic_cast<ComposedNode *>(_mainSched);
117   if (!_root) throw Exception("Executor::Run, Internal Error!");
118   bool isMore;
119   int i=0;
120   if(debug>1)_displayDot(graph);
121   if (fromScratch)
122     {
123       graph->init();
124       graph->exUpdateState();
125     }
126   if(debug>1)_displayDot(graph);
127   vector<Task *> tasks;
128   vector<Task *>::iterator iter;
129   _toContinue=true;
130   _execMode = YACS::CONTINUE;
131   _isWaitingEventsFromRunningTasks = false;
132   _numberOfRunningTasks = 0;
133   _runningTasks.clear();
134   _numberOfEndedTasks=0;
135   while(_toContinue)
136     {
137       sleepWhileNoEventsFromAnyRunningTask();
138
139       if(debug>2)_displayDot(graph);
140
141       {//Critical section
142         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143         tasks=graph->getNextTasks(isMore);
144         graph->selectRunnableTasks(tasks);
145       }//End of critical section
146
147       if(debug>2)_displayDot(graph);
148
149       for(iter=tasks.begin();iter!=tasks.end();iter++)
150         loadTask(*iter,this);
151
152       if(debug>1)_displayDot(graph);
153
154       launchTasks(tasks);
155
156       if(debug>1)_displayDot(graph);
157
158       {//Critical section
159         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160         _toContinue=!graph->isFinished();
161       }//End of critical section
162       DEBTRACE("_toContinue: " << _toContinue);
163
164       if(debug>0)_displayDot(graph);
165
166       i++;
167     }
168 }
169
170 //! Execute a graph with breakpoints or step by step
171 /*!
172  *  To be launched in a thread (main thread controls the progression).
173  *  \param graph : schema to execute
174  *  \param debug : display the graph with dot if debug >0
175  *  \param fromScratch : if false, state from a previous partial execution is already loaded
176  *
177  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178  *  
179  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
180  * 
181  *  Calls Executor::launchTask to execute a selected Task
182  *
183  *  Completion when graph is finished (Scheduler::isFinished)
184  *
185  *  States of execution:
186  *  - YACS::NOTYETINITIALIZED
187  *  - YACS::INITIALISED
188  *  - YACS::RUNNING            (to next breakpoint or step)
189  *  - YACS::WAITINGTASKS       (a breakpoint or step has been reached, but there are still running tasks)
190  *  - YACS::PAUSED             (a breakpoint or step has been reached, no more running tasks)
191  *  - YACS::FINISHED           (no more ready tasks, nor running tasks)
192  *  - YACS::STOPPED            (stopped by user before end)
193  *
194  *  Modes of Execution:
195  *  - YACS::CONTINUE           (normal run without breakpoints)
196  *  - YACS::STEPBYSTEP         (pause at each loop)
197  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
198  *
199  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200  *  Step by Step means execution node by node or group of node by group of nodes.
201  *  At a given step, the user decides to launch all the ready nodes or only a subset
202  *  (Caution: some nodes must run in parallel). 
203  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
204  *
205  *  The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
206  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207  *  - Executor::getCurrentExecMode
208  *  - Executor::getExecutorState
209  *  - Executor::setExecMode             : change the execution mode for next loop
210  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
211  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
212  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
213  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214  *  - Executor::isNotFinished
215  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216  *  - Executor::saveState               : dump the current state of execution in an xml file
217  *  - Executor::loadState               : Not yet implemented
218  *  - Executor::getNbOfThreads
219  *  - Executor::displayDot
220  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
221  *
222  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223  *  - Executor::waitPause
224  *
225  *  TO BE VALIDATED:
226  *  - Pilot may connect to executor during execution, or disconnect.
227  *  - Several Pilots may be connected at the same time (for observation...)
228  * 
229  */
230
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
233   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234
235   { // --- Critical section
236     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
237     _mainSched = graph;
238     _root = dynamic_cast<ComposedNode *>(_mainSched);
239     if (!_root) throw Exception("Executor::Run, Internal Error!");
240     _executorState = YACS::NOTYETINITIALIZED;
241     sendEvent("executor");
242     _toContinue=true;
243     _isOKToEnd = false;
244     _errorDetected = false;
245     _isWaitingEventsFromRunningTasks = false;
246     _numberOfRunningTasks = 0;
247     _runningTasks.clear();
248     _numberOfEndedTasks = 0;
249     string tracefile = "traceExec_";
250     tracefile += _mainSched->getName();
251     _trace.open(tracefile.c_str());
252 #ifdef WIN32
253    _start = timeGetTime();
254 #else
255     gettimeofday(&_start, NULL);
256 #endif
257
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297         _tasks=graph->getNextTasks(isMore);
298         numberAllTasks=_numberOfRunningTasks+_tasks.size();
299         graph->selectRunnableTasks(_tasks);
300         FilterTasksConsideringContainers(_tasks);
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
303       if (_executorState == YACS::RUNNING)
304         {
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           //loadTasks(_tasks);
309           loadParallelTasks(_tasks,this);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
367   _trace.close();
368   DEBTRACE("End of RunB thread");  
369 }
370
371 YACS::ExecutionMode Executor::getCurrentExecMode()
372 {
373   _isRunningunderExternalControl=true;
374   return _execMode;
375 }
376
377
378 YACS::ExecutorState Executor::getExecutorState()
379 {
380   _isRunningunderExternalControl=true;
381   return _executorState;
382 }
383
384
385 bool Executor::isNotFinished()
386 {
387   _isRunningunderExternalControl=true;
388   return _toContinue;
389 }
390
391 //! ask to stop execution on the first node found in error
392 /*!
393  * \param dumpRequested   produce a state dump when an error is found
394  * \param xmlFile         name of file used for state dump
395  */
396
397 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
398 {
399   { // --- Critical section
400     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
401     _dumpErrorFile=xmlFile;
402     _stopOnErrorRequested=true;
403     _dumpOnErrorRequested = dumpRequested;
404     if (dumpRequested && xmlFile.empty())
405       throw YACS::Exception("dump on error requested and no filename given for dump");
406     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
407   } // --- End of critical section
408 }
409
410 //! ask not to stop execution on nodes found in error
411 /*!
412  */
413
414 void Executor::unsetStopOnError()
415 {
416   { // --- Critical section
417     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
418     _stopOnErrorRequested=false;
419   } // --- End of critical section
420 }
421
422 //! Dynamically set the current mode of execution
423 /*!
424  * The mode can be Continue, step by step, or stop before execution of a node
425  * defined in a list of breakpoints.
426  */
427
428 void Executor::setExecMode(YACS::ExecutionMode mode)
429 {
430   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
431   { // --- Critical section
432     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
433     _isRunningunderExternalControl=true;
434     _execMode = mode;
435   } // --- End of critical section
436 }
437
438 //! wake up executor when in pause
439 /*!
440  * When Executor is in state paused or waiting for task completion, the thread
441  * running loop RunB waits on condition _condForStepByStep.
442  * Thread RunB is woken up.
443  * \return true when actually wakes up executor
444  */
445
446 bool Executor::resumeCurrentBreakPoint()
447 {
448   DEBTRACE("Executor::resumeCurrentBreakPoint()");
449   bool ret = false;
450   //bool doDump = false;
451   { // --- Critical section
452     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
453     _isRunningunderExternalControl=true;
454     DEBTRACE("_executorState: " << _executorState);
455     switch (_executorState)
456       {
457       case YACS::WAITINGTASKS:
458       case YACS::PAUSED:
459         {
460           _condForStepByStep.notify_all();
461           _executorState = YACS::RUNNING;
462           sendEvent("executor");
463           ret = true;
464           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
465           break;
466         }
467       case YACS::FINISHED:
468       case YACS::STOPPED:
469         {
470           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
471           DEBTRACE("Graph Execution finished or stopped !");
472           break;
473         }
474       default :
475         {
476           // debug: no easy way to verify if main loop is acutally waiting on condition
477         }
478       }
479     DEBTRACE("---");
480     //if (doDump) saveState(_dumpErrorFile);
481   } // --- End of critical section
482   return ret;
483 }
484
485
486 //! define a list of nodes names as breakpoints in the graph
487
488
489 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
490 {
491   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
492   { // --- Critical section
493     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
494     _isRunningunderExternalControl=true;
495     _listOfBreakPoints = listOfBreakPoints;
496   } // --- End of critical section
497 }
498
499
500 //! Get the list of tasks to load, to define a subset to execute in step by step mode
501 /*!
502  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
503  *  Use Executor::waitPause to wait.
504  */
505 std::list<std::string> Executor::getTasksToLoad()
506 {
507   DEBTRACE("Executor::getTasksToLoad()");
508   list<string> listOfNodesToLoad;
509   listOfNodesToLoad.clear();
510   { // --- Critical section
511     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
512     _isRunningunderExternalControl=true;
513     switch (_executorState)
514       {
515       case YACS::WAITINGTASKS:
516       case YACS::PAUSED:
517         {
518           listOfNodesToLoad = _listOfTasksToLoad;
519           break;
520         }
521       case YACS::NOTYETINITIALIZED:
522       case YACS::INITIALISED:
523       case YACS::RUNNING:
524       case YACS::FINISHED:
525       case YACS::STOPPED:
526       default:
527         {
528           break;
529         }
530       }
531   } // --- End of critical section
532   return listOfNodesToLoad;
533 }
534
535
536 //! Define a subset of task to execute in step by step mode
537 /*!
538  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
539  * in the current step.
540  * If some nodes must run in parallel, they must stay together in the list.
541  */
542
543 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
544 {
545   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
546   bool ret = true;
547   vector<Task *>::iterator iter;
548   vector<Task *> restrictedTasks;
549   { // --- Critical section
550     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
551     _isRunningunderExternalControl=true;
552     switch (_executorState)
553       {
554       case YACS::WAITINGTASKS:
555       case YACS::PAUSED:
556         {
557           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
558             {
559               string readyNode = _mainSched->getTaskName(*iter);
560               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
561                   != listToExecute.end())
562                 {
563                   restrictedTasks.push_back(*iter);
564                   DEBTRACE("node to execute " << readyNode);
565                 }
566             }
567           _tasks.clear();
568           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
569             {
570               _tasks.push_back(*iter);
571             }
572           break;
573         }
574       case YACS::NOTYETINITIALIZED:
575       case YACS::INITIALISED:
576       case YACS::RUNNING:
577       case YACS::FINISHED:
578       case YACS::STOPPED:
579       default:
580         {
581           break;
582         }
583       }
584     } // --- End of critical section
585
586   _tasks.clear();
587   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
588     {
589       _tasks.push_back(*iter);
590     }
591   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
592     {
593       string readyNode = _mainSched->getTaskName(*iter);
594       DEBTRACE("selected node to execute " << readyNode);
595     }
596
597   return ret;
598 }
599
600 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
601 /*!
602  *  Do nothing if execution is finished or in pause.
603  *  Wait first step if Executor is running or in initialization.
604  */
605
606 void Executor::waitPause()
607 {
608   DEBTRACE("Executor::waitPause()" << _executorState);
609   { // --- Critical section
610     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
611     _isRunningunderExternalControl=true;
612     switch (_executorState)
613       {
614       default:
615       case YACS::STOPPED:
616       case YACS::FINISHED:
617       case YACS::WAITINGTASKS:
618       case YACS::PAUSED:
619         {
620           break;
621         }
622       case YACS::NOTYETINITIALIZED:
623       case YACS::INITIALISED:
624       case YACS::RUNNING:
625         {
626           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
627           break;
628         }
629       }
630   } // --- End of critical section
631   DEBTRACE("---");
632 }
633
634 //! stops the execution as soon as possible 
635
636 void Executor::stopExecution()
637 {
638   setExecMode(YACS::STEPBYSTEP);
639   //waitPause();
640   _isOKToEnd = true;
641   resumeCurrentBreakPoint();
642 }
643
644 //! save the current state of execution in an xml file
645
646 bool Executor::saveState(const std::string& xmlFile)
647 {
648   DEBTRACE("Executor::saveState() in " << xmlFile);
649   YACS::ENGINE::VisitorSaveState vst(_root);
650   vst.openFileDump(xmlFile.c_str());
651   _root->accept(&vst);
652   vst.closeFileDump();
653   return true;
654 }
655
656 //! not yet implemented
657
658 bool Executor::loadState()
659 {
660   DEBTRACE("Executor::loadState()");
661   _isRunningunderExternalControl=true;
662   return true;
663 }
664
665
//! Tell whether \a filename names an existing regular file.
/*!
 *  \param filename : path handed to stat()
 *  \return 1 when stat() succeeds and reports a regular file, 0 otherwise
 */
static int isfile(const char *filename) 
{
  struct stat info;
  const bool statOk = (stat(filename, &info) == 0);
  return (statOk && S_ISREG(info.st_mode)) ? 1 : 0;
}
675
676 //! Display the graph state as a dot display, public method
677
678 void Executor::displayDot(Scheduler *graph)
679 {
680   _isRunningunderExternalControl=true;
681   _displayDot(graph);
682 }
683
684 //! Display the graph state as a dot display
685 /*!
686  *  \param graph  : the node to display
687  */
688
689 void Executor::_displayDot(Scheduler *graph)
690 {
691    std::ofstream g("titi");
692    ((ComposedNode*)graph)->writeDot(g);
693    g.close();
694    const char displayScript[]="display.sh";
695    if(isfile(displayScript))
696      system("sh display.sh");
697    else
698      system("dot -Tpng titi|display -delay 5");
699 }
700
701 //! Wait reactivation in modes Step By step or with BreakPoints
702 /*!
703  *  Check mode of execution (set by main thread):
704  *  - YACS::CONTINUE        : the graph execution continues.
705  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
706  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
707  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
708  *                            else continue the graph execution.
709  *  \return true if end of executor thread is requested
710  */
711
712 bool Executor::checkBreakPoints()
713 {
714   DEBTRACE("Executor::checkBreakPoints()");
715   vector<Task *>::iterator iter;
716   bool endRequested = false;
717
718   switch (_execMode)
719     {
720     case YACS::CONTINUE:
721       {
722         break;
723       }
724     case YACS::STOPBEFORENODES:
725       {
726         bool stop = false;
727         { // --- Critical section
728           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
729           _tasksSave = _tasks;
730           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
731             {
732               string nodeToLoad = _mainSched->getTaskName(*iter);
733               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
734                   != _listOfBreakPoints.end())
735                 {
736                   stop = true;
737                   break;
738                 }
739             }
740           if (stop)
741             {
742               _listOfTasksToLoad.clear();
743               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
744                 {
745                   string nodeToLoad = _mainSched->getTaskName(*iter);
746                   _listOfTasksToLoad.push_back(nodeToLoad);
747                 }
748               if (getNbOfThreads())
749                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
750               else
751                 _executorState = YACS::PAUSED;
752               sendEvent("executor");
753               _condForPilot.notify_all();
754             }
755           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
756           if (_isOKToEnd) endRequested = true;
757         } // --- End of critical section
758           if (stop) DEBTRACE("wake up from waitResume");
759         break;
760       }
761     default:
762     case YACS::STEPBYSTEP:
763       {
764         { // --- Critical section
765           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
766           _tasksSave = _tasks;
767           _listOfTasksToLoad.clear();
768           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
769             {
770               string nodeToLoad = _mainSched->getTaskName(*iter);
771               _listOfTasksToLoad.push_back(nodeToLoad);
772             }
773           if (getNbOfThreads())
774             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
775           else
776             _executorState = YACS::PAUSED;
777           sendEvent("executor");
778           _condForPilot.notify_all();
779           if (!_isOKToEnd)
780             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
781                           // or, if no pilot, wait until no more running tasks (stop on error)
782           if (_isOKToEnd) endRequested = true;
783         } // --- End of critical section
784         DEBTRACE("wake up from waitResume");
785         break;
786       }
787     }
788   DEBTRACE("endRequested: " << endRequested);
789   return endRequested;
790 }
791
792
793 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
794 /*!
795  *  With the condition Mutex, the mutex is released atomically during the wait.
796  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
797  *  Must be called while mutex is locked.
798  */
799
800 void Executor::waitResume()
801 {
802   DEBTRACE("Executor::waitResume()");
803   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
804   DEBTRACE("---");
805 }
806
807
808 //! Perform loading of a Task.
809 /*!
810  *  \param task  : Task to load
811  */
812
813 void Executor::loadTask(Task *task, const Executor *execInst)
814 {
815   DEBTRACE("Executor::loadTask(Task *task)");
816   if(task->getState() != YACS::TOLOAD)
817     return;
818   traceExec(task, "state:TOLOAD", ComputePlacement(task));
819   {//Critical section
820     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
821     _mainSched->notifyFrom(task,YACS::START,execInst);
822   }//End of critical section
823   try
824     {
825       traceExec(task, "load", ComputePlacement(task));
826       task->load();
827       traceExec(task, "initService", ComputePlacement(task));
828       task->initService();
829     }
830   catch(Exception& ex) 
831     {
832       std::cerr << ex.what() << std::endl;
833       {//Critical section
834         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
835         task->aborted();
836         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
837         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
838       }//End of critical section
839     }
840   catch(...) 
841     {
842       std::cerr << "Load failed" << std::endl;
843       {//Critical section
844         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
845         task->aborted();
846         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
847         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
848       }//End of critical section
849     }
850 }
851
852 struct threadargs
853 {
854   Task *task;
855   Scheduler *sched;
856   Executor *execInst;
857 };
858
859 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
860 {
861   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
862     loadTask(*iter,execInst);
863 }
864
865 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
866 {
867   std::vector<Thread> ths(tasks.size());
868   std::size_t ithread(0);
869   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
870     {
871       DEBTRACE("Executor::loadParallelTasks(Task *task)");
872       struct threadargs *args(new threadargs);
873       args->task = (*iter);
874       args->sched = _mainSched;
875       args->execInst = this;
876       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
877     }
878   for(ithread=0;ithread<tasks.size();ithread++)
879     ths[ithread].join();
880 }
881
882 //! Execute a list of tasks possibly connected through datastream links
883 /*!
884  *  \param tasks  : a list of tasks to execute
885  *
886  */
887 void Executor::launchTasks(const std::vector<Task *>& tasks)
888 {
889   //First phase, make datastream connections
890   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
891     {
892       YACS::StatesForNode state=(*iter)->getState();
893       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
894       try
895         {
896           (*iter)->connectService();
897           traceExec(*iter, "connectService",ComputePlacement(*iter));
898           {//Critical section
899             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
900             (*iter)->connected();
901           }//End of critical section
902         }
903       catch(Exception& ex) 
904         {
905           std::cerr << ex.what() << std::endl;
906           try
907             {
908               (*iter)->disconnectService();
909               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
910             }
911           catch(...) 
912             {
913               // Disconnect has failed
914               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
915             }
916           {//Critical section
917             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
918             (*iter)->aborted();
919             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
920           }//End of critical section
921         }
922       catch(...) 
923         {
924           std::cerr << "Problem in connectService" << std::endl;
925           try
926             {
927               (*iter)->disconnectService();
928               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
929             }
930           catch(...) 
931             {
932               // Disconnect has failed
933               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
934             }
935           {//Critical section
936             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
937             (*iter)->aborted();
938             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
939           }//End of critical section
940         }
941       if((*iter)->getState() == YACS::ERROR)
942         {
943           //try to put all coupled tasks in error
944           std::set<Task*> coupledSet;
945           (*iter)->getCoupledTasks(coupledSet);
946           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
947             {
948               Task* t=*it;
949               if(t == *iter)continue;
950               if(t->getState() == YACS::ERROR)continue;
951               try
952                 {
953                   t->disconnectService();
954                   traceExec(t, "disconnectService",ComputePlacement(*iter));
955                 }
956               catch(...)
957                 {
958                   // Disconnect has failed
959                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
960                 }
961               {//Critical section
962                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
963                 t->aborted();
964                 _mainSched->notifyFrom(t,YACS::ABORT,this);
965               }//End of critical section
966               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
967             }
968         }
969       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
970     }
971
972   //Second phase, execute each task in a thread
973   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
974     {
975       launchTask(*iter);
976     }
977 }
978
979 //! Execute a Task in a thread
980 /*!
981  *  \param task  : Task to execute
982  *
983  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
984  *
985  *  Calls Executor::functionForTaskExecution in Thread
986  */
987
988 void Executor::launchTask(Task *task)
989 {
990   DEBTRACE("Executor::launchTask(Task *task)");
991   struct threadargs *args;
992   if(task->getState() != YACS::TOACTIVATE)return;
993
994   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
995   if(_semThreadCnt == 0)
996     {
997       //check if we have enough threads to run
998       std::set<Task*> tmpSet=_runningTasks;
999       std::set<Task*>::iterator it = tmpSet.begin();
1000       std::string status="running";
1001       std::set<Task*> coupledSet;
1002       while( it != tmpSet.end() )
1003         {
1004           Task* tt=*it;
1005           coupledSet.clear();
1006           tt->getCoupledTasks(coupledSet);
1007           status="running";
1008           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1009             {
1010               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1011               tmpSet.erase(*iter);
1012             }
1013           if(status=="running")break;
1014           it = tmpSet.begin();
1015         }
1016
1017       if(status=="toactivate")
1018         {
1019           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1020           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1021         }
1022     }
1023
1024   _semForMaxThreads.wait();
1025   _semThreadCnt -= 1;
1026
1027   args= new threadargs;
1028   args->task = task;
1029   args->sched = _mainSched;
1030   args->execInst = this;
1031
1032   traceExec(task, "launch",ComputePlacement(task));
1033
1034   { // --- Critical section
1035     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1036     _numberOfRunningTasks++;
1037     _runningTasks.insert(task);
1038     task->begin(); //change state to ACTIVATED
1039   } // --- End of critical section
1040   Thread(functionForTaskExecution, args, _threadStackSize);
1041 }
1042
1043 //! wait until a running task ends
1044
1045 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1046 {
1047   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1048 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1049   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1050   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1051     {
1052       _isWaitingEventsFromRunningTasks = true;
1053       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1054     }
1055   _numberOfEndedTasks=0;
1056   DEBTRACE("---");
1057 }
1058
//! not implemented
/*!
 *  \param thread : unused
 *
 *  The whole body is commented out, so calling this method does nothing.
 *  The disabled code removed \a thread from _groupOfAllThreadsCreated and
 *  deleted it while holding _mutexForNbOfConcurrentThreads.
 */

void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
{
  /*_mutexForNbOfConcurrentThreads.lock();
  _groupOfAllThreadsCreated.remove(thread);
  delete thread;
  _mutexForNbOfConcurrentThreads.unlock();*/
}
1068
1069
1070 //! must be used protected by _mutexForSchedulerUpdate!
1071
1072 void Executor::wakeUp()
1073 {
1074   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1075   if (_isWaitingEventsFromRunningTasks)
1076     {
1077       _isWaitingEventsFromRunningTasks = false;
1078       _condForNewTasksToPerform.notify_all();
1079     }
1080   else
1081     _numberOfEndedTasks++;
1082 }
1083
1084 //! number of running tasks
1085
1086 int Executor::getNbOfThreads()
1087 {
1088   int ret;
1089   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1090   _isRunningunderExternalControl=true;
1091   ret = _groupOfAllThreadsCreated.size();
1092   return ret;
1093 }
1094
1095 /*!
1096  * This thread is NOT supposed to be detached !
1097  */
1098 void *Executor::functionForTaskLoad(void *arg)
1099 {
1100   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1101   struct threadargs *args = (struct threadargs *) arg;
1102   Task *task=args->task;
1103   Scheduler *sched=args->sched;
1104   Executor *execInst=args->execInst;
1105   delete args;
1106   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1107   return 0;
1108 }
1109
1110 //! Function to perform execution of a task in a thread
1111 /*!
1112  *  \param arg  : 3 elements (a Task, a Scheduler, an Executor)
1113  *
1114  *  Calls Task::execute
1115  *
1116  *  Calls Task::finished when the task is finished
1117  *
1118  *  Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1119  *
1120  *  Calls Executor::wakeUp and Executor::notifyEndOfThread
1121  */
1122
1123 void *Executor::functionForTaskExecution(void *arg)
1124 {
1125   DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1126
1127   struct threadargs *args = (struct threadargs *) arg;
1128   Task *task=args->task;
1129   Scheduler *sched=args->sched;
1130   Executor *execInst=args->execInst;
1131   delete args;
1132   execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1133
1134   Thread::detach();
1135
1136   // Execute task
1137
1138   YACS::Event ev=YACS::FINISH;
1139   try
1140     {
1141       execInst->traceExec(task, "start execution",ComputePlacement(task));
1142       task->execute();
1143       execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1144     }
1145   catch(Exception& ex)
1146     {
1147       std::cerr << "YACS Exception during execute" << std::endl;
1148       std::cerr << ex.what() << std::endl;
1149       ev=YACS::ABORT;
1150       string message = "end execution ABORT, ";
1151       message += ex.what();
1152       execInst->traceExec(task, message,ComputePlacement(task));
1153     }
1154   catch(...) 
1155     {
1156       // Execution has failed
1157       std::cerr << "Execution has failed: unknown reason" << std::endl;
1158       ev=YACS::ABORT;
1159       execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1160     }
1161
1162   // Disconnect task
1163   try
1164     {
1165       DEBTRACE("task->disconnectService()");
1166       task->disconnectService();
1167       execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1168     }
1169   catch(...) 
1170     {
1171       // Disconnect has failed
1172       std::cerr << "disconnect has failed" << std::endl;
1173       ev=YACS::ABORT;
1174       execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1175     }
1176   //
1177
1178   std::string placement(ComputePlacement(task));
1179
1180   // container management for HomogeneousPoolOfContainer
1181
1182   HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1183   if(contC)
1184     {
1185       YACS::BASES::AutoLocker<Container> alckCont(contC);
1186       contC->release(task);
1187     }
1188
1189   DEBTRACE("End task->execute()");
1190   { // --- Critical section
1191     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1192     try
1193       {
1194         if (ev == YACS::FINISH) task->finished();
1195         if (ev == YACS::ABORT)
1196           {
1197             execInst->_errorDetected = true;
1198             if (execInst->_stopOnErrorRequested)
1199               {
1200                 execInst->_execMode = YACS::STEPBYSTEP;
1201                 execInst->_isOKToEnd = true;
1202               }
1203             task->aborted();
1204           }
1205         execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1206         sched->notifyFrom(task,ev,execInst);
1207       }
1208     catch(Exception& ex)
1209       {
1210         //notify has failed : it is supposed to have set state
1211         //so no need to do anything
1212         std::cerr << "Error during notification" << std::endl;
1213         std::cerr << ex.what() << std::endl;
1214       }
1215     catch(...)
1216       {
1217         //notify has failed : it is supposed to have set state
1218         //so no need to do anything
1219         std::cerr << "Notification failed" << std::endl;
1220       }
1221     execInst->_numberOfRunningTasks--;
1222     execInst->_runningTasks.erase(task);
1223     DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
1224              << " _execMode: " << execInst->_execMode
1225              << " _executorState: " << execInst->_executorState);
1226     if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1227       {
1228         if (execInst->_executorState == YACS::WAITINGTASKS)
1229           {
1230             execInst->_executorState = YACS::PAUSED;
1231             execInst->sendEvent("executor");
1232             execInst->_condForPilot.notify_all();
1233             if (execInst->_errorDetected &&
1234                 execInst->_stopOnErrorRequested &&
1235                 !execInst->_isRunningunderExternalControl)
1236               execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1237           }
1238       }
1239     DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1240     execInst->_semForMaxThreads.post();
1241     execInst->_semThreadCnt += 1;
1242     DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1243     if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();
1244
1245   } // --- End of critical section (change state)
1246
1247   //execInst->notifyEndOfThread(0);
1248   Thread::exit(0);
1249   return 0;
1250 }
1251
1252 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1253 {
1254   string nodeName = _mainSched->getTaskName(task);
1255   Container *cont = task->getContainer();
1256   string containerName = "---";
1257   if (cont)
1258     containerName = cont->getName();
1259
1260 #ifdef WIN32
1261   DWORD now = timeGetTime();
1262   double elapse = (now - _start)/1000.0;
1263 #else
1264   timeval now;
1265   gettimeofday(&now, NULL);
1266   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1267 #endif
1268   {
1269     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1270     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1271     _trace << flush;
1272   }
1273 }
1274
1275 //! emit notification to all observers registered with  the dispatcher 
1276 /*!
1277  * The dispatcher is unique and can be obtained by getDispatcher()
1278  */
1279 void Executor::sendEvent(const std::string& event)
1280 {
1281   Dispatcher* disp=Dispatcher::getDispatcher();
1282   YASSERT(disp);
1283   YASSERT(_root);
1284   disp->dispatch(_root,event);
1285 }
1286
1287 /*!
1288  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1289  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1290  *
1291  * \param [in,out] tsks - list of tasks to be
1292  */
1293 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1294 {
1295   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1296   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1297     {
1298       Task *cur(*it);
1299       if(!cur)
1300         continue;
1301       Container *cont(cur->getContainer());
1302       if(!cont)
1303         {
1304           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1305           continue;
1306         }
1307       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1308       if(!contC)
1309         {
1310           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1311           continue;
1312         }
1313       m[contC].push_back(cur);
1314     }
1315   //
1316   std::vector<Task *> ret;
1317   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1318     {
1319       HomogeneousPoolContainer *curhpc((*it).first);
1320       const std::vector<Task *>& curtsks((*it).second);
1321       if(!curhpc)
1322         {
1323           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1324         }
1325       else
1326         {
1327           // start of critical section for container curhpc
1328           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1329           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1330           std::size_t sz(curhpc->getNumberOfFreePlace());
1331           std::vector<Task *>::const_iterator it2(curtsks.begin());
1332           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1333             {
1334               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1335               ret.push_back(*it2);
1336             }
1337           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1338           //end of critical section
1339         }
1340     }
1341   //
1342   tsks=ret;
1343 }
1344
1345 std::string Executor::ComputePlacement(Task *zeTask)
1346 {
1347   std::string placement("---");
1348   if(!zeTask)
1349     return placement;
1350   if(zeTask->getContainer())
1351     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1352   return placement;
1353 }