From a7b52402d4683e6ee2c4f8ff1b375d550f27330e Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Mon, 16 Jan 2023 16:22:32 +0100 Subject: [PATCH] Implementation OK for testcase. Including exception management --- src/pydefx/mpmcn.py | 98 +++++++++++++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 30 deletions(-) diff --git a/src/pydefx/mpmcn.py b/src/pydefx/mpmcn.py index b6a9569..f66d317 100644 --- a/src/pydefx/mpmcn.py +++ b/src/pydefx/mpmcn.py @@ -22,16 +22,18 @@ import pydefx import salome salome.salome_init() -myParams = None - class Pool: - def __init__(self): + def __init__(self, params): + """ + :param params: parameter of the job + :type params: pydefx.Parameters + """ self.myStudy = None self.myScript = pydefx.PyScript() self.mySample = None + self.myParams = params def map(self, func, iterable): - global myParams if len(iterable) == 0: return [] # @@ -45,16 +47,13 @@ class Pool: st = None with open(fModule.__file__,"r") as ff: st = ff.read() + # retrieve the varname holding the function in its module fStr = func.__code__.co_name - """fStr = None - for elt in dir(fModule): - if eval(elt) == f: - fStr = elt - print(fStr) - break""" if fStr is None: raise RuntimeError("Impossible to locate function in the module containing it !") + # retrieve args of the func passed in input fArgs = inspect.getfullargspec(func).args + # agregate the content of the Python module containing the function with expected _exec function for pydfx pyScript = """{} def _exec({}): yxyx = {}({}) @@ -63,22 +62,31 @@ def _exec({}): # self.myScript.loadString(pyScript) self.mySample = self.myScript.CreateEmptySample() - # + # management of the case of single input if not hasattr(iterable[0],"__iter__"): iterable = [[elt] for elt in iterable] # self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} ) # - self.myStudy.createNewJob(self.myScript, self.mySample, myParams) + self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams) # 
self.myStudy.launch() self.myStudy.wait() + # ask for result : this call implicitly copies back results to the client self.myStudy.getResult() # - ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])] - if len(self.myStudy.sample.getOutputNames()) == 1: - ret = [elt[0] for elt in ret] - return ret + if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0": + messageFromSlaves = self.myStudy.sample._messages + if all( [elt == "" for elt in messageFromSlaves] ): + ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])] + if len(self.myStudy.sample.getOutputNames()) == 1: + ret = [elt[0] for elt in ret] + return ret + else: + excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""]) + raise RuntimeError( excMsg ) + else: + raise RuntimeError("Error during job submission or during the driver execution (that should never happend)") def __enter__(self): self.myStudy = pydefx.PyStudy() @@ -99,24 +107,54 @@ def getResourcesAbleToLaunchJobs(): # ask to resource manager to filter among all resources listed in CatalogResources those able to launch job. return salome.rm.GetFittingResources(params) +def getNumberOfCoresForLocalhost(): + """ + Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute + set to true in the CatalogResources.xml ). + """ + import LifeCycleCORBA + params = LifeCycleCORBA.ResourceParameters(can_run_containers=True) + resources = salome.rm.GetFittingResources(params) + return sum( [salome.rm.GetResourceDefinition(res).nb_proc_per_node for res in resources] ) -def init(resultDirectory = "/tmp"): +def init(resourceName, resultDirectory = "/tmp"): """ + + Instantiate a pydefx.Parameters instance that can be overridden right after. 
+ Here are some examples of typical overrides of the returned object of this method : + + define a local directory for working files : + myParams.salome_parameters.result_directory with existing directory. If not myParams.createResultDirectory("/tmp") + + Define additional files necessary for computation. + They will be copied to the remote working directory. + This parameter is empty by default : + myParams.salome_parameters.in_files = [] + + Override computation resource : + myParams.salome_parameters.resource_required.name + + Override default # of parallel evaluation : + myParams.nb_branches + + Override number of cores requested by the job when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_proc + + Override number of computational nodes on cluster to be allocated when job resource is a cluster (for job scheduler query) : + myParams.salome_parameters.resource_required.nb_node + + Override working directory : + myParams.salome_parameters.work_directory + :param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml :param resultDirectory: Directory used to transfer results + :return: a pydefx.Parameters instance """ - global myParams myParams = pydefx.Parameters() - - # pour cluster avec 10 noeuds de 28 coeurs chacuns - # - - # On positionne la resource qui va lancer le job qui inclu drive - # rmyParams.nb_branches = 4 - myParams.configureResource("localhost") + if resourceName not in getResourcesAbleToLaunchJobs(): + raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(getResourcesAbleToLaunchJobs()))) + myParams.configureResource(resourceName) myParams.createResultDirectory(resultDirectory) - myParams.nb_branches = 280 - myParams.salome_parameters. 
- # myParams.salome_parameters.work_directory = wd - pass - + if resourceName == "localhost": + myParams.nb_branches = getNumberOfCoresForLocalhost() + return myParams -- 2.39.2