+
+///////// NEW EXECUTOR ////////////////////////////////
+void Executor::loadTask(Task *task)
+{
+ if(task->getState() != YACS::TOLOAD)
+ return;
+ traceExec(task, "state:TOLOAD", ComputePlacement(task));
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _mainSched->notifyFrom(task,YACS::START,this);
+ }//End of critical section
+ try
+ {
+ traceExec(task, "load", ComputePlacement(task));
+ task->load();
+ traceExec(task, "initService", ComputePlacement(task));
+ task->initService();
+ }
+ catch(Exception& ex)
+ {
+ std::cerr << ex.what() << std::endl;
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT, this);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+ }//End of critical section
+ }
+ catch(...)
+ {
+ std::cerr << "Load failed" << std::endl;
+ {//Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->aborted();
+ _mainSched->notifyFrom(task,YACS::ABORT, this);
+ traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+ }//End of critical section
+ }
+}
+
+void Executor::beginTask(Task *task)
+{
+ // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _numberOfRunningTasks++;
+ _runningTasks.insert(task);
+ // --- End of critical section
+}
+
void Executor::endTask(Task *task, YACS::Event ev)
{
  // Terminal bookkeeping for a task: apply the final event to the task,
  // notify the scheduler, unregister the task, and wake up the main loop /
  // pilot when appropriate. The whole method runs under the
  // scheduler-update lock.
  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
  try
  {
    if (ev == YACS::FINISH) task->finished();
    if (ev == YACS::ABORT)
    {
      // Remember the error; when stop-on-error is requested, fall back to
      // step-by-step mode so execution pauses as soon as possible.
      _errorDetected = true;
      if (_stopOnErrorRequested)
      {
        _execMode = YACS::STEPBYSTEP;
        _isOKToEnd = true;
      }
      task->aborted();
    }
    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
    _mainSched->notifyFrom(task,ev,this);
  }
  catch(Exception& ex)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Error during notification" << std::endl;
    std::cerr << ex.what() << std::endl;
  }
  catch(...)
  {
    //notify has failed : it is supposed to have set state
    //so no need to do anything
    std::cerr << "Notification failed" << std::endl;
  }
  // Unregister the task even if notification failed.
  _numberOfRunningTasks--;
  _runningTasks.erase(task);
  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
           << " _execMode: " << _execMode
           << " _executorState: " << _executorState);
  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
  {
    if (_executorState == YACS::WAITINGTASKS)
    {
      // Last running task and not in CONTINUE mode: executor goes PAUSED,
      // and waiters (pilot, step-by-step controller) are released.
      _executorState = YACS::PAUSED;
      sendEvent("executor");
      _condForPilot.notify_all();
      if (_errorDetected &&
          _stopOnErrorRequested &&
          !_isRunningunderExternalControl)
        _condForStepByStep.notify_all(); // exec thread may be on waitResume
    }
  }
  if (_executorState != YACS::PAUSED)
    wakeUp();
}
+
+YACS::Event Executor::runTask(Task *task)
+{
+ { // --- Critical section
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ task->begin(); //change state to ACTIVATED
+ }
+ traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+ if(getDPLScopeSensitive())
+ {
+ Node *node(dynamic_cast<Node *>(task));
+ ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+ if(node!=0 && gfn!=0)
+ node->applyDPLScope(gfn);
+ }
+
+ YACS::Event ev=YACS::FINISH;
+ try
+ {
+ traceExec(task, "start execution",ComputePlacement(task));
+ task->execute();
+ traceExec(task, "end execution OK",ComputePlacement(task));
+ }
+ catch(Exception& ex)
+ {
+ std::cerr << "YACS Exception during execute" << std::endl;
+ std::cerr << ex.what() << std::endl;
+ ev=YACS::ABORT;
+ string message = "end execution ABORT, ";
+ message += ex.what();
+ traceExec(task, message,ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Execution has failed
+ std::cerr << "Execution has failed: unknown reason" << std::endl;
+ ev=YACS::ABORT;
+ traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+ }
+
+ // Disconnect task
+ try
+ {
+ DEBTRACE("task->disconnectService()");
+ task->disconnectService();
+ traceExec(task, "disconnectService",ComputePlacement(task));
+ }
+ catch(...)
+ {
+ // Disconnect has failed
+ std::cerr << "disconnect has failed" << std::endl;
+ ev=YACS::ABORT;
+ traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+ }
+ //
+
+ std::string placement(ComputePlacement(task));
+
+ // container management for HomogeneousPoolOfContainer
+
+ HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+ if(contC)
+ {
+ std::lock_guard<std::mutex> alckCont(contC->getLocker());
+ contC->release(task);
+ }
+
+ return ev;
+}
+
void Executor::makeDatastreamConnections(Task *task)
{
  // Establish the datastream (CALCIUM-like) connections of a task.
  // Only TOLOAD/TORECONNECT tasks are processed. On failure the task is
  // disconnected, aborted and the scheduler notified; all tasks coupled to
  // a task that ended in ERROR are put in error as well.
  YACS::StatesForNode state=task->getState();
  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
    return;
  try
  {
    task->connectService();
    traceExec(task, "connectService",ComputePlacement(task));
    {//Critical section
      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
      task->connected();
    }//End of critical section
  }
  catch(Exception& ex)
  {
    std::cerr << ex.what() << std::endl;
    // Best-effort cleanup: try to disconnect before aborting.
    try
    {
      (task)->disconnectService();
      traceExec(task, "disconnectService",ComputePlacement(task));
    }
    catch(...)
    {
      // Disconnect has failed
      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
    {//Critical section
      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
      task->aborted();
      _mainSched->notifyFrom(task,YACS::ABORT,this);
    }//End of critical section
  }
  catch(...)
  {
    // Same recovery as above, for non-YACS exceptions.
    std::cerr << "Problem in connectService" << std::endl;
    try
    {
      (task)->disconnectService();
      traceExec(task, "disconnectService",ComputePlacement(task));
    }
    catch(...)
    {
      // Disconnect has failed
      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
    }
    {//Critical section
      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
      task->aborted();
      _mainSched->notifyFrom(task,YACS::ABORT,this);
    }//End of critical section
  }
  if(task->getState() == YACS::ERROR)
  {
    //try to put all coupled tasks in error
    std::set<Task*> coupledSet;
    task->getCoupledTasks(coupledSet);
    for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
    {
      Task* t=*it;
      if(t == task)continue;
      if(t->getState() == YACS::ERROR)continue;
      try
      {
        t->disconnectService();
        // NOTE(review): traces for the coupled task t use
        // ComputePlacement(task), not ComputePlacement(t) — looks like a
        // copy-paste; confirm whether t's own placement was intended.
        traceExec(t, "disconnectService",ComputePlacement(task));
      }
      catch(...)
      {
        // Disconnect has failed
        traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
      }
      {//Critical section
        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
        t->aborted();
        _mainSched->notifyFrom(t,YACS::ABORT,this);
      }//End of critical section
      traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
    }
  }
  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
}
+
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+#include "Runtime.hxx"
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+ Runtime *r(getRuntime());
+ if(!r)
+ throw YACS::Exception("loadResources : no runtime !");
+ std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
+ int id = 0;
+ for(const std::pair<std::string,int>& res : data)
+ {
+ WorkloadManager::Resource newResource;
+ newResource.name = res.first;
+ newResource.id = id;
+ id++;
+ newResource.nbCores = res.second;
+ wm.addResource(newResource);
+ }
+}
+
// Adapter between a YACS engine task and the workload manager: wraps a
// YACS::ENGINE::Task so it can be scheduled and run by
// WorkloadManager::WorkloadManager.
class NewTask : public WorkloadManager::Task
{
public:
  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
  // Container type (resource requirements) needed by this task.
  const WorkloadManager::ContainerType& type()const override;
  // Runs the full YACS task life cycle; called by the workload manager.
  void run(const WorkloadManager::Container& c)override;
private:
  WorkloadManager::ContainerType _type; // resource requirements of the task
  Executor& _executor;                  // executor driving the task life cycle
  YACS::ENGINE::Task * _yacsTask;       // wrapped engine task
};
+
// Build the adapter around a YACS engine task for the given executor.
NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
: _type()
, _executor(executor)
, _yacsTask(yacsTask)
{
  // NOTE(review): placeholder container type — 0 needed cores and the
  // name "test" look provisional; confirm the intended values.
  _type.neededCores = 0;
  _type.id = 0;
  _type.name = "test";
}
+
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+ return _type;
+}
+
// Full life cycle of one YACS task, executed on a workload-manager thread:
// load, connect datastreams, execute, then report the resulting event.
void NewTask::run(const WorkloadManager::Container& c)
{
  _executor.loadTask(_yacsTask);
  _executor.makeDatastreamConnections(_yacsTask);
  YACS::Event ev = _executor.runTask(_yacsTask);
  _executor.endTask(_yacsTask, ev);
  delete this; // provisional: the task object deletes itself once finished
}
+
// Main entry point of the workload-manager-based execution: initializes the
// graph, starts a WorkloadManager, then loops handing ready tasks to it
// until the graph is finished or a stop is requested.
void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
{
  DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
  { // --- Critical section
    // Reset all execution state and open the trace file for this run.
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
    _mainSched = graph;
    _root = dynamic_cast<ComposedNode *>(_mainSched);
    if (!_root) throw Exception("Executor::Run, Internal Error!");
    _executorState = YACS::NOTYETINITIALIZED;
    sendEvent("executor");
    _toContinue=true;
    _isOKToEnd = false;
    _errorDetected = false;
    _isWaitingEventsFromRunningTasks = false;
    _numberOfRunningTasks = 0;
    _runningTasks.clear();
    _numberOfEndedTasks = 0;
    string tracefile = "traceExec_";
    tracefile += _mainSched->getName();
    _trace.open(tracefile.c_str());
    _start = std::chrono::steady_clock::now();
  } // --- End of critical section

  if (debug > 1) _displayDot(graph);

  if (fromScratch)
  {
    // Fresh run: (re)initialize the graph; on failure the executor is
    // marked FINISHED and the exception is propagated to the caller.
    try
    {
      graph->init();
      graph->exUpdateState();
    }
    catch(Exception& ex)
    {
      DEBTRACE("exception: "<< (ex.what()));
      _executorState = YACS::FINISHED;
      sendEvent("executor");
      throw;
    }
  }
  _executorState = YACS::INITIALISED;
  sendEvent("executor");

  if (debug > 1) _displayDot(graph);

  bool isMore;
  int problemCount=0;
  int numberAllTasks;

  _executorState = YACS::RUNNING;
  sendEvent("executor");

  // Start the workload manager with the resources known to the runtime;
  // it owns the worker threads that will run NewTask objects.
  WorkloadManager::DefaultAlgorithm algo;
  WorkloadManager::WorkloadManager wlm(algo);
  loadResources(wlm);
  wlm.start();

  while (_toContinue)
  {
    DEBTRACE("--- executor main loop");
    sleepWhileNoEventsFromAnyRunningTask();
    DEBTRACE("--- events...");
    if (debug > 2) _displayDot(graph);
    { // --- Critical section
      // Collect the next runnable tasks, skipping those already running.
      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
      std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
      graph->selectRunnableTasks(readyTasks);
      _tasks.clear();
      for(Task * t : readyTasks)
        if(_runningTasks.find(t) == _runningTasks.end())
          _tasks.push_back(t);
      numberAllTasks=_numberOfRunningTasks+_tasks.size();
    } // --- End of critical section
    if (debug > 2) _displayDot(graph);
    DEBTRACE("--- events...");
    if (_executorState == YACS::RUNNING)
    {
      if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
      // Hand each selected task to the workload manager; the NewTask
      // adapter deletes itself when the task is done.
      for(Task * task : _tasks)
      {
        beginTask(task);
        NewTask* newTask = new NewTask(*this, task);
        wlm.addTask(newTask);
      }
    }
    if (debug > 1) _displayDot(graph);
    { // --- Critical section
      DEBTRACE("---");
      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
      //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
      _toContinue = !graph->isFinished();

      DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
      DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
      DEBTRACE("_toContinue: " << _toContinue);
      if(_toContinue && numberAllTasks==0)
      {
        //Problem : no running tasks and no task to launch ??
        problemCount++;
        std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
        //Pause to give a chance to interrupt
        usleep(1000);
        if(problemCount > 25)
        {
          // Too much problems encountered : stop execution
          _toContinue=false;
        }
      }

      if (! _toContinue)
      {
        // Graph finished (or gave up): notify pilot and observers.
        _executorState = YACS::FINISHED;
        sendEvent("executor");
        _condForPilot.notify_all();
      }
    } // --- End of critical section
    if (debug > 0) _displayDot(graph);
    DEBTRACE("_toContinue: " << _toContinue);
  }

  wlm.stop();
  DEBTRACE("End of main Loop");

  { // --- Critical section
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
    {
      DEBTRACE("stop requested: End soon");
      _executorState = YACS::STOPPED;
      _toContinue = false;
      sendEvent("executor");
    }
  } // --- End of critical section
  if ( _dumpOnErrorRequested && _errorDetected)
  {
    // Dump the graph state for post-mortem analysis when an error occurred.
    saveState(_dumpErrorFile);
  }
  {
    // Close the trace file under its own lock (writers may still race us).
    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
    _trace.close();
  }
  DEBTRACE("End of RunB thread");
}