Salome HOME
Fix race condition that could cause SIGSEGV with "Stop on error" mode
[modules/yacs.git] / src / engine / Executor.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
28
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40
41 #include <cstdlib>
42 #include <algorithm>
43
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 #  ifndef S_IFMT
48 #    ifdef _S_IFMT
49 #      define S_IFMT _S_IFMT
50 #      define S_IFCHR _S_IFCHR
51 #      define S_IFREG _S_IFREG
52 #    else
53 #    ifdef __S_IFMT
54 #      define S_IFMT __S_IFMT
55 #      define S_IFCHR __S_IFCHR
56 #      define S_IFREG __S_IFREG
57 #    endif
58 #    endif
59 #  endif
60 #  define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 #  define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64
65 using namespace YACS::ENGINE;
66 using namespace std;
67
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74
// Class-wide defaults. _maxThreads seeds _semForMaxThreads and _semThreadCnt in the constructor
// (presumably the cap on concurrently running task threads - confirm in the header).
// _threadStackSize is the stack size handed to Thread::go() by loadParallelTasks().
75 int Executor::_maxThreads(50);
76 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
77
//! Build an idle executor: no root node, state NOTYETINITIALIZED, mode CONTINUE, no stop/dump on error.
// The members reset at the start of RunA()/RunB() (_mainSched, _numberOfRunningTasks, ...) are not set here.
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false)
79 {
80   _root=0;
81   _toContinue = true;
82   _isOKToEnd = false;
83   _stopOnErrorRequested = false;
84   _dumpOnErrorRequested = false;
85   _errorDetected = false;
86   _isRunningunderExternalControl=false;
87   _executorState = YACS::NOTYETINITIALIZED;
88   _execMode = YACS::CONTINUE;
89   _semThreadCnt = _maxThreads;
90   DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92
//! Delete every Thread pointer stored in _groupOfAllThreadsCreated.
93 Executor::~Executor()
94 {
95   list<Thread *>::iterator cur=_groupOfAllThreadsCreated.begin();
96   while(cur!=_groupOfAllThreadsCreated.end()) delete *cur++;
97 }
98
99 //! Execute a graph waiting for completion
100 /*!
101  *  \param graph : schema to execute
102  *  \param debug : display the graph with dot if debug > 0
103  *  \param fromScratch : if true the graph is reinitialized
104  *
105  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106  *  
107  *  Calls Executor::launchTask to execute a selected Task.
108  *
109  *  Completion when graph is finished (Scheduler::isFinished)
110  */
111
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
114   DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115   _mainSched=graph;
116   _root = dynamic_cast<ComposedNode *>(_mainSched);
117   if (!_root) throw Exception("Executor::Run, Internal Error!");
118   bool isMore;
119   int i=0;
120   if(debug>1)_displayDot(graph);
121   if (fromScratch)
122     {
123       graph->init();
124       graph->exUpdateState();
125     }
126   if(debug>1)_displayDot(graph);
127   vector<Task *> tasks;
128   vector<Task *>::iterator iter;
129   _toContinue=true;
130   _execMode = YACS::CONTINUE;
131   _isWaitingEventsFromRunningTasks = false;
132   _numberOfRunningTasks = 0;
133   _runningTasks.clear();
134   _numberOfEndedTasks=0;
135   while(_toContinue)
136     {
137       sleepWhileNoEventsFromAnyRunningTask();
138
139       if(debug>2)_displayDot(graph);
140
141       {//Critical section
142         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143         tasks=graph->getNextTasks(isMore);
144         graph->selectRunnableTasks(tasks);
145       }//End of critical section
146
147       if(debug>2)_displayDot(graph);
148
149       for(iter=tasks.begin();iter!=tasks.end();iter++)
150         loadTask(*iter,this);
151
152       if(debug>1)_displayDot(graph);
153
154       launchTasks(tasks);
155
156       if(debug>1)_displayDot(graph);
157
158       {//Critical section
159         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160         _toContinue=!graph->isFinished();
161       }//End of critical section
162       DEBTRACE("_toContinue: " << _toContinue);
163
164       if(debug>0)_displayDot(graph);
165
166       i++;
167     }
168 }
169
170 //! Execute a graph with breakpoints or step by step
171 /*!
172  *  To be launched in a thread (main thread controls the progression).
173  *  \param graph : schema to execute
174  *  \param debug : display the graph with dot if debug >0
175  *  \param fromScratch : if false, state from a previous partial execution is already loaded
176  *
177  *  Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178  *  
179  *  Calls Executor::checkBreakPoints to verify if a pause is requested 
180  * 
181  *  Calls Executor::launchTask to execute a selected Task
182  *
183  *  Completion when graph is finished (Scheduler::isFinished)
184  *
185  *  States of execution:
186  *  - YACS::NOTYETINITIALIZED
187  *  - YACS::INITIALISED
188  *  - YACS::RUNNING            (to next breakpoint or step)
189  *  - YACS::WAITINGTASKS       (a breakpoint or step has been reached, but there are still running tasks)
190  *  - YACS::PAUSED             (a breakpoint or step has been reached, no more running tasks)
191  *  - YACS::FINISHED           (no more ready tasks, no more running tasks)
192  *  - YACS::STOPPED            (stopped by user before end)
193  *
194  *  Modes of Execution:
195  *  - YACS::CONTINUE           (normal run without breakpoints)
196  *  - YACS::STEPBYSTEP         (pause at each loop)
197  *  - YACS::STOPBEFORENODES    (pause when a node is reached)
198  *
199  *  A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200  *  Step by Step means execution node by node or group of node by group of nodes.
201  *  At a given step, the user decides to launch all the ready nodes or only a subset
202  *  (Caution: some nodes must run in parallel). 
203  *  The next event (end of task) may give a new set of ready nodes, and define a new step.
204  *
205  *  The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
206  *  Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207  *  - Executor::getCurrentExecMode
208  *  - Executor::getExecutorState
209  *  - Executor::setExecMode             : change the execution mode for next loop
210  *  - Executor::setListOfBreakPoints    : must be set before setting YACS::STOPBEFORENODES
211  *  - Executor::getTasksToLoad          : when paused or waiting tasks, get the list of next tasks
212  *  - Executor::setStepsToExecute       : define a subset of the list given by Executor::getTasksToLoad
213  *  - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214  *  - Executor::isNotFinished
215  *  - Executor::stopExecution           : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216  *  - Executor::saveState               : dump the current state of execution in an xml file
217  *  - Executor::loadState               : Not yet implemented
218  *  - Executor::getNbOfThreads
219  *  - Executor::displayDot
220  *  - Executor::setStopOnError          : ask to stop execution if a node is found in ERROR state
221  *
222  *  If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223  *  - Executor::waitPause
224  *
225  *  TO BE VALIDATED:
226  *  - Pilot may connect to executor during execution, or disconnect.
227  *  - Several Pilots may be connected at the same time (for observation...)
228  * 
229  */
230
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
      // Pilot-controllable main loop: resets the per-run state, optionally initialises the graph,
      // then repeatedly fetches, loads and launches the ready tasks until the graph is finished
      // or checkBreakPoints() reports that the end of the thread was requested.
      // Throws Exception when graph is not a ComposedNode; rethrows an Exception raised by graph initialisation.
