#include "Scheduler.hxx"
#include "Dispatcher.hxx"
#include "Container.hxx"
+#include "HomogeneousPoolContainer.hxx"
#include "ComponentInstance.hxx"
#include "VisitorSaveState.hxx"
if(debug>2)_displayDot(graph);
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
tasks=graph->getNextTasks(isMore);
graph->selectRunnableTasks(tasks);
}//End of critical section
if(debug>1)_displayDot(graph);
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_toContinue=!graph->isFinished();
}//End of critical section
DEBTRACE("_toContinue: " << _toContinue);
DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_mainSched = graph;
_root = dynamic_cast<ComposedNode *>(_mainSched);
if (!_root) throw Exception("Executor::Run, Internal Error!");
DEBTRACE("--- events...");
if (debug > 2) _displayDot(graph);
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasks=graph->getNextTasks(isMore);
numberAllTasks=_numberOfRunningTasks+_tasks.size();
graph->selectRunnableTasks(_tasks);
+ FilterTasksConsideringContainers(_tasks);
} // --- End of critical section
if (debug > 2) _displayDot(graph);
if (_executorState == YACS::RUNNING)
if (debug > 1) _displayDot(graph);
{ // --- Critical section
DEBTRACE("---");
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
//It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
if(_numberOfRunningTasks == 0)
_toContinue = !graph->isFinished();
DEBTRACE("End of main Loop");
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
{
DEBTRACE("stop requested: End soon");
void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
{
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_dumpErrorFile=xmlFile;
_stopOnErrorRequested=true;
_dumpOnErrorRequested = dumpRequested;
+//! Cancel a previous setStopOnError() request: execution will no longer pause/dump on task failure.
void Executor::unsetStopOnError()
{
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ // _stopOnErrorRequested is read by the scheduler loop under the same mutex,
+ // so the flag change is published atomically with respect to it.
_stopOnErrorRequested=false;
} // --- End of critical section
}
{
DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
_execMode = mode;
} // --- End of critical section
bool ret = false;
//bool doDump = false;
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
DEBTRACE("_executorState: " << _executorState);
switch (_executorState)
{
DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
_listOfBreakPoints = listOfBreakPoints;
} // --- End of critical section
list<string> listOfNodesToLoad;
listOfNodesToLoad.clear();
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
vector<Task *>::iterator iter;
vector<Task *> restrictedTasks;
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
{
DEBTRACE("Executor::waitPause()" << _executorState);
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_isRunningunderExternalControl=true;
switch (_executorState)
{
{
bool stop = false;
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasksSave = _tasks;
for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
{
case YACS::STEPBYSTEP:
{
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_tasksSave = _tasks;
_listOfTasksToLoad.clear();
for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
{
DEBTRACE("Executor::loadTask(Task *task)");
if(task->getState() != YACS::TOLOAD)return;
- traceExec(task, "state:TOLOAD");
+ traceExec(task, "state:TOLOAD", ComputePlacement(task));
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_mainSched->notifyFrom(task,YACS::START);
}//End of critical section
try
{
- traceExec(task, "load");
+ traceExec(task, "load", ComputePlacement(task));
task->load();
- traceExec(task, "initService");
+ traceExec(task, "initService", ComputePlacement(task));
task->initService();
}
catch(Exception& ex)
{
std::cerr << ex.what() << std::endl;
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
_mainSched->notifyFrom(task,YACS::ABORT);
- traceExec(task, "state:"+Node::getStateName(task->getState()));
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
catch(...)
{
std::cerr << "Load failed" << std::endl;
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
task->aborted();
_mainSched->notifyFrom(task,YACS::ABORT);
- traceExec(task, "state:"+Node::getStateName(task->getState()));
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
}//End of critical section
}
}
try
{
(*iter)->connectService();
- traceExec(*iter, "connectService");
+ traceExec(*iter, "connectService",ComputePlacement(*iter));
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->connected();
}//End of critical section
}
try
{
(*iter)->disconnectService();
- traceExec(*iter, "disconnectService");
+ traceExec(*iter, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(*iter, "disconnectService failed, ABORT");
+ traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
_mainSched->notifyFrom(*iter,YACS::ABORT);
}//End of critical section
try
{
(*iter)->disconnectService();
- traceExec(*iter, "disconnectService");
+ traceExec(*iter, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(*iter, "disconnectService failed, ABORT");
+ traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
(*iter)->aborted();
_mainSched->notifyFrom(*iter,YACS::ABORT);
}//End of critical section
try
{
t->disconnectService();
- traceExec(t, "disconnectService");
+ traceExec(t, "disconnectService",ComputePlacement(*iter));
}
catch(...)
{
// Disconnect has failed
- traceExec(t, "disconnectService failed, ABORT");
+ traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
}
{//Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
t->aborted();
_mainSched->notifyFrom(t,YACS::ABORT);
}//End of critical section
- traceExec(t, "state:"+Node::getStateName(t->getState()));
+ traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
}
}
- traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()));
+ traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
}
//Second phase, execute each task in a thread
args->sched = _mainSched;
args->execInst = this;
- traceExec(task, "launch");
+ traceExec(task, "launch",ComputePlacement(task));
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
_numberOfRunningTasks++;
_runningTasks.insert(task);
task->begin(); //change state to ACTIVATED
{
DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
// _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
if (_numberOfRunningTasks > 0 && _numberOfEndedTasks==0)
{
_isWaitingEventsFromRunningTasks = true;
int Executor::getNbOfThreads()
{
int ret;
- YACS::BASES::AutoLocker alck(&_mutexForNbOfConcurrentThreads);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
_isRunningunderExternalControl=true;
ret = _groupOfAllThreadsCreated.size();
return ret;
Scheduler *sched=args->sched;
Executor *execInst=args->execInst;
delete args;
- execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+ execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
Thread::detach();
YACS::Event ev=YACS::FINISH;
try
{
- execInst->traceExec(task, "start execution");
+ execInst->traceExec(task, "start execution",ComputePlacement(task));
task->execute();
- execInst->traceExec(task, "end execution OK");
+ execInst->traceExec(task, "end execution OK",ComputePlacement(task));
}
catch(Exception& ex)
{
ev=YACS::ABORT;
string message = "end execution ABORT, ";
message += ex.what();
- execInst->traceExec(task, message);
+ execInst->traceExec(task, message,ComputePlacement(task));
}
catch(...)
{
// Execution has failed
std::cerr << "Execution has failed: unknown reason" << std::endl;
ev=YACS::ABORT;
- execInst->traceExec(task, "end execution ABORT, unknown reason");
+ execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
}
// Disconnect task
{
DEBTRACE("task->disconnectService()");
task->disconnectService();
- execInst->traceExec(task, "disconnectService");
+ execInst->traceExec(task, "disconnectService",ComputePlacement(task));
}
catch(...)
{
// Disconnect has failed
std::cerr << "disconnect has failed" << std::endl;
ev=YACS::ABORT;
- execInst->traceExec(task, "disconnectService failed, ABORT");
+ execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ //
+
+ std::string placement(ComputePlacement(task));
+
+ // container management for HomogeneousPoolOfContainer
+
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+ if(contC)
+ {
+ YACS::BASES::AutoLocker<Container> alckCont(contC);
+ contC->release(task);
}
DEBTRACE("End task->execute()");
{ // --- Critical section
- YACS::BASES::AutoLocker alck(&execInst->_mutexForSchedulerUpdate);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&execInst->_mutexForSchedulerUpdate);
try
{
if (ev == YACS::FINISH) task->finished();
}
task->aborted();
}
- execInst->traceExec(task, "state:"+Node::getStateName(task->getState()));
+ execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
sched->notifyFrom(task,ev);
}
catch(Exception& ex)
return 0;
}
-void Executor::traceExec(Task *task, const std::string& message)
+void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
{
string nodeName = _mainSched->getTaskName(task);
Container *cont = task->getContainer();
string containerName = "---";
- string placement = "---";
if (cont)
- {
- containerName = cont->getName();
- ComponentInstance *compo = task->getComponent();
- ServiceNode *taskCast(dynamic_cast<ServiceNode *>(task));
- if(taskCast)
- placement = cont->getFullPlacementId(taskCast);
- }
+ containerName = cont->getName();
+
#ifdef WIN32
DWORD now = timeGetTime();
double elapse = (now - _start)/1000.0;
double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
#endif
{
- YACS::BASES::AutoLocker alck(&_mutexForTrace);
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
_trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
_trace << flush;
}
YASSERT(_root);
disp->dispatch(_root,event);
}
+
+/*!
+ * This method takes in input a list of tasks and selects from that lists a part of it considering only the containers.
+ * If tasks have no container instance subclass of HomogeneousPoolContainer this method will let the \a tsks untouched.
+ *
+ * \param [in,out] tsks - list of tasks, filtered in place so that each HomogeneousPoolContainer
+ *                        only receives as many tasks as it has free places.
+ */
+void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+{
+ // First pass: bucket tasks by their HomogeneousPoolContainer.
+ // The NULL key collects tasks that have no container, or whose container is
+ // not a HomogeneousPoolContainer — those are never filtered out.
+ std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
+ for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
+ {
+ Task *cur(*it);
+ if(!cur)
+ continue;
+ Container *cont(cur->getContainer());
+ if(!cont)
+ {
+ m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+ continue;
+ }
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
+ if(!contC)
+ {
+ m[(HomogeneousPoolContainer *)NULL].push_back(cur);
+ continue;
+ }
+ m[contC].push_back(cur);
+ }
+ //
+ // Second pass: for every real pool container, keep at most
+ // getNumberOfFreePlace() tasks and reserve their places via allocateFor().
+ std::vector<Task *> ret;
+ for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
+ {
+ HomogeneousPoolContainer *curhpc((*it).first);
+ const std::vector<Task *>& curtsks((*it).second);
+ if(!curhpc)
+ {
+ // Unconstrained tasks (NULL bucket): passed through unchanged.
+ ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+ }
+ else
+ {
+ // start of critical section for container curhpc
+ // The container is locked so that the free-place count cannot change
+ // between getNumberOfFreePlace() and allocateFor().
+ YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
+ std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
+ std::size_t sz(curhpc->getNumberOfFreePlace());
+ std::vector<Task *>::const_iterator it2(curtsks.begin());
+ for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
+ {
+ vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
+ ret.push_back(*it2);
+ }
+ curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
+ //end of critical section
+ }
+ }
+ //
+ tsks=ret;
+}
+
+/*!
+ * Returns a textual placement identifier of \a zeTask, used for execution traces.
+ * Falls back to "---" when \a zeTask is NULL or has no container attached.
+ */
+std::string Executor::ComputePlacement(Task *zeTask)
+{
+ std::string placement("---");
+ if(!zeTask)
+ return placement;
+ if(zeTask->getContainer())
+ placement=zeTask->getContainer()->getFullPlacementId(zeTask);
+ return placement;
+}