-// Copyright (C) 2006-2016 CEA/DEN, EDF R&D
+// Copyright (C) 2006-2019 CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
#include "ServiceNode.hxx"
#include "ComposedNode.hxx"
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+
#include <iostream>
#include <fstream>
#include <sys/stat.h>
int Executor::_maxThreads(1000);
size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
-Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
+Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
{
_root=0;
_toContinue = true;
_executorState = YACS::NOTYETINITIALIZED;
_execMode = YACS::CONTINUE;
_semThreadCnt = _maxThreads;
+ _numberOfRunningTasks = 0;
+ _numberOfEndedTasks = 0;
DEBTRACE("Executor initialized with max threads = " << _maxThreads);
}
string tracefile = "traceExec_";
tracefile += _mainSched->getName();
_trace.open(tracefile.c_str());
-#ifdef WIN32
- _start = timeGetTime();
-#else
- gettimeofday(&_start, NULL);
-#endif
-
+ _start = std::chrono::steady_clock::now();
} // --- End of critical section
if (debug > 1) _displayDot(graph);
if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
if (debug > 0) _displayDot(graph);
DEBTRACE("---");
- //loadTasks(_tasks);
loadParallelTasks(_tasks,this);
if (debug > 1) _displayDot(graph);
DEBTRACE("---");
DEBTRACE("---");
}
+/*!
+ * This method can be called at any time simultaneously during a RunB call.
+ * This method will wait until the executor is locked in a consistent state of a running graph.
+ *
+ * This method is expected to be called in association with resume method.
+ * The returned parameter is expected to be transfered to resume method.
+ */
+bool Executor::suspendASAP()
+{
+  // no AutoLocker here. It's not a bug: on the success path the mutex is
+  // intentionally left LOCKED and must be released later by resume(true).
+  _mutexForSchedulerUpdate.lock();
+  if(!_toContinue && _executorState==YACS::FINISHED)
+    {// execution is finished - nothing to suspend, release the lock at once
+      _mutexForSchedulerUpdate.unLock();
+      return false;// the executor is no more running
+    }
+  //general case. Leave method with locker in locked status
+  return true;
+}
+
+/*!
+ * This method is expected to be called in association with suspendASAP method.
+ * Expected to be called just after suspendASAP with output of resume as input parameter
+ */
+void Executor::resume(bool suspended)
+{
+  // \a suspended is the value returned by suspendASAP: true means the
+  // scheduler mutex is still held by this thread and must be released here.
+  if(suspended)
+    _mutexForSchedulerUpdate.unLock();
+}
+
//! stops the execution as soon as possible
void Executor::stopExecution()
traceExec(task, "state:TOLOAD", ComputePlacement(task));
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
- _mainSched->notifyFrom(task,YACS::START);
+ _mainSched->notifyFrom(task,YACS::START,execInst);
}//End of critical section
try
{
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
- _mainSched->notifyFrom(task,YACS::ABORT);
+ _mainSched->notifyFrom(task,YACS::ABORT,execInst);
traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
- _mainSched->notifyFrom(task,YACS::ABORT);
+ _mainSched->notifyFrom(task,YACS::ABORT,execInst);
traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
Executor *execInst;
};
-void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
-{
- for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
- loadTask(*iter,execInst);
-}
-
void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
{
std::vector<Thread> ths(tasks.size());
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
- _mainSched->notifyFrom(*iter,YACS::ABORT);
+ _mainSched->notifyFrom(*iter,YACS::ABORT,this);
}//End of critical section
}
catch(...)
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
- _mainSched->notifyFrom(*iter,YACS::ABORT);
+ _mainSched->notifyFrom(*iter,YACS::ABORT,this);
}//End of critical section
}
if((*iter)->getState() == YACS::ERROR)
{//Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
t->aborted();
- _mainSched->notifyFrom(t,YACS::ABORT);
+ _mainSched->notifyFrom(t,YACS::ABORT,this);
}//End of critical section
traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
}
HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
if(contC)
{
- YACS::BASES::AutoLocker<Container> alckCont(contC);
+ std::lock_guard<std::mutex> alckCont(contC->getLocker());
contC->release(task);
}
task->aborted();
}
execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
- sched->notifyFrom(task,ev);
+ sched->notifyFrom(task,ev,execInst);
}
catch(Exception& ex)
{
if (cont)
containerName = cont->getName();
-#ifdef WIN32
- DWORD now = timeGetTime();
- double elapse = (now - _start)/1000.0;
-#else
- timeval now;
- gettimeofday(&now, NULL);
- double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
-#endif
+ std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+ std::chrono::milliseconds millisec;
+ millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
+ double elapse = double(millisec.count()) / 1000.0;
{
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
_trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
disp->dispatch(_root,event);
}
+//! Strict weak ordering on HomogeneousPoolContainer pointers used as map key
+//! comparator: null pointers sort before any non-null container, non-null
+//! containers are ordered by their number of cores per worker.
+struct HPCCompare
+{
+  bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
+  {
+    if(lhs == rhs)
+      return false; // covers both-null and same-object cases
+    if(!lhs || !rhs)
+      return !lhs; // only the null side compares "less"
+    return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
+  }
+};
+
/*!
* This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
* If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
*/
void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
{
- std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
- for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
+ std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
+ for(auto cur : tsks)
{
- Task *cur(*it);
if(!cur)
continue;
Container *cont(cur->getContainer());
if(!cont)
{
- m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+ m[nullptr].push_back(cur);
continue;
}
HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
if(!contC)
{
- m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+ m[nullptr].push_back(cur);
continue;
}
m[contC].push_back(cur);
}
//
std::vector<Task *> ret;
- for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
+ for(auto it : m)
{
- HomogeneousPoolContainer *curhpc((*it).first);
- const std::vector<Task *>& curtsks((*it).second);
+ HomogeneousPoolContainer *curhpc(it.first);
+ const std::vector<Task *>& curtsks(it.second);
if(!curhpc)
{
ret.insert(ret.end(),curtsks.begin(),curtsks.end());
else
{
// start of critical section for container curhpc
- YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
+ std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
std::size_t sz(curhpc->getNumberOfFreePlace());
std::vector<Task *>::const_iterator it2(curtsks.begin());
placement=zeTask->getContainer()->getFullPlacementId(zeTask);
return placement;
}
+
+///////// NEW EXECUTOR ////////////////////////////////
+void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
+{
+  // Load a task on the resource chosen by the workload manager.
+  // Only tasks in TOLOAD state are processed; on any failure the task is
+  // aborted and the scheduler is notified.
+  if(task->getState() != YACS::TOLOAD)
+    return;
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
+  {//Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched->notifyFrom(task,YACS::START,this);
+  }//End of critical section
+  try
+  {
+    // Build a container name unique per (resource, container type, worker
+    // index) so the task is pinned to the worker chosen by the manager.
+    std::ostringstream container_name;
+    container_name << runInfo.resource.name << "-"
+                   << runInfo.type.name << "-" << runInfo.index;
+    task->imposeResource(runInfo.resource.name, container_name.str());
+    traceExec(task, "load", ComputePlacement(task));
+    task->load();
+    traceExec(task, "initService", ComputePlacement(task));
+    task->initService();
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << ex.what() << std::endl;
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT, this);
+      traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+    }//End of critical section
+  }
+  catch(...)
+  {
+    std::cerr << "Load failed" << std::endl;
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT, this);
+      traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+    }//End of critical section
+  }
+}
+
+//! Register \a task as running before it is handed over to the workload
+//! manager (counter and set are both updated under the scheduler mutex).
+void Executor::beginTask(Task *task)
+{
+  // --- Critical section
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  _runningTasks.insert(task);
+  ++_numberOfRunningTasks;
+  // --- End of critical section
+}
+
+void Executor::endTask(Task *task, YACS::Event ev)
+{
+  // Final bookkeeping for a task run by the workload manager: update the
+  // task state from the event, notify the scheduler, unregister the task,
+  // then wake up the executor main loop (and the pilot in step-by-step mode).
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  try
+  {
+    if (ev == YACS::FINISH) task->finished();
+    if (ev == YACS::ABORT)
+    {
+      _errorDetected = true;
+      if (_stopOnErrorRequested)
+      {
+        // Switch to step-by-step so the execution pauses on the error.
+        _execMode = YACS::STEPBYSTEP;
+        _isOKToEnd = true;
+      }
+      task->aborted();
+    }
+    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+    _mainSched->notifyFrom(task,ev,this);
+  }
+  catch(Exception& ex)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Error during notification" << std::endl;
+    std::cerr << ex.what() << std::endl;
+  }
+  catch(...)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Notification failed" << std::endl;
+  }
+  _numberOfRunningTasks--;
+  _runningTasks.erase(task);
+  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
+           << " _execMode: " << _execMode
+           << " _executorState: " << _executorState);
+  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
+  {
+    if (_executorState == YACS::WAITINGTASKS)
+    {
+      _executorState = YACS::PAUSED;
+      sendEvent("executor");
+      _condForPilot.notify_all();
+      if (_errorDetected &&
+          _stopOnErrorRequested &&
+          !_isRunningunderExternalControl)
+        _condForStepByStep.notify_all(); // exec thread may be on waitResume
+    }
+  }
+  if (_executorState != YACS::PAUSED)
+    wakeUp();
+}
+
+//! Execute an already loaded and connected task; returns FINISH on success
+//! or ABORT on any failure. The task service is always disconnected
+//! afterwards, even when execution failed.
+YACS::Event Executor::runTask(Task *task)
+{
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    task->begin(); //change state to ACTIVATED
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+  if(getDPLScopeSensitive())
+  {
+    // Propagate the dynamic-parallel-loop scope to the node when requested.
+    Node *node(dynamic_cast<Node *>(task));
+    ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+    if(node!=0 && gfn!=0)
+      node->applyDPLScope(gfn);
+  }
+
+  YACS::Event ev=YACS::FINISH;
+  try
+  {
+    traceExec(task, "start execution",ComputePlacement(task));
+    task->execute();
+    traceExec(task, "end execution OK",ComputePlacement(task));
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << "YACS Exception during execute" << std::endl;
+    std::cerr << ex.what() << std::endl;
+    ev=YACS::ABORT;
+    string message = "end execution ABORT, ";
+    message += ex.what();
+    traceExec(task, message,ComputePlacement(task));
+  }
+  catch(...)
+  {
+    // Execution has failed
+    std::cerr << "Execution has failed: unknown reason" << std::endl;
+    ev=YACS::ABORT;
+    traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+  }
+
+  // Disconnect task
+  try
+  {
+    DEBTRACE("task->disconnectService()");
+    task->disconnectService();
+    traceExec(task, "disconnectService",ComputePlacement(task));
+  }
+  catch(...)
+  {
+    // Disconnect has failed
+    std::cerr << "disconnect has failed" << std::endl;
+    ev=YACS::ABORT;
+    traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+  }
+  //
+
+  // NOTE(review): 'placement' is currently unused here (the traceExec that
+  // consumed it in endTask is commented out) — candidate for removal.
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolOfContainer
+  // Release the slot held by this task in its homogeneous pool container so
+  // another queued task can reuse it.
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+  {
+    std::lock_guard<std::mutex> alckCont(contC->getLocker());
+    contC->release(task);
+  }
+
+  return ev;
+}
+
+//! Establish the datastream connections of a freshly loaded task.
+//! On failure the task is disconnected, aborted and the scheduler notified;
+//! all tasks coupled to a task that ended in ERROR are aborted as well.
+void Executor::makeDatastreamConnections(Task *task)
+{
+  YACS::StatesForNode state=task->getState();
+  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
+    return;
+  try
+  {
+    task->connectService();
+    traceExec(task, "connectService",ComputePlacement(task));
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->connected();
+    }//End of critical section
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << ex.what() << std::endl;
+    try
+    {
+      (task)->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+    catch(...)
+    {
+      // Disconnect has failed
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT,this);
+    }//End of critical section
+  }
+  catch(...)
+  {
+    std::cerr << "Problem in connectService" << std::endl;
+    try
+    {
+      (task)->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+    catch(...)
+    {
+      // Disconnect has failed
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT,this);
+    }//End of critical section
+  }
+  if(task->getState() == YACS::ERROR)
+  {
+    //try to put all coupled tasks in error
+    std::set<Task*> coupledSet;
+    task->getCoupledTasks(coupledSet);
+    for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
+    {
+      Task* t=*it;
+      if(t == task)continue;
+      if(t->getState() == YACS::ERROR)continue;
+      try
+      {
+        t->disconnectService();
+        traceExec(t, "disconnectService",ComputePlacement(task));
+      }
+      catch(...)
+      {
+        // Disconnect has failed
+        traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
+      }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        t->aborted();
+        _mainSched->notifyFrom(t,YACS::ABORT,this);
+      }//End of critical section
+      traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
+    }
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+}
+
+#include "Runtime.hxx"
+//! Register in the workload manager every compute node known by the runtime,
+//! assigning each resource a sequential id.
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+  Runtime *runtime(getRuntime());
+  if(!runtime)
+    throw YACS::Exception("loadResources : no runtime !");
+  const std::vector< std::pair<std::string,int> > catalog(runtime->getCatalogOfComputeNodes());
+  int resourceId(0);
+  for(std::size_t i = 0; i < catalog.size(); ++i)
+  {
+    WorkloadManager::Resource newResource;
+    newResource.name = catalog[i].first;
+    newResource.nbCores = catalog[i].second;
+    newResource.id = resourceId++;
+    wm.addResource(newResource);
+  }
+}
+
+/*!
+ * Adapter that exposes a YACS engine task to the workload manager.
+ * The container type describes the resource requirements of the task;
+ * run() drives the whole task life cycle through the Executor.
+ */
+class NewTask : public WorkloadManager::Task
+{
+public:
+  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
+  const WorkloadManager::ContainerType& type()const override;
+  void run(const WorkloadManager::RunInfo& runInfo)override;
+private:
+  WorkloadManager::ContainerType _type;   // resource requirements descriptor
+  Executor& _executor;                    // non-owning back-reference
+  YACS::ENGINE::Task * _yacsTask;         // non-owning; wrapped engine task
+};
+
+//! Build the workload-manager container type from the YACS container of the
+//! task. When the task has no container or cannot accept an imposed
+//! resource, resource constraints are ignored altogether.
+NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
+: _type()
+, _executor(executor)
+, _yacsTask(yacsTask)
+{
+  Container * yacsContainer = yacsTask->getContainer();
+  if(yacsContainer != nullptr && yacsTask->canAcceptImposedResource())
+  {
+    _type.ignoreResources = false;
+    _type.name = yacsContainer->getName();
+    std::string nb_procs_str = yacsContainer->getProperty("nb_parallel_procs");
+    float needed_cores = 0.0;
+    if(!nb_procs_str.empty())
+    {
+      try
+      {
+        needed_cores = std::stof(nb_procs_str);
+      }
+      catch(const std::exception&)
+      {
+        // std::stof throws std::invalid_argument / std::out_of_range on a
+        // malformed property; that must not abort the whole execution.
+        // Fall back to "no specific core requirement".
+        std::cerr << "Invalid nb_parallel_procs property: " << nb_procs_str
+                  << std::endl;
+      }
+    }
+    _type.neededCores = needed_cores;
+  }
+  else
+  {
+    _type.ignoreResources = true;
+    _type.name = "test";
+    _type.neededCores = 0;
+  }
+  _type.id = 0;
+}
+
+//! Container type descriptor used by the workload manager to match this
+//! task with a resource.
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+  return _type;
+}
+
+//! Full life cycle of the wrapped YACS task on the worker selected by the
+//! workload manager: load, connect datastreams, execute, report the result.
+void NewTask::run(const WorkloadManager::RunInfo& runInfo)
+{
+  _executor.loadTask(_yacsTask, runInfo);
+  _executor.makeDatastreamConnections(_yacsTask);
+  YACS::Event ev = _executor.runTask(_yacsTask);
+  _executor.endTask(_yacsTask, ev);
+  delete this; // provisional: the task object deletes itself once finished
+}
+
+//! Main loop of the workload-manager based executor: repeatedly asks the
+//! graph for runnable tasks and submits them to the workload manager, until
+//! the graph is finished or a stop is requested.
+void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
+{
+  DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched = graph;
+    _root = dynamic_cast<ComposedNode *>(_mainSched);
+    if (!_root) throw Exception("Executor::Run, Internal Error!");
+    _executorState = YACS::NOTYETINITIALIZED;
+    sendEvent("executor");
+    _toContinue=true;
+    _isOKToEnd = false;
+    _errorDetected = false;
+    _isWaitingEventsFromRunningTasks = false;
+    _numberOfRunningTasks = 0;
+    _runningTasks.clear();
+    _numberOfEndedTasks = 0;
+    string tracefile = "traceExec_";
+    tracefile += _mainSched->getName();
+    _trace.open(tracefile.c_str());
+    _start = std::chrono::steady_clock::now();
+  } // --- End of critical section
+
+  if (debug > 1) _displayDot(graph);
+
+  if (fromScratch)
+  {
+    try
+    {
+      graph->init();
+      graph->exUpdateState();
+    }
+    catch(Exception& ex)
+    {
+      DEBTRACE("exception: "<< (ex.what()));
+      _executorState = YACS::FINISHED;
+      sendEvent("executor");
+      throw;
+    }
+  }
+  _executorState = YACS::INITIALISED;
+  sendEvent("executor");
+
+  if (debug > 1) _displayDot(graph);
+
+  bool isMore;
+  int problemCount=0;
+  int numberAllTasks;
+
+  _executorState = YACS::RUNNING;
+  sendEvent("executor");
+
+  // The workload manager and its scheduling algorithm live for the duration
+  // of this run; its worker threads call back into this Executor.
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  loadResources(wlm);
+  wlm.start();
+
+  while (_toContinue)
+  {
+    DEBTRACE("--- executor main loop");
+    sleepWhileNoEventsFromAnyRunningTask();
+    DEBTRACE("--- events...");
+    if (debug > 2) _displayDot(graph);
+    { // --- Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
+      graph->selectRunnableTasks(readyTasks);
+      _tasks.clear();
+      // Keep only tasks not already submitted to the workload manager.
+      for(Task * t : readyTasks)
+        if(_runningTasks.find(t) == _runningTasks.end())
+          _tasks.push_back(t);
+      // TODO: to be removed
+      FilterTasksConsideringContainers(_tasks);
+      numberAllTasks=_numberOfRunningTasks+_tasks.size();
+    } // --- End of critical section
+    if (debug > 2) _displayDot(graph);
+    DEBTRACE("--- events...");
+    if (_executorState == YACS::RUNNING)
+    {
+      if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
+      for(Task * task : _tasks)
+      {
+        // NewTask deletes itself at the end of its run() method.
+        beginTask(task);
+        NewTask* newTask = new NewTask(*this, task);
+        wlm.addTask(newTask);
+      }
+    }
+    if (debug > 1) _displayDot(graph);
+    { // --- Critical section
+      DEBTRACE("---");
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
+      _toContinue = !graph->isFinished();
+
+      DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
+      DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
+      DEBTRACE("_toContinue: " << _toContinue);
+      if(_toContinue && numberAllTasks==0)
+      {
+        //Problem : no running tasks and no task to launch ??
+        problemCount++;
+        std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
+        //Pause to give a chance to interrupt
+        usleep(1000);
+        if(problemCount > 25)
+        {
+          // Too much problems encountered : stop execution
+          _toContinue=false;
+        }
+      }
+
+      if (! _toContinue)
+      {
+        _executorState = YACS::FINISHED;
+        sendEvent("executor");
+        _condForPilot.notify_all();
+      }
+    } // --- End of critical section
+    if (debug > 0) _displayDot(graph);
+    DEBTRACE("_toContinue: " << _toContinue);
+  }
+
+  wlm.stop();
+  DEBTRACE("End of main Loop");
+
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
+    {
+      DEBTRACE("stop requested: End soon");
+      _executorState = YACS::STOPPED;
+      _toContinue = false;
+      sendEvent("executor");
+    }
+  } // --- End of critical section
+  if ( _dumpOnErrorRequested && _errorDetected)
+  {
+    saveState(_dumpErrorFile);
+  }
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace.close();
+  }
+  DEBTRACE("End of RunB thread");
+}
+
+//! Entry point selecting the executor implementation: dispatches to the
+//! workload-manager based executor when the graph property "executor"
+//! requests it (any accepted spelling), otherwise runs the classical one.
+void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
+{
+  const std::string executorProperty = graph->getProperty("executor");
+  const bool useWorkloadManager = executorProperty == "WorkloadManager"
+                               || executorProperty == "WORKLOADMANAGER"
+                               || executorProperty == "workloadmanager"
+                               || executorProperty == "WorkLoadManager";
+  if(useWorkloadManager)
+    newRun(graph, debug, fromScratch);
+  else
+    RunB(graph, debug, fromScratch);
+}