# Common packages
##
SET(SUBDIRS_COMMON
- bases engine wrappergen yacsorb salomeloader
+ bases engine wrappergen yacsorb salomeloader workloadmanager
pmml
)
yacsloader
yacsloader_swig
py2yacs
+ workloadmanager
)
--- /dev/null
+# Copyright (C) 2020 CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+SET (_link_LIBRARIES
+ #Threads::Threads
+ ${PTHREAD_LIBRARIES}
+)
+
+SET (_wlm_sources
+ Task.cxx
+ WorkloadManager.cxx
+ DefaultAlgorithm.cxx
+)
+
+SET (_wlm_headers
+ Task.hxx
+ WorkloadManager.hxx
+ WorkloadAlgorithm.hxx
+ DefaultAlgorithm.hxx
+)
+
+ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources})
+TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES})
+INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS})
+INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS})
+
+IF(SALOME_BUILD_TESTS)
+ ADD_SUBDIRECTORY(Test)
+ENDIF(SALOME_BUILD_TESTS)
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "DefaultAlgorithm.hxx"
+#include "Task.hxx"
+#include <stdexcept>
+#include <limits>
+
+namespace WorkloadManager
+{
+void DefaultAlgorithm::addTask(Task* t)
+{
+ // put the tasks which need more cores in front.
+ float newNeedCores = t->type()->neededCores;
+ if(_waitingTasks.empty())
+ _waitingTasks.push_back(t);
+ else if(_waitingTasks.back()->type()->neededCores >= newNeedCores)
+ _waitingTasks.push_back(t);
+ else
+ {
+ std::list<Task*>::iterator it = _waitingTasks.begin();
+ while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores)
+ it++;
+ _waitingTasks.insert(it, t);
+ }
+}
+
+bool DefaultAlgorithm::empty()const
+{
+ return _waitingTasks.empty();
+}
+
+void DefaultAlgorithm::addResource(Resource* r)
+{
+ // add the resource. The operation is ignored if the resource already exists.
+ _resources.emplace(std::piecewise_construct,
+ std::forward_as_tuple(r),
+ std::forward_as_tuple(r)
+ );
+}
+
+WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
+{
+ LaunchInfo result;
+ std::list<Task*>::iterator chosenTaskIt;
+ for( std::list<Task*>::iterator itTask = _waitingTasks.begin();
+ !result.taskFound && itTask != _waitingTasks.end();
+ itTask ++)
+ {
+ const ContainerType* ctype = (*itTask)->type();
+ std::map<const Resource *, ResourceLoadInfo>::iterator best_resource;
+ best_resource = _resources.end();
+ float best_cost = std::numeric_limits<float>::max();
+ for(auto itResource = _resources.begin();
+ itResource != _resources.end();
+ itResource++)
+ if(itResource->second.isSupported(ctype))
+ {
+ if(itResource->second.isAllocPossible(ctype))
+ {
+ float thisCost = itResource->second.cost(ctype);
+ if( best_cost > thisCost)
+ {
+ best_cost = thisCost;
+ best_resource = itResource;
+ }
+ }
+ }
+ if(best_resource != _resources.end())
+ {
+ chosenTaskIt = itTask;
+ result.task = (*itTask);
+ result.taskFound = true;
+ result.worker.resource = best_resource->first;
+ result.worker.type = ctype;
+ result.worker.index = best_resource->second.alloc(ctype);
+ }
+ }
+ if(result.taskFound)
+ _waitingTasks.erase(chosenTaskIt);
+ return result;
+}
+
+void DefaultAlgorithm::liberate(const LaunchInfo& info)
+{
+ const Resource* r = info.worker.resource;
+ unsigned int index = info.worker.index;
+ const ContainerType* ctype = info.worker.type;
+ std::map<const Resource* ,ResourceLoadInfo>::iterator it = _resources.find(r);
+ it->second.free(ctype, index);
+}
+
+// ResourceInfoForContainer
+
+DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer
+ (const Resource * r, const ContainerType* ctype)
+: _ctype(ctype)
+, _resource(r)
+, _runningContainers()
+, _firstFreeContainer(0)
+{
+}
+
+unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const
+{
+ return float(_resource->nbCores) / _ctype->neededCores;
+}
+
+unsigned int DefaultAlgorithm::ResourceInfoForContainer::alloc()
+{
+ unsigned int result = _firstFreeContainer;
+ _runningContainers.insert(result);
+ _firstFreeContainer++;
+ while(isContainerRunning(_firstFreeContainer))
+ _firstFreeContainer++;
+ return result;
+}
+
+void DefaultAlgorithm::ResourceInfoForContainer::free(unsigned int index)
+{
+ _runningContainers.erase(index);
+ if(index < _firstFreeContainer)
+ _firstFreeContainer = index;
+}
+
+unsigned int DefaultAlgorithm::ResourceInfoForContainer::nbRunningContainers()const
+{
+ return _runningContainers.size();
+}
+
+bool DefaultAlgorithm::ResourceInfoForContainer::isContainerRunning
+ (unsigned int index)const
+{
+ return _runningContainers.find(index)!=_runningContainers.end();
+}
+
+// ResourceLoadInfo
+
+DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r)
+: _resource(r)
+, _load(0.0)
+, _ctypes()
+{
+}
+
+bool DefaultAlgorithm::ResourceLoadInfo::isSupported
+ (const ContainerType* ctype)const
+{
+ return ctype->neededCores <= _resource->nbCores ;
+}
+
+bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible
+ (const ContainerType* ctype)const
+{
+ return ctype->neededCores + _load <= _resource->nbCores;
+}
+
+float DefaultAlgorithm::ResourceLoadInfo::cost
+ (const ContainerType* ctype)const
+{
+ return _load * 100.0 / float(_resource->nbCores);
+}
+
+unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc
+ (const ContainerType* ctype)
+{
+ std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
+ it = _ctypes.find(ctype);
+ if(it == _ctypes.end())
+ {
+ // add the type if not found
+ it = _ctypes.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ctype),
+ std::forward_as_tuple(_resource, ctype)
+ ).first;
+ }
+ _load += ctype->neededCores;
+ return it->second.alloc();
+}
+
+void DefaultAlgorithm::ResourceLoadInfo::free
+ (const ContainerType* ctype, int index)
+{
+ _load -= ctype->neededCores;
+ std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
+ it = _ctypes.find(ctype);
+ it->second.free(index);
+}
+
+}
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef ALGORITHMIMPLEMENT_H
+#define ALGORITHMIMPLEMENT_H
+
+#include "WorkloadAlgorithm.hxx"
+#include <set>
+#include <map>
+#include <list>
+
+namespace WorkloadManager
+{
+/**
+ * @todo write docs
+ */
+class DefaultAlgorithm : public WorkloadAlgorithm
+{
+public:
+ void addTask(Task* t)override;
+ void addResource(Resource* r)override;
+ LaunchInfo chooseTask()override;
+ void liberate(const LaunchInfo& info)override;
+ bool empty()const override;
+
+// ----------------------------- PRIVATE ----------------------------- //
+private:
+ class ResourceInfoForContainer
+ {
+ public:
+ ResourceInfoForContainer(const Resource * r, const ContainerType* ctype);
+ unsigned int maxContainers()const;
+ unsigned int alloc();
+ void free(unsigned int index);
+ unsigned int nbRunningContainers()const;
+ bool isContainerRunning(unsigned int index)const;
+ private:
+ const ContainerType* _ctype;
+ const Resource* _resource;
+ std::set<unsigned int> _runningContainers;
+ unsigned int _firstFreeContainer;
+ };
+
+ class ResourceLoadInfo
+ {
+ public:
+ ResourceLoadInfo(const Resource * r);
+ bool isSupported(const ContainerType* ctype)const;
+ bool isAllocPossible(const ContainerType* ctype)const;
+ float cost(const ContainerType* ctype)const;
+ unsigned int alloc(const ContainerType* ctype);
+ void free(const ContainerType* ctype, int index);
+ private:
+ const Resource* _resource;
+ float _load;
+ std::map<const ContainerType*, ResourceInfoForContainer> _ctypes;
+ };
+
+private:
+ std::map<const Resource *, ResourceLoadInfo> _resources;
+ std::list<Task*> _waitingTasks;
+};
+}
+#endif // ALGORITHMIMPLEMENT_H
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "Task.hxx"
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef _TASK_H_
+#define _TASK_H_
+
+#include <string>
+
+namespace WorkloadManager
+{
+ struct ContainerType
+ {
+ float neededCores; // needed by WorkloadManager
+ // parameters for client use, not needed by WorkloadManager:
+ std::string name;
+ int id;
+ };
+
+ struct Resource
+ {
+ unsigned int nbCores; // needed by WorkloadManager
+ // parameters for client use, not needed by WorkloadManager:
+ std::string name;
+ int id;
+ };
+
+ struct Container
+ {
+ const ContainerType* type;
+ const Resource* resource;
+ unsigned int index; // worker index on the resource for this type
+ };
+
+ /**
+ * @todo write docs
+ */
+ class Task
+ {
+ public:
+ virtual const ContainerType* type()const =0;
+ virtual void run(const Container& c)=0;
+ };
+}
+
+#endif // _TASK_H_
--- /dev/null
+# Copyright (C) 2020 CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+SET(_link_LIBRARIES
+ ${CPPUNIT_LIBRARIES}
+ ${PTHREAD_LIBRARIES}
+ YACSlibWorkloadmanager
+)
+
+ADD_EXECUTABLE(WorkloadManagerTest TestMain.cxx)
+TARGET_LINK_LIBRARIES(WorkloadManagerTest ${_link_LIBRARIES})
+ADD_TEST(WorkloadManager WorkloadManagerTest)
+
+# For salome test
+SET(LOCAL_TEST_DIR ${SALOME_YACS_INSTALL_TEST}/workloadmanager)
+INSTALL(TARGETS WorkloadManagerTest DESTINATION ${LOCAL_TEST_DIR})
+INSTALL(FILES CTestTestfileInstall.cmake
+ DESTINATION ${LOCAL_TEST_DIR}
+ RENAME CTestTestfile.cmake)
--- /dev/null
+# Copyright (C) 2020 CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+IF(NOT WIN32)
+ SET(TEST_NAME ${COMPONENT_NAME}_WorkloadManager)
+ ADD_TEST(${TEST_NAME} WorkloadManagerTest)
+ SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES
+ LABELS "${COMPONENT_NAME}"
+ ENVIRONMENT "LD_LIBRARY_PATH=${YACS_TEST_LIB}:$ENV{LD_LIBRARY_PATH}"
+ )
+ENDIF()
--- /dev/null
+// Copyright (C) 2020 EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+//
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/TestRunner.h>
+#include <cppunit/TextTestRunner.h>
+#include <stdexcept>
+
+#include <iostream>
+#include <fstream>
+#include <cstdlib>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <string>
+#include <sstream>
+
+#include <chrono>
+#include <ctime>
+#include <thread>
+
+#include "../WorkloadManager.hxx"
+#include "../DefaultAlgorithm.hxx"
+
+constexpr bool ACTIVATE_DEBUG_LOG = true;
+template<typename... Ts>
+void DEBUG_LOG(Ts... args)
+{
+ if(! ACTIVATE_DEBUG_LOG)
+ return;
+ if(sizeof...(Ts) == 0)
+ return;
+ std::ostringstream message;
+ // TODO: C++17 solution: ( (message << args), ...);
+ // since initializer lists guarantee sequencing, this can be used to
+ // call a function on each element of a pack, in order:
+ int dummy[] = { (message << args, 0)...};
+ message << std::endl;
+ std::cerr << message.str();
+}
+
+class MyTask;
+class AbstractChecker
+{
+public:
+ virtual void check(const WorkloadManager::Container& c, MyTask* t)=0;
+};
+
+template <std::size_t size_R, std::size_t size_T>
+class Checker : public AbstractChecker
+{
+public:
+ Checker();
+ void check(const WorkloadManager::Container& c, MyTask* t)override;
+ void globalCheck();
+ void reset();
+
+ WorkloadManager::Resource resources[size_R];
+ WorkloadManager::ContainerType types[size_T];
+private:
+ std::mutex _mutex;
+ int _maxContainersForResource[size_R][size_T];
+};
+
+class MyTask : public WorkloadManager::Task
+{
+public:
+ const WorkloadManager::ContainerType* type()const override {return _type;}
+ void run(const WorkloadManager::Container& c)override
+ {
+ _check->check(c, this);
+
+ DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name,
+ "-", c.index);
+ std::this_thread::sleep_for(std::chrono::seconds(_sleep));
+ DEBUG_LOG("Finish task ", _id);
+ }
+
+ void reset(int id,
+ const WorkloadManager::ContainerType* type,
+ int sleep,
+ AbstractChecker * check
+ )
+ {
+ _id = id;
+ _type = type;
+ _sleep = sleep;
+ _check = check;
+ }
+private:
+ int _id = 0;
+ const WorkloadManager::ContainerType* _type = nullptr;
+ int _sleep = 0;
+ AbstractChecker * _check;
+};
+
+template <std::size_t size_R, std::size_t size_T>
+Checker<size_R, size_T>::Checker()
+{
+ for(std::size_t i=0; i < size_R; i ++)
+ {
+ resources[i].id = i;
+ std::ostringstream name;
+ name << "r" << i;
+ resources[i].name = name.str();
+ }
+
+ for(std::size_t i=0; i < size_T; i ++)
+ {
+ types[i].id = i;
+ std::ostringstream name;
+ name << "t" << i;
+ types[i].name = name.str();
+ }
+
+ for(std::size_t i=0; i < size_R; i++)
+ for(std::size_t j=0; j < size_T; j++)
+ _maxContainersForResource[i][j] = 0;
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::check(const WorkloadManager::Container& c,
+ MyTask* t)
+{
+ std::unique_lock<std::mutex> lock(_mutex);
+ int& max = _maxContainersForResource[c.resource->id][c.type->id];
+ if( max < c.index)
+ max = c.index;
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::globalCheck()
+{
+ for(std::size_t i=0; i < size_R; i++)
+ {
+ float global_max = 0;
+ for(std::size_t j=0; j < size_T; j++)
+ {
+ int max = _maxContainersForResource[i][j];
+ DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1);
+ CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
+ global_max += types[j].neededCores * float(max+1);
+ }
+ DEBUG_LOG(resources[i].name, " global: ", global_max);
+ CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
+ }
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::reset()
+{
+ for(std::size_t i=0; i < size_R; i++)
+ for(std::size_t j=0; j < size_T; j++)
+ _maxContainersForResource[i][j] = 0;
+}
+
+class MyTest: public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE(MyTest);
+ CPPUNIT_TEST(atest);
+ CPPUNIT_TEST_SUITE_END();
+public:
+ void atest();
+};
+
+void MyTest::atest()
+{
+ constexpr std::size_t resourcesNumber = 2;
+ constexpr std::size_t typesNumber = 2;
+ Checker<resourcesNumber, typesNumber> check;
+ check.resources[0].nbCores = 10;
+ check.resources[1].nbCores = 18;
+ check.types[0].neededCores = 4.0;
+ check.types[1].neededCores = 1.0;
+
+ for(std::size_t i=0; i < resourcesNumber; i ++)
+ DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
+ " cores.");
+ for(std::size_t i=0; i < typesNumber; i ++)
+ DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
+ " cores.");
+
+ constexpr std::size_t tasksNumber = 100;
+ MyTask tasks[tasksNumber];
+ for(std::size_t i = 0; i < tasksNumber / 2; i++)
+ tasks[i].reset(i, &check.types[0], 2, &check);
+ for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++)
+ tasks[i].reset(i, &check.types[1], 1, &check);
+
+ DEBUG_LOG("Number of tasks: ", tasksNumber);
+ DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name);
+ DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ",
+ tasks[tasksNumber / 2].type()->name);
+
+ WorkloadManager::DefaultAlgorithm algo;
+ WorkloadManager::WorkloadManager wlm(algo);
+ for(std::size_t i=0; i < resourcesNumber; i ++)
+ wlm.addResource(&check.resources[i]);
+
+ // Add 4 core tasks first
+ check.reset();
+ for(std::size_t i = 0; i < tasksNumber; i++)
+ wlm.addTask(&tasks[i]);
+ std::chrono::steady_clock::time_point start_time;
+ start_time = std::chrono::steady_clock::now();
+ wlm.start(); // tasks can be added before start.
+ wlm.stop();
+ std::chrono::steady_clock::time_point end_time;
+ end_time = std::chrono::steady_clock::now();
+ std::chrono::seconds duration;
+ duration = std::chrono::duration_cast<std::chrono::seconds>
+ (end_time - start_time);
+ std::chrono::seconds maxExpectedDuration(22);
+ CPPUNIT_ASSERT( duration < maxExpectedDuration );
+ DEBUG_LOG("Test step duration : ", duration.count(), "s");
+ check.globalCheck();
+
+ // Add 1 core tasks first
+ check.reset();
+ // WARNING: std::size_t is always >= 0
+ for(int i = tasksNumber-1; i >= 0; i--)
+ wlm.addTask(&tasks[i]);
+ start_time = std::chrono::steady_clock::now();
+ wlm.start(); // tasks can be added before start.
+ wlm.stop();
+ end_time = std::chrono::steady_clock::now();
+ duration = std::chrono::duration_cast<std::chrono::seconds>
+ (end_time - start_time);
+ CPPUNIT_ASSERT( duration < maxExpectedDuration );
+ DEBUG_LOG("Test step duration : ", duration.count(), "s");
+ check.globalCheck();
+
+ // Add 1 core tasks first & start before addTask
+ check.reset();
+ start_time = std::chrono::steady_clock::now();
+ wlm.start();
+ for(int i = tasksNumber-1; i >= 0; i--)
+ wlm.addTask(&tasks[i]);
+ wlm.stop();
+ end_time = std::chrono::steady_clock::now();
+ duration = std::chrono::duration_cast<std::chrono::seconds>
+ (end_time - start_time);
+ CPPUNIT_ASSERT( duration < maxExpectedDuration );
+ DEBUG_LOG("Test step duration : ", duration.count(), "s");
+ check.globalCheck();
+
+}
+
+CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
+
+#include "BasicMainTest.hxx"
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef WORKLOADALGORITHM_H
+#define WORKLOADALGORITHM_H
+
+#include "Task.hxx"
+
+namespace WorkloadManager
+{
+class WorkloadAlgorithm
+{
+public:
+ struct LaunchInfo
+ {
+ bool taskFound=false;
+ Container worker;
+ Task* task=nullptr;
+ };
+
+ virtual void addTask(Task* t)=0;
+ virtual void addResource(Resource* r)=0;
+ virtual LaunchInfo chooseTask()=0;
+ virtual void liberate(const LaunchInfo& info)=0;
+ virtual bool empty()const =0;
+};
+}
+#endif // WORKLOADALGORITHM_H
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "WorkloadManager.hxx"
+#include "Task.hxx"
+//#include "Container.hxx"
+
+namespace WorkloadManager
+{
+ WorkloadManager::WorkloadManager(WorkloadAlgorithm& algo)
+ : _runningTasks()
+ , _finishedTasks()
+ , _nextIndex(0)
+ , _data_mutex()
+ , _startCondition()
+ , _endCondition()
+ , _stop(false)
+ , _otherThreads()
+ , _algo(algo)
+ {
+ }
+
+ WorkloadManager::~WorkloadManager()
+ {
+ stop();
+ }
+
+ void WorkloadManager::addResource(Resource* r)
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _algo.addResource(r);
+ _startCondition.notify_one();
+ }
+
+ void WorkloadManager::addTask(Task* t)
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _algo.addTask(t);
+ _startCondition.notify_one();
+ }
+
+ void WorkloadManager::start()
+ {
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _stop = false;
+ }
+ _otherThreads.emplace_back(std::async([this]
+ {
+ runTasks();
+ }));
+ _otherThreads.emplace_back(std::async([this]
+ {
+ endTasks();
+ }));
+ }
+
+ void WorkloadManager::stop()
+ {
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _stop = true;
+ }
+ _startCondition.notify_one();
+ _endCondition.notify_one();
+ for(std::future<void>& th : _otherThreads)
+ th.wait();
+ }
+
+ void WorkloadManager::runTasks()
+ {
+ bool threadStop = false;
+ while(!threadStop)
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _startCondition.wait(lock, [this] {return !_algo.empty();});
+ RunningInfo taskInfo;
+ while(chooseTaskToRun(taskInfo))
+ {
+ _runningTasks.emplace(taskInfo.id, std::async([this, taskInfo]
+ {
+ runOneTask(taskInfo);
+ }));
+ }
+ threadStop = _stop && _algo.empty();
+ }
+ }
+
+ void WorkloadManager::runOneTask(const RunningInfo& taskInfo)
+ {
+ taskInfo.info.task->run(taskInfo.info.worker);
+
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _finishedTasks.push(taskInfo);
+ _endCondition.notify_one();
+ }
+ }
+
+ void WorkloadManager::endTasks()
+ {
+ bool threadStop = false;
+ while(!threadStop)
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _endCondition.wait(lock, [this] {return !_finishedTasks.empty();});
+ while(!_finishedTasks.empty())
+ {
+ RunningInfo taskInfo = _finishedTasks.front();
+ _finishedTasks.pop();
+ _runningTasks[taskInfo.id].wait();
+ _runningTasks.erase(taskInfo.id);
+ _algo.liberate(taskInfo.info);
+ }
+ threadStop = _stop && _runningTasks.empty() && _algo.empty();
+ _startCondition.notify_one();
+ }
+ }
+
+ bool WorkloadManager::chooseTaskToRun(RunningInfo& taskInfo)
+ {
+ // We are already under the lock
+ taskInfo.id = _nextIndex;
+ _nextIndex ++;
+ taskInfo.info = _algo.chooseTask();
+ return taskInfo.info.taskFound;
+ }
+
+}
--- /dev/null
+// Copyright (C) 2020 CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef WORKLOADMANAGER_H
+#define WORKLOADMANAGER_H
+#include <mutex>
+#include <future>
+#include <condition_variable> // notifications
+#include <map>
+#include <queue>
+#include <list>
+#include "Task.hxx"
+#include "WorkloadAlgorithm.hxx"
+
+namespace WorkloadManager
+{
+ class WorkloadManager
+ {
+ public:
+ WorkloadManager(WorkloadAlgorithm& algo);
+ ~WorkloadManager();
+ void addTask(Task* t);
+ void addResource(Resource* r);
+ void start(); //! start execution
+ void stop(); //! stop execution
+
+ private:
+ typedef unsigned long TaskId;
+ struct RunningInfo
+ {
+ TaskId id;
+ WorkloadAlgorithm::LaunchInfo info;
+ };
+ std::map<TaskId, std::future<void> > _runningTasks;
+ std::queue<RunningInfo> _finishedTasks;
+ TaskId _nextIndex;
+ std::mutex _data_mutex;
+ std::condition_variable _startCondition; // start tasks thread notification
+ std::condition_variable _endCondition; // end tasks thread notification
+ bool _stop;
+ std::list< std::future<void> > _otherThreads;
+ WorkloadAlgorithm& _algo;
+
+ void runTasks();
+ void endTasks();
+ void runOneTask(const RunningInfo& taskInfo);
+ // choose a task and block a resource
+ bool chooseTaskToRun(RunningInfo& taskInfo);
+ };
+}
+#endif // WORKLOADMANAGER_H