1 #include "Executor.hxx"
3 #include "Scheduler.hxx"
4 #include "Dispatcher.hxx"
6 #include "VisitorSaveState.hxx"
7 #include "ComposedNode.hxx"
14 using namespace YACS::ENGINE;
17 using YACS::BASES::Mutex;
18 using YACS::BASES::Thread;
19 using YACS::BASES::Semaphore;
22 #include "YacsTrace.hxx"
24 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(50)
29 _stopOnErrorRequested = false;
30 _dumpOnErrorRequested = false;
31 _errorDetected = false;
32 _isRunningunderExternalControl=false;
33 _executorState = YACS::NOTYETINITIALIZED;
34 _execMode = YACS::CONTINUE;
40 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
44 //! Execute a graph waiting for completion
46 * \param graph : schema to execute
47 * \param debug : display the graph with dot if debug == 1
49 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
51 * Calls Executor::launchTask to execute a selected Task.
53 * Completion when graph is finished (Scheduler::isFinished)
56 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
58 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
60 _root = dynamic_cast<ComposedNode *>(_mainSched);
61 if (!_root) throw Exception("Executor::Run, Internal Error!");
64 if(debug>1)_displayDot(graph);
68 graph->exUpdateState();
70 if(debug>1)_displayDot(graph);
72 vector<Task *>::iterator iter;
74 _execMode = YACS::CONTINUE;
75 _isWaitingEventsFromRunningTasks = false;
76 _numberOfRunningTasks = 0;
79 sleepWhileNoEventsFromAnyRunningTask();
81 if(debug>2)_displayDot(graph);
84 _mutexForSchedulerUpdate.lock();
85 tasks=graph->getNextTasks(isMore);
86 graph->selectRunnableTasks(tasks);
87 _mutexForSchedulerUpdate.unlock();
88 }//End of critical section
90 if(debug>2)_displayDot(graph);
92 for(iter=tasks.begin();iter!=tasks.end();iter++)
95 if(debug>1)_displayDot(graph);
99 if(debug>1)_displayDot(graph);
102 _mutexForSchedulerUpdate.lock();
103 _toContinue=!graph->isFinished();
104 _mutexForSchedulerUpdate.unlock();
105 }//End of critical section
106 DEBTRACE("_toContinue: " << _toContinue);
108 if(debug>0)_displayDot(graph);
114 //! Execute a graph with breakpoints or step by step
116 * To be launch in a thread (main thread controls the progression).
117 * \param graph : schema to execute
118 * \param debug : display the graph with dot if debug >0
119 * \param fromscratch : if false, state from a previous partial exection is already loaded
121 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
123 * Calls Executor::checkBreakPoints to verify if a pause is requested
125 * Calls Executor::launchTask to execute a selected Task
127 * Completion when graph is finished (Scheduler::isFinished)
129 * States of execution:
130 * - YACS::NOTYETINITIALIZED
131 * - YACS::INITIALISED
132 * - YACS::RUNNING (to next breakpoint or step)
133 * - YACS::WAITINGTASKS (a breakpoint or step as been reached, but there are still running tasks)
134 * - YACS::PAUSED (a breakpoint or step as been reached, no more running tasks)
135 * - YACS::FINISHED (no more ready tasks, nore running tasks)
136 * - YACS::STOPPED (stopped by user before end)
138 * Modes of Execution:
139 * - YACS::CONTINUE (normal run without breakpoints)
140 * - YACS::STEPBYSTEP (pause at each loop)
141 * - YACS::STOPBEFORENODES (pause when a node is reached)
143 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
144 * Step by Step means execution node by node or group of node by group of nodes.
145 * At a given step, the user decides to launch all the ready nodes or only a subset
146 * (Caution: some nodes must run in parallel).
147 * The next event (end of task) may give a new set of ready nodes, and define a new step.
149 * The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
150 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
151 * - Executor::getCurrentExecMode
152 * - Executor::getExecutorState
153 * - Executor::setExecMode : change the execution mode for next loop
154 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
155 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
156 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
157 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
158 * - Executor::isNotFinished
159 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
160 * - Executor::saveState : dump the current state of execution in an xml file
161 * - Executor::loadState : Not yet implemented
162 * - Executor::getNbOfThreads
163 * - Executor::displayDot
164 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
166 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
167 * - Executor::waitPause
170 * - Pilot may connect to executor during execution, or deconnect.
171 * - Several Pilots may be connected at the same time (for observation...)
175 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
177 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
179 { // --- Critical section
180 _mutexForSchedulerUpdate.lock();
182 _root = dynamic_cast<ComposedNode *>(_mainSched);
183 if (!_root) throw Exception("Executor::Run, Internal Error!");
184 _executorState = YACS::NOTYETINITIALIZED;
185 sendEvent("executor");
188 _errorDetected = false;
189 _isWaitingEventsFromRunningTasks = false;
190 _numberOfRunningTasks = 0;
191 string tracefile = "traceExec_";
192 tracefile += _mainSched->getName();
193 _trace.open(tracefile.c_str());
194 _mutexForSchedulerUpdate.unlock();
195 } // --- End of critical section
197 if (debug > 1) _displayDot(graph);
202 graph->exUpdateState();
204 _executorState = YACS::INITIALISED;
205 sendEvent("executor");
207 if (debug > 1) _displayDot(graph);
209 vector<Task *>::iterator iter;
214 _executorState = YACS::RUNNING;
215 sendEvent("executor");
218 DEBTRACE("--- executor main loop");
219 sleepWhileNoEventsFromAnyRunningTask();
220 DEBTRACE("--- events...");
221 if (debug > 2) _displayDot(graph);
222 { // --- Critical section
223 _mutexForSchedulerUpdate.lock();
224 _tasks=graph->getNextTasks(isMore);
225 numberAllTasks=_numberOfRunningTasks+_tasks.size();
226 graph->selectRunnableTasks(_tasks);
227 _mutexForSchedulerUpdate.unlock();
228 } // --- End of critical section
229 if (debug > 2) _displayDot(graph);
230 if (_executorState == YACS::RUNNING)
232 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
233 if (debug > 0) _displayDot(graph);
235 for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
237 if (debug > 1) _displayDot(graph);
242 if (debug > 1) _displayDot(graph);
243 { // --- Critical section
245 _mutexForSchedulerUpdate.lock();
246 _toContinue = !graph->isFinished();
248 if(_toContinue && numberAllTasks==0)
250 //Problem : no running tasks and no task to launch ??
252 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
253 //Pause to give a chance to interrupt
255 if(problemCount > 25)
257 // Too much problems encountered : stop execution
264 _executorState = YACS::FINISHED;
265 sendEvent("executor");
266 _condForPilot.notify_all();
268 _mutexForSchedulerUpdate.unlock();
269 } // --- End of critical section
270 if (debug > 0) _displayDot(graph);
271 DEBTRACE("_toContinue: " << _toContinue);
274 DEBTRACE("End of main Loop");
276 { // --- Critical section
277 _mutexForSchedulerUpdate.lock();
278 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
280 DEBTRACE("stop requested: End soon");
281 _executorState = YACS::STOPPED;
283 sendEvent("executor");
285 _mutexForSchedulerUpdate.unlock();
286 } // --- End of critical section
287 if ( _dumpOnErrorRequested && _errorDetected && !_isRunningunderExternalControl)
289 saveState(_dumpErrorFile);
292 DEBTRACE("End of RunB thread");
295 YACS::ExecutionMode Executor::getCurrentExecMode()
297 _isRunningunderExternalControl=true;
302 YACS::ExecutorState Executor::getExecutorState()
304 _isRunningunderExternalControl=true;
305 return _executorState;
309 bool Executor::isNotFinished()
311 _isRunningunderExternalControl=true;
315 //! ask to stop execution on the first node found in error
317 * \param dumpRequested produce a state dump when an error is found
318 * \param xmlFile name of file used for state dump
321 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
323 { // --- Critical section
324 _mutexForSchedulerUpdate.lock();
325 _dumpErrorFile=xmlFile;
326 _stopOnErrorRequested=true;
327 _dumpOnErrorRequested = dumpRequested;
328 if (dumpRequested && xmlFile.empty())
329 throw YACS::Exception("dump on error requested and no filename given for dump");
330 _mutexForSchedulerUpdate.unlock();
331 } // --- End of critical section
334 //! Dynamically set the current mode of execution
336 * The mode can be Continue, step by step, or stop before execution of a node
337 * defined in a list of breakpoints.
340 void Executor::setExecMode(YACS::ExecutionMode mode)
342 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
343 { // --- Critical section
344 _mutexForSchedulerUpdate.lock();
345 _isRunningunderExternalControl=true;
347 _mutexForSchedulerUpdate.unlock();
348 } // --- End of critical section
351 //! wake up executor when in pause
353 * When Executor is in state paused or waiting for task completion, the thread
354 * running loop RunB waits on condition _condForStepByStep.
355 * Thread RunB is waken up.
356 * \return true when actually wakes up executor
359 bool Executor::resumeCurrentBreakPoint()
361 DEBTRACE("Executor::resumeCurrentBreakPoint()");
363 //bool doDump = false;
364 { // --- Critical section
365 _mutexForSchedulerUpdate.lock();
366 _isRunningunderExternalControl=true;
367 DEBTRACE("_executorState: " << _executorState);
368 switch (_executorState)
370 case YACS::WAITINGTASKS:
373 _condForStepByStep.notify_all();
374 _executorState = YACS::RUNNING;
375 sendEvent("executor");
377 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
383 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
384 DEBTRACE("Graph Execution finished or stopped !");
389 // debug: no easy way to verify if main loop is acutally waiting on condition
392 _mutexForSchedulerUpdate.unlock();
394 //if (doDump) saveState(_dumpErrorFile);
395 } // --- End of critical section
400 //! define a list of nodes names as breakpoints in the graph
403 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
405 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
406 { // --- Critical section
407 _mutexForSchedulerUpdate.lock();
408 _isRunningunderExternalControl=true;
409 _listOfBreakPoints = listOfBreakPoints;
410 _mutexForSchedulerUpdate.unlock();
411 } // --- End of critical section
415 //! Get the list of tasks to load, to define a subset to execute in step by step mode
417 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
418 * Use Executor::waitPause to wait.
420 std::list<std::string> Executor::getTasksToLoad()
422 DEBTRACE("Executor::getTasksToLoad()");
423 list<string> listOfNodesToLoad;
424 listOfNodesToLoad.clear();
425 { // --- Critical section
426 _mutexForSchedulerUpdate.lock();
427 _isRunningunderExternalControl=true;
428 switch (_executorState)
430 case YACS::WAITINGTASKS:
433 listOfNodesToLoad = _listOfTasksToLoad;
436 case YACS::NOTYETINITIALIZED:
437 case YACS::INITIALISED:
446 _mutexForSchedulerUpdate.unlock();
447 } // --- End of critical section
448 return listOfNodesToLoad;
452 //! Define a subset of task to execute in step by step mode
454 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
455 * in the current step.
456 * If some nodes must run in parallel, they must stay together in the list.
459 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
461 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
463 vector<Task *>::iterator iter;
464 vector<Task *> restrictedTasks;
465 { // --- Critical section
466 _mutexForSchedulerUpdate.lock();
467 _isRunningunderExternalControl=true;
468 switch (_executorState)
470 case YACS::WAITINGTASKS:
473 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
475 string readyNode = _mainSched->getTaskName(*iter);
476 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
477 != listToExecute.end())
479 restrictedTasks.push_back(*iter);
480 DEBTRACE("node to execute " << readyNode);
484 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
486 _tasks.push_back(*iter);
490 case YACS::NOTYETINITIALIZED:
491 case YACS::INITIALISED:
500 _mutexForSchedulerUpdate.unlock();
501 } // --- End of critical section
504 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
506 _tasks.push_back(*iter);
508 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
510 string readyNode = _mainSched->getTaskName(*iter);
511 DEBTRACE("selected node to execute " << readyNode);
516 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
518 * Do nothing if execution is finished or in pause.
519 * Wait first step if Executor is running or in initialization.
522 void Executor::waitPause()
524 DEBTRACE("Executor::waitPause()");
525 { // --- Critical section
526 _mutexForSchedulerUpdate.lock();
527 _isRunningunderExternalControl=true;
528 switch (_executorState)
533 case YACS::WAITINGTASKS:
538 case YACS::NOTYETINITIALIZED:
539 case YACS::INITIALISED:
542 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
546 _mutexForSchedulerUpdate.unlock();
547 } // --- End of critical section
551 //! stops the execution as soon as possible
553 void Executor::stopExecution()
555 setExecMode(YACS::STEPBYSTEP);
558 resumeCurrentBreakPoint();
561 //! save the current state of execution in an xml file
563 bool Executor::saveState(const std::string& xmlFile)
565 DEBTRACE("Executor::saveState() in " << xmlFile);
566 YACS::ENGINE::VisitorSaveState vst(_root);
567 vst.openFileDump(xmlFile.c_str());
572 //! not yet implemented
574 bool Executor::loadState()
576 DEBTRACE("Executor::loadState()");
577 _isRunningunderExternalControl=true;
581 static int isfile(char *filename)
584 if (stat(filename, &buf) != 0)
586 if (!S_ISREG(buf.st_mode))
591 //! Display the graph state as a dot display, public method
593 void Executor::displayDot(Scheduler *graph)
595 _isRunningunderExternalControl=true;
599 //! Display the graph state as a dot display
601 * \param graph : the node to display
604 void Executor::_displayDot(Scheduler *graph)
606 std::ofstream g("titi");
607 ((ComposedNode*)graph)->writeDot(g);
609 if(isfile("display.sh"))
610 system("sh display.sh");
612 system("dot -Tpng titi|display -delay 5");
615 //! Wait reactivation in modes Step By step or with BreakPoints
617 * Check mode of execution (set by main thread):
618 * - YACS::CONTINUE : the graph execution continues.
619 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
620 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
621 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
622 * else continue the graph execution.
623 * \return true if end of executor thread is requested
626 bool Executor::checkBreakPoints()
628 DEBTRACE("Executor::checkBreakPoints()");
629 vector<Task *>::iterator iter;
630 bool endRequested = false;
638 case YACS::STOPBEFORENODES:
641 { // --- Critical section
642 _mutexForSchedulerUpdate.lock();
644 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
646 string nodeToLoad = _mainSched->getTaskName(*iter);
647 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
648 != _listOfBreakPoints.end())
656 _listOfTasksToLoad.clear();
657 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
659 string nodeToLoad = _mainSched->getTaskName(*iter);
660 _listOfTasksToLoad.push_back(nodeToLoad);
662 if (getNbOfThreads())
663 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
665 _executorState = YACS::PAUSED;
666 sendEvent("executor");
667 _condForPilot.notify_all();
669 //_mutexForSchedulerUpdate.unlock();
670 //} // --- End of critical section
671 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
672 if (_isOKToEnd) endRequested = true;
673 _mutexForSchedulerUpdate.unlock();
674 } // --- End of critical section
675 if (stop) DEBTRACE("wake up from waitResume");
679 case YACS::STEPBYSTEP:
681 { // --- Critical section
682 _mutexForSchedulerUpdate.lock();
684 _listOfTasksToLoad.clear();
685 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
687 string nodeToLoad = _mainSched->getTaskName(*iter);
688 _listOfTasksToLoad.push_back(nodeToLoad);
690 if (getNbOfThreads())
691 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
693 _executorState = YACS::PAUSED;
694 sendEvent("executor");
695 _condForPilot.notify_all();
697 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
698 // or, if no pilot, wait until no more running tasks (stop on error)
699 if (_isOKToEnd) endRequested = true;
700 _mutexForSchedulerUpdate.unlock();
701 } // --- End of critical section
702 DEBTRACE("wake up from waitResume");
706 DEBTRACE("endRequested: " << endRequested);
711 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
713 * With the condition Mutex, the mutex is released atomically during the wait.
714 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
715 * Must be called while mutex is locked.
718 void Executor::waitResume()
720 DEBTRACE("Executor::waitResume()");
721 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
726 //! Perform loading of a Task.
728 * \param task : Task to load
731 void Executor::loadTask(Task *task)
733 DEBTRACE("Executor::loadTask(Task *task)");
734 if(task->getState() != YACS::TOLOAD)return;
736 _mutexForSchedulerUpdate.lock();
738 _mainSched->notifyFrom(task,YACS::START);
739 _mutexForSchedulerUpdate.unlock();
740 }//End of critical section
747 std::cerr << ex.what() << std::endl;
749 _mutexForSchedulerUpdate.lock();
751 _mainSched->notifyFrom(task,YACS::ABORT);
752 _mutexForSchedulerUpdate.unlock();
753 }//End of critical section
757 std::cerr << "Load failed" << std::endl;
759 _mutexForSchedulerUpdate.lock();
761 _mainSched->notifyFrom(task,YACS::ABORT);
762 _mutexForSchedulerUpdate.unlock();
763 }//End of critical section
768 //! Execute a list of tasks possibly connected through datastream links
770 * \param tasks : a list of tasks to execute
773 void Executor::launchTasks(std::vector<Task *>& tasks)
775 vector<Task *>::iterator iter;
776 //First phase, initialize the execution
777 for(iter=tasks.begin();iter!=tasks.end();iter++)
779 if((*iter)->getState() != YACS::TOACTIVATE)continue;
782 (*iter)->initService();
783 traceExec(*iter, "initService");
787 std::cerr << ex.what() << std::endl;
789 _mutexForSchedulerUpdate.lock();
791 _mainSched->notifyFrom(*iter,YACS::ABORT);
792 _mutexForSchedulerUpdate.unlock();
793 }//End of critical section
797 std::cerr << "Problem in initService" << std::endl;
799 _mutexForSchedulerUpdate.lock();
801 _mainSched->notifyFrom(*iter,YACS::ABORT);
802 _mutexForSchedulerUpdate.unlock();
803 }//End of critical section
806 //Second phase, make datastream connections
807 for(iter=tasks.begin();iter!=tasks.end();iter++)
809 if((*iter)->getState() != YACS::TOACTIVATE)continue;
812 (*iter)->connectService();
813 traceExec(*iter, "connectService");
817 std::cerr << ex.what() << std::endl;
819 _mutexForSchedulerUpdate.lock();
821 _mainSched->notifyFrom(*iter,YACS::ABORT);
822 _mutexForSchedulerUpdate.unlock();
823 }//End of critical section
827 std::cerr << "Problem in connectService" << std::endl;
829 _mutexForSchedulerUpdate.lock();
831 _mainSched->notifyFrom(*iter,YACS::ABORT);
832 _mutexForSchedulerUpdate.unlock();
833 }//End of critical section
836 //Third phase, execute each task in a thread
837 for(iter=tasks.begin();iter!=tasks.end();iter++)
839 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
840 _semForMaxThreads.wait();
846 //! Execute a Task in a thread
848 * \param task : Task to execute
850 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
852 * Calls Executor::functionForTaskExecution in Thread
855 void Executor::launchTask(Task *task)
857 DEBTRACE("Executor::launchTask(Task *task)");
858 if(task->getState() != YACS::TOACTIVATE)return;
859 void **args=new void *[3];
860 args[0]=(void *)task;
861 args[1]=(void *)_mainSched;
862 args[2]=(void *)this;
863 traceExec(task, "launch");
865 { // --- Critical section
866 _mutexForSchedulerUpdate.lock();
867 _numberOfRunningTasks++;
868 task->begin(); //change state to ACTIVATED
869 //no more need : done when loading
870 //_mainSched->notifyFrom(task,YACS::START);
871 _mutexForSchedulerUpdate.unlock();
872 } // --- End of critical section
873 Thread(functionForTaskExecution,args);
874 //functionForTaskExecution(args);
877 //! wait until a running task ends
879 void Executor::sleepWhileNoEventsFromAnyRunningTask()
881 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
882 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
883 _mutexForSchedulerUpdate.lock();
884 if (_numberOfRunningTasks > 0)
886 _isWaitingEventsFromRunningTasks = true;
887 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
889 _mutexForSchedulerUpdate.unlock();
895 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
897 /*_mutexForNbOfConcurrentThreads.lock();
898 _groupOfAllThreadsCreated.remove(thread);
900 _mutexForNbOfConcurrentThreads.unlock();*/
904 //! must be used protected by _mutexForSchedulerUpdate!
906 void Executor::wakeUp()
908 DEBTRACE("Executor::wakeUp()");
909 if (_isWaitingEventsFromRunningTasks)
911 _isWaitingEventsFromRunningTasks = false;
912 _condForNewTasksToPerform.notify_all();
916 //! number of running tasks
918 int Executor::getNbOfThreads()
921 _mutexForNbOfConcurrentThreads.lock();
922 _isRunningunderExternalControl=true;
923 ret = _groupOfAllThreadsCreated.size();
924 _mutexForNbOfConcurrentThreads.unlock();
929 //! Function to perform execution of a task in a thread
931 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
933 * Calls Task::execute
935 * Calls Task::finished when the task is finished
937 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
939 * Calls Executor::wakeUp and Executor::notifyEndOfThread
942 void *Executor::functionForTaskExecution(void *arg)
944 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
945 void **argT=(void **)arg;
946 Task *task=(Task *)argT[0];
947 Scheduler *sched=(Scheduler *)argT[1];
948 Executor *execInst=(Executor *)argT[2];
954 YACS::Event ev=YACS::FINISH;
957 execInst->traceExec(task, "start execution");
959 execInst->traceExec(task, "end execution OK");
963 std::cerr << "YACS Exception during execute" << std::endl;
964 std::cerr << ex.what() << std::endl;
966 string message = "end execution ABORT, ";
967 message += ex.what();
968 execInst->traceExec(task, message);
972 // Execution has failed
973 std::cerr << "Execution has failed: unknown reason" << std::endl;
975 execInst->traceExec(task, "end execution ABORT, unknown reason");
981 task->disconnectService();
982 execInst->traceExec(task, "disconnectService");
986 // Disconnect has failed
987 std::cerr << "disconnect has failed" << std::endl;
989 execInst->traceExec(task, "disconnectService failed, ABORT");
992 DEBTRACE("End task->execute()");
993 { // --- Critical section
994 execInst->_mutexForSchedulerUpdate.lock();
997 if (ev == YACS::FINISH) task->finished();
998 if (ev == YACS::ABORT)
1000 execInst->_errorDetected = true;
1001 if (execInst->_stopOnErrorRequested)
1003 execInst->_execMode = YACS::STEPBYSTEP;
1004 if (!execInst->_isRunningunderExternalControl) execInst->_isOKToEnd = true;
1008 sched->notifyFrom(task,ev);
1010 catch(Exception& ex)
1012 //notify has failed : it is supposed to have set state
1013 //so no need to do anything
1014 std::cerr << "Error during notification" << std::endl;
1015 std::cerr << ex.what() << std::endl;
1019 //notify has failed : it is supposed to have set state
1020 //so no need to do anything
1021 std::cerr << "Notification failed" << std::endl;
1023 execInst->_numberOfRunningTasks--;
1024 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1025 << " _execMode: " << execInst->_execMode
1026 << " _executorState: " << execInst->_executorState);
1027 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1029 if (execInst->_executorState == YACS::WAITINGTASKS)
1031 execInst->_executorState = YACS::PAUSED;
1032 execInst->sendEvent("executor");
1033 execInst->_condForPilot.notify_all();
1034 if (execInst->_errorDetected &&
1035 execInst->_stopOnErrorRequested &&
1036 !execInst->_isRunningunderExternalControl)
1037 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1040 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1041 execInst->_semForMaxThreads.post();
1042 execInst->_semThreadCnt += 1;
1043 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1044 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1046 execInst->_mutexForSchedulerUpdate.unlock();
1047 } // --- End of critical section (change state)
1049 //execInst->notifyEndOfThread(0);
1054 void Executor::traceExec(Task *task, const std::string& message)
1056 string nodeName = _mainSched->getTaskName(task);
1057 _mutexForTrace.lock();
1058 _trace << nodeName << " " << message << endl;
1060 _mutexForTrace.unlock();
1063 //! emit notification to all observers registered with the dispatcher
1065 * The dispatcher is unique and can be obtained by getDispatcher()
1067 void Executor::sendEvent(const std::string& event)
1069 Dispatcher* disp=Dispatcher::getDispatcher();
1072 disp->dispatch(_root,event);