1 // Copyright (C) 2006-2020 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
33 #include "WlmTask.hxx"
34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
49 #define usleep(A) _sleep(A/1000)
50 #if !defined(S_ISCHR) || !defined(S_ISREG)
53 # define S_IFMT _S_IFMT
54 # define S_IFCHR _S_IFCHR
55 # define S_IFREG _S_IFREG
58 # define S_IFMT __S_IFMT
59 # define S_IFCHR __S_IFCHR
60 # define S_IFREG __S_IFREG
64 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
65 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
69 using namespace YACS::ENGINE;
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
77 #include "YacsTrace.hxx"
79 int Executor::_maxThreads(1000);
80 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
87 _stopOnErrorRequested = false;
88 _dumpOnErrorRequested = false;
89 _errorDetected = false;
90 _isRunningunderExternalControl=false;
91 _executorState = YACS::NOTYETINITIALIZED;
92 _execMode = YACS::CONTINUE;
93 _semThreadCnt = _maxThreads;
94 _numberOfRunningTasks = 0;
95 _numberOfEndedTasks = 0;
96 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
101 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
105 //! Execute a graph waiting for completion
107 * \param graph : schema to execute
108 * \param debug : display the graph with dot if debug > 0 (additional intermediate displays when debug > 1 or debug > 2)
109 * \param fromScratch : if true the graph is reinitialized
111 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
113 * Calls Executor::launchTask to execute a selected Task.
115 * Completion when graph is finished (Scheduler::isFinished)
// Runs `graph` in YACS::CONTINUE mode (see _execMode below).
// Throws Exception when _mainSched cannot be viewed as a ComposedNode.
// The dot display is triggered at several points, gated by debug > 0, > 1 or > 2.
118 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
// NOTE(review): the trace label says "RunW" although this function is RunA -- confirm whether the label is stale.
120 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
// _root is the ComposedNode view of _mainSched; a failed cast is reported as an internal error.
122 _root = dynamic_cast<ComposedNode *>(_mainSched);
123 if (!_root) throw Exception("Executor::Run, Internal Error!");
126 if(debug>1)_displayDot(graph);
130 graph->exUpdateState();
132 if(debug>1)_displayDot(graph);
133 vector<Task *> tasks;
134 vector<Task *>::iterator iter;
// Reset the execution mode and the counters used to wait on running tasks.
136 _execMode = YACS::CONTINUE;
137 _isWaitingEventsFromRunningTasks = false;
138 _numberOfRunningTasks = 0;
139 _runningTasks.clear();
140 _numberOfEndedTasks=0;
143 sleepWhileNoEventsFromAnyRunningTask();
145 if(debug>2)_displayDot(graph);
// Ask the scheduler for the next tasks and keep the runnable ones, under _mutexForSchedulerUpdate.
148 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
149 tasks=graph->getNextTasks(isMore);
150 graph->selectRunnableTasks(tasks);
151 }//End of critical section
153 if(debug>2)_displayDot(graph);
// Load every selected task, passing this executor as the execInst argument.
155 for(iter=tasks.begin();iter!=tasks.end();iter++)
156 loadTask(*iter,this);
158 if(debug>1)_displayDot(graph);
162 if(debug>1)_displayDot(graph);
// Refresh the continuation flag under the scheduler mutex: continue while the graph is not finished.
165 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
166 _toContinue=!graph->isFinished();
167 }//End of critical section
168 DEBTRACE("_toContinue: " << _toContinue);
170 if(debug>0)_displayDot(graph);
176 //! Execute a graph with breakpoints or step by step
178 * To be launched in a thread (main thread controls the progression).
179 * \param graph : schema to execute
180 * \param debug : display the graph with dot if debug >0
181 * \param fromScratch : if false, state from a previous partial execution is already loaded
183 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
185 * Calls Executor::checkBreakPoints to verify if a pause is requested
187 * Calls Executor::launchTask to execute a selected Task
189 * Completion when graph is finished (Scheduler::isFinished)
191 * States of execution:
192 * - YACS::NOTYETINITIALIZED
193 * - YACS::INITIALISED
194 * - YACS::RUNNING (to next breakpoint or step)
195 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
196 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
197 * - YACS::FINISHED (no more ready tasks, nor running tasks)
198 * - YACS::STOPPED (stopped by user before end)
200 * Modes of Execution:
201 * - YACS::CONTINUE (normal run without breakpoints)
202 * - YACS::STEPBYSTEP (pause at each loop)
203 * - YACS::STOPBEFORENODES (pause when a node is reached)
205 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
206 * Step by Step means execution node by node or group of nodes by group of nodes.
207 * At a given step, the user decides to launch all the ready nodes or only a subset
208 * (Caution: some nodes must run in parallel).
209 * The next event (end of task) may give a new set of ready nodes, and define a new step.
211 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
212 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
213 * - Executor::getCurrentExecMode
214 * - Executor::getExecutorState
215 * - Executor::setExecMode : change the execution mode for next loop
216 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
217 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
218 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
219 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
220 * - Executor::isNotFinished
221 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
222 * - Executor::saveState : dump the current state of execution in an xml file
223 * - Executor::loadState : Not yet implemented
224 * - Executor::getNbOfThreads
225 * - Executor::displayDot
226 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
228 * If the pilot wants to wait for the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
229 * - Executor::waitPause
232 * - Pilot may connect to executor during execution, or disconnect.
233 * - Several Pilots may be connected at the same time (for observation...)
237 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
239 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
241 { // --- Critical section
242 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
244 _root = dynamic_cast<ComposedNode *>(_mainSched);
245 if (!_root) throw Exception("Executor::Run, Internal Error!");
246 _executorState = YACS::NOTYETINITIALIZED;
247 sendEvent("executor");
250 _errorDetected = false;
251 _isWaitingEventsFromRunningTasks = false;
252 _numberOfRunningTasks = 0;
253 _runningTasks.clear();
254 _numberOfEndedTasks = 0;
255 string tracefile = "traceExec_";
256 tracefile += _mainSched->getName();
257 _trace.open(tracefile.c_str());
258 _start = std::chrono::steady_clock::now();
259 } // --- End of critical section
261 if (debug > 1) _displayDot(graph);
268 graph->exUpdateState();
272 DEBTRACE("exception: "<< (ex.what()));
273 _executorState = YACS::FINISHED;
274 sendEvent("executor");
278 _executorState = YACS::INITIALISED;
279 sendEvent("executor");
281 if (debug > 1) _displayDot(graph);
283 vector<Task *>::iterator iter;
288 _executorState = YACS::RUNNING;
289 sendEvent("executor");
292 DEBTRACE("--- executor main loop");
293 sleepWhileNoEventsFromAnyRunningTask();
294 DEBTRACE("--- events...");
295 if (debug > 2) _displayDot(graph);
296 { // --- Critical section
297 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
298 _tasks=graph->getNextTasks(isMore);
299 graph->selectRunnableTasks(_tasks);
300 FilterTasksConsideringContainers(_tasks);
301 numberAllTasks=_numberOfRunningTasks+_tasks.size();
302 } // --- End of critical section
303 if (debug > 2) _displayDot(graph);
304 if (_executorState == YACS::RUNNING)
306 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
307 if (debug > 0) _displayDot(graph);
309 loadParallelTasks(_tasks,this);
310 if (debug > 1) _displayDot(graph);
315 if (debug > 1) _displayDot(graph);
316 { // --- Critical section
318 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320 if(_numberOfRunningTasks == 0)
321 _toContinue = !graph->isFinished();
323 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325 DEBTRACE("_toContinue: " << _toContinue);
326 if(_toContinue && numberAllTasks==0)
328 //Problem : no running tasks and no task to launch ??
330 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331 //Pause to give a chance to interrupt
333 if(problemCount > 25)
335 // Too much problems encountered : stop execution
342 _executorState = YACS::FINISHED;
343 sendEvent("executor");
344 _condForPilot.notify_all();
346 } // --- End of critical section
347 if (debug > 0) _displayDot(graph);
348 DEBTRACE("_toContinue: " << _toContinue);
351 DEBTRACE("End of main Loop");
353 { // --- Critical section
354 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
357 DEBTRACE("stop requested: End soon");
358 _executorState = YACS::STOPPED;
360 sendEvent("executor");
362 } // --- End of critical section
363 if ( _dumpOnErrorRequested && _errorDetected)
365 saveState(_dumpErrorFile);
368 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
371 DEBTRACE("End of RunB thread");
374 YACS::ExecutionMode Executor::getCurrentExecMode()
376 _isRunningunderExternalControl=true;
381 YACS::ExecutorState Executor::getExecutorState()
383 _isRunningunderExternalControl=true;
384 return _executorState;
388 bool Executor::isNotFinished()
390 _isRunningunderExternalControl=true;
394 //! ask to stop execution on the first node found in error
396 * \param dumpRequested produce a state dump when an error is found
397 * \param xmlFile name of file used for state dump
// Records the stop-on-error request under _mutexForSchedulerUpdate.
// Throws YACS::Exception when a dump is requested with an empty file name.
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
402 { // --- Critical section
403 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// NOTE(review): the three members below are assigned before the argument check, so when the
// exception is thrown the stop/dump request has already been recorded -- confirm this is intended.
404 _dumpErrorFile=xmlFile;
405 _stopOnErrorRequested=true;
406 _dumpOnErrorRequested = dumpRequested;
407 if (dumpRequested && xmlFile.empty())
408 throw YACS::Exception("dump on error requested and no filename given for dump");
409 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410 } // --- End of critical section
413 //! ask not to stop execution on nodes found in error
417 void Executor::unsetStopOnError()
419 { // --- Critical section
420 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
421 _stopOnErrorRequested=false;
422 } // --- End of critical section
425 //! Dynamically set the current mode of execution
427 * The mode can be Continue, step by step, or stop before execution of a node
428 * defined in a list of breakpoints.
431 void Executor::setExecMode(YACS::ExecutionMode mode)
433 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434 { // --- Critical section
435 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
436 _isRunningunderExternalControl=true;
438 } // --- End of critical section
441 //! wake up executor when in pause
443 * When Executor is in state paused or waiting for task completion, the thread
444 * running loop RunB waits on condition _condForStepByStep.
445 * Thread RunB is woken up.
446 * \return true when actually wakes up executor
449 bool Executor::resumeCurrentBreakPoint()
451 DEBTRACE("Executor::resumeCurrentBreakPoint()");
453 //bool doDump = false;
454 { // --- Critical section
455 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
456 _isRunningunderExternalControl=true;
457 DEBTRACE("_executorState: " << _executorState);
458 switch (_executorState)
460 case YACS::WAITINGTASKS:
463 _condForStepByStep.notify_all();
464 _executorState = YACS::RUNNING;
465 sendEvent("executor");
467 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474 DEBTRACE("Graph Execution finished or stopped !");
479 // debug: no easy way to verify if main loop is acutally waiting on condition
483 //if (doDump) saveState(_dumpErrorFile);
484 } // --- End of critical section
489 //! define a list of nodes names as breakpoints in the graph
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
494 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495 { // --- Critical section
496 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
497 _isRunningunderExternalControl=true;
498 _listOfBreakPoints = listOfBreakPoints;
499 } // --- End of critical section
503 //! Get the list of tasks to load, to define a subset to execute in step by step mode
505 * If the executor is in neither state YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
506 * Use Executor::waitPause to wait.
508 std::list<std::string> Executor::getTasksToLoad()
510 DEBTRACE("Executor::getTasksToLoad()");
511 list<string> listOfNodesToLoad;
512 listOfNodesToLoad.clear();
513 { // --- Critical section
514 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
515 _isRunningunderExternalControl=true;
516 switch (_executorState)
518 case YACS::WAITINGTASKS:
521 listOfNodesToLoad = _listOfTasksToLoad;
524 case YACS::NOTYETINITIALIZED:
525 case YACS::INITIALISED:
534 } // --- End of critical section
535 return listOfNodesToLoad;
539 //! Define a subset of task to execute in step by step mode
541 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
542 * in the current step.
543 * If some nodes must run in parallel, they must stay together in the list.
// Restricts the tasks to run at the current step to those named in listToExecute.
// Candidates are taken from _tasksSave; names are compared with _mainSched->getTaskName().
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
548 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
550 vector<Task *>::iterator iter;
551 vector<Task *> restrictedTasks;
552 { // --- Critical section
553 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
554 _isRunningunderExternalControl=true;
555 switch (_executorState)
557 case YACS::WAITINGTASKS:
// Keep only the saved tasks whose name appears in listToExecute (linear search per task).
560 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
562 string readyNode = _mainSched->getTaskName(*iter);
563 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564 != listToExecute.end())
566 restrictedTasks.push_back(*iter);
567 DEBTRACE("node to execute " << readyNode);
// Copy the restricted selection into _tasks while the scheduler mutex is held.
571 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
573 _tasks.push_back(*iter);
577 case YACS::NOTYETINITIALIZED:
578 case YACS::INITIALISED:
587 } // --- End of critical section
// NOTE(review): _tasks is written again here, after the critical section has ended, repeating the
// copy already made under the lock -- confirm the second copy is needed and safe without the mutex.
590 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
592 _tasks.push_back(*iter);
// Trace-only pass over the final selection.
594 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
596 string readyNode = _mainSched->getTaskName(*iter);
597 DEBTRACE("selected node to execute " << readyNode);
603 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
605 * Do nothing if execution is finished or in pause.
606 * Wait first step if Executor is running or in initialization.
609 void Executor::waitPause()
611 DEBTRACE("Executor::waitPause()" << _executorState);
612 { // --- Critical section
613 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
614 _isRunningunderExternalControl=true;
615 switch (_executorState)
620 case YACS::WAITINGTASKS:
625 case YACS::NOTYETINITIALIZED:
626 case YACS::INITIALISED:
629 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
633 } // --- End of critical section
638 * This method can be called at any time simultaneously during a RunB call.
639 * This method will wait until the executor is locked in a consistent state of a running graph.
641 * This method is expected to be called in association with resume method.
642 * The returned parameter is expected to be transferred to resume method.
644 bool Executor::suspendASAP()
646 // no AutoLocker here. It's not a bug.
647 _mutexForSchedulerUpdate.lock();
648 if(!_toContinue && _executorState==YACS::FINISHED)
649 {// execution is finished
650 _mutexForSchedulerUpdate.unLock();
651 return false;// the executor is no more running
653 //general case. Leave method with locker in locked status
658 * This method is expected to be called in association with suspendASAP method.
659 * Expected to be called just after suspendASAP with output of suspendASAP as input parameter
661 void Executor::resume(bool suspended)
664 _mutexForSchedulerUpdate.unLock();
667 //! stops the execution as soon as possible
669 void Executor::stopExecution()
671 setExecMode(YACS::STEPBYSTEP);
674 resumeCurrentBreakPoint();
677 //! save the current state of execution in an xml file
679 bool Executor::saveState(const std::string& xmlFile)
681 DEBTRACE("Executor::saveState() in " << xmlFile);
684 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
685 YACS::ENGINE::VisitorSaveState vst(_root);
686 vst.openFileDump(xmlFile.c_str());
691 catch(Exception& ex) {
692 std::cerr << ex.what() << std::endl;
697 //! not yet implemented
699 bool Executor::loadState()
701 DEBTRACE("Executor::loadState()");
702 _isRunningunderExternalControl=true;
707 static int isfile(const char *filename)
710 if (stat(filename, &buf) != 0)
712 if (!S_ISREG(buf.st_mode))
717 //! Display the graph state as a dot display, public method
719 void Executor::displayDot(Scheduler *graph)
721 _isRunningunderExternalControl=true;
725 //! Display the graph state as a dot display
727 * \param graph : the node to display
// Dumps the graph in dot format to a file named "titi" (relative path) and launches a viewer
// through system(): "sh display.sh" when isfile("display.sh") is true, and a dot|display
// pipeline (presumably as the alternative branch).
// NOTE(review): the results of both system() calls are ignored.
730 void Executor::_displayDot(Scheduler *graph)
732 std::ofstream g("titi");
// NOTE(review): unchecked C-style cast; RunA/RunB validate their ComposedNode with dynamic_cast,
// this call does not -- confirm graph is always a ComposedNode.
733 ((ComposedNode*)graph)->writeDot(g);
735 const char displayScript[]="display.sh";
736 if(isfile(displayScript))
737 system("sh display.sh");
739 system("dot -Tpng titi|display -delay 5");
742 //! Wait reactivation in modes Step By step or with BreakPoints
744 * Check mode of execution (set by main thread):
745 * - YACS::CONTINUE : the graph execution continues.
746 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
747 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
748 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
749 * else continue the graph execution.
750 * \return true if end of executor thread is requested
753 bool Executor::checkBreakPoints()
755 DEBTRACE("Executor::checkBreakPoints()");
756 vector<Task *>::iterator iter;
757 bool endRequested = false;
765 case YACS::STOPBEFORENODES:
768 { // --- Critical section
769 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
771 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
773 string nodeToLoad = _mainSched->getTaskName(*iter);
774 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
775 != _listOfBreakPoints.end())
783 _listOfTasksToLoad.clear();
784 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
786 string nodeToLoad = _mainSched->getTaskName(*iter);
787 _listOfTasksToLoad.push_back(nodeToLoad);
789 if (getNbOfThreads())
790 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
792 _executorState = YACS::PAUSED;
793 sendEvent("executor");
794 _condForPilot.notify_all();
796 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
797 if (_isOKToEnd) endRequested = true;
798 } // --- End of critical section
799 if (stop) DEBTRACE("wake up from waitResume");
803 case YACS::STEPBYSTEP:
805 { // --- Critical section
806 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
808 _listOfTasksToLoad.clear();
809 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
811 string nodeToLoad = _mainSched->getTaskName(*iter);
812 _listOfTasksToLoad.push_back(nodeToLoad);
814 if (getNbOfThreads())
815 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
817 _executorState = YACS::PAUSED;
818 sendEvent("executor");
819 _condForPilot.notify_all();
821 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
822 // or, if no pilot, wait until no more running tasks (stop on error)
823 if (_isOKToEnd) endRequested = true;
824 } // --- End of critical section
825 DEBTRACE("wake up from waitResume");
829 DEBTRACE("endRequested: " << endRequested);
834 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
836 * With the condition Mutex, the mutex is released atomically during the wait.
837 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
838 * Must be called while mutex is locked.
// Blocks on _condForStepByStep using _mutexForSchedulerUpdate.
// resumeCurrentBreakPoint() calls _condForStepByStep.notify_all() to end the wait.
// NOTE(review): a single wait with no predicate re-check is visible here; if the condition
// wrapper can wake spuriously the caller would proceed without a resume -- confirm.
841 void Executor::waitResume()
843 DEBTRACE("Executor::waitResume()");
844 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
849 //! Perform loading of a Task.
851 * \param task : Task to load
854 void Executor::loadTask(Task *task, const Executor *execInst)
856 DEBTRACE("Executor::loadTask(Task *task)");
857 if(task->getState() != YACS::TOLOAD)
859 traceExec(task, "state:TOLOAD", ComputePlacement(task));
861 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
862 _mainSched->notifyFrom(task,YACS::START,execInst);
863 }//End of critical section
866 traceExec(task, "load", ComputePlacement(task));
868 traceExec(task, "initService", ComputePlacement(task));
873 std::cerr << ex.what() << std::endl;
875 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
877 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
878 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
879 }//End of critical section
883 std::cerr << "Load failed" << std::endl;
885 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
887 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
888 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
889 }//End of critical section
// Starts one Thread per task running functionForTaskLoad, then loops over the threads again.
// Each thread receives a heap-allocated threadargs holding the task, _mainSched and this executor.
// NOTE(review): the execInst parameter is not used in the lines shown; args->execInst is set to this.
900 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
// One Thread slot per element of the `tasks` parameter.
902 std::vector<Thread> ths(tasks.size());
903 std::size_t ithread(0);
// NOTE(review): this loop walks the member _tasks, not the `tasks` parameter that sized `ths`.
// The visible caller (RunB) passes _tasks so both agree, but any other argument would make
// ths[ithread] and the loop below inconsistent -- confirm and iterate over `tasks` if so.
904 for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
906 DEBTRACE("Executor::loadParallelTasks(Task *task)");
907 struct threadargs *args(new threadargs);
908 args->task = (*iter);
909 args->sched = _mainSched;
910 args->execInst = this;
911 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
// Second pass over the started threads, bounded by tasks.size().
913 for(ithread=0;ithread<tasks.size();ithread++)
917 //! Execute a list of tasks possibly connected through datastream links
919 * \param tasks : a list of tasks to execute
// Two passes over `tasks`: first connect services for tasks in state TOLOAD or TORECONNECT,
// then a second loop over the same vector for execution.
// On a connection failure the task's service is disconnected and the scheduler is notified with
// YACS::ABORT; when the task ends up in YACS::ERROR its coupled tasks get the same treatment.
922 void Executor::launchTasks(const std::vector<Task *>& tasks)
924 //First phase, make datastream connections
925 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
927 YACS::StatesForNode state=(*iter)->getState();
// Only tasks waiting for a (re)connection are handled in this phase.
928 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
931 (*iter)->connectService();
932 traceExec(*iter, "connectService",ComputePlacement(*iter));
// The state change to "connected" is made under the scheduler mutex.
934 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
935 (*iter)->connected();
936 }//End of critical section
// Failure path reporting the exception text.
940 std::cerr << ex.what() << std::endl;
943 (*iter)->disconnectService();
944 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
948 // Disconnect has failed
949 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
952 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
954 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
955 }//End of critical section
// Failure path with a fixed message; same disconnect + ABORT sequence as above.
959 std::cerr << "Problem in connectService" << std::endl;
962 (*iter)->disconnectService();
963 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
967 // Disconnect has failed
968 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
971 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
973 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
974 }//End of critical section
976 if((*iter)->getState() == YACS::ERROR)
978 //try to put all coupled tasks in error
979 std::set<Task*> coupledSet;
980 (*iter)->getCoupledTasks(coupledSet);
981 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// Skip the failing task itself and coupled tasks that are already in error.
984 if(t == *iter)continue;
985 if(t->getState() == YACS::ERROR)continue;
988 t->disconnectService();
// NOTE(review): the traces for the coupled task t below use ComputePlacement(*iter), i.e. the
// placement of the failing task rather than of t -- confirm whether ComputePlacement(t) was meant.
989 traceExec(t, "disconnectService",ComputePlacement(*iter));
993 // Disconnect has failed
994 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
997 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
999 _mainSched->notifyFrom(t,YACS::ABORT,this);
1000 }//End of critical section
1001 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1004 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1007 //Second phase, execute each task in a thread
1008 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1014 //! Execute a Task in a thread
1016 * \param task : Task to execute
1018 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1020 * Calls Executor::functionForTaskExecution in Thread
// Starts functionForTaskExecution for `task` in a new Thread.
// Does nothing unless the task is in state YACS::TOACTIVATE.
// Waits on _semForMaxThreads; when _semThreadCnt is 0 it first inspects the running tasks and
// may print a warning suggesting a larger YACS_MAX_THREADS.
1023 void Executor::launchTask(Task *task)
1025 DEBTRACE("Executor::launchTask(Task *task)");
1026 struct threadargs *args;
1027 if(task->getState() != YACS::TOACTIVATE)return;
1029 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
// NOTE(review): _semThreadCnt is read here before _mutexForSchedulerUpdate is taken below --
// confirm which lock (if any) is meant to protect it.
1030 if(_semThreadCnt == 0)
1032 // --- Critical section
1033 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1034 //check if we have enough threads to run
// Work on a copy of _runningTasks so entries can be erased while scanning.
1035 std::set<Task*> tmpSet=_runningTasks;
1036 std::set<Task*>::iterator it = tmpSet.begin();
1037 std::string status="running";
1038 std::set<Task*> coupledSet;
1039 while( it != tmpSet.end() )
1043 tt->getCoupledTasks(coupledSet);
// A coupled task still in TOACTIVATE marks the group as "toactivate"; every coupled task is
// removed from the working copy.
1045 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1047 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1048 tmpSet.erase(*iter);
1050 if(status=="running")break;
// Restart from the beginning because the erase calls above may have invalidated `it`.
1051 it = tmpSet.begin();
1054 if(status=="toactivate")
1056 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1057 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1059 // --- End of critical section
1062 _semForMaxThreads.wait();
// NOTE(review): args is heap-allocated and handed to the thread function; its release is not
// visible in this function -- confirm the thread function owns and frees it.
1065 args= new threadargs;
1067 args->sched = _mainSched;
1068 args->execInst = this;
1070 traceExec(task, "launch",ComputePlacement(task));
1072 { // --- Critical section
1073 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1074 _numberOfRunningTasks++;
1075 _runningTasks.insert(task);
1076 task->begin(); //change state to ACTIVATED
1077 } // --- End of critical section
// NOTE(review): the Thread object is an unnamed temporary destroyed at the end of this statement;
// whether the started thread is detached or joined depends on the Thread class -- confirm.
1078 Thread(functionForTaskExecution, args, _threadStackSize);
1081 //! wait until a running task ends
// Waits on _condForNewTasksToPerform when tasks are running and none has ended since the last
// call, then resets _numberOfEndedTasks. The mutex _mutexForSchedulerUpdate is held throughout,
// except while waiting. wakeUp() clears _isWaitingEventsFromRunningTasks and notifies the condition.
1083 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1085 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1086 // _semForNewTasksToPerform.wait(); //----use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1087 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// NOTE(review): the wait is guarded by `if`, not a loop, so the predicate is not re-checked after
// wake-up; if the condition wrapper can wake spuriously this returns early -- confirm.
1088 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1090 _isWaitingEventsFromRunningTasks = true;
1091 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1093 _numberOfEndedTasks=0;
1099 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1101 /*_mutexForNbOfConcurrentThreads.lock();
1102 _groupOfAllThreadsCreated.remove(thread);
1104 _mutexForNbOfConcurrentThreads.unlock();*/
1108 //! must be used protected by _mutexForSchedulerUpdate!
1110 void Executor::wakeUp()
1112 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1113 if (_isWaitingEventsFromRunningTasks)
1115 _isWaitingEventsFromRunningTasks = false;
1116 _condForNewTasksToPerform.notify_all();
1119 _numberOfEndedTasks++;
1122 //! number of running tasks
// Reads the size of _groupOfAllThreadsCreated into `ret` under _mutexForNbOfConcurrentThreads
// (presumably the value returned to the caller).
// Side effect: sets _isRunningunderExternalControl to true.
// NOTE(review): checkBreakPoints() also calls this to choose between WAITINGTASKS and PAUSED, so
// an internal call sets the external-control flag too -- confirm that is intended.
// NOTE(review): no statement visible in this part of the file adds to _groupOfAllThreadsCreated,
// and the removal in notifyEndOfThread() is commented out -- verify the list tracks running threads.
1124 int Executor::getNbOfThreads()
1127 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1128 _isRunningunderExternalControl=true;
1129 ret = _groupOfAllThreadsCreated.size();
1134 * This thread is NOT supposed to be detached !
1136 void *Executor::functionForTaskLoad(void *arg)
1138 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1139 struct threadargs *args = (struct threadargs *) arg;
1140 Task *task=args->task;
1141 Scheduler *sched=args->sched;
1142 Executor *execInst=args->execInst;
1144 execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1148 //! Function to perform execution of a task in a thread
1150 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1152 * Calls Task::execute
1154 * Calls Task::finished when the task is finished
1156 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1158 * Calls Executor::wakeUp and Executor::notifyEndOfThread
// Thread entry point that runs one task and then updates the executor bookkeeping.
// Visible phases: trace the task state, optionally apply the DPL scope, trace the
// execution, disconnect the service, release the HomogeneousPoolContainer slot, then
// (under _mutexForSchedulerUpdate) notify the scheduler and update the counters.
// NOTE(review): several short statements of this function (the try keywords, the
// call that actually executes the task, the assignments of ev in the handlers,
// the return) are not visible here; comments below hedge wherever they rely on them.
1161 void *Executor::functionForTaskExecution(void *arg)
1163 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
// Unpack the thread arguments.
1165 struct threadargs *args = (struct threadargs *) arg;
1166 Task *task=args->task;
1167 Scheduler *sched=args->sched;
1168 Executor *execInst=args->execInst;
1170 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// DPL scope is applied only when the task is a Node and the scheduler is a ComposedNode.
1176 if(execInst->getDPLScopeSensitive())
1178 Node *node(dynamic_cast<Node *>(task));
1179 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1180 if(node!=0 && gfn!=0)
1181 node->applyDPLScope(gfn);
// ev is the event later sent to the scheduler; it starts as FINISH and is compared
// with ABORT further down (presumably the handlers below switch it to ABORT).
1184 YACS::Event ev=YACS::FINISH;
1187 execInst->traceExec(task, "start execution",ComputePlacement(task));
1189 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
// YACS exception raised by the execution: report it on stderr and in the trace.
1191 catch(Exception& ex)
1193 std::cerr << "YACS Exception during execute" << std::endl;
1194 std::cerr << ex.what() << std::endl;
1196 string message = "end execution ABORT, ";
1197 message += ex.what();
1198 execInst->traceExec(task, message,ComputePlacement(task));
1202 // Execution has failed
1203 std::cerr << "Execution has failed: unknown reason" << std::endl;
1205 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection, traced on success and on failure.
1211 DEBTRACE("task->disconnectService()");
1212 task->disconnectService();
1213 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1217 // Disconnect has failed
1218 std::cerr << "disconnect has failed" << std::endl;
1220 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// Placement is computed once here and reused for the state trace in the critical section.
1224 std::string placement(ComputePlacement(task));
1226 // container management for HomogeneousPoolOfContainer
1228 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
// The container's own mutex is held while the task's slot is released
// (presumably guarded by a null check on contC that is not visible here).
1231 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1232 contC->release(task);
1235 DEBTRACE("End task->execute()");
1236 { // --- Critical section
1237 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1240 if (ev == YACS::FINISH) task->finished();
// On abort: record the error and, when stop-on-error was requested, switch the
// executor to step-by-step mode and allow it to end.
1241 if (ev == YACS::ABORT)
1243 execInst->_errorDetected = true;
1244 if (execInst->_stopOnErrorRequested)
1246 execInst->_execMode = YACS::STEPBYSTEP;
1247 execInst->_isOKToEnd = true;
1251 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1252 sched->notifyFrom(task,ev,execInst);
1254 catch(Exception& ex)
1256 //notify has failed : it is supposed to have set state
1257 //so no need to do anything
1258 std::cerr << "Error during notification" << std::endl;
1259 std::cerr << ex.what() << std::endl;
1263 //notify has failed : it is supposed to have set state
1264 //so no need to do anything
1265 std::cerr << "Notification failed" << std::endl;
// The task is no longer running: update the counter and the set of running tasks.
1267 execInst->_numberOfRunningTasks--;
1268 execInst->_runningTasks.erase(task);
1269 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1270 << " _execMode: " << execInst->_execMode
1271 << " _executorState: " << execInst->_executorState);
// Last running task while not in CONTINUE mode: a WAITINGTASKS executor becomes
// PAUSED, observers and the pilot condition are notified.
1272 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1274 if (execInst->_executorState == YACS::WAITINGTASKS)
1276 execInst->_executorState = YACS::PAUSED;
1277 execInst->sendEvent("executor");
1278 execInst->_condForPilot.notify_all();
1279 if (execInst->_errorDetected &&
1280 execInst->_stopOnErrorRequested &&
1281 !execInst->_isRunningunderExternalControl)
1282 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Give the thread slot back: post the semaphore and keep _semThreadCnt in step with it.
1285 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1286 execInst->_semForMaxThreads.post();
1287 execInst->_semThreadCnt += 1;
1288 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
// The main loop is woken up unless the executor has just been paused.
1289 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1291 } // --- End of critical section (change state)
1293 //execInst->notifyEndOfThread(0);
// Writes one line to the _trace stream: elapsed time in seconds since _start,
// container name, placement, task name and message, separated by spaces.
// \param task : task the line is about; its name is obtained from _mainSched
// \param message : text appended at the end of the line
// \param placement : placement string, written as given
1298 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1300 string nodeName = _mainSched->getTaskName(task);
1301 Container *cont = task->getContainer();
// "---" is the placeholder container name; it is replaced by the container's own
// name below (presumably only when cont is non-null -- the guard is not visible here).
1302 string containerName = "---";
1304 containerName = cont->getName();
// Elapsed time: steady_clock difference to _start, truncated to milliseconds and
// then expressed in seconds as a double.
1306 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1307 std::chrono::milliseconds millisec;
1308 millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1309 double elapse = double(millisec.count()) / 1000.0;
// _mutexForTrace is taken before writing to the shared _trace stream.
1311 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1312 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1317 //! emit notification to all observers registered with the dispatcher
1319 * The dispatcher is unique and can be obtained by getDispatcher()
// Forwards \a event to the dispatcher returned by Dispatcher::getDispatcher(),
// with _root as the object the event is dispatched for.
// \param event : name of the event to dispatch
1321 void Executor::sendEvent(const std::string& event)
1323 Dispatcher* disp=Dispatcher::getDispatcher();
// NOTE(review): any check on disp or _root before this call is not visible here.
1326 disp->dispatch(_root,event);
// Ordering predicate on HomogeneousPoolContainer pointers; presumably the call
// operator of HPCCompare, the comparator of the map built in
// FilterTasksConsideringContainers (the enclosing struct header is not visible here).
// That map is also indexed with nullptr, so null operands presumably get special
// handling in statements that are not visible here -- confirm.
1331 bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
// NOTE(review): the only visible ordering criterion is the number of cores per
// worker; if nothing else breaks ties, two distinct containers with the same value
// are equivalent keys for std::map and would share a single entry -- confirm intent.
1339 return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
1344 * This method takes a list of tasks as input and selects a part of that list, considering only the containers.
1345 * If no task has a container that is an instance of a HomogeneousPoolContainer subclass, this method leaves \a tsks untouched.
1347 * \param [in,out] tsks - list of tasks to be filtered
// Groups the tasks of \a tsks by HomogeneousPoolContainer and builds, in ret, the
// tasks retained for launch: for a pooled container, at most as many tasks as the
// container reports free places.
// NOTE(review): the conditions selecting each branch, the loop header over the map
// and the final hand-over of ret to tsks are not visible here.
1349 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
// Key: pooled container (nullptr collects tasks without one); ordered by HPCCompare.
1351 std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1352 for(auto cur : tsks)
1356 Container *cont(cur->getContainer());
// Filed under the nullptr key (presumably when the task has no container).
1359 m[nullptr].push_back(cur);
1362 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
// Filed under the nullptr key (presumably when the container is not a HomogeneousPoolContainer).
1365 m[nullptr].push_back(cur);
1368 m[contC].push_back(cur);
1371 std::vector<Task *> ret;
1374 HomogeneousPoolContainer *curhpc(it.first);
1375 const std::vector<Task *>& curtsks(it.second);
// Whole group appended unchanged (presumably the nullptr-key group).
1378 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1382 // start of critical section for container curhpc
1383 std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1384 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
// sz caps how many tasks of this group are retained in this pass.
1385 std::size_t sz(curhpc->getNumberOfFreePlace());
1386 std::vector<Task *>::const_iterator it2(curtsks.begin());
1387 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1389 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1390 ret.push_back(*it2);
// The retained tasks are handed to the container while its lock is still held.
1392 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1393 //end of critical section
// Builds the placement string used in trace lines.
// \param zeTask : task whose placement is wanted
// The string is "---" unless the task has a container, in which case it is the
// container's full placement id for that task (presumably the returned value; the
// return statement is not visible here).
1400 std::string Executor::ComputePlacement(Task *zeTask)
1402 std::string placement("---");
1405 if(zeTask->getContainer())
1406 placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1410 ///////// NEW EXECUTOR ////////////////////////////////
// Prepares \a task on the resource described by \a runInfo.
// \param task : task to load
// \param runInfo : provides type.name, index and resource.name used to place the task
// Visible steps: notify START to the scheduler, impose the resource and a container
// name "<type name>-<index>" on the task, then trace "load" and call initService.
// Failures are reported on stderr and notified to the scheduler as ABORT.
1411 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
// Tasks not in the TOLOAD state are treated apart (presumably an early return; not visible here).
1413 if(task->getState() != YACS::TOLOAD)
1415 traceExec(task, "state:TOLOAD", ComputePlacement(task));
1417 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1418 _mainSched->notifyFrom(task,YACS::START,this);
1419 }//End of critical section
// Container name is built from the run information before the resource is imposed.
1422 std::ostringstream container_name;
1423 container_name << runInfo.type.name << "-" << runInfo.index;
1424 task->imposeResource(runInfo.resource.name, container_name.str());
// NOTE(review): the statement performing the load itself is not visible next to this trace.
1425 traceExec(task, "load", ComputePlacement(task));
1427 traceExec(task, "initService", ComputePlacement(task));
1428 task->initService();
// YACS exception: print it, then notify ABORT under the scheduler mutex.
1430 catch(Exception& ex)
1432 std::cerr << ex.what() << std::endl;
1434 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1436 _mainSched->notifyFrom(task,YACS::ABORT, this);
1437 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1438 }//End of critical section
// Same handling with a generic message (presumably the catch-all handler).
1442 std::cerr << "Load failed" << std::endl;
1444 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1446 _mainSched->notifyFrom(task,YACS::ABORT, this);
1447 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1448 }//End of critical section
// Registers \a task as running: increments _numberOfRunningTasks and inserts the
// task into _runningTasks, both while holding _mutexForSchedulerUpdate.
// \param task : task about to be run
1452 void Executor::beginTask(Task *task)
1454 // --- Critical section
1455 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1456 _numberOfRunningTasks++;
1457 _runningTasks.insert(task);
1458 // --- End of critical section
// Bookkeeping performed when a task ends, under _mutexForSchedulerUpdate.
// \param task : task that has ended
// \param ev : YACS::FINISH or YACS::ABORT, forwarded to the scheduler
// Marks the task finished or records the error, notifies _mainSched, removes the
// task from the running set and pauses the executor when no task is left and the
// mode is not CONTINUE.
1461 void Executor::endTask(Task *task, YACS::Event ev)
1463 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1466 if (ev == YACS::FINISH) task->finished();
// On abort: remember the error; stop-on-error switches to step-by-step mode.
1467 if (ev == YACS::ABORT)
1469 _errorDetected = true;
1470 if (_stopOnErrorRequested)
1472 _execMode = YACS::STEPBYSTEP;
1477 //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1478 _mainSched->notifyFrom(task,ev,this);
1480 catch(Exception& ex)
1482 //notify has failed : it is supposed to have set state
1483 //so no need to do anything
1484 std::cerr << "Error during notification" << std::endl;
1485 std::cerr << ex.what() << std::endl;
1489 //notify has failed : it is supposed to have set state
1490 //so no need to do anything
1491 std::cerr << "Notification failed" << std::endl;
// The task is no longer counted as running, whatever the notification outcome.
1493 _numberOfRunningTasks--;
1494 _runningTasks.erase(task);
1495 if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1497 if (_executorState == YACS::WAITINGTASKS)
1499 _executorState = YACS::PAUSED;
1500 sendEvent("executor");
1501 _condForPilot.notify_all();
1502 if (_errorDetected &&
1503 _stopOnErrorRequested &&
1504 !_isRunningunderExternalControl)
1505 _condForStepByStep.notify_all(); // exec thread may be on waitResume
// NOTE(review): the statement controlled by this test is not visible here
// (the thread-based variant calls wakeUp() at the same point).
1508 if (_executorState != YACS::PAUSED)
// Runs \a task and reports how it went.
// \param task : task to run
// \return a YACS::Event; ev starts as YACS::FINISH (presumably switched to ABORT by
//         the failure handlers and returned at the end -- those statements are not
//         visible here).
// Visible steps: begin the task under _mutexForSchedulerUpdate, optionally apply the
// DPL scope, trace the execution, disconnect the service, and release the task's
// slot in its HomogeneousPoolContainer.
1512 YACS::Event Executor::runTask(Task *task)
1514 { // --- Critical section
1515 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1516 task->begin(); //change state to ACTIVATED
1518 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// DPL scope is applied only when the task is a Node and _mainSched is a ComposedNode.
1520 if(getDPLScopeSensitive())
1522 Node *node(dynamic_cast<Node *>(task));
1523 ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1524 if(node!=0 && gfn!=0)
1525 node->applyDPLScope(gfn);
1528 YACS::Event ev=YACS::FINISH;
1531 traceExec(task, "start execution",ComputePlacement(task));
1533 traceExec(task, "end execution OK",ComputePlacement(task));
// YACS exception raised by the execution: report it on stderr and in the trace.
1535 catch(Exception& ex)
1537 std::cerr << "YACS Exception during execute" << std::endl;
1538 std::cerr << ex.what() << std::endl;
1540 string message = "end execution ABORT, ";
1541 message += ex.what();
1542 traceExec(task, message,ComputePlacement(task));
1546 // Execution has failed
1547 std::cerr << "Execution has failed: unknown reason" << std::endl;
1549 traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection, traced on success and on failure.
1555 DEBTRACE("task->disconnectService()");
1556 task->disconnectService();
1557 traceExec(task, "disconnectService",ComputePlacement(task));
1561 // Disconnect has failed
1562 std::cerr << "disconnect has failed" << std::endl;
1564 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// placement is computed here but not used by any later statement visible in this function.
1568 std::string placement(ComputePlacement(task));
1570 // container management for HomogeneousPoolOfContainer
1572 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
// The container's own mutex is held while the slot is released
// (presumably guarded by a null check on contC that is not visible here).
1575 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1576 contC->release(task);
// Connects the service of \a task and, on failure, disconnects it and notifies
// ABORT to _mainSched; when the task ends up in ERROR, the tasks coupled with it are
// disconnected and aborted too.
// \param task : task whose datastream connections are to be made
1582 void Executor::makeDatastreamConnections(Task *task)
1584 YACS::StatesForNode state=task->getState();
// Only TOLOAD and TORECONNECT tasks are concerned (presumably an early return otherwise).
1585 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1589 task->connectService();
1590 traceExec(task, "connectService",ComputePlacement(task));
1592 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1594 }//End of critical section
// YACS exception during connection: print it, try to disconnect, then notify ABORT.
1596 catch(Exception& ex)
1598 std::cerr << ex.what() << std::endl;
1601 (task)->disconnectService();
1602 traceExec(task, "disconnectService",ComputePlacement(task));
1606 // Disconnect has failed
1607 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1610 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1612 _mainSched->notifyFrom(task,YACS::ABORT,this);
1613 }//End of critical section
// Same recovery with a generic message (presumably the catch-all handler).
1617 std::cerr << "Problem in connectService" << std::endl;
1620 (task)->disconnectService();
1621 traceExec(task, "disconnectService",ComputePlacement(task));
1625 // Disconnect has failed
1626 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1629 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1631 _mainSched->notifyFrom(task,YACS::ABORT,this);
1632 }//End of critical section
1634 if(task->getState() == YACS::ERROR)
1636 //try to put all coupled tasks in error
1637 std::set<Task*> coupledSet;
1638 task->getCoupledTasks(coupledSet);
1639 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// t is the current coupled task (its declaration is not visible here); the task
// itself and tasks already in ERROR are skipped.
1642 if(t == task)continue;
1643 if(t->getState() == YACS::ERROR)continue;
1646 t->disconnectService();
// NOTE(review): the trace lines about t below pass ComputePlacement(task), i.e. the
// placement of the failing task rather than of t -- looks like a copy-paste slip; confirm.
1647 traceExec(t, "disconnectService",ComputePlacement(task));
1651 // Disconnect has failed
1652 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1655 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1657 _mainSched->notifyFrom(t,YACS::ABORT,this);
1658 }//End of critical section
1659 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1662 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// Executes \a graph using a WorkloadManager.
// \param graph : scheduler (graph) to execute
// \param debug : verbosity; values above 0, 1 and 2 trigger _displayDot at more and more points
// \param fromScratch : traced at entry; its use is not visible in this function
// Visible phases: reset the executor state and open the trace file, update the graph
// state, create the workload manager, then loop: wait for events, collect the ready
// tasks, submit them as WlmTask objects, and decide whether to continue.
// NOTE(review): the loop header, the declarations of isMore, numberAllTasks and
// problemCount, and the start/stop of the workload manager are not visible here.
1665 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
1667 DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1668 { // --- Critical section
1669 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// _root is _mainSched seen as a ComposedNode; anything else is an internal error.
1671 _root = dynamic_cast<ComposedNode *>(_mainSched);
1672 if (!_root) throw Exception("Executor::Run, Internal Error!");
1673 _executorState = YACS::NOTYETINITIALIZED;
1674 sendEvent("executor");
// Reset of the run-time bookkeeping.
1677 _errorDetected = false;
1678 _isWaitingEventsFromRunningTasks = false;
1679 _numberOfRunningTasks = 0;
1680 _runningTasks.clear();
1681 _numberOfEndedTasks = 0;
// Trace file is named "traceExec_" followed by the scheduler name; _start is the
// time origin used by traceExec.
1682 string tracefile = "traceExec_";
1683 tracefile += _mainSched->getName();
1684 _trace.open(tracefile.c_str());
1685 _start = std::chrono::steady_clock::now();
1686 } // --- End of critical section
1688 if (debug > 1) _displayDot(graph);
1695 graph->exUpdateState();
// A YACS exception at this stage ends the run: state FINISHED is published.
1697 catch(Exception& ex)
1699 DEBTRACE("exception: "<< (ex.what()));
1700 _executorState = YACS::FINISHED;
1701 sendEvent("executor");
1705 _executorState = YACS::INITIALISED;
1706 sendEvent("executor");
1708 if (debug > 1) _displayDot(graph);
1714 _executorState = YACS::RUNNING;
1715 sendEvent("executor");
// Workload manager built on the default algorithm; resources are loaded through WlmTask.
1717 WorkloadManager::DefaultAlgorithm algo;
1718 WorkloadManager::WorkloadManager wlm(algo);
1719 WlmTask::loadResources(wlm);
1724 DEBTRACE("--- executor main loop");
1725 sleepWhileNoEventsFromAnyRunningTask();
1726 DEBTRACE("--- events...");
1727 if (debug > 2) _displayDot(graph);
1728 { // --- Critical section
1729 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1730 std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1731 graph->selectRunnableTasks(readyTasks);
// Only ready tasks that are not already running are queued in _tasks.
1733 for(Task * t : readyTasks)
1734 if(_runningTasks.find(t) == _runningTasks.end())
1735 _tasks.push_back(t);
1736 // TODO: to be removed
1737 FilterTasksConsideringContainers(_tasks);
1738 numberAllTasks=_numberOfRunningTasks+_tasks.size();
1739 } // --- End of critical section
1740 if (debug > 2) _displayDot(graph);
1741 DEBTRACE("--- events...");
1742 if (_executorState == YACS::RUNNING)
1744 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1745 for(Task * task : _tasks)
// NOTE(review): raw new; ownership of the WlmTask is presumably taken over once it
// is handed to wlm.addTask -- confirm, otherwise it leaks.
1748 WlmTask* newTask = new WlmTask(*this, task);
1749 wlm.addTask(newTask);
1752 if (debug > 1) _displayDot(graph);
1753 { // --- Critical section
1755 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1756 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1757 _toContinue = !graph->isFinished();
1759 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1760 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1761 DEBTRACE("_toContinue: " << _toContinue);
// Graph not finished but nothing running and nothing to launch: counted as a
// problem; more than 25 occurrences stop the execution.
1762 if(_toContinue && numberAllTasks==0)
1764 //Problem : no running tasks and no task to launch ??
1766 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1767 //Pause to give a chance to interrupt
1769 if(problemCount > 25)
1771 // Too much problems encountered : stop execution
1778 _executorState = YACS::FINISHED;
1779 sendEvent("executor");
1780 _condForPilot.notify_all();
1782 } // --- End of critical section
1783 if (debug > 0) _displayDot(graph);
1784 DEBTRACE("_toContinue: " << _toContinue);
1788 DEBTRACE("End of main Loop");
1790 { // --- Critical section
1791 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1792 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1794 DEBTRACE("stop requested: End soon");
1795 _executorState = YACS::STOPPED;
1796 _toContinue = false;
1797 sendEvent("executor");
1799 } // --- End of critical section
// Optional dump of the state when an error was detected during the run.
1800 if ( _dumpOnErrorRequested && _errorDetected)
1802 saveState(_dumpErrorFile);
// _mutexForTrace is taken here (presumably to close the trace stream; not visible here).
1805 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
// Entry point choosing the execution engine from the graph property "executor".
// \param graph : scheduler (graph) to execute
// \param debug : forwarded unchanged to the selected engine
// \param fromScratch : forwarded unchanged to the selected engine
// The four spellings tested below select runWlm; RunB is the other path
// (presumably the else branch -- the keyword is not visible here).
1810 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1812 std::string str_value = graph->getProperty("executor");
// Exact string matches only: other capitalisations are not recognised.
1813 if(str_value == "WorkloadManager"
1814 || str_value == "WORKLOADMANAGER"
1815 || str_value == "workloadmanager"
1816 || str_value == "WorkLoadManager")
1817 runWlm(graph, debug, fromScratch);
1819 RunB(graph, debug, fromScratch);