233   DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234
      // Reset the run state and open the trace file "traceExec_<scheduler name>" under the scheduler mutex.
235   { // --- Critical section
236     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
237     _mainSched = graph;
238     _root = dynamic_cast<ComposedNode *>(_mainSched);
239     if (!_root) throw Exception("Executor::Run, Internal Error!");
240     _executorState = YACS::NOTYETINITIALIZED;
241     sendEvent("executor");
242     _toContinue=true;
243     _isOKToEnd = false;
244     _errorDetected = false;
245     _isWaitingEventsFromRunningTasks = false;
246     _numberOfRunningTasks = 0;
247     _runningTasks.clear();
248     _numberOfEndedTasks = 0;
249     string tracefile = "traceExec_";
250     tracefile += _mainSched->getName();
251     _trace.open(tracefile.c_str());
        // Record the start time of the run.
252 #ifdef WIN32
253    _start = timeGetTime();
254 #else
255     gettimeofday(&_start, NULL);
256 #endif
257
258   } // --- End of critical section
259
260   if (debug > 1) _displayDot(graph);
261
262   if (fromScratch)
263     {
264       try
265         {
266           graph->init();
267           graph->exUpdateState();
268         }
269       catch(Exception& ex)
270         {
              // Initialisation failed: publish FINISHED, then let the caller see the exception.
271           DEBTRACE("exception: "<< (ex.what()));
272           _executorState = YACS::FINISHED;
273           sendEvent("executor");
274           throw;
275         }
276     }
      // NOTE(review): _executorState is assigned here (and in the catch above and before the loop)
      // without holding _mutexForSchedulerUpdate, whereas the pilot-side methods access it under
      // that mutex - confirm this is intended.
277   _executorState = YACS::INITIALISED;
278   sendEvent("executor");
279
280   if (debug > 1) _displayDot(graph);
281
282   vector<Task *>::iterator iter;
283   bool isMore;
284   int problemCount=0;
285   int numberAllTasks;
286
287   _executorState = YACS::RUNNING;
288   sendEvent("executor");
289   while (_toContinue)
290     {
291       DEBTRACE("--- executor main loop");
292       sleepWhileNoEventsFromAnyRunningTask();
293       DEBTRACE("--- events...");
294       if (debug > 2) _displayDot(graph);
295       { // --- Critical section
296         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297         _tasks=graph->getNextTasks(isMore);
            // Counted before the two filters below: running tasks plus every candidate task.
298         numberAllTasks=_numberOfRunningTasks+_tasks.size();
299         graph->selectRunnableTasks(_tasks);
300         FilterTasksConsideringContainers(_tasks);
301       } // --- End of critical section
302       if (debug > 2) _displayDot(graph);
          // NOTE(review): _executorState is read here without the mutex.
303       if (_executorState == YACS::RUNNING)
304         {
              // Leaving through this break keeps _toContinue true; the block after the loop
              // relies on that to tell a stop request from a normal end.
305           if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306           if (debug > 0) _displayDot(graph);
307           DEBTRACE("---");
308           //loadTasks(_tasks);
309           loadParallelTasks(_tasks,this);
310           if (debug > 1) _displayDot(graph);
311           DEBTRACE("---");
312           launchTasks(_tasks);
313           DEBTRACE("---");
314         }
315       if (debug > 1) _displayDot(graph);
316       { // --- Critical section
317         DEBTRACE("---");
318         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319         //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320         if(_numberOfRunningTasks == 0)
321           _toContinue = !graph->isFinished();
322
323         DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324         DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325         DEBTRACE("_toContinue: " << _toContinue);
            // Stall detection: nothing was running and nothing was proposed during this turn.
            // The loop is abandoned once this has happened more than 25 times.
326         if(_toContinue && numberAllTasks==0)
327           {
328             //Problem : no running tasks and no task to launch ??
329             problemCount++;
330             std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331             //Pause to give a chance to interrupt
                // Note: this sleep runs while _mutexForSchedulerUpdate is still held.
332             usleep(1000);
333             if(problemCount > 25)
334               {
335                 // Too much problems encountered : stop execution
336                 _toContinue=false;
337               }
338           }
339
340         if (! _toContinue)
341           {
                // Publish FINISHED and wake any pilot blocked in waitPause() on _condForPilot.
342             _executorState = YACS::FINISHED;
343             sendEvent("executor");
344             _condForPilot.notify_all();
345           }
346       } // --- End of critical section
347       if (debug > 0) _displayDot(graph);
348       DEBTRACE("_toContinue: " << _toContinue);
349     }
350
351   DEBTRACE("End of main Loop");
352
353   { // --- Critical section
354     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355     if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356       {
357         DEBTRACE("stop requested: End soon");
358         _executorState = YACS::STOPPED;
359         _toContinue = false;
360         sendEvent("executor");
361       }
362   } // --- End of critical section
      // Optional state dump requested through setStopOnError().
