From: Ovidiu Mircescu Date: Mon, 21 Dec 2020 11:00:50 +0000 (+0100) Subject: Generate an error when no resource can run a task with the workload manager. X-Git-Tag: V9_7_0a1~6 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=7a808eb18fce9d9ec2b8271763b397a9ec1fd604;p=modules%2Fyacs.git Generate an error when no resource can run a task with the workload manager. --- diff --git a/src/engine/Executor.cxx b/src/engine/Executor.cxx index f16453f4f..b68289d6d 100644 --- a/src/engine/Executor.cxx +++ b/src/engine/Executor.cxx @@ -1509,6 +1509,17 @@ void Executor::endTask(Task *task, YACS::Event ev) wakeUp(); } +void Executor::failTask(Task *task, const std::string& message) +{ + ElementaryNode* elemNode = dynamic_cast(task); + if(elemNode != nullptr) + { + StateLoader(elemNode, YACS::ERROR); + elemNode->setErrorDetails(message); + } + endTask(task, YACS::ABORT); +} + YACS::Event Executor::runTask(Task *task) { { // --- Critical section diff --git a/src/engine/Executor.hxx b/src/engine/Executor.hxx index f10a800b7..776009a0c 100644 --- a/src/engine/Executor.hxx +++ b/src/engine/Executor.hxx @@ -134,6 +134,7 @@ namespace YACS void makeDatastreamConnections(Task *task); void beginTask(Task *task); void endTask(Task *task, YACS::Event ev); + void failTask(Task *task, const std::string& message); //////////// protected: bool checkBreakPoints(); diff --git a/src/engine/WlmTask.cxx b/src/engine/WlmTask.cxx index 09ae95b87..1d135f9fc 100644 --- a/src/engine/WlmTask.cxx +++ b/src/engine/WlmTask.cxx @@ -57,10 +57,17 @@ const WorkloadManager::ContainerType& WlmTask::type()const void WlmTask::run(const WorkloadManager::RunInfo& runInfo) { - _executor.loadTask(_yacsTask, runInfo); - _executor.makeDatastreamConnections(_yacsTask); - YACS::Event ev = _executor.runTask(_yacsTask); - _executor.endTask(_yacsTask, ev); + if(runInfo.isOk) + { + _executor.loadTask(_yacsTask, runInfo); + _executor.makeDatastreamConnections(_yacsTask); + YACS::Event ev = _executor.runTask(_yacsTask); + _executor.endTask(_yacsTask, ev); + } + else + { + _executor.failTask(_yacsTask, runInfo.error_message); + } delete this; // provisoire } @@ -90,6 +97,7 @@ void WlmTask::loadResources(WorkloadManager::WorkloadManager& wm) newResource.nbCores = res.second; wm.addResource(newResource); } + wm.freezeResources(); } } diff --git a/src/workloadmanager/DefaultAlgorithm.cxx b/src/workloadmanager/DefaultAlgorithm.cxx index 8d69376aa..1b1896c26 100644 --- a/src/workloadmanager/DefaultAlgorithm.cxx +++ b/src/workloadmanager/DefaultAlgorithm.cxx @@ -74,6 +74,7 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() if(itResource->isSupported(ctype) && (*itTask)->isAccepted(itResource->resource())) { + isSupported = true; if(itResource->isAllocPossible(ctype)) { float thisCost = itResource->cost(ctype); @@ -90,9 +91,12 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() result.worker.resource = best_resource->resource(); result.worker.index = best_resource->alloc(ctype); } - else if(!isSupported) + else if(!isSupported && _resourcesFrozen) { - // TODO: This task can never be run by any available resource. + // This task can never be run by any available resource. + result.taskFound = true; + result.worker.isOk = false; + result.worker.error_message = "No resource can run this task."; } } if(result.taskFound) @@ -110,7 +114,7 @@ WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask() void DefaultAlgorithm::liberate(const LaunchInfo& info) { const ContainerType& ctype = info.worker.type; - if(!ctype.ignoreResources) + if(!ctype.ignoreResources && info.worker.isOk) { const Resource& r = info.worker.resource; unsigned int index = info.worker.index; diff --git a/src/workloadmanager/DefaultAlgorithm.hxx b/src/workloadmanager/DefaultAlgorithm.hxx index 4c88f3084..6169d535e 100644 --- a/src/workloadmanager/DefaultAlgorithm.hxx +++ b/src/workloadmanager/DefaultAlgorithm.hxx @@ -38,6 +38,7 @@ public: LaunchInfo chooseTask()override; void liberate(const LaunchInfo& info)override; bool empty()const override; + void freezeResources() override { _resourcesFrozen = true;} // ----------------------------- PRIVATE ----------------------------- // private: @@ -87,6 +88,7 @@ private: private: std::list _resources; std::list _waitingTasks; + bool _resourcesFrozen = false; }; } #endif // ALGORITHMIMPLEMENT_H diff --git a/src/workloadmanager/Task.hxx b/src/workloadmanager/Task.hxx index 71dc10eed..f00c8408a 100644 --- a/src/workloadmanager/Task.hxx +++ b/src/workloadmanager/Task.hxx @@ -64,6 +64,8 @@ namespace WorkloadManager ContainerType type; Resource resource; unsigned int index=0; // worker index on the resource for this type + bool isOk = true; + std::string error_message = ""; }; /** diff --git a/src/workloadmanager/Test/TestMain.cxx b/src/workloadmanager/Test/TestMain.cxx index 8f9cca00d..ca73f9200 100644 --- a/src/workloadmanager/Test/TestMain.cxx +++ b/src/workloadmanager/Test/TestMain.cxx @@ -173,10 +173,12 @@ class MyTest: public CppUnit::TestFixture CPPUNIT_TEST_SUITE(MyTest); CPPUNIT_TEST(atest); CPPUNIT_TEST(btest); + CPPUNIT_TEST(ctest); CPPUNIT_TEST_SUITE_END(); public: void atest(); void btest(); // ignore resources + void ctest(); // no available resource }; /** @@ -305,6 +307,70 @@ void MyTest::btest() CPPUNIT_ASSERT( duration <= maxExpectedDuration); } +/** + * Test the case of a task which need more cores than any resource has. + */ +class ErrorTask : public WorkloadManager::Task +{ +public: + ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message() + { + _type.ignoreResources = false; + _type.neededCores = nb_cores; + } + + const WorkloadManager::ContainerType& type()const override {return _type;} + + void run(const WorkloadManager::RunInfo& c)override + { + _ok = c.isOk; + _message = c.error_message; + } + + bool checkState(bool ok, const std::string& message) + { + return (ok == _ok) && (message == _message); + } + +private: + WorkloadManager::ContainerType _type; + bool _ok; + std::string _message; +}; + +void MyTest::ctest() +{ + WorkloadManager::Resource r; + r.id = 1; + r.name = "r1"; + r.nbCores = 1; + ErrorTask t1(1), t2(10); + WorkloadManager::DefaultAlgorithm algo; + WorkloadManager::WorkloadManager wlm(algo); + wlm.addResource(r); + wlm.addTask(&t1); + wlm.addTask(&t2); + wlm.start(); + wlm.stop(); + CPPUNIT_ASSERT(t1.checkState(true, "")); + CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task.")); + // no error mode: wait for a resource to be added + WorkloadManager::DefaultAlgorithm algo_noerror; + WorkloadManager::WorkloadManager wlm2(algo_noerror); + wlm2.addResource(r); + wlm2.addTask(&t1); + wlm2.addTask(&t2); + wlm2.start(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + r.id = 2; + r.name = "r2"; + r.nbCores = 20; + wlm2.addResource(r); + wlm2.stop(); + CPPUNIT_ASSERT(t1.checkState(true, "")); + CPPUNIT_ASSERT(t2.checkState(true, "")); +} + CPPUNIT_TEST_SUITE_REGISTRATION(MyTest); #include "BasicMainTest.hxx" diff --git a/src/workloadmanager/WorkloadAlgorithm.hxx b/src/workloadmanager/WorkloadAlgorithm.hxx index 01683ccf0..66804ee95 100644 --- a/src/workloadmanager/WorkloadAlgorithm.hxx +++ b/src/workloadmanager/WorkloadAlgorithm.hxx @@ -39,6 +39,7 @@ public: virtual LaunchInfo chooseTask()=0; virtual void liberate(const LaunchInfo& info)=0; virtual bool empty()const =0; + virtual void freezeResources()=0; }; } #endif // WORKLOADALGORITHM_H diff --git a/src/workloadmanager/WorkloadManager.cxx b/src/workloadmanager/WorkloadManager.cxx index d0ac55609..40d6b7e9e 100644 --- a/src/workloadmanager/WorkloadManager.cxx +++ b/src/workloadmanager/WorkloadManager.cxx @@ -28,7 +28,7 @@ namespace WorkloadManager , _data_mutex() , _startCondition() , _endCondition() - , _stop(false) + , _stop(true) , _otherThreads() , _algo(algo) { @@ -46,6 +46,13 @@ namespace WorkloadManager _startCondition.notify_one(); } + void WorkloadManager::freezeResources() + { + std::unique_lock lock(_data_mutex); + _algo.freezeResources(); + _startCondition.notify_one(); + } + void WorkloadManager::addTask(Task* t) { std::unique_lock lock(_data_mutex); @@ -57,6 +64,8 @@ namespace WorkloadManager { { std::unique_lock lock(_data_mutex); + if(!_stop) + return; // already started _stop = false; } _otherThreads.emplace_back(std::async(std::launch::async, [this] @@ -74,6 +83,7 @@ namespace WorkloadManager { std::unique_lock lock(_data_mutex); _stop = true; + _algo.freezeResources(); } _startCondition.notify_one(); _endCondition.notify_one(); diff --git a/src/workloadmanager/WorkloadManager.hxx b/src/workloadmanager/WorkloadManager.hxx index e971d1ea6..86c25afa3 100644 --- a/src/workloadmanager/WorkloadManager.hxx +++ b/src/workloadmanager/WorkloadManager.hxx @@ -40,6 +40,7 @@ namespace WorkloadManager ~WorkloadManager(); void addTask(Task* t); void addResource(const Resource& r); + void freezeResources(); //! no more resources can be added void start(); //! start execution void stop(); //! stop execution diff --git a/src/yacsloader_swig/Test/testWorkloadManager.py b/src/yacsloader_swig/Test/testWorkloadManager.py index 334dcd2af..c24d59cd7 100755 --- a/src/yacsloader_swig/Test/testWorkloadManager.py +++ b/src/yacsloader_swig/Test/testWorkloadManager.py @@ -109,6 +109,14 @@ class TestEdit(unittest.TestCase): err_message = proc.getChildByName("End").getOutputPort("err_message").getPyObj() self.fail(err_message) + def test4(self): + """ Verify the execution is stoped if no resource can run a task. + """ + proc = self.l.load("samples/wlm_error.xml") + self.e.RunW(proc,0) + self.assertEqual(proc.getState(),pilot.FAILED) + self.assertEqual(proc.getChildByName("ErrorNode").getState(),pilot.ERROR) + if __name__ == '__main__': dir_test = tempfile.mkdtemp(suffix=".yacstest") file_test = os.path.join(dir_test,"UnitTestsResult")