1 // Copyright (C) 2006-2014 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "HomogeneousPoolContainer.hxx"
27 #include "ComponentInstance.hxx"
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
// Map usleep (microseconds) onto _sleep, which is assumed here to take
// milliseconds (hence the division by 1000).
// The argument is parenthesized so that usleep(a+b) divides the whole
// expression, not only its last operand.
#define usleep(A) _sleep((A)/1000)
// Fallback definitions of the S_ISCHR / S_ISREG file-type predicates when the
// system headers do not provide them (S_ISREG is used by isfile() below).
// S_IFMT / S_IFCHR / S_IFREG are aliased from either the _S_* or the __S_*
// spellings; the selecting #ifdef/#else/#endif lines are not visible here.
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
49 # define S_IFMT _S_IFMT
50 # define S_IFCHR _S_IFCHR
51 # define S_IFREG _S_IFREG
54 # define S_IFMT __S_IFMT
55 # define S_IFCHR __S_IFCHR
56 # define S_IFREG __S_IFREG
60 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
65 using namespace YACS::ENGINE;
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
73 #include "YacsTrace.hxx"
75 int Executor::_maxThreads(50);
76 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
//! Build an executor in state NOTYETINITIALIZED and mode CONTINUE, with
//! stop-on-error and dump-on-error disabled.
//! The thread-limiting semaphore and its shadow counter _semThreadCnt both
//! start at _maxThreads.
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
83 _stopOnErrorRequested = false;
84 _dumpOnErrorRequested = false;
85 _errorDetected = false;
86 _isRunningunderExternalControl=false;
87 _executorState = YACS::NOTYETINITIALIZED;
88 _execMode = YACS::CONTINUE;
89 _semThreadCnt = _maxThreads;
90 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
// Destructor fragment (the signature and loop body are not visible in this
// excerpt): walks every Thread* recorded in _groupOfAllThreadsCreated.
95 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
99 //! Execute a graph waiting for completion
101 * \param graph : schema to execute
102 * \param debug : display the graph with dot if debug > 0 (more snapshots when debug > 1 or > 2)
103 * \param fromScratch : if true the graph is reinitialized
105 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
107 * Calls Executor::launchTask to execute a selected Task.
109 * Completion when graph is finished (Scheduler::isFinished)
// Unlike RunB, no checkBreakPoints() or sendEvent() calls are visible here.
// This variant just loops until the scheduler reports the graph finished.
// NOTE(review): the trace below is labelled "RunW" although this is RunA.
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
114 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
// The main scheduler must be a ComposedNode; it is kept in _root (used by saveState()).
116 _root = dynamic_cast<ComposedNode *>(_mainSched);
117 if (!_root) throw Exception("Executor::Run, Internal Error!");
120 if(debug>1)_displayDot(graph);
124 graph->exUpdateState();
126 if(debug>1)_displayDot(graph);
127 vector<Task *> tasks;
128 vector<Task *>::iterator iter;
// Reset the per-run bookkeeping shared with the task threads.
130 _execMode = YACS::CONTINUE;
131 _isWaitingEventsFromRunningTasks = false;
132 _numberOfRunningTasks = 0;
133 _runningTasks.clear();
134 _numberOfEndedTasks=0;
// Main loop (its header is not visible here): block until a running task
// reports an event, then ask the scheduler for the next ready tasks.
137 sleepWhileNoEventsFromAnyRunningTask();
139 if(debug>2)_displayDot(graph);
142 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
143 tasks=graph->getNextTasks(isMore);
144 graph->selectRunnableTasks(tasks);
145 }//End of critical section
147 if(debug>2)_displayDot(graph);
// Launch the selected tasks (the loop body is not visible in this excerpt).
149 for(iter=tasks.begin();iter!=tasks.end();iter++)
152 if(debug>1)_displayDot(graph);
156 if(debug>1)_displayDot(graph);
// Decide, under the scheduler mutex, whether the graph still has work to do.
159 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
160 _toContinue=!graph->isFinished();
161 }//End of critical section
162 DEBTRACE("_toContinue: " << _toContinue);
164 if(debug>0)_displayDot(graph);
170 //! Execute a graph with breakpoints or step by step
172 * To be launched in a thread (main thread controls the progression).
173 * \param graph : schema to execute
174 * \param debug : display the graph with dot if debug >0
175 * \param fromScratch : if false, state from a previous partial execution is already loaded
177 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
179 * Calls Executor::checkBreakPoints to verify if a pause is requested
181 * Calls Executor::launchTask to execute a selected Task
183 * Completion when graph is finished (Scheduler::isFinished)
185 * States of execution:
186 * - YACS::NOTYETINITIALIZED
187 * - YACS::INITIALISED
188 * - YACS::RUNNING (to next breakpoint or step)
189 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
190 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
191 * - YACS::FINISHED (no more ready tasks, no more running tasks)
192 * - YACS::STOPPED (stopped by user before end)
194 * Modes of Execution:
195 * - YACS::CONTINUE (normal run without breakpoints)
196 * - YACS::STEPBYSTEP (pause at each loop)
197 * - YACS::STOPBEFORENODES (pause when a node is reached)
199 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
200 * Step by Step means execution node by node or group of nodes by group of nodes.
201 * At a given step, the user decides to launch all the ready nodes or only a subset
202 * (Caution: some nodes must run in parallel).
203 * The next event (end of task) may give a new set of ready nodes, and define a new step.
205 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
206 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
207 * - Executor::getCurrentExecMode
208 * - Executor::getExecutorState
209 * - Executor::setExecMode : change the execution mode for next loop
210 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
211 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
212 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
213 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
214 * - Executor::isNotFinished
215 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
216 * - Executor::saveState : dump the current state of execution in an xml file
217 * - Executor::loadState : Not yet implemented
218 * - Executor::getNbOfThreads
219 * - Executor::displayDot
220 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
222 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
223 * - Executor::waitPause
226 * - Pilot may connect to executor during execution, or disconnect.
227 * - Several Pilots may be connected at the same time (for observation...)
// Every executor state change below is published with sendEvent("executor").
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
233 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Reset the run bookkeeping and open the trace file "traceExec_<schema name>".
235 { // --- Critical section
236 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
238 _root = dynamic_cast<ComposedNode *>(_mainSched);
239 if (!_root) throw Exception("Executor::Run, Internal Error!");
240 _executorState = YACS::NOTYETINITIALIZED;
241 sendEvent("executor");
244 _errorDetected = false;
245 _isWaitingEventsFromRunningTasks = false;
246 _numberOfRunningTasks = 0;
247 _runningTasks.clear();
248 _numberOfEndedTasks = 0;
249 string tracefile = "traceExec_";
250 tracefile += _mainSched->getName();
251 _trace.open(tracefile.c_str());
// _start is the time origin used by traceExec(); the clock is platform-specific.
253 _start = timeGetTime();
255 gettimeofday(&_start, NULL);
258 } // --- End of critical section
260 if (debug > 1) _displayDot(graph);
// Initial state update. The visible exception handler marks the executor FINISHED.
267 graph->exUpdateState();
271 DEBTRACE("exception: "<< (ex.what()));
272 _executorState = YACS::FINISHED;
273 sendEvent("executor");
277 _executorState = YACS::INITIALISED;
278 sendEvent("executor");
280 if (debug > 1) _displayDot(graph);
282 vector<Task *>::iterator iter;
287 _executorState = YACS::RUNNING;
288 sendEvent("executor");
// Main loop (its header is not visible here):
//  - wait for task events;
//  - fetch, select and container-filter the ready tasks;
//  - honour breakpoints;
//  - load, then launch.
291 DEBTRACE("--- executor main loop");
292 sleepWhileNoEventsFromAnyRunningTask();
293 DEBTRACE("--- events...");
294 if (debug > 2) _displayDot(graph);
295 { // --- Critical section
296 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
297 _tasks=graph->getNextTasks(isMore);
298 numberAllTasks=_numberOfRunningTasks+_tasks.size();
299 graph->selectRunnableTasks(_tasks);
300 FilterTasksConsideringContainers(_tasks);
301 } // --- End of critical section
302 if (debug > 2) _displayDot(graph);
// In STEPBYSTEP / STOPBEFORENODES modes this may block until the pilot resumes.
303 if (_executorState == YACS::RUNNING)
305 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306 if (debug > 0) _displayDot(graph);
// Load the selected tasks using one loader thread per task.
309 loadParallelTasks(_tasks);
310 if (debug > 1) _displayDot(graph);
315 if (debug > 1) _displayDot(graph);
316 { // --- Critical section
318 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
319 //It is possible that the graph is finished while some tasks are still running (it's an error but we must take it into account)
320 if(_numberOfRunningTasks == 0)
321 _toContinue = !graph->isFinished();
323 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325 DEBTRACE("_toContinue: " << _toContinue);
// Stall guard: the graph is not finished, yet nothing runs and nothing is ready.
// Once problemCount exceeds 25 the run is declared FINISHED.
326 if(_toContinue && numberAllTasks==0)
328 //Problem : no running tasks and no task to launch ??
330 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331 //Pause to give a chance to interrupt
333 if(problemCount > 25)
335 // Too many problems encountered : stop execution
342 _executorState = YACS::FINISHED;
343 sendEvent("executor");
344 _condForPilot.notify_all();
346 } // --- End of critical section
347 if (debug > 0) _displayDot(graph);
348 DEBTRACE("_toContinue: " << _toContinue);
351 DEBTRACE("End of main Loop");
// If _toContinue is still set after leaving the loop, checkBreakPoints() requested the end: report STOPPED.
353 { // --- Critical section
354 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
355 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
357 DEBTRACE("stop requested: End soon");
358 _executorState = YACS::STOPPED;
360 sendEvent("executor");
362 } // --- End of critical section
// State dump requested through setStopOnError(dumpRequested=true, xmlFile).
363 if ( _dumpOnErrorRequested && _errorDetected)
365 saveState(_dumpErrorFile);
368 DEBTRACE("End of RunB thread");
//! Pilot query for the execution mode (the return statement is not visible
//! in this excerpt; presumably _execMode).
//! Calling it flags the executor as running under external control.
371 YACS::ExecutionMode Executor::getCurrentExecMode()
373 _isRunningunderExternalControl=true;
378 YACS::ExecutorState Executor::getExecutorState()
380 _isRunningunderExternalControl=true;
381 return _executorState;
//! Pilot query: is the graph execution still in progress? (The return
//! statement is not visible in this excerpt; presumably _toContinue.)
//! Calling it flags the executor as running under external control.
385 bool Executor::isNotFinished()
387 _isRunningunderExternalControl=true;
391 //! ask to stop execution on the first node found in error
393 * \param dumpRequested produce a state dump when an error is found
394 * \param xmlFile name of file used for state dump
397 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
399 { // --- Critical section
400 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
401 _dumpErrorFile=xmlFile;
402 _stopOnErrorRequested=true;
403 _dumpOnErrorRequested = dumpRequested;
404 if (dumpRequested && xmlFile.empty())
405 throw YACS::Exception("dump on error requested and no filename given for dump");
406 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
407 } // --- End of critical section
410 //! ask to do not stop execution on nodes found in error
414 void Executor::unsetStopOnError()
416 { // --- Critical section
417 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
418 _stopOnErrorRequested=false;
419 } // --- End of critical section
422 //! Dynamically set the current mode of execution
424 * The mode can be Continue, step by step, or stop before execution of a node
425 * defined in a list of breakpoints.
// Takes _mutexForSchedulerUpdate and flags external control. The statement
// storing `mode` is not visible in this excerpt (presumably _execMode = mode).
428 void Executor::setExecMode(YACS::ExecutionMode mode)
430 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
431 { // --- Critical section
432 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
433 _isRunningunderExternalControl=true;
435 } // --- End of critical section
438 //! wake up executor when in pause
440 * When Executor is in state paused or waiting for task completion, the thread
441 * running loop RunB waits on condition _condForStepByStep.
442 * Thread RunB is woken up.
443 * \return true when actually wakes up executor
// In the WAITINGTASKS branch the RunB thread is notified and the state goes
// back to RUNNING. The other visible branch only traces that execution is
// finished or stopped.
446 bool Executor::resumeCurrentBreakPoint()
448 DEBTRACE("Executor::resumeCurrentBreakPoint()");
450 //bool doDump = false;
451 { // --- Critical section
452 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
453 _isRunningunderExternalControl=true;
454 DEBTRACE("_executorState: " << _executorState);
455 switch (_executorState)
457 case YACS::WAITINGTASKS:
460 _condForStepByStep.notify_all();
461 _executorState = YACS::RUNNING;
462 sendEvent("executor");
464 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
470 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
471 DEBTRACE("Graph Execution finished or stopped !");
476 // debug: no easy way to verify if main loop is actually waiting on condition
480 //if (doDump) saveState(_dumpErrorFile);
481 } // --- End of critical section
486 //! define a list of nodes names as breakpoints in the graph
489 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
491 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
492 { // --- Critical section
493 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
494 _isRunningunderExternalControl=true;
495 _listOfBreakPoints = listOfBreakPoints;
496 } // --- End of critical section
500 //! Get the list of tasks to load, to define a subset to execute in step by step mode
502 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
503 * Use Executor::waitPause to wait.
// Returns a copy of _listOfTasksToLoad, which checkBreakPoints() fills with
// the names of the ready tasks when it pauses. Copied under _mutexForSchedulerUpdate.
505 std::list<std::string> Executor::getTasksToLoad()
507 DEBTRACE("Executor::getTasksToLoad()");
508 list<string> listOfNodesToLoad;
// (redundant: a freshly constructed list is already empty)
509 listOfNodesToLoad.clear();
510 { // --- Critical section
511 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
512 _isRunningunderExternalControl=true;
513 switch (_executorState)
515 case YACS::WAITINGTASKS:
518 listOfNodesToLoad = _listOfTasksToLoad;
521 case YACS::NOTYETINITIALIZED:
522 case YACS::INITIALISED:
531 } // --- End of critical section
532 return listOfNodesToLoad;
536 //! Define a subset of tasks to execute in step by step mode
538 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
539 * in the current step.
540 * If some nodes must run in parallel, they must stay together in the list.
// Keeps, from _tasksSave, the tasks whose names appear in listToExecute, and
// makes them the new contents of _tasks for the RunB loop.
// NOTE(review): the second rebuild of _tasks (after the critical section
// ends) runs without holding _mutexForSchedulerUpdate, although RunB reads
// _tasks from another thread. Confirm this cannot race.
543 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
545 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
547 vector<Task *>::iterator iter;
548 vector<Task *> restrictedTasks;
549 { // --- Critical section
550 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
551 _isRunningunderExternalControl=true;
552 switch (_executorState)
554 case YACS::WAITINGTASKS:
557 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
559 string readyNode = _mainSched->getTaskName(*iter);
560 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
561 != listToExecute.end())
563 restrictedTasks.push_back(*iter);
564 DEBTRACE("node to execute " << readyNode);
568 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
570 _tasks.push_back(*iter);
574 case YACS::NOTYETINITIALIZED:
575 case YACS::INITIALISED:
584 } // --- End of critical section
587 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
589 _tasks.push_back(*iter);
591 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
593 string readyNode = _mainSched->getTaskName(*iter);
594 DEBTRACE("selected node to execute " << readyNode);
600 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
602 * Do nothing if execution is finished or in pause.
603 * Wait first step if Executor is running or in initialization.
// Blocks on _condForPilot, which RunB, checkBreakPoints() and the task threads
// notify when the executor becomes PAUSED/WAITINGTASKS or FINISHED.
// NOTE(review): a single wait with no predicate loop returns on a spurious
// wakeup too. The trace on the first line also reads _executorState before
// the mutex is taken.
606 void Executor::waitPause()
608 DEBTRACE("Executor::waitPause()" << _executorState);
609 { // --- Critical section
610 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
611 _isRunningunderExternalControl=true;
612 switch (_executorState)
617 case YACS::WAITINGTASKS:
622 case YACS::NOTYETINITIALIZED:
623 case YACS::INITIALISED:
626 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630 } // --- End of critical section
634 //! stops the execution as soon as possible
// Switches to STEPBYSTEP so that RunB pauses at its next step, then calls
// resumeCurrentBreakPoint() to release it if it is already waiting. The
// lines between these two calls are not visible in this excerpt.
636 void Executor::stopExecution()
638 setExecMode(YACS::STEPBYSTEP);
641 resumeCurrentBreakPoint();
644 //! save the current state of execution in an xml file
// Uses a VisitorSaveState rooted at _root (set by RunA/RunB) and opens
// xmlFile as the dump target. The traversal, file close and return value
// are not visible in this excerpt.
646 bool Executor::saveState(const std::string& xmlFile)
648 DEBTRACE("Executor::saveState() in " << xmlFile);
649 YACS::ENGINE::VisitorSaveState vst(_root);
650 vst.openFileDump(xmlFile.c_str());
656 //! not yet implemented
// Placeholder: only traces the call and flags external control.
658 bool Executor::loadState()
660 DEBTRACE("Executor::loadState()");
661 _isRunningunderExternalControl=true;
// File-local helper: tests whether `filename` can be stat()ed and is a regular
// file (S_ISREG). The return statements are not visible here; presumably 0
// when stat fails or the entry is not a regular file, non-zero otherwise.
666 static int isfile(const char *filename)
669 if (stat(filename, &buf) != 0)
671 if (!S_ISREG(buf.st_mode))
676 //! Display the graph state as a dot display, public method
// Pilot entry point: flags external control. The forwarding call (presumably
// to _displayDot) is not visible in this excerpt.
678 void Executor::displayDot(Scheduler *graph)
680 _isRunningunderExternalControl=true;
684 //! Display the graph state as a dot display
686 * \param graph : the node to display
// Writes the graph in dot format to the fixed file "titi" in the current
// directory. It then runs "sh display.sh" if that script exists, otherwise
// renders titi with dot and shows it with `display`.
// NOTE(review):
//  - The C-style cast assumes graph is a ComposedNode; RunA/RunB check
//    _mainSched with dynamic_cast instead.
//  - g must be flushed/closed before the shell reads titi; no close() is
//    visible here.
//  - system() return values are ignored.
689 void Executor::_displayDot(Scheduler *graph)
691 std::ofstream g("titi");
692 ((ComposedNode*)graph)->writeDot(g);
694 const char displayScript[]="display.sh";
695 if(isfile(displayScript))
696 system("sh display.sh");
698 system("dot -Tpng titi|display -delay 5");
701 //! Wait reactivation in modes Step By step or with BreakPoints
703 * Check mode of execution (set by main thread):
704 * - YACS::CONTINUE : the graph execution continues.
705 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
706 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
707 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
708 * else continue the graph execution.
709 * \return true if end of executor thread is requested
// Called from RunB's loop. When pausing, it:
//  - publishes the names of the ready tasks in _listOfTasksToLoad;
//  - sets state WAITINGTASKS or PAUSED and notifies _condForPilot;
//  - waits on _condForStepByStep.
// _isOKToEnd (set on stop-on-error) turns into the "end requested" result.
// NOTE(review): getNbOfThreads() also sets _isRunningunderExternalControl=true
// and returns _groupOfAllThreadsCreated.size(). Nothing visible in this file
// adds to that list, so WAITINGTASKS may never be chosen here, and the
// no-pilot wakeup in functionForTaskExecution is disabled. Confirm whether
// _numberOfRunningTasks was intended.
712 bool Executor::checkBreakPoints()
714 DEBTRACE("Executor::checkBreakPoints()");
715 vector<Task *>::iterator iter;
716 bool endRequested = false;
// The `stop` flag used below is declared/set on lines not visible here;
// presumably true when a ready task matches a breakpoint.
724 case YACS::STOPBEFORENODES:
727 { // --- Critical section
728 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
730 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
732 string nodeToLoad = _mainSched->getTaskName(*iter);
733 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
734 != _listOfBreakPoints.end())
742 _listOfTasksToLoad.clear();
743 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
745 string nodeToLoad = _mainSched->getTaskName(*iter);
746 _listOfTasksToLoad.push_back(nodeToLoad);
748 if (getNbOfThreads())
749 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
751 _executorState = YACS::PAUSED;
752 sendEvent("executor");
753 _condForPilot.notify_all();
755 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
756 if (_isOKToEnd) endRequested = true;
757 } // --- End of critical section
758 if (stop) DEBTRACE("wake up from waitResume");
// STEPBYSTEP: always pause before launching the current set of ready tasks.
762 case YACS::STEPBYSTEP:
764 { // --- Critical section
765 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
767 _listOfTasksToLoad.clear();
768 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
770 string nodeToLoad = _mainSched->getTaskName(*iter);
771 _listOfTasksToLoad.push_back(nodeToLoad);
773 if (getNbOfThreads())
774 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
776 _executorState = YACS::PAUSED;
777 sendEvent("executor");
778 _condForPilot.notify_all();
780 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
781 // or, if no pilot, wait until no more running tasks (stop on error)
782 if (_isOKToEnd) endRequested = true;
783 } // --- End of critical section
784 DEBTRACE("wake up from waitResume");
788 DEBTRACE("endRequested: " << endRequested);
793 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
795 * With the condition Mutex, the mutex is released atomically during the wait.
796 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
797 * Must be called while _mutexForSchedulerUpdate is locked.
// Also woken by functionForTaskExecution when stop-on-error ends a run
// that has no pilot.
800 void Executor::waitResume()
802 DEBTRACE("Executor::waitResume()");
803 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
808 //! Perform loading of a Task.
810 * \param task : Task to load
// Runs on a loader thread (see functionForTaskLoad). For a task in state
// TOLOAD it:
//  - notifies the scheduler with YACS::START;
//  - loads the task and initializes its service.
// On any exception it reports YACS::ABORT to the scheduler under
// _mutexForSchedulerUpdate. Exceptions do not propagate to the caller.
813 void Executor::loadTask(Task *task)
815 DEBTRACE("Executor::loadTask(Task *task)");
816 if(task->getState() != YACS::TOLOAD)
818 traceExec(task, "state:TOLOAD", ComputePlacement(task));
820 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
821 _mainSched->notifyFrom(task,YACS::START);
822 }//End of critical section
825 traceExec(task, "load", ComputePlacement(task));
827 traceExec(task, "initService", ComputePlacement(task));
832 std::cerr << ex.what() << std::endl;
834 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
836 _mainSched->notifyFrom(task,YACS::ABORT);
837 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
838 }//End of critical section
842 std::cerr << "Load failed" << std::endl;
844 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
846 _mainSched->notifyFrom(task,YACS::ABORT);
847 traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
848 }//End of critical section
// Sequentially load a set of tasks (the loop body is not visible in this excerpt).
// NOTE(review): the loop iterates the member _tasks and ignores the `tasks`
// parameter. It should almost certainly iterate `tasks`.
859 void Executor::loadTasks(const std::vector<Task *>& tasks)
861 for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
// Load tasks concurrently: start one Thread per task running
// functionForTaskLoad with a heap-allocated threadargs, then walk the thread
// vector again (that loop's body is not visible here).
// NOTE(review): `ths` is sized from the `tasks` parameter, but the launch
// loop iterates the member _tasks. If the two differ, ths[ithread] goes out
// of range. This is only safe because the visible caller passes _tasks.
865 void Executor::loadParallelTasks(const std::vector<Task *>& tasks)
867 std::vector<Thread> ths(tasks.size());
868 std::size_t ithread(0);
869 for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
871 DEBTRACE("Executor::loadParallelTasks(Task *task)");
872 struct threadargs *args(new threadargs);
873 args->task = (*iter);
874 args->sched = _mainSched;
875 args->execInst = this;
876 ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
878 for(ithread=0;ithread<tasks.size();ithread++)
882 //! Execute a list of tasks possibly connected through datastream links
884 * \param tasks : a list of tasks to execute
// Phase 1 connects the services of tasks in state TOLOAD/TORECONNECT. On
// failure it disconnects the task and reports ABORT; a task that ends in
// ERROR also drags its coupled tasks into ABORT.
// Phase 2 (loop body not visible here) launches each task.
// NOTE(review): coupled tasks `t` are traced with ComputePlacement(*iter),
// i.e. the placement of the failing task rather than their own. Confirm
// whether ComputePlacement(t) was intended.
887 void Executor::launchTasks(const std::vector<Task *>& tasks)
889 //First phase, make datastream connections
890 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
892 YACS::StatesForNode state=(*iter)->getState();
893 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
896 (*iter)->connectService();
897 traceExec(*iter, "connectService",ComputePlacement(*iter));
899 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
900 (*iter)->connected();
901 }//End of critical section
905 std::cerr << ex.what() << std::endl;
908 (*iter)->disconnectService();
909 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
913 // Disconnect has failed
914 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
917 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
919 _mainSched->notifyFrom(*iter,YACS::ABORT);
920 }//End of critical section
924 std::cerr << "Problem in connectService" << std::endl;
927 (*iter)->disconnectService();
928 traceExec(*iter, "disconnectService",ComputePlacement(*iter));
932 // Disconnect has failed
933 traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
936 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
938 _mainSched->notifyFrom(*iter,YACS::ABORT);
939 }//End of critical section
941 if((*iter)->getState() == YACS::ERROR)
943 //try to put all coupled tasks in error
944 std::set<Task*> coupledSet;
945 (*iter)->getCoupledTasks(coupledSet);
946 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
949 if(t == *iter)continue;
950 if(t->getState() == YACS::ERROR)continue;
953 t->disconnectService();
954 traceExec(t, "disconnectService",ComputePlacement(*iter));
958 // Disconnect has failed
959 traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
962 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
964 _mainSched->notifyFrom(t,YACS::ABORT);
965 }//End of critical section
966 traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
969 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
972 //Second phase, execute each task in a thread
973 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
979 //! Execute a Task in a thread
981 * \param task : Task to execute
983 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
985 * Calls Executor::functionForTaskExecution in Thread
// Only tasks in state TOACTIVATE are launched. When no thread slot is left
// (_semThreadCnt == 0), it checks whether running tasks are waiting on
// coupled tasks that are still TOACTIVATE. If so, it warns that
// YACS_MAX_THREADS may be too small. It then blocks on _semForMaxThreads,
// which a finishing task thread posts.
// NOTE(review):
//  - No notifyFrom call is visible in this excerpt; the visible code
//    switches the task state with task->begin().
//  - _semThreadCnt and _runningTasks are read here outside any visible
//    critical section, while task threads update them under
//    _mutexForSchedulerUpdate.
985 void Executor::launchTask(Task *task)
990 DEBTRACE("Executor::launchTask(Task *task)");
991 struct threadargs *args;
992 if(task->getState() != YACS::TOACTIVATE)return;
994 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
995 if(_semThreadCnt == 0)
997 //check if we have enough threads to run
998 std::set<Task*> tmpSet=_runningTasks;
999 std::set<Task*>::iterator it = tmpSet.begin();
1000 std::string status="running";
1001 std::set<Task*> coupledSet;
1002 while( it != tmpSet.end() )
1006 tt->getCoupledTasks(coupledSet);
1008 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1010 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1011 tmpSet.erase(*iter);
1013 if(status=="running")break;
1014 it = tmpSet.begin();
1017 if(status=="toactivate")
1019 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1020 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1024 _semForMaxThreads.wait();
1027 args= new threadargs;
1029 args->sched = _mainSched;
1030 args->execInst = this;
1032 traceExec(task, "launch",ComputePlacement(task));
1034 { // --- Critical section
1035 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1036 _numberOfRunningTasks++;
1037 _runningTasks.insert(task);
1038 task->begin(); //change state to ACTIVATED
1039 } // --- End of critical section
1040 Thread(functionForTaskExecution, args, _threadStackSize);
1043 //! wait until a running task ends
// Blocks only if tasks are running and none has ended since the last call.
// wakeUp() sets the flag/counter used here and signals _condForNewTasksToPerform.
// The ended-task counter is then reset.
// NOTE(review): the wait is guarded by `if`, not `while`, so a spurious
// wakeup returns early. The RunA/RunB loops simply come back here, so the
// cost is an extra scheduling pass.
1045 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1047 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1048 // _semForNewTasksToPerform.wait(); //----use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1049 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
1050 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1052 _isWaitingEventsFromRunningTasks = true;
1053 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1055 _numberOfEndedTasks=0;
// Currently a no-op: the thread-bookkeeping body is commented out, so
// _groupOfAllThreadsCreated is not updated here.
1061 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1063 /*_mutexForNbOfConcurrentThreads.lock();
1064 _groupOfAllThreadsCreated.remove(thread);
1066 _mutexForNbOfConcurrentThreads.unlock();*/
1070 //! must be called with _mutexForSchedulerUpdate held!
// Called by a finishing task thread. If the executor loop is blocked in
// sleepWhileNoEventsFromAnyRunningTask(), it clears the waiting flag and
// signals _condForNewTasksToPerform. It also increments _numberOfEndedTasks
// (the branch structure around that increment is not visible here).
1072 void Executor::wakeUp()
1074 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1075 if (_isWaitingEventsFromRunningTasks)
1077 _isWaitingEventsFromRunningTasks = false;
1078 _condForNewTasksToPerform.notify_all();
1081 _numberOfEndedTasks++;
1084 //! number of threads recorded in _groupOfAllThreadsCreated (intended as the number of running tasks)
// Side effect: flags the executor as running under external control, even
// when called internally from checkBreakPoints().
// NOTE(review): threads are created as temporaries in launchTask and
// notifyEndOfThread's bookkeeping is commented out. The list may therefore
// not track running tasks; _numberOfRunningTasks may be the intended count.
1086 int Executor::getNbOfThreads()
1089 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
1090 _isRunningunderExternalControl=true;
1091 ret = _groupOfAllThreadsCreated.size();
1096 * This thread is NOT supposed to be detached !
// Thread entry point started by loadParallelTasks(). It unpacks the
// threadargs (allocated with new by the caller) and loads one task. Whether
// `args` is freed here is not visible in this excerpt; `sched` is unused in
// the visible lines.
1098 void *Executor::functionForTaskLoad(void *arg)
1100 DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1101 struct threadargs *args = (struct threadargs *) arg;
1102 Task *task=args->task;
1103 Scheduler *sched=args->sched;
1104 Executor *execInst=args->execInst;
1106 execInst->loadTask(task);// no throw of this method - all throws are caught !
1110 //! Function to perform execution of a task in a thread
1112 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1114 * Calls Task::execute
1116 * Calls Task::finished when the task is finished
1118 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1120 * Calls Executor::wakeUp (the Executor::notifyEndOfThread call is currently commented out)
// Steps:
//  1. Execute the task; any exception turns the event into YACS::ABORT.
//  2. Disconnect its service.
//  3. Release it from a HomogeneousPoolContainer, if it uses one.
//  4. Under _mutexForSchedulerUpdate:
//     - notify the scheduler;
//     - update the running-task bookkeeping and PAUSED/WAITINGTASKS state;
//     - return the thread slot (_semForMaxThreads.post()).
// On ABORT with stop-on-error requested, it forces STEPBYSTEP and sets
// _isOKToEnd. Without a pilot it then wakes the executor thread from
// waitResume().
1123 void *Executor::functionForTaskExecution(void *arg)
1125 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1127 struct threadargs *args = (struct threadargs *) arg;
1128 Task *task=args->task;
1129 Scheduler *sched=args->sched;
1130 Executor *execInst=args->execInst;
1132 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1138 YACS::Event ev=YACS::FINISH;
1141 execInst->traceExec(task, "start execution",ComputePlacement(task));
1143 execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1145 catch(Exception& ex)
1147 std::cerr << "YACS Exception during execute" << std::endl;
1148 std::cerr << ex.what() << std::endl;
1150 string message = "end execution ABORT, ";
1151 message += ex.what();
1152 execInst->traceExec(task, message,ComputePlacement(task));
1156 // Execution has failed
1157 std::cerr << "Execution has failed: unknown reason" << std::endl;
1159 execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1165 DEBTRACE("task->disconnectService()");
1166 task->disconnectService();
1167 execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1171 // Disconnect has failed
1172 std::cerr << "disconnect has failed" << std::endl;
1174 execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
// Placement is computed before the container slot is released, so the final trace still reports it.
1178 std::string placement(ComputePlacement(task));
1180 // container management for HomogeneousPoolOfContainer
1182 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1185 YACS::BASES::AutoLocker<Container> alckCont(contC);
1186 contC->release(task);
1189 DEBTRACE("End task->execute()");
1190 { // --- Critical section
1191 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
1194 if (ev == YACS::FINISH) task->finished();
1195 if (ev == YACS::ABORT)
1197 execInst->_errorDetected = true;
1198 if (execInst->_stopOnErrorRequested)
1200 execInst->_execMode = YACS::STEPBYSTEP;
1201 execInst->_isOKToEnd = true;
1205 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1206 sched->notifyFrom(task,ev);
1208 catch(Exception& ex)
1210 //notify has failed : it is supposed to have set state
1211 //so no need to do anything
1212 std::cerr << "Error during notification" << std::endl;
1213 std::cerr << ex.what() << std::endl;
1217 //notify has failed : it is supposed to have set state
1218 //so no need to do anything
1219 std::cerr << "Notification failed" << std::endl;
1221 execInst->_numberOfRunningTasks--;
1222 execInst->_runningTasks.erase(task);
1223 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1224 << " _execMode: " << execInst->_execMode
1225 << " _executorState: " << execInst->_executorState);
// Last running task in a non-CONTINUE mode: complete a pending pause
// (WAITINGTASKS -> PAUSED) and let the pilot know.
1226 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1228 if (execInst->_executorState == YACS::WAITINGTASKS)
1230 execInst->_executorState = YACS::PAUSED;
1231 execInst->sendEvent("executor");
1232 execInst->_condForPilot.notify_all();
1233 if (execInst->_errorDetected &&
1234 execInst->_stopOnErrorRequested &&
1235 !execInst->_isRunningunderExternalControl)
1236 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Return this task's slot to the thread-limiting semaphore (taken in launchTask).
1239 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1240 execInst->_semForMaxThreads.post();
1241 execInst->_semThreadCnt += 1;
1242 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1243 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1245 } // --- End of critical section (change state)
1247 //execInst->notifyEndOfThread(0);
1252 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1254 string nodeName = _mainSched->getTaskName(task);
1255 Container *cont = task->getContainer();
1256 string containerName = "---";
1258 containerName = cont->getName();
1261 DWORD now = timeGetTime();
1262 double elapse = (now - _start)/1000.0;
1265 gettimeofday(&now, NULL);
1266 double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1269 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
1270 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1275 //! emit notification to all observers registered with the dispatcher (the node passed to them is always _root)
1277 * The dispatcher is unique and can be obtained by getDispatcher()
1279 void Executor::sendEvent(const std::string& event) // event: event name forwarded as-is, e.g. "executor"
1281 Dispatcher* disp=Dispatcher::getDispatcher(); // NOTE(review): listing lines 1282-1283 are absent here; they presumably validate disp/_root -- confirm before assuming disp is non-null
1284 disp->dispatch(_root,event);
1288 * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
1289 * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
1291 * \param [in,out] tsks - list of tasks to be
1293 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1295 std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1296 for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1301 Container *cont(cur->getContainer());
1304 m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1307 HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1310 m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1313 m[contC].push_back(cur);
1316 std::vector<Task *> ret;
1317 for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1319 HomogeneousPoolContainer *curhpc((*it).first);
1320 const std::vector<Task *>& curtsks((*it).second);
1323 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1327 // start of critical section for container curhpc
1328 YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1329 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1330 std::size_t sz(curhpc->getNumberOfFreePlace());
1331 std::vector<Task *>::const_iterator it2(curtsks.begin());
1332 for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1334 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1335 ret.push_back(*it2);
1337 curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1338 //end of critical section
1345 std::string Executor::ComputePlacement(Task *zeTask)
1347 std::string placement("---");
1350 if(zeTask->getContainer())
1351 placement=zeTask->getContainer()->getFullPlacementId(zeTask);