-// Copyright (C) 2006-2014 CEA/DEN, EDF R&D
+// Copyright (C) 2006-2019 CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
#include "Executor.hxx"
#include "Task.hxx"
+#include "AutoLocker.hxx"
#include "Scheduler.hxx"
#include "Dispatcher.hxx"
#include "Container.hxx"
+#include "HomogeneousPoolContainer.hxx"
#include "ComponentInstance.hxx"
#include "VisitorSaveState.hxx"
+#include "ServiceNode.hxx"
#include "ComposedNode.hxx"
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+
#include <iostream>
#include <fstream>
#include <sys/stat.h>
//#define _DEVDEBUG_
#include "YacsTrace.hxx"
-int Executor::_maxThreads(50);
-size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
+int Executor::_maxThreads(1000);
+size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
-Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads)
+Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
{
_root=0;
_toContinue = true;
_executorState = YACS::NOTYETINITIALIZED;
_execMode = YACS::CONTINUE;
_semThreadCnt = _maxThreads;
+ _numberOfRunningTasks = 0;
+ _numberOfEndedTasks = 0;
DEBTRACE("Executor initialized with max threads = " << _maxThreads);
}
if(debug>2)_displayDot(graph);
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
tasks=graph->getNextTasks(isMore);
graph->selectRunnableTasks(tasks);
- _mutexForSchedulerUpdate.unlock();
}//End of critical section
if(debug>2)_displayDot(graph);
for(iter=tasks.begin();iter!=tasks.end();iter++)
- loadTask(*iter);
+ loadTask(*iter,this);
if(debug>1)_displayDot(graph);
if(debug>1)_displayDot(graph);
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_toContinue=!graph->isFinished();
- _mutexForSchedulerUpdate.unlock();
}//End of critical section
DEBTRACE("_toContinue: " << _toContinue);
DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_mainSched = graph;
_root = dynamic_cast<ComposedNode *>(_mainSched);
if (!_root) throw Exception("Executor::Run, Internal Error!");
string tracefile = "traceExec_";
tracefile += _mainSched->getName();
_trace.open(tracefile.c_str());
-#ifdef WIN32
- _start = timeGetTime();
-#else
- gettimeofday(&_start, NULL);
-#endif
-
- _mutexForSchedulerUpdate.unlock();
+ _start = std::chrono::steady_clock::now();
} // --- End of critical section
if (debug > 1) _displayDot(graph);
DEBTRACE("--- events...");
if (debug > 2) _displayDot(graph);
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasks=graph->getNextTasks(isMore);
- numberAllTasks=_numberOfRunningTasks+_tasks.size();
graph->selectRunnableTasks(_tasks);
- _mutexForSchedulerUpdate.unlock();
+ FilterTasksConsideringContainers(_tasks);
+ numberAllTasks=_numberOfRunningTasks+_tasks.size();
} // --- End of critical section
if (debug > 2) _displayDot(graph);
if (_executorState == YACS::RUNNING)
if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
if (debug > 0) _displayDot(graph);
DEBTRACE("---");
- for (iter = _tasks.begin(); iter != _tasks.end(); iter++)
- loadTask(*iter);
+ loadParallelTasks(_tasks,this);
if (debug > 1) _displayDot(graph);
DEBTRACE("---");
launchTasks(_tasks);
if (debug > 1) _displayDot(graph);
{ // --- Critical section
DEBTRACE("---");
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
//It is possible that the graph is finished while some tasks are still running (it's an error but we must take it into account)
if(_numberOfRunningTasks == 0)
_toContinue = !graph->isFinished();
sendEvent("executor");
_condForPilot.notify_all();
}
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
if (debug > 0) _displayDot(graph);
DEBTRACE("_toContinue: " << _toContinue);
DEBTRACE("End of main Loop");
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
{
DEBTRACE("stop requested: End soon");
_toContinue = false;
sendEvent("executor");
}
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
if ( _dumpOnErrorRequested && _errorDetected)
{
saveState(_dumpErrorFile);
}
- _trace.close();
+ {
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+ _trace.close();
+ }
DEBTRACE("End of RunB thread");
}
void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
{
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_dumpErrorFile=xmlFile;
_stopOnErrorRequested=true;
_dumpOnErrorRequested = dumpRequested;
if (dumpRequested && xmlFile.empty())
throw YACS::Exception("dump on error requested and no filename given for dump");
- _mutexForSchedulerUpdate.unlock();
DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
} // --- End of critical section
}
void Executor::unsetStopOnError()
{
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_stopOnErrorRequested=false;
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
}
{
DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
_execMode = mode;
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
}
bool ret = false;
//bool doDump = false;
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
DEBTRACE("_executorState: " << _executorState);
switch (_executorState)
// debug: no easy way to verify if main loop is actually waiting on condition
}
}
- _mutexForSchedulerUpdate.unlock();
DEBTRACE("---");
//if (doDump) saveState(_dumpErrorFile);
} // --- End of critical section
{
DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
_listOfBreakPoints = listOfBreakPoints;
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
}
list<string> listOfNodesToLoad;
listOfNodesToLoad.clear();
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
break;
}
}
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
return listOfNodesToLoad;
}
vector<Task *>::iterator iter;
vector<Task *> restrictedTasks;
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
break;
}
}
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
_tasks.clear();
{
DEBTRACE("Executor::waitPause()" << _executorState);
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
break;
}
}
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
DEBTRACE("---");
}
+/*!
+ * This method can be called at any time, concurrently with a RunB call.
+ * It waits until the executor is locked in a consistent state of a running graph.
+ *
+ * This method is expected to be called in association with the resume method.
+ * The returned value is expected to be transferred to the resume method.
+ */
+bool Executor::suspendASAP()
+{
+ // no AutoLocker here. It's not a bug.
+ _mutexForSchedulerUpdate.lock();
+ if(!_toContinue && _executorState==YACS::FINISHED)
+ {// execution is finished
+ _mutexForSchedulerUpdate.unLock();
+      return false;// the executor is no longer running
+    }
+  //general case: leave the method with the mutex still locked
+ return true;
+}
+
+/*!
+ * This method is expected to be called in association with the suspendASAP method,
+ * just after it, with the output of suspendASAP as its input parameter.
+ */
+void Executor::resume(bool suspended)
+{
+ if(suspended)
+ _mutexForSchedulerUpdate.unLock();
+}
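+
+// Minimal usage sketch of the suspendASAP()/resume() pairing as seen from a
+// pilot thread (executor/graph names are illustrative):
+//
+//   bool suspended = executor->suspendASAP(); // false => execution already finished
+//   if (suspended)
+//     {
+//       // scheduler mutex held here: safe to inspect the state of the running graph
+//     }
+//   executor->resume(suspended); // releases the mutex only if it was actually taken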
+
//! stops the execution as soon as possible
void Executor::stopExecution()
bool Executor::saveState(const std::string& xmlFile)
{
DEBTRACE("Executor::saveState() in " << xmlFile);
- YACS::ENGINE::VisitorSaveState vst(_root);
- vst.openFileDump(xmlFile.c_str());
- _root->accept(&vst);
- vst.closeFileDump();
- return true;
+ bool result = false;
+ try {
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ YACS::ENGINE::VisitorSaveState vst(_root);
+ vst.openFileDump(xmlFile.c_str());
+ _root->accept(&vst);
+ vst.closeFileDump();
+ result = true;
+ }
+ catch(Exception& ex) {
+ std::cerr << ex.what() << std::endl;
+ }
+ return result;
}
//! not yet implemented
{
bool stop = false;
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasksSave = _tasks;
for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
{
sendEvent("executor");
_condForPilot.notify_all();
}
- //_mutexForSchedulerUpdate.unlock();
- //} // --- End of critical section
if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
if (_isOKToEnd) endRequested = true;
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
if (stop) DEBTRACE("wake up from waitResume");
break;
case YACS::STEPBYSTEP:
{
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasksSave = _tasks;
_listOfTasksToLoad.clear();
for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
// or, if no pilot, wait until no more running tasks (stop on error)
if (_isOKToEnd) endRequested = true;
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
DEBTRACE("wake up from waitResume");
break;
* \param task : Task to load
*/
-void Executor::loadTask(Task *task)
+void Executor::loadTask(Task *task, const Executor *execInst)
{
DEBTRACE("Executor::loadTask(Task *task)");
- if(task->getState() != YACS::TOLOAD)return;
- traceExec(task, "state:TOLOAD");
+ if(task->getState() != YACS::TOLOAD)
+ return;
+ traceExec(task, "state:TOLOAD", ComputePlacement(task));
{//Critical section
- _mutexForSchedulerUpdate.lock();
- _mainSched->notifyFrom(task,YACS::START);
- _mutexForSchedulerUpdate.unlock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _mainSched->notifyFrom(task,YACS::START,execInst);
}//End of critical section
try
{
- traceExec(task, "load");
+ traceExec(task, "load", ComputePlacement(task));
task->load();
- traceExec(task, "initService");
+ traceExec(task, "initService", ComputePlacement(task));
task->initService();
}
catch(Exception& ex)
{
std::cerr << ex.what() << std::endl;
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
- _mainSched->notifyFrom(task,YACS::ABORT);
- traceExec(task, "state:"+Node::getStateName(task->getState()));
- _mutexForSchedulerUpdate.unlock();
+ _mainSched->notifyFrom(task,YACS::ABORT,execInst);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
catch(...)
{
std::cerr << "Load failed" << std::endl;
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
- _mainSched->notifyFrom(task,YACS::ABORT);
- traceExec(task, "state:"+Node::getStateName(task->getState()));
- _mutexForSchedulerUpdate.unlock();
+ _mainSched->notifyFrom(task,YACS::ABORT,execInst);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
}
+struct threadargs
+{
+ Task *task;
+ Scheduler *sched;
+ Executor *execInst;
+};
+
+void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
+{
+ std::vector<Thread> ths(tasks.size());
+ std::size_t ithread(0);
+  for(std::vector<Task *>::const_iterator iter = tasks.begin(); iter != tasks.end(); iter++, ithread++)
+ {
+ DEBTRACE("Executor::loadParallelTasks(Task *task)");
+ struct threadargs *args(new threadargs);
+ args->task = (*iter);
+ args->sched = _mainSched;
+ args->execInst = this;
+ ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
+ }
+ for(ithread=0;ithread<tasks.size();ithread++)
+ ths[ithread].join();
+}
//! Execute a list of tasks possibly connected through datastream links
/*!
* \param tasks : a list of tasks to execute
*
*/
-void Executor::launchTasks(std::vector<Task *>& tasks)
+void Executor::launchTasks(const std::vector<Task *>& tasks)
{
- vector<Task *>::iterator iter;
//First phase, make datastream connections
- for(iter=tasks.begin();iter!=tasks.end();iter++)
+ for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
{
YACS::StatesForNode state=(*iter)->getState();
if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
try
{
(*iter)->connectService();
- traceExec(*iter, "connectService");
+ traceExec(*iter, "connectService",ComputePlacement(*iter));
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->connected();
- _mutexForSchedulerUpdate.unlock();
}//End of critical section
}
catch(Exception& ex)
try
{
(*iter)->disconnectService();
- traceExec(*iter, "disconnectService");
+ traceExec(*iter, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(*iter, "disconnectService failed, ABORT");
+ traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
- _mainSched->notifyFrom(*iter,YACS::ABORT);
- _mutexForSchedulerUpdate.unlock();
+ _mainSched->notifyFrom(*iter,YACS::ABORT,this);
}//End of critical section
}
catch(...)
try
{
(*iter)->disconnectService();
- traceExec(*iter, "disconnectService");
+ traceExec(*iter, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(*iter, "disconnectService failed, ABORT");
+ traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
- _mainSched->notifyFrom(*iter,YACS::ABORT);
- _mutexForSchedulerUpdate.unlock();
+ _mainSched->notifyFrom(*iter,YACS::ABORT,this);
}//End of critical section
}
if((*iter)->getState() == YACS::ERROR)
try
{
t->disconnectService();
- traceExec(t, "disconnectService");
+ traceExec(t, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(t, "disconnectService failed, ABORT");
+ traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
t->aborted();
- _mainSched->notifyFrom(t,YACS::ABORT);
- _mutexForSchedulerUpdate.unlock();
+ _mainSched->notifyFrom(t,YACS::ABORT,this);
}//End of critical section
- traceExec(t, "state:"+Node::getStateName(t->getState()));
+ traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
}
}
- traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
+ traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
}
//Second phase, execute each task in a thread
- for(iter=tasks.begin();iter!=tasks.end();iter++)
+ for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
{
launchTask(*iter);
}
}
-struct threadargs {
- Task *task;
- Scheduler *sched;
- Executor *execInst;
-};
-
//! Execute a Task in a thread
/*!
* \param task : Task to execute
DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
if(_semThreadCnt == 0)
{
+ // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
//check if we have enough threads to run
std::set<Task*> tmpSet=_runningTasks;
std::set<Task*>::iterator it = tmpSet.begin();
std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
}
+ // --- End of critical section
}
_semForMaxThreads.wait();
args->sched = _mainSched;
args->execInst = this;
- traceExec(task, "launch");
+ traceExec(task, "launch",ComputePlacement(task));
{ // --- Critical section
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_numberOfRunningTasks++;
_runningTasks.insert(task);
task->begin(); //change state to ACTIVATED
- _mutexForSchedulerUpdate.unlock();
} // --- End of critical section
Thread(functionForTaskExecution, args, _threadStackSize);
}
{
DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
// _semForNewTasksToPerform.wait(); //---- use pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- _mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
{
_isWaitingEventsFromRunningTasks = true;
_condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
}
_numberOfEndedTasks=0;
- _mutexForSchedulerUpdate.unlock();
DEBTRACE("---");
}
int Executor::getNbOfThreads()
{
int ret;
- _mutexForNbOfConcurrentThreads.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
_isRunningunderExternalControl=true;
ret = _groupOfAllThreadsCreated.size();
- _mutexForNbOfConcurrentThreads.unlock();
return ret;
}
+/*!
+ * This thread is NOT supposed to be detached !
+ */
+void *Executor::functionForTaskLoad(void *arg)
+{
+ DEBTRACE("Executor::functionForTaskLoad(void *arg)");
+ struct threadargs *args = (struct threadargs *) arg;
+ Task *task=args->task;
+ Scheduler *sched=args->sched;
+ Executor *execInst=args->execInst;
+ delete args;
+  execInst->loadTask(task,execInst);// no throw from this method - all exceptions are caught !
+ return 0;
+}
//! Function to perform execution of a task in a thread
/*!
Scheduler *sched=args->sched;
Executor *execInst=args->execInst;
delete args;
- execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+ execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
Thread::detach();
// Execute task
+ if(execInst->getDPLScopeSensitive())
+ {
+ Node *node(dynamic_cast<Node *>(task));
+ ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
+ if(node!=0 && gfn!=0)
+ node->applyDPLScope(gfn);
+ }
+
YACS::Event ev=YACS::FINISH;
try
{
- execInst->traceExec(task, "start execution");
+ execInst->traceExec(task, "start execution",ComputePlacement(task));
task->execute();
- execInst->traceExec(task, "end execution OK");
+ execInst->traceExec(task, "end execution OK",ComputePlacement(task));
}
catch(Exception& ex)
{
ev=YACS::ABORT;
string message = "end execution ABORT, ";
message += ex.what();
- execInst->traceExec(task, message);
+ execInst->traceExec(task, message,ComputePlacement(task));
}
catch(...)
{
// Execution has failed
std::cerr << "Execution has failed: unknown reason" << std::endl;
ev=YACS::ABORT;
- execInst->traceExec(task, "end execution ABORT, unknown reason");
+ execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
}
// Disconnect task
{
DEBTRACE("task->disconnectService()");
task->disconnectService();
- execInst->traceExec(task, "disconnectService");
+ execInst->traceExec(task, "disconnectService",ComputePlacement(task));
}
catch(...)
{
// Disconnect has failed
std::cerr << "disconnect has failed" << std::endl;
ev=YACS::ABORT;
- execInst->traceExec(task, "disconnectService failed, ABORT");
+ execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ //
+
+ std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolContainer
+
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+ if(contC)
+ {
+ std::lock_guard<std::mutex> alckCont(contC->getLocker());
+ contC->release(task);
}
DEBTRACE("End task->execute()");
{ // --- Critical section
- execInst->_mutexForSchedulerUpdate.lock();
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
try
{
if (ev == YACS::FINISH) task->finished();
}
task->aborted();
}
- execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
- sched->notifyFrom(task,ev);
+ execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+ sched->notifyFrom(task,ev,execInst);
}
catch(Exception& ex)
{
DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
- execInst->_mutexForSchedulerUpdate.unlock();
} // --- End of critical section (change state)
//execInst->notifyEndOfThread(0);
return 0;
}
-void Executor::traceExec(Task *task, const std::string& message)
+void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
{
string nodeName = _mainSched->getTaskName(task);
Container *cont = task->getContainer();
string containerName = "---";
- string placement = "---";
if (cont)
- {
- containerName = cont->getName();
- ComponentInstance *compo = task->getComponent();
- //if (compo)
- placement = cont->getFullPlacementId(compo);
- }
-#ifdef WIN32
- DWORD now = timeGetTime();
- double elapse = (now - _start)/1000.0;
-#else
- timeval now;
- gettimeofday(&now, NULL);
- double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
-#endif
- _mutexForTrace.lock();
- _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
- _trace << flush;
- _mutexForTrace.unlock();
+ containerName = cont->getName();
+
+ std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+ std::chrono::milliseconds millisec;
+ millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
+ double elapse = double(millisec.count()) / 1000.0;
+ {
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+ _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
+ _trace << flush;
+ }
}
//! emit notification to all observers registered with the dispatcher
YASSERT(_root);
disp->dispatch(_root,event);
}
+
+struct HPCCompare
+{
+ bool operator()(HomogeneousPoolContainer * lhs, HomogeneousPoolContainer * rhs) const
+ {
+ if(!lhs && !rhs)
+ return false;
+ if(!lhs)
+ return true;
+ if(!rhs)
+ return false;
+ return lhs->getNumberOfCoresPerWorker() < rhs->getNumberOfCoresPerWorker();
+ }
+};
+
+/*!
+ * This method takes as input a list of tasks and selects a subset of it, considering only the containers.
+ * If no task has a container instance that is a subclass of HomogeneousPoolContainer, this method leaves \a tsks untouched.
+ *
+ * \param [in,out] tsks - list of tasks to be filtered in place
+ */
+void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+{
+ std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
+ for(auto cur : tsks)
+ {
+ if(!cur)
+ continue;
+ Container *cont(cur->getContainer());
+ if(!cont)
+ {
+ m[nullptr].push_back(cur);
+ continue;
+ }
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
+ if(!contC)
+ {
+ m[nullptr].push_back(cur);
+ continue;
+ }
+ m[contC].push_back(cur);
+ }
+ //
+ std::vector<Task *> ret;
+ for(auto it : m)
+ {
+ HomogeneousPoolContainer *curhpc(it.first);
+ const std::vector<Task *>& curtsks(it.second);
+ if(!curhpc)
+ {
+ ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+ }
+ else
+ {
+ // start of critical section for container curhpc
+ std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
+          std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimultaneously;
+          std::size_t sz(curhpc->getNumberOfFreePlace());
+          std::vector<Task *>::const_iterator it2(curtsks.begin());
+          for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
+            {
+              vecOfTaskSharingSameHPContToBeRunSimultaneously.push_back(*it2);
+              ret.push_back(*it2);
+            }
+          curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimultaneously);
+ //end of critical section
+ }
+ }
+ //
+ tsks=ret;
+}
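+
+// Worked example (hypothetical numbers): if three ready tasks share one
+// HomogeneousPoolContainer that currently reports getNumberOfFreePlace() == 2,
+// only the first two are kept and allocated on that container; the third one
+// is dropped from the returned list and will be reconsidered on a later pass.
+// Tasks with no container, or with a non-pool container, always pass through.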
+
+std::string Executor::ComputePlacement(Task *zeTask)
+{
+ std::string placement("---");
+ if(!zeTask)
+ return placement;
+ if(zeTask->getContainer())
+ placement=zeTask->getContainer()->getFullPlacementId(zeTask);
+ return placement;
+}
+
+///////// NEW EXECUTOR ////////////////////////////////
+void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
+{
+ if(task->getState() != YACS::TOLOAD)
+ return;
+ traceExec(task, "state:TOLOAD", ComputePlacement(task));
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _mainSched->notifyFrom(task,YACS::START,this);
+ }//End of critical section
+ try
+ {
+ std::ostringstream container_name;
+ container_name << runInfo.resource.name << "-"
+ << runInfo.type.name << "-" << runInfo.index;
+ task->imposeResource(runInfo.resource.name, container_name.str());
+ traceExec(task, "load", ComputePlacement(task));
+ task->load();
+ traceExec(task, "initService", ComputePlacement(task));
+ task->initService();
+ }
+ catch(Exception& ex)
+ {
+ std::cerr << ex.what() << std::endl;
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT, this);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+ }//End of critical section
+ }
+ catch(...)
+ {
+ std::cerr << "Load failed" << std::endl;
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT, this);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+ }//End of critical section
+ }
+}
+
+void Executor::beginTask(Task *task)
+{
+ // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _numberOfRunningTasks++;
+ _runningTasks.insert(task);
+ // --- End of critical section
+}
+
+void Executor::endTask(Task *task, YACS::Event ev)
+{
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ try
+ {
+ if (ev == YACS::FINISH) task->finished();
+ if (ev == YACS::ABORT)
+ {
+ _errorDetected = true;
+ if (_stopOnErrorRequested)
+ {
+ _execMode = YACS::STEPBYSTEP;
+ _isOKToEnd = true;
+ }
+ task->aborted();
+ }
+ //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+ _mainSched->notifyFrom(task,ev,this);
+ }
+ catch(Exception& ex)
+ {
+ //notify has failed : it is supposed to have set state
+ //so no need to do anything
+ std::cerr << "Error during notification" << std::endl;
+ std::cerr << ex.what() << std::endl;
+ }
+ catch(...)
+ {
+ //notify has failed : it is supposed to have set state
+ //so no need to do anything
+ std::cerr << "Notification failed" << std::endl;
+ }
+ _numberOfRunningTasks--;
+ _runningTasks.erase(task);
+ DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
+ << " _execMode: " << _execMode
+ << " _executorState: " << _executorState);
+ if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
+ {
+ if (_executorState == YACS::WAITINGTASKS)
+ {
+ _executorState = YACS::PAUSED;
+ sendEvent("executor");
+ _condForPilot.notify_all();
+ if (_errorDetected &&
+ _stopOnErrorRequested &&
+ !_isRunningunderExternalControl)
+ _condForStepByStep.notify_all(); // exec thread may be on waitResume
+ }
+ }
+ if (_executorState != YACS::PAUSED)
+ wakeUp();
+}
+
+YACS::Event Executor::runTask(Task *task)
+{
+ { // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->begin(); //change state to ACTIVATED
+ }
+ traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+ if(getDPLScopeSensitive())
+ {
+ Node *node(dynamic_cast<Node *>(task));
+ ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+ if(node!=0 && gfn!=0)
+ node->applyDPLScope(gfn);
+ }
+
+ YACS::Event ev=YACS::FINISH;
+ try
+ {
+ traceExec(task, "start execution",ComputePlacement(task));
+ task->execute();
+ traceExec(task, "end execution OK",ComputePlacement(task));
+ }
+ catch(Exception& ex)
+ {
+ std::cerr << "YACS Exception during execute" << std::endl;
+ std::cerr << ex.what() << std::endl;
+ ev=YACS::ABORT;
+ string message = "end execution ABORT, ";
+ message += ex.what();
+ traceExec(task, message,ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Execution has failed
+ std::cerr << "Execution has failed: unknown reason" << std::endl;
+ ev=YACS::ABORT;
+ traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+ }
+
+ // Disconnect task
+ try
+ {
+ DEBTRACE("task->disconnectService()");
+ task->disconnectService();
+ traceExec(task, "disconnectService",ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Disconnect has failed
+ std::cerr << "disconnect has failed" << std::endl;
+ ev=YACS::ABORT;
+ traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ //
+
+ std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolContainer
+
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+ if(contC)
+ {
+ std::lock_guard<std::mutex> alckCont(contC->getLocker());
+ contC->release(task);
+ }
+
+ return ev;
+}
+
+void Executor::makeDatastreamConnections(Task *task)
+{
+ YACS::StatesForNode state=task->getState();
+ if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
+ return;
+ try
+ {
+ task->connectService();
+ traceExec(task, "connectService",ComputePlacement(task));
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->connected();
+ }//End of critical section
+ }
+ catch(Exception& ex)
+ {
+ std::cerr << ex.what() << std::endl;
+ try
+ {
+ (task)->disconnectService();
+ traceExec(task, "disconnectService",ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Disconnect has failed
+ traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT,this);
+ }//End of critical section
+ }
+ catch(...)
+ {
+ std::cerr << "Problem in connectService" << std::endl;
+ try
+ {
+ (task)->disconnectService();
+ traceExec(task, "disconnectService",ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Disconnect has failed
+ traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT,this);
+ }//End of critical section
+ }
+ if(task->getState() == YACS::ERROR)
+ {
+ //try to put all coupled tasks in error
+ std::set<Task*> coupledSet;
+ task->getCoupledTasks(coupledSet);
+ for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
+ {
+ Task* t=*it;
+ if(t == task)continue;
+ if(t->getState() == YACS::ERROR)continue;
+ try
+ {
+ t->disconnectService();
+ traceExec(t, "disconnectService",ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Disconnect has failed
+ traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ t->aborted();
+ _mainSched->notifyFrom(t,YACS::ABORT,this);
+ }//End of critical section
+ traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
+ }
+ }
+ traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+}
+
+#include "Runtime.hxx"
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+ Runtime *r(getRuntime());
+ if(!r)
+ throw YACS::Exception("loadResources : no runtime !");
+ std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
+ int id = 0;
+ for(const std::pair<std::string,int>& res : data)
+ {
+ WorkloadManager::Resource newResource;
+ newResource.name = res.first;
+ newResource.id = id;
+ id++;
+ newResource.nbCores = res.second;
+ wm.addResource(newResource);
+ }
+}
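+
+// Assumed catalog shape, based on the loop above: getCatalogOfComputeNodes()
+// returns (name, nbCores) pairs, e.g. { {"localhost", 8}, {"cluster01", 96} },
+// each of which becomes a WorkloadManager::Resource with a sequential id.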
+
+class NewTask : public WorkloadManager::Task
+{
+public:
+ NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
+ const WorkloadManager::ContainerType& type()const override;
+ void run(const WorkloadManager::RunInfo& runInfo)override;
+private:
+ WorkloadManager::ContainerType _type;
+ Executor& _executor;
+ YACS::ENGINE::Task * _yacsTask;
+};
+
+NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
+: _type()
+, _executor(executor)
+, _yacsTask(yacsTask)
+{
+ Container * yacsContainer = yacsTask->getContainer();
+ if(yacsContainer != nullptr && yacsTask->canAcceptImposedResource())
+ {
+ _type.ignoreResources = false;
+ _type.name = yacsContainer->getName();
+ std::string nb_procs_str = yacsContainer->getProperty("nb_parallel_procs");
+ float needed_cores = 0.0;
+ if(!nb_procs_str.empty())
+ needed_cores = std::stof(nb_procs_str);
+ _type.neededCores = needed_cores;
+ }
+ else
+ {
+ _type.ignoreResources = true;
+ _type.name = "test";
+ _type.neededCores = 0;
+ }
+ _type.id = 0;
+}
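+
+// Illustrative mapping performed by the constructor above (hypothetical values):
+// a task whose container is named "cont" and carries nb_parallel_procs="4"
+// yields name="cont", neededCores=4.0, ignoreResources=false; a task that
+// cannot accept an imposed resource falls back to name="test", neededCores=0,
+// ignoreResources=true.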
+
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+ return _type;
+}
+
+void NewTask::run(const WorkloadManager::RunInfo& runInfo)
+{
+ _executor.loadTask(_yacsTask, runInfo);
+ _executor.makeDatastreamConnections(_yacsTask);
+ YACS::Event ev = _executor.runTask(_yacsTask);
+ _executor.endTask(_yacsTask, ev);
+  delete this; // temporary
+}
+
+void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
+{
+ DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
+ { // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _mainSched = graph;
+ _root = dynamic_cast<ComposedNode *>(_mainSched);
+ if (!_root) throw Exception("Executor::Run, Internal Error!");
+ _executorState = YACS::NOTYETINITIALIZED;
+ sendEvent("executor");
+ _toContinue=true;
+ _isOKToEnd = false;
+ _errorDetected = false;
+ _isWaitingEventsFromRunningTasks = false;
+ _numberOfRunningTasks = 0;
+ _runningTasks.clear();
+ _numberOfEndedTasks = 0;
+ string tracefile = "traceExec_";
+ tracefile += _mainSched->getName();
+ _trace.open(tracefile.c_str());
+ _start = std::chrono::steady_clock::now();
+ } // --- End of critical section
+
+ if (debug > 1) _displayDot(graph);
+
+ if (fromScratch)
+ {
+ try
+ {
+ graph->init();
+ graph->exUpdateState();
+ }
+ catch(Exception& ex)
+ {
+ DEBTRACE("exception: "<< (ex.what()));
+ _executorState = YACS::FINISHED;
+ sendEvent("executor");
+ throw;
+ }
+ }
+ _executorState = YACS::INITIALISED;
+ sendEvent("executor");
+
+ if (debug > 1) _displayDot(graph);
+
+ bool isMore;
+ int problemCount=0;
+ int numberAllTasks;
+
+ _executorState = YACS::RUNNING;
+ sendEvent("executor");
+
+ WorkloadManager::DefaultAlgorithm algo;
+ WorkloadManager::WorkloadManager wlm(algo);
+ loadResources(wlm);
+ wlm.start();
+
+ while (_toContinue)
+ {
+ DEBTRACE("--- executor main loop");
+ sleepWhileNoEventsFromAnyRunningTask();
+ DEBTRACE("--- events...");
+ if (debug > 2) _displayDot(graph);
+ { // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
+ graph->selectRunnableTasks(readyTasks);
+ _tasks.clear();
+ for(Task * t : readyTasks)
+ if(_runningTasks.find(t) == _runningTasks.end())
+ _tasks.push_back(t);
+ // TODO: to be removed
+ FilterTasksConsideringContainers(_tasks);
+ numberAllTasks=_numberOfRunningTasks+_tasks.size();
+ } // --- End of critical section
+ if (debug > 2) _displayDot(graph);
+ DEBTRACE("--- events...");
+ if (_executorState == YACS::RUNNING)
+ {
+ if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
+ for(Task * task : _tasks)
+ {
+ beginTask(task);
+ NewTask* newTask = new NewTask(*this, task);
+ wlm.addTask(newTask);
+ }
+ }
+ if (debug > 1) _displayDot(graph);
+ { // --- Critical section
+ DEBTRACE("---");
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      //It is possible that the graph is finished while some tasks are still running (it's an error but we must take it into account)
+ _toContinue = !graph->isFinished();
+
+ DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
+ DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
+ DEBTRACE("_toContinue: " << _toContinue);
+ if(_toContinue && numberAllTasks==0)
+ {
+ //Problem : no running tasks and no task to launch ??
+ problemCount++;
+ std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
+ //Pause to give a chance to interrupt
+ usleep(1000);
+ if(problemCount > 25)
+ {
+          // Too many problems encountered : stop execution
+ _toContinue=false;
+ }
+ }
+
+ if (! _toContinue)
+ {
+ _executorState = YACS::FINISHED;
+ sendEvent("executor");
+ _condForPilot.notify_all();
+ }
+ } // --- End of critical section
+ if (debug > 0) _displayDot(graph);
+ DEBTRACE("_toContinue: " << _toContinue);
+ }
+
+ wlm.stop();
+ DEBTRACE("End of main Loop");
+
+ { // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
+ {
+ DEBTRACE("stop requested: End soon");
+ _executorState = YACS::STOPPED;
+ _toContinue = false;
+ sendEvent("executor");
+ }
+ } // --- End of critical section
+ if ( _dumpOnErrorRequested && _errorDetected)
+ {
+ saveState(_dumpErrorFile);
+ }
+ {
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+ _trace.close();
+ }
+  DEBTRACE("End of newRun");
+}
+
+void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
+{
+ std::string str_value = graph->getProperty("executor");
+ if(str_value == "WorkloadManager"
+ || str_value == "WORKLOADMANAGER"
+ || str_value == "workloadmanager"
+ || str_value == "WorkLoadManager")
+ newRun(graph, debug, fromScratch);
+ else
+ RunB(graph, debug, fromScratch);
+}
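+
+// Selection sketch (assuming the graph exposes setProperty() like other YACS
+// nodes; names are illustrative): one of the spellings listed above routes
+// execution through the workload-manager based newRun(), anything else keeps
+// the historical RunB() behaviour:
+//
+//   graph->setProperty("executor", "WorkloadManager");
+//   executor->RunW(graph, 0, true); // dispatches to newRun(graph, 0, true)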