1 // Copyright (C) 2006-2023 CEA, EDF
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
33 #include "WlmTask.hxx"
34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
// Emulates usleep() on top of _sleep(): the argument A is divided by 1000
// before being handed to _sleep (presumably microseconds to milliseconds).
// A is parenthesised so that an expression argument such as usleep(a + b)
// is divided as a whole instead of only its last operand.
#define usleep(A) _sleep((A)/1000)
// Fallback definitions, considered only when S_ISCHR or S_ISREG is not already defined.
50 #if !defined(S_ISCHR) || !defined(S_ISREG)
// Aliases built from the single-underscore _S_IF* constants.
53 # define S_IFMT _S_IFMT
54 # define S_IFCHR _S_IFCHR
55 # define S_IFREG _S_IFREG
// Aliases built from the double-underscore __S_IF* constants.
58 # define S_IFMT __S_IFMT
59 # define S_IFCHR __S_IFCHR
60 # define S_IFREG __S_IFREG
// File-type tests: keep only the S_IFMT bits of mode and compare them with the type constant.
64 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
65 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
69 using namespace YACS::ENGINE;
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
77 #include "YacsTrace.hxx"
// Static default (1000) that the constructor copies into _semForMaxThreads and _semThreadCnt;
// launchTask() prints it in its "need more threads" warning.
79 int Executor::_maxThreads(1000);
80 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
// Constructor. The initializer list leaves the executor without a main scheduler,
// with zero concurrent threads, a thread semaphore seeded with _maxThreads, and the
// keep-going-on-fail and DPL-scope-sensitive flags cleared.
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
// Error handling options are off until setStopOnError() turns them on.
87 _stopOnErrorRequested = false;
88 _dumpOnErrorRequested = false;
89 _errorDetected = false;
// Set to true by the pilot-facing methods (getExecutorState(), setExecMode(), ...).
90 _isRunningunderExternalControl=false;
91 _executorState = YACS::NOTYETINITIALIZED;
92 _execMode = YACS::CONTINUE;
// Plain counter kept next to the semaphore; it starts at the same value.
93 _semThreadCnt = _maxThreads;
94 _numberOfRunningTasks = 0;
95 _numberOfEndedTasks = 0;
96 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
103 //! Execute a graph waiting for completion
105 * \param graph : schema to execute
106 * \param debug : display the graph with dot if debug == 1
107 * \param fromScratch : if true the graph is reinitialized
109 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
111 * Calls Executor::launchTask to execute a selected Task.
113 * Completion when graph is finished (Scheduler::isFinished)
// Blocking execution of graph. The statements below update the graph state, reset
// the per-run counters, wait for task events, fetch and select the next tasks under
// _mutexForSchedulerUpdate, load each selected task and recompute _toContinue from
// Scheduler::isFinished. The debug level gates the _displayDot calls (thresholds
// >0, >1 and >2 are all used).
116 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
// NOTE(review): the trace label below reads "RunW" although this function is RunA.
118 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
// _root is _mainSched seen as a ComposedNode; a failed cast is reported as an internal error.
120 _root = dynamic_cast<ComposedNode *>(_mainSched);
121 if (!_root) throw Exception("Executor::Run, Internal Error!");
124 if(debug>1)_displayDot(graph);
128 graph->exUpdateState();
130 if(debug>1)_displayDot(graph);
131 vector<Task *> tasks;
132 vector<Task *>::iterator iter;
// Per-run bookkeeping restarts: CONTINUE mode, nothing running, nothing ended.
134 _execMode = YACS::CONTINUE;
135 _isWaitingEventsFromRunningTasks = false;
136 _numberOfRunningTasks = 0;
137 _runningTasks.clear();
138 _numberOfEndedTasks=0;
// Waits for an event from a running task (see sleepWhileNoEventsFromAnyRunningTask()).
141 sleepWhileNoEventsFromAnyRunningTask();
143 if(debug>2)_displayDot(graph);
// The scheduler is queried and the task list narrowed while holding the scheduler mutex.
146 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
147 tasks=graph->getNextTasks(isMore);
148 graph->selectRunnableTasks(tasks);
149 }//End of critical section
151 if(debug>2)_displayDot(graph);
// Tasks are loaded one after the other here (RunB uses loadParallelTasks instead).
153 for(iter=tasks.begin();iter!=tasks.end();iter++)
154 loadTask(*iter,this);
156 if(debug>1)_displayDot(graph);
160 if(debug>1)_displayDot(graph);
// _toContinue stays true for as long as the graph does not report itself finished.
163 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
164 _toContinue=!graph->isFinished();
165 }//End of critical section
166 DEBTRACE("_toContinue: " << _toContinue);
168 if(debug>0)_displayDot(graph);
174 //! Execute a graph with breakpoints or step by step
176 * To be launched in a thread (the main thread controls the progression).
177 * \param graph : schema to execute
178 * \param debug : display the graph with dot if debug >0
179 * \param fromScratch : if false, state from a previous partial execution is already loaded
181 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
183 * Calls Executor::checkBreakPoints to verify if a pause is requested
185 * Calls Executor::launchTask to execute a selected Task
187 * Completion when graph is finished (Scheduler::isFinished)
189 * States of execution:
190 * - YACS::NOTYETINITIALIZED
191 * - YACS::INITIALISED
192 * - YACS::RUNNING (to next breakpoint or step)
193 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
194 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
195 * - YACS::FINISHED (no more ready tasks, no more running tasks)
196 * - YACS::STOPPED (stopped by user before end)
198 * Modes of Execution:
199 * - YACS::CONTINUE (normal run without breakpoints)
200 * - YACS::STEPBYSTEP (pause at each loop)
201 * - YACS::STOPBEFORENODES (pause when a node is reached)
203 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
204 * Step by Step means execution node by node or group of node by group of nodes.
205 * At a given step, the user decides to launch all the ready nodes or only a subset
206 * (Caution: some nodes must run in parallel).
207 * The next event (end of task) may give a new set of ready nodes, and define a new step.
209 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
210 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
211 * - Executor::getCurrentExecMode
212 * - Executor::getExecutorState
213 * - Executor::setExecMode : change the execution mode for next loop
214 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
215 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
216 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
217 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
218 * - Executor::isNotFinished
219 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
220 * - Executor::saveState : dump the current state of execution in an xml file
221 * - Executor::loadState : Not yet implemented
222 * - Executor::getNbOfThreads
223 * - Executor::displayDot
224 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
226 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
227 * - Executor::waitPause
230 * - Pilot may connect to executor during execution, or disconnect.
231 * - Several Pilots may be connected at the same time (for observation...)
// Execution of graph meant to be driven by a pilot. The statements below publish every
// state change with sendEvent("executor"), apply checkBreakPoints() while RUNNING,
// load the selected tasks in parallel, and end in state FINISHED or STOPPED.
// The debug level gates the _displayDot calls (thresholds >0, >1 and >2 are used).
235 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
237 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Reset of the per-run bookkeeping, done under _mutexForSchedulerUpdate.
239 { // --- Critical section
240 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
242 _root = dynamic_cast<ComposedNode *>(_mainSched);
243 if (!_root) throw Exception("Executor::Run, Internal Error!");
244 _executorState = YACS::NOTYETINITIALIZED;
245 sendEvent("executor");
248 _errorDetected = false;
249 _isWaitingEventsFromRunningTasks = false;
250 _numberOfRunningTasks = 0;
251 _runningTasks.clear();
252 _numberOfEndedTasks = 0;
// The execution trace goes to a file named "traceExec_" followed by the scheduler name.
253 string tracefile = "traceExec_";
254 tracefile += _mainSched->getName();
255 _trace.open(tracefile.c_str());
// Steady-clock time point taken at the start of this run.
256 _start = std::chrono::steady_clock::now();
257 } // --- End of critical section
259 if (debug > 1) _displayDot(graph);
266 graph->exUpdateState();
// Failure path: the exception text is traced and FINISHED is published.
270 DEBTRACE("exception: "<< (ex.what()));
271 _executorState = YACS::FINISHED;
272 sendEvent("executor");
276 _executorState = YACS::INITIALISED;
277 sendEvent("executor");
279 if (debug > 1) _displayDot(graph);
281 vector<Task *>::iterator iter;
286 _executorState = YACS::RUNNING;
287 sendEvent("executor");
290 DEBTRACE("--- executor main loop");
// Waits for an event from a running task before querying the scheduler.
291 sleepWhileNoEventsFromAnyRunningTask();
292 DEBTRACE("--- events...");
293 if (debug > 2) _displayDot(graph);
294 { // --- Critical section
295 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Candidate tasks are narrowed twice: by the scheduler, then by container considerations.
296 std::vector<Task *> tasks = graph->getNextTasks(isMore);
297 graph->selectRunnableTasks(tasks);
298 filterTasksConsideringContainers(tasks);
// Running tasks plus pending tasks; a zero total is treated as a stall further down.
300 numberAllTasks=_numberOfRunningTasks+_tasks.size();
301 } // --- End of critical section
302 if (debug > 2) _displayDot(graph);
303 if (_executorState == YACS::RUNNING)
305 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306 if (debug > 0) _displayDot(graph);
// Each pending task is loaded in its own thread (see loadParallelTasks()).
308 loadParallelTasks(_tasks,this);
309 if (debug > 1) _displayDot(graph);
314 if (debug > 1) _displayDot(graph);
315 { // --- Critical section
317 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
318 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
// _toContinue is only recomputed once no task is running any more.
319 if(_numberOfRunningTasks == 0)
320 _toContinue = !graph->isFinished();
322 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324 DEBTRACE("_toContinue: " << _toContinue);
// Stall detection: the graph is not finished, yet nothing runs and nothing is pending.
325 if(_toContinue && numberAllTasks==0)
327 //Problem : no running tasks and no task to launch ??
329 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330 //Pause to give a chance to interrupt
// More than 25 recorded problems is the threshold for giving up.
332 if(problemCount > 25)
334 // Too much problems encountered : stop execution
// FINISHED is published and any pilot blocked in waitPause() is released.
341 _executorState = YACS::FINISHED;
342 sendEvent("executor");
343 _condForPilot.notify_all();
345 } // --- End of critical section
346 if (debug > 0) _displayDot(graph);
347 DEBTRACE("_toContinue: " << _toContinue);
350 DEBTRACE("End of main Loop");
352 { // --- Critical section
353 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// _toContinue still true here means the loop was left early on a stop request.
354 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356 DEBTRACE("stop requested: End soon");
357 _executorState = YACS::STOPPED;
359 sendEvent("executor");
361 } // --- End of critical section
// Optional state dump, configured through setStopOnError().
362 if ( _dumpOnErrorRequested && _errorDetected)
364 saveState(_dumpErrorFile);
// _mutexForTrace, not the scheduler mutex, is taken for the final trace handling.
367 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
370 DEBTRACE("End of RunB thread");
//! Pilot query declared to return a YACS::ExecutionMode.
// Side effect: flags the executor as running under external control.
373 YACS::ExecutionMode Executor::getCurrentExecMode()
375 _isRunningunderExternalControl=true;
//! Returns the current value of _executorState.
// Side effect: flags the executor as running under external control.
380 YACS::ExecutorState Executor::getExecutorState()
382 _isRunningunderExternalControl=true;
383 return _executorState;
//! Pilot query returning a bool.
// Side effect: flags the executor as running under external control.
387 bool Executor::isNotFinished()
389 _isRunningunderExternalControl=true;
393 //! ask to stop execution on the first node found in error
395 * \param dumpRequested produce a state dump when an error is found
396 * \param xmlFile name of file used for state dump
// Records the stop-on-error request and the optional dump settings under
// _mutexForSchedulerUpdate.
// Throws YACS::Exception when a dump is requested with an empty file name.
// NOTE(review): the three members are assigned before that check, so they keep
// their new values when the exception is thrown.
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 { // --- Critical section
402 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
403 _dumpErrorFile=xmlFile;
404 _stopOnErrorRequested=true;
405 _dumpOnErrorRequested = dumpRequested;
406 if (dumpRequested && xmlFile.empty())
407 throw YACS::Exception("dump on error requested and no filename given for dump");
408 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409 } // --- End of critical section
412 //! ask not to stop execution on nodes found in error
// Clears _stopOnErrorRequested under _mutexForSchedulerUpdate.
// _dumpOnErrorRequested and _dumpErrorFile are not touched here.
416 void Executor::unsetStopOnError()
418 { // --- Critical section
419 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
420 _stopOnErrorRequested=false;
421 } // --- End of critical section
424 //! Dynamically set the current mode of execution
426 * The mode can be Continue, step by step, or stop before execution of a node
427 * defined in a list of breakpoints.
// \param mode : requested execution mode (traced below).
// Works under _mutexForSchedulerUpdate and flags the executor as externally controlled.
430 void Executor::setExecMode(YACS::ExecutionMode mode)
432 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433 { // --- Critical section
434 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
435 _isRunningunderExternalControl=true;
437 } // --- End of critical section
440 //! wake up executor when in pause
442 * When Executor is in state paused or waiting for task completion, the thread
443 * running loop RunB waits on condition _condForStepByStep.
444 * Thread RunB is woken up.
445 * \return true when actually wakes up executor
// Pilot request handled under _mutexForSchedulerUpdate; the action depends on
// the current _executorState.
448 bool Executor::resumeCurrentBreakPoint()
450 DEBTRACE("Executor::resumeCurrentBreakPoint()");
452 //bool doDump = false;
453 { // --- Critical section
454 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
455 _isRunningunderExternalControl=true;
456 DEBTRACE("_executorState: " << _executorState);
457 switch (_executorState)
459 case YACS::WAITINGTASKS:
// Wakes the thread blocked in waitResume() (it waits on _condForStepByStep),
// then publishes the RUNNING state.
462 _condForStepByStep.notify_all();
463 _executorState = YACS::RUNNING;
464 sendEvent("executor");
466 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
472 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473 DEBTRACE("Graph Execution finished or stopped !");
478 // debug: no easy way to verify if main loop is actually waiting on condition
482 //if (doDump) saveState(_dumpErrorFile);
483 } // --- End of critical section
488 //! define a list of nodes names as breakpoints in the graph
// \param listOfBreakPoints : node names; checkBreakPoints() compares them with the
//        names of the pending tasks in mode YACS::STOPBEFORENODES.
// The list is copied into _listOfBreakPoints under _mutexForSchedulerUpdate.
491 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494 { // --- Critical section
495 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
496 _isRunningunderExternalControl=true;
497 _listOfBreakPoints = listOfBreakPoints;
498 } // --- End of critical section
502 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
505 * Use Executor::waitPause to wait.
// \return a copy of _listOfTasksToLoad (filled by checkBreakPoints()) in the states
//         handled below, otherwise the local list as initialised here.
507 std::list<std::string> Executor::getTasksToLoad()
509 DEBTRACE("Executor::getTasksToLoad()");
510 list<string> listOfNodesToLoad;
// The list was just default-constructed, so this clear() has nothing to remove.
511 listOfNodesToLoad.clear();
512 { // --- Critical section
513 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
514 _isRunningunderExternalControl=true;
515 switch (_executorState)
517 case YACS::WAITINGTASKS:
520 listOfNodesToLoad = _listOfTasksToLoad;
523 case YACS::NOTYETINITIALIZED:
524 case YACS::INITIALISED:
533 } // --- End of critical section
534 return listOfNodesToLoad;
538 //! Define a subset of task to execute in step by step mode
540 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
541 * in the current step.
542 * If some nodes must run in parallel, they must stay together in the list.
// \param listToExecute : names of the tasks the pilot wants to run in this step.
// The tasks of _tasksSave whose name appears in listToExecute are collected in
// restrictedTasks and pushed into _tasks.
545 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549 vector<Task *>::iterator iter;
550 vector<Task *> restrictedTasks;
551 { // --- Critical section
552 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
553 _isRunningunderExternalControl=true;
554 switch (_executorState)
556 case YACS::WAITINGTASKS:
// Name-based filter: a linear find in listToExecute for every saved task.
559 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561 string readyNode = _mainSched->getTaskName(*iter);
562 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563 != listToExecute.end())
565 restrictedTasks.push_back(*iter);
566 DEBTRACE("node to execute " << readyNode);
570 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572 _tasks.push_back(*iter);
576 case YACS::NOTYETINITIALIZED:
577 case YACS::INITIALISED:
586 } // --- End of critical section
// NOTE(review): the two loops below come after the "End of critical section"
// marker, yet the first one writes to _tasks again and the second one reads it.
589 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591 _tasks.push_back(*iter);
593 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595 string readyNode = _mainSched->getTaskName(*iter);
596 DEBTRACE("selected node to execute " << readyNode);
602 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 * Do nothing if execution is finished or in pause.
605 * Wait first step if Executor is running or in initialization.
// Pilot-side wait, performed under _mutexForSchedulerUpdate.
// _condForPilot is notified by checkBreakPoints() after it sets WAITINGTASKS or
// PAUSED, and by RunB() when it gives up and sets FINISHED.
608 void Executor::waitPause()
// NOTE(review): this trace reads _executorState before the mutex is taken.
610 DEBTRACE("Executor::waitPause()" << _executorState);
611 { // --- Critical section
612 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
613 _isRunningunderExternalControl=true;
614 switch (_executorState)
619 case YACS::WAITINGTASKS:
624 case YACS::NOTYETINITIALIZED:
625 case YACS::INITIALISED:
628 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
632 } // --- End of critical section
637 * This method can be called at any time simultaneously during a RunB call.
638 * This method will wait until the executor is locked in a consistent state of a running graph.
640 * This method is expected to be called in association with resume method.
641 * The returned value is expected to be transferred to resume method.
// Takes _mutexForSchedulerUpdate by hand. When the execution is already over
// (_toContinue false and state FINISHED) the mutex is released and false is returned;
// otherwise the method is left with the mutex still held, to be released by resume().
643 bool Executor::suspendASAP()
645 // no AutoLocker here. It's not a bug.
646 _mutexForSchedulerUpdate.lock();
647 if(!_toContinue && _executorState==YACS::FINISHED)
648 {// execution is finished
649 _mutexForSchedulerUpdate.unLock();
650 return false;// the executor is no more running
652 //general case. Leave method with locker in locked status
657 * This method is expected to be called in association with suspendASAP method.
658 * Expected to be called just after suspendASAP with output of suspendASAP as input parameter
// \param suspended : value previously returned by suspendASAP().
// Releases _mutexForSchedulerUpdate, the lock that suspendASAP() leaves held in its general case.
660 void Executor::resume(bool suspended)
663 _mutexForSchedulerUpdate.unLock();
666 //! stops the execution as soon as possible
// Switches the executor to YACS::STEPBYSTEP, then calls resumeCurrentBreakPoint().
668 void Executor::stopExecution()
670 setExecMode(YACS::STEPBYSTEP);
673 resumeCurrentBreakPoint();
676 //! save the current state of execution in an xml file
// \param xmlFile : path of the dump file, handed to VisitorSaveState::openFileDump.
// The visitor is built on _root while _mutexForSchedulerUpdate is held.
// A YACS Exception is caught here and its message printed on std::cerr.
678 bool Executor::saveState(const std::string& xmlFile)
680 DEBTRACE("Executor::saveState() in " << xmlFile);
683 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
684 YACS::ENGINE::VisitorSaveState vst(_root);
685 vst.openFileDump(xmlFile.c_str());
690 catch(Exception& ex) {
691 std::cerr << ex.what() << std::endl;
696 //! not yet implemented
// Placeholder pilot entry point: it only traces the call and flags the executor
// as running under external control.
698 bool Executor::loadState()
700 DEBTRACE("Executor::loadState()");
701 _isRunningunderExternalControl=true;
// File-local helper used by _displayDot() to test for "display.sh".
// It calls stat() on filename and applies S_ISREG to the resulting st_mode.
706 static int isfile(const char *filename)
709 if (stat(filename, &buf) != 0)
711 if (!S_ISREG(buf.st_mode))
716 //! Display the graph state as a dot display, public method
// \param graph : the scheduler to display.
// Public entry point: flags the executor as running under external control.
718 void Executor::displayDot(Scheduler *graph)
720 _isRunningunderExternalControl=true;
724 //! Display the graph state as a dot display
726 * \param graph : the node to display
// Writes the dot description of graph to a file named "titi" in the current
// directory, then shells out through system() to show it.
729 void Executor::_displayDot(Scheduler *graph)
731 std::ofstream g("titi");
// NOTE(review): unchecked C-style cast; graph is assumed to be a ComposedNode.
732 ((ComposedNode*)graph)->writeDot(g);
// A user-supplied "display.sh" is looked up by relative path.
734 const char displayScript[]="display.sh";
735 if(isfile(displayScript))
736 system("sh display.sh");
// Built-in command: render "titi" as PNG with dot and pipe the image to display.
738 system("dot -Tpng titi|display -delay 5");
741 //! Wait reactivation in modes Step By step or with BreakPoints
743 * Check mode of execution (set by main thread):
744 * - YACS::CONTINUE : the graph execution continues.
745 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
746 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
747 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
748 * else continue the graph execution.
749 * \return true if end of executor thread is requested
// Called by RunB() while the executor is RUNNING. In the two modes handled below it
// publishes the names of the pending tasks, sets WAITINGTASKS or PAUSED, notifies
// the pilot and may block in waitResume().
// \return endRequested, set to true when _isOKToEnd is found set.
752 bool Executor::checkBreakPoints()
754 DEBTRACE("Executor::checkBreakPoints()");
755 vector<Task *>::iterator iter;
756 bool endRequested = false;
764 case YACS::STOPBEFORENODES:
767 { // --- Critical section
768 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// The name of every pending task is looked up in the registered breakpoints.
770 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
772 string nodeToLoad = _mainSched->getTaskName(*iter);
773 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
774 != _listOfBreakPoints.end())
// Names of the pending tasks, made available to the pilot through getTasksToLoad().
782 _listOfTasksToLoad.clear();
783 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
785 string nodeToLoad = _mainSched->getTaskName(*iter);
786 _listOfTasksToLoad.push_back(nodeToLoad);
// A non-zero getNbOfThreads() selects WAITINGTASKS; PAUSED is the other state assigned here.
788 if (getNbOfThreads())
789 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
791 _executorState = YACS::PAUSED;
792 sendEvent("executor");
// Releases a pilot blocked in waitPause().
793 _condForPilot.notify_all();
795 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
796 if (_isOKToEnd) endRequested = true;
797 } // --- End of critical section
798 if (stop) DEBTRACE("wake up from waitResume");
802 case YACS::STEPBYSTEP:
804 { // --- Critical section
805 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Same publication of the pending task names as in the breakpoint mode.
807 _listOfTasksToLoad.clear();
808 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
810 string nodeToLoad = _mainSched->getTaskName(*iter);
811 _listOfTasksToLoad.push_back(nodeToLoad);
813 if (getNbOfThreads())
814 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
816 _executorState = YACS::PAUSED;
817 sendEvent("executor");
818 _condForPilot.notify_all();
820 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
821 // or, if no pilot, wait until no more running tasks (stop on error)
822 if (_isOKToEnd) endRequested = true;
823 } // --- End of critical section
824 DEBTRACE("wake up from waitResume");
828 DEBTRACE("endRequested: " << endRequested);
833 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
835 * With the condition Mutex, the mutex is released atomically during the wait.
836 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
837 * Must be called while mutex is locked.
// Blocks on _condForStepByStep with _mutexForSchedulerUpdate as the associated mutex.
// resumeCurrentBreakPoint() is the method that notifies this condition.
840 void Executor::waitResume()
842 DEBTRACE("Executor::waitResume()");
843 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
848 //! Perform loading of a Task.
850 * \param task : Task to load
// \param execInst : executor passed along to Scheduler::notifyFrom.
// Traces each phase (TOLOAD, load, initService) with traceExec. Both failure paths
// below print a message on std::cerr and notify YACS::ABORT to the main scheduler
// under _mutexForSchedulerUpdate.
853 void Executor::loadTask(Task *task, const Executor *execInst)
855 DEBTRACE("Executor::loadTask(Task *task)");
// The test is on the task state TOLOAD.
856 if(task->getState() != YACS::TOLOAD)
858 traceExec(task, "state:TOLOAD", ComputePlacement(task));
// The START notification is sent while holding the scheduler mutex.
860 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
861 _mainSched->notifyFrom(task,YACS::START,execInst);
862 }//End of critical section
865 traceExec(task, "load", ComputePlacement(task));
867 traceExec(task, "initService", ComputePlacement(task));
// Failure path for an exception that provides what().
872 std::cerr << ex.what() << std::endl;
874 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
876 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878 }//End of critical section
// Failure path reported with the fixed message "Load failed".
882 std::cerr << "Load failed" << std::endl;
884 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
886 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888 }//End of critical section
// Loads every task of `tasks` concurrently: one Thread per task runs
// functionForTaskLoad with a heap-allocated threadargs (task, _mainSched, this).
// \param tasks : tasks to load; the thread table `ths` gets one slot per element.
// \param execInst : not read on these lines; each threadargs carries `this` instead.
// The launch loop walks the `tasks` argument, the container `ths` was sized from.
// Walking the _tasks member would index `ths` out of range whenever the two
// containers differ in size.
899 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
901 std::vector<Thread> ths(tasks.size());
902 std::size_t ithread(0);
903 for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++, ithread++)
905 DEBTRACE("Executor::loadParallelTasks(Task *task)");
906 struct threadargs *args(new threadargs);
907 args->task = (*iter);
908 args->sched = _mainSched;
909 args->execInst = this;
910 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
// Second pass over the same thread table, one index per launched thread.
912 for(ithread=0;ithread<tasks.size();ithread++)
916 //! Execute a list of tasks possibly connected through datastream links
918 * \param tasks : a list of tasks to execute
// Two passes over tasks: the first connects the services of the tasks in state
// TOLOAD or TORECONNECT and handles connection failures, the second iterates the
// list again to execute the tasks.
921 void Executor::launchTasks(const std::vector<Task *>& tasks)
923 //First phase, make datastream connections
924 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
926 YACS::StatesForNode state=(*iter)->getState();
// Tasks in any other state are left alone by this phase.
927 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
930 (*iter)->connectService();
931 traceExec(*iter, "connectService",ComputePlacement(*iter));
// Task::connected() is called while holding the scheduler mutex.
933 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
934 (*iter)->connected();
935 }//End of critical section
// Failure path for an exception that provides what(): undo the connection, then notify ABORT.
939 std::cerr << ex.what() << std::endl;
942 (*iter)->disconnectService();
943 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
947 // Disconnect has failed
948 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
951 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
953 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954 }//End of critical section
// Failure path reported with a fixed message; same undo-and-abort sequence.
958 std::cerr << "Problem in connectService" << std::endl;
961 (*iter)->disconnectService();
962 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
966 // Disconnect has failed
967 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
970 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
972 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973 }//End of critical section
// A task that ended in ERROR drags its coupled tasks along.
975 if((*iter)->getState() == YACS::ERROR)
977 //try to put all coupled tasks in error
978 std::set<Task*> coupledSet;
979 (*iter)->getCoupledTasks(coupledSet);
980 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// The failing task itself and tasks already in ERROR are skipped.
983 if(t == *iter)continue;
984 if(t->getState() == YACS::ERROR)continue;
// NOTE(review): the traces for t below use ComputePlacement(*iter), i.e. the
// placement of the failing task rather than that of t - confirm this is intended.
987 t->disconnectService();
988 traceExec(t, "disconnectService",ComputePlacement(*iter));
992 // Disconnect has failed
993 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
996 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
998 _mainSched->notifyFrom(t,YACS::ABORT,this);
999 }//End of critical section
1000 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1003 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1006 //Second phase, execute each task in a thread
1007 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1013 //! Execute a Task in a thread
1015 * \param task : Task to execute
1017 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1019 * Calls Executor::functionForTaskExecution in Thread
// Starts functionForTaskExecution on task in a new Thread, after taking one unit of
// _semForMaxThreads and registering the task as running.
1022 void Executor::launchTask(Task *task)
1024 DEBTRACE("Executor::launchTask(Task *task)");
1025 struct threadargs *args;
// Only tasks in state TOACTIVATE are launched.
1026 if(task->getState() != YACS::TOACTIVATE)return;
1028 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
// Diagnostic taken when the thread counter has reached zero: it works on a copy of
// _runningTasks and looks among the coupled tasks for one still in TOACTIVATE.
1029 if(_semThreadCnt == 0)
1031 // --- Critical section
1032 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1033 //check if we have enough threads to run
1034 std::set<Task*> tmpSet=_runningTasks;
1035 std::set<Task*>::iterator it = tmpSet.begin();
1036 std::string status="running";
1037 std::set<Task*> coupledSet;
1038 while( it != tmpSet.end() )
1042 tt->getCoupledTasks(coupledSet);
// Every coupled task is removed from the working copy once it has been examined.
1044 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1046 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047 tmpSet.erase(*iter);
1049 if(status=="running")break;
1050 it = tmpSet.begin();
// A coupled task is waiting for activation: the user is told how to raise the thread limit.
1053 if(status=="toactivate")
1055 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1058 // --- End of critical section
// Takes one unit of the thread semaphore (presumably blocking while none is left).
1061 _semForMaxThreads.wait();
// Heap-allocated argument block handed to the thread function.
1064 args= new threadargs;
1066 args->sched = _mainSched;
1067 args->execInst = this;
1069 traceExec(task, "launch",ComputePlacement(task));
1071 { // --- Critical section
1072 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1073 _numberOfRunningTasks++;
1074 _runningTasks.insert(task);
1075 task->begin(); //change state to ACTIVATED
1076 } // --- End of critical section
// Unnamed Thread object built for functionForTaskExecution with the configured stack size.
1077 Thread(functionForTaskExecution, args, _threadStackSize);
1080 //! wait until a running task ends
// Works under _mutexForSchedulerUpdate. When at least one task is running and no
// ended task has been counted, it raises _isWaitingEventsFromRunningTasks and waits
// on _condForNewTasksToPerform, the condition notified by wakeUp().
1082 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1084 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 // _semForNewTasksToPerform.wait(); //----use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1086 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1087 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1089 _isWaitingEventsFromRunningTasks = true;
1090 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
// The ended-task counter is reset to zero here.
1092 _numberOfEndedTasks=0;
1097 //! must be used protected by _mutexForSchedulerUpdate!
// Counterpart of sleepWhileNoEventsFromAnyRunningTask(): when that method is waiting,
// the flag is cleared and _condForNewTasksToPerform is notified.
// _numberOfEndedTasks is the counter that the same method tests and resets.
1099 void Executor::wakeUp()
1101 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1102 if (_isWaitingEventsFromRunningTasks)
1104 _isWaitingEventsFromRunningTasks = false;
1105 _condForNewTasksToPerform.notify_all();
1108 _numberOfEndedTasks++;
// \return _maxNbThreads converted to int with a C-style cast.
1111 int Executor::getMaxNbOfThreads() const
1113 return (int)_maxNbThreads;
// \param maxNbThreads : new value, stored after conversion to std::uint32_t.
// A negative argument therefore wraps to a large unsigned value.
1116 void Executor::setMaxNbOfThreads(int maxNbThreads)
1118 _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
// Takes _mutexForNbOfConcurrentThreads (not the scheduler mutex) and flags the
// executor as running under external control.
// checkBreakPoints() uses the result to choose between WAITINGTASKS and PAUSED.
1121 int Executor::getNbOfThreads()
1124 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1125 _isRunningunderExternalControl=true;
1129 //! number of running tasks
// \return _numberOfRunningTasks, read under _mutexForSchedulerUpdate.
// Side effect: flags the executor as running under external control.
1130 int Executor::getNumberOfRunningTasks()
1132 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1133 _isRunningunderExternalControl=true;
1134 return _numberOfRunningTasks;
1138 * This thread is NOT supposed to be detached !
// Thread entry point handed to Thread::go by loadParallelTasks().
// \param arg : pointer to a threadargs (task, scheduler, executor).
1140 void *Executor::functionForTaskLoad(void *arg)
1142 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1143 struct threadargs *args = (struct threadargs *) arg;
1144 Task *task=args->task;
// NOTE(review): sched is read from args but not used on the lines below.
1145 Scheduler *sched=args->sched;
1146 Executor *execInst=args->execInst;
1148 execInst->loadTask(task,execInst);// no throw from this method - all exceptions are caught !
// Thread entry point that runs one task and reports its outcome.
// Stages visible below: unpack the thread arguments, optionally apply the DPL
// scope, trace the execution, disconnect the service, release the task from a
// HomogeneousPoolContainer, then - under _mutexForSchedulerUpdate - notify the
// scheduler, update the running-task bookkeeping and wake up waiting threads.
1152 //! Function to perform execution of a task in a thread
1154 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1156 * Calls Task::execute
1158 * Calls Task::finished when the task is finished
1160 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1162 * Calls Executor::wakeUp and Executor::notifyEndOfThread
1165 void *Executor::functionForTaskExecution(void *arg)
1167 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
// arg carries the task to run, the scheduler to notify and the owning executor.
1169 struct threadargs *args = (struct threadargs *) arg;
1170 Task *task=args->task;
1171 Scheduler *sched=args->sched;
1172 Executor *execInst=args->execInst;
// Trace the state of the task before doing anything else.
1174 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// When DPL scope sensitivity is enabled, the scope is applied only if the task
// is a Node and the scheduler is a ComposedNode (both dynamic_casts succeed).
1180 if(execInst->getDPLScopeSensitive())
1182 Node *node(dynamic_cast<Node *>(task));
1183 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1184 if(node!=0 && gfn!=0)
1185 node->applyDPLScope(gfn);
// Event reported at the end; it starts as FINISH.
// NOTE(review): the ABORT handling further down implies ev is changed on
// failure; those assignments are not among the lines shown here - confirm.
1188 YACS::Event ev=YACS::FINISH;
1191 execInst->traceExec(task, "start execution",ComputePlacement(task));
1193 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
// A YACS Exception is reported on stderr and its text is appended to the trace.
1195 catch(Exception& ex)
1197 std::cerr << "YACS Exception during execute" << std::endl;
1198 std::cerr << ex.what() << std::endl;
1200 string message = "end execution ABORT, ";
1201 message += ex.what();
1202 execInst->traceExec(task, message,ComputePlacement(task));
1206 // Execution has failed
1207 std::cerr << "Execution has failed: unknown reason" << std::endl;
1209 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection; a failure is reported on stderr and traced.
1215 DEBTRACE("task->disconnectService()");
1216 task->disconnectService();
1217 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1221 // Disconnect has failed
1222 std::cerr << "disconnect has failed" << std::endl;
1224 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// The placement is computed here and reused for the state trace emitted
// inside the critical section below.
1228 std::string placement(ComputePlacement(task));
1230 // container management for HomogeneousPoolOfContainer
// The task is released from its container while the container's own mutex is held.
// NOTE(review): the guard on contC being non-null is not visible here - confirm.
1232 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1235 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1236 contC->release(task);
1239 DEBTRACE("End task->execute()");
1240 { // --- Critical section
1241 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
// FINISH: mark the task as finished. ABORT: record the error and, when
// stop-on-error was requested, switch to step-by-step mode and allow the end.
1244 if (ev == YACS::FINISH) task->finished();
1245 if (ev == YACS::ABORT)
1247 execInst->_errorDetected = true;
1248 if (execInst->_stopOnErrorRequested)
1250 execInst->_execMode = YACS::STEPBYSTEP;
1251 execInst->_isOKToEnd = true;
1255 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
// Tell the scheduler about the outcome; failures of the notification are only logged.
1256 sched->notifyFrom(task,ev,execInst);
1258 catch(Exception& ex)
1260 //notify has failed : it is supposed to have set state
1261 //so no need to do anything
1262 std::cerr << "Error during notification" << std::endl;
1263 std::cerr << ex.what() << std::endl;
1267 //notify has failed : it is supposed to have set state
1268 //so no need to do anything
1269 std::cerr << "Notification failed" << std::endl;
// Bookkeeping: this task no longer counts as running.
1271 execInst->_numberOfRunningTasks--;
1272 execInst->_runningTasks.erase(task);
1273 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1274 << " _execMode: " << execInst->_execMode
1275 << " _executorState: " << execInst->_executorState);
// Last running task while not in CONTINUE mode: a WAITINGTASKS executor becomes
// PAUSED, observers are informed and threads waiting on _condForPilot are woken.
1276 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1278 if (execInst->_executorState == YACS::WAITINGTASKS)
1280 execInst->_executorState = YACS::PAUSED;
1281 execInst->sendEvent("executor");
1282 execInst->_condForPilot.notify_all();
// Step-by-step waiters are woken only for an error with stop-on-error requested
// and no external control.
1283 if (execInst->_errorDetected &&
1284 execInst->_stopOnErrorRequested &&
1285 !execInst->_isRunningunderExternalControl)
1286 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Post _semForMaxThreads and bump the matching counter, then wake the executor
// loop unless it is paused.
1289 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1290 execInst->_semForMaxThreads.post();
1291 execInst->_semThreadCnt += 1;
1292 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1293 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1295 } // --- End of critical section (change state)
// Appends one line to the _trace stream describing an event on a task.
// The line holds, separated by spaces: elapsed time since _start in seconds,
// container name, placement, node name (from _mainSched->getTaskName) and message.
// \param task      task the event refers to
// \param message   free text describing the event
// \param placement placement string written as is
1301 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1303 string nodeName = _mainSched->getTaskName(task);
1304 Container *cont = task->getContainer();
// "---" is the placeholder container name.
// NOTE(review): the condition guarding the assignment below is not visible
// here - presumably it only runs when cont is non-null; confirm.
1305 string containerName = "---";
1307 containerName = cont->getName();
// Elapsed time: steady_clock difference truncated to milliseconds, then
// converted to seconds as a double.
1309 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1310 std::chrono::milliseconds millisec;
1311 millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1312 double elapse = double(millisec.count()) / 1000.0;
// _mutexForTrace is locked while the line is written to the trace stream.
1314 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1315 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1320 //! emit notification to all observers registered with the dispatcher
1322 * The dispatcher is unique and can be obtained by getDispatcher()
// \param event name of the event, dispatched together with the _root node
1324 void Executor::sendEvent(const std::string& event)
// The dispatcher is fetched on each call through Dispatcher::getDispatcher().
1326 Dispatcher* disp=Dispatcher::getDispatcher();
1329 disp->dispatch(_root,event);
// Ordering predicate on HomogeneousPoolContainer pointers; the visible return
// orders two containers by their number of cores per worker.
// NOTE(review): filterTasksConsideringContainers uses this comparator for a map
// that also holds a nullptr key; the handling of null operands is not among the
// lines shown here - confirm. Also note that two distinct containers with the
// same number of cores per worker are not ordered by the comparison below.
1334 bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
1342 return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
// Selects, among the candidate tasks, those that can be launched given their
// containers. Tasks are first grouped per HomogeneousPoolContainer (nullptr key
// for tasks without such a container), then each group is trimmed.
1347 * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1348 * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1350 * \param [in,out] tsks - list of tasks to be
1352 void Executor::filterTasksConsideringContainers(std::vector<Task *>& tsks)
// Grouping pass: key is the pool container, ordered with HPCCompare.
1354 std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1355 for(auto cur : tsks)
1359 Container *cont(cur->getContainer());
// Tasks go to the nullptr group in two places (presumably: no container, or a
// container that is not a HomogeneousPoolContainer - the conditions are not
// visible here; confirm).
1362 m[nullptr].push_back(cur);
1365 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1368 m[nullptr].push_back(cur);
1371 m[contC].push_back(cur);
// Selection pass: ret accumulates the tasks retained from each group.
1374 std::vector<Task *> ret;
1377 HomogeneousPoolContainer *curhpc(it.first);
1378 const std::vector<Task *>& curtsks(it.second);
// Thread-limited selection: free slots = _maxNbThreads minus the number of
// running tasks, clamped at zero; at most that many candidates are kept, taken
// from the front of the group.
1381 std::uint32_t nbThreadsRunning = _runningTasks.size();
1382 std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
1383 std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
1384 std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
1385 DEBTRACE("nb threads running: " << nbThreadsRunning);
1386 DEBTRACE("MaxNbThreads: " << _maxNbThreads);
1387 DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
1388 DEBTRACE("nbOfCandidates: " << nbOfCandidates);
1389 DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
1390 ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
// Container-limited selection: with the container mutex held, keep at most
// getNumberOfFreePlace() tasks from the front of the group and reserve the
// container for them through allocateFor.
1394 // start of critical section for container curhpc
1395 std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1396 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1397 std::size_t sz(curhpc->getNumberOfFreePlace());
1398 std::vector<Task *>::const_iterator it2(curtsks.begin());
1399 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1401 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1402 ret.push_back(*it2);
1404 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1405 //end of critical section
// Builds the placement string used in execution traces for a task.
// The string starts as the placeholder "---" and is replaced by the container's
// getFullPlacementId(zeTask) when the task has a container.
// NOTE(review): the return statement is not among the lines shown here -
// presumably placement is returned; confirm.
1412 std::string Executor::ComputePlacement(Task *zeTask)
1414 std::string placement("---");
1417 if(zeTask->getContainer())
1418 placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1422 ///////// NEW EXECUTOR ////////////////////////////////
// Loads a task on the resource chosen by the workload manager.
// \param task    task to load
// \param runInfo run information; runInfo.type.name, runInfo.index and
//                runInfo.resource.name are read below
// Failures are reported on stderr and forwarded to the scheduler as YACS::ABORT.
1423 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
// NOTE(review): the statement executed when the state is not TOLOAD is not
// visible here - presumably an early return; confirm.
1425 if(task->getState() != YACS::TOLOAD)
1427 traceExec(task, "state:TOLOAD", ComputePlacement(task));
// The scheduler is told about the start while _mutexForSchedulerUpdate is locked.
1429 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1430 _mainSched->notifyFrom(task,YACS::START,this);
1431 }//End of critical section
// The container name imposed on the task is "<type name>-<index>".
1434 std::ostringstream container_name;
1435 container_name << runInfo.type.name << "-" << runInfo.index;
1436 task->imposeResource(runInfo.resource.name, container_name.str());
1437 traceExec(task, "load", ComputePlacement(task));
1439 traceExec(task, "initService", ComputePlacement(task));
1440 task->initService();
// YACS Exception: print its text, notify ABORT under the scheduler mutex and
// trace the resulting state.
1442 catch(Exception& ex)
1444 std::cerr << ex.what() << std::endl;
1446 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1448 _mainSched->notifyFrom(task,YACS::ABORT, this);
1449 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1450 }//End of critical section
// Second failure path (its handler line is not visible here): same
// notification with a generic message.
1454 std::cerr << "Load failed" << std::endl;
1456 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1458 _mainSched->notifyFrom(task,YACS::ABORT, this);
1459 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1460 }//End of critical section
// Registers a task as running: increments _numberOfRunningTasks and inserts the
// task into _runningTasks, both while _mutexForSchedulerUpdate is locked.
// \param task task that is about to run
1464 void Executor::beginTask(Task *task)
1466 // --- Critical section
1467 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1468 _numberOfRunningTasks++;
1469 _runningTasks.insert(task);
1470 // --- End of critical section
// Records the end of a task and informs the scheduler, with
// _mutexForSchedulerUpdate locked for the statements shown.
// \param task task that has ended
// \param ev   outcome; YACS::FINISH and YACS::ABORT are handled explicitly
1473 void Executor::endTask(Task *task, YACS::Event ev)
1475 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// FINISH: mark the task as finished. ABORT: remember the error and, when
// stop-on-error was requested, switch to step-by-step mode.
1478 if (ev == YACS::FINISH) task->finished();
1479 if (ev == YACS::ABORT)
1481 _errorDetected = true;
1482 if (_stopOnErrorRequested)
1484 _execMode = YACS::STEPBYSTEP;
1489 //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
// Tell the scheduler about the outcome; notification failures are only logged.
1490 _mainSched->notifyFrom(task,ev,this);
1492 catch(Exception& ex)
1494 //notify has failed : it is supposed to have set state
1495 //so no need to do anything
1496 std::cerr << "Error during notification" << std::endl;
1497 std::cerr << ex.what() << std::endl;
1501 //notify has failed : it is supposed to have set state
1502 //so no need to do anything
1503 std::cerr << "Notification failed" << std::endl;
// Bookkeeping: the task no longer counts as running.
1505 _numberOfRunningTasks--;
1506 _runningTasks.erase(task);
// Last running task while not in CONTINUE mode: a WAITINGTASKS executor becomes
// PAUSED, observers are informed and threads waiting on _condForPilot are woken.
1507 if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1509 if (_executorState == YACS::WAITINGTASKS)
1511 _executorState = YACS::PAUSED;
1512 sendEvent("executor");
1513 _condForPilot.notify_all();
1514 if (_errorDetected &&
1515 _stopOnErrorRequested &&
1516 !_isRunningunderExternalControl)
1517 _condForStepByStep.notify_all(); // exec thread may be on waitResume
// NOTE(review): the statement controlled by this test is not visible here -
// confirm what is done when the executor is not paused.
1520 if (_executorState != YACS::PAUSED)
// Marks a task as failed and ends it with YACS::ABORT.
// \param task    task to fail
// \param message error text stored on the node when the task is an ElementaryNode
1524 void Executor::failTask(Task *task, const std::string& message)
// The error details are only set when the task is an ElementaryNode.
1526 ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
1527 if(elemNode != nullptr)
// NOTE(review): StateLoader is invoked with the node and YACS::ERROR -
// presumably it forces the node state to ERROR; confirm against its definition.
1529 StateLoader(elemNode, YACS::ERROR);
1530 elemNode->setErrorDetails(message);
1532 endTask(task, YACS::ABORT);
// Runs one task in the calling thread.
// Stages visible below: begin the task under the scheduler mutex, optionally
// apply the DPL scope, trace the execution, disconnect the service and release
// the task from a HomogeneousPoolContainer.
// \param task task to run
// NOTE(review): the return statement is not among the lines shown here -
// presumably the event ev is returned; confirm.
1535 YACS::Event Executor::runTask(Task *task)
1537 { // --- Critical section
1538 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1539 task->begin(); //change state to ACTIVATED
1541 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// When DPL scope sensitivity is enabled, the scope is applied only if the task
// is a Node and _mainSched is a ComposedNode (both dynamic_casts succeed).
1543 if(getDPLScopeSensitive())
1545 Node *node(dynamic_cast<Node *>(task));
1546 ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1547 if(node!=0 && gfn!=0)
1548 node->applyDPLScope(gfn);
// The event starts as FINISH.
// NOTE(review): assignments changing it on failure are not visible here - confirm.
1551 YACS::Event ev=YACS::FINISH;
1554 traceExec(task, "start execution",ComputePlacement(task));
1556 traceExec(task, "end execution OK",ComputePlacement(task));
// A YACS Exception is reported on stderr and its text is appended to the trace.
1558 catch(Exception& ex)
1560 std::cerr << "YACS Exception during execute" << std::endl;
1561 std::cerr << ex.what() << std::endl;
1563 string message = "end execution ABORT, ";
1564 message += ex.what();
1565 traceExec(task, message,ComputePlacement(task));
1569 // Execution has failed
1570 std::cerr << "Execution has failed: unknown reason" << std::endl;
1572 traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection; a failure is reported on stderr and traced.
1578 DEBTRACE("task->disconnectService()");
1579 task->disconnectService();
1580 traceExec(task, "disconnectService",ComputePlacement(task));
1584 // Disconnect has failed
1585 std::cerr << "disconnect has failed" << std::endl;
1587 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// NOTE(review): placement is not used by any statement shown after this line.
1591 std::string placement(ComputePlacement(task));
1593 // container management for HomogeneousPoolOfContainer
// The task is released from its container while the container's own mutex is held.
// NOTE(review): the guard on contC being non-null is not visible here - confirm.
1595 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1598 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1599 contC->release(task);
// Connects the datastream service of a task and, on failure, aborts the task
// and the tasks coupled to it.
// \param task task whose service is connected
1605 void Executor::makeDatastreamConnections(Task *task)
// Only tasks in state TOLOAD or TORECONNECT are concerned.
// NOTE(review): the statement executed for other states is not visible here -
// presumably an early return; confirm.
1607 YACS::StatesForNode state=task->getState();
1608 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1612 task->connectService();
1613 traceExec(task, "connectService",ComputePlacement(task));
1615 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1617 }//End of critical section
// YACS Exception during connection: print it, try to disconnect the service,
// then notify ABORT to the scheduler under the scheduler mutex.
1619 catch(Exception& ex)
1621 std::cerr << ex.what() << std::endl;
1624 (task)->disconnectService();
1625 traceExec(task, "disconnectService",ComputePlacement(task));
1629 // Disconnect has failed
1630 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1633 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1635 _mainSched->notifyFrom(task,YACS::ABORT,this);
1636 }//End of critical section
// Second failure path (its handler line is not visible here): same recovery
// with a generic message.
1640 std::cerr << "Problem in connectService" << std::endl;
1643 (task)->disconnectService();
1644 traceExec(task, "disconnectService",ComputePlacement(task));
1648 // Disconnect has failed
1649 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1652 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1654 _mainSched->notifyFrom(task,YACS::ABORT,this);
1655 }//End of critical section
// When the task ended in ERROR, every coupled task that is not already in
// ERROR is disconnected and aborted as well.
1657 if(task->getState() == YACS::ERROR)
1659 //try to put all coupled tasks in error
1660 std::set<Task*> coupledSet;
1661 task->getCoupledTasks(coupledSet);
1662 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1665 if(t == task)continue;
1666 if(t->getState() == YACS::ERROR)continue;
// NOTE(review): the traces for the coupled task t below use
// ComputePlacement(task), i.e. the placement of the failed task rather than
// that of t - confirm this is intended.
1669 t->disconnectService();
1670 traceExec(t, "disconnectService",ComputePlacement(task));
1674 // Disconnect has failed
1675 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1678 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1680 _mainSched->notifyFrom(t,YACS::ABORT,this);
1681 }//End of critical section
1682 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
// Final state of the task itself is always traced.
1685 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// Executes a graph using the WorkloadManager.
// \param graph       scheduler (graph) to execute
// \param debug       verbosity; higher values trigger more _displayDot calls
// \param fromScratch only used in the debug trace among the lines shown here
// Phases visible below: reset of the executor state, initial update of the
// graph, creation of the workload manager, main loop feeding it with ready
// tasks, and final state handling.
1688 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
1690 DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Reset phase, done with _mutexForSchedulerUpdate locked: the root node is
// taken from _mainSched (an exception is thrown if it is not a ComposedNode),
// counters are cleared, the trace file "traceExec_<scheduler name>" is opened
// and the reference time _start is taken.
1691 { // --- Critical section
1692 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1694 _root = dynamic_cast<ComposedNode *>(_mainSched);
1695 if (!_root) throw Exception("Executor::Run, Internal Error!");
1696 _executorState = YACS::NOTYETINITIALIZED;
1697 sendEvent("executor");
1700 _errorDetected = false;
1701 _isWaitingEventsFromRunningTasks = false;
1702 _numberOfRunningTasks = 0;
1703 _runningTasks.clear();
1704 _numberOfEndedTasks = 0;
1705 string tracefile = "traceExec_";
1706 tracefile += _mainSched->getName();
1707 _trace.open(tracefile.c_str());
1708 _start = std::chrono::steady_clock::now();
1709 } // --- End of critical section
1711 if (debug > 1) _displayDot(graph);
// Initial state update of the graph; a YACS Exception here puts the executor
// in state FINISHED and notifies observers.
1718 graph->exUpdateState();
1720 catch(Exception& ex)
1722 DEBTRACE("exception: "<< (ex.what()));
1723 _executorState = YACS::FINISHED;
1724 sendEvent("executor");
1728 _executorState = YACS::INITIALISED;
1729 sendEvent("executor");
1731 if (debug > 1) _displayDot(graph);
1737 _executorState = YACS::RUNNING;
1738 sendEvent("executor");
// The workload manager is built on the default algorithm and filled with the
// resources provided by WlmTask::loadResources.
1740 WorkloadManager::DefaultAlgorithm algo;
1741 WorkloadManager::WorkloadManager wlm(algo);
1742 WlmTask::loadResources(wlm);
// Main loop body: wait for events, then collect the tasks that can be launched.
1747 DEBTRACE("--- executor main loop");
1748 sleepWhileNoEventsFromAnyRunningTask();
1749 DEBTRACE("--- events...");
1750 if (debug > 2) _displayDot(graph);
1751 { // --- Critical section
1752 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1753 std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1754 graph->selectRunnableTasks(readyTasks);
// Only tasks that are not already running are queued in _tasks.
1756 for(Task * t : readyTasks)
1757 if(_runningTasks.find(t) == _runningTasks.end())
1758 _tasks.push_back(t);
1759 // TODO: to be removed
1760 filterTasksConsideringContainers(_tasks);
1761 numberAllTasks=_numberOfRunningTasks+_tasks.size();
1762 } // --- End of critical section
1763 if (debug > 2) _displayDot(graph);
1764 DEBTRACE("--- events...");
1765 if (_executorState == YACS::RUNNING)
1767 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
// Each selected task is wrapped in a heap-allocated WlmTask handed to the
// workload manager.
// NOTE(review): no matching delete is visible here - ownership presumably
// passes to the workload manager or the WlmTask itself; confirm.
1768 for(Task * task : _tasks)
1771 WlmTask* newTask = new WlmTask(*this, task);
1772 wlm.addTask(newTask);
1775 if (debug > 1) _displayDot(graph);
1776 { // --- Critical section
1778 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1779 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1780 _toContinue = !graph->isFinished();
1782 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1783 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1784 DEBTRACE("_toContinue: " << _toContinue);
// Stall detection: the graph is not finished but nothing is running and nothing
// can be launched. The situation is reported, and a threshold of 25 occurrences
// is checked.
1785 if(_toContinue && numberAllTasks==0)
1787 //Problem : no running tasks and no task to launch ??
1789 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1790 //Pause to give a chance to interrupt
1792 if(problemCount > 25)
1794 // Too much problems encountered : stop execution
// NOTE(review): the condition guarding the FINISHED transition below is not
// visible here - presumably it applies once _toContinue is false; confirm.
1801 _executorState = YACS::FINISHED;
1802 sendEvent("executor");
1803 _condForPilot.notify_all();
1805 } // --- End of critical section
1806 if (debug > 0) _displayDot(graph);
1807 DEBTRACE("_toContinue: " << _toContinue);
1811 DEBTRACE("End of main Loop");
// After the loop: _toContinue still true means the loop was left on a stop
// request, so the executor goes to STOPPED.
1813 { // --- Critical section
1814 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1815 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1817 DEBTRACE("stop requested: End soon");
1818 _executorState = YACS::STOPPED;
1819 _toContinue = false;
1820 sendEvent("executor");
1822 } // --- End of critical section
// The state is dumped to _dumpErrorFile when an error occurred and a dump on
// error was requested.
1823 if ( _dumpOnErrorRequested && _errorDetected)
1825 saveState(_dumpErrorFile);
// NOTE(review): the statement protected by _mutexForTrace is not visible here -
// presumably the trace stream is closed; confirm.
1828 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
// Entry point choosing the execution engine from the graph property "executor".
// The four spellings tested below select runWlm; RunB is called with the same
// arguments as the alternative.
// \param graph       scheduler (graph) to execute
// \param debug       forwarded unchanged
// \param fromScratch forwarded unchanged
1833 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1835 std::string str_value = graph->getProperty("executor");
// The comparison is an exact match against these spellings only.
1836 if(str_value == "WorkloadManager"
1837 || str_value == "WORKLOADMANAGER"
1838 || str_value == "workloadmanager"
1839 || str_value == "WorkLoadManager")
1840 runWlm(graph, debug, fromScratch);
// NOTE(review): the keyword introducing this alternative is not visible here -
// presumably an else branch; confirm.
1842 RunB(graph, debug, fromScratch);