Salome HOME
Work in progress: Prepare for tasks with imposed resource.
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 1 May 2020 10:01:09 +0000 (12:01 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 1 May 2020 10:01:09 +0000 (12:01 +0200)
testSaveLoadRun fails because of side effects between test cases.

src/engine/Executor.cxx
src/engine/Executor.hxx
src/engine/Task.hxx
src/workloadmanager/DefaultAlgorithm.cxx
src/workloadmanager/Task.hxx
src/workloadmanager/Test/TestMain.cxx
src/workloadmanager/WorkloadAlgorithm.hxx
src/workloadmanager/WorkloadManager.cxx
src/yacsloader_swig/Test/testSaveLoadRun.py

index baa82d3d89927e8e5ce0c3c3ad5e1052007620ef..e79afb4c27929fd53848bed7a290100e9638267f 100644 (file)
@@ -30,6 +30,9 @@
 #include "ServiceNode.hxx"
 #include "ComposedNode.hxx"
 
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+
 #include <iostream>
 #include <fstream>
 #include <sys/stat.h>
@@ -1404,7 +1407,7 @@ std::string Executor::ComputePlacement(Task *zeTask)
 }
 
 ///////// NEW EXECUTOR ////////////////////////////////
-void Executor::loadTask(Task *task)
+void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
 {
   if(task->getState() != YACS::TOLOAD)
     return;
@@ -1415,6 +1418,10 @@ void Executor::loadTask(Task *task)
   }//End of critical section
   try
     {
+      std::ostringstream container_name;
+      container_name << runInfo.resource.name << "-"
+                     << runInfo.type.name << "-" << runInfo.index;
+      task->imposeResource(runInfo.resource.name, container_name.str());
       traceExec(task, "load", ComputePlacement(task));
       task->load();
       traceExec(task, "initService", ComputePlacement(task));
@@ -1658,8 +1665,6 @@ void Executor::makeDatastreamConnections(Task *task)
   traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
 }
 
-#include "workloadmanager/WorkloadManager.hxx"
-#include "workloadmanager/DefaultAlgorithm.hxx"
 #include "Runtime.hxx"
 static
 void loadResources(WorkloadManager::WorkloadManager& wm)
@@ -1685,7 +1690,7 @@ class NewTask : public WorkloadManager::Task
 public:
   NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
   const WorkloadManager::ContainerType& type()const override;
-  void run(const WorkloadManager::Container& c)override;
+  void run(const WorkloadManager::RunInfo& runInfo)override;
 private:
   WorkloadManager::ContainerType _type;
   Executor& _executor;
@@ -1697,9 +1702,20 @@ NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
 , _executor(executor)
 , _yacsTask(yacsTask)
 {
-  _type.neededCores = 0;
+  Container * yacsContainer = yacsTask->getContainer();
+  if(yacsContainer != nullptr && !yacsTask->canAcceptImposedResource())
+  {
+    _type.ignoreResources = false;
+    _type.name = yacsContainer->getName();
+    _type.neededCores = 1; // TODO: use the actual value
+  }
+  else
+  {
+    _type.ignoreResources = true;
+    _type.name = "test";
+    _type.neededCores = 0;
+  }
   _type.id = 0;
-  _type.name = "test";
 }
 
 const WorkloadManager::ContainerType& NewTask::type()const
@@ -1707,9 +1723,9 @@ const WorkloadManager::ContainerType& NewTask::type()const
   return _type;
 }
 
