--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Copyright (C) 2022 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+
import pydefx
import salome
# Initialize the SALOME session at import time: the resource manager
# (salome.rm) and the job services used below require a live session.
salome.salome_init()
+
class Pool:
    """
    Context manager evaluating a Python function over a sample of inputs by
    submitting a single SALOME job driven by pydefx.

    Typical usage::

        with Pool(params) as p:
            results = p.map(func, inputs)

    On success the result directory is cleaned up when the ``with`` block
    exits; on failure it is kept so it can be inspected for debugging.
    """
    def __init__(self, params):
        """
        :param params: parameter of the job
        :type params: pydefx.Parameters
        """
        self.myStudy = None          # pydefx.PyStudy, created in __enter__
        self.myScript = pydefx.PyScript()
        self.mySample = None
        self.myParams = params
        # Set to True only after a fully successful map(), so that __exit__
        # cleans the result directory on success but keeps it for debug on failure.
        self.removeTmpDir = False

    def getResultDirectory(self):
        """Return the directory where job results (and debug information) are copied back."""
        return self.myParams.salome_parameters.result_directory

    def map(self, func, iterable):
        """
        Evaluate *func* on every element of *iterable* through a pydefx job.

        :param func: a function defined at top level of an importable module
                     (functions defined in ``__main__`` cannot be shipped to the job)
        :param iterable: list of argument tuples, or of scalars when *func*
                         takes a single argument
        :return: list of results in the same order as *iterable*
        :raises RuntimeError: if *func* is not usable, if some sample evaluations
                              failed (the exception then carries a ``tmp_dir``
                              attribute pointing to the debug directory), or if
                              the job itself failed
        """
        if len(iterable) == 0:
            return []
        #
        import inspect
        if not inspect.isfunction(func):
            raise RuntimeError("Input is expected to be a function")
        # BUGFIX: the original compared the imported module *object* to the
        # string "__main__", which is always False; compare the module name.
        if func.__module__ == "__main__":
            raise RuntimeError("Input function is expected to be part of a module")
        import importlib
        fModule = importlib.import_module(func.__module__)
        # ship the whole source of the module containing func to the job
        with open(fModule.__file__,"r") as ff:
            st = ff.read()
        # retrieve the varname holding the function in its module
        fStr = func.__code__.co_name
        if fStr is None:
            raise RuntimeError("Impossible to locate function in the module containing it !")
        # retrieve args of the func passed in input
        fArgs = inspect.getfullargspec(func).args
        # aggregate the content of the Python module containing the function
        # with the _exec entry point expected by pydefx
        pyScript = """{}
def _exec({}):
    yxyx = {}({})
    return yxyx
""".format(st,", ".join(fArgs),fStr,", ".join(fArgs))
        #
        self.myScript.loadString(pyScript)
        self.mySample = self.myScript.CreateEmptySample()
        # management of the case of single input: wrap scalars into 1-tuples
        if not hasattr(iterable[0],"__iter__"):
            iterable = [[elt] for elt in iterable]
        # transpose the sample so each parameter name maps to its column of values
        self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
        #
        self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
        #
        self.myStudy.launch()
        self.myStudy.wait()
        # ask for result : this call implicitly copies results back to the client
        self.myStudy.getResult()
        #
        if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
            messageFromSlaves = self.myStudy.sample.getMessages()
            if all( [elt == "" for elt in messageFromSlaves] ):
                ret = [elt for elt in zip(*[self.myStudy.sample.getOutput(n) for n in self.myStudy.sample.getOutputNames()])]
                if len(self.myStudy.sample.getOutputNames()) == 1:
                    ret = [elt[0] for elt in ret]
                self.removeTmpDir = True
                return ret
            else:
                # at least one sample raised remotely : aggregate the messages
                excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
                excMsg += "\nDirectory containing information for debug : {}".format(self.getResultDirectory())
                exc = RuntimeError( excMsg )
                exc.tmp_dir = self.getResultDirectory()
                raise exc
        else:
            raise RuntimeError("Error during job submission or during the driver execution (that should never happen)")

    def __enter__(self):
        self.myStudy = pydefx.PyStudy()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        import os
        from glob import glob
        import shutil
        # wipe the content of the result directory only after a successful map()
        if self.removeTmpDir:
            for fn in glob(os.path.join(self.getResultDirectory(),"*")):
                if os.path.isdir( fn ):
                    shutil.rmtree( fn)
                else:
                    os.unlink( fn )