363   if ( _dumpOnErrorRequested && _errorDetected)
364     {
365       saveState(_dumpErrorFile);
366     }
      // The trace stream is closed under its own mutex, _mutexForTrace.
367   {
368     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
369     _trace.close();
370   }
371   DEBTRACE("End of RunB thread");  
372 }
373
//! Return the current execution mode (_execMode).
// Also sets _isRunningunderExternalControl. No mutex is taken here.
374 YACS::ExecutionMode Executor::getCurrentExecMode()
375 {
376   _isRunningunderExternalControl=true;
377   return _execMode;
378 }
379
380
//! Return the current executor state (_executorState).
// Also sets _isRunningunderExternalControl. No mutex is taken here.
381 YACS::ExecutorState Executor::getExecutorState()
382 {
383   _isRunningunderExternalControl=true;
384   return _executorState;
385 }
386
387
//! Return _toContinue, i.e. true while the main loop of RunA()/RunB() has not decided to stop.
// Also sets _isRunningunderExternalControl. No mutex is taken here.
388 bool Executor::isNotFinished()
389 {
390   _isRunningunderExternalControl=true;
391   return _toContinue;
392 }
393
394 //! ask to stop execution on the first node found in error
395 /*!
396  * \param dumpRequested   produce a state dump when an error is found
397  * \param xmlFile         name of file used for state dump
398  */
399
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 {
      // Validate before touching any member: a rejected request must not leave the executor with
      // dump-on-error enabled and an empty dump file name.
      // Throws YACS::Exception when a dump is requested without a file name.
402   if (dumpRequested && xmlFile.empty())
403     throw YACS::Exception("dump on error requested and no filename given for dump");
404   { // --- Critical section
405     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
406     _dumpErrorFile=xmlFile;
407     _stopOnErrorRequested=true;
408     _dumpOnErrorRequested = dumpRequested;
409     DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410   } // --- End of critical section
411 }
412
413 //! ask not to stop execution on nodes found in error
414 /*!
415  */
416
417 void Executor::unsetStopOnError()
418 {
      // Only _stopOnErrorRequested is cleared; _dumpOnErrorRequested and _dumpErrorFile keep their values.
419   { // --- Critical section
420     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
421     _stopOnErrorRequested=false;
422   } // --- End of critical section
423 }
424
425 //! Dynamically set the current mode of execution
426 /*!
427  * The mode can be Continue, step by step, or stop before execution of a node
428  * defined in a list of breakpoints.
429  */
430
431 void Executor::setExecMode(YACS::ExecutionMode mode)
432 {
      // Stores the mode under _mutexForSchedulerUpdate; checkBreakPoints() switches on _execMode
      // each time RunB calls it.
433   DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434   { // --- Critical section
435     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
436     _isRunningunderExternalControl=true;
437     _execMode = mode;
438   } // --- End of critical section
439 }
440
441 //! wake up executor when in pause
442 /*!
443  * When Executor is in state paused or waiting for task completion, the thread
444  * running loop RunB waits on condition _condForStepByStep.
445  * Thread RunB is woken up.
446  * \return true when actually wakes up executor
447  */
448
449 bool Executor::resumeCurrentBreakPoint()
450 {
451   DEBTRACE("Executor::resumeCurrentBreakPoint()");
452   bool ret = false;
453   //bool doDump = false;
454   { // --- Critical section
455     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
456     _isRunningunderExternalControl=true;
457     DEBTRACE("_executorState: " << _executorState);
458     switch (_executorState)
459       {
460       case YACS::WAITINGTASKS:
461       case YACS::PAUSED:
462         {
463           _condForStepByStep.notify_all();
464           _executorState = YACS::RUNNING;
465           sendEvent("executor");
466           ret = true;
467           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468           break;
469         }
470       case YACS::FINISHED:
471       case YACS::STOPPED:
472         {
473           //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474           DEBTRACE("Graph Execution finished or stopped !");
475           break;
476         }
477       default :
478         {
479           // debug: no easy way to verify if main loop is acutally waiting on condition
480         }
481       }
482     DEBTRACE("---");
483     //if (doDump) saveState(_dumpErrorFile);
484   } // --- End of critical section
485   return ret;
486 }
487
488
489 //! define a list of nodes names as breakpoints in the graph
490
491
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 {
      // Replaces _listOfBreakPoints under the scheduler mutex; checkBreakPoints() compares the names
      // of the ready tasks against this list in STOPBEFORENODES mode.
494   DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495   { // --- Critical section
496     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
497     _isRunningunderExternalControl=true;
498     _listOfBreakPoints = listOfBreakPoints;
499   } // --- End of critical section
500 }
501
502
503 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 /*!
505  *  If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
506  *  Use Executor::waitPause to wait.
507  */
508 std::list<std::string> Executor::getTasksToLoad()
509 {
510   DEBTRACE("Executor::getTasksToLoad()");
511   list<string> listOfNodesToLoad;
512   listOfNodesToLoad.clear();
513   { // --- Critical section
514     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
515     _isRunningunderExternalControl=true;
516     switch (_executorState)
517       {
518       case YACS::WAITINGTASKS:
519       case YACS::PAUSED:
520         {
521           listOfNodesToLoad = _listOfTasksToLoad;
522           break;
523         }
524       case YACS::NOTYETINITIALIZED:
525       case YACS::INITIALISED:
526       case YACS::RUNNING:
527       case YACS::FINISHED:
528       case YACS::STOPPED:
529       default:
530         {
531           break;
532         }
533       }
534   } // --- End of critical section
535   return listOfNodesToLoad;
536 }
537
538
539 //! Define a subset of task to execute in step by step mode
540 /*!
541  * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
542  * in the current step.
543  * If some nodes must run in parallel, they must stay together in the list.
544  */
545
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 {
548   DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549   bool ret = true;
550   vector<Task *>::iterator iter;
551   vector<Task *> restrictedTasks;
552   { // --- Critical section
553     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
554     _isRunningunderExternalControl=true;
555     switch (_executorState)
556       {
557       case YACS::WAITINGTASKS:
558       case YACS::PAUSED:
559         {
560           for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561             {
562               string readyNode = _mainSched->getTaskName(*iter);
563               if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564                   != listToExecute.end())
565                 {
566                   restrictedTasks.push_back(*iter);
567                   DEBTRACE("node to execute " << readyNode);
568                 }
569             }
570           _tasks.clear();
571           for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572             {
573               _tasks.push_back(*iter);
574             }
575           break;
576         }
577       case YACS::NOTYETINITIALIZED:
578       case YACS::INITIALISED:
579       case YACS::RUNNING:
580       case YACS::FINISHED:
581       case YACS::STOPPED:
582       default:
583         {
584           break;
585         }
586       }
587     } // --- End of critical section
588
589   _tasks.clear();
590   for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591     {
592       _tasks.push_back(*iter);
593     }
594   for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595     {
596       string readyNode = _mainSched->getTaskName(*iter);
597       DEBTRACE("selected node to execute " << readyNode);
598     }
599
600   return ret;
601 }
602
603 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 /*!
605  *  Do nothing if execution is finished or in pause.
606  *  Wait first step if Executor is running or in initialization.
607  */
608
609 void Executor::waitPause()
610 {
      // Blocks the calling (pilot) thread on _condForPilot when the executor is NOTYETINITIALIZED,
      // INITIALISED or RUNNING; returns immediately in every other state.
      // _condForPilot is notified by checkBreakPoints() and by RunB() when it reaches FINISHED.
611   DEBTRACE("Executor::waitPause()" << _executorState);
612   { // --- Critical section
613     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
614     _isRunningunderExternalControl=true;
615     switch (_executorState)
616       {
617       default:
618       case YACS::STOPPED:
619       case YACS::FINISHED:
620       case YACS::WAITINGTASKS:
621       case YACS::PAUSED:
622         {
623           break;
624         }
625       case YACS::NOTYETINITIALIZED:
626       case YACS::INITIALISED:
627       case YACS::RUNNING:
628         {
              // NOTE(review): single wait, the state is not re-checked afterwards. If the Condition
              // wrapper can wake spuriously this returns while still RUNNING - confirm.
629           _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630           break;
631         }
632       }
633   } // --- End of critical section
634   DEBTRACE("---");
635 }
636
637 //! stops the execution as soon as possible 
638
639 void Executor::stopExecution()
640 {
      // Switch to step-by-step so the main loop reaches checkBreakPoints(), raise _isOKToEnd so that
      // checkBreakPoints() reports the end request instead of waiting, then wake the loop if it is paused.
641   setExecMode(YACS::STEPBYSTEP);
642   //waitPause();
      // _isOKToEnd is read by checkBreakPoints() under _mutexForSchedulerUpdate, so it is written under
      // the same mutex. The lock is scoped to this one line because the calls before and after lock it themselves.
643   { YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate); _isOKToEnd = true; }
644   resumeCurrentBreakPoint();
645 }
646
647 //! save the current state of execution in an xml file
648
649 bool Executor::saveState(const std::string& xmlFile)
650 {
      // Dumps the state of the root node into xmlFile through a VisitorSaveState.
      // Returns false when no root node is known yet, true otherwise.
651   DEBTRACE("Executor::saveState() in " << xmlFile);
      // _root is 0 until RunA()/RunB() has been given a graph; there is nothing to visit before that.
      if (!_root) return false;
652   YACS::ENGINE::VisitorSaveState vst(_root);
653   vst.openFileDump(xmlFile.c_str());
654   _root->accept(&vst);
655   vst.closeFileDump();
656   return true;
657 }
658
659 //! not yet implemented
660
661 bool Executor::loadState()
662 {
      // Placeholder: loads nothing, only sets _isRunningunderExternalControl and returns true.
663   DEBTRACE("Executor::loadState()");
664   _isRunningunderExternalControl=true;
665   return true;
666 }
667
668
//! Return 1 when stat() succeeds on filename and reports a regular file, 0 otherwise.
669 static int isfile(const char *filename) 
670 {
671   struct stat info;
672   const bool regular=(stat(filename, &info) == 0) && S_ISREG(info.st_mode);
673   return regular ? 1 : 0;
674 }
678
679 //! Display the graph state as a dot display, public method
680
681 void Executor::displayDot(Scheduler *graph)
682 {
      // Public entry point: sets _isRunningunderExternalControl, then delegates to _displayDot().
683   _isRunningunderExternalControl=true;
684   _displayDot(graph);
685 }
686
687 //! Display the graph state as a dot display
688 /*!
689  *  \param graph  : the node to display
690  */
691
692 void Executor::_displayDot(Scheduler *graph)
693 {
       // Write the dot description into the file "titi" in the current directory, then run a viewer:
       // the script display.sh when it exists as a regular file, the dot|display pipeline otherwise.
694    std::ofstream dotStream("titi");
695    ComposedNode *composed=(ComposedNode*)graph;
696    composed->writeDot(dotStream);
697    dotStream.close();
698    const char *command=isfile("display.sh") ? "sh display.sh" : "dot -Tpng titi|display -delay 5";
699    system(command);
700 }
703
704 //! Wait reactivation in modes Step By step or with BreakPoints
705 /*!
706  *  Check mode of execution (set by main thread):
707  *  - YACS::CONTINUE        : the graph execution continues.
708  *  - YACS::STEPBYSTEP      : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
709  *  - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
710  *                            wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
711  *                            else continue the graph execution.
712  *  \return true if end of executor thread is requested
713  */
714
715 bool Executor::checkBreakPoints()
716 {
717   DEBTRACE("Executor::checkBreakPoints()");
718   vector<Task *>::iterator iter;
719   bool endRequested = false;
720
      // NOTE(review): _execMode is read here before the mutex is taken; setExecMode() writes it under the mutex.
721   switch (_execMode)
722     {
723     case YACS::CONTINUE:
724       {
725         break;
726       }
727     case YACS::STOPBEFORENODES:
728       {
729         bool stop = false;
730         { // --- Critical section
731           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
              // Keep a copy of the ready tasks; setStepsToExecute() selects its subset from _tasksSave.
732           _tasksSave = _tasks;
              // Stop as soon as one ready task is named in the breakpoint list.
733           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
734             {
735               string nodeToLoad = _mainSched->getTaskName(*iter);
736               if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
737                   != _listOfBreakPoints.end())
738                 {
739                   stop = true;
740                   break;
741                 }
742             }
743           if (stop)
744             {
                  // Publish the names of all ready tasks for getTasksToLoad().
745               _listOfTasksToLoad.clear();
746               for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
747                 {
748                   string nodeToLoad = _mainSched->getTaskName(*iter);
749                   _listOfTasksToLoad.push_back(nodeToLoad);
750                 }
751               if (getNbOfThreads())
752                 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
753               else
754                 _executorState = YACS::PAUSED;
755               sendEvent("executor");
                  // Wake any pilot blocked in waitPause().
756               _condForPilot.notify_all();
757             }
758           if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait 
759           if (_isOKToEnd) endRequested = true;
760         } // --- End of critical section
761           if (stop) DEBTRACE("wake up from waitResume");
762         break;
763       }
        // The default label sits in front of STEPBYSTEP: any other mode value is handled as step-by-step.
764     default:
765     case YACS::STEPBYSTEP:
766       {
767         { // --- Critical section
768           YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
              // Same publication as above, but unconditional: every call is a stop point.
769           _tasksSave = _tasks;
770           _listOfTasksToLoad.clear();
771           for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
772             {
773               string nodeToLoad = _mainSched->getTaskName(*iter);
774               _listOfTasksToLoad.push_back(nodeToLoad);
775             }
776           if (getNbOfThreads())
777             _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
778           else
779             _executorState = YACS::PAUSED;
780           sendEvent("executor");
781           _condForPilot.notify_all();
              // When stopExecution() has already raised _isOKToEnd there is no wait at all.
782           if (!_isOKToEnd)
783             waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
784                           // or, if no pilot, wait until no more running tasks (stop on error)
785           if (_isOKToEnd) endRequested = true;
786         } // --- End of critical section
787         DEBTRACE("wake up from waitResume");
788         break;
789       }
790     }
791   DEBTRACE("endRequested: " << endRequested);
792   return endRequested;
793 }
794
795
796 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
797 /*!
798  *  With the condition Mutex, the mutex is released atomically during the wait.
799  *  Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
800  *  Must be called while mutex is locked.
801  */
802
803 void Executor::waitResume()
804 {
      // Single wait on _condForStepByStep, which resumeCurrentBreakPoint() notifies.
      // The caller (checkBreakPoints) already holds _mutexForSchedulerUpdate.
805   DEBTRACE("Executor::waitResume()");
806   _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
807   DEBTRACE("---");
808 }
809
810
811 //! Perform loading of a Task.
812 /*!
813  *  \param task  : Task to load
814  */
815
816 void Executor::loadTask(Task *task, const Executor *execInst)
817 {
818   DEBTRACE("Executor::loadTask(Task *task)");
819   if(task->getState() != YACS::TOLOAD)
820     return;
821   traceExec(task, "state:TOLOAD", ComputePlacement(task));
822   {//Critical section
823     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
824     _mainSched->notifyFrom(task,YACS::START,execInst);
825   }//End of critical section
826   try
827     {
828       traceExec(task, "load", ComputePlacement(task));
829       task->load();
830       traceExec(task, "initService", ComputePlacement(task));
831       task->initService();
832     }
833   catch(Exception& ex) 
834     {
835       std::cerr << ex.what() << std::endl;
836       {//Critical section
837         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
838         task->aborted();
839         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
840         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
841       }//End of critical section
842     }
843   catch(...) 
844     {
845       std::cerr << "Load failed" << std::endl;
846       {//Critical section
847         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
848         task->aborted();
849         _mainSched->notifyFrom(task,YACS::ABORT,execInst);
850         traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
851       }//End of critical section
852     }
853 }
854
// Argument bundle handed to a thread entry point; loadParallelTasks() allocates one per task
// and passes it to functionForTaskLoad.
855 struct threadargs
856 {
857   Task *task;         // task the thread works on
858   Scheduler *sched;   // set to _mainSched by loadParallelTasks()
859   Executor *execInst; // executor that created the thread
860 };
861
//! Load the given tasks one after the other in the calling thread.
/*!
 *  \param tasks    : tasks to load (the parameter is used, not the member _tasks)
 *  \param execInst : executor forwarded to Executor::loadTask
 */
