From b68ec0b66862adf39404d95ceddc736d1cc97dcc Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Mon, 20 Apr 2020 15:47:02 +0200 Subject: [PATCH] Work in progress : workload manager engine test ok --- CMakeLists.txt | 2 +- src/engine/CMakeLists.txt | 2 + src/engine/Executor.cxx | 483 +++++++++++++++++- src/engine/Executor.hxx | 22 +- .../Test/RuntimeForEngineIntegrationTest.cxx | 8 +- src/engine/Test/RuntimeForEngineTest.cxx | 8 +- src/workloadmanager/CMakeLists.txt | 2 +- src/workloadmanager/DefaultAlgorithm.cxx | 93 ++-- src/workloadmanager/DefaultAlgorithm.hxx | 38 +- src/workloadmanager/Task.hxx | 29 +- src/workloadmanager/Test/TestMain.cxx | 46 +- src/workloadmanager/WorkloadAlgorithm.hxx | 2 +- src/workloadmanager/WorkloadManager.cxx | 17 +- src/workloadmanager/WorkloadManager.hxx | 6 +- 14 files changed, 629 insertions(+), 129 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 064962876..11eda1188 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -286,7 +286,7 @@ INCLUDE(CMakePackageConfigHelpers) # List of targets in this project we want to make visible to the rest of the world. # They all have to be INSTALL'd with the option "EXPORT ${PROJECT_NAME}TargetGroup" SET(_${PROJECT_NAME}_exposed_targets - YACSBases YACSlibEngine + YACSBases YACSlibEngine YACSlibWorkloadmanager ) IF(SALOME_YACS_USE_KERNEL) diff --git a/src/engine/CMakeLists.txt b/src/engine/CMakeLists.txt index c6e9b85ea..d8f4d296d 100644 --- a/src/engine/CMakeLists.txt +++ b/src/engine/CMakeLists.txt @@ -31,12 +31,14 @@ ADD_DEFINITIONS( INCLUDE_DIRECTORIES( ${PTHREAD_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/src/bases + ${PROJECT_SOURCE_DIR}/src ) # libraries to link to SET(_link_LIBRARIES ${PTHREAD_LIBRARIES} YACSBases + YACSlibWorkloadmanager ) # --- headers --- diff --git a/src/engine/Executor.cxx b/src/engine/Executor.cxx index e4a76ec5f..2795d6a6d 100644 --- a/src/engine/Executor.cxx +++ b/src/engine/Executor.cxx @@ -251,12 +251,7 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch) string tracefile = "traceExec_"; tracefile += _mainSched->getName(); _trace.open(tracefile.c_str()); -#ifdef WIN32 - _start = timeGetTime(); -#else - gettimeofday(&_start, NULL); -#endif - + _start = std::chrono::steady_clock::now(); } // --- End of critical section if (debug > 1) _displayDot(graph); @@ -307,7 +302,6 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch) if (checkBreakPoints()) break; // end of thread requested, OK to exit at once; if (debug > 0) _displayDot(graph); DEBTRACE("---"); - //loadTasks(_tasks); loadParallelTasks(_tasks,this); if (debug > 1) _displayDot(graph); DEBTRACE("---"); @@ -899,12 +893,6 @@ struct threadargs Executor *execInst; }; -void Executor::loadTasks(const std::vector& tasks, const Executor *execInst) -{ - for(std::vector::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++) - loadTask(*iter,execInst); -} - void Executor::loadParallelTasks(const std::vector& tasks, const Executor *execInst) { std::vector ths(tasks.size()); @@ -1311,14 +1299,10 @@ void Executor::traceExec(Task *task, const std::string& message, const std::stri if (cont) containerName = cont->getName(); -#ifdef WIN32 - DWORD now = timeGetTime(); - double elapse = (now - _start)/1000.0; -#else - timeval now; - gettimeofday(&now, NULL); - double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0; -#endif + std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); + std::chrono::milliseconds millisec; + millisec = 
std::chrono::duration_cast(now -_start); + double elapse = double(millisec.count()) / 1000.0; { YACS::BASES::AutoLocker alck(&_mutexForTrace); _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl; @@ -1418,3 +1402,460 @@ std::string Executor::ComputePlacement(Task *zeTask) placement=zeTask->getContainer()->getFullPlacementId(zeTask); return placement; } + +///////// NEW EXECUTOR //////////////////////////////// +void Executor::loadTask(Task *task) +{ + if(task->getState() != YACS::TOLOAD) + return; + traceExec(task, "state:TOLOAD", ComputePlacement(task)); + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + _mainSched->notifyFrom(task,YACS::START,this); + }//End of critical section + try + { + traceExec(task, "load", ComputePlacement(task)); + task->load(); + traceExec(task, "initService", ComputePlacement(task)); + task->initService(); + } + catch(Exception& ex) + { + std::cerr << ex.what() << std::endl; + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->aborted(); + _mainSched->notifyFrom(task,YACS::ABORT, this); + traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task)); + }//End of critical section + } + catch(...) + { + std::cerr << "Load failed" << std::endl; + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->aborted(); + _mainSched->notifyFrom(task,YACS::ABORT, this); + traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task)); + }//End of critical section + } +} + +void Executor::beginTask(Task *task) +{ + // --- Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + _numberOfRunningTasks++; + _runningTasks.insert(task); + // --- End of critical section +} + +void Executor::endTask(Task *task, YACS::Event ev) +{ + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + try + { + if (ev == YACS::FINISH) task->finished(); + if (ev == YACS::ABORT) + { + _errorDetected = true; + if (_stopOnErrorRequested) + { + _execMode = YACS::STEPBYSTEP; + _isOKToEnd = true; + } + task->aborted(); + } + //traceExec(task, "state:"+Node::getStateName(task->getState()),placement); + _mainSched->notifyFrom(task,ev,this); + } + catch(Exception& ex) + { + //notify has failed : it is supposed to have set state + //so no need to do anything + std::cerr << "Error during notification" << std::endl; + std::cerr << ex.what() << std::endl; + } + catch(...) 
+ { + //notify has failed : it is supposed to have set state + //so no need to do anything + std::cerr << "Notification failed" << std::endl; + } + _numberOfRunningTasks--; + _runningTasks.erase(task); + DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks + << " _execMode: " << _execMode + << " _executorState: " << _executorState); + if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks + { + if (_executorState == YACS::WAITINGTASKS) + { + _executorState = YACS::PAUSED; + sendEvent("executor"); + _condForPilot.notify_all(); + if (_errorDetected && + _stopOnErrorRequested && + !_isRunningunderExternalControl) + _condForStepByStep.notify_all(); // exec thread may be on waitResume + } + } + if (_executorState != YACS::PAUSED) + wakeUp(); +} + +YACS::Event Executor::runTask(Task *task) +{ + { // --- Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->begin(); //change state to ACTIVATED + } + traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task)); + + if(getDPLScopeSensitive()) + { + Node *node(dynamic_cast(task)); + ComposedNode *gfn(dynamic_cast(_mainSched)); + if(node!=0 && gfn!=0) + node->applyDPLScope(gfn); + } + + YACS::Event ev=YACS::FINISH; + try + { + traceExec(task, "start execution",ComputePlacement(task)); + task->execute(); + traceExec(task, "end execution OK",ComputePlacement(task)); + } + catch(Exception& ex) + { + std::cerr << "YACS Exception during execute" << std::endl; + std::cerr << ex.what() << std::endl; + ev=YACS::ABORT; + string message = "end execution ABORT, "; + message += ex.what(); + traceExec(task, message,ComputePlacement(task)); + } + catch(...) + { + // Execution has failed + std::cerr << "Execution has failed: unknown reason" << std::endl; + ev=YACS::ABORT; + traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task)); + } + + // Disconnect task + try + { + DEBTRACE("task->disconnectService()"); + task->disconnectService(); + traceExec(task, "disconnectService",ComputePlacement(task)); + } + catch(...) + { + // Disconnect has failed + std::cerr << "disconnect has failed" << std::endl; + ev=YACS::ABORT; + traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task)); + } + // + + std::string placement(ComputePlacement(task)); + + // container management for HomogeneousPoolOfContainer + + HomogeneousPoolContainer *contC(dynamic_cast(task->getContainer())); + if(contC) + { + std::lock_guard alckCont(contC->getLocker()); + contC->release(task); + } + + return ev; +} + +void Executor::makeDatastreamConnections(Task *task) +{ + YACS::StatesForNode state=task->getState(); + if(state != YACS::TOLOAD && state != YACS::TORECONNECT) + return; + try + { + task->connectService(); + traceExec(task, "connectService",ComputePlacement(task)); + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->connected(); + }//End of critical section + } + catch(Exception& ex) + { + std::cerr << ex.what() << std::endl; + try + { + (task)->disconnectService(); + traceExec(task, "disconnectService",ComputePlacement(task)); + } + catch(...) + { + // Disconnect has failed + traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task)); + } + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->aborted(); + _mainSched->notifyFrom(task,YACS::ABORT,this); + }//End of critical section + } + catch(...) 
+ { + std::cerr << "Problem in connectService" << std::endl; + try + { + (task)->disconnectService(); + traceExec(task, "disconnectService",ComputePlacement(task)); + } + catch(...) + { + // Disconnect has failed + traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task)); + } + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + task->aborted(); + _mainSched->notifyFrom(task,YACS::ABORT,this); + }//End of critical section + } + if(task->getState() == YACS::ERROR) + { + //try to put all coupled tasks in error + std::set coupledSet; + task->getCoupledTasks(coupledSet); + for (std::set::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it) + { + Task* t=*it; + if(t == task)continue; + if(t->getState() == YACS::ERROR)continue; + try + { + t->disconnectService(); + traceExec(t, "disconnectService",ComputePlacement(task)); + } + catch(...) + { + // Disconnect has failed + traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task)); + } + {//Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + t->aborted(); + _mainSched->notifyFrom(t,YACS::ABORT,this); + }//End of critical section + traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task)); + } + } + traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task)); +} + +#include "workloadmanager/WorkloadManager.hxx" +#include "workloadmanager/DefaultAlgorithm.hxx" +#include "Runtime.hxx" +static +void loadResources(WorkloadManager::WorkloadManager& wm) +{ + Runtime *r(getRuntime()); + if(!r) + throw YACS::Exception("loadResources : no runtime !"); + std::vector< std::pair > data(r->getCatalogOfComputeNodes()); + int id = 0; + for(const std::pair& res : data) + { + WorkloadManager::Resource newResource; + newResource.name = res.first; + newResource.id = id; + id++; + newResource.nbCores = res.second; + wm.addResource(newResource); + } +} + +class NewTask : public WorkloadManager::Task +{ +public: + NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask); + const WorkloadManager::ContainerType& type()const override; + void run(const WorkloadManager::Container& c)override; +private: + WorkloadManager::ContainerType _type; + Executor& _executor; + YACS::ENGINE::Task * _yacsTask; +}; + +NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask) +: _type() +, _executor(executor) +, _yacsTask(yacsTask) +{ + _type.neededCores = 0; + _type.id = 0; + _type.name = "test"; +} + +const WorkloadManager::ContainerType& NewTask::type()const +{ + return _type; +} + +void NewTask::run(const WorkloadManager::Container& c) +{ + _executor.loadTask(_yacsTask); + _executor.makeDatastreamConnections(_yacsTask); + YACS::Event ev = _executor.runTask(_yacsTask); + _executor.endTask(_yacsTask, ev); + delete this; // provisoire +} + +void Executor::newRun(Scheduler *graph,int debug, bool fromScratch) +{ + DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "< alck(&_mutexForSchedulerUpdate); + _mainSched = graph; + _root = dynamic_cast(_mainSched); + if (!_root) throw Exception("Executor::Run, Internal Error!"); + _executorState = YACS::NOTYETINITIALIZED; + sendEvent("executor"); + _toContinue=true; + _isOKToEnd = false; + _errorDetected = false; + _isWaitingEventsFromRunningTasks = false; + _numberOfRunningTasks = 0; + _runningTasks.clear(); + _numberOfEndedTasks = 0; + string tracefile = "traceExec_"; + tracefile += _mainSched->getName(); + _trace.open(tracefile.c_str()); + _start = 
std::chrono::steady_clock::now(); + } // --- End of critical section + + if (debug > 1) _displayDot(graph); + + if (fromScratch) + { + try + { + graph->init(); + graph->exUpdateState(); + } + catch(Exception& ex) + { + DEBTRACE("exception: "<< (ex.what())); + _executorState = YACS::FINISHED; + sendEvent("executor"); + throw; + } + } + _executorState = YACS::INITIALISED; + sendEvent("executor"); + + if (debug > 1) _displayDot(graph); + + bool isMore; + int problemCount=0; + int numberAllTasks; + + _executorState = YACS::RUNNING; + sendEvent("executor"); + + WorkloadManager::DefaultAlgorithm algo; + WorkloadManager::WorkloadManager wlm(algo); + loadResources(wlm); + wlm.start(); + + while (_toContinue) + { + DEBTRACE("--- executor main loop"); + sleepWhileNoEventsFromAnyRunningTask(); + DEBTRACE("--- events..."); + if (debug > 2) _displayDot(graph); + { // --- Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + std::vector readyTasks=graph->getNextTasks(isMore); + graph->selectRunnableTasks(readyTasks); + _tasks.clear(); + for(Task * t : readyTasks) + if(_runningTasks.find(t) == _runningTasks.end()) + _tasks.push_back(t); + numberAllTasks=_numberOfRunningTasks+_tasks.size(); + } // --- End of critical section + if (debug > 2) _displayDot(graph); + DEBTRACE("--- events..."); + if (_executorState == YACS::RUNNING) + { + if (checkBreakPoints()) break; // end of thread requested, OK to exit at once; + for(Task * task : _tasks) + { + beginTask(task); + NewTask* newTask = new NewTask(*this, task); + wlm.addTask(newTask); + } + } + if (debug > 1) _displayDot(graph); + { // --- Critical section + DEBTRACE("---"); + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account) + _toContinue = !graph->isFinished(); + + DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks); + DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks); + DEBTRACE("_toContinue: " << _toContinue); + if(_toContinue && numberAllTasks==0) + { + //Problem : no running tasks and no task to launch ?? + problemCount++; + std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl; + //Pause to give a chance to interrupt + usleep(1000); + if(problemCount > 25) + { + // Too much problems encountered : stop execution + _toContinue=false; + } + } + + if (! 
_toContinue) + { + _executorState = YACS::FINISHED; + sendEvent("executor"); + _condForPilot.notify_all(); + } + } // --- End of critical section + if (debug > 0) _displayDot(graph); + DEBTRACE("_toContinue: " << _toContinue); + } + + wlm.stop(); + DEBTRACE("End of main Loop"); + + { // --- Critical section + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints() + { + DEBTRACE("stop requested: End soon"); + _executorState = YACS::STOPPED; + _toContinue = false; + sendEvent("executor"); + } + } // --- End of critical section + if ( _dumpOnErrorRequested && _errorDetected) + { + saveState(_dumpErrorFile); + } + { + YACS::BASES::AutoLocker alck(&_mutexForTrace); + _trace.close(); + } + DEBTRACE("End of RunB thread"); +} diff --git a/src/engine/Executor.hxx b/src/engine/Executor.hxx index cb1f72854..50168ebd9 100644 --- a/src/engine/Executor.hxx +++ b/src/engine/Executor.hxx @@ -37,6 +37,7 @@ #include #include #include +#include namespace YACS { @@ -88,17 +89,18 @@ namespace YACS bool _keepGoingOnFail; //! specifies if scope DynParaLoop is active or not. False by default. bool _DPLScopeSensitive; -#ifdef WIN32 - DWORD _start; -#else - timeval _start; -#endif + std::chrono::steady_clock::time_point _start; public: Executor(); virtual ~Executor(); void RunA(Scheduler *graph,int debug=0, bool fromScratch=true); - void RunW(Scheduler *graph,int debug=0, bool fromScratch=true) { RunB(graph, debug, fromScratch); } + void RunW(Scheduler *graph,int debug=0, bool fromScratch=true) + { + //RunB(graph, debug, fromScratch); + newRun(graph, debug, fromScratch); + } void RunB(Scheduler *graph,int debug=0, bool fromScratch=true); + void newRun(Scheduler *graph,int debug=0, bool fromScratch=true); void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; } bool getKeepGoingProperty() const { return _keepGoingOnFail; } void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=newVal; } @@ -125,11 +127,17 @@ namespace YACS static int _maxThreads; static size_t _threadStackSize; YACS::BASES::Mutex& getTheMutexForSchedulerUpdate() { return _mutexForSchedulerUpdate; } + ///// new executor !!!!! 
+ void loadTask(Task *task); + YACS::Event runTask(Task *task); + void makeDatastreamConnections(Task *task); + void beginTask(Task *task); + void endTask(Task *task, YACS::Event ev); + //////////// protected: bool checkBreakPoints(); void waitResume(); void loadTask(Task *task, const Executor *execInst); - void loadTasks(const std::vector& tasks, const Executor *execInst); void loadParallelTasks(const std::vector& tasks, const Executor *execInst); void launchTasks(const std::vector& tasks); void launchTask(Task *task); diff --git a/src/engine/Test/RuntimeForEngineIntegrationTest.cxx b/src/engine/Test/RuntimeForEngineIntegrationTest.cxx index bb1ec1875..159e56f46 100644 --- a/src/engine/Test/RuntimeForEngineIntegrationTest.cxx +++ b/src/engine/Test/RuntimeForEngineIntegrationTest.cxx @@ -36,7 +36,13 @@ void RuntimeForEngineIntegrationTest::setRuntime() std::vector< std::pair > RuntimeForEngineIntegrationTest::getCatalogOfComputeNodes() const { - throw Exception("RuntimeForEngineIntegrationTest not implemented !"); + std::vector< std::pair > result(1); + std::pair localhost; + localhost.first = "localhost"; + localhost.second = 8; + result[0] = localhost; + return result; +// throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !"); } ElementaryNode* RuntimeForEngineIntegrationTest::createNode(const std::string& implementation, const std::string& name) throw (YACS::Exception) diff --git a/src/engine/Test/RuntimeForEngineTest.cxx b/src/engine/Test/RuntimeForEngineTest.cxx index d97035268..45eff9e91 100644 --- a/src/engine/Test/RuntimeForEngineTest.cxx +++ b/src/engine/Test/RuntimeForEngineTest.cxx @@ -115,7 +115,13 @@ void RuntimeForEngineTest::setRuntime() std::vector< std::pair > RuntimeForEngineTest::getCatalogOfComputeNodes() const { - throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !"); + std::vector< std::pair > result(1); + std::pair localhost; + localhost.first = "localhost"; + localhost.second = 8; + result[0] = localhost; + return result; +// throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !"); } ElementaryNode* RuntimeForEngineTest::createNode(const string& implementation, const string& name) throw(YACS::Exception) diff --git a/src/workloadmanager/CMakeLists.txt b/src/workloadmanager/CMakeLists.txt index 654ef6f32..ec3c10d4e 100644 --- a/src/workloadmanager/CMakeLists.txt +++ b/src/workloadmanager/CMakeLists.txt @@ -38,7 +38,7 @@ SET (_wlm_headers ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources}) TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES}) INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS}) -INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS}) +INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS}/workloadmanager) IF(SALOME_BUILD_TESTS) ADD_SUBDIRECTORY(Test) diff --git a/src/workloadmanager/DefaultAlgorithm.cxx b/src/workloadmanager/DefaultAlgorithm.cxx index d25d18f46..96e48b4f6 100644 --- a/src/workloadmanager/DefaultAlgorithm.cxx +++ b/src/workloadmanager/DefaultAlgorithm.cxx @@ -20,21 +20,22 @@ #include "Task.hxx" #include #include +#include namespace WorkloadManager { void DefaultAlgorithm::addTask(Task* t) { // put the tasks which need more cores in front. 
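[Editor's sketch] For reference, a minimal standalone illustration of the ordering rule implemented by the addTask() hunk that follows: the waiting list is kept sorted by descending neededCores, and tasks with equal needs stay in FIFO order. The types are simplified to plain floats purely for illustration; the real code works on Task* and queries type().neededCores.

#include <cassert>
#include <list>

// Sketch of the insertion rule used by DefaultAlgorithm::addTask:
// keep the waiting list sorted by descending neededCores, preserving
// FIFO order among equal values.
static void insertByNeededCores(std::list<float>& waiting, float neededCores)
{
  if(waiting.empty() || waiting.back() >= neededCores)
    waiting.push_back(neededCores);          // common case: goes to the end
  else
  {
    auto it = waiting.begin();
    while(it != waiting.end() && *it >= neededCores)
      ++it;                                  // skip tasks needing at least as many cores
    waiting.insert(it, neededCores);
  }
}

int main()
{
  std::list<float> w;
  for(float c : {1.f, 4.f, 2.f, 4.f})
    insertByNeededCores(w, c);
  assert((w == std::list<float>{4.f, 4.f, 2.f, 1.f}));
  return 0;
}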
- float newNeedCores = t->type()->neededCores; + float newNeedCores = t->type().neededCores; if(_waitingTasks.empty()) _waitingTasks.push_back(t); - else if(_waitingTasks.back()->type()->neededCores >= newNeedCores) + else if(_waitingTasks.back()->type().neededCores >= newNeedCores) _waitingTasks.push_back(t); else { std::list::iterator it = _waitingTasks.begin(); - while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores) + while(it != _waitingTasks.end() && (*it)->type().neededCores >= newNeedCores) it++; _waitingTasks.insert(it, t); } @@ -45,13 +46,9 @@ bool DefaultAlgorithm::empty()const return _waitingTasks.empty(); } -void DefaultAlgorithm::addResource(Resource* r) +void DefaultAlgorithm::addResource(const Resource& r) { - // add the resource. The operation is ignored if the resource already exists. - _resources.emplace(std::piecewise_construct, - std::forward_as_tuple(r), - std::forward_as_tuple(r) - ); + _resources.emplace_back(r); } WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() @@ -62,18 +59,19 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() !result.taskFound && itTask != _waitingTasks.end(); itTask ++) { - const ContainerType* ctype = (*itTask)->type(); - std::map::iterator best_resource; + const ContainerType& ctype = (*itTask)->type(); + std::list::iterator best_resource; best_resource = _resources.end(); float best_cost = std::numeric_limits::max(); + bool isSupported = false; for(auto itResource = _resources.begin(); itResource != _resources.end(); itResource++) - if(itResource->second.isSupported(ctype)) + if(itResource->isSupported(ctype)) { - if(itResource->second.isAllocPossible(ctype)) + if(itResource->isAllocPossible(ctype)) { - float thisCost = itResource->second.cost(ctype); + float thisCost = itResource->cost(ctype); if( best_cost > thisCost) { best_cost = thisCost; @@ -86,9 +84,13 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() chosenTaskIt = itTask; result.task = (*itTask); result.taskFound = true; - result.worker.resource = best_resource->first; + result.worker.resource = best_resource->resource(); result.worker.type = ctype; - result.worker.index = best_resource->second.alloc(ctype); + result.worker.index = best_resource->alloc(ctype); + } + else if(!isSupported) + { + // TODO: This task can never be run by any available resource. 
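[Editor's sketch] A small numeric illustration of the cost-based choice made in chooseTask() above: cost() is the current load of a resource relative to its core count, so the proportionally least loaded resource wins. The core counts and loads below are made up only for the example.

#include <iostream>

// Same formula as ResourceLoadInfo::cost(): relative load in percent.
static float cost(float load, unsigned int nbCores)
{
  return load * 100.0f / float(nbCores);
}

int main()
{
  // Both resources already run one 4-core container; a new task arrives.
  std::cout << cost(4.0f, 10) << "\n";  // 40.0  -> 10-core resource
  std::cout << cost(4.0f, 18) << "\n";  // ~22.2 -> 18-core resource is chosen
  return 0;
}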
} } if(result.taskFound) @@ -98,17 +100,19 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() void DefaultAlgorithm::liberate(const LaunchInfo& info) { - const Resource* r = info.worker.resource; + const Resource& r = info.worker.resource; unsigned int index = info.worker.index; - const ContainerType* ctype = info.worker.type; - std::map::iterator it = _resources.find(r); - it->second.free(ctype, index); + const ContainerType& ctype = info.worker.type; + std::list::iterator it = std::find(_resources.begin(), + _resources.end(), + r); + it->free(ctype, index); // we are sure to find it } // ResourceInfoForContainer DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer - (const Resource * r, const ContainerType* ctype) + (const Resource& r, const ContainerType& ctype) : _ctype(ctype) , _resource(r) , _runningContainers() @@ -118,7 +122,7 @@ DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const { - return float(_resource->nbCores) / _ctype->neededCores; + return float(_resource.nbCores) / _ctype.neededCores; } unsigned int DefaultAlgorithm::ResourceInfoForContainer::alloc() @@ -151,7 +155,7 @@ bool DefaultAlgorithm::ResourceInfoForContainer::isContainerRunning // ResourceLoadInfo -DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r) +DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource& r) : _resource(r) , _load(0.0) , _ctypes() @@ -159,47 +163,48 @@ DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r) } bool DefaultAlgorithm::ResourceLoadInfo::isSupported - (const ContainerType* ctype)const + (const ContainerType& ctype)const { - return ctype->neededCores <= _resource->nbCores ; + return ctype.neededCores <= _resource.nbCores ; } bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible - (const ContainerType* ctype)const + (const ContainerType& ctype)const { - return ctype->neededCores + _load <= _resource->nbCores; + return ctype.neededCores + _load <= _resource.nbCores; } float DefaultAlgorithm::ResourceLoadInfo::cost - (const ContainerType* ctype)const + (const ContainerType& ctype)const { - return _load * 100.0 / float(_resource->nbCores); + return _load * 100.0 / float(_resource.nbCores); } unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc - (const ContainerType* ctype) + (const ContainerType& ctype) { - std::map::iterator it; - it = _ctypes.find(ctype); + std::list::iterator it = std::find(_ctypes.begin(), + _ctypes.end(), + ctype); + // add the type if not found if(it == _ctypes.end()) { - // add the type if not found - it = _ctypes.emplace(std::piecewise_construct, - std::forward_as_tuple(ctype), - std::forward_as_tuple(_resource, ctype) - ).first; + _ctypes.emplace_back(_resource, ctype); + it = _ctypes.end(); + it--; } - _load += ctype->neededCores; - return it->second.alloc(); + _load += ctype.neededCores; + return it->alloc(); } void DefaultAlgorithm::ResourceLoadInfo::free - (const ContainerType* ctype, int index) + (const ContainerType& ctype, int index) { - _load -= ctype->neededCores; - std::map::iterator it; - it = _ctypes.find(ctype); - it->second.free(index); + _load -= ctype.neededCores; + std::list::iterator it = std::find(_ctypes.begin(), + _ctypes.end(), + ctype); + it->free(index); } } diff --git a/src/workloadmanager/DefaultAlgorithm.hxx b/src/workloadmanager/DefaultAlgorithm.hxx index d30f50499..2e58c66c5 100644 --- a/src/workloadmanager/DefaultAlgorithm.hxx +++ 
b/src/workloadmanager/DefaultAlgorithm.hxx @@ -33,7 +33,7 @@ class DefaultAlgorithm : public WorkloadAlgorithm { public: void addTask(Task* t)override; - void addResource(Resource* r)override; + void addResource(const Resource& r)override; LaunchInfo chooseTask()override; void liberate(const LaunchInfo& info)override; bool empty()const override; @@ -43,36 +43,46 @@ private: class ResourceInfoForContainer { public: - ResourceInfoForContainer(const Resource * r, const ContainerType* ctype); + ResourceInfoForContainer(const Resource& r, const ContainerType& ctype); unsigned int maxContainers()const; unsigned int alloc(); void free(unsigned int index); unsigned int nbRunningContainers()const; bool isContainerRunning(unsigned int index)const; + bool operator<(const ResourceInfoForContainer& other)const + { return _ctype < other._ctype;} + bool operator==(const ContainerType& other)const + { return _ctype == other;} + const ContainerType& type()const { return _ctype;} private: - const ContainerType* _ctype; - const Resource* _resource; - std::set _runningContainers; + ContainerType _ctype; + const Resource& _resource; // same ref as ResourceLoadInfo + std::set _runningContainers; // 0 to max possible containers on this resource unsigned int _firstFreeContainer; }; class ResourceLoadInfo { public: - ResourceLoadInfo(const Resource * r); - bool isSupported(const ContainerType* ctype)const; - bool isAllocPossible(const ContainerType* ctype)const; - float cost(const ContainerType* ctype)const; - unsigned int alloc(const ContainerType* ctype); - void free(const ContainerType* ctype, int index); + ResourceLoadInfo(const Resource& r); + bool isSupported(const ContainerType& ctype)const; + bool isAllocPossible(const ContainerType& ctype)const; + float cost(const ContainerType& ctype)const; + unsigned int alloc(const ContainerType& ctype); + void free(const ContainerType& ctype, int index); + bool operator<(const ResourceLoadInfo& other)const + { return _resource < other._resource;} + bool operator==(const Resource& other)const + { return _resource == other;} + const Resource& resource()const { return _resource;} private: - const Resource* _resource; + Resource _resource; float _load; - std::map _ctypes; + std::list _ctypes; }; private: - std::map _resources; + std::list _resources; std::list _waitingTasks; }; } diff --git a/src/workloadmanager/Task.hxx b/src/workloadmanager/Task.hxx index bdada0b90..3dd7c93d0 100644 --- a/src/workloadmanager/Task.hxx +++ b/src/workloadmanager/Task.hxx @@ -25,24 +25,38 @@ namespace WorkloadManager { struct ContainerType { - float neededCores; // needed by WorkloadManager - // parameters for client use, not needed by WorkloadManager: + float neededCores = 0.0; // needed by WorkloadManager + // parameters for client use, used by WorkloadManager to distinguish objects std::string name; - int id; + int id = 0; + bool operator==(const ContainerType& other)const + { return id == other.id && name == other.name;} + bool operator<(const ContainerType& other)const + { + return (id < other.id) || + (id == other.id && name < other.name); + } }; struct Resource { unsigned int nbCores; // needed by WorkloadManager - // parameters for client use, not needed by WorkloadManager: + // parameters for client use, used by WorkloadManager to distinguish objects std::string name; int id; + bool operator==(const Resource& other)const + { return id == other.id && name == other.name;} + bool operator<(const Resource& other)const + { + return (id < other.id) || + (id == other.id && name < 
other.name); + } }; struct Container { - const ContainerType* type; - const Resource* resource; + ContainerType type; + Resource resource; unsigned int index; // worker index on the resource for this type }; @@ -52,7 +66,8 @@ namespace WorkloadManager class Task { public: - virtual const ContainerType* type()const =0; + virtual ~Task(){}; + virtual const ContainerType& type()const =0; virtual void run(const Container& c)=0; }; } diff --git a/src/workloadmanager/Test/TestMain.cxx b/src/workloadmanager/Test/TestMain.cxx index b3b53c9d9..a55095951 100644 --- a/src/workloadmanager/Test/TestMain.cxx +++ b/src/workloadmanager/Test/TestMain.cxx @@ -15,14 +15,8 @@ // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA // // -#include -#include -#include -#include -#include -#include -#include -#include + +#include #include #include @@ -83,12 +77,12 @@ private: class MyTask : public WorkloadManager::Task { public: - const WorkloadManager::ContainerType* type()const override {return _type;} + const WorkloadManager::ContainerType& type()const override {return *_type;} void run(const WorkloadManager::Container& c)override { _check->check(c, this); - DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name, + DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name, "-", c.index); std::this_thread::sleep_for(std::chrono::seconds(_sleep)); DEBUG_LOG("Finish task ", _id); @@ -141,7 +135,7 @@ void Checker::check(const WorkloadManager::Container& c, MyTask* t) { std::unique_lock lock(_mutex); - int& max = _maxContainersForResource[c.resource->id][c.type->id]; + int& max = _maxContainersForResource[c.resource.id][c.type.id]; if( max < c.index) max = c.index; } @@ -155,11 +149,13 @@ void Checker::globalCheck() for(std::size_t j=0; j < size_T; j++) { int max = _maxContainersForResource[i][j]; - DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1); + DEBUG_LOG(resources[i].name, ", ", types[j].name, + " max simultaneous runs:", max+1); CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores ); global_max += types[j].neededCores * float(max+1); } - DEBUG_LOG(resources[i].name, " global: ", global_max); + DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max); + // This assertion may be false if there are more resources than needed. 
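[Editor's sketch] A worked example, with hypothetical peak counts, of the saturation check asserted just below: the per-type peaks are recorded independently, so their weighted sum is an upper bound of the true peak usage, and the test only requires that this bound reaches the core count of the resource.

#include <cassert>

int main()
{
  const unsigned int nbCores = 10;                 // like check.resources[0] in atest()
  const float neededCores[3] = {4.0f, 1.0f, 0.0f}; // same types as in atest()
  const int maxIndexSeen[3]  = {1, 1, 2};          // hypothetical peaks: 2, 2 and 3 containers
  float globalMax = 0.0f;
  for(int j = 0; j < 3; ++j)
    globalMax += neededCores[j] * float(maxIndexSeen[j] + 1); // 4*2 + 1*2 + 0*3 = 10
  assert(globalMax >= nbCores); // the bound reaches the core count: resource was saturated
  return 0;
}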
CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used } } @@ -184,12 +180,13 @@ public: void MyTest::atest() { constexpr std::size_t resourcesNumber = 2; - constexpr std::size_t typesNumber = 2; + constexpr std::size_t typesNumber = 3; Checker check; check.resources[0].nbCores = 10; check.resources[1].nbCores = 18; check.types[0].neededCores = 4.0; check.types[1].neededCores = 1.0; + check.types[2].neededCores = 0.0; // tasks to be run with no cost for(std::size_t i=0; i < resourcesNumber; i ++) DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores, @@ -198,22 +195,25 @@ void MyTest::atest() DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores, " cores."); - constexpr std::size_t tasksNumber = 100; + constexpr std::size_t tasksNumber = 150; MyTask tasks[tasksNumber]; - for(std::size_t i = 0; i < tasksNumber / 2; i++) - tasks[i].reset(i, &check.types[0], 2, &check); - for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++) - tasks[i].reset(i, &check.types[1], 1, &check); + for(int type_id = 0; type_id < typesNumber; type_id++) + for(int j = type_id * tasksNumber / typesNumber; + j < (type_id + 1) * tasksNumber / typesNumber; + j++) + // id, ContainerType, sleep (1|2s) + tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check); DEBUG_LOG("Number of tasks: ", tasksNumber); - DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name); - DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ", - tasks[tasksNumber / 2].type()->name); + for(int type_id = 0; type_id < typesNumber; type_id++) + DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber, + " to ", (type_id + 1) * tasksNumber / typesNumber, + " are of type ", check.types[type_id].name); WorkloadManager::DefaultAlgorithm algo; WorkloadManager::WorkloadManager wlm(algo); for(std::size_t i=0; i < resourcesNumber; i ++) - wlm.addResource(&check.resources[i]); + wlm.addResource(check.resources[i]); // Add 4 core tasks first check.reset(); diff --git a/src/workloadmanager/WorkloadAlgorithm.hxx b/src/workloadmanager/WorkloadAlgorithm.hxx index bad0dcf39..064747faf 100644 --- a/src/workloadmanager/WorkloadAlgorithm.hxx +++ b/src/workloadmanager/WorkloadAlgorithm.hxx @@ -34,7 +34,7 @@ public: }; virtual void addTask(Task* t)=0; - virtual void addResource(Resource* r)=0; + virtual void addResource(const Resource& r)=0; virtual LaunchInfo chooseTask()=0; virtual void liberate(const LaunchInfo& info)=0; virtual bool empty()const =0; diff --git a/src/workloadmanager/WorkloadManager.cxx b/src/workloadmanager/WorkloadManager.cxx index 5d9f4e3e8..f3896ca52 100644 --- a/src/workloadmanager/WorkloadManager.cxx +++ b/src/workloadmanager/WorkloadManager.cxx @@ -40,7 +40,7 @@ namespace WorkloadManager stop(); } - void WorkloadManager::addResource(Resource* r) + void WorkloadManager::addResource(const Resource& r) { std::unique_lock lock(_data_mutex); _algo.addResource(r); @@ -78,8 +78,8 @@ namespace WorkloadManager } _startCondition.notify_one(); _endCondition.notify_one(); - for(std::future& th : _otherThreads) - th.wait(); + for(std::future& th : _otherThreads) + th.wait(); } void WorkloadManager::runTasks() @@ -88,7 +88,7 @@ namespace WorkloadManager while(!threadStop) { std::unique_lock lock(_data_mutex); - _startCondition.wait(lock, [this] {return !_algo.empty();}); + _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;}); RunningInfo taskInfo; while(chooseTaskToRun(taskInfo)) { @@ -118,7 +118,11 @@ namespace WorkloadManager while(!threadStop) { 
std::unique_lock lock(_data_mutex); - _endCondition.wait(lock, [this] {return !_finishedTasks.empty();}); + _endCondition.wait(lock, [this] + { + return !_finishedTasks.empty() || + (_stop && _runningTasks.empty() && _algo.empty()); + }); while(!_finishedTasks.empty()) { RunningInfo taskInfo = _finishedTasks.front(); @@ -136,8 +140,9 @@ namespace WorkloadManager { // We are already under the lock taskInfo.id = _nextIndex; - _nextIndex ++; taskInfo.info = _algo.chooseTask(); + if(taskInfo.info.taskFound) + _nextIndex ++; return taskInfo.info.taskFound; } diff --git a/src/workloadmanager/WorkloadManager.hxx b/src/workloadmanager/WorkloadManager.hxx index 264b58920..8c033b956 100644 --- a/src/workloadmanager/WorkloadManager.hxx +++ b/src/workloadmanager/WorkloadManager.hxx @@ -33,9 +33,11 @@ namespace WorkloadManager { public: WorkloadManager(WorkloadAlgorithm& algo); + WorkloadManager(const WorkloadManager&) = delete; + WorkloadManager()=delete; ~WorkloadManager(); void addTask(Task* t); - void addResource(Resource* r); + void addResource(const Resource& r); void start(); //! start execution void stop(); //! stop execution @@ -53,7 +55,7 @@ namespace WorkloadManager std::condition_variable _startCondition; // start tasks thread notification std::condition_variable _endCondition; // end tasks thread notification bool _stop; - std::list< std::future > _otherThreads; + std::vector< std::future > _otherThreads; WorkloadAlgorithm& _algo; void runTasks(); -- 2.39.2
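[Editor's sketch] For completeness, a minimal usage sketch of the WorkloadManager API introduced by this patch, independent of the YACS Executor. Class and member names follow the headers added above; HelloTask, the resource values and main() are hypothetical and only illustrate the intended call sequence (addResource, start, addTask, stop).

#include "workloadmanager/WorkloadManager.hxx"
#include "workloadmanager/DefaultAlgorithm.hxx"
#include "workloadmanager/Task.hxx"
#include <iostream>

// Hypothetical task: prints where the manager placed it.
class HelloTask : public WorkloadManager::Task
{
public:
  HelloTask() { _type.neededCores = 1.0f; _type.id = 0; _type.name = "hello"; }
  const WorkloadManager::ContainerType& type() const override { return _type; }
  void run(const WorkloadManager::Container& c) override
  {
    std::cout << "running on " << c.resource.name << " worker " << c.index << "\n";
  }
private:
  WorkloadManager::ContainerType _type;
};

int main()
{
  WorkloadManager::Resource host;
  host.name = "localhost";
  host.id = 0;
  host.nbCores = 4;

  WorkloadManager::DefaultAlgorithm algo;
  WorkloadManager::WorkloadManager wlm(algo);
  wlm.addResource(host);
  wlm.start();

  HelloTask t1, t2;
  wlm.addTask(&t1);
  wlm.addTask(&t2);

  wlm.stop(); // joins the worker threads once waiting and running tasks are drained
  return 0;
}

With the stop predicates added in WorkloadManager.cxx (stop when _stop is set and both the algorithm queue and the running set are empty), stop() only returns after all queued tasks have completed, which is why TestMain.cxx can run its global checks immediately after calling it.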