import salome
salome.salome_init()
-myParams = None
-
class Pool:
- def __init__(self):
+ def __init__(self, params):
+ """
+ :param params: parameters of the job
+ :type params: pydefx.Parameters
+ """
self.myStudy = None
self.myScript = pydefx.PyScript()
self.mySample = None
+ self.myParams = params
def map(self, func, iterable):
- global myParams
if len(iterable) == 0:
return []
#
st = None
with open(fModule.__file__,"r") as ff:
st = ff.read()
+ # retrieve the varname holding the function in its module
fStr = func.__code__.co_name
- """fStr = None
- for elt in dir(fModule):
- if eval(elt) == f:
- fStr = elt
- print(fStr)
- break"""
if fStr is None:
raise RuntimeError("Impossible to locate function in the module containing it !")
+ # retrieve args of the func passed in input
fArgs = inspect.getfullargspec(func).args
+ # aggregate the content of the Python module containing the function with the _exec function expected by pydefx
pyScript = """{}
def _exec({}):
yxyx = {}({})
#
self.myScript.loadString(pyScript)
self.mySample = self.myScript.CreateEmptySample()
- #
+ # management of the case of single input
if not hasattr(iterable[0],"__iter__"):
iterable = [[elt] for elt in iterable]
#
self.mySample.setInputValues( {k:v for k,*v in zip(fArgs,*iterable)} )
#
- self.myStudy.createNewJob(self.myScript, self.mySample, myParams)
+ self.myStudy.createNewJob(self.myScript, self.mySample, self.myParams)
#
self.myStudy.launch()
self.myStudy.wait()
+ # ask for result : this call implicitly copies results back to the client
self.myStudy.getResult()
#
- ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])]
- if len(self.myStudy.sample.getOutputNames()) == 1:
- ret = [elt[0] for elt in ret]
- return ret
+ if self.myStudy.getJobState() == "FINISHED" and self.myStudy.global_result.exit_code == "0":
+ messageFromSlaves = self.myStudy.sample._messages
+ if all( [elt == "" for elt in messageFromSlaves] ):
+ ret = [elt for elt in zip(*[self.myStudy.sample._output[n] for n in self.myStudy.sample.getOutputNames()])]
+ if len(self.myStudy.sample.getOutputNames()) == 1:
+ ret = [elt[0] for elt in ret]
+ return ret
+ else:
+ excMsg = "\n".join(["Error for sample # {} : \'{}\' ".format(i,elt) for i,elt in enumerate(messageFromSlaves) if elt != ""])
+ raise RuntimeError( excMsg )
+ else:
+ raise RuntimeError("Error during job submission or during the driver execution (that should never happend)")
def __enter__(self):
self.myStudy = pydefx.PyStudy()
# ask to resource manager to filter among all resources listed in CatalogResources those able to launch job.
return salome.rm.GetFittingResources(params)
+def getNumberOfCoresForLocalhost():
+ """
+ Return total number of cores for all resources marked as able to run containers ( "canRunContainers" attribute
+ set to true in the CatalogResources.xml ).
+ """
+ import LifeCycleCORBA
+ params = LifeCycleCORBA.ResourceParameters(can_run_containers=True)
+ resources = salome.rm.GetFittingResources(params)
+ return sum( [salome.rm.GetResourceDefinition(res).nb_proc_per_node for res in resources] )
-def init(resultDirectory = "/tmp"):
+def init(resourceName, resultDirectory = "/tmp"):
"""
+
+ Instantiate a pydefx.Parameters instance that can be overridden right after.
+ Here some example of typical override of the returned object of this method :
+
+ define a local directory for working files :
+ set myParams.salome_parameters.result_directory to an existing directory ; if not set, use myParams.createResultDirectory("/tmp")
+
+ Define additional files necessary for computation.
+ They will be copied to the remote working directory.
+ This parameter is empty by default :
+ myParams.salome_parameters.in_files = []
+
+ Override computation resource :
+ myParams.salome_parameters.resource_required.name
+
+ Override default number of parallel evaluations :
+ myParams.nb_branches
+
+ Override number of cores requested by the job when job resource is a cluster (for job scheduler query) :
+ myParams.salome_parameters.resource_required.nb_proc
+
+ Override number of computational nodes to be allocated when the job resource is a cluster (for job scheduler query) :
+ myParams.salome_parameters.resource_required.nb_node
+
+ Override working directory :
+ myParams.salome_parameters.work_directory
+
:param resourceName: Name of the resource matching one of the ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
:param resultDirectory: Directory used to transfer results
+ :return: a pydefx.Parameters instance
"""
- global myParams
myParams = pydefx.Parameters()
-
- # pour cluster avec 10 noeuds de 28 coeurs chacuns
- #
-
- # On positionne la resource qui va lancer le job qui inclu drive
- # rmyParams.nb_branches = 4
- myParams.configureResource("localhost")
+ if resourceName not in getResourcesAbleToLaunchJobs():
+ raise RuntimeError("Resource \"{}\" is not existing or not declared as able to launch job. Available resources are : {}".format(resourceName,str(getResourcesAbleToLaunchJobs())))
+ myParams.configureResource(resourceName)
myParams.createResultDirectory(resultDirectory)
- myParams.nb_branches = 280
- myParams.salome_parameters.
- # myParams.salome_parameters.work_directory = wd
- pass
-
+ if resourceName == "localhost":
+ myParams.nb_branches = getNumberOfCoresForLocalhost()
+ return myParams