1 // Copyright (C) 2006-2013 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "Scheduler.hxx"
23 #include "Dispatcher.hxx"
24 #include "Container.hxx"
25 #include "ComponentInstance.hxx"
27 #include "VisitorSaveState.hxx"
28 #include "ComposedNode.hxx"
42 #define usleep(A) _sleep(A/1000)
43 #if !defined(S_ISCHR) || !defined(S_ISREG)
46 # define S_IFMT _S_IFMT
47 # define S_IFCHR _S_IFCHR
48 # define S_IFREG _S_IFREG
51 # define S_IFMT __S_IFMT
52 # define S_IFCHR __S_IFCHR
53 # define S_IFREG __S_IFREG
57 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
58 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 using namespace YACS::ENGINE;
65 using YACS::BASES::Mutex;
66 using YACS::BASES::Thread;
67 using YACS::BASES::Semaphore;
70 #include "YacsTrace.hxx"
72 int Executor::_maxThreads(50);
73 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
75 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
80 _stopOnErrorRequested = false;
81 _dumpOnErrorRequested = false;
82 _errorDetected = false;
83 _isRunningunderExternalControl=false;
84 _executorState = YACS::NOTYETINITIALIZED;
85 _execMode = YACS::CONTINUE;
86 _semThreadCnt = _maxThreads;
87 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
92 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96 //! Execute a graph waiting for completion
98 * \param graph : schema to execute
99 * \param debug : display the graph with dot if debug == 1
100 * \param fromScratch : if true the graph is reinitialized
102 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
104 * Calls Executor::launchTask to execute a selected Task.
106 * Completion when graph is finished (Scheduler::isFinished)
109 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
111 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
113 _root = dynamic_cast<ComposedNode *>(_mainSched);
114 if (!_root) throw Exception("Executor::Run, Internal Error!");
117 if(debug>1)_displayDot(graph);
121 graph->exUpdateState();
123 if(debug>1)_displayDot(graph);
124 vector<Task *> tasks;
125 vector<Task *>::iterator iter;
127 _execMode = YACS::CONTINUE;
128 _isWaitingEventsFromRunningTasks = false;
129 _numberOfRunningTasks = 0;
130 _runningTasks.clear();
131 _numberOfEndedTasks=0;
134 sleepWhileNoEventsFromAnyRunningTask();
136 if(debug>2)_displayDot(graph);
139 _mutexForSchedulerUpdate.lock();
140 tasks=graph->getNextTasks(isMore);
141 graph->selectRunnableTasks(tasks);
142 _mutexForSchedulerUpdate.unlock();
143 }//End of critical section
145 if(debug>2)_displayDot(graph);
147 for(iter=tasks.begin();iter!=tasks.end();iter++)
150 if(debug>1)_displayDot(graph);
154 if(debug>1)_displayDot(graph);
157 _mutexForSchedulerUpdate.lock();
158 _toContinue=!graph->isFinished();
159 _mutexForSchedulerUpdate.unlock();
160 }//End of critical section
161 DEBTRACE("_toContinue: " << _toContinue);
163 if(debug>0)_displayDot(graph);
169 //! Execute a graph with breakpoints or step by step
171 * To be launch in a thread (main thread controls the progression).
172 * \param graph : schema to execute
173 * \param debug : display the graph with dot if debug >0
174 * \param fromScratch : if false, state from a previous partial exection is already loaded
176 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
178 * Calls Executor::checkBreakPoints to verify if a pause is requested
180 * Calls Executor::launchTask to execute a selected Task
182 * Completion when graph is finished (Scheduler::isFinished)
184 * States of execution:
185 * - YACS::NOTYETINITIALIZED
186 * - YACS::INITIALISED
187 * - YACS::RUNNING (to next breakpoint or step)
188 * - YACS::WAITINGTASKS (a breakpoint or step as been reached, but there are still running tasks)
189 * - YACS::PAUSED (a breakpoint or step as been reached, no more running tasks)
190 * - YACS::FINISHED (no more ready tasks, nore running tasks)
191 * - YACS::STOPPED (stopped by user before end)
193 * Modes of Execution:
194 * - YACS::CONTINUE (normal run without breakpoints)
195 * - YACS::STEPBYSTEP (pause at each loop)
196 * - YACS::STOPBEFORENODES (pause when a node is reached)
198 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
199 * Step by Step means execution node by node or group of node by group of nodes.
200 * At a given step, the user decides to launch all the ready nodes or only a subset
201 * (Caution: some nodes must run in parallel).
202 * The next event (end of task) may give a new set of ready nodes, and define a new step.
204 * The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
205 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
206 * - Executor::getCurrentExecMode
207 * - Executor::getExecutorState
208 * - Executor::setExecMode : change the execution mode for next loop
209 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
210 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
211 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
212 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
213 * - Executor::isNotFinished
214 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
215 * - Executor::saveState : dump the current state of execution in an xml file
216 * - Executor::loadState : Not yet implemented
217 * - Executor::getNbOfThreads
218 * - Executor::displayDot
219 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
221 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
222 * - Executor::waitPause
225 * - Pilot may connect to executor during execution, or deconnect.
226 * - Several Pilots may be connected at the same time (for observation...)
230 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234 { // --- Critical section
235 _mutexForSchedulerUpdate.lock();
237 _root = dynamic_cast<ComposedNode *>(_mainSched);
238 if (!_root) throw Exception("Executor::Run, Internal Error!");
239 _executorState = YACS::NOTYETINITIALIZED;
240 sendEvent("executor");
243 _errorDetected = false;
244 _isWaitingEventsFromRunningTasks = false;
245 _numberOfRunningTasks = 0;
246 _runningTasks.clear();
247 _numberOfEndedTasks = 0;
248 string tracefile = "traceExec_";
249 tracefile += _mainSched->getName();
250 _trace.open(tracefile.c_str());
252 _start = timeGetTime();
254 gettimeofday(&_start, NULL);
257 _mutexForSchedulerUpdate.unlock();
258 } // --- End of critical section
260 if (debug > 1) _displayDot(graph);
267 graph->exUpdateState();
271 DEBTRACE("exception: "<< (ex.what()));
272 _executorState = YACS::FINISHED;
273 sendEvent("executor");
277 _executorState = YACS::INITIALISED;
278 sendEvent("executor");
280 if (debug > 1) _displayDot(graph);
282 vector<Task *>::iterator iter;
287 _executorState = YACS::RUNNING;
288 sendEvent("executor");
291 DEBTRACE("--- executor main loop");
292 sleepWhileNoEventsFromAnyRunningTask();
293 DEBTRACE("--- events...");
294 if (debug > 2) _displayDot(graph);
295 { // --- Critical section
296 _mutexForSchedulerUpdate.lock();
297 _tasks=graph->getNextTasks(isMore);
298 numberAllTasks=_numberOfRunningTasks+_tasks.size();
299 graph->selectRunnableTasks(_tasks);
300 _mutexForSchedulerUpdate.unlock();
301 } // --- End of critical section
302 if (debug > 2) _displayDot(graph);
303 if (_executorState == YACS::RUNNING)
305 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306 if (debug > 0) _displayDot(graph);
308 for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
310 if (debug > 1) _displayDot(graph);
315 if (debug > 1) _displayDot(graph);
316 { // --- Critical section
318 _mutexForSchedulerUpdate.lock();
319 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320 if(_numberOfRunningTasks == 0)
321 _toContinue = !graph->isFinished();
323 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325 DEBTRACE("_toContinue: " << _toContinue);
326 if(_toContinue && numberAllTasks==0)
328 //Problem : no running tasks and no task to launch ??
330 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331 //Pause to give a chance to interrupt
333 if(problemCount > 25)
335 // Too much problems encountered : stop execution
342 _executorState = YACS::FINISHED;
343 sendEvent("executor");
344 _condForPilot.notify_all();
346 _mutexForSchedulerUpdate.unlock();
347 } // --- End of critical section
348 if (debug > 0) _displayDot(graph);
349 DEBTRACE("_toContinue: " << _toContinue);
352 DEBTRACE("End of main Loop");
354 { // --- Critical section
355 _mutexForSchedulerUpdate.lock();
356 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
358 DEBTRACE("stop requested: End soon");
359 _executorState = YACS::STOPPED;
361 sendEvent("executor");
363 _mutexForSchedulerUpdate.unlock();
364 } // --- End of critical section
365 if ( _dumpOnErrorRequested && _errorDetected)
367 saveState(_dumpErrorFile);
370 DEBTRACE("End of RunB thread");
373 YACS::ExecutionMode Executor::getCurrentExecMode()
375 _isRunningunderExternalControl=true;
380 YACS::ExecutorState Executor::getExecutorState()
382 _isRunningunderExternalControl=true;
383 return _executorState;
387 bool Executor::isNotFinished()
389 _isRunningunderExternalControl=true;
393 //! ask to stop execution on the first node found in error
395 * \param dumpRequested produce a state dump when an error is found
396 * \param xmlFile name of file used for state dump
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 { // --- Critical section
402 _mutexForSchedulerUpdate.lock();
403 _dumpErrorFile=xmlFile;
404 _stopOnErrorRequested=true;
405 _dumpOnErrorRequested = dumpRequested;
406 if (dumpRequested && xmlFile.empty())
407 throw YACS::Exception("dump on error requested and no filename given for dump");
408 _mutexForSchedulerUpdate.unlock();
409 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410 } // --- End of critical section
413 //! ask to do not stop execution on nodes found in error
417 void Executor::unsetStopOnError()
419 { // --- Critical section
420 _mutexForSchedulerUpdate.lock();
421 _stopOnErrorRequested=false;
422 _mutexForSchedulerUpdate.unlock();
423 } // --- End of critical section
426 //! Dynamically set the current mode of execution
428 * The mode can be Continue, step by step, or stop before execution of a node
429 * defined in a list of breakpoints.
432 void Executor::setExecMode(YACS::ExecutionMode mode)
434 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
435 { // --- Critical section
436 _mutexForSchedulerUpdate.lock();
437 _isRunningunderExternalControl=true;
439 _mutexForSchedulerUpdate.unlock();
440 } // --- End of critical section
443 //! wake up executor when in pause
445 * When Executor is in state paused or waiting for task completion, the thread
446 * running loop RunB waits on condition _condForStepByStep.
447 * Thread RunB is waken up.
448 * \return true when actually wakes up executor
451 bool Executor::resumeCurrentBreakPoint()
453 DEBTRACE("Executor::resumeCurrentBreakPoint()");
455 //bool doDump = false;
456 { // --- Critical section
457 _mutexForSchedulerUpdate.lock();
458 _isRunningunderExternalControl=true;
459 DEBTRACE("_executorState: " << _executorState);
460 switch (_executorState)
462 case YACS::WAITINGTASKS:
465 _condForStepByStep.notify_all();
466 _executorState = YACS::RUNNING;
467 sendEvent("executor");
469 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
475 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
476 DEBTRACE("Graph Execution finished or stopped !");
481 // debug: no easy way to verify if main loop is acutally waiting on condition
484 _mutexForSchedulerUpdate.unlock();
486 //if (doDump) saveState(_dumpErrorFile);
487 } // --- End of critical section
492 //! define a list of nodes names as breakpoints in the graph
495 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
497 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
498 { // --- Critical section
499 _mutexForSchedulerUpdate.lock();
500 _isRunningunderExternalControl=true;
501 _listOfBreakPoints = listOfBreakPoints;
502 _mutexForSchedulerUpdate.unlock();
503 } // --- End of critical section
507 //! Get the list of tasks to load, to define a subset to execute in step by step mode
509 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
510 * Use Executor::waitPause to wait.
512 std::list<std::string> Executor::getTasksToLoad()
514 DEBTRACE("Executor::getTasksToLoad()");
515 list<string> listOfNodesToLoad;
516 listOfNodesToLoad.clear();
517 { // --- Critical section
518 _mutexForSchedulerUpdate.lock();
519 _isRunningunderExternalControl=true;
520 switch (_executorState)
522 case YACS::WAITINGTASKS:
525 listOfNodesToLoad = _listOfTasksToLoad;
528 case YACS::NOTYETINITIALIZED:
529 case YACS::INITIALISED:
538 _mutexForSchedulerUpdate.unlock();
539 } // --- End of critical section
540 return listOfNodesToLoad;
544 //! Define a subset of task to execute in step by step mode
546 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
547 * in the current step.
548 * If some nodes must run in parallel, they must stay together in the list.
551 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
553 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
555 vector<Task *>::iterator iter;
556 vector<Task *> restrictedTasks;
557 { // --- Critical section
558 _mutexForSchedulerUpdate.lock();
559 _isRunningunderExternalControl=true;
560 switch (_executorState)
562 case YACS::WAITINGTASKS:
565 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
567 string readyNode = _mainSched->getTaskName(*iter);
568 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
569 != listToExecute.end())
571 restrictedTasks.push_back(*iter);
572 DEBTRACE("node to execute " << readyNode);
576 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
578 _tasks.push_back(*iter);
582 case YACS::NOTYETINITIALIZED:
583 case YACS::INITIALISED:
592 _mutexForSchedulerUpdate.unlock();
593 } // --- End of critical section
596 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
598 _tasks.push_back(*iter);
600 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
602 string readyNode = _mainSched->getTaskName(*iter);
603 DEBTRACE("selected node to execute " << readyNode);
609 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
611 * Do nothing if execution is finished or in pause.
612 * Wait first step if Executor is running or in initialization.
615 void Executor::waitPause()
617 DEBTRACE("Executor::waitPause()" << _executorState);
618 { // --- Critical section
619 _mutexForSchedulerUpdate.lock();
620 _isRunningunderExternalControl=true;
621 switch (_executorState)
626 case YACS::WAITINGTASKS:
631 case YACS::NOTYETINITIALIZED:
632 case YACS::INITIALISED:
635 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
639 _mutexForSchedulerUpdate.unlock();
640 } // --- End of critical section
644 //! stops the execution as soon as possible
646 void Executor::stopExecution()
648 setExecMode(YACS::STEPBYSTEP);
651 resumeCurrentBreakPoint();
654 //! save the current state of execution in an xml file
656 bool Executor::saveState(const std::string& xmlFile)
658 DEBTRACE("Executor::saveState() in " << xmlFile);
659 YACS::ENGINE::VisitorSaveState vst(_root);
660 vst.openFileDump(xmlFile.c_str());
666 //! not yet implemented
668 bool Executor::loadState()
670 DEBTRACE("Executor::loadState()");
671 _isRunningunderExternalControl=true;
676 static int isfile(const char *filename)
679 if (stat(filename, &buf) != 0)
681 if (!S_ISREG(buf.st_mode))
686 //! Display the graph state as a dot display, public method
688 void Executor::displayDot(Scheduler *graph)
690 _isRunningunderExternalControl=true;
694 //! Display the graph state as a dot display
696 * \param graph : the node to display
699 void Executor::_displayDot(Scheduler *graph)
701 std::ofstream g("titi");
702 ((ComposedNode*)graph)->writeDot(g);
704 const char displayScript[]="display.sh";
705 if(isfile(displayScript))
706 system("sh display.sh");
708 system("dot -Tpng titi|display -delay 5");
711 //! Wait reactivation in modes Step By step or with BreakPoints
713 * Check mode of execution (set by main thread):
714 * - YACS::CONTINUE : the graph execution continues.
715 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
716 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
717 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
718 * else continue the graph execution.
719 * \return true if end of executor thread is requested
722 bool Executor::checkBreakPoints()
724 DEBTRACE("Executor::checkBreakPoints()");
725 vector<Task *>::iterator iter;
726 bool endRequested = false;
734 case YACS::STOPBEFORENODES:
737 { // --- Critical section
738 _mutexForSchedulerUpdate.lock();
740 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
742 string nodeToLoad = _mainSched->getTaskName(*iter);
743 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
744 != _listOfBreakPoints.end())
752 _listOfTasksToLoad.clear();
753 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
755 string nodeToLoad = _mainSched->getTaskName(*iter);
756 _listOfTasksToLoad.push_back(nodeToLoad);
758 if (getNbOfThreads())
759 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
761 _executorState = YACS::PAUSED;
762 sendEvent("executor");
763 _condForPilot.notify_all();
765 //_mutexForSchedulerUpdate.unlock();
766 //} // --- End of critical section
767 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
768 if (_isOKToEnd) endRequested = true;
769 _mutexForSchedulerUpdate.unlock();
770 } // --- End of critical section
771 if (stop) DEBTRACE("wake up from waitResume");
775 case YACS::STEPBYSTEP:
777 { // --- Critical section
778 _mutexForSchedulerUpdate.lock();
780 _listOfTasksToLoad.clear();
781 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
783 string nodeToLoad = _mainSched->getTaskName(*iter);
784 _listOfTasksToLoad.push_back(nodeToLoad);
786 if (getNbOfThreads())
787 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
789 _executorState = YACS::PAUSED;
790 sendEvent("executor");
791 _condForPilot.notify_all();
793 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
794 // or, if no pilot, wait until no more running tasks (stop on error)
795 if (_isOKToEnd) endRequested = true;
796 _mutexForSchedulerUpdate.unlock();
797 } // --- End of critical section
798 DEBTRACE("wake up from waitResume");
802 DEBTRACE("endRequested: " << endRequested);
807 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
809 * With the condition Mutex, the mutex is released atomically during the wait.
810 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
811 * Must be called while mutex is locked.
814 void Executor::waitResume()
816 DEBTRACE("Executor::waitResume()");
817 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
822 //! Perform loading of a Task.
824 * \param task : Task to load
827 void Executor::loadTask(Task *task)
829 DEBTRACE("Executor::loadTask(Task *task)");
830 if(task->getState() != YACS::TOLOAD)return;
831 traceExec(task, "state:TOLOAD");
833 _mutexForSchedulerUpdate.lock();
834 _mainSched->notifyFrom(task,YACS::START);
835 _mutexForSchedulerUpdate.unlock();
836 }//End of critical section
839 traceExec(task, "load");
841 traceExec(task, "initService");
846 std::cerr << ex.what() << std::endl;
848 _mutexForSchedulerUpdate.lock();
850 _mainSched->notifyFrom(task,YACS::ABORT);
851 traceExec(task, "state:"+Node::getStateName(task->getState()));
852 _mutexForSchedulerUpdate.unlock();
853 }//End of critical section
857 std::cerr << "Load failed" << std::endl;
859 _mutexForSchedulerUpdate.lock();
861 _mainSched->notifyFrom(task,YACS::ABORT);
862 traceExec(task, "state:"+Node::getStateName(task->getState()));
863 _mutexForSchedulerUpdate.unlock();
864 }//End of critical section
869 //! Execute a list of tasks possibly connected through datastream links
871 * \param tasks : a list of tasks to execute
874 void Executor::launchTasks(std::vector<Task *>& tasks)
876 vector<Task *>::iterator iter;
877 //First phase, make datastream connections
878 for(iter=tasks.begin();iter!=tasks.end();iter++)
880 YACS::StatesForNode state=(*iter)->getState();
881 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
884 (*iter)->connectService();
885 traceExec(*iter, "connectService");
887 _mutexForSchedulerUpdate.lock();
888 (*iter)->connected();
889 _mutexForSchedulerUpdate.unlock();
890 }//End of critical section
894 std::cerr << ex.what() << std::endl;
897 (*iter)->disconnectService();
898 traceExec(*iter, "disconnectService");
902 // Disconnect has failed
903 traceExec(*iter, "disconnectService failed, ABORT");
906 _mutexForSchedulerUpdate.lock();
908 _mainSched->notifyFrom(*iter,YACS::ABORT);
909 _mutexForSchedulerUpdate.unlock();
910 }//End of critical section
914 std::cerr << "Problem in connectService" << std::endl;
917 (*iter)->disconnectService();
918 traceExec(*iter, "disconnectService");
922 // Disconnect has failed
923 traceExec(*iter, "disconnectService failed, ABORT");
926 _mutexForSchedulerUpdate.lock();
928 _mainSched->notifyFrom(*iter,YACS::ABORT);
929 _mutexForSchedulerUpdate.unlock();
930 }//End of critical section
932 if((*iter)->getState() == YACS::ERROR)
934 //try to put all coupled tasks in error
935 std::set<Task*> coupledSet;
936 (*iter)->getCoupledTasks(coupledSet);
937 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
940 if(t == *iter)continue;
941 if(t->getState() == YACS::ERROR)continue;
944 t->disconnectService();
945 traceExec(t, "disconnectService");
949 // Disconnect has failed
950 traceExec(t, "disconnectService failed, ABORT");
953 _mutexForSchedulerUpdate.lock();
955 _mainSched->notifyFrom(t,YACS::ABORT);
956 _mutexForSchedulerUpdate.unlock();
957 }//End of critical section
958 traceExec(t, "state:"+Node::getStateName(t->getState()));
961 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
964 //Second phase, execute each task in a thread
965 for(iter=tasks.begin();iter!=tasks.end();iter++)
977 //! Execute a Task in a thread
979 * \param task : Task to execute
981 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
983 * Calls Executor::functionForTaskExecution in Thread
986 void Executor::launchTask(Task *task)
988 DEBTRACE("Executor::launchTask(Task *task)");
989 struct threadargs *args;
990 if(task->getState() != YACS::TOACTIVATE)return;
992 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
993 if(_semThreadCnt == 0)
995 //check if we have enough threads to run
996 std::set<Task*> tmpSet=_runningTasks;
997 std::set<Task*>::iterator it = tmpSet.begin();
998 std::string status="running";
999 std::set<Task*> coupledSet;
1000 while( it != tmpSet.end() )
1004 tt->getCoupledTasks(coupledSet);
1006 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1008 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1009 tmpSet.erase(*iter);
1011 if(status=="running")break;
1012 it = tmpSet.begin();
1015 if(status=="toactivate")
1017 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1018 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1022 _semForMaxThreads.wait();
1025 args= new threadargs;
1027 args->sched = _mainSched;
1028 args->execInst = this;
1030 traceExec(task, "launch");
1032 { // --- Critical section
1033 _mutexForSchedulerUpdate.lock();
1034 _numberOfRunningTasks++;
1035 _runningTasks.insert(task);
1036 task->begin(); //change state to ACTIVATED
1037 _mutexForSchedulerUpdate.unlock();
1038 } // --- End of critical section
1039 Thread(functionForTaskExecution, args, _threadStackSize);
1042 //! wait until a running task ends
1044 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1046 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1047 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1048 _mutexForSchedulerUpdate.lock();
1049 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1051 _isWaitingEventsFromRunningTasks = true;
1052 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1054 _numberOfEndedTasks=0;
1055 _mutexForSchedulerUpdate.unlock();
1061 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1063 /*_mutexForNbOfConcurrentThreads.lock();
1064 _groupOfAllThreadsCreated.remove(thread);
1066 _mutexForNbOfConcurrentThreads.unlock();*/
1070 //! must be used protected by _mutexForSchedulerUpdate!
1072 void Executor::wakeUp()
1074 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1075 if (_isWaitingEventsFromRunningTasks)
1077 _isWaitingEventsFromRunningTasks = false;
1078 _condForNewTasksToPerform.notify_all();
1081 _numberOfEndedTasks++;
1084 //! number of running tasks
1086 int Executor::getNbOfThreads()
1089 _mutexForNbOfConcurrentThreads.lock();
1090 _isRunningunderExternalControl=true;
1091 ret = _groupOfAllThreadsCreated.size();
1092 _mutexForNbOfConcurrentThreads.unlock();
1097 //! Function to perform execution of a task in a thread
1099 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1101 * Calls Task::execute
1103 * Calls Task::finished when the task is finished
1105 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1107 * Calls Executor::wakeUp and Executor::notifyEndOfThread
1110 void *Executor::functionForTaskExecution(void *arg)
1112 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1114 struct threadargs *args = (struct threadargs *) arg;
1115 Task *task=args->task;
1116 Scheduler *sched=args->sched;
1117 Executor *execInst=args->execInst;
1119 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1125 YACS::Event ev=YACS::FINISH;
1128 execInst->traceExec(task, "start execution");
1130 execInst->traceExec(task, "end execution OK");
1132 catch(Exception& ex)
1134 std::cerr << "YACS Exception during execute" << std::endl;
1135 std::cerr << ex.what() << std::endl;
1137 string message = "end execution ABORT, ";
1138 message += ex.what();
1139 execInst->traceExec(task, message);
1143 // Execution has failed
1144 std::cerr << "Execution has failed: unknown reason" << std::endl;
1146 execInst->traceExec(task, "end execution ABORT, unknown reason");
1152 DEBTRACE("task->disconnectService()");
1153 task->disconnectService();
1154 execInst->traceExec(task, "disconnectService");
1158 // Disconnect has failed
1159 std::cerr << "disconnect has failed" << std::endl;
1161 execInst->traceExec(task, "disconnectService failed, ABORT");
1164 DEBTRACE("End task->execute()");
1165 { // --- Critical section
1166 execInst->_mutexForSchedulerUpdate.lock();
1169 if (ev == YACS::FINISH) task->finished();
1170 if (ev == YACS::ABORT)
1172 execInst->_errorDetected = true;
1173 if (execInst->_stopOnErrorRequested)
1175 execInst->_execMode = YACS::STEPBYSTEP;
1176 execInst->_isOKToEnd = true;
1180 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1181 sched->notifyFrom(task,ev);
1183 catch(Exception& ex)
1185 //notify has failed : it is supposed to have set state
1186 //so no need to do anything
1187 std::cerr << "Error during notification" << std::endl;
1188 std::cerr << ex.what() << std::endl;
1192 //notify has failed : it is supposed to have set state
1193 //so no need to do anything
1194 std::cerr << "Notification failed" << std::endl;
1196 execInst->_numberOfRunningTasks--;
1197 execInst->_runningTasks.erase(task);
1198 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1199 << " _execMode: " << execInst->_execMode
1200 << " _executorState: " << execInst->_executorState);
1201 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1203 if (execInst->_executorState == YACS::WAITINGTASKS)
1205 execInst->_executorState = YACS::PAUSED;
1206 execInst->sendEvent("executor");
1207 execInst->_condForPilot.notify_all();
1208 if (execInst->_errorDetected &&
1209 execInst->_stopOnErrorRequested &&
1210 !execInst->_isRunningunderExternalControl)
1211 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1214 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1215 execInst->_semForMaxThreads.post();
1216 execInst->_semThreadCnt += 1;
1217 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1218 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1220 execInst->_mutexForSchedulerUpdate.unlock();
1221 } // --- End of critical section (change state)
1223 //execInst->notifyEndOfThread(0);
1228 void Executor::traceExec(Task *task, const std::string& message)
1230 string nodeName = _mainSched->getTaskName(task);
1231 Container *cont = task->getContainer();
1232 string containerName = "---";
1233 string placement = "---";
1236 containerName = cont->getName();
1237 ComponentInstance *compo = task->getComponent();
1239 placement = cont->getFullPlacementId(compo);
1242 DWORD now = timeGetTime();
1243 double elapse = (now - _start)/1000.0;
1246 gettimeofday(&now, NULL);
1247 double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1249 _mutexForTrace.lock();
1250 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1252 _mutexForTrace.unlock();
1255 //! emit notification to all observers registered with the dispatcher
1257 * The dispatcher is unique and can be obtained by getDispatcher()
1259 void Executor::sendEvent(const std::string& event)
1261 Dispatcher* disp=Dispatcher::getDispatcher();
1264 disp->dispatch(_root,event);