From: Ovidiu Mircescu Date: Tue, 17 Jan 2023 15:57:31 +0000 (+0100) Subject: Code review. X-Git-Tag: V9_11_0a1 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=72004af6c9c1a8e025ea7f929cd796927dcea83b;p=tools%2Fydefx.git Code review. --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fb26fe4..6045f2b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,6 +26,7 @@ ENDIF(SALOME_BUILD_TESTS) ADD_SUBDIRECTORY(cpp) ADD_SUBDIRECTORY(pydefx) +ADD_SUBDIRECTORY(pytools) IF(YDEFX_BUILD_GUI) ADD_SUBDIRECTORY(gui) ENDIF(YDEFX_BUILD_GUI) diff --git a/src/pydefx/CMakeLists.txt b/src/pydefx/CMakeLists.txt index 3617457..f5a2ca3 100644 --- a/src/pydefx/CMakeLists.txt +++ b/src/pydefx/CMakeLists.txt @@ -39,6 +39,5 @@ SET(SCRIPTS ) INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx) -install(FILES mpmcn.py DESTINATION ${SALOME_INSTALL_PYTHON}) ADD_SUBDIRECTORY(schemas) ADD_SUBDIRECTORY(plugins) diff --git a/src/pydefx/configuration.py b/src/pydefx/configuration.py index 210a205..6852b75 100644 --- a/src/pydefx/configuration.py +++ b/src/pydefx/configuration.py @@ -29,22 +29,26 @@ def defaultWorkingDir(resource): return resource_definition.working_directory def defaultNbBranches(resource): + """ + Return the number of cores available on a resource. + """ resManager = salome_proxy.getResourcesManager() resource_definition = resManager.GetResourceDefinition(resource) - ret = resource_definition.nb_node + ret = resource_definition.nb_node * resource_definition.nb_proc_per_node if ret < 1: ret = 1 return ret - -def getNumberOfCoresForLocalhost(): - """ - Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute - set to true in the CatalogResources.xml ). - """ - import LifeCycleCORBA - params = LifeCycleCORBA.ResourceParameters(can_run_containers=True) - resources = salome_proxy.getResourcesManager().GetFittingResources(params) - return sum( [salome_proxy.getResourcesManager().GetResourceDefinition(res).nb_proc_per_node for res in resources] ) + +def allCoresAvailable(): + """ + Return the total number of cores of all resources that can run containers. + ( "canRunContainers" attribute set to true in CatalogResources.xml ) + """ + resManager = salome_proxy.getResourcesManager() + params = salome_proxy.createSalomeParameters() + params.resource_required.can_run_containers = True + resources = resManager.GetFittingResources(params.resource_required) + return sum([defaultNbBranches(res) for res in resources ]) def defaultBaseDirectory(): """Return the default path to the root of any new result directory.""" @@ -63,11 +67,10 @@ def defaultWckey(resource="localhost"): return result def availableResources(): - """ Return the list of resources defined in the current catalog. - - Regarding list of Resources in the CatalogResources.xml this method returns those able to launch jobs. - Ydefx engine delegate to a job encapsulating itself an execution of YACS graph with driver application. - Consequently, the resource to evaluate in parallel the function should one of those returned by this method. + """ + Return the list of resources defined in the current catalog that are able to + launch jobs. + Ydefx can launch the evaluations in a job on one of these resources. """ resManager = salome_proxy.getResourcesManager() params = salome_proxy.createSalomeParameters() diff --git a/src/pydefx/mpmcn.py b/src/pydefx/mpmcn.py deleted file mode 100644 index 0c68003..0000000 --- a/src/pydefx/mpmcn.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright (C) 2022 EDF R&D -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com - -import pydefx - -class Pool: - def __init__(self, params): - """ - :param params: parameter of the job - :type params: pydefx.Parameters - """ - self.myStudy = None - self.myScript = pydefx.PyScript() - self.mySample = None - self.myParams = params - self.removeTmpDir = False - - def getResultDirectory(self): - return self.myParams.salome_parameters.result_directory - - def map(self, func, iterable): - if len(iterable) == 0: - return [] - # - import inspect - if not inspect.isfunction(func): - raise RuntimeError("Input is expected to be a function") - import importlib - fModule = importlib.import_module(func.__module__) - if fModule == "__main__": - raise RuntimeError("Input function is expected to be part of a module") - st = None - with open(fModule.__file__,"r") as ff: - st = ff.read() - # retrieve the varname holding the function in its module - fStr = func.__code__.co_name - if fStr is None: - raise RuntimeError("Impossible to locate function in the module containing it !") - # retrieve args of the func passed in input - fArgs = inspect.getfullargspec(func).args - # agregate the content of the Python module containing the function with expected _exec function for pydfx - pyScript = """{} -def _exec({}): - yxyx = {}({}) - return yxyx -""".format(st,", ".join(fArgs),fStr,", ".join(fArgs)) - # - self.myScript.loadString(pyScript) - self.mySample = self.myScript.CreateEmptySample() - # management of the case of single input - if not hasattr(iterable[0],"__iter__"): - iterable = [[elt] for elt in iterable] - # - self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} ) - # - self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams) - # - self.myStudy.launch() - self.myStudy.wait() - # ask for result : this call implicitely copy back results to the client - self.myStudy.getResult() - # - if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0": - messageFromSlaves = self.myStudy.sample.getMessages() - if all( [elt == "" for elt in messageFromSlaves] ): - ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])] - if len(self.myStudy.sample.getOutputNames()) == 1: - ret = [elt[0] for elt in ret] - self.removeTmpDir = True - return ret - else: - excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""]) - excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory()) - exc = RuntimeError( excMsg ) - exc.tmp_dir = self.getResultDirectory() - raise exc - else: - raise RuntimeError( "Error during job submission or during the driver execution (that should never happend). Internal error : {}".format(self.myStudy.getResult().getErrors()) ) - - def __enter__(self): - self.myStudy = pydefx.PyStudy() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - import os - from glob import glob - import shutil - if self.removeTmpDir: - for fn in glob(os.path.join(self.getResultDirectory(),"*")): - if os.path.isdir( fn ): - shutil.rmtree( fn) - else: - os.unlink( fn ) - pass - pass - -def init(resourceName, resultDirectory = "/tmp"): - """ - Instanciate a pydefx.Parameters intance that can be overriden right after. - Here some example of typical override of the returned object of this method : - - define a local directory for working files : - myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp") - - Define additionnal files necessary for computation. - They will be copied to the remote working directory. - This parameter is empty by default : - myParams.salome_parameters.in_files = [] - - Override computation ressource : - myParams.salome_parameters.resource_required.name) - - Override default # of parallel evaluation : - myParams.nb_branches - - Override number of cores requested by the job when job resource is a cluster (for job scheduler query) : - myParams.salome_parameters.resource_required.nb_proc - - Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) : - myParams.salome_parameters.resource_required.nb_node - - Override working directory : - myParams.salome_parameters.work_directory - - :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml - :param resultDirectory: Directory used to transfer results - :return: a pydefx.Parameters instance - """ - myParams = pydefx.Parameters() - from pydefx import configuration - if resourceName not in configuration.availableResources(): - raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(configuration.availableResources()))) - myParams.configureResource(resourceName) - myParams.createResultDirectory(resultDirectory) - if resourceName == "localhost": - myParams.nb_branches = configuration.getNumberOfCoresForLocalhost() - return myParams diff --git a/src/pytools/CMakeLists.txt b/src/pytools/CMakeLists.txt new file mode 100644 index 0000000..57ac808 --- /dev/null +++ b/src/pytools/CMakeLists.txt @@ -0,0 +1,23 @@ +# Copyright (C) 2023 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +SET(SCRIPTS +mpmcn.py +) + +INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}) diff --git a/src/pytools/mpmcn.py b/src/pytools/mpmcn.py new file mode 100644 index 0000000..bbbb3a2 --- /dev/null +++ b/src/pytools/mpmcn.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2022 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com + +import pydefx + +class Pool: + def __init__(self, params): + """ + :param params: parameter of the job + :type params: pydefx.Parameters + """ + self.myStudy = None + self.myScript = pydefx.PyScript() + self.mySample = None + self.myParams = params + self.removeTmpDir = False + + def getResultDirectory(self): + return self.myParams.salome_parameters.result_directory + + def map(self, func, iterable): + if len(iterable) == 0: + return [] + # + import inspect + if not inspect.isfunction(func): + raise RuntimeError("Input is expected to be a function") + import importlib + fModule = importlib.import_module(func.__module__) + if fModule == "__main__": + raise RuntimeError("Input function is expected to be part of a module") + st = None + with open(fModule.__file__,"r") as ff: + st = ff.read() + # retrieve the varname holding the function in its module + fStr = func.__code__.co_name + if fStr is None: + raise RuntimeError("Impossible to locate function in the module containing it !") + # retrieve args of the func passed in input + fArgs = inspect.getfullargspec(func).args + # agregate the content of the Python module containing the function with expected _exec function for pydfx + pyScript = """{} +def _exec({}): + yxyx = {}({}) + return yxyx +""".format(st,", ".join(fArgs),fStr,", ".join(fArgs)) + # + self.myScript.loadString(pyScript) + self.mySample = self.myScript.CreateEmptySample() + # management of the case of single input + if not hasattr(iterable[0],"__iter__"): + iterable = [[elt] for elt in iterable] + # + self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} ) + # + self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams) + # + self.myStudy.launch() + self.myStudy.wait() + # ask for result : this call implicitely copy back results to the client + self.myStudy.getResult() + # + if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0": + messageFromSlaves = self.myStudy.sample.getMessages() + if all( [elt == "" for elt in messageFromSlaves] ): + ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])] + if len(self.myStudy.sample.getOutputNames()) == 1: + ret = [elt[0] for elt in ret] + self.removeTmpDir = True + return ret + else: + excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""]) + excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory()) + exc = RuntimeError( excMsg ) + exc.tmp_dir = self.getResultDirectory() + raise exc + else: + raise RuntimeError( "Error during job submission or during the driver execution (that should never happend). Internal error : {}".format(self.myStudy.getResult().getErrors()) ) + + def __enter__(self): + self.myStudy = pydefx.PyStudy() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + import os + from glob import glob + import shutil + if self.removeTmpDir: + for fn in glob(os.path.join(self.getResultDirectory(),"*")): + if os.path.isdir( fn ): + shutil.rmtree( fn) + else: + os.unlink( fn ) + pass + pass + +def init(resourceName, resultDirectory = "/tmp"): + """ + Instanciate a pydefx.Parameters intance that can be overriden right after. + Here some example of typical override of the returned object of this method : + + define a local directory for working files : + myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp") + + Define additionnal files necessary for computation. + They will be copied to the remote working directory. + This parameter is empty by default : + myParams.salome_parameters.in_files = [] + + Override computation ressource : + myParams.salome_parameters.resource_required.name) + + Override default # of parallel evaluation : + myParams.nb_branches + + Override number of cores requested by the job when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_proc + + Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_node + + Override working directory : + myParams.salome_parameters.work_directory + + :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml + :param resultDirectory: Directory used to transfer results + :return: a pydefx.Parameters instance + """ + myParams = pydefx.Parameters() + from pydefx import configuration + if resourceName not in configuration.availableResources(): + raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(configuration.availableResources()))) + myParams.configureResource(resourceName) + myParams.createResultDirectory(resultDirectory) + if resourceName == "localhost": + myParams.nb_branches = configuration.allCoresAvailable() + return myParams