1 // Copyright (C) 2006-2014 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
26 #include "ComponentInstance.hxx"
28 #include "VisitorSaveState.hxx"
29 #include "ServiceNode.hxx"
30 #include "ComposedNode.hxx"
// Portability shims.
// usleep(A) is redirected to _sleep() with its argument divided by 1000.
// NOTE(review): presumably a Windows-only fallback converting microseconds to
// milliseconds -- confirm against the enclosing platform guard.
44 #define usleep(A) _sleep(A/1000)
// Supply S_ISCHR / S_ISREG when the system headers do not define both of them.
45 #if !defined(S_ISCHR) || !defined(S_ISREG)
// Variant 1: map the S_IF* masks onto the single-underscore names.
48 # define S_IFMT _S_IFMT
49 # define S_IFCHR _S_IFCHR
50 # define S_IFREG _S_IFREG
// Variant 2: map the S_IF* masks onto the double-underscore names.
53 # define S_IFMT __S_IFMT
54 # define S_IFCHR __S_IFCHR
55 # define S_IFREG __S_IFREG
// File-type predicates: mask the mode with S_IFMT and compare with the type constant.
59 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
60 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
64 using namespace YACS::ENGINE;
67 using YACS::BASES::Mutex;
68 using YACS::BASES::Thread;
69 using YACS::BASES::Semaphore;
72 #include "YacsTrace.hxx"
// Class-wide defaults.
// _maxThreads (50) is the value used to initialise _semForMaxThreads and _semThreadCnt
// in the constructor, and is the value quoted in the YACS_MAX_THREADS warning of launchTask.
74 int Executor::_maxThreads(50);
// Stack size handed to each Thread created by launchTask.
75 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
//! Build an executor in the NOTYETINITIALIZED state, in CONTINUE mode.
// The semaphore limiting the number of task threads starts at _maxThreads.
77 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
// No stop/dump on error requested and no error detected yet.
82 _stopOnErrorRequested = false;
83 _dumpOnErrorRequested = false;
84 _errorDetected = false;
// Set to true by the pilot-facing methods (getExecutorState, setExecMode, ...).
85 _isRunningunderExternalControl=false;
86 _executorState = YACS::NOTYETINITIALIZED;
87 _execMode = YACS::CONTINUE;
// Plain integer copy of the semaphore count; launchTask tests it against 0 and
// functionForTaskExecution increments it after posting the semaphore.
88 _semThreadCnt = _maxThreads;
89 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
// Visits every Thread pointer stored in _groupOfAllThreadsCreated.
// NOTE(review): presumably per-thread cleanup at destruction time -- confirm.
94 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
98 //! Execute a graph waiting for completion
100 * \param graph : schema to execute
101 * \param debug : display the graph with dot if debug > 0 (more display points for debug > 1 and debug > 2)
102 * \param fromScratch : if true the graph is reinitialized
104 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
106 * Calls Executor::launchTask to execute a selected Task.
108 * Completion when graph is finished (Scheduler::isFinished)
// Run a graph without breakpoint handling: wait for task events, ask the scheduler
// for the next tasks, and recompute _toContinue from Scheduler::isFinished.
// The debug thresholds (>0, >1, >2) select how many intermediate dot displays are produced.
111 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
// NOTE(review): the trace label reads "RunW" although this is RunA -- confirm whether intentional.
113 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
// The main scheduler must also be a ComposedNode; otherwise execution is refused.
115 _root = dynamic_cast<ComposedNode *>(_mainSched);
116 if (!_root) throw Exception("Executor::Run, Internal Error!");
119 if(debug>1)_displayDot(graph);
// Initial state propagation in the graph.
123 graph->exUpdateState();
125 if(debug>1)_displayDot(graph);
126 vector<Task *> tasks;
127 vector<Task *>::iterator iter;
// Reset the run-control state: plain CONTINUE mode, no running or ended task recorded.
129 _execMode = YACS::CONTINUE;
130 _isWaitingEventsFromRunningTasks = false;
131 _numberOfRunningTasks = 0;
132 _runningTasks.clear();
133 _numberOfEndedTasks=0;
// Blocks only while tasks are running and none has ended since the last pass.
136 sleepWhileNoEventsFromAnyRunningTask();
138 if(debug>2)_displayDot(graph);
// Scheduler queries are made under _mutexForSchedulerUpdate.
141 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
142 tasks=graph->getNextTasks(isMore);
143 graph->selectRunnableTasks(tasks);
144 }//End of critical section
146 if(debug>2)_displayDot(graph);
// Walk the tasks returned by the scheduler.
148 for(iter=tasks.begin();iter!=tasks.end();iter++)
151 if(debug>1)_displayDot(graph);
155 if(debug>1)_displayDot(graph);
// Continue as long as the scheduler does not report the graph as finished.
158 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
159 _toContinue=!graph->isFinished();
160 }//End of critical section
161 DEBTRACE("_toContinue: " << _toContinue);
163 if(debug>0)_displayDot(graph);
169 //! Execute a graph with breakpoints or step by step
171 * To be launched in a thread (the main thread controls the progression).
172 * \param graph : schema to execute
173 * \param debug : display the graph with dot if debug >0
174 * \param fromScratch : if false, state from a previous partial execution is already loaded
176 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178 * Calls Executor::checkBreakPoints to verify if a pause is requested
180 * Calls Executor::launchTask to execute a selected Task
182 * Completion when graph is finished (Scheduler::isFinished)
184 * States of execution:
185 * - YACS::NOTYETINITIALIZED
186 * - YACS::INITIALISED
187 * - YACS::RUNNING (to next breakpoint or step)
188 * - YACS::WAITINGTASKS (a breakpoint or step has been reached, but there are still running tasks)
189 * - YACS::PAUSED (a breakpoint or step has been reached, no more running tasks)
190 * - YACS::FINISHED (no more ready tasks, nor running tasks)
191 * - YACS::STOPPED (stopped by user before end)
193 * Modes of Execution:
194 * - YACS::CONTINUE (normal run without breakpoints)
195 * - YACS::STEPBYSTEP (pause at each loop)
196 * - YACS::STOPBEFORENODES (pause when a node is reached)
198 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
199 * Step by Step means execution node by node or group of node by group of nodes.
200 * At a given step, the user decides to launch all the ready nodes or only a subset
201 * (Caution: some nodes must run in parallel).
202 * The next event (end of task) may give a new set of ready nodes, and define a new step.
204 * The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous.
205 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
206 * - Executor::getCurrentExecMode
207 * - Executor::getExecutorState
208 * - Executor::setExecMode : change the execution mode for next loop
209 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
210 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
211 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
212 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
213 * - Executor::isNotFinished
214 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
215 * - Executor::saveState : dump the current state of execution in an xml file
216 * - Executor::loadState : Not yet implemented
217 * - Executor::getNbOfThreads
218 * - Executor::displayDot
219 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
221 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
222 * - Executor::waitPause
225 * - Pilot may connect to executor during execution, or disconnect.
226 * - Several Pilots may be connected at the same time (for observation...)
// Breakpoint-aware run of a graph. Each state change is published with
// sendEvent("executor"); pilots waiting on _condForPilot are notified when the run ends.
230 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
// Initialisation under the scheduler mutex: resolve the root node, publish
// NOTYETINITIALIZED, reset the run counters and open the trace file.
234 { // --- Critical section
235 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
// The main scheduler must also be a ComposedNode; otherwise execution is refused.
237 _root = dynamic_cast<ComposedNode *>(_mainSched);
238 if (!_root) throw Exception("Executor::Run, Internal Error!");
239 _executorState = YACS::NOTYETINITIALIZED;
240 sendEvent("executor");
243 _errorDetected = false;
244 _isWaitingEventsFromRunningTasks = false;
245 _numberOfRunningTasks = 0;
246 _runningTasks.clear();
247 _numberOfEndedTasks = 0;
// Trace file name is "traceExec_" followed by the scheduler name.
248 string tracefile = "traceExec_";
249 tracefile += _mainSched->getName();
250 _trace.open(tracefile.c_str());
// Record the start time that traceExec subtracts to get elapsed times.
// NOTE(review): the two assignments below are presumably alternative platform variants -- confirm.
252 _start = timeGetTime();
254 gettimeofday(&_start, NULL);
257 } // --- End of critical section
259 if (debug > 1) _displayDot(graph);
// Initial state propagation in the graph.
266 graph->exUpdateState();
// NOTE(review): the trace text suggests the next three lines are the failure path of the
// initial update, which reports the executor as FINISHED -- confirm.
270 DEBTRACE("exception: "<< (ex.what()));
271 _executorState = YACS::FINISHED;
272 sendEvent("executor");
276 _executorState = YACS::INITIALISED;
277 sendEvent("executor");
279 if (debug > 1) _displayDot(graph);
281 vector<Task *>::iterator iter;
286 _executorState = YACS::RUNNING;
287 sendEvent("executor");
290 DEBTRACE("--- executor main loop");
// Blocks only while tasks are running and none has ended since the last pass.
291 sleepWhileNoEventsFromAnyRunningTask();
292 DEBTRACE("--- events...");
293 if (debug > 2) _displayDot(graph);
294 { // --- Critical section
295 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
296 _tasks=graph->getNextTasks(isMore);
// Running tasks plus tasks proposed by the scheduler, computed before
// selectRunnableTasks(_tasks) is called; used below to detect a stalled graph.
297 numberAllTasks=_numberOfRunningTasks+_tasks.size();
298 graph->selectRunnableTasks(_tasks);
299 } // --- End of critical section
300 if (debug > 2) _displayDot(graph);
// Breakpoints are only checked while the executor is in the RUNNING state.
301 if (_executorState == YACS::RUNNING)
303 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
304 if (debug > 0) _displayDot(graph);
306 for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
308 if (debug > 1) _displayDot(graph);
313 if (debug > 1) _displayDot(graph);
314 { // --- Critical section
316 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
317 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
// _toContinue is only re-evaluated once no task is running any more.
318 if(_numberOfRunningTasks == 0)
319 _toContinue = !graph->isFinished();
321 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
322 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
323 DEBTRACE("_toContinue: " << _toContinue);
// Stall detection: the graph is not finished, yet nothing runs and nothing can be launched.
324 if(_toContinue && numberAllTasks==0)
326 //Problem : no running tasks and no task to launch ??
328 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
329 //Pause to give a chance to interrupt
// More than 25 stalled passes are treated as fatal.
331 if(problemCount > 25)
333 // Too many problems encountered : stop execution
// Publish FINISHED and wake any pilot blocked in waitPause().
340 _executorState = YACS::FINISHED;
341 sendEvent("executor");
342 _condForPilot.notify_all();
344 } // --- End of critical section
345 if (debug > 0) _displayDot(graph);
346 DEBTRACE("_toContinue: " << _toContinue);
349 DEBTRACE("End of main Loop");
351 { // --- Critical section
352 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
// _toContinue still true here means the loop was left early on a stop request.
353 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
355 DEBTRACE("stop requested: End soon");
356 _executorState = YACS::STOPPED;
358 sendEvent("executor");
360 } // --- End of critical section
// Optional state dump, done after the critical section above has been closed.
361 if ( _dumpOnErrorRequested && _errorDetected)
363 saveState(_dumpErrorFile);
366 DEBTRACE("End of RunB thread");
// Pilot-facing accessor for the execution mode (returns a YACS::ExecutionMode).
// Side effect: calling it marks the executor as running under external control.
369 YACS::ExecutionMode Executor::getCurrentExecMode()
371 _isRunningunderExternalControl=true;
// Pilot-facing accessor: returns the current _executorState.
// Side effect: calling it marks the executor as running under external control.
// No mutex is taken in the visible lines.
376 YACS::ExecutorState Executor::getExecutorState()
378 _isRunningunderExternalControl=true;
379 return _executorState;
// Pilot-facing query returning a bool.
// Side effect: calling it marks the executor as running under external control.
383 bool Executor::isNotFinished()
385 _isRunningunderExternalControl=true;
389 //! ask to stop execution on the first node found in error
391 * \param dumpRequested produce a state dump when an error is found
392 * \param xmlFile name of file used for state dump
// Enable stop-on-error, optionally with a state dump written to xmlFile.
// Throws YACS::Exception when a dump is requested with an empty file name.
395 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
397 { // --- Critical section
398 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
// NOTE(review): the three members are assigned before the argument check below, so they
// keep the new values even when the exception is thrown.
399 _dumpErrorFile=xmlFile;
400 _stopOnErrorRequested=true;
401 _dumpOnErrorRequested = dumpRequested;
402 if (dumpRequested && xmlFile.empty())
403 throw YACS::Exception("dump on error requested and no filename given for dump");
404 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
405 } // --- End of critical section
408 //! ask not to stop execution on nodes found in error
// Clear the stop-on-error request under the scheduler mutex.
// _dumpOnErrorRequested and _dumpErrorFile are left untouched by the visible code.
412 void Executor::unsetStopOnError()
414 { // --- Critical section
415 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
416 _stopOnErrorRequested=false;
417 } // --- End of critical section
420 //! Dynamically set the current mode of execution
422 * The mode can be Continue, step by step, or stop before execution of a node
423 * defined in a list of breakpoints.
// Pilot request changing the execution mode; handled under the scheduler mutex.
// Side effect: marks the executor as running under external control.
426 void Executor::setExecMode(YACS::ExecutionMode mode)
428 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
429 { // --- Critical section
430 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
431 _isRunningunderExternalControl=true;
433 } // --- End of critical section
436 //! wake up executor when in pause
438 * When Executor is in state paused or waiting for task completion, the thread
439 * running loop RunB waits on condition _condForStepByStep.
440 * Thread RunB is woken up.
441 * \return true when actually wakes up executor
// Pilot request: wake the run loop blocked in waitResume() and put the executor back
// into the RUNNING state. The action taken depends on the current _executorState.
444 bool Executor::resumeCurrentBreakPoint()
446 DEBTRACE("Executor::resumeCurrentBreakPoint()");
448 //bool doDump = false;
449 { // --- Critical section
450 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
451 _isRunningunderExternalControl=true;
452 DEBTRACE("_executorState: " << _executorState);
453 switch (_executorState)
455 case YACS::WAITINGTASKS:
// Release the thread waiting on _condForStepByStep, then publish the RUNNING state.
458 _condForStepByStep.notify_all();
459 _executorState = YACS::RUNNING;
460 sendEvent("executor");
462 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
469 DEBTRACE("Graph Execution finished or stopped !");
474 // debug: no easy way to verify if main loop is actually waiting on condition
478 //if (doDump) saveState(_dumpErrorFile);
479 } // --- End of critical section
484 //! define a list of nodes names as breakpoints in the graph
// Replace the stored breakpoint list with a copy of listOfBreakPoints (node names
// compared against Scheduler::getTaskName in checkBreakPoints).
// Side effect: marks the executor as running under external control.
487 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
489 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
490 { // --- Critical section
491 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
492 _isRunningunderExternalControl=true;
493 _listOfBreakPoints = listOfBreakPoints;
494 } // --- End of critical section
498 //! Get the list of tasks to load, to define a subset to execute in step by step mode
500 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
501 * Use Executor::waitPause to wait.
// Return a copy of _listOfTasksToLoad (filled by checkBreakPoints) for the states
// handled below; the returned list starts out empty.
503 std::list<std::string> Executor::getTasksToLoad()
505 DEBTRACE("Executor::getTasksToLoad()");
506 list<string> listOfNodesToLoad;
// Redundant on a freshly constructed list, kept as in the original.
507 listOfNodesToLoad.clear();
508 { // --- Critical section
509 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
510 _isRunningunderExternalControl=true;
511 switch (_executorState)
513 case YACS::WAITINGTASKS:
// Copy made under the mutex, so the caller gets a snapshot.
516 listOfNodesToLoad = _listOfTasksToLoad;
519 case YACS::NOTYETINITIALIZED:
520 case YACS::INITIALISED:
529 } // --- End of critical section
530 return listOfNodesToLoad;
534 //! Define a subset of task to execute in step by step mode
536 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
537 * in the current step.
538 * If some nodes must run in parallel, they must stay together in the list.
// Restrict the tasks of the current step to those whose name appears in listToExecute.
// Candidates are taken from _tasksSave; the selected ones are appended to _tasks.
541 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
543 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
545 vector<Task *>::iterator iter;
546 vector<Task *> restrictedTasks;
547 { // --- Critical section
548 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
549 _isRunningunderExternalControl=true;
550 switch (_executorState)
552 case YACS::WAITINGTASKS:
// Keep each saved task whose scheduler name is found in listToExecute (linear search).
555 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
557 string readyNode = _mainSched->getTaskName(*iter);
558 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
559 != listToExecute.end())
561 restrictedTasks.push_back(*iter);
562 DEBTRACE("node to execute " << readyNode);
566 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
568 _tasks.push_back(*iter);
572 case YACS::NOTYETINITIALIZED:
573 case YACS::INITIALISED:
582 } // --- End of critical section
// NOTE(review): from here on _tasks is accessed after the critical section above has
// been closed -- confirm this cannot race with the run loop.
585 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
587 _tasks.push_back(*iter);
// Trace-only pass over the final selection.
589 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
591 string readyNode = _mainSched->getTaskName(*iter);
592 DEBTRACE("selected node to execute " << readyNode);
598 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
600 * Do nothing if execution is finished or in pause.
601 * Wait first step if Executor is running or in initialization.
// Block the calling (pilot) thread on _condForPilot for the states that require waiting.
604 void Executor::waitPause()
// Trace only: _executorState is read here before the mutex is taken.
606 DEBTRACE("Executor::waitPause()" << _executorState);
607 { // --- Critical section
608 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
609 _isRunningunderExternalControl=true;
610 switch (_executorState)
615 case YACS::WAITINGTASKS:
620 case YACS::NOTYETINITIALIZED:
621 case YACS::INITIALISED:
// _condForPilot is notified from checkBreakPoints, functionForTaskExecution and the end of RunB.
624 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
628 } // --- End of critical section
632 //! stops the execution as soon as possible
// Stop request: switch to STEPBYSTEP mode, then resume any current breakpoint.
634 void Executor::stopExecution()
636 setExecMode(YACS::STEPBYSTEP);
639 resumeCurrentBreakPoint();
642 //! save the current state of execution in an xml file
// Dump the state of the graph rooted at _root into xmlFile through a VisitorSaveState.
// \param xmlFile path of the dump file to open
644 bool Executor::saveState(const std::string& xmlFile)
646 DEBTRACE("Executor::saveState() in " << xmlFile);
// NOTE(review): _root is set by RunA/RunB; behaviour before a run has started is not
// established by the visible code -- confirm.
647 YACS::ENGINE::VisitorSaveState vst(_root);
648 vst.openFileDump(xmlFile.c_str());
654 //! not yet implemented
// Placeholder (documented as not yet implemented): the visible code only traces the
// call and marks the executor as running under external control.
656 bool Executor::loadState()
658 DEBTRACE("Executor::loadState()");
659 _isRunningunderExternalControl=true;
// File-local helper: stat() the path, then test the mode with S_ISREG (regular file).
// Used by _displayDot to look for the display script.
// NOTE(review): the returned values are presumably non-zero for a regular file and 0
// otherwise -- confirm.
664 static int isfile(const char *filename)
// stat() returns non-zero on failure, e.g. when the path does not exist.
667 if (stat(filename, &buf) != 0)
669 if (!S_ISREG(buf.st_mode))
674 //! Display the graph state as a dot display, public method
// Public entry point for the dot display.
// Side effect: marks the executor as running under external control.
676 void Executor::displayDot(Scheduler *graph)
678 _isRunningunderExternalControl=true;
682 //! Display the graph state as a dot display
684 * \param graph : the node to display
// Write the graph in dot format to the file "titi", then display it with an external
// command: "sh display.sh" when a regular file named display.sh exists, the
// dot|display pipeline otherwise.
687 void Executor::_displayDot(Scheduler *graph)
// Fixed relative file name.
689 std::ofstream g("titi");
// C-style cast: graph is used as a ComposedNode without a runtime check.
690 ((ComposedNode*)graph)->writeDot(g);
692 const char displayScript[]="display.sh";
693 if(isfile(displayScript))
694 system("sh display.sh");
// Return values of system() are ignored.
696 system("dot -Tpng titi|display -delay 5");
699 //! Wait reactivation in modes Step By step or with BreakPoints
701 * Check mode of execution (set by main thread):
702 * - YACS::CONTINUE : the graph execution continues.
703 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
704 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
705 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
706 * else continue the graph execution.
707 * \return true if end of executor thread is requested
// Called from the RunB loop. In the modes handled below it publishes the names of the
// ready tasks, moves the executor to WAITINGTASKS or PAUSED, notifies the pilot and
// waits for a resume. Returns true when _isOKToEnd asks the run loop to terminate.
710 bool Executor::checkBreakPoints()
712 DEBTRACE("Executor::checkBreakPoints()");
713 vector<Task *>::iterator iter;
714 bool endRequested = false;
722 case YACS::STOPBEFORENODES:
725 { // --- Critical section
726 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
// Look for a ready task whose name is one of the registered breakpoints.
728 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
730 string nodeToLoad = _mainSched->getTaskName(*iter);
731 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
732 != _listOfBreakPoints.end())
// Publish the names of the ready tasks for getTasksToLoad().
740 _listOfTasksToLoad.clear();
741 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
743 string nodeToLoad = _mainSched->getTaskName(*iter);
744 _listOfTasksToLoad.push_back(nodeToLoad);
// NOTE(review): getNbOfThreads() sets _isRunningunderExternalControl to true and returns
// the size of _groupOfAllThreadsCreated, not _numberOfRunningTasks -- confirm both are intended.
746 if (getNbOfThreads())
747 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
749 _executorState = YACS::PAUSED;
750 sendEvent("executor");
// Wake a pilot blocked in waitPause().
751 _condForPilot.notify_all();
753 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
754 if (_isOKToEnd) endRequested = true;
755 } // --- End of critical section
756 if (stop) DEBTRACE("wake up from waitResume");
760 case YACS::STEPBYSTEP:
762 { // --- Critical section
763 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
// Same publication of ready task names as above; in this mode the wait below is unconditional.
765 _listOfTasksToLoad.clear();
766 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
768 string nodeToLoad = _mainSched->getTaskName(*iter);
769 _listOfTasksToLoad.push_back(nodeToLoad);
771 if (getNbOfThreads())
772 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
774 _executorState = YACS::PAUSED;
775 sendEvent("executor");
776 _condForPilot.notify_all();
778 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
779 // or, if no pilot, wait until no more running tasks (stop on error)
// _isOKToEnd is set by functionForTaskExecution when a task aborts and stop-on-error is on.
780 if (_isOKToEnd) endRequested = true;
781 } // --- End of critical section
782 DEBTRACE("wake up from waitResume");
786 DEBTRACE("endRequested: " << endRequested);
791 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
793 * With the condition Mutex, the mutex is released atomically during the wait.
794 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
795 * Must be called while mutex is locked.
// Wait on _condForStepByStep using _mutexForSchedulerUpdate.
// The condition is notified by resumeCurrentBreakPoint() and, in the stop-on-error case
// without a pilot, by functionForTaskExecution().
// NOTE(review): no predicate re-check is visible around the wait -- confirm how spurious
// wake-ups are handled.
798 void Executor::waitResume()
800 DEBTRACE("Executor::waitResume()");
801 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
806 //! Perform loading of a Task.
808 * \param task : Task to load
// Load one task. Does nothing unless the task is in the TOLOAD state. The scheduler is
// notified of START under the mutex; the failure paths notify ABORT instead.
811 void Executor::loadTask(Task *task)
813 DEBTRACE("Executor::loadTask(Task *task)");
814 if(task->getState() != YACS::TOLOAD)return;
815 traceExec(task, "state:TOLOAD");
817 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
818 _mainSched->notifyFrom(task,YACS::START);
819 }//End of critical section
// Trace markers for the load and initService steps.
822 traceExec(task, "load");
824 traceExec(task, "initService");
// Failure path with an exception message: log it, then notify ABORT under the mutex.
829 std::cerr << ex.what() << std::endl;
831 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
833 _mainSched->notifyFrom(task,YACS::ABORT);
834 traceExec(task, "state:"+Node::getStateName(task->getState()));
835 }//End of critical section
// Failure path without a message: same ABORT notification.
839 std::cerr << "Load failed" << std::endl;
841 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
843 _mainSched->notifyFrom(task,YACS::ABORT);
844 traceExec(task, "state:"+Node::getStateName(task->getState()));
845 }//End of critical section
850 //! Execute a list of tasks possibly connected through datastream links
852 * \param tasks : a list of tasks to execute
// Launch a group of tasks in two passes over the vector: first establish the service
// connections, then execute each task.
855 void Executor::launchTasks(std::vector<Task *>& tasks)
857 vector<Task *>::iterator iter;
858 //First phase, make datastream connections
859 for(iter=tasks.begin();iter!=tasks.end();iter++)
// Only tasks in the TOLOAD or TORECONNECT state are connected.
861 YACS::StatesForNode state=(*iter)->getState();
862 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
865 (*iter)->connectService();
866 traceExec(*iter, "connectService");
// The state change made by connected() is done under the scheduler mutex.
868 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
869 (*iter)->connected();
870 }//End of critical section
// Failure path with an exception message: log it, try to disconnect, then notify ABORT.
874 std::cerr << ex.what() << std::endl;
877 (*iter)->disconnectService();
878 traceExec(*iter, "disconnectService");
882 // Disconnect has failed
883 traceExec(*iter, "disconnectService failed, ABORT");
886 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
888 _mainSched->notifyFrom(*iter,YACS::ABORT);
889 }//End of critical section
// Failure path without a message: same disconnect and ABORT sequence.
893 std::cerr << "Problem in connectService" << std::endl;
896 (*iter)->disconnectService();
897 traceExec(*iter, "disconnectService");
901 // Disconnect has failed
902 traceExec(*iter, "disconnectService failed, ABORT");
905 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
907 _mainSched->notifyFrom(*iter,YACS::ABORT);
908 }//End of critical section
// A task that ended in ERROR takes its coupled tasks down with it.
910 if((*iter)->getState() == YACS::ERROR)
912 //try to put all coupled tasks in error
913 std::set<Task*> coupledSet;
914 (*iter)->getCoupledTasks(coupledSet);
915 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
// Skip the failing task itself and tasks already in ERROR.
918 if(t == *iter)continue;
919 if(t->getState() == YACS::ERROR)continue;
922 t->disconnectService();
923 traceExec(t, "disconnectService");
927 // Disconnect has failed
928 traceExec(t, "disconnectService failed, ABORT");
931 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
933 _mainSched->notifyFrom(t,YACS::ABORT);
934 }//End of critical section
935 traceExec(t, "state:"+Node::getStateName(t->getState()));
938 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
941 //Second phase, execute each task in a thread
942 for(iter=tasks.begin();iter!=tasks.end();iter++)
954 //! Execute a Task in a thread
956 * \param task : Task to execute
958 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
960 * Calls Executor::functionForTaskExecution in Thread
// Start one task in its own thread. Does nothing unless the task is in the TOACTIVATE
// state. A slot is taken on _semForMaxThreads before the task is registered as running.
963 void Executor::launchTask(Task *task)
965 DEBTRACE("Executor::launchTask(Task *task)");
966 struct threadargs *args;
967 if(task->getState() != YACS::TOACTIVATE)return;
969 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
// No free slot left: check whether tasks coupled to the running ones are still waiting to
// be activated, which would mean the thread limit is too low for this schema.
// NOTE(review): _semThreadCnt and _runningTasks are read here with no visible lock -- confirm.
970 if(_semThreadCnt == 0)
972 //check if we have enough threads to run
// Works on a copy of the running set.
973 std::set<Task*> tmpSet=_runningTasks;
974 std::set<Task*>::iterator it = tmpSet.begin();
975 std::string status="running";
976 std::set<Task*> coupledSet;
977 while( it != tmpSet.end() )
981 tt->getCoupledTasks(coupledSet);
983 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
985 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
988 if(status=="running")break;
992 if(status=="toactivate")
994 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
995 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
// Blocks until a slot is released by a finishing task (see functionForTaskExecution).
999 _semForMaxThreads.wait();
// Argument block read by functionForTaskExecution (task, sched, execInst).
// NOTE(review): the release of this heap allocation is not visible here -- confirm.
1002 args= new threadargs;
1004 args->sched = _mainSched;
1005 args->execInst = this;
1007 traceExec(task, "launch");
1009 { // --- Critical section
1010 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
1011 _numberOfRunningTasks++;
1012 _runningTasks.insert(task);
1013 task->begin(); //change state to ACTIVATED
1014 } // --- End of critical section
// Constructs a temporary Thread object with the entry point, its argument and the stack size.
1015 Thread(functionForTaskExecution, args, _threadStackSize);
1018 //! wait until a running task ends
// Block the run loop until a running task reports its end. Returns without waiting when
// no task is running or when an end has already been counted.
1020 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1022 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1023 // _semForNewTasksToPerform.wait(); //---- use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1024 YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
1025 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
// The flag tells wakeUp() that a notification is needed.
1027 _isWaitingEventsFromRunningTasks = true;
1028 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
// Consume the "tasks ended" counter incremented by wakeUp().
1030 _numberOfEndedTasks=0;
// The visible body is entirely commented out: the removal of the thread from
// _groupOfAllThreadsCreated is disabled.
1036 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1038 /*_mutexForNbOfConcurrentThreads.lock();
1039 _groupOfAllThreadsCreated.remove(thread);
1041 _mutexForNbOfConcurrentThreads.unlock();*/
1045 //! must be used protected by _mutexForSchedulerUpdate!
// Signal the run loop that a task has ended. The caller must hold _mutexForSchedulerUpdate.
1047 void Executor::wakeUp()
1049 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
// Notify only when the run loop has flagged that it is waiting on the condition.
1050 if (_isWaitingEventsFromRunningTasks)
1052 _isWaitingEventsFromRunningTasks = false;
1053 _condForNewTasksToPerform.notify_all();
// Counter tested and reset by sleepWhileNoEventsFromAnyRunningTask().
1056 _numberOfEndedTasks++;
1059 //! number of running tasks
// Read the size of _groupOfAllThreadsCreated under _mutexForNbOfConcurrentThreads.
// Side effect: marks the executor as running under external control, including when
// called internally from checkBreakPoints().
// NOTE(review): no insertion into _groupOfAllThreadsCreated is visible in this file and
// the removal in notifyEndOfThread is commented out -- confirm this count tracks running tasks.
1061 int Executor::getNbOfThreads()
1064 YACS::BASES::AutoLocker alck(&_mutexForNbOfConcurrentThreads);
1065 _isRunningunderExternalControl=true;
1066 ret = _groupOfAllThreadsCreated.size();
1071 //! Function to perform execution of a task in a thread
1073 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1075 * Calls Task::execute
1077 * Calls Task::finished when the task is finished
1079 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1081 * Calls Executor::wakeUp and Executor::notifyEndOfThread
// Thread entry point for one task. arg is a threadargs carrying the task, the scheduler
// and the executor. After execution and disconnection, the task state, the scheduler
// notification and the executor bookkeeping are handled under _mutexForSchedulerUpdate.
1084 void *Executor::functionForTaskExecution(void *arg)
1086 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
// Unpack the argument block allocated by launchTask.
1088 struct threadargs *args = (struct threadargs *) arg;
1089 Task *task=args->task;
1090 Scheduler *sched=args->sched;
1091 Executor *execInst=args->execInst;
1093 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
// The event reported to the scheduler defaults to FINISH; the critical section below
// also handles YACS::ABORT.
1099 YACS::Event ev=YACS::FINISH;
1102 execInst->traceExec(task, "start execution");
1104 execInst->traceExec(task, "end execution OK");
// Failure with a YACS exception: log the message and trace it.
1106 catch(Exception& ex)
1108 std::cerr << "YACS Exception during execute" << std::endl;
1109 std::cerr << ex.what() << std::endl;
1111 string message = "end execution ABORT, ";
1112 message += ex.what();
1113 execInst->traceExec(task, message);
1117 // Execution has failed
1118 std::cerr << "Execution has failed: unknown reason" << std::endl;
1120 execInst->traceExec(task, "end execution ABORT, unknown reason");
// Disconnect the task's service connections.
1126 DEBTRACE("task->disconnectService()");
1127 task->disconnectService();
1128 execInst->traceExec(task, "disconnectService");
1132 // Disconnect has failed
1133 std::cerr << "disconnect has failed" << std::endl;
1135 execInst->traceExec(task, "disconnectService failed, ABORT");
1138 DEBTRACE("End task->execute()");
1139 { // --- Critical section
1140 YACS::BASES::AutoLocker alck(&execInst->_mutexForSchedulerUpdate);
1143 if (ev == YACS::FINISH) task->finished();
1144 if (ev == YACS::ABORT)
// Record the error; with stop-on-error the executor is forced into STEPBYSTEP mode and
// the run loop is allowed to end.
1146 execInst->_errorDetected = true;
1147 if (execInst->_stopOnErrorRequested)
1149 execInst->_execMode = YACS::STEPBYSTEP;
1150 execInst->_isOKToEnd = true;
1154 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1155 sched->notifyFrom(task,ev);
1157 catch(Exception& ex)
1159 //notify has failed : it is supposed to have set state
1160 //so no need to do anything
1161 std::cerr << "Error during notification" << std::endl;
1162 std::cerr << ex.what() << std::endl;
1166 //notify has failed : it is supposed to have set state
1167 //so no need to do anything
1168 std::cerr << "Notification failed" << std::endl;
// Bookkeeping: this task is no longer running.
1170 execInst->_numberOfRunningTasks--;
1171 execInst->_runningTasks.erase(task);
1172 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1173 << " _execMode: " << execInst->_execMode
1174 << " _executorState: " << execInst->_executorState);
// Last running task in a non-CONTINUE mode: a WAITINGTASKS executor becomes PAUSED.
1175 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1177 if (execInst->_executorState == YACS::WAITINGTASKS)
1179 execInst->_executorState = YACS::PAUSED;
1180 execInst->sendEvent("executor");
1181 execInst->_condForPilot.notify_all();
// Without a pilot nobody would call resumeCurrentBreakPoint(), so the run loop is
// released here when stopping on error.
1182 if (execInst->_errorDetected &&
1183 execInst->_stopOnErrorRequested &&
1184 !execInst->_isRunningunderExternalControl)
1185 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
// Give the thread slot back; _semThreadCnt mirrors the semaphore count for launchTask.
1188 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1189 execInst->_semForMaxThreads.post();
1190 execInst->_semThreadCnt += 1;
1191 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
// wakeUp() is skipped when the executor has just been paused.
1192 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1194 } // --- End of critical section (change state)
1196 //execInst->notifyEndOfThread(0);
// Append one line to the execution trace: elapsed seconds since _start, container name,
// placement, node name and message. Container name and placement default to "---".
1201 void Executor::traceExec(Task *task, const std::string& message)
1203 string nodeName = _mainSched->getTaskName(task);
1204 Container *cont = task->getContainer();
1205 string containerName = "---";
1206 string placement = "---";
1209 containerName = cont->getName();
// NOTE(review): compo is not used in the visible lines -- confirm whether it is still needed.
1210 ComponentInstance *compo = task->getComponent();
// Placement is resolved from the container using the task viewed as a ServiceNode.
1211 ServiceNode *taskCast(dynamic_cast<ServiceNode *>(task));
1213 placement = cont->getFullPlacementId(taskCast);
// Elapsed time, variant based on timeGetTime(): the difference is divided by 1000.0.
1216 DWORD now = timeGetTime();
1217 double elapse = (now - _start)/1000.0;
// Elapsed time, variant based on gettimeofday(): seconds plus microseconds / 1e6.
1220 gettimeofday(&now, NULL);
1221 double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
// Writes to _trace are made under the dedicated _mutexForTrace.
1224 YACS::BASES::AutoLocker alck(&_mutexForTrace);
1225 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1230 //! emit notification to all observers registered with the dispatcher
1232 * The dispatcher is unique and can be obtained by getDispatcher()
// Forward the event string to the Dispatcher, using _root as the node argument.
// \param event name of the event (this file always sends "executor")
1234 void Executor::sendEvent(const std::string& event)
1236 Dispatcher* disp=Dispatcher::getDispatcher();
1239 disp->dispatch(_root,event);