]> SALOME platform Git repositories - tools/ydefx.git/commitdiff
Salome HOME
Integration of mpmcn python module to emulate multiprocessing std python module over...
authorAnthony Geay <anthony.geay@edf.fr>
Fri, 30 Dec 2022 12:07:35 +0000 (13:07 +0100)
committerAnthony Geay <anthony.geay@edf.fr>
Tue, 17 Jan 2023 07:58:41 +0000 (08:58 +0100)
src/pydefx/CMakeLists.txt
src/pydefx/mpmcn.py [new file with mode: 0644]
src/pyexample/CMakeLists.txt
src/pyexample/runUnitTest.sh
src/pyexample/test_mpmcn.py [new file with mode: 0644]

index f5a2ca3dd437c76f492c30c62230b9c05ddd1f15..36174574dd803f0a75af232f436c23f5ff697e1e 100644 (file)
@@ -39,5 +39,6 @@ SET(SCRIPTS
   )
 
 INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx)
+install(FILES mpmcn.py DESTINATION ${SALOME_INSTALL_PYTHON})
 ADD_SUBDIRECTORY(schemas)
 ADD_SUBDIRECTORY(plugins)
diff --git a/src/pydefx/mpmcn.py b/src/pydefx/mpmcn.py
new file mode 100644 (file)
index 0000000..12fcfc6
--- /dev/null
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+#  -*- coding: utf-8 -*-
+# Copyright (C) 2022 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+
+import pydefx
+import salome
+salome.salome_init()
+
+class Pool:
+    def __init__(self, params):
+        """
+        :param params: parameter of the job
+        :type params: pydefx.Parameters
+        """
+        self.myStudy = None
+        self.myScript = pydefx.PyScript()
+        self.mySample = None
+        self.myParams = params
+        self.removeTmpDir = False
+
+    def getResultDirectory(self):
+        return self.myParams.salome_parameters.result_directory
+    
+    def map(self, func, iterable):
+        if len(iterable) == 0:
+            return []
+        #
+        import inspect
+        if not inspect.isfunction(func):
+            raise RuntimeError("Input is expected to be a function")
+        import importlib
+        fModule = importlib.import_module(func.__module__)
+        if fModule == "__main__":
+            raise RuntimeError("Input function is expected to be part of a module")
+        st = None
+        with open(fModule.__file__,"r") as ff:
+            st = ff.read()
+        # retrieve the varname holding the function in its module
+        fStr = func.__code__.co_name
+        if fStr is None:
+            raise RuntimeError("Impossible to locate function in the module containing it !")
+        # retrieve args of the func passed in input
+        fArgs = inspect.getfullargspec(func).args
+        # agregate the content of the Python module containing the function with expected _exec function for pydfx
+        pyScript = """{}
+def _exec({}):
+    yxyx = {}({})
+    return yxyx
+""".format(st,", ".join(fArgs),fStr,", ".join(fArgs))
+        #
+        self.myScript.loadString(pyScript)
+        self.mySample = self.myScript.CreateEmptySample()
+        # management of the case of single input
+        if not hasattr(iterable[0],"__iter__"):
+            iterable = [[elt] for elt in iterable]
+        #
+        self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
+        #
+        self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
+        #
+        self.myStudy.launch()
+        self.myStudy.wait()
+        # ask for result : this call implicitely copy back results to the client
+        self.myStudy.getResult()
+        #
+        if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
+            messageFromSlaves = self.myStudy.sample.getMessages()
+            if all( [elt == "" for elt in messageFromSlaves] ):
+                ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])]
+                if len(self.myStudy.sample.getOutputNames()) == 1:
+                    ret = [elt[0] for elt in ret]
+                self.removeTmpDir = True
+                return ret
+            else:
+                excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
+                excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory())
+                exc = RuntimeError( excMsg )
+                exc.tmp_dir = self.getResultDirectory()
+                raise exc
+        else:
+            raise RuntimeError("Error during job submission or during the driver execution (that should never happend)")
+
+    def __enter__(self):
+        self.myStudy = pydefx.PyStudy()
+        return self
+        
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        import os
+        from glob import glob
+        import shutil
+        if self.removeTmpDir:
+            for fn in glob(os.path.join(self.getResultDirectory(),"*")):
+                if os.path.isdir( fn ):
+                    shutil.rmtree( fn)
+                else:
+                    os.unlink( fn )
+        pass
+    pass
+
+def getResourcesAbleToLaunchJobs():
+    """
+    Regarding list of Resources in the CatalogResources.xml this method returns those able to launch jobs.
+    Ydefx engine delegate to a job encapsulating itself an execution of YACS graph with driver application.
+    Consequently, the resource to evaluate in parallel the function should one of those returned by this method.
+    """
+    import LifeCycleCORBA
+    params = LifeCycleCORBA.ResourceParameters(can_launch_batch_jobs=True)
+    # ask to resource manager to filter among all resources listed in CatalogResources those able to launch job.
+    return salome.rm.GetFittingResources(params)
+
+def getNumberOfCoresForLocalhost():
+    """
+    Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute
+    set to true in the CatalogResources.xml ).
+    """
+    import LifeCycleCORBA
+    params = LifeCycleCORBA.ResourceParameters(can_run_containers=True)
+    resources = salome.rm.GetFittingResources(params)
+    return sum( [salome.rm.GetResourceDefinition(res).nb_proc_per_node for res in resources] )
+
+def init(resourceName, resultDirectory = "/tmp"):
+    """
+    Instanciate a pydefx.Parameters intance that can be overriden right after.
+    Here some example of typical override of the returned object of this method :
+
+    define a local directory for working files :
+    myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp")
+
+    Define additionnal files necessary for computation.
+    They will be copied to the remote working directory.
+    This parameter is empty by default :
+    myParams.salome_parameters.in_files = []
+
+    Override computation ressource :
+    myParams.salome_parameters.resource_required.name)
+
+    Override default # of parallel evaluation :
+    myParams.nb_branches
+
+    Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_proc
+
+    Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_node
+
+    Override working directory :
+    myParams.salome_parameters.work_directory
+
+    :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
+    :param resultDirectory: Directory used to transfer results
+    :return: a pydefx.Parameters instance
+    """
+    myParams = pydefx.Parameters()
+    if resourceName not in getResourcesAbleToLaunchJobs():
+        raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(getResourcesAbleToLaunchJobs())))
+    myParams.configureResource(resourceName)
+    myParams.createResultDirectory(resultDirectory)
+    if resourceName == "localhost":
+        myParams.nb_branches = getNumberOfCoresForLocalhost()
+    return myParams
index 2160cd74b79d4564a16e8c050f5656ad47737080..a144487a9b81e37f4b76aa82c95e2334edcd7531 100644 (file)
@@ -26,6 +26,7 @@ IF(SALOME_BUILD_TESTS)
   test_insitu.py
   test_prescript.py
   test_default.py
