]> SALOME platform Git repositories - tools/ydefx.git/commitdiff
Salome HOME
Code review. agy/mpmcn_2 V9_11_0a1 V9_11_0a2 V9_11_0b1 2/head
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Tue, 17 Jan 2023 15:57:31 +0000 (16:57 +0100)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Tue, 17 Jan 2023 15:57:31 +0000 (16:57 +0100)
src/CMakeLists.txt
src/pydefx/CMakeLists.txt
src/pydefx/configuration.py
src/pydefx/mpmcn.py [deleted file]
src/pytools/CMakeLists.txt [new file with mode: 0644]
src/pytools/mpmcn.py [new file with mode: 0644]

index fb26fe43fbb20738fabfa687c0826b093ebd699e..6045f2be4ef6f3e4a411931c8210afd41156f3e0 100644 (file)
@@ -26,6 +26,7 @@ ENDIF(SALOME_BUILD_TESTS)
 
 ADD_SUBDIRECTORY(cpp)
 ADD_SUBDIRECTORY(pydefx)
+ADD_SUBDIRECTORY(pytools)
 IF(YDEFX_BUILD_GUI)
   ADD_SUBDIRECTORY(gui)
 ENDIF(YDEFX_BUILD_GUI)
index 36174574dd803f0a75af232f436c23f5ff697e1e..f5a2ca3dd437c76f492c30c62230b9c05ddd1f15 100644 (file)
@@ -39,6 +39,5 @@ SET(SCRIPTS
   )
 
 INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx)
-install(FILES mpmcn.py DESTINATION ${SALOME_INSTALL_PYTHON})
 ADD_SUBDIRECTORY(schemas)
 ADD_SUBDIRECTORY(plugins)
index 210a2051ce80bcf8931a2eb65337d82519f7cb43..6852b75a5116e552a7bd363830c6d0681cd8f6d8 100644 (file)
@@ -29,22 +29,26 @@ def defaultWorkingDir(resource):
   return resource_definition.working_directory
 
 def defaultNbBranches(resource):
+  """
+  Return the number of cores available on a resource.
+  """
   resManager = salome_proxy.getResourcesManager()
   resource_definition = resManager.GetResourceDefinition(resource)
-  ret = resource_definition.nb_node
+  ret = resource_definition.nb_node * resource_definition.nb_proc_per_node
   if ret < 1:
     ret = 1
   return ret
-  
-def getNumberOfCoresForLocalhost():
-    """
-    Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute
-    set to true in the CatalogResources.xml ).
-    """
-    import LifeCycleCORBA
-    params = LifeCycleCORBA.ResourceParameters(can_run_containers=True)
-    resources = salome_proxy.getResourcesManager().GetFittingResources(params)
-    return sum( [salome_proxy.getResourcesManager().GetResourceDefinition(res).nb_proc_per_node for res in resources] )
+
+def allCoresAvailable():
+  """
+  Return the total number of cores of all resources that can run containers.
+  ( "canRunContainers" attribute set to true in CatalogResources.xml )
+  """
+  resManager = salome_proxy.getResourcesManager()
+  params     = salome_proxy.createSalomeParameters()
+  params.resource_required.can_run_containers = True
+  resources  = resManager.GetFittingResources(params.resource_required)
+  return sum([defaultNbBranches(res) for res in resources ])
 
 def defaultBaseDirectory():
   """Return the default path to the root of any new result directory."""
@@ -63,11 +67,10 @@ def defaultWckey(resource="localhost"):
   return result
 
 def availableResources():
