1 // Copyright (C) 2006-2019 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
33 #include "workloadmanager/WorkloadManager.hxx"
34 #include "workloadmanager/DefaultAlgorithm.hxx"
48 #define usleep(A) _sleep(A/1000)
49 #if !defined(S_ISCHR) || !defined(S_ISREG)
52 # define S_IFMT _S_IFMT
53 # define S_IFCHR _S_IFCHR
54 # define S_IFREG _S_IFREG
57 # define S_IFMT __S_IFMT
58 # define S_IFCHR __S_IFCHR
59 # define S_IFREG __S_IFREG
63 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
64 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
68 using namespace YACS::ENGINE;
71 using YACS::BASES::Mutex;
72 using YACS::BASES::Thread;
73 using YACS::BASES::Semaphore;
76 #include "YacsTrace.hxx"
78 int Executor::_maxThreads(1000); // Default cap on concurrently running task threads; the constructor uses it to seed _semForMaxThreads and _semThreadCnt (presumably overridable via YACS_MAX_THREADS, as the launchTask warning suggests — confirm)
79 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight; passed as stack size to every task load and task execution Thread
81 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
86 _stopOnErrorRequested = false;
87 _dumpOnErrorRequested = false;
88 _errorDetected = false;
89 _isRunningunderExternalControl=false;
90 _executorState = YACS::NOTYETINITIALIZED;
91 _execMode = YACS::CONTINUE;
92 _semThreadCnt = _maxThreads;
93 _numberOfRunningTasks = 0;
94 _numberOfEndedTasks = 0;
95 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
100 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
104 //! Execute a graph waiting for completion
106 * \param graph : schema to execute
107 * \param debug : display the graph with dot when debug > 0 (more snapshots are displayed when debug > 1 and debug > 2)
108 * \param fromScratch : if true the graph is reinitialized
110 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
112 * Calls Executor::launchTask to execute a selected Task.
114 * Completion when graph is finished (Scheduler::isFinished)
117 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
119 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
121 _root = dynamic_cast<ComposedNode *>(_mainSched);
122 if (!_root) throw Exception("Executor::Run, Internal Error!");
125 if(debug>1)_displayDot(graph);
129 graph->exUpdateState();
131 if(debug>1)_displayDot(graph);
132 vector<Task *> tasks;
133 vector<Task *>::iterator iter;
135 _execMode = YACS::CONTINUE;
136 _isWaitingEventsFromRunningTasks = false;
137 _numberOfRunningTasks = 0;
138 _runningTasks.clear();
139 _numberOfEndedTasks=0;
142 sleepWhileNoEventsFromAnyRunningTask();
144 if(debug>2)_displayDot(graph);
147 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
148 tasks=graph->getNextTasks(isMore);
149 graph->selectRunnableTasks(tasks);
150 }//End of critical section
152 if(debug>2)_displayDot(graph);
154 for(iter=tasks.begin();iter!=tasks.end();iter++)
155 loadTask(*iter,this);
157 if(debug>1)_displayDot(graph);
161 if(debug>1)_displayDot(graph);
164 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
165 _toContinue=!graph->isFinished();
166 }//End of critical section
167 DEBTRACE("_toContinue: " << _toContinue);
169 if(debug>0)_displayDot(graph);
175 //! Execute a graph with breakpoints or step by step
177 * To be launched in a thread (the main thread controls the progression).
178 * \param graph : schema to execute
179 * \param debug : display the graph with dot if debug >0
180 * \param fromScratch : if false, state from a previous partial execution is already loaded
182 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
184 * Calls Executor::checkBreakPoints to verify if a pause is requested
186 * Calls Executor::launchTask to execute a selected Task
188 * Completion when graph is finished (Scheduler::isFinished)
190 * States of execution:
191 * - YACS::NOTYETINITIALIZED
192 * - YACS::INITIALISED
193 * - YACS::RUNNING (to next breakpoint or step)
194 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
195 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
196 * - YACS::FINISHED (no more ready tasks, nor running tasks)
197 * - YACS::STOPPED (stopped by user before end)
199 * Modes of Execution:
200 * - YACS::CONTINUE (normal run without breakpoints)
201 * - YACS::STEPBYSTEP (pause at each loop)
202 * - YACS::STOPBEFORENODES (pause when a node is reached)
204 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
205 * Step by Step means execution node by node or group of nodes by group of nodes.
206 * At a given step, the user decides to launch all the ready nodes or only a subset
207 * (Caution: some nodes must run in parallel).
208 * The next event (end of task) may give a new set of ready nodes, and define a new step.
210 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
211 * Requests are taken into account only in certain states; otherwise they return the status IgnoredRequest.
212 * - Executor::getCurrentExecMode
213 * - Executor::getExecutorState
214 * - Executor::setExecMode : change the execution mode for next loop
215 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
216 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
217 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
218 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
219 * - Executor::isNotFinished
220 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
221 * - Executor::saveState : dump the current state of execution in an xml file
222 * - Executor::loadState : Not yet implemented
223 * - Executor::getNbOfThreads
224 * - Executor::displayDot
225 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
227 * If the pilot wants to wait for the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
228 * - Executor::waitPause
231 * - Pilot may connect to executor during execution, or disconnect.
232 * - Several Pilots may be connected at the same time (for observation...)
236 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
238 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
240 { // --- Critical section
241 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
243 _root = dynamic_cast<ComposedNode *>(_mainSched);
244 if (!_root) throw Exception("Executor::Run, Internal Error!");
245 _executorState = YACS::NOTYETINITIALIZED;
246 sendEvent("executor");
249 _errorDetected = false;
250 _isWaitingEventsFromRunningTasks = false;
251 _numberOfRunningTasks = 0;
252 _runningTasks.clear();
253 _numberOfEndedTasks = 0;
254 string tracefile = "traceExec_";
255 tracefile += _mainSched->getName();
256 _trace.open(tracefile.c_str());
257 _start = std::chrono::steady_clock::now();
258 } // --- End of critical section
260 if (debug > 1) _displayDot(graph);
267 graph->exUpdateState();
271 DEBTRACE("exception: "<< (ex.what()));
272 _executorState = YACS::FINISHED;
273 sendEvent("executor");
277 _executorState = YACS::INITIALISED;
278 sendEvent("executor");
280 if (debug > 1) _displayDot(graph);
282 vector<Task *>::iterator iter;
287 _executorState = YACS::RUNNING;
288 sendEvent("executor");
291 DEBTRACE("--- executor main loop");
292 sleepWhileNoEventsFromAnyRunningTask();
293 DEBTRACE("--- events...");
294 if (debug > 2) _displayDot(graph);
295 { // --- Critical section
296 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297 _tasks=graph->getNextTasks(isMore);
298 graph->selectRunnableTasks(_tasks);
299 FilterTasksConsideringContainers(_tasks);
300 numberAllTasks=_numberOfRunningTasks+_tasks.size();
301 } // --- End of critical section
302 if (debug > 2) _displayDot(graph);
303 if (_executorState == YACS::RUNNING)
305 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306 if (debug > 0) _displayDot(graph);
308 loadParallelTasks(_tasks,this);
309 if (debug > 1) _displayDot(graph);
314 if (debug > 1) _displayDot(graph);
315 { // --- Critical section
317 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
318 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319 if(_numberOfRunningTasks == 0)
320 _toContinue = !graph->isFinished();
322 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324 DEBTRACE("_toContinue: " << _toContinue);
325 if(_toContinue && numberAllTasks==0)
327 //Problem : no running tasks and no task to launch ??
329 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330 //Pause to give a chance to interrupt
332 if(problemCount > 25)
334 // Too much problems encountered : stop execution
341 _executorState = YACS::FINISHED;
342 sendEvent("executor");
343 _condForPilot.notify_all();
345 } // --- End of critical section
346 if (debug > 0) _displayDot(graph);
347 DEBTRACE("_toContinue: " << _toContinue);
350 DEBTRACE("End of main Loop");
352 { // --- Critical section
353 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
354 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356 DEBTRACE("stop requested: End soon");
357 _executorState = YACS::STOPPED;
359 sendEvent("executor");
361 } // --- End of critical section
362 if ( _dumpOnErrorRequested && _errorDetected)
364 saveState(_dumpErrorFile);
367 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
370 DEBTRACE("End of RunB thread");
373 YACS::ExecutionMode Executor::getCurrentExecMode()
375 _isRunningunderExternalControl=true;
380 YACS::ExecutorState Executor::getExecutorState()
382 _isRunningunderExternalControl=true;
383 return _executorState;
387 bool Executor::isNotFinished()
389 _isRunningunderExternalControl=true;
393 //! ask to stop execution on the first node found in error
395 * \param dumpRequested produce a state dump when an error is found
396 * \param xmlFile name of file used for state dump
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 { // --- Critical section
402 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
403 _dumpErrorFile=xmlFile;
404 _stopOnErrorRequested=true;
405 _dumpOnErrorRequested = dumpRequested;
406 if (dumpRequested && xmlFile.empty())
407 throw YACS::Exception("dump on error requested and no filename given for dump");
408 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409 } // --- End of critical section
412 //! ask not to stop execution on nodes found in error
416 void Executor::unsetStopOnError()
418 { // --- Critical section
419 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
420 _stopOnErrorRequested=false;
421 } // --- End of critical section
424 //! Dynamically set the current mode of execution
426 * The mode can be Continue, step by step, or stop before execution of a node
427 * defined in a list of breakpoints.
430 void Executor::setExecMode(YACS::ExecutionMode mode)
432 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433 { // --- Critical section
434 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
435 _isRunningunderExternalControl=true;
437 } // --- End of critical section
440 //! wake up executor when in pause
442 * When Executor is in state paused or waiting for task completion, the thread
443 * running loop RunB waits on condition _condForStepByStep.
444 * The thread running RunB is woken up.
445 * \return true when actually wakes up executor
448 bool Executor::resumeCurrentBreakPoint()
450 DEBTRACE("Executor::resumeCurrentBreakPoint()");
452 //bool doDump = false;
453 { // --- Critical section
454 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
455 _isRunningunderExternalControl=true;
456 DEBTRACE("_executorState: " << _executorState);
457 switch (_executorState)
459 case YACS::WAITINGTASKS:
462 _condForStepByStep.notify_all();
463 _executorState = YACS::RUNNING;
464 sendEvent("executor");
466 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
472 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473 DEBTRACE("Graph Execution finished or stopped !");
478 // debug: no easy way to verify if main loop is actually waiting on condition
482 //if (doDump) saveState(_dumpErrorFile);
483 } // --- End of critical section
488 //! define a list of nodes names as breakpoints in the graph
491 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494 { // --- Critical section
495 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
496 _isRunningunderExternalControl=true;
497 _listOfBreakPoints = listOfBreakPoints;
498 } // --- End of critical section
502 //! Get the list of tasks to load, to define a subset to execute in step by step mode
504 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
505 * Use Executor::waitPause to wait.
507 std::list<std::string> Executor::getTasksToLoad()
509 DEBTRACE("Executor::getTasksToLoad()");
510 list<string> listOfNodesToLoad;
511 listOfNodesToLoad.clear();
512 { // --- Critical section
513 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
514 _isRunningunderExternalControl=true;
515 switch (_executorState)
517 case YACS::WAITINGTASKS:
520 listOfNodesToLoad = _listOfTasksToLoad;
523 case YACS::NOTYETINITIALIZED:
524 case YACS::INITIALISED:
533 } // --- End of critical section
534 return listOfNodesToLoad;
538 //! Define a subset of task to execute in step by step mode
540 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
541 * in the current step.
542 * If some nodes must run in parallel, they must stay together in the list.
545 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549 vector<Task *>::iterator iter;
550 vector<Task *> restrictedTasks;
551 { // --- Critical section
552 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
553 _isRunningunderExternalControl=true;
554 switch (_executorState)
556 case YACS::WAITINGTASKS:
559 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561 string readyNode = _mainSched->getTaskName(*iter);
562 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563 != listToExecute.end())
565 restrictedTasks.push_back(*iter);
566 DEBTRACE("node to execute " << readyNode);
570 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572 _tasks.push_back(*iter);
576 case YACS::NOTYETINITIALIZED:
577 case YACS::INITIALISED:
586 } // --- End of critical section
589 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591 _tasks.push_back(*iter);
593 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595 string readyNode = _mainSched->getTaskName(*iter);
596 DEBTRACE("selected node to execute " << readyNode);
602 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
604 * Do nothing if execution is finished or in pause.
605 * Waits for the first step if Executor is running or being initialized.
608 void Executor::waitPause()
610 DEBTRACE("Executor::waitPause()" << _executorState);
611 { // --- Critical section
612 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
613 _isRunningunderExternalControl=true;
614 switch (_executorState)
619 case YACS::WAITINGTASKS:
624 case YACS::NOTYETINITIALIZED:
625 case YACS::INITIALISED:
628 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
632 } // --- End of critical section
637 * This method can be called at any time simultaneously during a RunB call.
638 * This method will wait until the executor is locked in a consistent state of a running graph.
640 * This method is expected to be called in association with resume method.
641 * The returned parameter is expected to be transferred to the resume method.
643 bool Executor::suspendASAP()
645 // no AutoLocker here. It's not a bug.
646 _mutexForSchedulerUpdate.lock();
647 if(!_toContinue && _executorState==YACS::FINISHED)
648 {// execution is finished
649 _mutexForSchedulerUpdate.unLock();
650 return false;// the executor is no more running
652 //general case. Leave method with locker in locked status
657 * This method is expected to be called in association with suspendASAP method.
658 * Expected to be called just after suspendASAP with output of resume as input parameter
660 void Executor::resume(bool suspended)
663 _mutexForSchedulerUpdate.unLock();
666 //! stops the execution as soon as possible
668 void Executor::stopExecution()
670 setExecMode(YACS::STEPBYSTEP);
673 resumeCurrentBreakPoint();
676 //! save the current state of execution in an xml file
678 bool Executor::saveState(const std::string& xmlFile)
680 DEBTRACE("Executor::saveState() in " << xmlFile);
683 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
684 YACS::ENGINE::VisitorSaveState vst(_root);
685 vst.openFileDump(xmlFile.c_str());
690 catch(Exception& ex) {
691 std::cerr << ex.what() << std::endl;
696 //! not yet implemented
698 bool Executor::loadState()
700 DEBTRACE("Executor::loadState()");
701 _isRunningunderExternalControl=true;
706 static int isfile(const char *filename)
709 if (stat(filename, &buf) != 0)
711 if (!S_ISREG(buf.st_mode))
716 //! Display the graph state as a dot display, public method
718 void Executor::displayDot(Scheduler *graph)
720 _isRunningunderExternalControl=true;
724 //! Display the graph state as a dot display
726 * \param graph : the node to display
729 void Executor::_displayDot(Scheduler *graph)
731 std::ofstream g("titi");
732 ((ComposedNode*)graph)->writeDot(g);
734 const char displayScript[]="display.sh";
735 if(isfile(displayScript))
736 system("sh display.sh");
738 system("dot -Tpng titi|display -delay 5");
741 //! Wait reactivation in modes Step By step or with BreakPoints
743 * Check mode of execution (set by main thread):
744 * - YACS::CONTINUE : the graph execution continues.
745 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
746 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
747 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
748 * else continue the graph execution.
749 * \return true if end of executor thread is requested
752 bool Executor::checkBreakPoints()
754 DEBTRACE("Executor::checkBreakPoints()");
755 vector<Task *>::iterator iter;
756 bool endRequested = false;
764 case YACS::STOPBEFORENODES:
767 { // --- Critical section
768 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
770 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
772 string nodeToLoad = _mainSched->getTaskName(*iter);
773 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
774 != _listOfBreakPoints.end())
782 _listOfTasksToLoad.clear();
783 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
785 string nodeToLoad = _mainSched->getTaskName(*iter);
786 _listOfTasksToLoad.push_back(nodeToLoad);
788 if (getNbOfThreads())
789 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
791 _executorState = YACS::PAUSED;
792 sendEvent("executor");
793 _condForPilot.notify_all();
795 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
796 if (_isOKToEnd) endRequested = true;
797 } // --- End of critical section
798 if (stop) DEBTRACE("wake up from waitResume");
802 case YACS::STEPBYSTEP:
804 { // --- Critical section
805 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
807 _listOfTasksToLoad.clear();
808 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
810 string nodeToLoad = _mainSched->getTaskName(*iter);
811 _listOfTasksToLoad.push_back(nodeToLoad);
813 if (getNbOfThreads())
814 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
816 _executorState = YACS::PAUSED;
817 sendEvent("executor");
818 _condForPilot.notify_all();
820 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
821 // or, if no pilot, wait until no more running tasks (stop on error)
822 if (_isOKToEnd) endRequested = true;
823 } // --- End of critical section
824 DEBTRACE("wake up from waitResume");
828 DEBTRACE("endRequested: " << endRequested);
833 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
835 * With the condition Mutex, the mutex is released atomically during the wait.
836 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
837 * Must be called while mutex is locked.
840 void Executor::waitResume()
842 DEBTRACE("Executor::waitResume()");
843 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
848 //! Perform loading of a Task.
850 * \param task : Task to load
853 void Executor::loadTask(Task *task, const Executor *execInst)
855 DEBTRACE("Executor::loadTask(Task *task)");
856 if(task->getState() != YACS::TOLOAD)
858 traceExec(task, "state:TOLOAD", ComputePlacement(task));
860 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
861 _mainSched->notifyFrom(task,YACS::START,execInst);
862 }//End of critical section
865 traceExec(task, "load", ComputePlacement(task));
867 traceExec(task, "initService", ComputePlacement(task));
872 std::cerr << ex.what() << std::endl;
874 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
876 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878 }//End of critical section
882 std::cerr << "Load failed" << std::endl;
884 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
886 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888 }//End of critical section
899 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
901 std::vector<Thread> ths(tasks.size());
902 std::size_t ithread(0);
903 for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
905 DEBTRACE("Executor::loadParallelTasks(Task *task)");
906 struct threadargs *args(new threadargs);
907 args->task = (*iter);
908 args->sched = _mainSched;
909 args->execInst = this;
910 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
912 for(ithread=0;ithread<tasks.size();ithread++)
916 //! Execute a list of tasks possibly connected through datastream links
918 * \param tasks : a list of tasks to execute
921 void Executor::launchTasks(const std::vector<Task *>& tasks)
923 //First phase, make datastream connections
924 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
926 YACS::StatesForNode state=(*iter)->getState();
927 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
930 (*iter)->connectService();
931 traceExec(*iter, "connectService",ComputePlacement(*iter));
933 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
934 (*iter)->connected();
935 }//End of critical section
939 std::cerr << ex.what() << std::endl;
942 (*iter)->disconnectService();
943 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
947 // Disconnect has failed
948 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
951 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
953 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954 }//End of critical section
958 std::cerr << "Problem in connectService" << std::endl;
961 (*iter)->disconnectService();
962 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
966 // Disconnect has failed
967 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
970 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
972 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973 }//End of critical section
975 if((*iter)->getState() == YACS::ERROR)
977 //try to put all coupled tasks in error
978 std::set<Task*> coupledSet;
979 (*iter)->getCoupledTasks(coupledSet);
980 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
983 if(t == *iter)continue;
984 if(t->getState() == YACS::ERROR)continue;
987 t->disconnectService();
988 traceExec(t, "disconnectService",ComputePlacement(*iter));
992 // Disconnect has failed
993 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
996 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
998 _mainSched->notifyFrom(t,YACS::ABORT,this);
999 }//End of critical section
1000 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1003 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1006 //Second phase, execute each task in a thread
1007 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1013 //! Execute a Task in a thread
1015 * \param task : Task to execute
1017 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1019 * Calls Executor::functionForTaskExecution in Thread
1022 void Executor::launchTask(Task *task)
1024 DEBTRACE("Executor::launchTask(Task *task)");
1025 struct threadargs *args;
1026 if(task->getState() != YACS::TOACTIVATE)return;
1028 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1029 if(_semThreadCnt == 0)
1031 // --- Critical section
1032 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1033 //check if we have enough threads to run
1034 std::set<Task*> tmpSet=_runningTasks;
1035 std::set<Task*>::iterator it = tmpSet.begin();
1036 std::string status="running";
1037 std::set<Task*> coupledSet;
1038 while( it != tmpSet.end() )
1042 tt->getCoupledTasks(coupledSet);
1044 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1046 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047 tmpSet.erase(*iter);
1049 if(status=="running")break;
1050 it = tmpSet.begin();
1053 if(status=="toactivate")
1055 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1058 // --- End of critical section
1061 _semForMaxThreads.wait();
1064 args= new threadargs;
1066 args->sched = _mainSched;
1067 args->execInst = this;
1069 traceExec(task, "launch",ComputePlacement(task));
1071 { // --- Critical section
1072 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1073 _numberOfRunningTasks++;
1074 _runningTasks.insert(task);
1075 task->begin(); //change state to ACTIVATED
1076 } // --- End of critical section
1077 Thread(functionForTaskExecution, args, _threadStackSize);
1080 //! wait until a running task ends
1082 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1084 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1086 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1087 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1089 _isWaitingEventsFromRunningTasks = true;
1090 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1092 _numberOfEndedTasks=0;
1098 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1100 /*_mutexForNbOfConcurrentThreads.lock();
1101 _groupOfAllThreadsCreated.remove(thread);
1103 _mutexForNbOfConcurrentThreads.unlock();*/
1107 //! must be called while holding _mutexForSchedulerUpdate!
1109 void Executor::wakeUp()
1111 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1112 if (_isWaitingEventsFromRunningTasks)
1114 _isWaitingEventsFromRunningTasks = false;
1115 _condForNewTasksToPerform.notify_all();
1118 _numberOfEndedTasks++;
1121 //! number of running tasks
1123 int Executor::getNbOfThreads()
1126 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1127 _isRunningunderExternalControl=true;
1128 ret = _groupOfAllThreadsCreated.size();
1133 * This thread is NOT supposed to be detached !
1135 void *Executor::functionForTaskLoad(void *arg)
1137 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1138 struct threadargs *args = (struct threadargs *) arg;
1139 Task *task=args->task;
1140 Scheduler *sched=args->sched;
1141 Executor *execInst=args->execInst;
1143 execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1147 //! Function to perform execution of a task in a thread
1149 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1151 * Calls Task::execute
1153 * Calls Task::finished when the task is finished
1155 * Calls Scheduler::notifyFrom (with event YACS::FINISH, or YACS::ABORT on failure) when the task is finished
1157 * Calls Executor::wakeUp and Executor::notifyEndOfThread
1160 void *Executor::functionForTaskExecution(void *arg)
1162 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1164 struct threadargs *args = (struct threadargs *) arg;
1165 Task *task=args->task;
1166 Scheduler *sched=args->sched;
1167 Executor *execInst=args->execInst;
1169 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1175 if(execInst->getDPLScopeSensitive())
1177 Node *node(dynamic_cast<Node *>(task));
1178 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1179 if(node!=0 && gfn!=0)
1180 node->applyDPLScope(gfn);
1183 YACS::Event ev=YACS::FINISH;
1186 execInst->traceExec(task, "start execution",ComputePlacement(task));
1188 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1190 catch(Exception& ex)
1192 std::cerr << "YACS Exception during execute" << std::endl;
1193 std::cerr << ex.what() << std::endl;
1195 string message = "end execution ABORT, ";
1196 message += ex.what();
1197 execInst->traceExec(task, message,ComputePlacement(task));
1201 // Execution has failed
1202 std::cerr << "Execution has failed: unknown reason" << std::endl;
1204 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1210 DEBTRACE("task->disconnectService()");
1211 task->disconnectService();
1212 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1216 // Disconnect has failed
1217 std::cerr << "disconnect has failed" << std::endl;
1219 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1223 std::string placement(ComputePlacement(task));
1225 // container management for HomogeneousPoolOfContainer
1227 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1230 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1231 contC->release(task);
1234 DEBTRACE("End task->execute()");
1235 { // --- Critical section
1236 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1239 if (ev == YACS::FINISH) task->finished();
1240 if (ev == YACS::ABORT)
1242 execInst->_errorDetected = true;
1243 if (execInst->_stopOnErrorRequested)
1245 execInst->_execMode = YACS::STEPBYSTEP;
1246 execInst->_isOKToEnd = true;
1250 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1251 sched->notifyFrom(task,ev,execInst);
1253 catch(Exception& ex)
1255 //notify has failed : it is supposed to have set state
1256 //so no need to do anything
1257 std::cerr << "Error during notification" << std::endl;
1258 std::cerr << ex.what() << std::endl;
1262 //notify has failed : it is supposed to have set state
1263 //so no need to do anything
1264 std::cerr << "Notification failed" << std::endl;
1266 execInst->_numberOfRunningTasks--;
1267 execInst->_runningTasks.erase(task);
1268 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1269 << " _execMode: " << execInst->_execMode
1270 << " _executorState: " << execInst->_executorState);
1271 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1273 if (execInst->_executorState == YACS::WAITINGTASKS)
1275 execInst->_executorState = YACS::PAUSED;
1276 execInst->sendEvent("executor");
1277 execInst->_condForPilot.notify_all();
1278 if (execInst->_errorDetected &&
1279 execInst->_stopOnErrorRequested &&
1280 !execInst->_isRunningunderExternalControl)
1281 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1284 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1285 execInst->_semForMaxThreads.post();
1286 execInst->_semThreadCnt += 1;
1287 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1288 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1290 } // --- End of critical section (change state)
1292 //execInst->notifyEndOfThread(0);
1297 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1299 string nodeName = _mainSched->getTaskName(task);
1300 Container *cont = task->getContainer();
1301 string containerName = "---";
1303 containerName = cont->getName();
1305 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1306 std::chrono::milliseconds millisec;
1307 millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1308 double elapse = double(millisec.count()) / 1000.0;
1310 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1311 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1316 //! emit notification to all observers registered with the dispatcher
1318 * The dispatcher is unique and can be obtained by getDispatcher()
1320 void Executor::sendEvent(const std::string& event)
// The event is dispatched with _root (the ComposedNode set in the run methods,
// e.g. runWlm) as the emitting object.
// NOTE(review): the short lines between the two statements below are not part of
// this excerpt; any checks on disp/_root are not visible here.
1322 Dispatcher* disp=Dispatcher::getDispatcher();
1325 disp->dispatch(_root,event);
1330 bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
// Key ordering of the pool->tasks map in FilterTasksConsideringContainers.
// Primary key: number of cores per worker. The container address breaks ties, so
// two distinct pools with the same core count are never "equivalent" keys.
// Without the tie-break, std::map merges their task lists under the first pool
// inserted, which is then locked and allocated for tasks of the other pool.
// NOTE(review): the caller also uses nullptr keys; both pointers are dereferenced
// here, so null handling must happen before this line (not visible in this excerpt).
1338 return lhs->getNumberOfCoresPerWorker() != rhs->getNumberOfCoresPerWorker() ? lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker() : std::less<HomogeneousPoolContainer *>()(lhs,rhs);
1343 * This method takes a list of tasks as input and selects part of it, considering only the tasks' containers.
1344 * Tasks without container, or whose container is not a HomogeneousPoolContainer, are all kept; if no task uses a HomogeneousPoolContainer, \a tsks is left as is. For each HomogeneousPoolContainer, at most getNumberOfFreePlace() of its tasks are kept and those places are allocated to them (allocateFor), under the container's lock.
1346 * \param [in,out] tsks - list of tasks to be filtered in place
1348 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
// Group the tasks by HomogeneousPoolContainer; nullptr groups the unconstrained ones.
1350 std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1351 for(auto cur : tsks)
1355 Container *cont(cur->getContainer());
// No container: no capacity constraint.
1358 m[nullptr].push_back(cur);
1361 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
// A container that is not a pool: no capacity constraint either.
1364 m[nullptr].push_back(cur);
1367 m[contC].push_back(cur);
1370 std::vector<Task *> ret;
1373 HomogeneousPoolContainer *curhpc(it.first);
1374 const std::vector<Task *>& curtsks(it.second);
// Unconstrained group: every task is kept.
1377 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
// Pool group: keep only as many tasks as the pool has free places and reserve those
// places; the remaining tasks are not kept in ret (presumably picked up again by a
// later pass of the caller, since they stay ready and not running).
1381 // start of critical section for container curhpc
1382 std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1383 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1384 std::size_t sz(curhpc->getNumberOfFreePlace());
1385 std::vector<Task *>::const_iterator it2(curtsks.begin());
1386 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1388 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1389 ret.push_back(*it2);
1391 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1392 //end of critical section
1399 std::string Executor::ComputePlacement(Task *zeTask)
1401 std::string placement("---");
1404 if(zeTask->getContainer())
1405 placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1409 ///////// NEW EXECUTOR ////////////////////////////////
1410 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
// Loads a task on the resource chosen by the WorkloadManager (called from NewTask::run).
// Only tasks in state TOLOAD are handled; the early exit for other states is among
// the lines omitted from this excerpt. The scheduler is notified with YACS::START,
// then the resource and a container name "<resource>-<container type>-<index>" are
// imposed on the task before its service is initialised.
// Failures are logged and reported to the scheduler as YACS::ABORT. This covers a
// YACS Exception and, judging by the "Load failed" handler, any other exception.
1412 if(task->getState() != YACS::TOLOAD)
1414 traceExec(task, "state:TOLOAD", ComputePlacement(task));
1416 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1417 _mainSched->notifyFrom(task,YACS::START,this);
1418 }//End of critical section
1421 std::ostringstream container_name;
1422 container_name << runInfo.resource.name << "-"
1423 << runInfo.type.name << "-" << runInfo.index;
1424 task->imposeResource(runInfo.resource.name, container_name.str());
1425 traceExec(task, "load", ComputePlacement(task));
1427 traceExec(task, "initService", ComputePlacement(task));
1428 task->initService();
1430 catch(Exception& ex)
1432 std::cerr << ex.what() << std::endl;
1434 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1436 _mainSched->notifyFrom(task,YACS::ABORT, this);
1437 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1438 }//End of critical section
1442 std::cerr << "Load failed" << std::endl;
1444 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1446 _mainSched->notifyFrom(task,YACS::ABORT, this);
1447 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1448 }//End of critical section
1452 void Executor::beginTask(Task *task)
1454 // --- Critical section
1455 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1456 _numberOfRunningTasks++;
1457 _runningTasks.insert(task);
1458 // --- End of critical section
1461 void Executor::endTask(Task *task, YACS::Event ev)
// Completion bookkeeping for a task run through the WorkloadManager (see NewTask::run).
// Everything below runs under _mutexForSchedulerUpdate:
//  - FINISH marks the task finished; ABORT records the error and, when stop-on-error
//    was requested, switches the executor to STEPBYSTEP;
//  - the scheduler is notified (failures are only logged);
//  - the task leaves the running set, and the pause handling mirrors
//    functionForTaskExecution (there is no max-threads semaphore on this path).
1463 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1466 if (ev == YACS::FINISH) task->finished();
1467 if (ev == YACS::ABORT)
1469 _errorDetected = true;
1470 if (_stopOnErrorRequested)
1472 _execMode = YACS::STEPBYSTEP;
// NOTE(review): the final state is not traced on this path (no placement is available here).
1477 //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1478 _mainSched->notifyFrom(task,ev,this);
1480 catch(Exception& ex)
1482 //notify has failed : it is supposed to have set state
1483 //so no need to do anything
1484 std::cerr << "Error during notification" << std::endl;
1485 std::cerr << ex.what() << std::endl;
1489 //notify has failed : it is supposed to have set state
1490 //so no need to do anything
1491 std::cerr << "Notification failed" << std::endl;
1493 _numberOfRunningTasks--;
1494 _runningTasks.erase(task);
1495 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
1496 << " _execMode: " << _execMode
1497 << " _executorState: " << _executorState);
// Last running task gone outside CONTINUE mode: WAITINGTASKS becomes PAUSED and the
// pilots (and possibly the step-by-step waiter) are woken.
1498 if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1500 if (_executorState == YACS::WAITINGTASKS)
1502 _executorState = YACS::PAUSED;
1503 sendEvent("executor");
1504 _condForPilot.notify_all();
1505 if (_errorDetected &&
1506 _stopOnErrorRequested &&
1507 !_isRunningunderExternalControl)
1508 _condForStepByStep.notify_all(); // exec thread may be on waitResume
// NOTE(review): the statement guarded below is not visible in this excerpt;
// functionForTaskExecution calls wakeUp() in the same situation.
1511 if (_executorState != YACS::PAUSED)
1515 YACS::Event Executor::runTask(Task *task)
// Executes a task on the WorkloadManager path:
//  1. mark it begun under _mutexForSchedulerUpdate;
//  2. optionally apply the DPL scope;
//  3. execute with start / OK / ABORT traces;
//  4. disconnect its service;
//  5. release its slot in a HomogeneousPoolContainer.
// NewTask::run passes the resulting event to endTask. The return statement, the
// ABORT assignments to ev and the try/catch(...) headers are among the lines
// omitted from this excerpt.
1517 { // --- Critical section
1518 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1519 task->begin(); //change state to ACTIVATED
1521 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// DPL scope is applied only when the task is a Node and the main scheduler a ComposedNode.
1523 if(getDPLScopeSensitive())
1525 Node *node(dynamic_cast<Node *>(task));
1526 ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1527 if(node!=0 && gfn!=0)
1528 node->applyDPLScope(gfn);
1531 YACS::Event ev=YACS::FINISH;
1534 traceExec(task, "start execution",ComputePlacement(task));
1536 traceExec(task, "end execution OK",ComputePlacement(task));
1538 catch(Exception& ex)
1540 std::cerr << "YACS Exception during execute" << std::endl;
1541 std::cerr << ex.what() << std::endl;
1543 string message = "end execution ABORT, ";
1544 message += ex.what();
1545 traceExec(task, message,ComputePlacement(task));
1549 // Execution has failed
1550 std::cerr << "Execution has failed: unknown reason" << std::endl;
1552 traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1558 DEBTRACE("task->disconnectService()");
1559 task->disconnectService();
1560 traceExec(task, "disconnectService",ComputePlacement(task));
1564 // Disconnect has failed
1565 std::cerr << "disconnect has failed" << std::endl;
1567 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// NOTE(review): `placement` is not used in the visible lines of this function.
1571 std::string placement(ComputePlacement(task));
1573 // container management for HomogeneousPoolOfContainer
1575 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1578 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1579 contC->release(task);
1585 void Executor::makeDatastreamConnections(Task *task)
// Connects the service of a task (WorkloadManager path, see NewTask::run).
// Only states TOLOAD and TORECONNECT are handled; other states are presumably
// skipped by the line omitted after the test below.
// When connectService fails, the service is disconnected and the task is reported
// to the scheduler as YACS::ABORT.
// If the task ends in ERROR, every coupled task not already in ERROR is also
// disconnected and aborted. Its traces use that coupled task's own placement.
1587 YACS::StatesForNode state=task->getState();
1588 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1592 task->connectService();
1593 traceExec(task, "connectService",ComputePlacement(task));
1595 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1597 }//End of critical section
1599 catch(Exception& ex)
1601 std::cerr << ex.what() << std::endl;
1604 (task)->disconnectService();
1605 traceExec(task, "disconnectService",ComputePlacement(task));
1609 // Disconnect has failed
1610 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1613 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1615 _mainSched->notifyFrom(task,YACS::ABORT,this);
1616 }//End of critical section
1620 std::cerr << "Problem in connectService" << std::endl;
1623 (task)->disconnectService();
1624 traceExec(task, "disconnectService",ComputePlacement(task));
1628 // Disconnect has failed
1629 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1632 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1634 _mainSched->notifyFrom(task,YACS::ABORT,this);
1635 }//End of critical section
1637 if(task->getState() == YACS::ERROR)
1639 //try to put all coupled tasks in error
1640 std::set<Task*> coupledSet;
1641 task->getCoupledTasks(coupledSet);
1642 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1645 if(t == task)continue;
1646 if(t->getState() == YACS::ERROR)continue;
1649 t->disconnectService();
// Trace lines for t report t's own placement, not the failing task's.
1650 traceExec(t, "disconnectService",ComputePlacement(t));
1654 // Disconnect has failed
1655 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(t));
1658 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1660 _mainSched->notifyFrom(t,YACS::ABORT,this);
1661 }//End of critical section
1662 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(t));
1665 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1668 #include "Runtime.hxx"
1670 void loadResources(WorkloadManager::WorkloadManager& wm)
// Registers every entry of the runtime's compute-node catalog as a WorkloadManager
// resource:
//  - name    = catalog name (pair.first);
//  - nbCores = catalog integer (pair.second);
//  - id      = the running counter `id`, whose declaration and increment are among
//              the lines omitted from this excerpt.
// Throws YACS::Exception when no runtime is available (the guard itself is not visible).
// NOTE(review): free function with external linkage in a .cxx; consider static or an
// anonymous namespace if nothing else uses it.
1672 Runtime *r(getRuntime());
1674 throw YACS::Exception("loadResources : no runtime !");
1675 std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
1677 for(const std::pair<std::string,int>& res : data)
1679 WorkloadManager::Resource newResource;
1680 newResource.name = res.first;
1681 newResource.id = id;
1683 newResource.nbCores = res.second;
1684 wm.addResource(newResource);
1688 class NewTask : public WorkloadManager::Task
1691 NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
1692 const WorkloadManager::ContainerType& type()const override;
1693 void run(const WorkloadManager::RunInfo& runInfo)override;
1694 bool isAccepted(const WorkloadManager::Resource& r)override;
1696 WorkloadManager::ContainerType _type;
1697 Executor& _executor;
1698 YACS::ENGINE::Task * _yacsTask;
1701 NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
1703 , _executor(executor)
1704 , _yacsTask(yacsTask)
// Builds the WorkloadManager container type requested by the task.
// A task with a container that accepts an imposed resource gets a type named after
// its YACS container. It needs "nb_parallel_procs" cores, or 0 when that property
// is empty.
// Any other task (including one without container) ignores resources and uses the
// placeholder type "test", with no core requirement.
// NOTE(review): std::stof throws std::invalid_argument / std::out_of_range on a
// malformed "nb_parallel_procs" value. Nothing catches it here, so the exception
// reaches runWlm, which creates the NewTask objects.
1706 Container * yacsContainer = yacsTask->getContainer();
1707 if(yacsContainer != nullptr && yacsTask->canAcceptImposedResource())
1709 _type.ignoreResources = false;
1710 _type.name = yacsContainer->getName();
1711 std::string nb_procs_str = yacsContainer->getProperty("nb_parallel_procs");
1712 float needed_cores = 0.0;
1713 if(!nb_procs_str.empty())
1714 needed_cores = std::stof(nb_procs_str);
1715 _type.neededCores = needed_cores;
1719 _type.ignoreResources = true;
1720 _type.name = "test";
1721 _type.neededCores = 0;
1726 const WorkloadManager::ContainerType& NewTask::type()const
1731 void NewTask::run(const WorkloadManager::RunInfo& runInfo)
1733 _executor.loadTask(_yacsTask, runInfo);
1734 _executor.makeDatastreamConnections(_yacsTask);
1735 YACS::Event ev = _executor.runTask(_yacsTask);
1736 _executor.endTask(_yacsTask, ev);
1737 delete this; // provisoire
1740 bool NewTask::isAccepted(const WorkloadManager::Resource& r)
1742 Container * yacsContainer = _yacsTask->getContainer();
1743 std::string hostname = yacsContainer->getProperty("hostname");
1745 if(!hostname.empty())
1746 accept = (hostname == r.name);
1750 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
// WorkloadManager-based counterpart of RunB (selected by RunW).
// It initialises the run state and trace file, (re)initialises the graph state,
// then loops:
//  - wait for events;
//  - collect ready tasks that are not already running;
//  - filter them by pool capacity;
//  - submit each one to the WorkloadManager as a NewTask.
// NOTE(review): short lines of this function are not part of this excerpt. These
// include braces, loop/try/if headers and the declarations of isMore, problemCount
// and numberAllTasks.
1752 DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1753 { // --- Critical section
1754 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1756 _root = dynamic_cast<ComposedNode *>(_mainSched);
1757 if (!_root) throw Exception("Executor::Run, Internal Error!");
1758 _executorState = YACS::NOTYETINITIALIZED;
1759 sendEvent("executor");
1762 _errorDetected = false;
1763 _isWaitingEventsFromRunningTasks = false;
1764 _numberOfRunningTasks = 0;
1765 _runningTasks.clear();
1766 _numberOfEndedTasks = 0;
// The trace file is named "traceExec_<graph name>".
1767 string tracefile = "traceExec_";
1768 tracefile += _mainSched->getName();
1769 _trace.open(tracefile.c_str());
1770 _start = std::chrono::steady_clock::now();
1771 } // --- End of critical section
1773 if (debug > 1) _displayDot(graph);
// On failure to update the graph state, the executor is reported FINISHED.
1780 graph->exUpdateState();
1782 catch(Exception& ex)
1784 DEBTRACE("exception: "<< (ex.what()));
1785 _executorState = YACS::FINISHED;
1786 sendEvent("executor");
1790 _executorState = YACS::INITIALISED;
1791 sendEvent("executor");
1793 if (debug > 1) _displayDot(graph);
1799 _executorState = YACS::RUNNING;
1800 sendEvent("executor");
1802 WorkloadManager::DefaultAlgorithm algo;
1803 WorkloadManager::WorkloadManager wlm(algo);
1809 DEBTRACE("--- executor main loop");
1810 sleepWhileNoEventsFromAnyRunningTask();
1811 DEBTRACE("--- events...");
1812 if (debug > 2) _displayDot(graph);
// Ready tasks minus those already running; FilterTasksConsideringContainers then
// keeps only what the HomogeneousPoolContainers can host now.
1813 { // --- Critical section
1814 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1815 std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1816 graph->selectRunnableTasks(readyTasks);
1818 for(Task * t : readyTasks)
1819 if(_runningTasks.find(t) == _runningTasks.end())
1820 _tasks.push_back(t);
1821 // TODO: to be removed
1822 FilterTasksConsideringContainers(_tasks);
1823 numberAllTasks=_numberOfRunningTasks+_tasks.size();
1824 } // --- End of critical section
1825 if (debug > 2) _displayDot(graph);
1826 DEBTRACE("--- events...");
1827 if (_executorState == YACS::RUNNING)
1829 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
// Ownership: each NewTask allocated here deletes itself at the end of NewTask::run.
1830 for(Task * task : _tasks)
1833 NewTask* newTask = new NewTask(*this, task);
1834 wlm.addTask(newTask);
1837 if (debug > 1) _displayDot(graph);
1838 { // --- Critical section
1840 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1841 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1842 _toContinue = !graph->isFinished();
1844 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1845 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1846 DEBTRACE("_toContinue: " << _toContinue);
// Graph not finished but nothing running and nothing to launch: report it; past 25
// such passes, execution is stopped (see the comment below).
1847 if(_toContinue && numberAllTasks==0)
1849 //Problem : no running tasks and no task to launch ??
1851 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1852 //Pause to give a chance to interrupt
1854 if(problemCount > 25)
1856 // Too much problems encountered : stop execution
1863 _executorState = YACS::FINISHED;
1864 sendEvent("executor");
1865 _condForPilot.notify_all();
1867 } // --- End of critical section
1868 if (debug > 0) _displayDot(graph);
1869 DEBTRACE("_toContinue: " << _toContinue);
1873 DEBTRACE("End of main Loop");
1875 { // --- Critical section
1876 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1877 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1879 DEBTRACE("stop requested: End soon");
1880 _executorState = YACS::STOPPED;
1881 _toContinue = false;
1882 sendEvent("executor");
1884 } // --- End of critical section
// Optionally dump the graph state when an error occurred.
1885 if ( _dumpOnErrorRequested && _errorDetected)
1887 saveState(_dumpErrorFile);
// Trace-file finalisation happens under _mutexForTrace (the statement itself is not visible here).
1890 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
// NOTE(review): this debug message says "RunB" although this is runWlm.
1893 DEBTRACE("End of RunB thread");
1896 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1898 std::string str_value = graph->getProperty("executor");
1899 if(str_value == "WorkloadManager"
1900 || str_value == "WORKLOADMANAGER"
1901 || str_value == "workloadmanager"
1902 || str_value == "WorkLoadManager")
1903 runWlm(graph, debug, fromScratch);
1905 RunB(graph, debug, fromScratch);