-void NewTask::run(const WorkloadManager::Container& c)
+void NewTask::run(const WorkloadManager::RunInfo& runInfo)
 {
-  _executor.loadTask(_yacsTask);
+  _executor.loadTask(_yacsTask, runInfo);
   _executor.makeDatastreamConnections(_yacsTask);
   YACS::Event ev = _executor.runTask(_yacsTask);
   _executor.endTask(_yacsTask, ev);
index 50168ebd9752043d71556d4a871b91b9720162b9..acd78083339412aef610860c255daa81cc3ab3a5 100644 (file)
 #include <ctime>
 #include <chrono>
 
+namespace WorkloadManager
+{
+  struct RunInfo;
+}
+
 namespace YACS
 {
   namespace ENGINE
@@ -128,7 +133,7 @@ namespace YACS
       static size_t _threadStackSize;
       YACS::BASES::Mutex& getTheMutexForSchedulerUpdate() { return _mutexForSchedulerUpdate; }
       ///// new executor !!!!!
-      void loadTask(Task *task);
+      void loadTask(Task *task, const WorkloadManager::RunInfo& runInfo);
       YACS::Event runTask(Task *task);
       void makeDatastreamConnections(Task *task);
       void beginTask(Task *task);
index ed1b4377350defafa7e6cbee6e7460eb2ac8715b..590ebf20da6b5825381bc3f937f7f12ec9f15519 100644 (file)
@@ -23,6 +23,7 @@
 #include "YACSlibEngineExport.hxx"
 #include "define.hxx"
 #include <set>
+#include <string>
 
 namespace YACS
 {
@@ -51,6 +52,9 @@ namespace YACS
       virtual YACS::StatesForNode getState() const = 0;
       virtual void finished() = 0;
       virtual void aborted() = 0;
+      virtual void imposeResource(const std::string& resource_name,
+                                  const std::string& container_name) {}
+      virtual bool canAcceptImposedResource() { return false;}
       virtual ~Task();
     };
   }
index 96e48b4f65c5a645fa3a35a51112ada4c6466aa6..dbe588807c1f8943c3ba8668da7d8dd728040810 100644 (file)
@@ -60,37 +60,46 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
       itTask ++)
   {
     const ContainerType& ctype = (*itTask)->type();
-    std::list<ResourceLoadInfo>::iterator best_resource;
-    best_resource = _resources.end();
-    float best_cost = std::numeric_limits<float>::max();
-    bool isSupported = false;
-    for(auto itResource = _resources.begin();
-        itResource != _resources.end();
-        itResource++)
-      if(itResource->isSupported(ctype))
-      {
-        if(itResource->isAllocPossible(ctype))
+    if(ctype.ignoreResources)
+      result.taskFound = true;
+    else
+    {
+      std::list<ResourceLoadInfo>::iterator best_resource;
+      best_resource = _resources.end();
+      float best_cost = std::numeric_limits<float>::max();
+      bool isSupported = false;
+      for(auto itResource = _resources.begin();
+          itResource != _resources.end();
+          itResource++)
+        if(itResource->isSupported(ctype)
+            && (*itTask)->isAccepted(itResource->resource()))
         {
-          float thisCost = itResource->cost(ctype);
-          if( best_cost > thisCost)
+          if(itResource->isAllocPossible(ctype))
           {
-            best_cost = thisCost;
-            best_resource = itResource;
+            float thisCost = itResource->cost(ctype);
+            if( best_cost > thisCost)
+            {
+              best_cost = thisCost;
+              best_resource = itResource;
+            }
           }
         }
+      if(best_resource != _resources.end())
+      {
+        result.taskFound = true;
+        result.worker.resource = best_resource->resource();
+        result.worker.index = best_resource->alloc(ctype);
+      }
+      else if(!isSupported)
+      {
+        // TODO: This task can never be run by any available resource.
       }
-    if(best_resource != _resources.end())
+    }
+    if(result.taskFound)
     {
       chosenTaskIt = itTask;
       result.task = (*itTask);
-      result.taskFound = true;
-      result.worker.resource = best_resource->resource();
       result.worker.type = ctype;
-      result.worker.index = best_resource->alloc(ctype);
-    }
-    else if(!isSupported)
-    {
-      // TODO: This task can never be run by any available resource.
     }
   }
   if(result.taskFound)
@@ -100,13 +109,16 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
 
 void DefaultAlgorithm::liberate(const LaunchInfo& info)
 {
-  const Resource& r = info.worker.resource;
-  unsigned int index = info.worker.index;
   const ContainerType& ctype = info.worker.type;
-  std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
-                                                       _resources.end(),
-                                                       r);
-  it->free(ctype, index); // we are sure to find it
+  if(!ctype.ignoreResources)
+  {
+    const Resource& r = info.worker.resource;
+    unsigned int index = info.worker.index;
+    std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
+                                                        _resources.end(),
+                                                        r);
+    it->free(ctype, index); // we are sure to find it
+  }
 }
 
 // ResourceInfoForContainer
index 3dd7c93d08e67b4617b939a97316f726c5c26b76..9724f342c01aab82b408a40486c3f8a54ce61d5f 100644 (file)
@@ -25,7 +25,11 @@ namespace WorkloadManager
 {
   struct ContainerType
   {
-    float neededCores = 0.0; // needed by WorkloadManager
+    // parameters needed by WorkloadManager
+    float neededCores = 0.0;
+    bool ignoreResources = false; // if true, the task can be run as soon as
+                                  // added to the manager without any resource
+                                  // allocation
     // parameters for client use, used by WorkloadManager to distinguish objects
     std::string name;
     int id = 0;
@@ -40,10 +44,10 @@ namespace WorkloadManager
   
   struct Resource
   {
-    unsigned int nbCores; // needed by WorkloadManager
+    unsigned int nbCores = 0; // needed by WorkloadManager
     // parameters for client use, used by WorkloadManager to distinguish objects
     std::string name;
-    int id;
+    int id = 0;
     bool operator==(const Resource& other)const
     { return id == other.id && name == other.name;}
     bool operator<(const Resource& other)const
@@ -53,7 +57,7 @@ namespace WorkloadManager
     }
   };
   
-  struct Container
+  struct RunInfo
   {
     ContainerType type;
     Resource resource;
@@ -68,7 +72,14 @@ namespace WorkloadManager
   public:
     virtual ~Task(){};
     virtual const ContainerType& type()const =0;
-    virtual void run(const Container& c)=0;
+    virtual void run(const RunInfo& c)=0;
+
+    // Is it possible to run the task on this resource?
+    virtual bool isAccepted(const Resource& r)
+    {
+      // by default, a task can be run on any resource.
+      return true;
+    }
   };
 }
 