-  """ Return the list of resources defined in the current catalog.
-  
-  Regarding list of Resources in the CatalogResources.xml this method returns those able to launch jobs.
-  Ydefx engine delegate to a job encapsulating itself an execution of YACS graph with driver application.
-  Consequently, the resource to evaluate in parallel the function should one of those returned by this method.
+  """
+  Return the list of resources defined in the current catalog that are able to
+  launch jobs.
+  Ydefx can launch the evaluations in a job on one of these resources.
   """
   resManager = salome_proxy.getResourcesManager()
   params     = salome_proxy.createSalomeParameters()
diff --git a/src/pydefx/mpmcn.py b/src/pydefx/mpmcn.py
deleted file mode 100644 (file)
index 0c68003..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/env python3
-#  -*- coding: utf-8 -*-
-# Copyright (C) 2022 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-
-import pydefx
-
-class Pool:
-    def __init__(self, params):
-        """
-        :param params: parameter of the job
-        :type params: pydefx.Parameters
-        """
-        self.myStudy = None
-        self.myScript = pydefx.PyScript()
-        self.mySample = None
-        self.myParams = params
-        self.removeTmpDir = False
-
-    def getResultDirectory(self):
-        return self.myParams.salome_parameters.result_directory
-    
-    def map(self, func, iterable):
-        if len(iterable) == 0:
-            return []
-        #
-        import inspect
-        if not inspect.isfunction(func):
-            raise RuntimeError("Input is expected to be a function")
-        import importlib
-        fModule = importlib.import_module(func.__module__)
-        if fModule == "__main__":
-            raise RuntimeError("Input function is expected to be part of a module")
-        st = None
-        with open(fModule.__file__,"r") as ff:
-            st = ff.read()
-        # retrieve the varname holding the function in its module
-        fStr = func.__code__.co_name
-        if fStr is None:
-            raise RuntimeError("Impossible to locate function in the module containing it !")
-        # retrieve args of the func passed in input
-        fArgs = inspect.getfullargspec(func).args
-        # agregate the content of the Python module containing the function with expected _exec function for pydfx
-        pyScript = """{}
-def _exec({}):
-    yxyx = {}({})
-    return yxyx
-""".format(st,", ".join(fArgs),fStr,", ".join(fArgs))
-        #
-        self.myScript.loadString(pyScript)
-        self.mySample = self.myScript.CreateEmptySample()
-        # management of the case of single input
-        if not hasattr(iterable[0],"__iter__"):
-            iterable = [[elt] for elt in iterable]
-        #
-        self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
-        #
-        self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
-        #
-        self.myStudy.launch()
-        self.myStudy.wait()
-        # ask for result : this call implicitely copy back results to the client
-        self.myStudy.getResult()
-        #
-        if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
-            messageFromSlaves = self.myStudy.sample.getMessages()
-            if all( [elt == "" for elt in messageFromSlaves] ):
-                ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])]
-                if len(self.myStudy.sample.getOutputNames()) == 1:
-                    ret = [elt[0] for elt in ret]
-                self.removeTmpDir = True
-                return ret
-            else:
-                excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
-                excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory())
-                exc = RuntimeError( excMsg )
-                exc.tmp_dir = self.getResultDirectory()
-                raise exc
-        else:
-            raise RuntimeError( "Error during job submission or during the driver execution (that should never happend). Internal error : {}".format(self.myStudy.getResult().getErrors()) )
-
-    def __enter__(self):
-        self.myStudy = pydefx.PyStudy()
-        return self
-        
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        import os
-        from glob import glob
-        import shutil
-        if self.removeTmpDir:
-            for fn in glob(os.path.join(self.getResultDirectory(),"*")):
-                if os.path.isdir( fn ):
-                    shutil.rmtree( fn)
-                else:
-                    os.unlink( fn )
-        pass
-    pass
-
-def init(resourceName, resultDirectory = "/tmp"):
-    """
-    Instanciate a pydefx.Parameters intance that can be overriden right after.
-    Here some example of typical override of the returned object of this method :
-
-    define a local directory for working files :
-    myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp")
-
-    Define additionnal files necessary for computation.
-    They will be copied to the remote working directory.
-    This parameter is empty by default :
-    myParams.salome_parameters.in_files = []
-
-    Override computation ressource :
-    myParams.salome_parameters.resource_required.name)
-
-    Override default # of parallel evaluation :
-    myParams.nb_branches
-
-    Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
-    myParams.salome_parameters.resource_required.nb_proc
-
-    Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) :
-    myParams.salome_parameters.resource_required.nb_node
-
-    Override working directory :
-    myParams.salome_parameters.work_directory
-
-    :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
-    :param resultDirectory: Directory used to transfer results
-    :return: a pydefx.Parameters instance
-    """
-    myParams = pydefx.Parameters()
-    from pydefx import configuration
-    if resourceName not in configuration.availableResources():
-        raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(configuration.availableResources())))
-    myParams.configureResource(resourceName)
-    myParams.createResultDirectory(resultDirectory)
-    if resourceName == "localhost":
-        myParams.nb_branches = configuration.getNumberOfCoresForLocalhost()
-    return myParams
diff --git a/src/pytools/CMakeLists.txt b/src/pytools/CMakeLists.txt
new file mode 100644 (file)
index 0000000..57ac808
--- /dev/null
@@ -0,0 +1,23 @@
+# Copyright (C) 2023 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+SET(SCRIPTS
+mpmcn.py
+)
+
+INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON})
diff --git a/src/pytools/mpmcn.py b/src/pytools/mpmcn.py
new file mode 100644 (file)
index 0000000..bbbb3a2
--- /dev/null
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+#  -*- coding: utf-8 -*-
+# Copyright (C) 2022 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+
+import pydefx
+
+class Pool:
+    def __init__(self, params):
+        """
+        :param params: parameter of the job
+        :type params: pydefx.Parameters
+        """
+        self.myStudy = None
+        self.myScript = pydefx.PyScript()
+        self.mySample = None
+        self.myParams = params
+        self.removeTmpDir = False
+
+    def getResultDirectory(self):
+        return self.myParams.salome_parameters.result_directory
+    
+    def map(self, func, iterable):
+        if len(iterable) == 0:
+            return []
+        #
+        import inspect
+        if not inspect.isfunction(func):
+            raise RuntimeError("Input is expected to be a function")
+        import importlib
+        fModule = importlib.import_module(func.__module__)
+        if fModule == "__main__":
+            raise RuntimeError("Input function is expected to be part of a module")
+        st = None
+        with open(fModule.__file__,"r") as ff:
+            st = ff.read()
+        # retrieve the varname holding the function in its module
+        fStr = func.__code__.co_name
+        if fStr is None:
+            raise RuntimeError("Impossible to locate function in the module containing it !")
+        # retrieve args of the func passed in input
+        fArgs = inspect.getfullargspec(func).args
+        # agregate the content of the Python module containing the function with expected _exec function for pydfx
+        pyScript = """{}
+def _exec({}):
+    yxyx = {}({})
+    return yxyx
+""".format(st,", ".join(fArgs),fStr,", ".join(fArgs))
+        #
+        self.myScript.loadString(pyScript)
+        self.mySample = self.myScript.CreateEmptySample()
+        # management of the case of single input
+        if not hasattr(iterable[0],"__iter__"):
+            iterable = [[elt] for elt in iterable]
+        #
+        self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
+        #
+        self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
+        #
+        self.myStudy.launch()
+        self.myStudy.wait()
+        # ask for result : this call implicitely copy back results to the client
+        self.myStudy.getResult()
+        #
+        if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
+            messageFromSlaves = self.myStudy.sample.getMessages()
+            if all( [elt == "" for elt in messageFromSlaves] ):
+                ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])]
+                if len(self.myStudy.sample.getOutputNames()) == 1:
+                    ret = [elt[0] for elt in ret]
+                self.removeTmpDir = True
+                return ret
+            else:
+                excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
+                excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory())
+                exc = RuntimeError( excMsg )
+                exc.tmp_dir = self.getResultDirectory()
+                raise exc
+        else:
+            raise RuntimeError( "Error during job submission or during the driver execution (that should never happend). Internal error : {}".format(self.myStudy.getResult().getErrors()) )
+
+    def __enter__(self):
+        self.myStudy = pydefx.PyStudy()
+        return self
+        
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        import os
+        from glob import glob
+        import shutil
+        if self.removeTmpDir:
+            for fn in glob(os.path.join(self.getResultDirectory(),"*")):
+                if os.path.isdir( fn ):
+                    shutil.rmtree( fn)
+                else:
+                    os.unlink( fn )
+        pass
+    pass
+
+def init(resourceName, resultDirectory = "/tmp"):
+    """
+    Instanciate a pydefx.Parameters intance that can be overriden right after.
+    Here some example of typical override of the returned object of this method :
+
+    define a local directory for working files :
+    myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp")
+
+    Define additionnal files necessary for computation.
+    They will be copied to the remote working directory.
+    This parameter is empty by default :
+    myParams.salome_parameters.in_files = []
+
+    Override computation ressource :
+    myParams.salome_parameters.resource_required.name)
+
+    Override default # of parallel evaluation :
+    myParams.nb_branches
+
+    Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_proc
+
+    Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_node
+
+    Override working directory :
+    myParams.salome_parameters.work_directory
+
+    :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
+    :param resultDirectory: Directory used to transfer results
+    :return: a pydefx.Parameters instance
+    """
+    myParams = pydefx.Parameters()
+    from pydefx import configuration
+    if resourceName not in configuration.availableResources():
+        raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(configuration.availableResources())))
+    myParams.configureResource(resourceName)
+    myParams.createResultDirectory(resultDirectory)
+    if resourceName == "localhost":
+        myParams.nb_branches = configuration.allCoresAvailable()
+    return myParams