From 8248cd58d0098c386a11e34f84542ea5297cb6ec Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Thu, 7 May 2020 16:54:51 +0200 Subject: [PATCH] Work in progress : workload manager step 2 Workload manager recommendations are used by python nodes if their containers have the property "multi". Test YACS_YacsLoaderTest_swig randomly fails, but this is the same behaviour as before - more work should be done on this item. --- src/engine/Container.cxx | 12 +++++++ src/engine/Container.hxx | 4 +++ src/engine/Executor.cxx | 4 ++- src/runtime/PythonNode.cxx | 33 +++++++++++++++++ src/runtime/PythonNode.hxx | 8 +++++ src/runtime/RuntimeSALOME.cxx | 39 +++++++++++++-------- src/runtime/SalomeContainer.cxx | 22 +++++++++++- src/runtime/SalomeContainer.hxx | 6 +++- src/workloadmanager/Task.hxx | 2 +- src/yacsloader_swig/Test/testSaveLoadRun.py | 4 +-- 10 files changed, 114 insertions(+), 20 deletions(-) diff --git a/src/engine/Container.cxx b/src/engine/Container.cxx index 65efe968b..7dbbac645 100644 --- a/src/engine/Container.cxx +++ b/src/engine/Container.cxx @@ -47,6 +47,18 @@ std::string Container::getDiscreminantStrOfThis(const Task *askingNode) const return oss.str(); } +void Container::start(const Task *askingNode, + const std::string& resource_name, + const std::string& container_name) +{ + return start(askingNode); +} + +bool Container::canAcceptImposedResource() +{ + return false; +} + /*! * If \a val is equal to true the current container 'this' is not destined to be deeply copied on clone call. * If \a val is equal to false the current container 'this' is destined to be deeply copied on clone call. diff --git a/src/engine/Container.hxx b/src/engine/Container.hxx index 3c752a543..9ec9e594d 100644 --- a/src/engine/Container.hxx +++ b/src/engine/Container.hxx @@ -51,6 +51,10 @@ namespace YACS virtual std::string getDiscreminantStrOfThis(const Task *askingNode) const; virtual bool isAlreadyStarted(const Task *askingNode) const = 0; virtual void start(const Task *askingNode) = 0; + virtual void start(const Task *askingNode, + const std::string& resource_name, + const std::string& container_name); + virtual bool canAcceptImposedResource(); virtual std::string getPlacementId(const Task *askingNode) const = 0; virtual std::string getFullPlacementId(const Task *askingNode) const = 0; //Edition only methods diff --git a/src/engine/Executor.cxx b/src/engine/Executor.cxx index 1a3bb64dd..453eed195 100644 --- a/src/engine/Executor.cxx +++ b/src/engine/Executor.cxx @@ -1682,6 +1682,8 @@ void loadResources(WorkloadManager::WorkloadManager& wm) id++; newResource.nbCores = res.second; wm.addResource(newResource); + std::cerr << "Add resource " << newResource.name << " with " + << newResource.nbCores << " cores." << std::endl; } } @@ -1788,7 +1790,7 @@ void Executor::newRun(Scheduler *graph,int debug, bool fromScratch) WorkloadManager::WorkloadManager wlm(algo); loadResources(wlm); wlm.start(); - + while (_toContinue) { DEBTRACE("--- executor main loop"); diff --git a/src/runtime/PythonNode.cxx b/src/runtime/PythonNode.cxx index 547d23360..4f7bd5117 100644 --- a/src/runtime/PythonNode.cxx +++ b/src/runtime/PythonNode.cxx @@ -102,6 +102,9 @@ void PythonEntry::commonRemoteLoadPart1(InlineNode *reqNode) { try { + if(!_imposedResource.empty() && !_imposedContainer.empty()) + container->start(reqNode, _imposedResource, _imposedContainer); + else container->start(reqNode); } catch(Exception& e) @@ -722,6 +725,21 @@ void PythonNode::shutdown(int level) } } +void PythonNode::imposeResource(const std::string& resource_name, + const std::string& container_name) +{ + if(!resource_name.empty() && !container_name.empty()) + { + _imposedResource = resource_name; + _imposedContainer = container_name; + } +} + +bool PythonNode::canAcceptImposedResource() +{ + return _container != nullptr && _container->canAcceptImposedResource(); +} + Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const { return new PythonNode(*this,father); @@ -1332,3 +1350,18 @@ void PyFuncNode::shutdown(int level) } } +void PyFuncNode::imposeResource(const std::string& resource_name, + const std::string& container_name) +{ + if(!resource_name.empty() && !container_name.empty()) + { + _imposedResource = resource_name; + _imposedContainer = container_name; + } +} + +bool PyFuncNode::canAcceptImposedResource() +{ + return _container != nullptr && _container->canAcceptImposedResource(); +} + diff --git a/src/runtime/PythonNode.hxx b/src/runtime/PythonNode.hxx index def2b876f..d90c17b29 100644 --- a/src/runtime/PythonNode.hxx +++ b/src/runtime/PythonNode.hxx @@ -55,6 +55,8 @@ namespace YACS PyObject *_pyfuncSer; PyObject *_pyfuncUnser; PyObject *_pyfuncSimpleSer; + std::string _imposedResource; + std::string _imposedContainer; public: static const char SCRIPT_FOR_SIMPLE_SERIALIZATION[]; }; @@ -81,6 +83,9 @@ namespace YACS virtual void executeRemote(); virtual void executeLocal(); virtual void shutdown(int level); + void imposeResource(const std::string& resource_name, + const std::string& container_name) override; + bool canAcceptImposedResource()override; std::string getContainerLog(); PythonNode* cloneNode(const std::string& name); virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; } @@ -125,6 +130,9 @@ namespace YACS virtual void executeRemote(); virtual void executeLocal(); virtual void shutdown(int level); + void imposeResource(const std::string& resource_name, + const std::string& container_name) override; + bool canAcceptImposedResource()override; std::string getContainerLog(); PyFuncNode* cloneNode(const std::string& name); virtual std::string typeName() { return "YACS__ENGINE__PyFuncNode"; } diff --git a/src/runtime/RuntimeSALOME.cxx b/src/runtime/RuntimeSALOME.cxx index 94b205986..5bcca56fc 100644 --- a/src/runtime/RuntimeSALOME.cxx +++ b/src/runtime/RuntimeSALOME.cxx @@ -465,22 +465,33 @@ std::vector< std::pair > RuntimeSALOME::getCatalogOfComputeNode if(CORBA::is_nil(resManager)) throw Exception("SalomeContainerToolsSpreadOverTheResDecorator::getParameters : Internal error ! The entry attached to the res manager in NS does not have right type !"); std::vector< std::pair > ret; + Engines::ResourceParameters params; + params.name = ""; + params.hostname = ""; + params.OS = ""; + params.nb_proc = 0; + params.mem_mb = 0; + params.cpu_clock = 0; + params.nb_node = 0; + params.nb_proc_per_node = 0; + params.policy = ""; + params.can_launch_batch_jobs = false; + params.can_run_containers = true; + params.componentList.length(0); + Engines::ResourceList_var resourceList; + resourceList = resManager->GetFittingResources(params); + ret.reserve(resourceList->length()); + for(int i = 0; ilength(); i++) { - Engines::ResourceList *rl(0); - Engines::IntegerList *il(0); - resManager->ListAllAvailableResources(rl,il); - int sz(rl->length()); - if(il->length()!=sz) - throw Exception("SalomeContainerToolsSpreadOverTheResDecorator::getParameters : Internal error ! Invalid size !"); - ret.resize(sz); - for(int i=0;i(s,(*il)[i]); - } - delete rl; - delete il; + const char* resource_name = resourceList[i]; + std::string std_resource_name = resource_name; + Engines::ResourceDefinition_var resource_definition + = resManager->GetResourceDefinition(resource_name); + int nb_cores = resource_definition->nb_node * + resource_definition->nb_proc_per_node; + ret.push_back(std::pair(resource_name, nb_cores)); } + return ret; } diff --git a/src/runtime/SalomeContainer.cxx b/src/runtime/SalomeContainer.cxx index 19714c9bf..150b52048 100644 --- a/src/runtime/SalomeContainer.cxx +++ b/src/runtime/SalomeContainer.cxx @@ -221,11 +221,31 @@ Engines::Container_ptr SalomeContainer::getContainerPtr(const Task *askingNode) /*! * \param inst the component instance */ -void SalomeContainer::start(const Task *askingNode) +void SalomeContainer::start(const Task *askingNode) { SalomeContainerTools::Start(_componentNames,_launchModeType,_sct,_shutdownLevel,this,askingNode); } +void SalomeContainer::start(const Task *askingNode, + const std::string& resource_name, + const std::string& container_name) +{ + if(canAcceptImposedResource()) + { + SalomeContainerTools tempSct = _sct; + tempSct.setProperty("name", resource_name); + tempSct.setProperty("container_name", container_name); + SalomeContainerTools::Start(_componentNames,_launchModeType,tempSct,_shutdownLevel,this,askingNode); + } + else + start(askingNode); +} + +bool SalomeContainer::canAcceptImposedResource() +{ + return _launchModeType->getType() == SalomeContainerMultiHelper::TYPE_NAME; +} + void SalomeContainer::shutdown(int level) { DEBTRACE("SalomeContainer::shutdown: " << _name << "," << level << "," << _shutdownLevel); diff --git a/src/runtime/SalomeContainer.hxx b/src/runtime/SalomeContainer.hxx index 66814aa5d..900d3e0f9 100644 --- a/src/runtime/SalomeContainer.hxx +++ b/src/runtime/SalomeContainer.hxx @@ -51,7 +51,11 @@ namespace YACS std::string getKind() const; bool isAlreadyStarted(const Task *askingNode) const; Engines::Container_ptr getContainerPtr(const Task *askingNode) const; - void start(const Task *askingNode) ; + void start(const Task *askingNode) override; + void start(const Task *askingNode, + const std::string& resource_name, + const std::string& container_name) override; + bool canAcceptImposedResource() override; Container *clone() const; Container *cloneAlways() const; std::string getPlacementId(const Task *askingNode) const; diff --git a/src/workloadmanager/Task.hxx b/src/workloadmanager/Task.hxx index 9724f342c..87c467798 100644 --- a/src/workloadmanager/Task.hxx +++ b/src/workloadmanager/Task.hxx @@ -61,7 +61,7 @@ namespace WorkloadManager { ContainerType type; Resource resource; - unsigned int index; // worker index on the resource for this type + unsigned int index=0; // worker index on the resource for this type }; /** diff --git a/src/yacsloader_swig/Test/testSaveLoadRun.py b/src/yacsloader_swig/Test/testSaveLoadRun.py index 6ea538cb9..ad0632238 100755 --- a/src/yacsloader_swig/Test/testSaveLoadRun.py +++ b/src/yacsloader_swig/Test/testSaveLoadRun.py @@ -1511,7 +1511,7 @@ dd=range(10)""") n1.edAddChild(n10) n10.setScript(""" import time -time.sleep(2) +time.sleep(4) o2=2*i1 """) i1=n10.edAddInputPort("i1",ti) @@ -1542,7 +1542,7 @@ o2=2*i1 myRun=threading.Thread(None, ex.RunW, None, (p,0)) myRun.start() import time - time.sleep(5) + time.sleep(7) SALOMERuntime.schemaSaveState(p, ex, xmlStateFileName) a,b,c=n1.getPassedResults(ex) myRun.join() -- 2.39.2