862 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
863 {
864   for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++)
865     loadTask(*iter,execInst);
866 }
867
868 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
869 {
870   std::vector<Thread> ths(tasks.size());
871   std::size_t ithread(0);
872   for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
873     {
874       DEBTRACE("Executor::loadParallelTasks(Task *task)");
875       struct threadargs *args(new threadargs);
876       args->task = (*iter);
877       args->sched = _mainSched;
878       args->execInst = this;
879       ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
880     }
881   for(ithread=0;ithread<tasks.size();ithread++)
882     ths[ithread].join();
883 }
884
885 //! Execute a list of tasks possibly connected through datastream links
886 /*!
887  *  \param tasks  : a list of tasks to execute
888  *
889  */
890 void Executor::launchTasks(const std::vector<Task *>& tasks)
891 {
892   //First phase, make datastream connections
893   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
894     {
895       YACS::StatesForNode state=(*iter)->getState();
896       if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
897       try
898         {
899           (*iter)->connectService();
900           traceExec(*iter, "connectService",ComputePlacement(*iter));
901           {//Critical section
902             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
903             (*iter)->connected();
904           }//End of critical section
905         }
906       catch(Exception& ex) 
907         {
908           std::cerr << ex.what() << std::endl;
909           try
910             {
911               (*iter)->disconnectService();
912               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
913             }
914           catch(...) 
915             {
916               // Disconnect has failed
917               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
918             }
919           {//Critical section
920             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
921             (*iter)->aborted();
922             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
923           }//End of critical section
924         }
925       catch(...) 
926         {
927           std::cerr << "Problem in connectService" << std::endl;
928           try
929             {
930               (*iter)->disconnectService();
931               traceExec(*iter, "disconnectService",ComputePlacement(*iter));
932             }
933           catch(...) 
934             {
935               // Disconnect has failed
936               traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
937             }
938           {//Critical section
939             YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
940             (*iter)->aborted();
941             _mainSched->notifyFrom(*iter,YACS::ABORT,this);
942           }//End of critical section
943         }
944       if((*iter)->getState() == YACS::ERROR)
945         {
946           //try to put all coupled tasks in error
947           std::set<Task*> coupledSet;
948           (*iter)->getCoupledTasks(coupledSet);
949           for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
950             {
951               Task* t=*it;
952               if(t == *iter)continue;
953               if(t->getState() == YACS::ERROR)continue;
954               try
955                 {
956                   t->disconnectService();
957                   traceExec(t, "disconnectService",ComputePlacement(*iter));
958                 }
959               catch(...)
960                 {
961                   // Disconnect has failed
962                   traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
963                 }
964               {//Critical section
965                 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
966                 t->aborted();
967                 _mainSched->notifyFrom(t,YACS::ABORT,this);
968               }//End of critical section
969               traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
970             }
971         }
972       traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
973     }
974
975   //Second phase, execute each task in a thread
976   for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
977     {
978       launchTask(*iter);
979     }
980 }
981
982 //! Execute a Task in a thread
983 /*!
984  *  \param task  : Task to execute
985  *
986  *  Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
987  *
988  *  Calls Executor::functionForTaskExecution in Thread
989  */
990
991 void Executor::launchTask(Task *task)
992 {
993   DEBTRACE("Executor::launchTask(Task *task)");
994   struct threadargs *args;
995   if(task->getState() != YACS::TOACTIVATE)return;
996
997   DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
998   if(_semThreadCnt == 0)
999     {
1000       //check if we have enough threads to run
1001       std::set<Task*> tmpSet=_runningTasks;
1002       std::set<Task*>::iterator it = tmpSet.begin();
1003       std::string status="running";
1004       std::set<Task*> coupledSet;
1005       while( it != tmpSet.end() )
1006         {
1007           Task* tt=*it;
1008           coupledSet.clear();
1009           tt->getCoupledTasks(coupledSet);
1010           status="running";
1011           for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1012             {
1013               if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1014               tmpSet.erase(*iter);
1015             }
1016           if(status=="running")break;
1017           it = tmpSet.begin();
1018         }
1019
1020       if(status=="toactivate")
1021         {
1022           std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1023           std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1024         }
1025     }
1026
1027   _semForMaxThreads.wait();
1028   _semThreadCnt -= 1;
1029
1030   args= new threadargs;
1031   args->task = task;
1032   args->sched = _mainSched;
1033   args->execInst = this;
1034
1035   traceExec(task, "launch",ComputePlacement(task));
1036
1037   { // --- Critical section
1038     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1039     _numberOfRunningTasks++;
1040     _runningTasks.insert(task);
1041     task->begin(); //change state to ACTIVATED
1042   } // --- End of critical section
1043   Thread(functionForTaskExecution, args, _threadStackSize);
1044 }
1045
1046 //! wait until a running task ends
1047
1048 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1049 {
1050   DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1051 //   _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1052   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1053   if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1054     {
1055       _isWaitingEventsFromRunningTasks = true;
1056       _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1057     }
1058   _numberOfEndedTasks=0;
1059   DEBTRACE("---");
1060 }
1061
1062 //! not implemented
1063
1064 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1065 {
1066   /*_mutexForNbOfConcurrentThreads.lock();
1067   _groupOfAllThreadsCreated.remove(thread);
1068   delete thread;
1069   _mutexForNbOfConcurrentThreads.unlock();*/
1070 }
1071
1072
1073 //! must be used protected by _mutexForSchedulerUpdate!
1074
1075 void Executor::wakeUp()
1076 {
1077   DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1078   if (_isWaitingEventsFromRunningTasks)
1079     {
1080       _isWaitingEventsFromRunningTasks = false;
1081       _condForNewTasksToPerform.notify_all();
1082     }
1083   else
1084     _numberOfEndedTasks++;
1085 }
1086
1087 //! number of running tasks
1088
1089 int Executor::getNbOfThreads()
1090 {
1091   int ret;
1092   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1093   _isRunningunderExternalControl=true;
1094   ret = _groupOfAllThreadsCreated.size();
1095   return ret;
1096 }
1097
1098 /*!
1099  * This thread is NOT supposed to be detached !
1100  */
1101 void *Executor::functionForTaskLoad(void *arg)
1102 {
1103   DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1104   struct threadargs *args = (struct threadargs *) arg;
1105   Task *task=args->task;
1106   Scheduler *sched=args->sched;
1107   Executor *execInst=args->execInst;
1108   delete args;
1109   execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1110   return 0;
1111 }
1112
//! Function to perform execution of a task in a thread
/*!
 *  \param arg  : pointer to a threadargs (a Task, a Scheduler, an Executor)
 *                allocated with new; it is deleted at the beginning of the function
 *  \return 0 (the function calls Thread::exit(0) just before)
 *
 *  Calls Task::execute, then Task::disconnectService
 *
 *  Calls Task::finished when the task is finished, Task::aborted when
 *  Task::execute or Task::disconnectService has thrown
 *
 *  Calls Scheduler::notifyFrom with the event YACS::FINISH or YACS::ABORT
 *
 *  Calls Executor::wakeUp unless the executor has been put in state PAUSED
 *  (the call to Executor::notifyEndOfThread is commented out)
 *
 *  State changes, notification and counters update are done under
 *  _mutexForSchedulerUpdate of the executor.
 */