+  test_mpmcn.py
   )
   INSTALL(FILES ${TESTFILES} DESTINATION ${LOCAL_TEST_DIR})
   INSTALL(PROGRAMS runUnitTest.sh
index d516d31a48b3a25cce997a01d19d2eafe78103bf..7531d2d9e0c1d8cb94e12c170741779899d43421 100755 (executable)
@@ -18,6 +18,9 @@
 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
 #
 
-python3 -m unittest test_insitu.py test_prescript.py test_default.py
-ret=$?
+python3 -m unittest test_insitu.py test_prescript.py test_default.py 
+ret0=$?
+python3 test_mpmcn.py
+ret1=$?
+let ret=$ret0+$ret1
 exit $ret
diff --git a/src/pyexample/test_mpmcn.py b/src/pyexample/test_mpmcn.py
new file mode 100644 (file)
index 0000000..098bd38
--- /dev/null
@@ -0,0 +1,40 @@
+
+#!/usr/bin/env python3
+#  -*- coding: utf-8 -*-
+# 
+
+# multiple processes on multiple compute nodes
+
+def f(x):
+    return x*x*x
+    
+def f2(x):
+    if x==3:
+        raise RuntimeError("lllll")
+    return x*x*x
+
+import mpmcn
+
+def gg():
+    # case 0 : normal behavior with no raise
+    params = mpmcn.init("localhost")
+    with mpmcn.Pool(params) as p:
+        res = p.map(f,list(range(10)))
+        # getResultDirectory : for advanced users
+        p.getResultDirectory()
+    if res != [0.0, 1.0, 8.0, 27.0, 64.0, 125.0, 216.0, 343.0, 512.0, 729.0]:
+        raise RuntimeError("Test Failed !")
+    # case 1 : behavior with raise
+    params = mpmcn.init("localhost")
+    with mpmcn.Pool(params) as p2:
+        try:
+            res = p2.map(f2,list(range(10)))
+            raise RuntimeError("Exception not thrown -> Error !")
+        except RuntimeError as e:
+            strExpected = "Error for sample # 3 : \'lllll\'"
+            # tmp_dir attr of e returns the ResultDirectory to dive into
+            if str(e)[:len(strExpected)] != strExpected or not hasattr(e,"tmp_dir"):
+                raise RuntimeError("Test Failed 2 !")
+
+if __name__ == "__main__":
+    gg()