1 // Copyright (C) 2006-2019 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
// Compatibility macros.
// NOTE(review): the enclosing #ifdef/#else/#endif lines are not all visible in this excerpt;
// confirm which platform each group of definitions targets.
// usleep is mapped onto _sleep with its argument divided by 1000.
// NOTE(review): the macro argument A is not parenthesized, so usleep(a+b) would expand to
// _sleep(a+b/1000); consider (A)/1000.
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
// Map the S_IF* names onto their single-underscore equivalents.
49 # define S_IFMT _S_IFMT
50 # define S_IFCHR _S_IFCHR
51 # define S_IFREG _S_IFREG
// Map the S_IF* names onto their double-underscore equivalents.
54 # define S_IFMT __S_IFMT
55 # define S_IFCHR __S_IFCHR
56 # define S_IFREG __S_IFREG
// File-type predicates: mask the mode with S_IFMT and compare with the type constant.
60 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
65 using namespace YACS::ENGINE;
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
73 #include "YacsTrace.hxx"
// Class-wide thread limit; the constructor uses it to initialise _semForMaxThreads and _semThreadCnt.
75 int Executor::_maxThreads(1000);
// Stack size passed to the threads started for loading and executing tasks.
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
// Default constructor: no scheduler attached, no running thread, semaphore sized by _maxThreads.
// NOTE(review): some initialisation lines of the body are not visible in this excerpt.
78 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
// Error handling is off by default: no stop on error, no dump on error, no error seen yet.
83 _stopOnErrorRequested = false;
84 _dumpOnErrorRequested = false;
85 _errorDetected = false;
// No pilot has called a control method yet.
86 _isRunningunderExternalControl=false;
87 _executorState = YACS::NOTYETINITIALIZED;
88 _execMode = YACS::CONTINUE;
// Counter initialised to the same value as the _semForMaxThreads semaphore.
89 _semThreadCnt = _maxThreads;
90 _numberOfRunningTasks = 0;
91 _numberOfEndedTasks = 0;
92 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
97 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
101 //! Execute a graph waiting for completion
103 * \param graph : schema to execute
104 * \param debug : display the graph with dot if debug == 1
105 * \param fromScratch : if true the graph is reinitialized
107 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
109 * Calls Executor::launchTask to execute a selected Task.
111 * Completion when graph is finished (Scheduler::isFinished)
// Runs the schema `graph` in a loop: wait for an event, ask the scheduler for the next
// tasks, load them, then check whether the graph is finished.
// `debug` controls how often the graph is displayed with _displayDot (thresholds >0, >1, >2).
// NOTE(review): several lines of this body (braces, loop header, launch of the tasks,
// use of fromScratch) are not visible in this excerpt.
114 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
// Trace message fixed to name this function (it previously said "Executor::RunW").
116 DEBTRACE("Executor::RunA debug: " << debug << " fromScratch: " << fromScratch);
// The main scheduler must be a ComposedNode, otherwise an Exception is thrown.
118 _root = dynamic_cast<ComposedNode *>(_mainSched);
119 if (!_root) throw Exception("Executor::Run, Internal Error!");
122 if(debug>1)_displayDot(graph);
126 graph->exUpdateState();
128 if(debug>1)_displayDot(graph);
129 vector<Task *> tasks;
130 vector<Task *>::iterator iter;
// Reset the bookkeeping used by the wait/wake-up mechanism before entering the loop.
132 _execMode = YACS::CONTINUE;
133 _isWaitingEventsFromRunningTasks = false;
134 _numberOfRunningTasks = 0;
135 _runningTasks.clear();
136 _numberOfEndedTasks=0;
// Blocks only when tasks are running and none has ended since the last pass.
139 sleepWhileNoEventsFromAnyRunningTask();
141 if(debug>2)_displayDot(graph);
// Scheduler queries are done under _mutexForSchedulerUpdate.
144 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
145 tasks=graph->getNextTasks(isMore);
146 graph->selectRunnableTasks(tasks);
147 }//End of critical section
149 if(debug>2)_displayDot(graph);
// Tasks are loaded one after the other, in the calling thread.
151 for(iter=tasks.begin();iter!=tasks.end();iter++)
152 loadTask(*iter,this);
154 if(debug>1)_displayDot(graph);
158 if(debug>1)_displayDot(graph);
// The loop continues until the scheduler reports that the graph is finished.
161 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
162 _toContinue=!graph->isFinished();
163 }//End of critical section
164 DEBTRACE("_toContinue: " << _toContinue);
166 if(debug>0)_displayDot(graph);
172 //! Execute a graph with breakpoints or step by step
174 * To be launched in a thread (main thread controls the progression).
175 * \param graph : schema to execute
176 * \param debug : display the graph with dot if debug >0
177 * \param fromScratch : if false, state from a previous partial execution is already loaded
179 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
181 * Calls Executor::checkBreakPoints to verify if a pause is requested
183 * Calls Executor::launchTask to execute a selected Task
185 * Completion when graph is finished (Scheduler::isFinished)
187 * States of execution:
188 * - YACS::NOTYETINITIALIZED
189 * - YACS::INITIALISED
190 * - YACS::RUNNING (to next breakpoint or step)
191 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
192 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
193 * - YACS::FINISHED (no more ready tasks, nor running tasks)
194 * - YACS::STOPPED (stopped by user before end)
196 * Modes of Execution:
197 * - YACS::CONTINUE (normal run without breakpoints)
198 * - YACS::STEPBYSTEP (pause at each loop)
199 * - YACS::STOPBEFORENODES (pause when a node is reached)
201 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
202 * Step by Step means execution node by node or group of node by group of nodes.
203 * At a given step, the user decides to launch all the ready nodes or only a subset
204 * (Caution: some nodes must run in parallel).
205 * The next event (end of task) may give a new set of ready nodes, and define a new step.
207 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
208 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
209 * - Executor::getCurrentExecMode
210 * - Executor::getExecutorState
211 * - Executor::setExecMode : change the execution mode for next loop
212 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
213 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
214 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
215 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
216 * - Executor::isNotFinished
217 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
218 * - Executor::saveState : dump the current state of execution in an xml file
219 * - Executor::loadState : Not yet implemented
220 * - Executor::getNbOfThreads
221 * - Executor::displayDot
222 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
224 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
225 * - Executor::waitPause
228 * - Pilot may connect to executor during execution, or disconnect.
229 * - Several Pilots may be connected at the same time (for observation...)
// Runs the schema `graph` with support for breakpoints and step-by-step execution.
// The executor state is published through sendEvent("executor") at every transition.
// NOTE(review): many lines of this body (braces, try/catch, loop header, task launch,
// declarations of isMore, numberAllTasks and problemCount) are not visible in this excerpt.
233 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
235 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Initialisation is done under _mutexForSchedulerUpdate.
237 { // --- Critical section
238 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// The main scheduler must be a ComposedNode, otherwise an Exception is thrown.
240 _root = dynamic_cast<ComposedNode *>(_mainSched);
241 if (!_root) throw Exception("Executor::Run, Internal Error!");
242 _executorState = YACS::NOTYETINITIALIZED;
243 sendEvent("executor");
// Reset error and task bookkeeping for this run.
246 _errorDetected = false;
247 _isWaitingEventsFromRunningTasks = false;
248 _numberOfRunningTasks = 0;
249 _runningTasks.clear();
250 _numberOfEndedTasks = 0;
// The trace file is named "traceExec_" followed by the scheduler name.
251 string tracefile = "traceExec_";
252 tracefile += _mainSched->getName();
253 _trace.open(tracefile.c_str());
// Two ways of recording the start time.
// NOTE(review): presumably selected by a platform #ifdef that is not visible here — confirm.
255 _start = timeGetTime();
257 gettimeofday(&_start, NULL);
260 } // --- End of critical section
262 if (debug > 1) _displayDot(graph);
269 graph->exUpdateState();
// NOTE(review): the three lines below appear to belong to a catch handler whose header is not visible.
273 DEBTRACE("exception: "<< (ex.what()));
274 _executorState = YACS::FINISHED;
275 sendEvent("executor");
279 _executorState = YACS::INITIALISED;
280 sendEvent("executor");
282 if (debug > 1) _displayDot(graph);
284 vector<Task *>::iterator iter;
289 _executorState = YACS::RUNNING;
290 sendEvent("executor");
293 DEBTRACE("--- executor main loop");
// Blocks only when tasks are running and none has ended since the last pass.
294 sleepWhileNoEventsFromAnyRunningTask();
295 DEBTRACE("--- events...");
296 if (debug > 2) _displayDot(graph);
// Ask the scheduler for the next tasks and filter them, under the scheduler mutex.
297 { // --- Critical section
298 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
299 _tasks=graph->getNextTasks(isMore);
300 graph->selectRunnableTasks(_tasks);
301 FilterTasksConsideringContainers(_tasks);
// Running tasks plus tasks about to be launched; used below to detect a stalled graph.
302 numberAllTasks=_numberOfRunningTasks+_tasks.size();
303 } // --- End of critical section
304 if (debug > 2) _displayDot(graph);
305 if (_executorState == YACS::RUNNING)
307 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
308 if (debug > 0) _displayDot(graph);
// Selected tasks are loaded in parallel, one thread per task.
311 loadParallelTasks(_tasks,this);
312 if (debug > 1) _displayDot(graph);
317 if (debug > 1) _displayDot(graph);
318 { // --- Critical section
320 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
321 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
// _toContinue is only re-evaluated once no task is running.
322 if(_numberOfRunningTasks == 0)
323 _toContinue = !graph->isFinished();
325 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
326 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
327 DEBTRACE("_toContinue: " << _toContinue);
// Stall detection: the graph is not finished, yet nothing runs and nothing is ready.
328 if(_toContinue && numberAllTasks==0)
330 //Problem : no running tasks and no task to launch ??
332 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
333 //Pause to give a chance to interrupt
// After more than 25 stalled passes the run is abandoned.
335 if(problemCount > 25)
337 // Too much problems encountered : stop execution
// NOTE(review): the condition guarding the three lines below is not visible in this excerpt.
344 _executorState = YACS::FINISHED;
345 sendEvent("executor");
346 _condForPilot.notify_all();
348 } // --- End of critical section
349 if (debug > 0) _displayDot(graph);
350 DEBTRACE("_toContinue: " << _toContinue);
353 DEBTRACE("End of main Loop");
355 { // --- Critical section
356 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
357 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
359 DEBTRACE("stop requested: End soon");
360 _executorState = YACS::STOPPED;
362 sendEvent("executor");
364 } // --- End of critical section
// Optional state dump when an error was detected during the run.
365 if ( _dumpOnErrorRequested && _errorDetected)
367 saveState(_dumpErrorFile);
// The trace stream is protected by its own mutex.
370 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
373 DEBTRACE("End of RunB thread");
// Accessor declared to return a YACS::ExecutionMode; calling it also flags the executor
// as being under external control.
// NOTE(review): the return statement is not visible in this excerpt — presumably _execMode; confirm.
376 YACS::ExecutionMode Executor::getCurrentExecMode()
378 _isRunningunderExternalControl=true;
// Returns the current executor state and flags the executor as being under external control.
// The state is read without taking _mutexForSchedulerUpdate in the visible lines.
383 YACS::ExecutorState Executor::getExecutorState()
385 _isRunningunderExternalControl=true;
386 return _executorState;
// Query declared to return a bool; calling it also flags the executor as being under external control.
// NOTE(review): the return statement is not visible in this excerpt.
390 bool Executor::isNotFinished()
392 _isRunningunderExternalControl=true;
396 //! ask to stop execution on the first node found in error
398 * \param dumpRequested produce a state dump when an error is found
399 * \param xmlFile name of file used for state dump
// Requests a stop on error and optionally a state dump into xmlFile.
// Throws YACS::Exception when a dump is requested without a file name.
// NOTE(review): the members are assigned before the validation, so they stay modified
// when the exception is thrown — confirm this is intended.
402 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
404 { // --- Critical section
405 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
406 _dumpErrorFile=xmlFile;
407 _stopOnErrorRequested=true;
408 _dumpOnErrorRequested = dumpRequested;
409 if (dumpRequested && xmlFile.empty())
410 throw YACS::Exception("dump on error requested and no filename given for dump");
411 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
412 } // --- End of critical section
415 //! ask not to stop execution on nodes found in error
// Clears the stop-on-error request under the scheduler mutex.
// _dumpOnErrorRequested and _dumpErrorFile are not touched by the visible lines.
419 void Executor::unsetStopOnError()
421 { // --- Critical section
422 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
423 _stopOnErrorRequested=false;
424 } // --- End of critical section
427 //! Dynamically set the current mode of execution
429 * The mode can be Continue, step by step, or stop before execution of a node
430 * defined in a list of breakpoints.
// Sets the execution mode requested by a pilot, under the scheduler mutex.
// NOTE(review): the assignment of `mode` is not visible in this excerpt — presumably _execMode = mode; confirm.
433 void Executor::setExecMode(YACS::ExecutionMode mode)
435 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
436 { // --- Critical section
437 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
438 _isRunningunderExternalControl=true;
440 } // --- End of critical section
443 //! wake up executor when in pause
445 * When Executor is in state paused or waiting for task completion, the thread
446 * running loop RunB waits on condition _condForStepByStep.
447 * Thread RunB is waken up.
448 * \return true when actually wakes up executor
// Wakes up the thread waiting on _condForStepByStep and moves the executor back to RUNNING.
// The action depends on the current executor state; everything is done under the scheduler mutex.
// NOTE(review): several case labels, the return value and the closing lines are not visible in this excerpt.
451 bool Executor::resumeCurrentBreakPoint()
453 DEBTRACE("Executor::resumeCurrentBreakPoint()");
455 //bool doDump = false;
456 { // --- Critical section
457 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
458 _isRunningunderExternalControl=true;
459 DEBTRACE("_executorState: " << _executorState);
460 switch (_executorState)
462 case YACS::WAITINGTASKS:
// Notify the waiting thread, then publish the RUNNING state.
465 _condForStepByStep.notify_all();
466 _executorState = YACS::RUNNING;
467 sendEvent("executor");
469 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
475 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
476 DEBTRACE("Graph Execution finished or stopped !");
481 // debug: no easy way to verify if main loop is actually waiting on condition
485 //if (doDump) saveState(_dumpErrorFile);
486 } // --- End of critical section
491 //! define a list of nodes names as breakpoints in the graph
// Replaces the stored list of breakpoint node names, under the scheduler mutex.
// The list is taken by value and then copied into _listOfBreakPoints.
494 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
496 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
497 { // --- Critical section
498 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
499 _isRunningunderExternalControl=true;
500 _listOfBreakPoints = listOfBreakPoints;
501 } // --- End of critical section
505 //! Get the list of tasks to load, to define a subset to execute in step by step mode
507 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
508 * Use Executor::waitPause to wait.
// Returns a copy of _listOfTasksToLoad when the executor state allows it, otherwise the
// list stays empty. The state is examined under the scheduler mutex.
// NOTE(review): some case labels and break statements are not visible in this excerpt.
510 std::list<std::string> Executor::getTasksToLoad()
512 DEBTRACE("Executor::getTasksToLoad()");
513 list<string> listOfNodesToLoad;
// Redundant on a freshly constructed list, kept as in the original.
514 listOfNodesToLoad.clear();
515 { // --- Critical section
516 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
517 _isRunningunderExternalControl=true;
518 switch (_executorState)
520 case YACS::WAITINGTASKS:
523 listOfNodesToLoad = _listOfTasksToLoad;
526 case YACS::NOTYETINITIALIZED:
527 case YACS::INITIALISED:
536 } // --- End of critical section
537 return listOfNodesToLoad;
541 //! Define a subset of task to execute in step by step mode
543 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
544 * in the current step.
545 * If some nodes must run in parallel, they must stay together in the list.
// Restricts the tasks to execute to those of _tasksSave whose name appears in listToExecute.
// NOTE(review): several lines (return value, case labels, statements between the loops)
// are not visible in this excerpt.
548 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
550 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
552 vector<Task *>::iterator iter;
553 vector<Task *> restrictedTasks;
554 { // --- Critical section
555 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
556 _isRunningunderExternalControl=true;
557 switch (_executorState)
559 case YACS::WAITINGTASKS:
// Keep the saved tasks whose name is in the requested list (linear search per task).
562 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
564 string readyNode = _mainSched->getTaskName(*iter);
565 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
566 != listToExecute.end())
568 restrictedTasks.push_back(*iter);
569 DEBTRACE("node to execute " << readyNode);
// Append the restricted tasks to _tasks.
// NOTE(review): presumably _tasks is cleared just before, on a line not visible here — confirm.
573 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
575 _tasks.push_back(*iter);
579 case YACS::NOTYETINITIALIZED:
580 case YACS::INITIALISED:
589 } // --- End of critical section
// The same append is performed again after the critical section, without the mutex.
// NOTE(review): confirm that modifying _tasks outside the critical section is intended.
592 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
594 _tasks.push_back(*iter);
// Debug trace of the final selection.
596 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
598 string readyNode = _mainSched->getTaskName(*iter);
599 DEBTRACE("selected node to execute " << readyNode);
605 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
607 * Do nothing if execution is finished or in pause.
608 * Wait first step if Executor is running or in initialization.
// Blocks the calling (pilot) thread on _condForPilot, depending on the executor state.
// The wait is a single call, not wrapped in a predicate loop in the visible lines.
// NOTE(review): some case labels and break statements are not visible in this excerpt.
611 void Executor::waitPause()
// The state is read here for tracing before the mutex is taken.
613 DEBTRACE("Executor::waitPause()" << _executorState);
614 { // --- Critical section
615 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
616 _isRunningunderExternalControl=true;
617 switch (_executorState)
622 case YACS::WAITINGTASKS:
627 case YACS::NOTYETINITIALIZED:
628 case YACS::INITIALISED:
631 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
635 } // --- End of critical section
640 * This method can be called at any time simultaneously during a RunB call.
641 * This method will wait until the executor is locked in a consistent state of a running graph.
643 * This method is expected to be called in association with resume method.
644 * The returned parameter is expected to be transferred to resume method.
// Locks _mutexForSchedulerUpdate and, in the general case, returns with it still locked.
// When the execution is already finished the mutex is released and false is returned.
// NOTE(review): the final return statement is not visible in this excerpt.
646 bool Executor::suspendASAP()
648 // no AutoLocker here. It's not a bug.
649 _mutexForSchedulerUpdate.lock();
650 if(!_toContinue && _executorState==YACS::FINISHED)
651 {// execution is finished
652 _mutexForSchedulerUpdate.unLock();
653 return false;// the executor is no more running
655 //general case. Leave method with locker in locked status
660 * This method is expected to be called in association with suspendASAP method.
661 * Expected to be called just after suspendASAP with output of suspendASAP as input parameter
// Releases _mutexForSchedulerUpdate, the counterpart of the lock taken in suspendASAP.
// NOTE(review): the test on `suspended` is not visible in this excerpt — presumably the
// unlock only happens when suspended is true; confirm.
663 void Executor::resume(bool suspended)
666 _mutexForSchedulerUpdate.unLock();
669 //! stops the execution as soon as possible
// Switches to step-by-step mode and then resumes the current breakpoint.
// NOTE(review): lines between the two calls are not visible in this excerpt.
671 void Executor::stopExecution()
673 setExecMode(YACS::STEPBYSTEP);
676 resumeCurrentBreakPoint();
679 //! save the current state of execution in an xml file
// Dumps the state of the graph rooted at _root into xmlFile through a VisitorSaveState,
// under the scheduler mutex. An Exception is caught and its message written to std::cerr.
// NOTE(review): the try line, the visit/close calls and the return values are not visible in this excerpt.
681 bool Executor::saveState(const std::string& xmlFile)
683 DEBTRACE("Executor::saveState() in " << xmlFile);
686 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
687 YACS::ENGINE::VisitorSaveState vst(_root);
688 vst.openFileDump(xmlFile.c_str());
693 catch(Exception& ex) {
694 std::cerr << ex.what() << std::endl;
699 //! not yet implemented
// Placeholder: the visible lines only trace the call and flag external control.
// NOTE(review): the return statement is not visible in this excerpt.
701 bool Executor::loadState()
703 DEBTRACE("Executor::loadState()");
704 _isRunningunderExternalControl=true;
// File-local helper: calls stat() on filename, then tests the mode with S_ISREG.
// NOTE(review): the declaration of buf and the return values are not visible in this excerpt.
709 static int isfile(const char *filename)
712 if (stat(filename, &buf) != 0)
714 if (!S_ISREG(buf.st_mode))
719 //! Display the graph state as a dot display, public method
// Public entry point for the dot display; flags the executor as being under external control.
// NOTE(review): the delegation to _displayDot is presumably on a line not visible here — confirm.
721 void Executor::displayDot(Scheduler *graph)
723 _isRunningunderExternalControl=true;
727 //! Display the graph state as a dot display
729 * \param graph : the node to display
// Writes the graph in dot format to a file named "titi" in the current directory, then
// runs "display.sh" if it is a regular file, otherwise a dot|display pipeline.
// NOTE(review): the else line and the stream close are not visible in this excerpt.
732 void Executor::_displayDot(Scheduler *graph)
734 std::ofstream g("titi");
// C-style cast: graph is assumed to be a ComposedNode, without a runtime check.
735 ((ComposedNode*)graph)->writeDot(g);
737 const char displayScript[]="display.sh";
738 if(isfile(displayScript))
739 system("sh display.sh");
// The return value of system() is ignored in both branches.
741 system("dot -Tpng titi|display -delay 5");
744 //! Wait reactivation in modes Step By step or with BreakPoints
746 * Check mode of execution (set by main thread):
747 * - YACS::CONTINUE : the graph execution continues.
748 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
749 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
750 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
751 * else continue the graph execution.
752 * \return true if end of executor thread is requested
// Decides, according to the execution mode, whether the executor thread must pause,
// and waits for the pilot when it does. Returns whether the end of the thread is requested.
// NOTE(review): the switch header, the declaration of `stop`, several braces/breaks and
// the return statement are not visible in this excerpt.
755 bool Executor::checkBreakPoints()
757 DEBTRACE("Executor::checkBreakPoints()");
758 vector<Task *>::iterator iter;
759 bool endRequested = false;
767 case YACS::STOPBEFORENODES:
770 { // --- Critical section
771 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Look for a ready task whose name is one of the breakpoints.
773 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
775 string nodeToLoad = _mainSched->getTaskName(*iter);
776 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
777 != _listOfBreakPoints.end())
// Publish the names of the ready tasks for the pilot.
785 _listOfTasksToLoad.clear();
786 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
788 string nodeToLoad = _mainSched->getTaskName(*iter);
789 _listOfTasksToLoad.push_back(nodeToLoad);
// WAITINGTASKS when getNbOfThreads() is non-zero, PAUSED otherwise.
791 if (getNbOfThreads())
792 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
794 _executorState = YACS::PAUSED;
795 sendEvent("executor");
// Wake up any pilot blocked on _condForPilot.
796 _condForPilot.notify_all();
798 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
799 if (_isOKToEnd) endRequested = true;
800 } // --- End of critical section
801 if (stop) DEBTRACE("wake up from waitResume");
805 case YACS::STEPBYSTEP:
807 { // --- Critical section
808 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Same publication of ready task names as in the breakpoint case.
810 _listOfTasksToLoad.clear();
811 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
813 string nodeToLoad = _mainSched->getTaskName(*iter);
814 _listOfTasksToLoad.push_back(nodeToLoad);
816 if (getNbOfThreads())
817 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
819 _executorState = YACS::PAUSED;
820 sendEvent("executor");
821 _condForPilot.notify_all();
823 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
824 // or, if no pilot, wait until no more running tasks (stop on error)
825 if (_isOKToEnd) endRequested = true;
826 } // --- End of critical section
827 DEBTRACE("wake up from waitResume");
831 DEBTRACE("endRequested: " << endRequested);
836 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
838 * With the condition Mutex, the mutex is released atomically during the wait.
839 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
840 * Must be called while mutex is locked.
// Waits on _condForStepByStep using _mutexForSchedulerUpdate, which the caller must hold.
// The wait is a single call, not wrapped in a predicate loop in the visible lines.
// NOTE(review): lines after the wait are not visible in this excerpt.
843 void Executor::waitResume()
845 DEBTRACE("Executor::waitResume()");
846 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
851 //! Perform loading of a Task.
853 * \param task : Task to load
// Loads one task: notifies the scheduler of the start, traces the load and service
// initialisation, and reports a failure to the scheduler with an ABORT event.
// NOTE(review): the try/catch lines, the early return and the calls to load/initService
// are not visible in this excerpt.
856 void Executor::loadTask(Task *task, const Executor *execInst)
858 DEBTRACE("Executor::loadTask(Task *task)");
// Only tasks in state TOLOAD are processed.
859 if(task->getState() != YACS::TOLOAD)
861 traceExec(task, "state:TOLOAD", ComputePlacement(task));
// The START notification is sent under the scheduler mutex.
863 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
864 _mainSched->notifyFrom(task,YACS::START,execInst);
865 }//End of critical section
868 traceExec(task, "load", ComputePlacement(task));
870 traceExec(task, "initService", ComputePlacement(task));
// Failure path with a known exception: print its message, then notify ABORT under the mutex.
875 std::cerr << ex.what() << std::endl;
877 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
879 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
880 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
881 }//End of critical section
// Failure path without exception details: same ABORT notification.
885 std::cerr << "Load failed" << std::endl;
887 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
889 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
890 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
891 }//End of critical section
// Loads every task of `tasks` sequentially by calling loadTask on each one.
// \param tasks : tasks to load
// \param execInst : executor instance forwarded to loadTask
// Fix: the loop now walks the `tasks` parameter; it previously iterated the member
// _tasks and ignored the argument.
902 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
904 for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++)
905 loadTask(*iter,execInst);
// Loads every task of `tasks` in parallel: one Thread per task runs functionForTaskLoad.
// \param tasks : tasks to load
// \param execInst : executor instance (the visible lines pass `this` to the threads)
// Fix: the launch loop now walks the `tasks` parameter. It previously iterated the member
// _tasks while `ths` is sized from tasks.size(), which indexes out of bounds when the two differ.
// NOTE(review): the body of the final loop is not visible in this excerpt.
908 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
// One Thread object per task to load.
910 std::vector<Thread> ths(tasks.size());
911 std::size_t ithread(0);
912 for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++, ithread++)
914 DEBTRACE("Executor::loadParallelTasks(Task *task)");
// Arguments are heap-allocated and handed to the thread function.
915 struct threadargs *args(new threadargs);
916 args->task = (*iter);
917 args->sched = _mainSched;
918 args->execInst = this;
919 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
921 for(ithread=0;ithread<tasks.size();ithread++)
925 //! Execute a list of tasks possibly connected through datastream links
927 * \param tasks : a list of tasks to execute
// Launches a set of tasks in two phases: first connect the services of each task,
// putting the task and its coupled tasks in error on failure, then execute each task.
// NOTE(review): the try/catch lines, several braces, the declaration of `t` and the body
// of the second loop are not visible in this excerpt.
930 void Executor::launchTasks(const std::vector<Task *>& tasks)
932 //First phase, make datastream connections
933 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
// Only tasks in state TOLOAD or TORECONNECT are connected.
935 YACS::StatesForNode state=(*iter)->getState();
936 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
939 (*iter)->connectService();
940 traceExec(*iter, "connectService",ComputePlacement(*iter));
// The state change to connected is done under the scheduler mutex.
942 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
943 (*iter)->connected();
944 }//End of critical section
// Failure path with a known exception: disconnect, then notify ABORT under the mutex.
948 std::cerr << ex.what() << std::endl;
951 (*iter)->disconnectService();
952 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
956 // Disconnect has failed
957 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
960 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
962 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
963 }//End of critical section
// Failure path without exception details: same disconnect and ABORT notification.
967 std::cerr << "Problem in connectService" << std::endl;
970 (*iter)->disconnectService();
971 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
975 // Disconnect has failed
976 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
979 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
981 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
982 }//End of critical section
984 if((*iter)->getState() == YACS::ERROR)
986 //try to put all coupled tasks in error
987 std::set<Task*> coupledSet;
988 (*iter)->getCoupledTasks(coupledSet);
989 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// Skip the failing task itself and tasks already in error.
992 if(t == *iter)continue;
993 if(t->getState() == YACS::ERROR)continue;
996 t->disconnectService();
// NOTE(review): the traces for coupled task t use ComputePlacement(*iter), i.e. the
// placement of the failing task rather than of t — confirm this is intended.
997 traceExec(t, "disconnectService",ComputePlacement(*iter));
1001 // Disconnect has failed
1002 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
1005 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1007 _mainSched->notifyFrom(t,YACS::ABORT,this);
1008 }//End of critical section
1009 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1012 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1015 //Second phase, execute each task in a thread
1016 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1022 //! Execute a Task in a thread
1024 * \param task : Task to execute
1026 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1028 * Calls Executor::functionForTaskExecution in Thread
// Starts the execution of one task in a new thread running functionForTaskExecution.
// Waits on _semForMaxThreads first; when _semThreadCnt is zero, a warning is printed if a
// task coupled with a running task is still waiting to be activated.
// NOTE(review): several lines (braces, declaration of `tt`, updates of _semThreadCnt,
// assignment of args->task) are not visible in this excerpt.
1031 void Executor::launchTask(Task *task)
1033 DEBTRACE("Executor::launchTask(Task *task)");
1034 struct threadargs *args;
// Only tasks in state TOACTIVATE are launched.
1035 if(task->getState() != YACS::TOACTIVATE)return;
1037 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1038 if(_semThreadCnt == 0)
1040 // --- Critical section
1041 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1042 //check if we have enough threads to run
// Work on a copy of the running set so that entries can be erased while scanning.
1043 std::set<Task*> tmpSet=_runningTasks;
1044 std::set<Task*>::iterator it = tmpSet.begin();
1045 std::string status="running";
1046 std::set<Task*> coupledSet;
1047 while( it != tmpSet.end() )
1051 tt->getCoupledTasks(coupledSet);
// A coupled task still in TOACTIVATE means the thread limit may block the group.
1053 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1055 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1056 tmpSet.erase(*iter);
1058 if(status=="running")break;
1059 it = tmpSet.begin();
1062 if(status=="toactivate")
1064 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1065 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1067 // --- End of critical section
1070 _semForMaxThreads.wait();
// Arguments are heap-allocated and handed to the thread function.
1073 args= new threadargs;
1075 args->sched = _mainSched;
1076 args->execInst = this;
1078 traceExec(task, "launch",ComputePlacement(task));
// Register the task as running before its thread starts.
1080 { // --- Critical section
1081 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1082 _numberOfRunningTasks++;
1083 _runningTasks.insert(task);
1084 task->begin(); //change state to ACTIVATED
1085 } // --- End of critical section
// A temporary Thread object is constructed with the thread function and its arguments.
1086 Thread(functionForTaskExecution, args, _threadStackSize);
1089 //! wait until a running task ends
// Blocks on _condForNewTasksToPerform when tasks are running and none has ended since the
// last call, then resets the ended-task counter. Takes the scheduler mutex itself.
// The wait is a single call guarded by `if`, not a predicate loop.
1091 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1093 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1094 // _semForNewTasksToPerform.wait(); //---- use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1095 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1096 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
// Flag read by wakeUp() to decide whether a notification is needed.
1098 _isWaitingEventsFromRunningTasks = true;
1099 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1101 _numberOfEndedTasks=0;
// Currently a no-op in the visible lines: the whole body is commented out, so `thread`
// is not removed from _groupOfAllThreadsCreated.
1107 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1109 /*_mutexForNbOfConcurrentThreads.lock();
1110 _groupOfAllThreadsCreated.remove(thread);
1112 _mutexForNbOfConcurrentThreads.unlock();*/
1116 //! must be used protected by _mutexForSchedulerUpdate!
// Signals the end of a task: notifies _condForNewTasksToPerform if the executor is
// waiting, and increments the ended-task counter. Does not lock; the caller holds the mutex.
// NOTE(review): an `else` between the two parts may exist on a line not visible here.
1118 void Executor::wakeUp()
1120 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1121 if (_isWaitingEventsFromRunningTasks)
1123 _isWaitingEventsFromRunningTasks = false;
1124 _condForNewTasksToPerform.notify_all();
1127 _numberOfEndedTasks++;
1130 //! number of running tasks
// Reads the size of _groupOfAllThreadsCreated under _mutexForNbOfConcurrentThreads.
// NOTE(review): the declaration of `ret` and the return statement are not visible in this excerpt.
1132 int Executor::getNbOfThreads()
1135 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1136 _isRunningunderExternalControl=true;
1137 ret = _groupOfAllThreadsCreated.size();
1142 * This thread is NOT supposed to be detached !
// Thread entry point used by loadParallelTasks: unpacks the threadargs and calls loadTask.
// \param arg : pointer to a threadargs (task, scheduler, executor)
// NOTE(review): the release of `args` and the return value are not visible in this excerpt.
1144 void *Executor::functionForTaskLoad(void *arg)
1146 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1147 struct threadargs *args = (struct threadargs *) arg;
1148 Task *task=args->task;
// `sched` is unpacked but not used in the visible lines.
1149 Scheduler *sched=args->sched;
1150 Executor *execInst=args->execInst;
1152 execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1156 //! Function to perform execution of a task in a thread
1158 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1160 * Calls Task::execute
1162 * Calls Task::finished when the task is finished
1164 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1166 * Calls Executor::wakeUp and Executor::notifyEndOfThread
// Thread entry point that runs one task and then updates the scheduler and executor state.
// \param arg : pointer to a struct threadargs; its task, sched and execInst fields are read
// Phases, in order: trace the initial state, optionally apply the DPL scope, execute,
// disconnect the service, release the pooled container, then (under
// _mutexForSchedulerUpdate) notify the scheduler and update the executor bookkeeping.
1169 void *Executor::functionForTaskExecution(void *arg)
1171 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
// arg is an untyped thread argument; it is reinterpreted as the threadargs bundle.
1173 struct threadargs *args = (struct threadargs *) arg;
1174 Task *task=args->task;
1175 Scheduler *sched=args->sched;
1176 Executor *execInst=args->execInst;
1178 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// Optional step: only when the executor is DPL-scope sensitive, and only if the task is a
// Node and the scheduler is a ComposedNode (both checked through dynamic_cast).
1184 if(execInst->getDPLScopeSensitive())
1186 Node *node(dynamic_cast<Node *>(task));
1187 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1188 if(node!=0 && gfn!=0)
1189 node->applyDPLScope(gfn);
// ev is the event handed to Scheduler::notifyFrom further down. It starts as FINISH and is
// compared against ABORT in the critical section, so failure paths are expected to change it.
1192 YACS::Event ev=YACS::FINISH;
// Execution phase: the trace file receives a "start" line and, on success, an "end ... OK" line.
1195 execInst->traceExec(task, "start execution",ComputePlacement(task));
1197 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
// Failure with a YACS Exception: report on stderr and record the message in the trace.
1199 catch(Exception& ex)
1201 std::cerr << "YACS Exception during execute" << std::endl;
1202 std::cerr << ex.what() << std::endl;
1204 string message = "end execution ABORT, ";
1205 message += ex.what();
1206 execInst->traceExec(task, message,ComputePlacement(task));
1210 // Execution has failed
1211 std::cerr << "Execution has failed: unknown reason" << std::endl;
1213 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Disconnection phase: the service is disconnected after the execution phase; a failure
// here is reported on stderr and in the trace.
1219 DEBTRACE("task->disconnectService()");
1220 task->disconnectService();
1221 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1225 // Disconnect has failed
1226 std::cerr << "disconnect has failed" << std::endl;
1228 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// The placement is computed once here and reused for the final state trace below.
1232 std::string placement(ComputePlacement(task));
1234 // container management for HomogeneousPoolOfContainer
// When the task's container is a HomogeneousPoolContainer, the task is released from it
// while an AutoLocker is taken on that container.
1236 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1239 YACS::BASES::AutoLocker<Container> alckCont(contC);
1240 contC->release(task);
1243 DEBTRACE("End task->execute()");
1244 { // --- Critical section
// Everything up to the closing brace runs with an AutoLocker on _mutexForSchedulerUpdate.
1245 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1248 if (ev == YACS::FINISH) task->finished();
// On ABORT: remember that an error occurred; if stop-on-error was requested, switch the
// executor to step-by-step mode and allow it to end.
1249 if (ev == YACS::ABORT)
1251 execInst->_errorDetected = true;
1252 if (execInst->_stopOnErrorRequested)
1254 execInst->_execMode = YACS::STEPBYSTEP;
1255 execInst->_isOKToEnd = true;
// Trace the final state, then tell the scheduler how the task ended.
1259 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1260 sched->notifyFrom(task,ev,execInst);
// Notification errors are only reported on stderr; they are deliberately not propagated.
1262 catch(Exception& ex)
1264 //notify has failed : it is supposed to have set state
1265 //so no need to do anything
1266 std::cerr << "Error during notification" << std::endl;
1267 std::cerr << ex.what() << std::endl;
1271 //notify has failed : it is supposed to have set state
1272 //so no need to do anything
1273 std::cerr << "Notification failed" << std::endl;
// Bookkeeping: this task no longer counts as running.
1275 execInst->_numberOfRunningTasks--;
1276 execInst->_runningTasks.erase(task);
1277 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1278 << " _execMode: " << execInst->_execMode
1279 << " _executorState: " << execInst->_executorState);
// When nothing is running any more and the mode is not CONTINUE, an executor that was in
// WAITINGTASKS becomes PAUSED: observers get the "executor" event and waiters on
// _condForPilot are woken.
1280 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1282 if (execInst->_executorState == YACS::WAITINGTASKS)
1284 execInst->_executorState = YACS::PAUSED;
1285 execInst->sendEvent("executor");
1286 execInst->_condForPilot.notify_all();
// Waiters on _condForStepByStep are woken only for an error with stop-on-error requested
// and no external control.
1287 if (execInst->_errorDetected &&
1288 execInst->_stopOnErrorRequested &&
1289 !execInst->_isRunningunderExternalControl)
1290 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Post on _semForMaxThreads and bump _semThreadCnt (the counter printed by the two DEBTRACE lines).
1293 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1294 execInst->_semForMaxThreads.post();
1295 execInst->_semThreadCnt += 1;
1296 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
// A PAUSED executor is not woken up here.
1297 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1299 } // --- End of critical section (change state)
1301 //execInst->notifyEndOfThread(0);
// Writes one line to the execution trace stream _trace, made of space-separated fields:
// elapsed time since _start, container name, placement, node name, message.
// \param task : task being traced; its name comes from _mainSched->getTaskName and its
//               container from task->getContainer()
// \param message : free text written as the last field of the line
// \param placement : placement string written between the container name and the node name
1306 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1308 string nodeName = _mainSched->getTaskName(task);
1309 Container *cont = task->getContainer();
// "---" is the default container name; it is replaced by the container's own name
// (presumably only when cont is non-null - confirm).
1310 string containerName = "---";
1312 containerName = cont->getName();
// Two alternative ways of computing `elapse` in seconds follow: one from timeGetTime()
// (difference divided by 1000.0), one from gettimeofday() (seconds plus microseconds / 1e6).
// Presumably they are selected per platform by the preprocessor - confirm.
1315 DWORD now = timeGetTime();
1316 double elapse = (now - _start)/1000.0;
1319 gettimeofday(&now, NULL);
1320 double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
// The write to _trace is done with an AutoLocker on _mutexForTrace.
1323 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1324 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1329 //! emit notification to all observers registered with the dispatcher
1331 * The dispatcher is unique and can be obtained by getDispatcher()
// \param event : event name, forwarded unchanged to Dispatcher::dispatch together with _root
1333 void Executor::sendEvent(const std::string& event)
1335 Dispatcher* disp=Dispatcher::getDispatcher();
// _root is the object on whose behalf the event is dispatched.
1338 disp->dispatch(_root,event);
1342 * This method takes as input a list of tasks and selects a part of that list, considering only the containers.
1343 * If no task has a container instance that is a subclass of HomogeneousPoolContainer, this method leaves \a tsks untouched.
1345 * \param [in,out] tsks - list of tasks to be filtered
// Selects, among the tasks of tsks, those that can be launched given the free places of
// their HomogeneousPoolContainer.
// Step 1 groups the tasks per pool container; step 2 builds the selection `ret` group by group.
1347 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
// m : pool container -> tasks using it. The NULL key collects the tasks that are not tied
// to a HomogeneousPoolContainer (see the two NULL-key insertions below).
1349 std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1350 for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1355 Container *cont(cur->getContainer());
// presumably the case where the task has no container - confirm
1358 m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1361 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
// presumably the case where the container is not a HomogeneousPoolContainer - confirm
1364 m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1367 m[contC].push_back(cur);
// ret accumulates the selected tasks.
1370 std::vector<Task *> ret;
1371 for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1373 HomogeneousPoolContainer *curhpc((*it).first);
1374 const std::vector<Task *>& curtsks((*it).second);
// A group can be taken as a whole (presumably the NULL-key group, which has no pool limit - confirm).
1377 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1381 // start of critical section for container curhpc
1382 YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1383 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
// Keep at most as many tasks as the container reports free places, in their original order.
1384 std::size_t sz(curhpc->getNumberOfFreePlace());
1385 std::vector<Task *>::const_iterator it2(curtsks.begin());
1386 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1388 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1389 ret.push_back(*it2);
// Tell the container which tasks were selected, while the AutoLocker on it is still held.
1391 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1392 //end of critical section
1399 std::string Executor::ComputePlacement(Task *zeTask)
1401 std::string placement("---");
1404 if(zeTask->getContainer())
1405 placement=zeTask->getContainer()->getFullPlacementId(zeTask);