From ad4545befed4831a0a6b6a227514a67758bc5370 Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Fri, 30 Dec 2022 13:07:35 +0100 Subject: [PATCH] Integration of mpmcn python module to emulate multiprocessing std python module over multiple nodes. --- src/pydefx/CMakeLists.txt | 1 + src/pydefx/mpmcn.py | 176 +++++++++++++++++++++++++++++++++++ src/pyexample/CMakeLists.txt | 1 + src/pyexample/runUnitTest.sh | 7 +- src/pyexample/test_mpmcn.py | 40 ++++++++ 5 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 src/pydefx/mpmcn.py create mode 100644 src/pyexample/test_mpmcn.py diff --git a/src/pydefx/CMakeLists.txt b/src/pydefx/CMakeLists.txt index f5a2ca3..3617457 100644 --- a/src/pydefx/CMakeLists.txt +++ b/src/pydefx/CMakeLists.txt @@ -39,5 +39,6 @@ SET(SCRIPTS ) INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx) +install(FILES mpmcn.py DESTINATION ${SALOME_INSTALL_PYTHON}) ADD_SUBDIRECTORY(schemas) ADD_SUBDIRECTORY(plugins) diff --git a/src/pydefx/mpmcn.py b/src/pydefx/mpmcn.py new file mode 100644 index 0000000..12fcfc6 --- /dev/null +++ b/src/pydefx/mpmcn.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2022 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com + +import pydefx +import salome +salome.salome_init() + +class Pool: + def __init__(self, params): + """ + :param params: parameter of the job + :type params: pydefx.Parameters + """ + self.myStudy = None + self.myScript = pydefx.PyScript() + self.mySample = None + self.myParams = params + self.removeTmpDir = False + + def getResultDirectory(self): + return self.myParams.salome_parameters.result_directory + + def map(self, func, iterable): + if len(iterable) == 0: + return [] + # + import inspect + if not inspect.isfunction(func): + raise RuntimeError("Input is expected to be a function") + import importlib + fModule = importlib.import_module(func.__module__) + if fModule == "__main__": + raise RuntimeError("Input function is expected to be part of a module") + st = None + with open(fModule.__file__,"r") as ff: + st = ff.read() + # retrieve the varname holding the function in its module + fStr = func.__code__.co_name + if fStr is None: + raise RuntimeError("Impossible to locate function in the module containing it !") + # retrieve args of the func passed in input + fArgs = inspect.getfullargspec(func).args + # aggregate the content of the Python module containing the function with expected _exec function for pydefx + pyScript = """{} +def _exec({}): + yxyx = {}({}) + return yxyx +""".format(st,", ".join(fArgs),fStr,", ".join(fArgs)) + # + self.myScript.loadString(pyScript) + self.mySample = self.myScript.CreateEmptySample() + # management of the case of single input + if not hasattr(iterable[0],"__iter__"): + iterable = [[elt] for elt in iterable] + # + self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} ) + # + self.myStudy.createNewJob(self.myScript, 
self.mySample, self.myParams) + # + self.myStudy.launch() + self.myStudy.wait() + # ask for result : this call implicitly copies back results to the client + self.myStudy.getResult() + # + if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0": + messageFromSlaves = self.myStudy.sample.getMessages() + if all( [elt == "" for elt in messageFromSlaves] ): + ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])] + if len(self.myStudy.sample.getOutputNames()) == 1: + ret = [elt[0] for elt in ret] + self.removeTmpDir = True + return ret + else: + excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""]) + excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory()) + exc = RuntimeError( excMsg ) + exc.tmp_dir = self.getResultDirectory() + raise exc + else: + raise RuntimeError("Error during job submission or during the driver execution (that should never happend)") + + def __enter__(self): + self.myStudy = pydefx.PyStudy() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + import os + from glob import glob + import shutil + if self.removeTmpDir: + for fn in glob(os.path.join(self.getResultDirectory(),"*")): + if os.path.isdir( fn ): + shutil.rmtree( fn) + else: + os.unlink( fn ) + pass + pass + +def getResourcesAbleToLaunchJobs(): + """ + Regarding list of Resources in the CatalogResources.xml this method returns those able to launch jobs. + Ydefx engine delegates to a job encapsulating itself an execution of YACS graph with driver application. + Consequently, the resource to evaluate in parallel the function should be one of those returned by this method. + """ + import LifeCycleCORBA + params = LifeCycleCORBA.ResourceParameters(can_launch_batch_jobs=True) + # ask to resource manager to filter among all resources listed in CatalogResources those able to launch job. 
+ return salome.rm.GetFittingResources(params) + +def getNumberOfCoresForLocalhost(): + """ + Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute + set to true in the CatalogResources.xml ). + """ + import LifeCycleCORBA + params = LifeCycleCORBA.ResourceParameters(can_run_containers=True) + resources = salome.rm.GetFittingResources(params) + return sum( [salome.rm.GetResourceDefinition(res).nb_proc_per_node for res in resources] ) + +def init(resourceName, resultDirectory = "/tmp"): + """ + Instantiate a pydefx.Parameters instance that can be overridden right after. + Here are some examples of typical overrides of the object returned by this method : + + Define a local directory for working files : + myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp") + + Define additional files necessary for computation. + They will be copied to the remote working directory. + This parameter is empty by default : + myParams.salome_parameters.in_files = [] + + Override computation resource : + myParams.salome_parameters.resource_required.name + + Override default # of parallel evaluation : + myParams.nb_branches + + Override number of cores requested by the job when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_proc + + Override number of computational nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_node + + Override working directory : + myParams.salome_parameters.work_directory + + :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml + :param resultDirectory: Directory used to transfer results + :return: a pydefx.Parameters instance + """ + myParams = pydefx.Parameters() + if resourceName not in getResourcesAbleToLaunchJobs(): + raise 
RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(getResourcesAbleToLaunchJobs()))) + myParams.configureResource(resourceName) + myParams.createResultDirectory(resultDirectory) + if resourceName == "localhost": + myParams.nb_branches = getNumberOfCoresForLocalhost() + return myParams diff --git a/src/pyexample/CMakeLists.txt b/src/pyexample/CMakeLists.txt index 2160cd7..a144487 100644 --- a/src/pyexample/CMakeLists.txt +++ b/src/pyexample/CMakeLists.txt @@ -26,6 +26,7 @@ IF(SALOME_BUILD_TESTS) test_insitu.py test_prescript.py test_default.py + test_mpmcn.py ) INSTALL(FILES ${TESTFILES} DESTINATION ${LOCAL_TEST_DIR}) INSTALL(PROGRAMS runUnitTest.sh diff --git a/src/pyexample/runUnitTest.sh b/src/pyexample/runUnitTest.sh index d516d31..7531d2d 100755 --- a/src/pyexample/runUnitTest.sh +++ b/src/pyexample/runUnitTest.sh @@ -18,6 +18,9 @@ # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com # -python3 -m unittest test_insitu.py test_prescript.py test_default.py -ret=$? +python3 -m unittest test_insitu.py test_prescript.py test_default.py +ret0=$? +python3 test_mpmcn.py +ret1=$? 
+let ret=$ret0+$ret1 exit $ret diff --git a/src/pyexample/test_mpmcn.py b/src/pyexample/test_mpmcn.py new file mode 100644 index 0000000..098bd38 --- /dev/null +++ b/src/pyexample/test_mpmcn.py @@ -0,0 +1,40 @@ + +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# + +# multiple processes on multiple compute nodes + +def f(x): + return x*x*x + +def f2(x): + if x==3: + raise RuntimeError("lllll") + return x*x*x + +import mpmcn + +def gg(): + # case 0 : normal behavior with no raise + params = mpmcn.init("localhost") + with mpmcn.Pool(params) as p: + res = p.map(f,list(range(10))) + # getResultDirectory : for advanced users + p.getResultDirectory() + if res != [0.0, 1.0, 8.0, 27.0, 64.0, 125.0, 216.0, 343.0, 512.0, 729.0]: + raise RuntimeError("Test Failed !") + # case 1 : behavior with raise + params = mpmcn.init("localhost") + with mpmcn.Pool(params) as p2: + try: + res = p2.map(f2,list(range(10))) + raise RuntimeError("Exception not thrown -> Error !") + except RuntimeError as e: + strExpected = "Error for sample # 3 : \'lllll\'" + # tmp_dir attr of e returns the ResultDirectory to dive into + if str(e)[:len(strExpected)] != strExpected or not hasattr(e,"tmp_dir"): + raise RuntimeError("Test Failed 2 !") + +if __name__ == "__main__": + gg() -- 2.39.2