void *Executor::functionForTaskExecution(void *arg)
{
  DEBTRACE("Executor::functionForTaskExecution(void *arg)");

  // Copy the three pointers then free the bundle allocated by the launcher
  struct threadargs *args = (struct threadargs *) arg;
  Task *task=args->task;
  Scheduler *sched=args->sched;
  Executor *execInst=args->execInst;
  delete args;
  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));

  Thread::detach();

  // Execute task

  // ev is the event sent to the scheduler at the end: FINISH unless something fails
  YACS::Event ev=YACS::FINISH;
  try
    {
      execInst->traceExec(task, "start execution",ComputePlacement(task));
      task->execute();
      execInst->traceExec(task, "end execution OK",ComputePlacement(task));
    }
  catch(Exception& ex)
    {
      std::cerr << "YACS Exception during execute" << std::endl;
      std::cerr << ex.what() << std::endl;
      ev=YACS::ABORT;
      string message = "end execution ABORT, ";
      message += ex.what();
      execInst->traceExec(task, message,ComputePlacement(task));
    }
  catch(...) 
    {
      // Execution has failed
      std::cerr << "Execution has failed: unknown reason" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
    }

  // Disconnect task
  // A failure here also turns the final event into ABORT
  try
    {
      DEBTRACE("task->disconnectService()");
      task->disconnectService();
      execInst->traceExec(task, "disconnectService",ComputePlacement(task));
    }
  catch(...) 
    {
      // Disconnect has failed
      std::cerr << "disconnect has failed" << std::endl;
      ev=YACS::ABORT;
      execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
  //

  // Placement is computed before the container release below and reused for the
  // final trace (presumably because it is no longer available after the release
  // - to be confirmed)
  std::string placement(ComputePlacement(task));

  // container management for HomogeneousPoolOfContainer

  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
  if(contC)
    {
      // the container itself is locked while the task is released
      YACS::BASES::AutoLocker<Container> alckCont(contC);
      contC->release(task);
    }

  DEBTRACE("End task->execute()");
  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
    try
      {
        if (ev == YACS::FINISH) task->finished();
        if (ev == YACS::ABORT)
          {
            // Record the error; with "stop on error" the executor is switched to
            // step by step mode inside this critical section
            execInst->_errorDetected = true;
            if (execInst->_stopOnErrorRequested)
              {
                execInst->_execMode = YACS::STEPBYSTEP;
                execInst->_isOKToEnd = true;
              }
            task->aborted();
          }
        execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
        sched->notifyFrom(task,ev,execInst);
      }
    catch(Exception& ex)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Error during notification" << std::endl;
        std::cerr << ex.what() << std::endl;
      }
    catch(...)
      {
        //notify has failed : it is supposed to have set state
        //so no need to do anything
        std::cerr << "Notification failed" << std::endl;
      }
    // The task is no longer running, whatever happened above
    execInst->_numberOfRunningTasks--;
    execInst->_runningTasks.erase(task);
    DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks 
             << " _execMode: " << execInst->_execMode
             << " _executorState: " << execInst->_executorState);
    if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
      {
        // Last running task and not in CONTINUE mode: an executor that was
        // waiting for tasks becomes PAUSED and the waiters are notified
        if (execInst->_executorState == YACS::WAITINGTASKS)
          {
            execInst->_executorState = YACS::PAUSED;
            execInst->sendEvent("executor");
            execInst->_condForPilot.notify_all();
            if (execInst->_errorDetected &&
                execInst->_stopOnErrorRequested &&
                !execInst->_isRunningunderExternalControl)
              execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
          }
      }
    // Give the thread slot back (see the wait in Executor::launchTask)
    DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
    execInst->_semForMaxThreads.post();
    execInst->_semThreadCnt += 1;
    DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
    if (execInst->_executorState != YACS::PAUSED)  execInst->wakeUp();

  } // --- End of critical section (change state)

  //execInst->notifyEndOfThread(0);
  Thread::exit(0);
  // presumably never reached after Thread::exit - kept to satisfy the signature
  return 0;
}
1254
1255 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1256 {
1257   string nodeName = _mainSched->getTaskName(task);
1258   Container *cont = task->getContainer();
1259   string containerName = "---";
1260   if (cont)
1261     containerName = cont->getName();
1262
1263 #ifdef WIN32
1264   DWORD now = timeGetTime();
1265   double elapse = (now - _start)/1000.0;
1266 #else
1267   timeval now;
1268   gettimeofday(&now, NULL);
1269   double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1270 #endif
1271   {
1272     YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1273     _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1274     _trace << flush;
1275   }
1276 }
1277
//! emit notification to all observers registered with  the dispatcher 
/*!
 * The dispatcher is unique and can be obtained by getDispatcher()
 *
 * \param event : name of the event, dispatched together with _root
 *
 * Both the dispatcher and _root are checked with YASSERT before dispatching.
 */