+
def getResourcesAbleToLaunchJobs():
    """
    Return, among the resources declared in CatalogResources.xml, those able to launch jobs.

    The Ydefx engine delegates the work to a job that itself runs a YACS graph
    through the driver application; consequently the resource used for parallel
    evaluation of the function must be one of the names returned here.
    """
    import LifeCycleCORBA
    criteria = LifeCycleCORBA.ResourceParameters(can_launch_batch_jobs=True)
    # let the resource manager filter the catalog for job-capable resources
    return salome.rm.GetFittingResources(criteria)
+
def getNumberOfCoresForLocalhost():
    """
    Return the total number of cores over all resources marked as able to run
    containers ("canRunContainers" attribute set to true in CatalogResources.xml).
    """
    import LifeCycleCORBA
    criteria = LifeCycleCORBA.ResourceParameters(can_run_containers=True)
    total = 0
    for res in salome.rm.GetFittingResources(criteria):
        total += salome.rm.GetResourceDefinition(res).nb_proc_per_node
    return total
+
def init(resourceName, resultDirectory = "/tmp"):
    """
    Instantiate a pydefx.Parameters instance that can be overridden right after.
    Here some examples of typical overrides of the returned object of this method :

    Define a local directory for working files :
    myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp")

    Define additional files necessary for computation.
    They will be copied to the remote working directory.
    This parameter is empty by default :
    myParams.salome_parameters.in_files = []

    Override computation resource :
    myParams.salome_parameters.resource_required.name

    Override default # of parallel evaluation :
    myParams.nb_branches

    Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
    myParams.salome_parameters.resource_required.nb_proc

    Override number of computational nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) :
    myParams.salome_parameters.resource_required.nb_node

    Override working directory :
    myParams.salome_parameters.work_directory

    :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
    :param resultDirectory: Directory used to transfer results
    :return: a pydefx.Parameters instance
    :raises RuntimeError: if *resourceName* is unknown or unable to launch jobs
    """
    myParams = pydefx.Parameters()
    # query the resource manager once and reuse the answer in the error message
    availableResources = getResourcesAbleToLaunchJobs()
    if resourceName not in availableResources:
        raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(availableResources)))
    myParams.configureResource(resourceName)
    myParams.createResultDirectory(resultDirectory)
    if resourceName == "localhost":
        # on the local machine, default the parallelism to the available core count
        myParams.nb_branches = getNumberOfCoresForLocalhost()
    return myParams
--- /dev/null
+
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+
+# multiple processes on multiple compute nodes
+
def f(x):
    """Return the cube of x, evaluated as (x*x)*x."""
    squared = x * x
    return squared * x
+
def f2(x):
    """Return the cube of x, but deliberately fail for x == 3 (test fixture)."""
    if x != 3:
        return x * x * x
    raise RuntimeError("lllll")
+
+import mpmcn
+
def gg():
    """
    Self-test of the mpmcn module on the "localhost" resource.

    Case 0 checks a normal parallel evaluation. Case 1 checks that an error
    raised inside the evaluated function is reported back with the expected
    message and the ``tmp_dir`` debug attribute on the exception.

    :raises RuntimeError: if any check fails
    """
    # case 0 : normal behavior with no raise
    params = mpmcn.init("localhost")
    with mpmcn.Pool(params) as p:
        res = p.map(f,list(range(10)))
        # getResultDirectory : for advanced users
        p.getResultDirectory()
    if res != [0.0, 1.0, 8.0, 27.0, 64.0, 125.0, 216.0, 343.0, 512.0, 729.0]:
        raise RuntimeError("Test Failed !")
    # case 1 : behavior with raise
    params = mpmcn.init("localhost")
    with mpmcn.Pool(params) as p2:
        try:
            res = p2.map(f2,list(range(10)))
        except RuntimeError as e:
            strExpected = "Error for sample # 3 : \'lllll\'"
            # tmp_dir attr of e returns the ResultDirectory to dive into
            if str(e)[:len(strExpected)] != strExpected or not hasattr(e,"tmp_dir"):
                raise RuntimeError("Test Failed 2 !")
        else:
            # BUGFIX: this sentinel used to be raised inside the try block, so
            # it was swallowed by the except clause above and mis-reported as
            # "Test Failed 2 !"; the else clause makes it surface correctly.
            raise RuntimeError("Exception not thrown -> Error !")
+
# Run the self-test only when executed as a script, not on import.
if __name__ == "__main__":
    gg()