Salome HOME
Include sources of workloadmanager. agy/optim_fe_0_2
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 6 Apr 2020 09:48:09 +0000 (11:48 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 6 Apr 2020 09:48:09 +0000 (11:48 +0200)
13 files changed:
src/CMakeLists.txt
src/CTestTestfileInstall.cmake
src/workloadmanager/CMakeLists.txt [new file with mode: 0644]
src/workloadmanager/DefaultAlgorithm.cxx [new file with mode: 0644]
src/workloadmanager/DefaultAlgorithm.hxx [new file with mode: 0644]
src/workloadmanager/Task.cxx [new file with mode: 0644]
src/workloadmanager/Task.hxx [new file with mode: 0644]
src/workloadmanager/Test/CMakeLists.txt [new file with mode: 0644]
src/workloadmanager/Test/CTestTestfileInstall.cmake [new file with mode: 0644]
src/workloadmanager/Test/TestMain.cxx [new file with mode: 0644]
src/workloadmanager/WorkloadAlgorithm.hxx [new file with mode: 0644]
src/workloadmanager/WorkloadManager.cxx [new file with mode: 0644]
src/workloadmanager/WorkloadManager.hxx [new file with mode: 0644]

index f981b3f6c3ddc539bd8641086aacd79acb02bffc..2ac3ab5be6569c52f84d85f61c63f7d3f1b89970 100644 (file)
@@ -21,7 +21,7 @@
 # Common packages
 ##
 SET(SUBDIRS_COMMON
-  bases engine wrappergen yacsorb salomeloader
+  bases engine wrappergen yacsorb salomeloader workloadmanager
   pmml
   )
 
index 352cf9ee6044ed60e329929e16f1f41a266ac7db..0738813ecc47c9fdd278547e884174c58b80913a 100644 (file)
@@ -35,4 +35,5 @@ SUBDIRS(
     yacsloader
     yacsloader_swig
     py2yacs
+    workloadmanager
     )
diff --git a/src/workloadmanager/CMakeLists.txt b/src/workloadmanager/CMakeLists.txt
new file mode 100644 (file)
index 0000000..654ef6f
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (C) 2020  CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+SET (_link_LIBRARIES
+  #Threads::Threads
+  ${PTHREAD_LIBRARIES}
+)
+
+SET (_wlm_sources
+  Task.cxx
+  WorkloadManager.cxx
+  DefaultAlgorithm.cxx
+)
+
+SET (_wlm_headers
+  Task.hxx
+  WorkloadManager.hxx
+  WorkloadAlgorithm.hxx
+  DefaultAlgorithm.hxx
+)
+
+ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources})
+TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES})
+INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS})
+INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS})
+
+IF(SALOME_BUILD_TESTS)
+  ADD_SUBDIRECTORY(Test)
+ENDIF(SALOME_BUILD_TESTS)
diff --git a/src/workloadmanager/DefaultAlgorithm.cxx b/src/workloadmanager/DefaultAlgorithm.cxx
new file mode 100644 (file)
index 0000000..d25d18f
--- /dev/null
@@ -0,0 +1,205 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "DefaultAlgorithm.hxx"
+#include "Task.hxx"
+#include <stdexcept>
+#include <limits>
+
+namespace WorkloadManager
+{
+void DefaultAlgorithm::addTask(Task* t)
+{
+  // put the tasks which need more cores in front.
+  float newNeedCores = t->type()->neededCores;
+  if(_waitingTasks.empty())
+    _waitingTasks.push_back(t);
+  else if(_waitingTasks.back()->type()->neededCores >= newNeedCores)
+    _waitingTasks.push_back(t);
+  else
+  {
+    std::list<Task*>::iterator it = _waitingTasks.begin();
+    while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores)
+      it++;
+    _waitingTasks.insert(it, t);
+  }
+}
+
+bool DefaultAlgorithm::empty()const
+{
+  return _waitingTasks.empty();
+}
+
+void DefaultAlgorithm::addResource(Resource* r)
+{
+  // add the resource. The operation is ignored if the resource already exists.
+  _resources.emplace(std::piecewise_construct,
+                     std::forward_as_tuple(r),
+                     std::forward_as_tuple(r)
+                    );
+}
+
+WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
+{
+  LaunchInfo result;
+  std::list<Task*>::iterator chosenTaskIt;
+  for( std::list<Task*>::iterator itTask = _waitingTasks.begin();
+      !result.taskFound && itTask != _waitingTasks.end();
+      itTask ++)
+  {
+    const ContainerType* ctype = (*itTask)->type();
+    std::map<const Resource *, ResourceLoadInfo>::iterator best_resource;
+    best_resource = _resources.end();
+    float best_cost = std::numeric_limits<float>::max();
+    for(auto itResource = _resources.begin();
+        itResource != _resources.end();
+        itResource++)
+      if(itResource->second.isSupported(ctype))
+      {
+        if(itResource->second.isAllocPossible(ctype))
+        {
+          float thisCost = itResource->second.cost(ctype);
+          if( best_cost > thisCost)
+          {
+            best_cost = thisCost;
+            best_resource = itResource;
+          }
+        }
+      }
+    if(best_resource != _resources.end())
+    {
+      chosenTaskIt = itTask;
+      result.task = (*itTask);
+      result.taskFound = true;
+      result.worker.resource = best_resource->first;
+      result.worker.type = ctype;
+      result.worker.index = best_resource->second.alloc(ctype);
+    }
+  }
+  if(result.taskFound)
+    _waitingTasks.erase(chosenTaskIt);
+  return result;
+}
+
+void DefaultAlgorithm::liberate(const LaunchInfo& info)
+{
+  const Resource* r = info.worker.resource;
+  unsigned int index = info.worker.index;
+  const ContainerType* ctype = info.worker.type;
+  std::map<const Resource* ,ResourceLoadInfo>::iterator it = _resources.find(r);
+  it->second.free(ctype, index);
+}
+
+// ResourceInfoForContainer
+
+DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer
+                                (const Resource * r, const ContainerType* ctype)
+: _ctype(ctype)
+, _resource(r)
+, _runningContainers()
+, _firstFreeContainer(0)
+{
+}
+
+unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const
+{
+  return float(_resource->nbCores) / _ctype->neededCores;
+}
+
+unsigned int  DefaultAlgorithm::ResourceInfoForContainer::alloc()
+{
+  unsigned int result = _firstFreeContainer;
+  _runningContainers.insert(result);
+  _firstFreeContainer++;
+  while(isContainerRunning(_firstFreeContainer))
+    _firstFreeContainer++;
+  return result;
+}
+
+void DefaultAlgorithm::ResourceInfoForContainer::free(unsigned int index)
+{
+  _runningContainers.erase(index);
+  if(index < _firstFreeContainer)
+    _firstFreeContainer = index;
+}
+
+unsigned int DefaultAlgorithm::ResourceInfoForContainer::nbRunningContainers()const
+{
+  return _runningContainers.size();
+}
+
+bool DefaultAlgorithm::ResourceInfoForContainer::isContainerRunning
+                                (unsigned int index)const
+{
+  return _runningContainers.find(index)!=_runningContainers.end();
+}
+
+// ResourceLoadInfo
+
+DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r)
+: _resource(r)
+, _load(0.0)
+, _ctypes()
+{
+}
+
+bool DefaultAlgorithm::ResourceLoadInfo::isSupported
+                                (const ContainerType* ctype)const
+{
+  return ctype->neededCores <= _resource->nbCores ;
+}
+                                          
+bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible
+                                (const ContainerType* ctype)const
+{
+  return ctype->neededCores + _load <= _resource->nbCores;
+}
+
+float DefaultAlgorithm::ResourceLoadInfo::cost
+                                (const ContainerType* ctype)const
+{
+  return _load * 100.0 / float(_resource->nbCores);
+}
+
+unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc
+                                (const ContainerType* ctype)
+{
+  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
+  it = _ctypes.find(ctype);
+  if(it == _ctypes.end())
+  {
+    // add the type if not found
+    it = _ctypes.emplace(std::piecewise_construct,
+                         std::forward_as_tuple(ctype),
+                         std::forward_as_tuple(_resource, ctype)
+                        ).first;
+  }
+  _load += ctype->neededCores;
+  return it->second.alloc();
+}
+
+void DefaultAlgorithm::ResourceLoadInfo::free
+                                (const ContainerType* ctype, int index)
+{
+  _load -= ctype->neededCores;
+  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
+  it = _ctypes.find(ctype);
+  it->second.free(index);
+}
+
+}
diff --git a/src/workloadmanager/DefaultAlgorithm.hxx b/src/workloadmanager/DefaultAlgorithm.hxx
new file mode 100644 (file)
index 0000000..d30f504
--- /dev/null
@@ -0,0 +1,79 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef ALGORITHMIMPLEMENT_H
+#define ALGORITHMIMPLEMENT_H
+
+#include "WorkloadAlgorithm.hxx"
+#include <set>
+#include <map>
+#include <list>
+
+namespace WorkloadManager
+{
+/**
+ * @todo write docs
+ */
+class DefaultAlgorithm : public WorkloadAlgorithm
+{
+public:
+  void addTask(Task* t)override;
+  void addResource(Resource* r)override;
+  LaunchInfo chooseTask()override;
+  void liberate(const LaunchInfo& info)override;
+  bool empty()const override;
+
+// ----------------------------- PRIVATE ----------------------------- //
+private:
+  class ResourceInfoForContainer
+  {
+  public:
+    ResourceInfoForContainer(const Resource * r, const ContainerType* ctype);
+    unsigned int maxContainers()const;
+    unsigned int  alloc();
+    void free(unsigned int index);
+    unsigned int nbRunningContainers()const;
+    bool isContainerRunning(unsigned int index)const;
+  private:
+    const ContainerType* _ctype;
+    const Resource* _resource;
+    std::set<unsigned int> _runningContainers;
+    unsigned int _firstFreeContainer;
+  };
+  
+  class ResourceLoadInfo
+  {
+  public:
+    ResourceLoadInfo(const Resource * r);
+    bool isSupported(const ContainerType* ctype)const;
+    bool isAllocPossible(const ContainerType* ctype)const;
+    float cost(const ContainerType* ctype)const;
+    unsigned int alloc(const ContainerType* ctype);
+    void free(const ContainerType* ctype, int index);
+  private:
+    const Resource* _resource;
+    float _load;
+    std::map<const ContainerType*, ResourceInfoForContainer> _ctypes;
+  };
+  
+private:
+  std::map<const Resource *, ResourceLoadInfo> _resources;
+  std::list<Task*> _waitingTasks;
+};
+}
+#endif // ALGORITHMIMPLEMENT_H
diff --git a/src/workloadmanager/Task.cxx b/src/workloadmanager/Task.cxx
new file mode 100644 (file)
index 0000000..1d803a8
--- /dev/null
@@ -0,0 +1,19 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "Task.hxx"
diff --git a/src/workloadmanager/Task.hxx b/src/workloadmanager/Task.hxx
new file mode 100644 (file)
index 0000000..bdada0b
--- /dev/null
@@ -0,0 +1,60 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef _TASK_H_
+#define _TASK_H_
+
+#include <string>
+
+namespace WorkloadManager
+{
+  struct ContainerType
+  {
+    float neededCores; // needed by WorkloadManager
+    // parameters for client use, not needed by WorkloadManager:
+    std::string name;
+    int id;
+  };
+  
+  struct Resource
+  {
+    unsigned int nbCores; // needed by WorkloadManager
+    // parameters for client use, not needed by WorkloadManager:
+    std::string name;
+    int id;
+  };
+  
+  struct Container
+  {
+    const ContainerType* type;
+    const Resource* resource;
+    unsigned int index; // worker index on the resource for this type
+  };
+
+  /**
+  * @todo write docs
+  */
+  class Task
+  {
+  public:
+    virtual const ContainerType* type()const =0;
+    virtual void run(const Container& c)=0;
+  };
+}
+
+#endif // _TASK_H_
diff --git a/src/workloadmanager/Test/CMakeLists.txt b/src/workloadmanager/Test/CMakeLists.txt
new file mode 100644 (file)
index 0000000..805691f
--- /dev/null
@@ -0,0 +1,35 @@
+# Copyright (C) 2020  CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+SET(_link_LIBRARIES
+  ${CPPUNIT_LIBRARIES} 
+  ${PTHREAD_LIBRARIES}
+  YACSlibWorkloadmanager
+)
+
+ADD_EXECUTABLE(WorkloadManagerTest TestMain.cxx)
+TARGET_LINK_LIBRARIES(WorkloadManagerTest ${_link_LIBRARIES})
+ADD_TEST(WorkloadManager WorkloadManagerTest)
+
+# For salome test
+SET(LOCAL_TEST_DIR ${SALOME_YACS_INSTALL_TEST}/workloadmanager)
+INSTALL(TARGETS WorkloadManagerTest DESTINATION ${LOCAL_TEST_DIR})
+INSTALL(FILES CTestTestfileInstall.cmake
+        DESTINATION ${LOCAL_TEST_DIR}
+        RENAME CTestTestfile.cmake)
diff --git a/src/workloadmanager/Test/CTestTestfileInstall.cmake b/src/workloadmanager/Test/CTestTestfileInstall.cmake
new file mode 100644 (file)
index 0000000..fb9364c
--- /dev/null
@@ -0,0 +1,27 @@
+# Copyright (C) 2020  CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+
+IF(NOT WIN32)
+  SET(TEST_NAME ${COMPONENT_NAME}_WorkloadManager)
+  ADD_TEST(${TEST_NAME} WorkloadManagerTest)
+  SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES
+                                    LABELS "${COMPONENT_NAME}"
+                                    ENVIRONMENT "LD_LIBRARY_PATH=${YACS_TEST_LIB}:$ENV{LD_LIBRARY_PATH}"
+                      )
+ENDIF()
diff --git a/src/workloadmanager/Test/TestMain.cxx b/src/workloadmanager/Test/TestMain.cxx
new file mode 100644 (file)
index 0000000..b3b53c9
--- /dev/null
@@ -0,0 +1,269 @@
+// Copyright (C) 2020  EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+//
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/TestRunner.h>
+#include <cppunit/TextTestRunner.h>
+#include <stdexcept>
+
+#include <iostream>
+#include <fstream>
+#include <cstdlib>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <string>
+#include <sstream>
+
+#include <chrono>
+#include <ctime>
+#include <thread>
+
+#include "../WorkloadManager.hxx"
+#include "../DefaultAlgorithm.hxx"
+
+constexpr bool ACTIVATE_DEBUG_LOG = true;
+template<typename... Ts>
+void DEBUG_LOG(Ts... args)
+{
+  if(! ACTIVATE_DEBUG_LOG)
+    return;
+  if(sizeof...(Ts) == 0)
+    return;
+  std::ostringstream message;
+  // TODO:  C++17 solution: ( (message << args), ...);
+  // since initializer lists guarantee sequencing, this can be used to
+  // call a function on each element of a pack, in order:
+  int dummy[] = { (message << args, 0)...};
+  message << std::endl;
+  std::cerr << message.str();
+}
+
+class MyTask;
+class AbstractChecker
+{
+public:
+  virtual void check(const WorkloadManager::Container& c, MyTask* t)=0;
+};
+
+template <std::size_t size_R, std::size_t size_T>
+class Checker : public AbstractChecker
+{
+public:
+  Checker();
+  void check(const WorkloadManager::Container& c, MyTask* t)override;
+  void globalCheck();
+  void reset();
+
+  WorkloadManager::Resource resources[size_R];
+  WorkloadManager::ContainerType types[size_T];
+private:
+  std::mutex _mutex;
+  int _maxContainersForResource[size_R][size_T];
+};
+
+class MyTask : public WorkloadManager::Task
+{
+public:
+  const WorkloadManager::ContainerType* type()const override {return _type;}
+  void run(const WorkloadManager::Container& c)override
+  {
+    _check->check(c, this);
+
+    DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name,
+              "-", c.index);
+    std::this_thread::sleep_for(std::chrono::seconds(_sleep));
+    DEBUG_LOG("Finish task ", _id);
+  }
+
+  void reset(int id,
+             const WorkloadManager::ContainerType* type,
+             int sleep,
+             AbstractChecker * check
+            )
+  {
+    _id = id;
+    _type = type;
+    _sleep = sleep;
+    _check = check;
+  }
+private:
+  int _id = 0;
+  const WorkloadManager::ContainerType* _type = nullptr;
+  int _sleep = 0;
+  AbstractChecker * _check;
+};
+
+template <std::size_t size_R, std::size_t size_T>
+Checker<size_R, size_T>::Checker()
+{
+  for(std::size_t i=0; i < size_R; i ++)
+  {
+    resources[i].id = i;
+    std::ostringstream name;
+    name << "r" << i;
+    resources[i].name = name.str();
+  }
+
+  for(std::size_t i=0; i < size_T; i ++)
+  {
+    types[i].id = i;
+    std::ostringstream name;
+    name << "t" << i;
+    types[i].name = name.str();
+  }
+
+  for(std::size_t i=0; i < size_R; i++)
+    for(std::size_t j=0; j < size_T; j++)
+      _maxContainersForResource[i][j] = 0;
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::check(const WorkloadManager::Container& c,
+                                    MyTask* t)
+{
+  std::unique_lock<std::mutex> lock(_mutex);
+  int& max = _maxContainersForResource[c.resource->id][c.type->id];
+  if( max < c.index)
+    max = c.index;
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::globalCheck()
+{
+  for(std::size_t i=0; i < size_R; i++)
+  {
+    float global_max = 0;
+    for(std::size_t j=0; j < size_T; j++)
+    {
+      int max = _maxContainersForResource[i][j];
+      DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1);
+      CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
+      global_max += types[j].neededCores * float(max+1);
+    }
+    DEBUG_LOG(resources[i].name, " global: ", global_max);
+    CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
+  }
+}
+
+template <std::size_t size_R, std::size_t size_T>
+void Checker<size_R, size_T>::reset()
+{
+  for(std::size_t i=0; i < size_R; i++)
+    for(std::size_t j=0; j < size_T; j++)
+      _maxContainersForResource[i][j] = 0;
+}
+
+class MyTest: public CppUnit::TestFixture
+{
+  CPPUNIT_TEST_SUITE(MyTest);
+  CPPUNIT_TEST(atest);
+  CPPUNIT_TEST_SUITE_END();
+public:
+  void atest();
+};
+
+void MyTest::atest()
+{
+  constexpr std::size_t resourcesNumber = 2;
+  constexpr std::size_t typesNumber = 2;
+  Checker<resourcesNumber, typesNumber> check;
+  check.resources[0].nbCores = 10;
+  check.resources[1].nbCores = 18;
+  check.types[0].neededCores = 4.0;
+  check.types[1].neededCores = 1.0;
+
+  for(std::size_t i=0; i < resourcesNumber; i ++)
+    DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
+              " cores.");
+  for(std::size_t i=0; i < typesNumber; i ++)
+    DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
+              " cores.");
+
+  constexpr std::size_t tasksNumber = 100;
+  MyTask tasks[tasksNumber];
+  for(std::size_t i = 0; i < tasksNumber / 2; i++)
+    tasks[i].reset(i, &check.types[0], 2, &check);
+  for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++)
+    tasks[i].reset(i, &check.types[1], 1, &check);
+
+  DEBUG_LOG("Number of tasks: ", tasksNumber);
+  DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name);
+  DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ",
+            tasks[tasksNumber / 2].type()->name);
+
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  for(std::size_t i=0; i < resourcesNumber; i ++)
+    wlm.addResource(&check.resources[i]);
+
+  // Add 4 core tasks first
+  check.reset();
+  for(std::size_t i = 0; i < tasksNumber; i++)
+    wlm.addTask(&tasks[i]);
+  std::chrono::steady_clock::time_point start_time;
+  start_time = std::chrono::steady_clock::now();
+  wlm.start(); // tasks can be added before start.
+  wlm.stop();
+  std::chrono::steady_clock::time_point end_time;
+  end_time = std::chrono::steady_clock::now();
+  std::chrono::seconds duration;
+  duration = std::chrono::duration_cast<std::chrono::seconds>
+             (end_time - start_time);
+  std::chrono::seconds maxExpectedDuration(22);
+  CPPUNIT_ASSERT( duration < maxExpectedDuration );
+  DEBUG_LOG("Test step duration : ", duration.count(), "s");
+  check.globalCheck();
+
+  // Add 1 core tasks first
+  check.reset();
+  // WARNING: std::size_t is always >= 0
+  for(int i = tasksNumber-1; i >= 0; i--)
+    wlm.addTask(&tasks[i]);
+  start_time = std::chrono::steady_clock::now();
+  wlm.start(); // tasks can be added before start.
+  wlm.stop();
+  end_time = std::chrono::steady_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::seconds>
+             (end_time - start_time);
+  CPPUNIT_ASSERT( duration < maxExpectedDuration );
+  DEBUG_LOG("Test step duration : ", duration.count(), "s");
+  check.globalCheck();
+
+  // Add 1 core tasks first & start before addTask
+  check.reset();
+  start_time = std::chrono::steady_clock::now();
+  wlm.start();
+  for(int i = tasksNumber-1; i >= 0; i--)
+    wlm.addTask(&tasks[i]);
+  wlm.stop();
+  end_time = std::chrono::steady_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::seconds>
+             (end_time - start_time);
+  CPPUNIT_ASSERT( duration < maxExpectedDuration );
+  DEBUG_LOG("Test step duration : ", duration.count(), "s");
+  check.globalCheck();
+
+}
+
+CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
+
+#include "BasicMainTest.hxx"
diff --git a/src/workloadmanager/WorkloadAlgorithm.hxx b/src/workloadmanager/WorkloadAlgorithm.hxx
new file mode 100644 (file)
index 0000000..bad0dcf
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef WORKLOADALGORITHM_H
+#define WORKLOADALGORITHM_H
+
+#include "Task.hxx"
+
+namespace WorkloadManager
+{
+class WorkloadAlgorithm
+{
+public:
+  struct LaunchInfo
+  {
+    bool taskFound=false;
+    Container worker;
+    Task* task=nullptr;
+  };
+
+  virtual void addTask(Task* t)=0;
+  virtual void addResource(Resource* r)=0;
+  virtual LaunchInfo chooseTask()=0;
+  virtual void liberate(const LaunchInfo& info)=0;
+  virtual bool empty()const =0;
+};
+}
+#endif // WORKLOADALGORITHM_H
diff --git a/src/workloadmanager/WorkloadManager.cxx b/src/workloadmanager/WorkloadManager.cxx
new file mode 100644 (file)
index 0000000..5d9f4e3
--- /dev/null
@@ -0,0 +1,144 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "WorkloadManager.hxx"
+#include "Task.hxx"
+//#include "Container.hxx"
+
+namespace WorkloadManager
+{
+  WorkloadManager::WorkloadManager(WorkloadAlgorithm& algo)
+  : _runningTasks()
+  , _finishedTasks()
+  , _nextIndex(0)
+  , _data_mutex()
+  , _startCondition()
+  , _endCondition()
+  , _stop(false)
+  , _otherThreads()
+  , _algo(algo)
+  {
+  }
+  
+  WorkloadManager::~WorkloadManager()
+  {
+    stop();
+  }
+  
+  void WorkloadManager::addResource(Resource* r)
+  {
+    std::unique_lock<std::mutex> lock(_data_mutex);
+    _algo.addResource(r);
+    _startCondition.notify_one();
+  }
+  
+  void WorkloadManager::addTask(Task* t)
+  {
+    std::unique_lock<std::mutex> lock(_data_mutex);
+    _algo.addTask(t);
+    _startCondition.notify_one();
+  }
+
+  void WorkloadManager::start()
+  {
+    {
+      std::unique_lock<std::mutex> lock(_data_mutex);
+      _stop = false;
+    }
+    _otherThreads.emplace_back(std::async([this]
+      {
+        runTasks();
+      }));
+    _otherThreads.emplace_back(std::async([this]
+      {
+        endTasks();
+      }));
+  }
+
+  void WorkloadManager::stop()
+  {
+    {
+      std::unique_lock<std::mutex> lock(_data_mutex);
+      _stop = true;
+    }
+    _startCondition.notify_one();
+    _endCondition.notify_one();
+    for(std::future<void>& th : _otherThreads)
+      th.wait();
+  }
+
+  void WorkloadManager::runTasks()
+  {
+    bool threadStop = false;
+    while(!threadStop)
+    {
+      std::unique_lock<std::mutex> lock(_data_mutex);
+      _startCondition.wait(lock, [this] {return !_algo.empty();});
+      RunningInfo taskInfo;
+      while(chooseTaskToRun(taskInfo))
+      {
+        _runningTasks.emplace(taskInfo.id, std::async([this, taskInfo]
+          {
+            runOneTask(taskInfo);
+          }));
+      }
+      threadStop = _stop && _algo.empty();
+    }
+  }
+
+  void WorkloadManager::runOneTask(const RunningInfo& taskInfo)
+  {
+    taskInfo.info.task->run(taskInfo.info.worker);
+
+    {
+      std::unique_lock<std::mutex> lock(_data_mutex);
+      _finishedTasks.push(taskInfo);
+      _endCondition.notify_one();
+    }
+  }
+
+  void WorkloadManager::endTasks()
+  {
+    bool threadStop = false;
+    while(!threadStop)
+    {
+      std::unique_lock<std::mutex> lock(_data_mutex);
+      _endCondition.wait(lock, [this] {return !_finishedTasks.empty();});
+      while(!_finishedTasks.empty())
+      {
+        RunningInfo taskInfo = _finishedTasks.front();
+        _finishedTasks.pop();
+        _runningTasks[taskInfo.id].wait();
+        _runningTasks.erase(taskInfo.id);
+        _algo.liberate(taskInfo.info);
+      }
+      threadStop = _stop && _runningTasks.empty() && _algo.empty();
+      _startCondition.notify_one();
+    }
+  }
+
+  bool WorkloadManager::chooseTaskToRun(RunningInfo& taskInfo)
+  {
+    // We are already under the lock
+    taskInfo.id = _nextIndex;
+    _nextIndex ++;
+    taskInfo.info = _algo.chooseTask();
+    return taskInfo.info.taskFound;
+  }
+
+}
diff --git a/src/workloadmanager/WorkloadManager.hxx b/src/workloadmanager/WorkloadManager.hxx
new file mode 100644 (file)
index 0000000..264b589
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright (C) 2020  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef WORKLOADMANAGER_H
+#define WORKLOADMANAGER_H
+#include <mutex>
+#include <future>
+#include <condition_variable> // notifications
+#include <map>
+#include <queue>
+#include <list>
+#include "Task.hxx"
+#include "WorkloadAlgorithm.hxx"
+
+namespace WorkloadManager
+{
+  class WorkloadManager
+  {
+  public:
+    WorkloadManager(WorkloadAlgorithm& algo);
+    ~WorkloadManager();
+    void addTask(Task* t);
+    void addResource(Resource* r);
+    void start(); //! start execution
+    void stop(); //! stop execution
+
+  private:
+    typedef unsigned long TaskId;
+    struct RunningInfo
+    {
+      TaskId id;
+      WorkloadAlgorithm::LaunchInfo info;
+    };
+    std::map<TaskId, std::future<void> > _runningTasks;
+    std::queue<RunningInfo> _finishedTasks;
+    TaskId _nextIndex;
+    std::mutex _data_mutex;
+    std::condition_variable _startCondition; // start tasks thread notification
+    std::condition_variable _endCondition; // end tasks thread notification
+    bool _stop;
+    std::list< std::future<void> > _otherThreads;
+    WorkloadAlgorithm& _algo;
+
+    void runTasks();
+    void endTasks();
+    void runOneTask(const RunningInfo& taskInfo);
+    // choose a task and block a resource
+    bool chooseTaskToRun(RunningInfo& taskInfo);
+  };
+}
+#endif // WORKLOADMANAGER_H