void Executor::sendEvent(const std::string& event)
{
  Dispatcher* disp=Dispatcher::getDispatcher();
  YASSERT(disp);
  YASSERT(_root);
  disp->dispatch(_root,event);
}
1289
1290 /*!
1291  * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1292  * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1293  *
1294  * \param [in,out] tsks - list of tasks to be
1295  */
1296 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1297 {
1298   std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1299   for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1300     {
1301       Task *cur(*it);
1302       if(!cur)
1303         continue;
1304       Container *cont(cur->getContainer());
1305       if(!cont)
1306         {
1307           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1308           continue;
1309         }
1310       HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1311       if(!contC)
1312         {
1313           m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1314           continue;
1315         }
1316       m[contC].push_back(cur);
1317     }
1318   //
1319   std::vector<Task *> ret;
1320   for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1321     {
1322       HomogeneousPoolContainer *curhpc((*it).first);
1323       const std::vector<Task *>& curtsks((*it).second);
1324       if(!curhpc)
1325         {
1326           ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1327         }
1328       else
1329         {
1330           // start of critical section for container curhpc
1331           YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1332           std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1333           std::size_t sz(curhpc->getNumberOfFreePlace());
1334           std::vector<Task *>::const_iterator it2(curtsks.begin());
1335           for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1336             {
1337               vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1338               ret.push_back(*it2);
1339             }
1340           curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1341           //end of critical section
1342         }
1343     }
1344   //
1345   tsks=ret;
1346 }
1347
1348 std::string Executor::ComputePlacement(Task *zeTask)
1349 {
1350   std::string placement("---");
1351   if(!zeTask)
1352     return placement;
1353   if(zeTask->getContainer())
1354     placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1355   return placement;
1356 }