Salome HOME
Generate an error when no resource can run a task with the workload manager.
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 21 Dec 2020 11:00:50 +0000 (12:00 +0100)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Mon, 21 Dec 2020 11:00:50 +0000 (12:00 +0100)
src/engine/Executor.cxx
src/engine/Executor.hxx
src/engine/WlmTask.cxx
src/workloadmanager/DefaultAlgorithm.cxx
src/workloadmanager/DefaultAlgorithm.hxx
src/workloadmanager/Task.hxx
src/workloadmanager/Test/TestMain.cxx
src/workloadmanager/WorkloadAlgorithm.hxx
src/workloadmanager/WorkloadManager.cxx
src/workloadmanager/WorkloadManager.hxx
src/yacsloader_swig/Test/testWorkloadManager.py

index f16453f4f4e65cef63650db3557d643068afe146..b68289d6ddecabeec972170ac3448a8e0fce34be 100644 (file)
@@ -1509,6 +1509,17 @@ void Executor::endTask(Task *task, YACS::Event ev)
     wakeUp();
 }
 
+void Executor::failTask(Task *task, const std::string& message)
+{
+  ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
+  if(elemNode != nullptr)
+  {
+    StateLoader(elemNode, YACS::ERROR);
+    elemNode->setErrorDetails(message);
+  }
+  endTask(task, YACS::ABORT);
+}
+
 YACS::Event  Executor::runTask(Task *task)
 {
   { // --- Critical section
index f10a800b7d7e9add88c990af4e47e5df30b52345..776009a0cecd2f2386e801f7aa607ede34f10bdd 100644 (file)
@@ -134,6 +134,7 @@ namespace YACS
       void makeDatastreamConnections(Task *task);
       void beginTask(Task *task);
       void endTask(Task *task, YACS::Event ev);
+      void failTask(Task *task, const std::string& message);
       ////////////
     protected:
       bool checkBreakPoints();
index 09ae95b87c5f5fb54d65dc8874ee547640aa23a4..1d135f9fc49259eabaa95d341830b2592701c76e 100644 (file)
@@ -57,10 +57,17 @@ const WorkloadManager::ContainerType& WlmTask::type()const
 
 void WlmTask::run(const WorkloadManager::RunInfo& runInfo)
 {
-  _executor.loadTask(_yacsTask, runInfo);
-  _executor.makeDatastreamConnections(_yacsTask);
-  YACS::Event ev = _executor.runTask(_yacsTask);
-  _executor.endTask(_yacsTask, ev);
+  if(runInfo.isOk)
+  {
+    _executor.loadTask(_yacsTask, runInfo);
+    _executor.makeDatastreamConnections(_yacsTask);
+    YACS::Event ev = _executor.runTask(_yacsTask);
+    _executor.endTask(_yacsTask, ev);
+  }
+  else
+  {
+    _executor.failTask(_yacsTask, runInfo.error_message);
+  }
   delete this; // provisoire
 }
 
@@ -90,6 +97,7 @@ void WlmTask::loadResources(WorkloadManager::WorkloadManager& wm)
     newResource.nbCores = res.second;
     wm.addResource(newResource);
   }
+  wm.freezeResources();
 }
 
 }
index 8d69376aa7ac2d2bafb54180a29b02c660ba229c..1b1896c26c301149d19c2c6c3c71ee2102bb2725 100644 (file)
@@ -74,6 +74,7 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
         if(itResource->isSupported(ctype)
             && (*itTask)->isAccepted(itResource->resource()))
         {
+          isSupported = true;
           if(itResource->isAllocPossible(ctype))
           {
             float thisCost = itResource->cost(ctype);
@@ -90,9 +91,12 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
         result.worker.resource = best_resource->resource();
         result.worker.index = best_resource->alloc(ctype);
       }
-      else if(!isSupported)
+      else if(!isSupported && _resourcesFrozen)
       {
-        // TODO: This task can never be run by any available resource.
+        // This task can never be run by any available resource.
+        result.taskFound = true;
+        result.worker.isOk = false;
+        result.worker.error_message = "No resource can run this task.";
       }
     }
     if(result.taskFound)
@@ -110,7 +114,7 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
 void DefaultAlgorithm::liberate(const LaunchInfo& info)
 {
   const ContainerType& ctype = info.worker.type;
-  if(!ctype.ignoreResources)
+  if(!ctype.ignoreResources && info.worker.isOk)
   {
     const Resource& r = info.worker.resource;
     unsigned int index = info.worker.index;
index 4c88f308475442a16e2338b3b4a373114d846124..6169d535e8d015e9a2b6e2ad8c26924c0f3fc6f6 100644 (file)
@@ -38,6 +38,7 @@ public:
   LaunchInfo chooseTask()override;
   void liberate(const LaunchInfo& info)override;
   bool empty()const override;
+  void freezeResources() override { _resourcesFrozen = true;}
 
 // ----------------------------- PRIVATE ----------------------------- //
 private:
@@ -87,6 +88,7 @@ private:
 private:
   std::list<ResourceLoadInfo> _resources;
   std::list<Task*> _waitingTasks;
+  bool _resourcesFrozen = false;
 };
 }
 #endif // ALGORITHMIMPLEMENT_H
index 71dc10eedbabbe24d812534e1ac4384ea0974d02..f00c8408a9828cb646828d6c44d114dc62b86087 100644 (file)
@@ -64,6 +64,8 @@ namespace WorkloadManager
     ContainerType type;
     Resource resource;
     unsigned int index=0; // worker index on the resource for this type
