]> SALOME platform Git repositories - tools/ydefx.git/commitdiff
Salome HOME
Implementation OK for testcase. Including exception management
authorAnthony Geay <anthony.geay@edf.fr>
Mon, 16 Jan 2023 15:22:32 +0000 (16:22 +0100)
committerAnthony Geay <anthony.geay@edf.fr>
Mon, 16 Jan 2023 15:22:32 +0000 (16:22 +0100)
src/pydefx/mpmcn.py

index b6a95696917bfe9b1dd7b2a2b777cb20913f178a..f66d31744c0b1a748851ec17e6f8beab4d973974 100644 (file)
@@ -22,16 +22,18 @@ import pydefx
 import salome
 salome.salome_init()
 
-myParams = None
-
 class Pool:
-    def __init__(self):
+    def __init__(self, params):
+        """
+        :param params: parameter of the job
+        :type params: pydefx.Parameters
+        """
         self.myStudy = None
         self.myScript = pydefx.PyScript()
         self.mySample = None
+        self.myParams = params
     
     def map(self, func, iterable):
-        global myParams
         if len(iterable) == 0:
             return []
         #
@@ -45,16 +47,13 @@ class Pool:
         st = None
         with open(fModule.__file__,"r") as ff:
             st = ff.read()
+        # retrieve the varname holding the function in its module
         fStr = func.__code__.co_name
-        """fStr = None
-        for elt in dir(fModule):
-            if eval(elt) == f:
-                fStr = elt
-                print(fStr)
-                break"""
         if fStr is None:
             raise RuntimeError("Impossible to locate function in the module containing it !")
+        # retrieve args of the func passed in input
         fArgs = inspect.getfullargspec(func).args
+        # agregate the content of the Python module containing the function with expected _exec function for pydfx
         pyScript = """{}
 def _exec({}):
     yxyx = {}({})
@@ -63,22 +62,31 @@ def _exec({}):
         #
         self.myScript.loadString(pyScript)
         self.mySample = self.myScript.CreateEmptySample()
-        #
+        # management of the case of single input
         if not hasattr(iterable[0],"__iter__"):
             iterable = [[elt] for elt in iterable]
         #
         self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
         #
-        self.myStudy.createNewJob(self.myScript, self.mySample, myParams)
+        self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
         #
         self.myStudy.launch()
         self.myStudy.wait()
+        # ask for result : this call implicitely copy back results to the client
         self.myStudy.getResult()
         #
-        ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])]
-        if len(self.myStudy.sample.getOutputNames()) == 1:
-            ret = [elt[0] for elt in ret]
-        return ret
+        if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
+            messageFromSlaves = self.myStudy.sample._messages
+            if all( [elt == "" for elt in messageFromSlaves] ):
+                ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])]
+                if len(self.myStudy.sample.getOutputNames()) == 1:
+                    ret = [elt[0] for elt in ret]
+                return ret
+            else:
+                excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
+                raise RuntimeError( excMsg )
+        else:
+            raise RuntimeError("Error during job submission or during the driver execution (that should never happend)")
 
     def __enter__(self):
         self.myStudy = pydefx.PyStudy()
@@ -99,24 +107,54 @@ def getResourcesAbleToLaunchJobs():
     # ask to resource manager to filter among all resources listed in CatalogResources those able to launch job.
     return salome.rm.GetFittingResources(params)
 
+def getNumberOfCoresForLocalhost():
+    """
+    Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute
+    set to true in the CatalogResources.xml ).
+    """
+    import LifeCycleCORBA
+    params = LifeCycleCORBA.ResourceParameters(can_run_containers=True)
+    resources = salome.rm.GetFittingResources(params)
+    return sum( [salome.rm.GetResourceDefinition(res).nb_proc_per_node for res in resources] )
 
-def init(resultDirectory = "/tmp"):
+def init(resourceName, resultDirectory = "/tmp"):
     """
+    
+    Instanciate a pydefx.Parameters intance that can be overriden right after.
+    Here some example of typical override of the returned object of this method :
+
+    define a local directory for working files :
+    myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp")
+
+    Define additionnal files necessary for computation.
+    They will be copied to the remote working directory.
+    This parameter is empty by default :
+    myParams.salome_parameters.in_files = []
+
+    Override computation ressource :
+    myParams.salome_parameters.resource_required.name)
+
+    Override default # of parallel evaluation :
+    myParams.nb_branches
+
+    Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_proc
+
+    Override number of computationnal nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) :
+    myParams.salome_parameters.resource_required.nb_node
+
+    Override working directory :
+    myParams.salome_parameters.work_directory
+
     :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
     :param resultDirectory: Directory used to transfer results
+    :return: a pydefx.Parameters instance
     """
-    global myParams
     myParams = pydefx.Parameters()
-
-    # pour cluster avec 10 noeuds de 28 coeurs chacuns
-    #
-
-    # On positionne la resource qui va lancer le job qui inclu drive
-    # rmyParams.nb_branches = 4
-    myParams.configureResource("localhost")
+    if resourceName not in getResourcesAbleToLaunchJobs():
+        raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(getResourcesAbleToLaunchJobs())))
+    myParams.configureResource(resourceName)
     myParams.createResultDirectory(resultDirectory)
-    myParams.nb_branches = 280
-    myParams.salome_parameters.
-    # myParams.salome_parameters.work_directory = wd
-    pass
-
+    if resourceName == "localhost":
+        myParams.nb_branches = getNumberOfCoresForLocalhost()
+    return myParams