1 // Copyright (C) 2006-2012 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "Executor.hxx"
22 #include "Scheduler.hxx"
23 #include "Dispatcher.hxx"
24 #include "Container.hxx"
25 #include "ComponentInstance.hxx"
27 #include "VisitorSaveState.hxx"
28 #include "ComposedNode.hxx"
42 #define usleep(A) _sleep(A/1000)
43 #if !defined(S_ISCHR) || !defined(S_ISREG)
46 # define S_IFMT _S_IFMT
47 # define S_IFCHR _S_IFCHR
48 # define S_IFREG _S_IFREG
51 # define S_IFMT __S_IFMT
52 # define S_IFCHR __S_IFCHR
53 # define S_IFREG __S_IFREG
57 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
58 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 using namespace YACS::ENGINE;
65 using YACS::BASES::Mutex;
66 using YACS::BASES::Thread;
67 using YACS::BASES::Semaphore;
70 #include "YacsTrace.hxx"
72 int Executor::_maxThreads(50);
74 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
79 _stopOnErrorRequested = false;
80 _dumpOnErrorRequested = false;
81 _errorDetected = false;
82 _isRunningunderExternalControl=false;
83 _executorState = YACS::NOTYETINITIALIZED;
84 _execMode = YACS::CONTINUE;
85 _semThreadCnt = _maxThreads;
86 DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
95 //! Execute a graph waiting for completion
97 * \param graph : schema to execute
98 * \param debug : display the graph with dot if debug == 1
99 * \param fromScratch : if true the graph is reinitialized
101 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
103 * Calls Executor::launchTask to execute a selected Task.
105 * Completion when graph is finished (Scheduler::isFinished)
108 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
110 DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
112 _root = dynamic_cast<ComposedNode *>(_mainSched);
113 if (!_root) throw Exception("Executor::Run, Internal Error!");
116 if(debug>1)_displayDot(graph);
120 graph->exUpdateState();
122 if(debug>1)_displayDot(graph);
123 vector<Task *> tasks;
124 vector<Task *>::iterator iter;
126 _execMode = YACS::CONTINUE;
127 _isWaitingEventsFromRunningTasks = false;
128 _numberOfRunningTasks = 0;
129 _runningTasks.clear();
130 _numberOfEndedTasks=0;
133 sleepWhileNoEventsFromAnyRunningTask();
135 if(debug>2)_displayDot(graph);
138 _mutexForSchedulerUpdate.lock();
139 tasks=graph->getNextTasks(isMore);
140 graph->selectRunnableTasks(tasks);
141 _mutexForSchedulerUpdate.unlock();
142 }//End of critical section
144 if(debug>2)_displayDot(graph);
146 for(iter=tasks.begin();iter!=tasks.end();iter++)
149 if(debug>1)_displayDot(graph);
153 if(debug>1)_displayDot(graph);
156 _mutexForSchedulerUpdate.lock();
157 _toContinue=!graph->isFinished();
158 _mutexForSchedulerUpdate.unlock();
159 }//End of critical section
160 DEBTRACE("_toContinue: " << _toContinue);
162 if(debug>0)_displayDot(graph);
168 //! Execute a graph with breakpoints or step by step
170 * To be launch in a thread (main thread controls the progression).
171 * \param graph : schema to execute
172 * \param debug : display the graph with dot if debug >0
173 * \param fromScratch : if false, state from a previous partial exection is already loaded
175 * Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute
177 * Calls Executor::checkBreakPoints to verify if a pause is requested
179 * Calls Executor::launchTask to execute a selected Task
181 * Completion when graph is finished (Scheduler::isFinished)
183 * States of execution:
184 * - YACS::NOTYETINITIALIZED
185 * - YACS::INITIALISED
186 * - YACS::RUNNING (to next breakpoint or step)
187 * - YACS::WAITINGTASKS (a breakpoint or step as been reached, but there are still running tasks)
188 * - YACS::PAUSED (a breakpoint or step as been reached, no more running tasks)
189 * - YACS::FINISHED (no more ready tasks, nore running tasks)
190 * - YACS::STOPPED (stopped by user before end)
192 * Modes of Execution:
193 * - YACS::CONTINUE (normal run without breakpoints)
194 * - YACS::STEPBYSTEP (pause at each loop)
195 * - YACS::STOPBEFORENODES (pause when a node is reached)
197 * A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready.
198 * Step by Step means execution node by node or group of node by group of nodes.
199 * At a given step, the user decides to launch all the ready nodes or only a subset
200 * (Caution: some nodes must run in parallel).
201 * The next event (end of task) may give a new set of ready nodes, and define a new step.
203 * The graph execution may be controled by a pilot which sends requests. Requests are asynchronous.
204 * Requests are taken into account only on certain states, otherwise return the status IgnoredRequest.
205 * - Executor::getCurrentExecMode
206 * - Executor::getExecutorState
207 * - Executor::setExecMode : change the execution mode for next loop
208 * - Executor::setListOfBreakPoints : must be set before setting YACS::STOPBEFORENODES
209 * - Executor::getTasksToLoad : when paused or waiting tasks, get the list of next tasks
210 * - Executor::setStepsToExecute : define a subset of the list given by Executor::getTasksToLoad
211 * - Executor::resumeCurrentBreakPoint : when paused or waiting tasks, resumes execution
212 * - Executor::isNotFinished
213 * - Executor::stopExecution : stop execution asap, i.e. set STEPBYSTEP and wait PAUSED
214 * - Executor::saveState : dump the current state of execution in an xml file
215 * - Executor::loadState : Not yet implemented
216 * - Executor::getNbOfThreads
217 * - Executor::displayDot
218 * - Executor::setStopOnError : ask to stop execution if a node is found in ERROR state
220 * If the pilot wants to wait the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:
221 * - Executor::waitPause
224 * - Pilot may connect to executor during execution, or deconnect.
225 * - Several Pilots may be connected at the same time (for observation...)
229 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
231 DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
233 { // --- Critical section
234 _mutexForSchedulerUpdate.lock();
236 _root = dynamic_cast<ComposedNode *>(_mainSched);
237 if (!_root) throw Exception("Executor::Run, Internal Error!");
238 _executorState = YACS::NOTYETINITIALIZED;
239 sendEvent("executor");
242 _errorDetected = false;
243 _isWaitingEventsFromRunningTasks = false;
244 _numberOfRunningTasks = 0;
245 _runningTasks.clear();
246 _numberOfEndedTasks = 0;
247 string tracefile = "traceExec_";
248 tracefile += _mainSched->getName();
249 _trace.open(tracefile.c_str());
251 _start = timeGetTime();
253 gettimeofday(&_start, NULL);
256 _mutexForSchedulerUpdate.unlock();
257 } // --- End of critical section
259 if (debug > 1) _displayDot(graph);
266 graph->exUpdateState();
270 DEBTRACE("exception: "<< (ex.what()));
271 _executorState = YACS::FINISHED;
272 sendEvent("executor");
276 _executorState = YACS::INITIALISED;
277 sendEvent("executor");
279 if (debug > 1) _displayDot(graph);
281 vector<Task *>::iterator iter;
286 _executorState = YACS::RUNNING;
287 sendEvent("executor");
290 DEBTRACE("--- executor main loop");
291 sleepWhileNoEventsFromAnyRunningTask();
292 DEBTRACE("--- events...");
293 if (debug > 2) _displayDot(graph);
294 { // --- Critical section
295 _mutexForSchedulerUpdate.lock();
296 _tasks=graph->getNextTasks(isMore);
297 numberAllTasks=_numberOfRunningTasks+_tasks.size();
298 graph->selectRunnableTasks(_tasks);
299 _mutexForSchedulerUpdate.unlock();
300 } // --- End of critical section
301 if (debug > 2) _displayDot(graph);
302 if (_executorState == YACS::RUNNING)
304 if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
305 if (debug > 0) _displayDot(graph);
307 for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
309 if (debug > 1) _displayDot(graph);
314 if (debug > 1) _displayDot(graph);
315 { // --- Critical section
317 _mutexForSchedulerUpdate.lock();
318 //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319 if(_numberOfRunningTasks == 0)
320 _toContinue = !graph->isFinished();
322 DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323 DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324 DEBTRACE("_toContinue: " << _toContinue);
325 if(_toContinue && numberAllTasks==0)
327 //Problem : no running tasks and no task to launch ??
329 std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330 //Pause to give a chance to interrupt
332 if(problemCount > 25)
334 // Too much problems encountered : stop execution
341 _executorState = YACS::FINISHED;
342 sendEvent("executor");
343 _condForPilot.notify_all();
345 _mutexForSchedulerUpdate.unlock();
346 } // --- End of critical section
347 if (debug > 0) _displayDot(graph);
348 DEBTRACE("_toContinue: " << _toContinue);
351 DEBTRACE("End of main Loop");
353 { // --- Critical section
354 _mutexForSchedulerUpdate.lock();
355 if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
357 DEBTRACE("stop requested: End soon");
358 _executorState = YACS::STOPPED;
360 sendEvent("executor");
362 _mutexForSchedulerUpdate.unlock();
363 } // --- End of critical section
364 if ( _dumpOnErrorRequested && _errorDetected)
366 saveState(_dumpErrorFile);
369 DEBTRACE("End of RunB thread");
372 YACS::ExecutionMode Executor::getCurrentExecMode()
374 _isRunningunderExternalControl=true;
379 YACS::ExecutorState Executor::getExecutorState()
381 _isRunningunderExternalControl=true;
382 return _executorState;
386 bool Executor::isNotFinished()
388 _isRunningunderExternalControl=true;
392 //! ask to stop execution on the first node found in error
394 * \param dumpRequested produce a state dump when an error is found
395 * \param xmlFile name of file used for state dump
398 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
400 { // --- Critical section
401 _mutexForSchedulerUpdate.lock();
402 _dumpErrorFile=xmlFile;
403 _stopOnErrorRequested=true;
404 _dumpOnErrorRequested = dumpRequested;
405 if (dumpRequested && xmlFile.empty())
406 throw YACS::Exception("dump on error requested and no filename given for dump");
407 _mutexForSchedulerUpdate.unlock();
408 DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409 } // --- End of critical section
412 //! ask to do not stop execution on nodes found in error
416 void Executor::unsetStopOnError()
418 { // --- Critical section
419 _mutexForSchedulerUpdate.lock();
420 _stopOnErrorRequested=false;
421 _mutexForSchedulerUpdate.unlock();
422 } // --- End of critical section
425 //! Dynamically set the current mode of execution
427 * The mode can be Continue, step by step, or stop before execution of a node
428 * defined in a list of breakpoints.
431 void Executor::setExecMode(YACS::ExecutionMode mode)
433 DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434 { // --- Critical section
435 _mutexForSchedulerUpdate.lock();
436 _isRunningunderExternalControl=true;
438 _mutexForSchedulerUpdate.unlock();
439 } // --- End of critical section
442 //! wake up executor when in pause
444 * When Executor is in state paused or waiting for task completion, the thread
445 * running loop RunB waits on condition _condForStepByStep.
446 * Thread RunB is waken up.
447 * \return true when actually wakes up executor
450 bool Executor::resumeCurrentBreakPoint()
452 DEBTRACE("Executor::resumeCurrentBreakPoint()");
454 //bool doDump = false;
455 { // --- Critical section
456 _mutexForSchedulerUpdate.lock();
457 _isRunningunderExternalControl=true;
458 DEBTRACE("_executorState: " << _executorState);
459 switch (_executorState)
461 case YACS::WAITINGTASKS:
464 _condForStepByStep.notify_all();
465 _executorState = YACS::RUNNING;
466 sendEvent("executor");
468 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474 //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
475 DEBTRACE("Graph Execution finished or stopped !");
480 // debug: no easy way to verify if main loop is acutally waiting on condition
483 _mutexForSchedulerUpdate.unlock();
485 //if (doDump) saveState(_dumpErrorFile);
486 } // --- End of critical section
491 //! define a list of nodes names as breakpoints in the graph
494 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
496 DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
497 { // --- Critical section
498 _mutexForSchedulerUpdate.lock();
499 _isRunningunderExternalControl=true;
500 _listOfBreakPoints = listOfBreakPoints;
501 _mutexForSchedulerUpdate.unlock();
502 } // --- End of critical section
506 //! Get the list of tasks to load, to define a subset to execute in step by step mode
508 * If the executor is not in mode YACS::WAITINGTASKS nor YACS::PAUSED, the list is empty.
509 * Use Executor::waitPause to wait.
511 std::list<std::string> Executor::getTasksToLoad()
513 DEBTRACE("Executor::getTasksToLoad()");
514 list<string> listOfNodesToLoad;
515 listOfNodesToLoad.clear();
516 { // --- Critical section
517 _mutexForSchedulerUpdate.lock();
518 _isRunningunderExternalControl=true;
519 switch (_executorState)
521 case YACS::WAITINGTASKS:
524 listOfNodesToLoad = _listOfTasksToLoad;
527 case YACS::NOTYETINITIALIZED:
528 case YACS::INITIALISED:
537 _mutexForSchedulerUpdate.unlock();
538 } // --- End of critical section
539 return listOfNodesToLoad;
543 //! Define a subset of task to execute in step by step mode
545 * Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad
546 * in the current step.
547 * If some nodes must run in parallel, they must stay together in the list.
550 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
552 DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
554 vector<Task *>::iterator iter;
555 vector<Task *> restrictedTasks;
556 { // --- Critical section
557 _mutexForSchedulerUpdate.lock();
558 _isRunningunderExternalControl=true;
559 switch (_executorState)
561 case YACS::WAITINGTASKS:
564 for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
566 string readyNode = _mainSched->getTaskName(*iter);
567 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
568 != listToExecute.end())
570 restrictedTasks.push_back(*iter);
571 DEBTRACE("node to execute " << readyNode);
575 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
577 _tasks.push_back(*iter);
581 case YACS::NOTYETINITIALIZED:
582 case YACS::INITIALISED:
591 _mutexForSchedulerUpdate.unlock();
592 } // --- End of critical section
595 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
597 _tasks.push_back(*iter);
599 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
601 string readyNode = _mainSched->getTaskName(*iter);
602 DEBTRACE("selected node to execute " << readyNode);
608 //! suspend pilot execution until Executor is in pause or waiting tasks completion mode.
610 * Do nothing if execution is finished or in pause.
611 * Wait first step if Executor is running or in initialization.
614 void Executor::waitPause()
616 DEBTRACE("Executor::waitPause()" << _executorState);
617 { // --- Critical section
618 _mutexForSchedulerUpdate.lock();
619 _isRunningunderExternalControl=true;
620 switch (_executorState)
625 case YACS::WAITINGTASKS:
630 case YACS::NOTYETINITIALIZED:
631 case YACS::INITIALISED:
634 _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
638 _mutexForSchedulerUpdate.unlock();
639 } // --- End of critical section
643 //! stops the execution as soon as possible
645 void Executor::stopExecution()
647 setExecMode(YACS::STEPBYSTEP);
650 resumeCurrentBreakPoint();
653 //! save the current state of execution in an xml file
655 bool Executor::saveState(const std::string& xmlFile)
657 DEBTRACE("Executor::saveState() in " << xmlFile);
658 YACS::ENGINE::VisitorSaveState vst(_root);
659 vst.openFileDump(xmlFile.c_str());
665 //! not yet implemented
667 bool Executor::loadState()
669 DEBTRACE("Executor::loadState()");
670 _isRunningunderExternalControl=true;
675 static int isfile(const char *filename)
678 if (stat(filename, &buf) != 0)
680 if (!S_ISREG(buf.st_mode))
685 //! Display the graph state as a dot display, public method
687 void Executor::displayDot(Scheduler *graph)
689 _isRunningunderExternalControl=true;
693 //! Display the graph state as a dot display
695 * \param graph : the node to display
698 void Executor::_displayDot(Scheduler *graph)
700 std::ofstream g("titi");
701 ((ComposedNode*)graph)->writeDot(g);
703 const char displayScript[]="display.sh";
704 if(isfile(displayScript))
705 system("sh display.sh");
707 system("dot -Tpng titi|display -delay 5");
710 //! Wait reactivation in modes Step By step or with BreakPoints
712 * Check mode of execution (set by main thread):
713 * - YACS::CONTINUE : the graph execution continues.
714 * - YACS::STEPBYSTEP : wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
715 * - YACS::STOPBEFORENODES : if there are ready nodes in a list of breakpoints,
716 * wait on condition (pilot thread, Executor::resumeCurrentBreakPoint)
717 * else continue the graph execution.
718 * \return true if end of executor thread is requested
721 bool Executor::checkBreakPoints()
723 DEBTRACE("Executor::checkBreakPoints()");
724 vector<Task *>::iterator iter;
725 bool endRequested = false;
733 case YACS::STOPBEFORENODES:
736 { // --- Critical section
737 _mutexForSchedulerUpdate.lock();
739 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
741 string nodeToLoad = _mainSched->getTaskName(*iter);
742 if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
743 != _listOfBreakPoints.end())
751 _listOfTasksToLoad.clear();
752 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
754 string nodeToLoad = _mainSched->getTaskName(*iter);
755 _listOfTasksToLoad.push_back(nodeToLoad);
757 if (getNbOfThreads())
758 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
760 _executorState = YACS::PAUSED;
761 sendEvent("executor");
762 _condForPilot.notify_all();
764 //_mutexForSchedulerUpdate.unlock();
765 //} // --- End of critical section
766 if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
767 if (_isOKToEnd) endRequested = true;
768 _mutexForSchedulerUpdate.unlock();
769 } // --- End of critical section
770 if (stop) DEBTRACE("wake up from waitResume");
774 case YACS::STEPBYSTEP:
776 { // --- Critical section
777 _mutexForSchedulerUpdate.lock();
779 _listOfTasksToLoad.clear();
780 for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
782 string nodeToLoad = _mainSched->getTaskName(*iter);
783 _listOfTasksToLoad.push_back(nodeToLoad);
785 if (getNbOfThreads())
786 _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
788 _executorState = YACS::PAUSED;
789 sendEvent("executor");
790 _condForPilot.notify_all();
792 waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
793 // or, if no pilot, wait until no more running tasks (stop on error)
794 if (_isOKToEnd) endRequested = true;
795 _mutexForSchedulerUpdate.unlock();
796 } // --- End of critical section
797 DEBTRACE("wake up from waitResume");
801 DEBTRACE("endRequested: " << endRequested);
806 //! in modes Step By step or with BreakPoint, wait until pilot resumes the execution
808 * With the condition Mutex, the mutex is released atomically during the wait.
809 * Pilot calls Executor::resumeCurrentBreakPoint to resume execution.
810 * Must be called while mutex is locked.
813 void Executor::waitResume()
815 DEBTRACE("Executor::waitResume()");
816 _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
821 //! Perform loading of a Task.
823 * \param task : Task to load
826 void Executor::loadTask(Task *task)
828 DEBTRACE("Executor::loadTask(Task *task)");
829 if(task->getState() != YACS::TOLOAD)return;
830 traceExec(task, "state:TOLOAD");
832 _mutexForSchedulerUpdate.lock();
833 _mainSched->notifyFrom(task,YACS::START);
834 _mutexForSchedulerUpdate.unlock();
835 }//End of critical section
838 traceExec(task, "load");
840 traceExec(task, "initService");
845 std::cerr << ex.what() << std::endl;
847 _mutexForSchedulerUpdate.lock();
849 _mainSched->notifyFrom(task,YACS::ABORT);
850 traceExec(task, "state:"+Node::getStateName(task->getState()));
851 _mutexForSchedulerUpdate.unlock();
852 }//End of critical section
856 std::cerr << "Load failed" << std::endl;
858 _mutexForSchedulerUpdate.lock();
860 _mainSched->notifyFrom(task,YACS::ABORT);
861 traceExec(task, "state:"+Node::getStateName(task->getState()));
862 _mutexForSchedulerUpdate.unlock();
863 }//End of critical section
868 //! Execute a list of tasks possibly connected through datastream links
870 * \param tasks : a list of tasks to execute
873 void Executor::launchTasks(std::vector<Task *>& tasks)
875 vector<Task *>::iterator iter;
876 //First phase, make datastream connections
877 for(iter=tasks.begin();iter!=tasks.end();iter++)
879 YACS::StatesForNode state=(*iter)->getState();
880 if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
883 (*iter)->connectService();
884 traceExec(*iter, "connectService");
886 _mutexForSchedulerUpdate.lock();
887 (*iter)->connected();
888 _mutexForSchedulerUpdate.unlock();
889 }//End of critical section
893 std::cerr << ex.what() << std::endl;
896 (*iter)->disconnectService();
897 traceExec(*iter, "disconnectService");
901 // Disconnect has failed
902 traceExec(*iter, "disconnectService failed, ABORT");
905 _mutexForSchedulerUpdate.lock();
907 _mainSched->notifyFrom(*iter,YACS::ABORT);
908 _mutexForSchedulerUpdate.unlock();
909 }//End of critical section
913 std::cerr << "Problem in connectService" << std::endl;
916 (*iter)->disconnectService();
917 traceExec(*iter, "disconnectService");
921 // Disconnect has failed
922 traceExec(*iter, "disconnectService failed, ABORT");
925 _mutexForSchedulerUpdate.lock();
927 _mainSched->notifyFrom(*iter,YACS::ABORT);
928 _mutexForSchedulerUpdate.unlock();
929 }//End of critical section
931 if((*iter)->getState() == YACS::ERROR)
933 //try to put all coupled tasks in error
934 std::set<Task*> coupledSet;
935 (*iter)->getCoupledTasks(coupledSet);
936 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
939 if(t == *iter)continue;
940 if(t->getState() == YACS::ERROR)continue;
943 t->disconnectService();
944 traceExec(t, "disconnectService");
948 // Disconnect has failed
949 traceExec(t, "disconnectService failed, ABORT");
952 _mutexForSchedulerUpdate.lock();
954 _mainSched->notifyFrom(t,YACS::ABORT);
955 _mutexForSchedulerUpdate.unlock();
956 }//End of critical section
957 traceExec(t, "state:"+Node::getStateName(t->getState()));
960 traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
963 //Second phase, execute each task in a thread
964 for(iter=tasks.begin();iter!=tasks.end();iter++)
976 //! Execute a Task in a thread
978 * \param task : Task to execute
980 * Calls Scheduler::notifyFrom of main node (_mainSched) to notify start
982 * Calls Executor::functionForTaskExecution in Thread
985 void Executor::launchTask(Task *task)
987 DEBTRACE("Executor::launchTask(Task *task)");
988 struct threadargs *args;
989 if(task->getState() != YACS::TOACTIVATE)return;
991 DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
992 if(_semThreadCnt == 0)
994 //check if we have enough threads to run
995 std::set<Task*> tmpSet=_runningTasks;
996 std::set<Task*>::iterator it = tmpSet.begin();
997 std::string status="running";
998 std::set<Task*> coupledSet;
999 while( it != tmpSet.end() )
1003 tt->getCoupledTasks(coupledSet);
1005 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1007 if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1008 tmpSet.erase(*iter);
1010 if(status=="running")break;
1011 it = tmpSet.begin();
1014 if(status=="toactivate")
1016 std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1017 std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1021 _semForMaxThreads.wait();
1024 args= new threadargs;
1026 args->sched = _mainSched;
1027 args->execInst = this;
1029 traceExec(task, "launch");
1031 { // --- Critical section
1032 _mutexForSchedulerUpdate.lock();
1033 _numberOfRunningTasks++;
1034 _runningTasks.insert(task);
1035 task->begin(); //change state to ACTIVATED
1036 _mutexForSchedulerUpdate.unlock();
1037 } // --- End of critical section
1038 Thread(functionForTaskExecution,args);
1041 //! wait until a running task ends
1043 void Executor::sleepWhileNoEventsFromAnyRunningTask()
1045 DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1046 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1047 _mutexForSchedulerUpdate.lock();
1048 if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
1050 _isWaitingEventsFromRunningTasks = true;
1051 _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1053 _numberOfEndedTasks=0;
1054 _mutexForSchedulerUpdate.unlock();
1060 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1062 /*_mutexForNbOfConcurrentThreads.lock();
1063 _groupOfAllThreadsCreated.remove(thread);
1065 _mutexForNbOfConcurrentThreads.unlock();*/
1069 //! must be used protected by _mutexForSchedulerUpdate!
1071 void Executor::wakeUp()
1073 DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1074 if (_isWaitingEventsFromRunningTasks)
1076 _isWaitingEventsFromRunningTasks = false;
1077 _condForNewTasksToPerform.notify_all();
1080 _numberOfEndedTasks++;
1083 //! number of running tasks
1085 int Executor::getNbOfThreads()
1088 _mutexForNbOfConcurrentThreads.lock();
1089 _isRunningunderExternalControl=true;
1090 ret = _groupOfAllThreadsCreated.size();
1091 _mutexForNbOfConcurrentThreads.unlock();
1096 //! Function to perform execution of a task in a thread
1098 * \param arg : 3 elements (a Task, a Scheduler, an Executor)
1100 * Calls Task::execute
1102 * Calls Task::finished when the task is finished
1104 * Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished
1106 * Calls Executor::wakeUp and Executor::notifyEndOfThread
1109 void *Executor::functionForTaskExecution(void *arg)
1111 DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1113 struct threadargs *args = (struct threadargs *) arg;
1114 Task *task=args->task;
1115 Scheduler *sched=args->sched;
1116 Executor *execInst=args->execInst;
1118 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1124 YACS::Event ev=YACS::FINISH;
1127 execInst->traceExec(task, "start execution");
1129 execInst->traceExec(task, "end execution OK");
1131 catch(Exception& ex)
1133 std::cerr << "YACS Exception during execute" << std::endl;
1134 std::cerr << ex.what() << std::endl;
1136 string message = "end execution ABORT, ";
1137 message += ex.what();
1138 execInst->traceExec(task, message);
1142 // Execution has failed
1143 std::cerr << "Execution has failed: unknown reason" << std::endl;
1145 execInst->traceExec(task, "end execution ABORT, unknown reason");
1151 DEBTRACE("task->disconnectService()");
1152 task->disconnectService();
1153 execInst->traceExec(task, "disconnectService");
1157 // Disconnect has failed
1158 std::cerr << "disconnect has failed" << std::endl;
1160 execInst->traceExec(task, "disconnectService failed, ABORT");
1163 DEBTRACE("End task->execute()");
1164 { // --- Critical section
1165 execInst->_mutexForSchedulerUpdate.lock();
1168 if (ev == YACS::FINISH) task->finished();
1169 if (ev == YACS::ABORT)
1171 execInst->_errorDetected = true;
1172 if (execInst->_stopOnErrorRequested)
1174 execInst->_execMode = YACS::STEPBYSTEP;
1175 execInst->_isOKToEnd = true;
1179 execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
1180 sched->notifyFrom(task,ev);
1182 catch(Exception& ex)
1184 //notify has failed : it is supposed to have set state
1185 //so no need to do anything
1186 std::cerr << "Error during notification" << std::endl;
1187 std::cerr << ex.what() << std::endl;
1191 //notify has failed : it is supposed to have set state
1192 //so no need to do anything
1193 std::cerr << "Notification failed" << std::endl;
1195 execInst->_numberOfRunningTasks--;
1196 execInst->_runningTasks.erase(task);
1197 DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1198 << " _execMode: " << execInst->_execMode
1199 << " _executorState: " << execInst->_executorState);
1200 if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1202 if (execInst->_executorState == YACS::WAITINGTASKS)
1204 execInst->_executorState = YACS::PAUSED;
1205 execInst->sendEvent("executor");
1206 execInst->_condForPilot.notify_all();
1207 if (execInst->_errorDetected &&
1208 execInst->_stopOnErrorRequested &&
1209 !execInst->_isRunningunderExternalControl)
1210 execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1213 DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1214 execInst->_semForMaxThreads.post();
1215 execInst->_semThreadCnt += 1;
1216 DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1217 if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1219 execInst->_mutexForSchedulerUpdate.unlock();
1220 } // --- End of critical section (change state)
1222 //execInst->notifyEndOfThread(0);
1227 void Executor::traceExec(Task *task, const std::string& message)
1229 string nodeName = _mainSched->getTaskName(task);
1230 Container *cont = task->getContainer();
1231 string containerName = "---";
1232 string placement = "---";
1235 containerName = cont->getName();
1236 ComponentInstance *compo = task->getComponent();
1238 placement = cont->getFullPlacementId(compo);
1241 DWORD now = timeGetTime();
1242 double elapse = (now - _start)/1000.0;
1245 gettimeofday(&now, NULL);
1246 double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1248 _mutexForTrace.lock();
1249 _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1251 _mutexForTrace.unlock();
1254 //! emit notification to all observers registered with the dispatcher
1256 * The dispatcher is unique and can be obtained by getDispatcher()
1258 void Executor::sendEvent(const std::string& event)
1260 Dispatcher* disp=Dispatcher::getDispatcher();
1263 disp->dispatch(_root,event);