index a550959518f2c02420839aab2746375d33d74723..e1bb76c56fd658420910347f6a6b4413ba230b91 100644 (file)
@@ -55,7 +55,7 @@ class MyTask;
 class AbstractChecker
 {
 public:
-  virtual void check(const WorkloadManager::Container& c, MyTask* t)=0;
+  virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
 };
 
 template <std::size_t size_R, std::size_t size_T>
@@ -63,7 +63,7 @@ class Checker : public AbstractChecker
 {
 public:
   Checker();
-  void check(const WorkloadManager::Container& c, MyTask* t)override;
+  void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
   void globalCheck();
   void reset();
 
@@ -78,7 +78,7 @@ class MyTask : public WorkloadManager::Task
 {
 public:
   const WorkloadManager::ContainerType& type()const override {return *_type;}
-  void run(const WorkloadManager::Container& c)override
+  void run(const WorkloadManager::RunInfo& c)override
   {
     _check->check(c, this);
 
@@ -131,7 +131,7 @@ Checker<size_R, size_T>::Checker()
 }
 
 template <std::size_t size_R, std::size_t size_T>
-void Checker<size_R, size_T>::check(const WorkloadManager::Container& c,
+void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
                                     MyTask* t)
 {
   std::unique_lock<std::mutex> lock(_mutex);
@@ -172,9 +172,11 @@ class MyTest: public CppUnit::TestFixture
 {
   CPPUNIT_TEST_SUITE(MyTest);
   CPPUNIT_TEST(atest);
+  CPPUNIT_TEST(btest);
   CPPUNIT_TEST_SUITE_END();
 public:
   void atest();
+  void btest(); // ignore resources
 };
 
 void MyTest::atest()
@@ -264,6 +266,33 @@ void MyTest::atest()
 
 }
 
+void MyTest::btest()
+{
+  Checker<1, 1> check;
+  WorkloadManager::ContainerType ctype;
+  ctype.ignoreResources = true;
+  constexpr std::size_t tasksNumber = 20;
+  MyTask tasks[tasksNumber];
+  for(std::size_t i = 0; i < tasksNumber; i++)
+    tasks[i].reset(i, &ctype, 1, &check);
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  // no resource needed
+  std::chrono::steady_clock::time_point start_time;
+  std::chrono::steady_clock::time_point end_time;
+  std::chrono::seconds duration;
+  start_time = std::chrono::steady_clock::now();
+  wlm.start();
+  for(std::size_t i = 0; i < tasksNumber; i++)
+    wlm.addTask(&tasks[i]);
+  wlm.stop();
+  end_time = std::chrono::steady_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::seconds>
+             (end_time - start_time);
+  std::chrono::seconds maxExpectedDuration(2);
+  CPPUNIT_ASSERT( duration <= maxExpectedDuration);
+}
+
 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
 
 #include "BasicMainTest.hxx"
index 064747faf2d5898e7522c1593e207b66bf3214c5..92b587f7a54663472eeb2ac48227b8b87b39c26d 100644 (file)
@@ -29,7 +29,7 @@ public:
   struct LaunchInfo
   {
     bool taskFound=false;
-    Container worker;
+    RunInfo worker;
     Task* task=nullptr;
   };
 
index f3896ca52cbfd756a028d4220e5ecb3c5c13d927..5b1daa63ae290b15d57bebbb7705a3ad826aa3fb 100644 (file)
@@ -18,7 +18,6 @@
 //
 #include "WorkloadManager.hxx"
 #include "Task.hxx"
-//#include "Container.hxx"
 
 namespace WorkloadManager
 {
index 6ea538cb95af41dab12e144abe2625fba142baf3..734de057a4cd8add2bb953691d4a19fcd3ddabfe 100755 (executable)
@@ -1391,6 +1391,7 @@ assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3)
     ex.RunW(p,0)
     self.assertEqual(p.getState(),pilot.DONE)
 
+    self.skipTest("Skip HPContainertest")
     # run remote
     p=l.load(fname)
     pg=pilot.PlayGround()