+    bool isOk = true;
+    std::string error_message = "";
   };
 
   /**
index 8f9cca00dd8f1ad237cd983ed953708b7e80252a..ca73f920040d83b82e31eef8e99826180efd1d5f 100644 (file)
@@ -173,10 +173,12 @@ class MyTest: public CppUnit::TestFixture
   CPPUNIT_TEST_SUITE(MyTest);
   CPPUNIT_TEST(atest);
   CPPUNIT_TEST(btest);
+  CPPUNIT_TEST(ctest);
   CPPUNIT_TEST_SUITE_END();
 public:
   void atest();
   void btest(); // ignore resources
+  void ctest(); // no available resource
 };
 
 /**
@@ -305,6 +307,70 @@ void MyTest::btest()
   CPPUNIT_ASSERT( duration <= maxExpectedDuration);
 }
 
+/**
+ * Test the case of a task which need more cores than any resource has.
+ */
+class ErrorTask : public WorkloadManager::Task
+{
+public:
+  ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
+  {
+    _type.ignoreResources = false;
+    _type.neededCores = nb_cores;
+  }
+
+  const WorkloadManager::ContainerType& type()const override {return _type;}
+
+  void run(const WorkloadManager::RunInfo& c)override
+  {
+    _ok = c.isOk;
+    _message = c.error_message;
+  }
+
+  bool checkState(bool ok, const std::string& message)
+  {
+    return (ok == _ok) && (message == _message);
+  }
+
+private:
+  WorkloadManager::ContainerType _type;
+  bool _ok;
+  std::string _message;
+};
+
+void MyTest::ctest()
+{
+  WorkloadManager::Resource r;
+  r.id = 1;
+  r.name = "r1";
+  r.nbCores = 1;
+  ErrorTask t1(1), t2(10);
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  wlm.addResource(r);
+  wlm.addTask(&t1);
+  wlm.addTask(&t2);
+  wlm.start();
+  wlm.stop();
+  CPPUNIT_ASSERT(t1.checkState(true, ""));
+  CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
+  // no error mode: wait for a resource to be added
+  WorkloadManager::DefaultAlgorithm algo_noerror;
+  WorkloadManager::WorkloadManager wlm2(algo_noerror);
+  wlm2.addResource(r);
+  wlm2.addTask(&t1);
+  wlm2.addTask(&t2);
+  wlm2.start();
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  r.id = 2;
+  r.name = "r2";
+  r.nbCores = 20;
+  wlm2.addResource(r);
+  wlm2.stop();
+  CPPUNIT_ASSERT(t1.checkState(true, ""));
+  CPPUNIT_ASSERT(t2.checkState(true, ""));
+}
+
 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
 
 #include "BasicMainTest.hxx"
index 01683ccf08a28f593f0f4d2cbf257d6e24985b71..66804ee95a98d07606d970ca19b5f733f944f224 100644 (file)
@@ -39,6 +39,7 @@ public:
   virtual LaunchInfo chooseTask()=0;
   virtual void liberate(const LaunchInfo& info)=0;
   virtual bool empty()const =0;
+  virtual void freezeResources()=0;
 };
 }
 #endif // WORKLOADALGORITHM_H
index d0ac55609844338209b263123c1122551828b85f..40d6b7e9edc5387ffac1a14950ae559b57923667 100644 (file)
@@ -28,7 +28,7 @@ namespace WorkloadManager
   , _data_mutex()
   , _startCondition()
   , _endCondition()
-  , _stop(false)
+  , _stop(true)
   , _otherThreads()
   , _algo(algo)
   {
@@ -46,6 +46,13 @@ namespace WorkloadManager
     _startCondition.notify_one();
   }
 
+  void WorkloadManager::freezeResources()
+  {
+    std::unique_lock<std::mutex> lock(_data_mutex);
+    _algo.freezeResources();
+    _startCondition.notify_one();
+  }
+
   void WorkloadManager::addTask(Task* t)
   {
     std::unique_lock<std::mutex> lock(_data_mutex);
@@ -57,6 +64,8 @@ namespace WorkloadManager
   {
     {
       std::unique_lock<std::mutex> lock(_data_mutex);
+      if(!_stop)
+        return; // already started
       _stop = false;
     }
     _otherThreads.emplace_back(std::async(std::launch::async, [this]
@@ -74,6 +83,7 @@ namespace WorkloadManager
     {
       std::unique_lock<std::mutex> lock(_data_mutex);
       _stop = true;
+      _algo.freezeResources();
     }
     _startCondition.notify_one();
     _endCondition.notify_one();
index e971d1ea6293b467c933983b3327a23f629fd0db..86c25afa3cd51be3813fc12c792210b6024ad335 100644 (file)
@@ -40,6 +40,7 @@ namespace WorkloadManager
     ~WorkloadManager();
     void addTask(Task* t);
     void addResource(const Resource& r);
+    void freezeResources(); //! no more resources can be added
     void start(); //! start execution
     void stop(); //! stop execution
 
index 334dcd2af604110ef90388c7c5b37180060ae05e..c24d59cd7aafd53a7654b5c7592949c46910772d 100755 (executable)
@@ -109,6 +109,14 @@ class TestEdit(unittest.TestCase):
           err_message = proc.getChildByName("End").getOutputPort("err_message").getPyObj()
           self.fail(err_message)
 
+    def test4(self):
+        """ Verify the execution is stoped if no resource can run a task.
+        """
+        proc = self.l.load("samples/wlm_error.xml")
+        self.e.RunW(proc,0)
+        self.assertEqual(proc.getState(),pilot.FAILED)
+        self.assertEqual(proc.getChildByName("ErrorNode").getState(),pilot.ERROR)
+
 if __name__ == '__main__':
   dir_test = tempfile.mkdtemp(suffix=".yacstest")
   file_test = os.path.join(dir_test,"UnitTestsResult")