1 // Copyright (C) 2006-2019 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
// Portability shim: usleep() is redirected to _sleep() with its argument divided by 1000
// (presumably microseconds -> milliseconds; confirm against the target platform's _sleep).
45 #define usleep(A) _sleep(A/1000)
// Fallback definitions of the file-type test macros for systems whose headers lack them.
// S_ISREG is what isfile() relies on further down in this file.
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
49 # define S_IFMT _S_IFMT
50 # define S_IFCHR _S_IFCHR
51 # define S_IFREG _S_IFREG
54 # define S_IFMT __S_IFMT
55 # define S_IFCHR __S_IFCHR
56 # define S_IFREG __S_IFREG
60 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
65 using namespace YACS::ENGINE;
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
73 #include "YacsTrace.hxx"
// Default ceiling on the number of task threads: the constructor uses it to size
// _semForMaxThreads and _semThreadCnt. The warning printed by launchTask() tells the user
// to raise it through the YACS_MAX_THREADS environment variable.
75 int Executor::_maxThreads(1000);
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
//! Build an idle executor.
// No scheduler is attached (_mainSched is NULL), the state is YACS::NOTYETINITIALIZED,
// the execution mode is YACS::CONTINUE and every stop/dump/error flag is cleared.
// The semaphore _semForMaxThreads and the counter _semThreadCnt both start at _maxThreads.
78 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
83 _stopOnErrorRequested = false;
84 _dumpOnErrorRequested = false;
85 _errorDetected = false;
86 _isRunningunderExternalControl=false;
87 _executorState = YACS::NOTYETINITIALIZED;
88 _execMode = YACS::CONTINUE;
89 _semThreadCnt = _maxThreads;
90 _numberOfRunningTasks = 0;
91 _numberOfEndedTasks = 0;
92 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
// NOTE(review): the enclosing function header and the loop body are not visible here
// (presumably this is the destructor - confirm). The loop visits every Thread* stored
// in _groupOfAllThreadsCreated.
97 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
101 //! Execute a graph waiting for completion
103 * \param graph : schema to execute
104 * \param debug : display the graph with dot if debug > 0 (more intermediate displays when debug > 1 or debug > 2)
105 * \param fromScratch : if true the graph is reinitialized
107 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
109 * Calls Executor::launchTask to execute a selected Task.
111 * Completion when graph is finished (Scheduler::isFinished)
// Run `graph` in YACS::CONTINUE mode (no breakpoints).
// Each pass waits for an event from a running task, asks the scheduler for the next
// runnable tasks under _mutexForSchedulerUpdate, loads them one by one with loadTask(),
// then recomputes _toContinue as !graph->isFinished().
// `debug` only controls how often _displayDot() is called (thresholds > 0, > 1, > 2).
// Throws Exception when _mainSched is not a ComposedNode.
114 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
// NOTE(review): the trace text says "RunW" although this function is RunA.
116 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
// _root is the main scheduler seen as a ComposedNode; a failed cast is reported as an internal error.
118 _root = dynamic_cast<ComposedNode *>(_mainSched);
119 if (!_root) throw Exception("Executor::Run, Internal Error!");
122 if(debug>1)_displayDot(graph);
126 graph->exUpdateState();
128 if(debug>1)_displayDot(graph);
129 vector<Task *> tasks;
130 vector<Task *>::iterator iter;
// Reset the per-run bookkeeping before entering the scheduling passes.
132 _execMode = YACS::CONTINUE;
133 _isWaitingEventsFromRunningTasks = false;
134 _numberOfRunningTasks = 0;
135 _runningTasks.clear();
136 _numberOfEndedTasks=0;
// Blocks only if at least one task is running and none has ended since the last pass.
139 sleepWhileNoEventsFromAnyRunningTask();
141 if(debug>2)_displayDot(graph);
// Scheduler queries are made while holding _mutexForSchedulerUpdate.
144 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
145 tasks=graph->getNextTasks(isMore);
146 graph->selectRunnableTasks(tasks);
147 }//End of critical section
149 if(debug>2)_displayDot(graph);
// Tasks are loaded sequentially here; RunB uses loadParallelTasks() instead.
151 for(iter=tasks.begin();iter!=tasks.end();iter++)
152 loadTask(*iter,this);
154 if(debug>1)_displayDot(graph);
158 if(debug>1)_displayDot(graph);
161 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
162 _toContinue=!graph->isFinished();
163 }//End of critical section
164 DEBTRACE("_toContinue: " << _toContinue);
166 if(debug>0)_displayDot(graph);
172 //! Execute a graph with breakpoints or step by step
174 * To be launched in a thread (main thread controls the progression).
175 * \param graph : schema to execute
176 * \param debug : display the graph with dot if debug >0
177 * \param fromScratch : if false, state from a previous partial execution is already loaded
179 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
181 * Calls Executor::checkBreakPoints to verify if a pause is requested
183 * Calls Executor::launchTask to execute a selected Task
185 * Completion when graph is finished (Scheduler::isFinished)
187 * States of execution:
188 * - YACS::NOTYETINITIALIZED
189 * - YACS::INITIALISED
190 * - YACS::RUNNING (to next breakpoint or step)
191 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
192 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
193 * - YACS::FINISHED (no more ready tasks, no more running tasks)
194 * - YACS::STOPPED (stopped by user before end)
196 * Modes of Execution:
197 * - YACS::CONTINUE (normal run without breakpoints)
198 * - YACS::STEPBYSTEP (pause at each loop)
199 * - YACS::STOPBEFORENODES (pause when a node is reached)
201 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
202 * Step by Step means execution node by node or group of node by group of nodes.
203 * At a given step, the user decides to launch all the ready nodes or only a subset
204 * (Caution: some nodes must run in parallel).
205 * The next event (end of task) may give a new set of ready nodes, and define a new step.
207 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
208 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
209 * - Executor::getCurrentExecMode
210 * - Executor::getExecutorState
211 * - Executor::setExecMode : change the execution mode for next loop
212 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
213 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
214 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
215 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
216 * - Executor::isNotFinished
217 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
218 * - Executor::saveState : dump the current state of execution in an xml file
219 * - Executor::loadState : Not yet implemented
220 * - Executor::getNbOfThreads
221 * - Executor::displayDot
222 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
224 * If the pilot wants to wait for the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
225 * - Executor::waitPause
228 * - Pilot may connect to executor during execution, or disconnect.
229 * - Several Pilots may be connected at the same time (for observation...)
// Run `graph` with support for breakpoints and step-by-step execution.
// Every change of _executorState is published with sendEvent("executor").
// `debug` only controls how often _displayDot() is called (thresholds > 0, > 1, > 2).
// Throws Exception when _mainSched is not a ComposedNode.
233 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
235 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
237 { // --- Critical section
238 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
240 _root = dynamic_cast<ComposedNode *>(_mainSched);
241 if (!_root) throw Exception("Executor::Run, Internal Error!");
242 _executorState = YACS::NOTYETINITIALIZED;
243 sendEvent("executor");
// Reset the per-run bookkeeping.
246 _errorDetected = false;
247 _isWaitingEventsFromRunningTasks = false;
248 _numberOfRunningTasks = 0;
249 _runningTasks.clear();
250 _numberOfEndedTasks = 0;
// The execution trace is written to a file named "traceExec_" + scheduler name (relative path).
251 string tracefile = "traceExec_";
252 tracefile += _mainSched->getName();
253 _trace.open(tracefile.c_str());
// Time origin of this run.
254 _start = std::chrono::steady_clock::now();
255 } // --- End of critical section
257 if (debug > 1) _displayDot(graph);
264 graph->exUpdateState();
// Failure path: the exception text is traced and the executor is declared FINISHED.
268 DEBTRACE("exception: "<< (ex.what()));
269 _executorState = YACS::FINISHED;
270 sendEvent("executor");
274 _executorState = YACS::INITIALISED;
275 sendEvent("executor");
277 if (debug > 1) _displayDot(graph);
279 vector<Task *>::iterator iter;
284 _executorState = YACS::RUNNING;
285 sendEvent("executor");
288 DEBTRACE("--- executor main loop");
// Blocks only if at least one task is running and none has ended since the last pass.
289 sleepWhileNoEventsFromAnyRunningTask();
290 DEBTRACE("--- events...");
291 if (debug > 2) _displayDot(graph);
292 { // --- Critical section
293 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Candidate tasks for this pass: next tasks from the scheduler, restricted to the runnable
// ones, then filtered by FilterTasksConsideringContainers().
294 _tasks=graph->getNextTasks(isMore);
295 graph->selectRunnableTasks(_tasks);
296 FilterTasksConsideringContainers(_tasks);
// Running tasks plus tasks about to be launched; zero means nothing can make progress.
297 numberAllTasks=_numberOfRunningTasks+_tasks.size();
298 } // --- End of critical section
299 if (debug > 2) _displayDot(graph);
// Breakpoints are examined only while RUNNING. checkBreakPoints() may block in waitResume()
// and returns true when the end of this thread is requested.
300 if (_executorState == YACS::RUNNING)
302 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
303 if (debug > 0) _displayDot(graph);
// Load the selected tasks, one loader thread per task.
305 loadParallelTasks(_tasks,this);
306 if (debug > 1) _displayDot(graph);
311 if (debug > 1) _displayDot(graph);
312 { // --- Critical section
314 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
315 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
316 if(_numberOfRunningTasks == 0)
317 _toContinue = !graph->isFinished();
319 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
320 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
321 DEBTRACE("_toContinue: " << _toContinue);
// Stall detection: the graph is not finished, yet nothing is running and nothing is launchable.
322 if(_toContinue && numberAllTasks==0)
324 //Problem : no running tasks and no task to launch ??
326 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
327 //Pause to give a chance to interrupt
329 if(problemCount > 25)
331 // Too many problems encountered : stop execution
338 _executorState = YACS::FINISHED;
339 sendEvent("executor");
// Wake any pilot blocked in waitPause().
340 _condForPilot.notify_all();
342 } // --- End of critical section
343 if (debug > 0) _displayDot(graph);
344 DEBTRACE("_toContinue: " << _toContinue);
347 DEBTRACE("End of main Loop");
349 { // --- Critical section
350 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
351 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
353 DEBTRACE("stop requested: End soon");
354 _executorState = YACS::STOPPED;
356 sendEvent("executor");
358 } // --- End of critical section
// State dump requested through setStopOnError(), performed only if an error was recorded.
359 if ( _dumpOnErrorRequested && _errorDetected)
361 saveState(_dumpErrorFile);
364 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
367 DEBTRACE("End of RunB thread");
//! Pilot accessor for the current execution mode.
// Side effect: records that the executor is running under external control.
370 YACS::ExecutionMode Executor::getCurrentExecMode()
372 _isRunningunderExternalControl=true;
//! Pilot accessor returning _executorState.
// Side effect: records that the executor is running under external control.
// The value is read without taking _mutexForSchedulerUpdate.
377 YACS::ExecutorState Executor::getExecutorState()
379 _isRunningunderExternalControl=true;
380 return _executorState;
//! Pilot query: tells whether the execution is still going on.
// Side effect: records that the executor is running under external control.
384 bool Executor::isNotFinished()
386 _isRunningunderExternalControl=true;
390 //! ask to stop execution on the first node found in error
392 * \param dumpRequested produce a state dump when an error is found
393 * \param xmlFile name of file used for state dump
// Under _mutexForSchedulerUpdate: stores the dump file name, enables stop-on-error and
// records whether a dump is wanted.
// Throws YACS::Exception when a dump is requested with an empty file name.
// NOTE(review): the three members are already updated when that exception is thrown.
396 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
398 { // --- Critical section
399 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
400 _dumpErrorFile=xmlFile;
401 _stopOnErrorRequested=true;
402 _dumpOnErrorRequested = dumpRequested;
403 if (dumpRequested && xmlFile.empty())
404 throw YACS::Exception("dump on error requested and no filename given for dump");
405 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
406 } // --- End of critical section
409 //! ask not to stop execution on nodes found in error
// Clears _stopOnErrorRequested under _mutexForSchedulerUpdate.
// _dumpOnErrorRequested and _dumpErrorFile are left as they were.
413 void Executor::unsetStopOnError()
415 { // --- Critical section
416 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
417 _stopOnErrorRequested=false;
418 } // --- End of critical section
421 //! Dynamically set the current mode of execution
423 * The mode can be Continue, step by step, or stop before execution of a node
424 * defined in a list of breakpoints.
// Pilot request handled under _mutexForSchedulerUpdate.
// Side effect: records that the executor is running under external control.
427 void Executor::setExecMode(YACS::ExecutionMode mode)
429 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
430 { // --- Critical section
431 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
432 _isRunningunderExternalControl=true;
434 } // --- End of critical section
437 //! wake up executor when in pause
439 * When Executor is in state paused or waiting for task completion, the thread
440 * running loop RunB waits on condition _condForStepByStep.
441 * Thread RunB is woken up.
442 * \return true when actually wakes up executor
// Pilot request: wake the thread blocked in waitResume().
// Under _mutexForSchedulerUpdate the current state is examined. In the waiting case shown
// below, every waiter on _condForStepByStep is notified, the state becomes RUNNING and the
// change is published with sendEvent("executor").
// Side effect: records that the executor is running under external control.
445 bool Executor::resumeCurrentBreakPoint()
447 DEBTRACE("Executor::resumeCurrentBreakPoint()");
449 //bool doDump = false;
450 { // --- Critical section
451 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
452 _isRunningunderExternalControl=true;
453 DEBTRACE("_executorState: " << _executorState);
454 switch (_executorState)
456 case YACS::WAITINGTASKS:
459 _condForStepByStep.notify_all();
460 _executorState = YACS::RUNNING;
461 sendEvent("executor");
463 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
469 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
470 DEBTRACE("Graph Execution finished or stopped !");
475 // debug: no easy way to verify if main loop is actually waiting on condition
479 //if (doDump) saveState(_dumpErrorFile);
480 } // --- End of critical section
485 //! define a list of nodes names as breakpoints in the graph
// Replaces _listOfBreakPoints with a copy of the given node names, under
// _mutexForSchedulerUpdate. checkBreakPoints() compares ready task names against this list.
// Side effect: records that the executor is running under external control.
488 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
490 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
491 { // --- Critical section
492 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
493 _isRunningunderExternalControl=true;
494 _listOfBreakPoints = listOfBreakPoints;
495 } // --- End of critical section
499 //! Get the list of tasks to load, to define a subset to execute in step by step mode
501 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
502 * Use Executor::waitPause to wait.
// Returns a copy of the task names prepared by checkBreakPoints() (_listOfTasksToLoad).
// The result starts empty and is filled, under _mutexForSchedulerUpdate, in the waiting
// case shown below.
// Side effect: records that the executor is running under external control.
504 std::list<std::string> Executor::getTasksToLoad()
506 DEBTRACE("Executor::getTasksToLoad()");
507 list<string> listOfNodesToLoad;
508 listOfNodesToLoad.clear();
509 { // --- Critical section
510 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
511 _isRunningunderExternalControl=true;
512 switch (_executorState)
514 case YACS::WAITINGTASKS:
517 listOfNodesToLoad = _listOfTasksToLoad;
520 case YACS::NOTYETINITIALIZED:
521 case YACS::INITIALISED:
530 } // --- End of critical section
531 return listOfNodesToLoad;
535 //! Define a subset of task to execute in step by step mode
537 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
538 * in the current step.
539 * If some nodes must run in parallel, they must stay together in the list.
// Restricts the tasks of the current step to those named in listToExecute.
// In the waiting case shown below, the tasks of _tasksSave whose name (as given by
// _mainSched->getTaskName) appears in listToExecute are collected into restrictedTasks
// and pushed onto _tasks.
// Side effect: records that the executor is running under external control.
542 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
544 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
546 vector<Task *>::iterator iter;
547 vector<Task *> restrictedTasks;
548 { // --- Critical section
549 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
550 _isRunningunderExternalControl=true;
551 switch (_executorState)
553 case YACS::WAITINGTASKS:
// Linear search of each saved task name in the requested list.
556 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
558 string readyNode = _mainSched->getTaskName(*iter);
559 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
560 != listToExecute.end())
562 restrictedTasks.push_back(*iter);
563 DEBTRACE("node to execute " << readyNode);
567 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
569 _tasks.push_back(*iter);
573 case YACS::NOTYETINITIALIZED:
574 case YACS::INITIALISED:
583 } // --- End of critical section
// NOTE(review): _tasks is filled from restrictedTasks a second time here, after the
// critical section above has been closed, i.e. without holding _mutexForSchedulerUpdate.
586 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
588 _tasks.push_back(*iter);
590 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
592 string readyNode = _mainSched->getTaskName(*iter);
593 DEBTRACE("selected node to execute " << readyNode);
599 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
601 * Do nothing if execution is finished or in pause.
602 * Wait first step if Executor is running or in initialization.
// Pilot side of the pause handshake. Under _mutexForSchedulerUpdate the state is examined
// and, in the not-yet-paused cases, the caller blocks on _condForPilot, which
// checkBreakPoints() and RunB notify.
// Side effect: records that the executor is running under external control.
605 void Executor::waitPause()
// NOTE(review): _executorState is read for this trace before the mutex is taken.
607 DEBTRACE("Executor::waitPause()" << _executorState);
608 { // --- Critical section
609 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
610 _isRunningunderExternalControl=true;
611 switch (_executorState)
616 case YACS::WAITINGTASKS:
621 case YACS::NOTYETINITIALIZED:
622 case YACS::INITIALISED:
625 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
629 } // --- End of critical section
634 * This method can be called at any time simultaneously during a RunB call.
635 * This method will wait until the executor is locked in a consistent state of a running graph.
637 * This method is expected to be called in association with resume method.
638 * The returned value is expected to be transferred to the resume method.
// Locks _mutexForSchedulerUpdate by hand. When the execution is already over
// (!_toContinue and state FINISHED) the mutex is released and false is returned.
// Otherwise the function is left with the mutex still held; resume() releases it.
640 bool Executor::suspendASAP()
642 // no AutoLocker here. It's not a bug.
643 _mutexForSchedulerUpdate.lock();
644 if(!_toContinue && _executorState==YACS::FINISHED)
645 {// execution is finished
646 _mutexForSchedulerUpdate.unLock();
647 return false;// the executor is no more running
649 //general case. Leave method with locker in locked status
654 * This method is expected to be called in association with suspendASAP method.
655 * Expected to be called just after suspendASAP, with the value returned by suspendASAP as input parameter
// Counterpart of suspendASAP(): releases _mutexForSchedulerUpdate, which suspendASAP()
// leaves locked in its general case.
// \param suspended : value previously returned by suspendASAP()
657 void Executor::resume(bool suspended)
660 _mutexForSchedulerUpdate.unLock();
663 //! stops the execution as soon as possible
// Switches to YACS::STEPBYSTEP so that the RunB loop stops at its next breakpoint check,
// then calls resumeCurrentBreakPoint() to wake the executor if it is currently waiting.
665 void Executor::stopExecution()
667 setExecMode(YACS::STEPBYSTEP);
670 resumeCurrentBreakPoint();
673 //! save the current state of execution in an xml file
// Dumps the state of _root through a VisitorSaveState writing to xmlFile, while holding
// _mutexForSchedulerUpdate. A YACS Exception raised during the dump is caught here and its
// message printed on std::cerr.
// \param xmlFile : path of the dump file, passed to VisitorSaveState::openFileDump
675 bool Executor::saveState(const std::string& xmlFile)
677 DEBTRACE("Executor::saveState() in " << xmlFile);
680 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
681 YACS::ENGINE::VisitorSaveState vst(_root);
682 vst.openFileDump(xmlFile.c_str());
687 catch(Exception& ex) {
688 std::cerr << ex.what() << std::endl;
693 //! not yet implemented
// Placeholder: the visible body only traces the call and records that the executor is
// running under external control. No state is loaded.
695 bool Executor::loadState()
697 DEBTRACE("Executor::loadState()");
698 _isRunningunderExternalControl=true;
// File-local helper: stat()s `filename` and tests the result with S_ISREG.
// _displayDot() treats a non-zero result as "display.sh exists as a regular file".
703 static int isfile(const char *filename)
// stat() failure (e.g. missing file) is handled first.
706 if (stat(filename, &buf) != 0)
// Anything that is not a regular file is rejected.
708 if (!S_ISREG(buf.st_mode))
713 //! Display the graph state as a dot display, public method
// Public entry point for the dot display.
// Side effect: records that the executor is running under external control.
715 void Executor::displayDot(Scheduler *graph)
717 _isRunningunderExternalControl=true;
721 //! Display the graph state as a dot display
723 * \param graph : the node to display
// Writes the dot description of `graph` to a file named "titi" (relative path), then shows
// it: with "sh display.sh" when a regular file display.sh exists, otherwise through
// "dot -Tpng titi|display -delay 5". The results of system() are ignored.
726 void Executor::_displayDot(Scheduler *graph)
728 std::ofstream g("titi");
// NOTE(review): unchecked C-style cast; RunA/RunB obtain their ComposedNode with dynamic_cast.
729 ((ComposedNode*)graph)->writeDot(g);
731 const char displayScript[]="display.sh";
732 if(isfile(displayScript))
733 system("sh display.sh");
735 system("dot -Tpng titi|display -delay 5");
738 //! Wait reactivation in modes Step By step or with BreakPoints
740 * Check mode of execution (set by main thread):
741 * - YACS::CONTINUE : the graph execution continues.
742 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
743 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
744 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
745 * else continue the graph execution.
746 * \return true if end of executor thread is requested
// Called by RunB before loading the tasks of a pass. Depending on the execution mode it
// may publish the list of ready task names, move to WAITINGTASKS or PAUSED, notify the
// pilot and block in waitResume().
// \return true when _isOKToEnd is set, i.e. the end of the executor thread is requested
749 bool Executor::checkBreakPoints()
751 DEBTRACE("Executor::checkBreakPoints()");
752 vector<Task *>::iterator iter;
753 bool endRequested = false;
761 case YACS::STOPBEFORENODES:
764 { // --- Critical section
765 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Look for a ready task whose name is registered as a breakpoint.
767 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
769 string nodeToLoad = _mainSched->getTaskName(*iter);
770 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
771 != _listOfBreakPoints.end())
// Publish the names of the ready tasks for Executor::getTasksToLoad().
779 _listOfTasksToLoad.clear();
780 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
782 string nodeToLoad = _mainSched->getTaskName(*iter);
783 _listOfTasksToLoad.push_back(nodeToLoad);
// NOTE(review): getNbOfThreads() also sets _isRunningunderExternalControl as a side effect.
785 if (getNbOfThreads())
786 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
788 _executorState = YACS::PAUSED;
789 sendEvent("executor");
// Wake any pilot blocked in waitPause().
790 _condForPilot.notify_all();
792 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
793 if (_isOKToEnd) endRequested = true;
794 } // --- End of critical section
795 if (stop) DEBTRACE("wake up from waitResume");
799 case YACS::STEPBYSTEP:
801 { // --- Critical section
802 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Same publication as above, but the pause is unconditional in step-by-step mode.
804 _listOfTasksToLoad.clear();
805 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
807 string nodeToLoad = _mainSched->getTaskName(*iter);
808 _listOfTasksToLoad.push_back(nodeToLoad);
810 if (getNbOfThreads())
811 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
813 _executorState = YACS::PAUSED;
814 sendEvent("executor");
815 _condForPilot.notify_all();
817 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
818 // or, if no pilot, wait until no more running tasks (stop on error)
819 if (_isOKToEnd) endRequested = true;
820 } // --- End of critical section
821 DEBTRACE("wake up from waitResume");
825 DEBTRACE("endRequested: " << endRequested);
830 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
832 * With the condition Mutex, the mutex is released atomically during the wait.
833 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
834 * Must be called while mutex is locked.
// Blocks the calling thread on _condForStepByStep, handing _mutexForSchedulerUpdate to the
// condition for the duration of the wait. resumeCurrentBreakPoint() issues the notify_all()
// that ends the wait.
837 void Executor::waitResume()
839 DEBTRACE("Executor::waitResume()");
840 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
845 //! Perform loading of a Task.
847 * \param task : Task to load
// Loads one task. Only tasks in state YACS::TOLOAD are considered. The scheduler is told
// that the task starts (YACS::START) and each step is recorded with traceExec().
// Failures are not propagated: for a YACS Exception the message is printed, for any other
// exception "Load failed" is printed. In both cases the scheduler is notified with
// YACS::ABORT under _mutexForSchedulerUpdate.
// \param task : task to load
// \param execInst : executor passed on to Scheduler::notifyFrom
850 void Executor::loadTask(Task *task, const Executor *execInst)
852 DEBTRACE("Executor::loadTask(Task *task)");
853 if(task->getState() != YACS::TOLOAD)
855 traceExec(task, "state:TOLOAD", ComputePlacement(task));
857 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
858 _mainSched->notifyFrom(task,YACS::START,execInst);
859 }//End of critical section
862 traceExec(task, "load", ComputePlacement(task));
864 traceExec(task, "initService", ComputePlacement(task));
// Failure with a YACS Exception: report it and abort the task.
869 std::cerr << ex.what() << std::endl;
871 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
873 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
874 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
875 }//End of critical section
// Failure with an exception of unknown type: same handling with a generic message.
879 std::cerr << "Load failed" << std::endl;
881 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
883 _mainSched->notifyFrom(task,YACS::ABORT,execInst);
884 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
885 }//End of critical section
896 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
898 std::vector<Thread> ths(tasks.size());
899 std::size_t ithread(0);
900 for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
902 DEBTRACE("Executor::loadParallelTasks(Task *task)");
903 struct threadargs *args(new threadargs);
904 args->task = (*iter);
905 args->sched = _mainSched;
906 args->execInst = this;
907 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
909 for(ithread=0;ithread<tasks.size();ithread++)
913 //! Execute a list of tasks possibly connected through datastream links
915 * \param tasks : a list of tasks to execute
// Two passes over `tasks`.
// Pass 1 handles tasks in state TOLOAD or TORECONNECT: connectService() is called and the
// task is marked connected under _mutexForSchedulerUpdate. On failure the task is
// disconnected and the scheduler is notified with YACS::ABORT. A task ending in
// YACS::ERROR drags its coupled tasks (Task::getCoupledTasks) into the same treatment.
// Pass 2 starts the execution of each task.
918 void Executor::launchTasks(const std::vector<Task *>& tasks)
920 //First phase, make datastream connections
921 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
923 YACS::StatesForNode state=(*iter)->getState();
924 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
927 (*iter)->connectService();
928 traceExec(*iter, "connectService",ComputePlacement(*iter));
930 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
931 (*iter)->connected();
932 }//End of critical section
// Connection failed with a YACS Exception: report, disconnect, abort.
936 std::cerr << ex.what() << std::endl;
939 (*iter)->disconnectService();
940 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
944 // Disconnect has failed
945 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
948 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
950 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
951 }//End of critical section
// Connection failed with an exception of unknown type: same handling.
955 std::cerr << "Problem in connectService" << std::endl;
958 (*iter)->disconnectService();
959 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
963 // Disconnect has failed
964 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
967 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
969 _mainSched->notifyFrom(*iter,YACS::ABORT,this);
970 }//End of critical section
972 if((*iter)->getState() == YACS::ERROR)
974 //try to put all coupled tasks in error
975 std::set<Task*> coupledSet;
976 (*iter)->getCoupledTasks(coupledSet);
977 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// Skip the failing task itself and tasks already in error.
980 if(t == *iter)continue;
981 if(t->getState() == YACS::ERROR)continue;
// NOTE(review): the traces below concern the coupled task t but log ComputePlacement(*iter).
984 t->disconnectService();
985 traceExec(t, "disconnectService",ComputePlacement(*iter));
989 // Disconnect has failed
990 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
993 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
995 _mainSched->notifyFrom(t,YACS::ABORT,this);
996 }//End of critical section
997 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1000 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1003 //Second phase, execute each task in a thread
1004 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1010 //! Execute a Task in a thread
1012 * \param task : Task to execute
1014 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
1016 * Calls Executor::functionForTaskExecution in Thread
// Starts the execution of one task in its own thread. Tasks not in YACS::TOACTIVATE are
// ignored. The number of threads is bounded by _semForMaxThreads. When _semThreadCnt has
// reached 0, the running tasks are inspected first: if a running task has a coupled task
// still in TOACTIVATE, a warning suggests raising YACS_MAX_THREADS.
1019 void Executor::launchTask(Task *task)
1021 DEBTRACE("Executor::launchTask(Task *task)");
1022 struct threadargs *args;
1023 if(task->getState() != YACS::TOACTIVATE)return;
1025 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1026 if(_semThreadCnt == 0)
1028 // --- Critical section
1029 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1030 //check if we have enough threads to run
// Work on a copy of the running set; groups of coupled tasks are removed from it one by one.
1031 std::set<Task*> tmpSet=_runningTasks;
1032 std::set<Task*>::iterator it = tmpSet.begin();
1033 std::string status="running";
1034 std::set<Task*> coupledSet;
1035 while( it != tmpSet.end() )
1039 tt->getCoupledTasks(coupledSet);
1041 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1043 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1044 tmpSet.erase(*iter);
// A group with no task waiting for activation ends the inspection.
1046 if(status=="running")break;
1047 it = tmpSet.begin();
1050 if(status=="toactivate")
1052 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1053 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1055 // --- End of critical section
// Take one thread slot; blocks while none is available.
1058 _semForMaxThreads.wait();
// Arguments for the worker thread, allocated on the heap.
1061 args= new threadargs;
1063 args->sched = _mainSched;
1064 args->execInst = this;
1066 traceExec(task, "launch",ComputePlacement(task));
1068 { // --- Critical section
1069 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1070 _numberOfRunningTasks++;
1071 _runningTasks.insert(task);
1072 task->begin(); //change state to ACTIVATED
1073 } // --- End of critical section
// A temporary Thread object is constructed with functionForTaskExecution; it is not stored.
1074 Thread(functionForTaskExecution, args, _threadStackSize);
1077 //! wait until a running task ends
// Under _mutexForSchedulerUpdate: when at least one task is running and none has ended
// since the last call, sets _isWaitingEventsFromRunningTasks and waits on
// _condForNewTasksToPerform (notified by wakeUp()). The ended-task counter is reset before
// returning.
// NOTE(review): the wait is guarded by `if`, not by a loop re-testing the condition, so a
// spurious wake-up would return early - confirm whether Condition::wait handles this.
1079 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1081 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1082 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1083 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1084 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1086 _isWaitingEventsFromRunningTasks = true;
1087 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1089 _numberOfEndedTasks=0;
// The statements that removed `thread` from _groupOfAllThreadsCreated are commented out:
// as shown here, this function performs no action.
1095 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1097 /*_mutexForNbOfConcurrentThreads.lock();
1098 _groupOfAllThreadsCreated.remove(thread);
1100 _mutexForNbOfConcurrentThreads.unlock();*/
1104 //! must be used protected by _mutexForSchedulerUpdate!
// Signals the end of a task to the scheduling loop. When the loop is blocked in
// sleepWhileNoEventsFromAnyRunningTask(), the waiting flag is cleared and
// _condForNewTasksToPerform is notified. _numberOfEndedTasks records ended tasks so that a
// later call to sleepWhileNoEventsFromAnyRunningTask() does not block.
// No lock is taken here: the caller must hold _mutexForSchedulerUpdate.
1106 void Executor::wakeUp()
1108 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1109 if (_isWaitingEventsFromRunningTasks)
1111 _isWaitingEventsFromRunningTasks = false;
1112 _condForNewTasksToPerform.notify_all();
1115 _numberOfEndedTasks++;
1118 //! number of running tasks
// Reads the size of _groupOfAllThreadsCreated under _mutexForNbOfConcurrentThreads.
// The value does not come from _numberOfRunningTasks.
// Side effect: records that the executor is running under external control, even when the
// caller is checkBreakPoints().
1120 int Executor::getNbOfThreads()
1123 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1124 _isRunningunderExternalControl=true;
1125 ret = _groupOfAllThreadsCreated.size();
1130 * This thread is NOT supposed to be detached !
// Thread entry point used by loadParallelTasks().
// \param arg : pointer to a threadargs (task, scheduler, executor) built by the launcher
// Unpacks the arguments and delegates to Executor::loadTask on the given executor.
1132 void *Executor::functionForTaskLoad(void *arg)
1134 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1135 struct threadargs *args = (struct threadargs *) arg;
1136 Task *task=args->task;
1137 Scheduler *sched=args->sched;
1138 Executor *execInst=args->execInst;
1140 execInst->loadTask(task,execInst);// no throw of this method - all exceptions are caught !
// Thread function that runs one task and reports its outcome to the scheduler.
// Once the run is over, the code below disconnects the task's service, gives
// the task's place back to its HomogeneousPoolContainer (when the container is
// of that type) and then updates the executor bookkeeping while holding
// _mutexForSchedulerUpdate.
1144 //! Function to perform execution of a task in a thread
1146 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1148 * Calls Task::execute
1150 * Calls Task::finished when the task is finished
1152 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1154 * Calls Executor::wakeUp and Executor::notifyEndOfThread
1157 void *Executor::functionForTaskExecution(void *arg)
1159 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
// `arg` points to a threadargs structure holding the task, its scheduler and
// the executor instance.
1161 struct threadargs *args = (struct threadargs *) arg;
1162 Task *task=args->task;
1163 Scheduler *sched=args->sched;
1164 Executor *execInst=args->execInst;
1166 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// Optional step enabled by getDPLScopeSensitive(): applied only when the task
// is a Node and the scheduler is a ComposedNode.
1172 if(execInst->getDPLScopeSensitive())
1174 Node *node(dynamic_cast<Node *>(task));
1175 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1176 if(node!=0 && gfn!=0)
1177 node->applyDPLScope(gfn);
// Event later passed to Scheduler::notifyFrom; FINISH is its initial value.
1180 YACS::Event ev=YACS::FINISH;
1183 execInst->traceExec(task, "start execution",ComputePlacement(task));
1185 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
// Failure paths: the error is printed on stderr and an "end execution ABORT"
// line is written through traceExec.
1187 catch(Exception& ex)
1189 std::cerr << "YACS Exception during execute" << std::endl;
1190 std::cerr << ex.what() << std::endl;
1192 string message = "end execution ABORT, ";
1193 message += ex.what();
1194 execInst->traceExec(task, message,ComputePlacement(task));
1198 // Execution has failed
1199 std::cerr << "Execution has failed: unknown reason" << std::endl;
1201 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection, traced on success and on failure.
1207 DEBTRACE("task->disconnectService()");
1208 task->disconnectService();
1209 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1213 // Disconnect has failed
1214 std::cerr << "disconnect has failed" << std::endl;
1216 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// The placement string is computed once here and reused for the state trace
// written inside the critical section below.
1220 std::string placement(ComputePlacement(task));
1222 // container management for HomogeneousPoolOfContainer
1224 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
// The release is performed while holding the container's own lock.
1227 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1228 contC->release(task);
1231 DEBTRACE("End task->execute()");
1232 { // --- Critical section
1233 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1236 if (ev == YACS::FINISH) task->finished();
// On ABORT the error is remembered; when stop-on-error was requested the
// executor is switched to step-by-step mode and _isOKToEnd is set.
1237 if (ev == YACS::ABORT)
1239 execInst->_errorDetected = true;
1240 if (execInst->_stopOnErrorRequested)
1242 execInst->_execMode = YACS::STEPBYSTEP;
1243 execInst->_isOKToEnd = true;
1247 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1248 sched->notifyFrom(task,ev,execInst);
1250 catch(Exception& ex)
1252 //notify has failed : it is supposed to have set state
1253 //so no need to do anything
1254 std::cerr << "Error during notification" << std::endl;
1255 std::cerr << ex.what() << std::endl;
1259 //notify has failed : it is supposed to have set state
1260 //so no need to do anything
1261 std::cerr << "Notification failed" << std::endl;
// The task is no longer running: update the counter and the set of running tasks.
1263 execInst->_numberOfRunningTasks--;
1264 execInst->_runningTasks.erase(task);
1265 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1266 << " _execMode: " << execInst->_execMode
1267 << " _executorState: " << execInst->_executorState);
// When no task is running any more and the mode is not CONTINUE, an executor
// in state WAITINGTASKS becomes PAUSED; an "executor" event is sent and
// _condForPilot is notified.
1268 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1270 if (execInst->_executorState == YACS::WAITINGTASKS)
1272 execInst->_executorState = YACS::PAUSED;
1273 execInst->sendEvent("executor");
1274 execInst->_condForPilot.notify_all();
1275 if (execInst->_errorDetected &&
1276 execInst->_stopOnErrorRequested &&
1277 !execInst->_isRunningunderExternalControl)
1278 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Post the semaphore limiting the number of threads and keep its companion
// counter _semThreadCnt in step.
1281 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1282 execInst->_semForMaxThreads.post();
1283 execInst->_semThreadCnt += 1;
1284 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
// wakeUp() is skipped when the executor has just been paused.
1285 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1287 } // --- End of critical section (change state)
1289 //execInst->notifyEndOfThread(0);
// Appends one line to the execution trace stream _trace. The line holds, in
// order: elapsed time since _start (seconds), container name, placement,
// task name as given by the main scheduler, and the message.
// \param task : task the trace line is about
// \param message : free text written at the end of the line
// \param placement : placement string written after the container name
1294 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1296 string nodeName = _mainSched->getTaskName(task);
1297 Container *cont = task->getContainer();
// "---" is the default container name (presumably kept when the task has no
// container -- the guarding test should be confirmed).
1298 string containerName = "---";
1300 containerName = cont->getName();
// Elapsed time is measured in milliseconds and converted to seconds.
1302 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1303 std::chrono::milliseconds millisec;
1304 millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1305 double elapse = double(millisec.count()) / 1000.0;
// The write to the trace stream is done while holding _mutexForTrace.
1307 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1308 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
// Forwards `event` to the dispatcher returned by Dispatcher::getDispatcher(),
// with _root as the object the event is about.
// \param event : name of the event to dispatch (callers in this file use "executor")
1313 //! emit notification to all observers registered with the dispatcher
1315 * The dispatcher is unique and can be obtained by getDispatcher()
1317 void Executor::sendEvent(const std::string& event)
1319 Dispatcher* disp=Dispatcher::getDispatcher();
1322 disp->dispatch(_root,event);
// Ordering predicate over HomogeneousPoolContainer pointers (presumably the
// call operator of HPCCompare, the comparator of the map built in
// Executor::FilterTasksConsideringContainers -- confirm).
1327 bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
// Containers are ordered by increasing number of cores per worker.
1335 return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
// Groups the tasks of `tsks` by HomogeneousPoolContainer and builds in `ret`
// the subset that can be started, limited for each such container by its
// number of free places.
1340 * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1341 * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1343 * \param [in,out] tsks - list of tasks to be
1345 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
// Tasks grouped by container; the nullptr key receives the tasks that are not
// attached to a HomogeneousPoolContainer (presumably: no container at all, or
// a container of another type -- the guarding tests should be confirmed).
1347 std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1348 for(auto cur : tsks)
1352 Container *cont(cur->getContainer());
1355 m[nullptr].push_back(cur);
1358 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1361 m[nullptr].push_back(cur);
1364 m[contC].push_back(cur);
// Selection result.
1367 std::vector<Task *> ret;
1370 HomogeneousPoolContainer *curhpc(it.first);
1371 const std::vector<Task *>& curtsks(it.second);
// The whole group is appended unchanged (presumably only for the nullptr
// group -- confirm).
1374 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1378 // start of critical section for container curhpc
1379 std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1380 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
// At most getNumberOfFreePlace() tasks of this container are selected, in the
// order in which they were grouped.
1381 std::size_t sz(curhpc->getNumberOfFreePlace());
1382 std::vector<Task *>::const_iterator it2(curtsks.begin());
1383 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1385 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1386 ret.push_back(*it2);
// The container is told which tasks were selected, still under its lock.
1388 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1389 //end of critical section
// Builds the placement string used in trace lines for `zeTask`.
// \param zeTask : task whose placement is wanted
1396 std::string Executor::ComputePlacement(Task *zeTask)
// "---" is the value kept when the task has no container.
1398 std::string placement("---");
1401 if(zeTask->getContainer())
1402 placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1406 ///////// NEW EXECUTOR ////////////////////////////////
// Loads one task and initialises its service, tracing each step and reporting
// to the main scheduler. Failures are printed on stderr and turned into an
// ABORT notification.
// \param task : task to load
1407 void Executor::loadTask(Task *task)
// Guard on the TOLOAD state (presumably an early return for any other state
// -- confirm).
1409 if(task->getState() != YACS::TOLOAD)
1411 traceExec(task, "state:TOLOAD", ComputePlacement(task));
// The START notification is sent while holding _mutexForSchedulerUpdate.
1413 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1414 _mainSched->notifyFrom(task,YACS::START,this);
1415 }//End of critical section
1418 traceExec(task, "load", ComputePlacement(task));
1420 traceExec(task, "initService", ComputePlacement(task));
1421 task->initService();
// YACS exception: its message is printed, then ABORT is notified under the
// scheduler mutex and the resulting task state is traced.
1423 catch(Exception& ex)
1425 std::cerr << ex.what() << std::endl;
1427 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1429 _mainSched->notifyFrom(task,YACS::ABORT, this);
1430 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1431 }//End of critical section
// Second failure path: same ABORT handling with a fixed message.
1435 std::cerr << "Load failed" << std::endl;
1437 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1439 _mainSched->notifyFrom(task,YACS::ABORT, this);
1440 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1441 }//End of critical section
// Registers `task` as running: increments _numberOfRunningTasks and inserts
// the task in _runningTasks, both while holding _mutexForSchedulerUpdate.
// \param task : task about to be run
1445 void Executor::beginTask(Task *task)
1447 // --- Critical section
1448 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1449 _numberOfRunningTasks++;
1450 _runningTasks.insert(task);
1451 // --- End of critical section
// Records the end of `task`: notifies the main scheduler with `ev`, removes
// the task from the running bookkeeping and updates the executor state.
// The whole body runs while holding _mutexForSchedulerUpdate.
// \param task : task that has just ended
// \param ev : outcome of the run; FINISH and ABORT are handled specifically
1454 void Executor::endTask(Task *task, YACS::Event ev)
1456 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1459 if (ev == YACS::FINISH) task->finished();
// On ABORT the error is remembered; when stop-on-error was requested the
// executor is switched to step-by-step mode.
1460 if (ev == YACS::ABORT)
1462 _errorDetected = true;
1463 if (_stopOnErrorRequested)
1465 _execMode = YACS::STEPBYSTEP;
1470 //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1471 _mainSched->notifyFrom(task,ev,this);
1473 catch(Exception& ex)
1475 //notify has failed : it is supposed to have set state
1476 //so no need to do anything
1477 std::cerr << "Error during notification" << std::endl;
1478 std::cerr << ex.what() << std::endl;
1482 //notify has failed : it is supposed to have set state
1483 //so no need to do anything
1484 std::cerr << "Notification failed" << std::endl;
// The task is no longer running: update the counter and the set of running tasks.
1486 _numberOfRunningTasks--;
1487 _runningTasks.erase(task);
1488 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
1489 << " _execMode: " << _execMode
1490 << " _executorState: " << _executorState);
// When no task is running any more and the mode is not CONTINUE, an executor
// in state WAITINGTASKS becomes PAUSED; an "executor" event is sent and
// _condForPilot is notified.
1491 if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1493 if (_executorState == YACS::WAITINGTASKS)
1495 _executorState = YACS::PAUSED;
1496 sendEvent("executor");
1497 _condForPilot.notify_all();
1498 if (_errorDetected &&
1499 _stopOnErrorRequested &&
1500 !_isRunningunderExternalControl)
1501 _condForStepByStep.notify_all(); // exec thread may be on waitResume
// Final action is conditional on the executor not being paused.
1504 if (_executorState != YACS::PAUSED)
// Runs one task: marks it as begun, traces the execution, disconnects its
// service and gives its place back to a HomogeneousPoolContainer when the
// task's container is of that type.
// \param task : task to run
// The function is declared to return a YACS::Event; the local `ev` below
// starts as FINISH.
1508 YACS::Event Executor::runTask(Task *task)
1510 { // --- Critical section
1511 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1512 task->begin(); //change state to ACTIVATED
1514 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
// Optional step enabled by getDPLScopeSensitive(): applied only when the task
// is a Node and the main scheduler is a ComposedNode.
1516 if(getDPLScopeSensitive())
1518 Node *node(dynamic_cast<Node *>(task));
1519 ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1520 if(node!=0 && gfn!=0)
1521 node->applyDPLScope(gfn);
1524 YACS::Event ev=YACS::FINISH;
1527 traceExec(task, "start execution",ComputePlacement(task));
1529 traceExec(task, "end execution OK",ComputePlacement(task));
// Failure paths: the error is printed on stderr and an "end execution ABORT"
// line is written through traceExec.
1531 catch(Exception& ex)
1533 std::cerr << "YACS Exception during execute" << std::endl;
1534 std::cerr << ex.what() << std::endl;
1536 string message = "end execution ABORT, ";
1537 message += ex.what();
1538 traceExec(task, message,ComputePlacement(task));
1542 // Execution has failed
1543 std::cerr << "Execution has failed: unknown reason" << std::endl;
1545 traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
// Service disconnection, traced on success and on failure.
1551 DEBTRACE("task->disconnectService()");
1552 task->disconnectService();
1553 traceExec(task, "disconnectService",ComputePlacement(task));
1557 // Disconnect has failed
1558 std::cerr << "disconnect has failed" << std::endl;
1560 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// NOTE(review): `placement` is not used by any statement that follows in this
// function as shown -- confirm whether it is still needed.
1564 std::string placement(ComputePlacement(task));
1566 // container management for HomogeneousPoolOfContainer
1568 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
// The release is performed while holding the container's own lock.
1571 std::lock_guard<std::mutex> alckCont(contC->getLocker());
1572 contC->release(task);
// Connects the service of `task`. On failure the service is disconnected
// again and ABORT is notified to the main scheduler; if the task ends up in
// state ERROR, the tasks coupled to it are disconnected and aborted as well.
// \param task : task whose connections are to be made
1578 void Executor::makeDatastreamConnections(Task *task)
// Guard on the task state: only TOLOAD and TORECONNECT are concerned
// (presumably an early return otherwise -- confirm).
1580 YACS::StatesForNode state=task->getState();
1581 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
1585 task->connectService();
1586 traceExec(task, "connectService",ComputePlacement(task));
1588 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1590 }//End of critical section
// YACS exception: print its message, try to disconnect the service, then
// notify ABORT while holding _mutexForSchedulerUpdate.
1592 catch(Exception& ex)
1594 std::cerr << ex.what() << std::endl;
1597 (task)->disconnectService();
1598 traceExec(task, "disconnectService",ComputePlacement(task));
1602 // Disconnect has failed
1603 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1606 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1608 _mainSched->notifyFrom(task,YACS::ABORT,this);
1609 }//End of critical section
// Second failure path: same recovery with a fixed message.
1613 std::cerr << "Problem in connectService" << std::endl;
1616 (task)->disconnectService();
1617 traceExec(task, "disconnectService",ComputePlacement(task));
1621 // Disconnect has failed
1622 traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1625 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1627 _mainSched->notifyFrom(task,YACS::ABORT,this);
1628 }//End of critical section
1630 if(task->getState() == YACS::ERROR)
1632 //try to put all coupled tasks in error
1633 std::set<Task*> coupledSet;
1634 task->getCoupledTasks(coupledSet);
1635 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// The failing task itself and tasks already in ERROR are skipped.
1638 if(t == task)continue;
1639 if(t->getState() == YACS::ERROR)continue;
1642 t->disconnectService();
// NOTE(review): the trace lines about `t` in this loop pass
// ComputePlacement(task), i.e. the placement of the failing task rather than
// that of `t` -- confirm whether ComputePlacement(t) was intended.
1643 traceExec(t, "disconnectService",ComputePlacement(task));
1647 // Disconnect has failed
1648 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1651 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1653 _mainSched->notifyFrom(t,YACS::ABORT,this);
1654 }//End of critical section
1655 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1658 traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1661 #include "workloadmanager/WorkloadManager.hxx"
1662 #include "workloadmanager/DefaultAlgorithm.hxx"
1663 #include "Runtime.hxx"
// Registers with the workload manager one resource per entry of the runtime's
// catalog of compute nodes.
// \param wm : workload manager receiving the resources
// Throws YACS::Exception with the message "loadResources : no runtime !".
1665 void loadResources(WorkloadManager::WorkloadManager& wm)
1667 Runtime *r(getRuntime());
// Presumably guarded by a test on `r` being null -- confirm.
1669 throw YACS::Exception("loadResources : no runtime !");
// Each catalog entry is a pair: `first` becomes the resource name and
// `second` its number of cores.
1670 std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
1672 for(const std::pair<std::string,int>& res : data)
1674 WorkloadManager::Resource newResource;
1675 newResource.name = res.first;
1676 newResource.id = id;
1678 newResource.nbCores = res.second;
1679 wm.addResource(newResource);
// Wraps a YACS task so that it can be submitted to the workload manager as a
// WorkloadManager::Task.
1683 class NewTask : public WorkloadManager::Task
// \param executor : executor whose methods process the task
// \param yacsTask : YACS task being wrapped
1686 NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
// Overrides of the WorkloadManager::Task interface.
1687 const WorkloadManager::ContainerType& type()const override;
1688 void run(const WorkloadManager::Container& c)override;
// Container type descriptor filled in by the constructor.
1690 WorkloadManager::ContainerType _type;
// Executor held by reference.
1691 Executor& _executor;
// Wrapped YACS task, held as a raw pointer.
1692 YACS::ENGINE::Task * _yacsTask;
// Stores the executor and the YACS task, then fills the container type
// descriptor with fixed values.
1695 NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
1697 , _executor(executor)
1698 , _yacsTask(yacsTask)
// NOTE(review): hard-coded values (0 cores, name "test") -- presumably
// placeholders; confirm.
1700 _type.neededCores = 0;
1702 _type.name = "test";
// Accessor for the container type descriptor of this task (presumably returns
// the member _type -- TODO confirm).
1705 const WorkloadManager::ContainerType& NewTask::type()const
// Processes the wrapped YACS task through the executor: load, datastream
// connections, run, then end-of-task handling with the event returned by the run.
// \param c : container handed over by the workload manager; not used by the
//            statements below
1710 void NewTask::run(const WorkloadManager::Container& c)
1712 _executor.loadTask(_yacsTask);
1713 _executor.makeDatastreamConnections(_yacsTask);
1714 YACS::Event ev = _executor.runTask(_yacsTask);
1715 _executor.endTask(_yacsTask, ev);
// The object destroys itself once the task has been processed, so it must
// have been allocated with `new` and must not be used after run() returns.
1716 delete this; // temporary
// Executes `graph` by wrapping its ready tasks in NewTask objects and handing
// them to a WorkloadManager, until the graph is finished or a stop is requested.
// \param graph : scheduler to execute
// \param debug : verbosity level; the tests below on 0, 1 and 2 enable
//                successive _displayDot() calls
// \param fromScratch : only written to the debug trace by the statements below
1719 void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
1721 DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Reset of the executor state, done while holding _mutexForSchedulerUpdate.
1722 { // --- Critical section
1723 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// The main scheduler must be a ComposedNode, otherwise an Exception is thrown.
1725 _root = dynamic_cast<ComposedNode *>(_mainSched);
1726 if (!_root) throw Exception("Executor::Run, Internal Error!");
1727 _executorState = YACS::NOTYETINITIALIZED;
1728 sendEvent("executor");
1731 _errorDetected = false;
1732 _isWaitingEventsFromRunningTasks = false;
1733 _numberOfRunningTasks = 0;
1734 _runningTasks.clear();
1735 _numberOfEndedTasks = 0;
// The trace file is named "traceExec_" followed by the scheduler name;
// _start is the time origin of the elapsed times written by traceExec.
1736 string tracefile = "traceExec_";
1737 tracefile += _mainSched->getName();
1738 _trace.open(tracefile.c_str());
1739 _start = std::chrono::steady_clock::now();
1740 } // --- End of critical section
1742 if (debug > 1) _displayDot(graph);
1749 graph->exUpdateState();
// A YACS exception at this point sets the executor state to FINISHED.
1751 catch(Exception& ex)
1753 DEBTRACE("exception: "<< (ex.what()));
1754 _executorState = YACS::FINISHED;
1755 sendEvent("executor");
1759 _executorState = YACS::INITIALISED;
1760 sendEvent("executor");
1762 if (debug > 1) _displayDot(graph);
1768 _executorState = YACS::RUNNING;
1769 sendEvent("executor");
// Workload manager that receives the NewTask objects created in the main loop.
1771 WorkloadManager::DefaultAlgorithm algo;
1772 WorkloadManager::WorkloadManager wlm(algo);
1778 DEBTRACE("--- executor main loop");
1779 sleepWhileNoEventsFromAnyRunningTask();
1780 DEBTRACE("--- events...");
1781 if (debug > 2) _displayDot(graph);
1782 { // --- Critical section
1783 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
// Ready tasks that are not already in _runningTasks are queued in _tasks.
1784 std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1785 graph->selectRunnableTasks(readyTasks);
1787 for(Task * t : readyTasks)
1788 if(_runningTasks.find(t) == _runningTasks.end())
1789 _tasks.push_back(t);
1790 // TODO: to be removed
1791 FilterTasksConsideringContainers(_tasks);
1792 numberAllTasks=_numberOfRunningTasks+_tasks.size();
1793 } // --- End of critical section
1794 if (debug > 2) _displayDot(graph);
1795 DEBTRACE("--- events...");
1796 if (_executorState == YACS::RUNNING)
1798 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1799 for(Task * task : _tasks)
// NewTask::run ends with `delete this`, so the object allocated here frees
// itself after it has been run.
1802 NewTask* newTask = new NewTask(*this, task);
1803 wlm.addTask(newTask);
1806 if (debug > 1) _displayDot(graph);
1807 { // --- Critical section
1809 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1810 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1811 _toContinue = !graph->isFinished();
1813 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1814 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1815 DEBTRACE("_toContinue: " << _toContinue);
// Watchdog for an unfinished graph with nothing running and nothing to launch:
// the situation is reported on stderr and counted; above 25 occurrences the
// execution is stopped.
1816 if(_toContinue && numberAllTasks==0)
1818 //Problem : no running tasks and no task to launch ??
1820 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1821 //Pause to give a chance to interrupt
1823 if(problemCount > 25)
1825 // Too much problems encountered : stop execution
1832 _executorState = YACS::FINISHED;
1833 sendEvent("executor");
1834 _condForPilot.notify_all();
1836 } // --- End of critical section
1837 if (debug > 0) _displayDot(graph);
1838 DEBTRACE("_toContinue: " << _toContinue);
1842 DEBTRACE("End of main Loop");
1844 { // --- Critical section
1845 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1846 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1848 DEBTRACE("stop requested: End soon");
1849 _executorState = YACS::STOPPED;
1850 _toContinue = false;
1851 sendEvent("executor");
1853 } // --- End of critical section
// The state is saved to _dumpErrorFile when a dump on error was requested and
// an error was detected during the run.
1854 if ( _dumpOnErrorRequested && _errorDetected)
1856 saveState(_dumpErrorFile);
// _mutexForTrace is the mutex that protects the trace stream in traceExec.
1859 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1862 DEBTRACE("End of RunB thread");