From 5e734a1a316c3bb3b4a17e73f5e54dd431e5c855 Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Mon, 6 Apr 2020 11:48:09 +0200 Subject: [PATCH] Include sources of workloadmanager. --- src/CMakeLists.txt | 2 +- src/CTestTestfileInstall.cmake | 1 + src/workloadmanager/CMakeLists.txt | 45 +++ src/workloadmanager/DefaultAlgorithm.cxx | 205 +++++++++++++ src/workloadmanager/DefaultAlgorithm.hxx | 79 +++++ src/workloadmanager/Task.cxx | 19 ++ src/workloadmanager/Task.hxx | 60 ++++ src/workloadmanager/Test/CMakeLists.txt | 35 +++ .../Test/CTestTestfileInstall.cmake | 27 ++ src/workloadmanager/Test/TestMain.cxx | 269 ++++++++++++++++++ src/workloadmanager/WorkloadAlgorithm.hxx | 43 +++ src/workloadmanager/WorkloadManager.cxx | 144 ++++++++++ src/workloadmanager/WorkloadManager.hxx | 66 +++++ 13 files changed, 994 insertions(+), 1 deletion(-) create mode 100644 src/workloadmanager/CMakeLists.txt create mode 100644 src/workloadmanager/DefaultAlgorithm.cxx create mode 100644 src/workloadmanager/DefaultAlgorithm.hxx create mode 100644 src/workloadmanager/Task.cxx create mode 100644 src/workloadmanager/Task.hxx create mode 100644 src/workloadmanager/Test/CMakeLists.txt create mode 100644 src/workloadmanager/Test/CTestTestfileInstall.cmake create mode 100644 src/workloadmanager/Test/TestMain.cxx create mode 100644 src/workloadmanager/WorkloadAlgorithm.hxx create mode 100644 src/workloadmanager/WorkloadManager.cxx create mode 100644 src/workloadmanager/WorkloadManager.hxx diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f981b3f6c..2ac3ab5be 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,7 +21,7 @@ # Common packages ## SET(SUBDIRS_COMMON - bases engine wrappergen yacsorb salomeloader + bases engine wrappergen yacsorb salomeloader workloadmanager pmml ) diff --git a/src/CTestTestfileInstall.cmake b/src/CTestTestfileInstall.cmake index 352cf9ee6..0738813ec 100644 --- a/src/CTestTestfileInstall.cmake +++ b/src/CTestTestfileInstall.cmake @@ -35,4 +35,5 @@ SUBDIRS( yacsloader yacsloader_swig py2yacs + workloadmanager ) diff --git a/src/workloadmanager/CMakeLists.txt b/src/workloadmanager/CMakeLists.txt new file mode 100644 index 000000000..654ef6f32 --- /dev/null +++ b/src/workloadmanager/CMakeLists.txt @@ -0,0 +1,45 @@ +# Copyright (C) 2020 CEA/DEN, EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# + +SET (_link_LIBRARIES + #Threads::Threads + ${PTHREAD_LIBRARIES} +) + +SET (_wlm_sources + Task.cxx + WorkloadManager.cxx + DefaultAlgorithm.cxx +) + +SET (_wlm_headers + Task.hxx + WorkloadManager.hxx + WorkloadAlgorithm.hxx + DefaultAlgorithm.hxx +) + +ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources}) +TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES}) +INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS}) +INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS}) + +IF(SALOME_BUILD_TESTS) + ADD_SUBDIRECTORY(Test) +ENDIF(SALOME_BUILD_TESTS) diff --git a/src/workloadmanager/DefaultAlgorithm.cxx b/src/workloadmanager/DefaultAlgorithm.cxx new file mode 100644 index 000000000..d25d18f46 --- /dev/null +++ b/src/workloadmanager/DefaultAlgorithm.cxx @@ -0,0 +1,205 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#include "DefaultAlgorithm.hxx" +#include "Task.hxx" +#include +#include + +namespace WorkloadManager +{ +void DefaultAlgorithm::addTask(Task* t) +{ + // put the tasks which need more cores in front. + float newNeedCores = t->type()->neededCores; + if(_waitingTasks.empty()) + _waitingTasks.push_back(t); + else if(_waitingTasks.back()->type()->neededCores >= newNeedCores) + _waitingTasks.push_back(t); + else + { + std::list::iterator it = _waitingTasks.begin(); + while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores) + it++; + _waitingTasks.insert(it, t); + } +} + +bool DefaultAlgorithm::empty()const +{ + return _waitingTasks.empty(); +} + +void DefaultAlgorithm::addResource(Resource* r) +{ + // add the resource. The operation is ignored if the resource already exists. + _resources.emplace(std::piecewise_construct, + std::forward_as_tuple(r), + std::forward_as_tuple(r) + ); +} + +WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() +{ + LaunchInfo result; + std::list::iterator chosenTaskIt; + for( std::list::iterator itTask = _waitingTasks.begin(); + !result.taskFound && itTask != _waitingTasks.end(); + itTask ++) + { + const ContainerType* ctype = (*itTask)->type(); + std::map::iterator best_resource; + best_resource = _resources.end(); + float best_cost = std::numeric_limits::max(); + for(auto itResource = _resources.begin(); + itResource != _resources.end(); + itResource++) + if(itResource->second.isSupported(ctype)) + { + if(itResource->second.isAllocPossible(ctype)) + { + float thisCost = itResource->second.cost(ctype); + if( best_cost > thisCost) + { + best_cost = thisCost; + best_resource = itResource; + } + } + } + if(best_resource != _resources.end()) + { + chosenTaskIt = itTask; + result.task = (*itTask); + result.taskFound = true; + result.worker.resource = best_resource->first; + result.worker.type = ctype; + result.worker.index = best_resource->second.alloc(ctype); + } + } + if(result.taskFound) + _waitingTasks.erase(chosenTaskIt); + return result; +} + +void DefaultAlgorithm::liberate(const LaunchInfo& info) +{ + const Resource* r = info.worker.resource; + unsigned int index = info.worker.index; + const ContainerType* ctype = info.worker.type; + std::map::iterator it = _resources.find(r); + it->second.free(ctype, index); +} + +// ResourceInfoForContainer + +DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer + (const Resource * r, const ContainerType* ctype) +: _ctype(ctype) +, _resource(r) +, _runningContainers() +, _firstFreeContainer(0) +{ +} + +unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const +{ + return float(_resource->nbCores) / _ctype->neededCores; +} + +unsigned int DefaultAlgorithm::ResourceInfoForContainer::alloc() +{ + unsigned int result = _firstFreeContainer; + _runningContainers.insert(result); + _firstFreeContainer++; + while(isContainerRunning(_firstFreeContainer)) + _firstFreeContainer++; + return result; +} + +void DefaultAlgorithm::ResourceInfoForContainer::free(unsigned int index) +{ + _runningContainers.erase(index); + if(index < _firstFreeContainer) + _firstFreeContainer = index; +} + +unsigned int DefaultAlgorithm::ResourceInfoForContainer::nbRunningContainers()const +{ + return _runningContainers.size(); +} + +bool DefaultAlgorithm::ResourceInfoForContainer::isContainerRunning + (unsigned int index)const +{ + return _runningContainers.find(index)!=_runningContainers.end(); +} + +// ResourceLoadInfo + +DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r) +: _resource(r) +, _load(0.0) +, _ctypes() +{ +} + +bool DefaultAlgorithm::ResourceLoadInfo::isSupported + (const ContainerType* ctype)const +{ + return ctype->neededCores <= _resource->nbCores ; +} + +bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible + (const ContainerType* ctype)const +{ + return ctype->neededCores + _load <= _resource->nbCores; +} + +float DefaultAlgorithm::ResourceLoadInfo::cost + (const ContainerType* ctype)const +{ + return _load * 100.0 / float(_resource->nbCores); +} + +unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc + (const ContainerType* ctype) +{ + std::map::iterator it; + it = _ctypes.find(ctype); + if(it == _ctypes.end()) + { + // add the type if not found + it = _ctypes.emplace(std::piecewise_construct, + std::forward_as_tuple(ctype), + std::forward_as_tuple(_resource, ctype) + ).first; + } + _load += ctype->neededCores; + return it->second.alloc(); +} + +void DefaultAlgorithm::ResourceLoadInfo::free + (const ContainerType* ctype, int index) +{ + _load -= ctype->neededCores; + std::map::iterator it; + it = _ctypes.find(ctype); + it->second.free(index); +} + +} diff --git a/src/workloadmanager/DefaultAlgorithm.hxx b/src/workloadmanager/DefaultAlgorithm.hxx new file mode 100644 index 000000000..d30f50499 --- /dev/null +++ b/src/workloadmanager/DefaultAlgorithm.hxx @@ -0,0 +1,79 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#ifndef ALGORITHMIMPLEMENT_H +#define ALGORITHMIMPLEMENT_H + +#include "WorkloadAlgorithm.hxx" +#include +#include +#include + +namespace WorkloadManager +{ +/** + * @todo write docs + */ +class DefaultAlgorithm : public WorkloadAlgorithm +{ +public: + void addTask(Task* t)override; + void addResource(Resource* r)override; + LaunchInfo chooseTask()override; + void liberate(const LaunchInfo& info)override; + bool empty()const override; + +// ----------------------------- PRIVATE ----------------------------- // +private: + class ResourceInfoForContainer + { + public: + ResourceInfoForContainer(const Resource * r, const ContainerType* ctype); + unsigned int maxContainers()const; + unsigned int alloc(); + void free(unsigned int index); + unsigned int nbRunningContainers()const; + bool isContainerRunning(unsigned int index)const; + private: + const ContainerType* _ctype; + const Resource* _resource; + std::set _runningContainers; + unsigned int _firstFreeContainer; + }; + + class ResourceLoadInfo + { + public: + ResourceLoadInfo(const Resource * r); + bool isSupported(const ContainerType* ctype)const; + bool isAllocPossible(const ContainerType* ctype)const; + float cost(const ContainerType* ctype)const; + unsigned int alloc(const ContainerType* ctype); + void free(const ContainerType* ctype, int index); + private: + const Resource* _resource; + float _load; + std::map _ctypes; + }; + +private: + std::map _resources; + std::list _waitingTasks; +}; +} +#endif // ALGORITHMIMPLEMENT_H diff --git a/src/workloadmanager/Task.cxx b/src/workloadmanager/Task.cxx new file mode 100644 index 000000000..1d803a85a --- /dev/null +++ b/src/workloadmanager/Task.cxx @@ -0,0 +1,19 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#include "Task.hxx" diff --git a/src/workloadmanager/Task.hxx b/src/workloadmanager/Task.hxx new file mode 100644 index 000000000..bdada0b90 --- /dev/null +++ b/src/workloadmanager/Task.hxx @@ -0,0 +1,60 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#ifndef _TASK_H_ +#define _TASK_H_ + +#include + +namespace WorkloadManager +{ + struct ContainerType + { + float neededCores; // needed by WorkloadManager + // parameters for client use, not needed by WorkloadManager: + std::string name; + int id; + }; + + struct Resource + { + unsigned int nbCores; // needed by WorkloadManager + // parameters for client use, not needed by WorkloadManager: + std::string name; + int id; + }; + + struct Container + { + const ContainerType* type; + const Resource* resource; + unsigned int index; // worker index on the resource for this type + }; + + /** + * @todo write docs + */ + class Task + { + public: + virtual const ContainerType* type()const =0; + virtual void run(const Container& c)=0; + }; +} + +#endif // _TASK_H_ diff --git a/src/workloadmanager/Test/CMakeLists.txt b/src/workloadmanager/Test/CMakeLists.txt new file mode 100644 index 000000000..805691f30 --- /dev/null +++ b/src/workloadmanager/Test/CMakeLists.txt @@ -0,0 +1,35 @@ +# Copyright (C) 2020 CEA/DEN, EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# + +SET(_link_LIBRARIES + ${CPPUNIT_LIBRARIES} + ${PTHREAD_LIBRARIES} + YACSlibWorkloadmanager +) + +ADD_EXECUTABLE(WorkloadManagerTest TestMain.cxx) +TARGET_LINK_LIBRARIES(WorkloadManagerTest ${_link_LIBRARIES}) +ADD_TEST(WorkloadManager WorkloadManagerTest) + +# For salome test +SET(LOCAL_TEST_DIR ${SALOME_YACS_INSTALL_TEST}/workloadmanager) +INSTALL(TARGETS WorkloadManagerTest DESTINATION ${LOCAL_TEST_DIR}) +INSTALL(FILES CTestTestfileInstall.cmake + DESTINATION ${LOCAL_TEST_DIR} + RENAME CTestTestfile.cmake) diff --git a/src/workloadmanager/Test/CTestTestfileInstall.cmake b/src/workloadmanager/Test/CTestTestfileInstall.cmake new file mode 100644 index 000000000..fb9364c83 --- /dev/null +++ b/src/workloadmanager/Test/CTestTestfileInstall.cmake @@ -0,0 +1,27 @@ +# Copyright (C) 2020 CEA/DEN, EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# + +IF(NOT WIN32) + SET(TEST_NAME ${COMPONENT_NAME}_WorkloadManager) + ADD_TEST(${TEST_NAME} WorkloadManagerTest) + SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES + LABELS "${COMPONENT_NAME}" + ENVIRONMENT "LD_LIBRARY_PATH=${YACS_TEST_LIB}:$ENV{LD_LIBRARY_PATH}" + ) +ENDIF() diff --git a/src/workloadmanager/Test/TestMain.cxx b/src/workloadmanager/Test/TestMain.cxx new file mode 100644 index 000000000..b3b53c9d9 --- /dev/null +++ b/src/workloadmanager/Test/TestMain.cxx @@ -0,0 +1,269 @@ +// Copyright (C) 2020 EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include "../WorkloadManager.hxx" +#include "../DefaultAlgorithm.hxx" + +constexpr bool ACTIVATE_DEBUG_LOG = true; +template +void DEBUG_LOG(Ts... args) +{ + if(! ACTIVATE_DEBUG_LOG) + return; + if(sizeof...(Ts) == 0) + return; + std::ostringstream message; + // TODO: C++17 solution: ( (message << args), ...); + // since initializer lists guarantee sequencing, this can be used to + // call a function on each element of a pack, in order: + int dummy[] = { (message << args, 0)...}; + message << std::endl; + std::cerr << message.str(); +} + +class MyTask; +class AbstractChecker +{ +public: + virtual void check(const WorkloadManager::Container& c, MyTask* t)=0; +}; + +template +class Checker : public AbstractChecker +{ +public: + Checker(); + void check(const WorkloadManager::Container& c, MyTask* t)override; + void globalCheck(); + void reset(); + + WorkloadManager::Resource resources[size_R]; + WorkloadManager::ContainerType types[size_T]; +private: + std::mutex _mutex; + int _maxContainersForResource[size_R][size_T]; +}; + +class MyTask : public WorkloadManager::Task +{ +public: + const WorkloadManager::ContainerType* type()const override {return _type;} + void run(const WorkloadManager::Container& c)override + { + _check->check(c, this); + + DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name, + "-", c.index); + std::this_thread::sleep_for(std::chrono::seconds(_sleep)); + DEBUG_LOG("Finish task ", _id); + } + + void reset(int id, + const WorkloadManager::ContainerType* type, + int sleep, + AbstractChecker * check + ) + { + _id = id; + _type = type; + _sleep = sleep; + _check = check; + } +private: + int _id = 0; + const WorkloadManager::ContainerType* _type = nullptr; + int _sleep = 0; + AbstractChecker * _check; +}; + +template +Checker::Checker() +{ + for(std::size_t i=0; i < size_R; i ++) + { + resources[i].id = i; + std::ostringstream name; + name << "r" << i; + resources[i].name = name.str(); + } + + for(std::size_t i=0; i < size_T; i ++) + { + types[i].id = i; + std::ostringstream name; + name << "t" << i; + types[i].name = name.str(); + } + + for(std::size_t i=0; i < size_R; i++) + for(std::size_t j=0; j < size_T; j++) + _maxContainersForResource[i][j] = 0; +} + +template +void Checker::check(const WorkloadManager::Container& c, + MyTask* t) +{ + std::unique_lock lock(_mutex); + int& max = _maxContainersForResource[c.resource->id][c.type->id]; + if( max < c.index) + max = c.index; +} + +template +void Checker::globalCheck() +{ + for(std::size_t i=0; i < size_R; i++) + { + float global_max = 0; + for(std::size_t j=0; j < size_T; j++) + { + int max = _maxContainersForResource[i][j]; + DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1); + CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores ); + global_max += types[j].neededCores * float(max+1); + } + DEBUG_LOG(resources[i].name, " global: ", global_max); + CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used + } +} + +template +void Checker::reset() +{ + for(std::size_t i=0; i < size_R; i++) + for(std::size_t j=0; j < size_T; j++) + _maxContainersForResource[i][j] = 0; +} + +class MyTest: public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(MyTest); + CPPUNIT_TEST(atest); + CPPUNIT_TEST_SUITE_END(); +public: + void atest(); +}; + +void MyTest::atest() +{ + constexpr std::size_t resourcesNumber = 2; + constexpr std::size_t typesNumber = 2; + Checker check; + check.resources[0].nbCores = 10; + check.resources[1].nbCores = 18; + check.types[0].neededCores = 4.0; + check.types[1].neededCores = 1.0; + + for(std::size_t i=0; i < resourcesNumber; i ++) + DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores, + " cores."); + for(std::size_t i=0; i < typesNumber; i ++) + DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores, + " cores."); + + constexpr std::size_t tasksNumber = 100; + MyTask tasks[tasksNumber]; + for(std::size_t i = 0; i < tasksNumber / 2; i++) + tasks[i].reset(i, &check.types[0], 2, &check); + for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++) + tasks[i].reset(i, &check.types[1], 1, &check); + + DEBUG_LOG("Number of tasks: ", tasksNumber); + DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name); + DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ", + tasks[tasksNumber / 2].type()->name); + + WorkloadManager::DefaultAlgorithm algo; + WorkloadManager::WorkloadManager wlm(algo); + for(std::size_t i=0; i < resourcesNumber; i ++) + wlm.addResource(&check.resources[i]); + + // Add 4 core tasks first + check.reset(); + for(std::size_t i = 0; i < tasksNumber; i++) + wlm.addTask(&tasks[i]); + std::chrono::steady_clock::time_point start_time; + start_time = std::chrono::steady_clock::now(); + wlm.start(); // tasks can be added before start. + wlm.stop(); + std::chrono::steady_clock::time_point end_time; + end_time = std::chrono::steady_clock::now(); + std::chrono::seconds duration; + duration = std::chrono::duration_cast + (end_time - start_time); + std::chrono::seconds maxExpectedDuration(22); + CPPUNIT_ASSERT( duration < maxExpectedDuration ); + DEBUG_LOG("Test step duration : ", duration.count(), "s"); + check.globalCheck(); + + // Add 1 core tasks first + check.reset(); + // WARNING: std::size_t is always >= 0 + for(int i = tasksNumber-1; i >= 0; i--) + wlm.addTask(&tasks[i]); + start_time = std::chrono::steady_clock::now(); + wlm.start(); // tasks can be added before start. + wlm.stop(); + end_time = std::chrono::steady_clock::now(); + duration = std::chrono::duration_cast + (end_time - start_time); + CPPUNIT_ASSERT( duration < maxExpectedDuration ); + DEBUG_LOG("Test step duration : ", duration.count(), "s"); + check.globalCheck(); + + // Add 1 core tasks first & start before addTask + check.reset(); + start_time = std::chrono::steady_clock::now(); + wlm.start(); + for(int i = tasksNumber-1; i >= 0; i--) + wlm.addTask(&tasks[i]); + wlm.stop(); + end_time = std::chrono::steady_clock::now(); + duration = std::chrono::duration_cast + (end_time - start_time); + CPPUNIT_ASSERT( duration < maxExpectedDuration ); + DEBUG_LOG("Test step duration : ", duration.count(), "s"); + check.globalCheck(); + +} + +CPPUNIT_TEST_SUITE_REGISTRATION(MyTest); + +#include "BasicMainTest.hxx" diff --git a/src/workloadmanager/WorkloadAlgorithm.hxx b/src/workloadmanager/WorkloadAlgorithm.hxx new file mode 100644 index 000000000..bad0dcf39 --- /dev/null +++ b/src/workloadmanager/WorkloadAlgorithm.hxx @@ -0,0 +1,43 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#ifndef WORKLOADALGORITHM_H +#define WORKLOADALGORITHM_H + +#include "Task.hxx" + +namespace WorkloadManager +{ +class WorkloadAlgorithm +{ +public: + struct LaunchInfo + { + bool taskFound=false; + Container worker; + Task* task=nullptr; + }; + + virtual void addTask(Task* t)=0; + virtual void addResource(Resource* r)=0; + virtual LaunchInfo chooseTask()=0; + virtual void liberate(const LaunchInfo& info)=0; + virtual bool empty()const =0; +}; +} +#endif // WORKLOADALGORITHM_H diff --git a/src/workloadmanager/WorkloadManager.cxx b/src/workloadmanager/WorkloadManager.cxx new file mode 100644 index 000000000..5d9f4e3e8 --- /dev/null +++ b/src/workloadmanager/WorkloadManager.cxx @@ -0,0 +1,144 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#include "WorkloadManager.hxx" +#include "Task.hxx" +//#include "Container.hxx" + +namespace WorkloadManager +{ + WorkloadManager::WorkloadManager(WorkloadAlgorithm& algo) + : _runningTasks() + , _finishedTasks() + , _nextIndex(0) + , _data_mutex() + , _startCondition() + , _endCondition() + , _stop(false) + , _otherThreads() + , _algo(algo) + { + } + + WorkloadManager::~WorkloadManager() + { + stop(); + } + + void WorkloadManager::addResource(Resource* r) + { + std::unique_lock lock(_data_mutex); + _algo.addResource(r); + _startCondition.notify_one(); + } + + void WorkloadManager::addTask(Task* t) + { + std::unique_lock lock(_data_mutex); + _algo.addTask(t); + _startCondition.notify_one(); + } + + void WorkloadManager::start() + { + { + std::unique_lock lock(_data_mutex); + _stop = false; + } + _otherThreads.emplace_back(std::async([this] + { + runTasks(); + })); + _otherThreads.emplace_back(std::async([this] + { + endTasks(); + })); + } + + void WorkloadManager::stop() + { + { + std::unique_lock lock(_data_mutex); + _stop = true; + } + _startCondition.notify_one(); + _endCondition.notify_one(); + for(std::future& th : _otherThreads) + th.wait(); + } + + void WorkloadManager::runTasks() + { + bool threadStop = false; + while(!threadStop) + { + std::unique_lock lock(_data_mutex); + _startCondition.wait(lock, [this] {return !_algo.empty();}); + RunningInfo taskInfo; + while(chooseTaskToRun(taskInfo)) + { + _runningTasks.emplace(taskInfo.id, std::async([this, taskInfo] + { + runOneTask(taskInfo); + })); + } + threadStop = _stop && _algo.empty(); + } + } + + void WorkloadManager::runOneTask(const RunningInfo& taskInfo) + { + taskInfo.info.task->run(taskInfo.info.worker); + + { + std::unique_lock lock(_data_mutex); + _finishedTasks.push(taskInfo); + _endCondition.notify_one(); + } + } + + void WorkloadManager::endTasks() + { + bool threadStop = false; + while(!threadStop) + { + std::unique_lock lock(_data_mutex); + _endCondition.wait(lock, [this] {return !_finishedTasks.empty();}); + while(!_finishedTasks.empty()) + { + RunningInfo taskInfo = _finishedTasks.front(); + _finishedTasks.pop(); + _runningTasks[taskInfo.id].wait(); + _runningTasks.erase(taskInfo.id); + _algo.liberate(taskInfo.info); + } + threadStop = _stop && _runningTasks.empty() && _algo.empty(); + _startCondition.notify_one(); + } + } + + bool WorkloadManager::chooseTaskToRun(RunningInfo& taskInfo) + { + // We are already under the lock + taskInfo.id = _nextIndex; + _nextIndex ++; + taskInfo.info = _algo.chooseTask(); + return taskInfo.info.taskFound; + } + +} diff --git a/src/workloadmanager/WorkloadManager.hxx b/src/workloadmanager/WorkloadManager.hxx new file mode 100644 index 000000000..264b58920 --- /dev/null +++ b/src/workloadmanager/WorkloadManager.hxx @@ -0,0 +1,66 @@ +// Copyright (C) 2020 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +#ifndef WORKLOADMANAGER_H +#define WORKLOADMANAGER_H +#include +#include +#include // notifications +#include +#include +#include +#include "Task.hxx" +#include "WorkloadAlgorithm.hxx" + +namespace WorkloadManager +{ + class WorkloadManager + { + public: + WorkloadManager(WorkloadAlgorithm& algo); + ~WorkloadManager(); + void addTask(Task* t); + void addResource(Resource* r); + void start(); //! start execution + void stop(); //! stop execution + + private: + typedef unsigned long TaskId; + struct RunningInfo + { + TaskId id; + WorkloadAlgorithm::LaunchInfo info; + }; + std::map > _runningTasks; + std::queue _finishedTasks; + TaskId _nextIndex; + std::mutex _data_mutex; + std::condition_variable _startCondition; // start tasks thread notification + std::condition_variable _endCondition; // end tasks thread notification + bool _stop; + std::list< std::future > _otherThreads; + WorkloadAlgorithm& _algo; + + void runTasks(); + void endTasks(); + void runOneTask(const RunningInfo& taskInfo); + // choose a task and block a resource + bool chooseTaskToRun(RunningInfo& taskInfo); + }; +} +#endif // WORKLOADMANAGER_H -- 2.39.2