From: Ovidiu Mircescu Date: Thu, 1 Oct 2020 08:31:26 +0000 (+0200) Subject: Rename noyacs to slurm. X-Git-Tag: V9_7_0b1~1^2~5 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=607bbdaf1dcc711bf98d9e8923eeab055ac9a4b1;p=tools%2Fydefx.git Rename noyacs to slurm. --- diff --git a/src/pydefx/CMakeLists.txt b/src/pydefx/CMakeLists.txt index e228afa..8f16098 100644 --- a/src/pydefx/CMakeLists.txt +++ b/src/pydefx/CMakeLists.txt @@ -31,11 +31,11 @@ SET(SCRIPTS salome_proxy.py multijobbuilder.py multijobstudy.py - noyacsbuilder.py - noyacsstudy.py + slurmbuilder.py + slurmstudy.py ) INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx) ADD_SUBDIRECTORY(schemas) ADD_SUBDIRECTORY(multijob) -ADD_SUBDIRECTORY(noyacs) +ADD_SUBDIRECTORY(slurm) diff --git a/src/pydefx/__init__.py b/src/pydefx/__init__.py index a30a453..d59cde8 100644 --- a/src/pydefx/__init__.py +++ b/src/pydefx/__init__.py @@ -24,4 +24,4 @@ from .defaultschemabuilder import DefaultSchemaBuilder from .salome_proxy import forceSalomeServers, forceNoSalomeServers from .multijobstudy import MultiJobStudy -from .noyacsstudy import NoYacsStudy +from .slurmstudy import SlurmStudy diff --git a/src/pydefx/multijob/mainjob.py b/src/pydefx/multijob/mainjob.py index 2ff8bea..b9cfe3f 100644 --- a/src/pydefx/multijob/mainjob.py +++ b/src/pydefx/multijob/mainjob.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 import json import importlib -from multiprocessing.dummy import Pool +from multiprocessing import Pool import traceback class StartJob: diff --git a/src/pydefx/noyacs/CMakeLists.txt b/src/pydefx/noyacs/CMakeLists.txt deleted file mode 100644 index ccb015a..0000000 --- a/src/pydefx/noyacs/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -SET(SCHEMA_FILES - executor.py - mainjob.py - pointeval.py - ) - -INSTALL(FILES ${SCHEMA_FILES} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx/noyacs) diff --git a/src/pydefx/noyacs/executor.py b/src/pydefx/noyacs/executor.py deleted file mode 100644 index d7a3014..0000000 --- a/src/pydefx/noyacs/executor.py +++ /dev/null @@ -1,86 +0,0 @@ -#import pydefx -import os -import pickle -import time -import traceback -import subprocess - -class Context: - def __init__(self): - #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher() - pass - -class JobExecutor: - def __init__(self, config): - self.config = config - - def initialize(self): - """ - Execute prescript. - """ - pointeval = os.path.join(os.getcwd(), "pointeval.py") - os.chmod(pointeval, 0o755) - - def evaluate(self, idx, point): - """ This is executed for every point to be evaluated. - """ - context = Context() - error = None - out_values = None - try: - self.prepare(idx, point, context) - if self.noRunFound(idx, point, context): - self.runjob(idx, point, context) - error, out_values = self.getResult(context) - except Exception as e: - error = str(e) - traceback.print_exc() - return error, out_values - - def prepare(self, idx, point, context): - """ - Define local and remote work directory. - Define job script. - """ - root_dir = os.getcwd() - point_name = "job_"+str(idx) - context.local_dir = os.path.join(root_dir, point_name) - if not os.path.exists(context.local_dir): - os.mkdir(context.local_dir) - # export the point to a file - data_file_name = "idefixdata.csv" - data_file_path = os.path.join(context.local_dir, data_file_name) - with open(data_file_path, "w") as f: - # explicit dict convertion is needed for compatibility between python versions - f.write(repr(dict(point))) - - - def noRunFound(self, idx, point, context): - return True - - def runjob(self, idx, point, context): - """ - Create, launch and wait for the end of the job. - """ - # srun - pointeval = os.path.join(os.getcwd(), "pointeval.py") - command = "srun --ntasks=1 --nodes=1 --chdir={} {} ".format(context.local_dir, - pointeval) - return_code = subprocess.call(command, shell=True) - - def getResult(self, context): - """ - Check the job state, fetch the result file. - """ - error_file = os.path.join(context.local_dir, "idefixerror.txt") - result_file = os.path.join(context.local_dir, "idefixresult.txt") - with open(error_file, "r") as f: - error = f.read() - with open(result_file, "r") as f: - result_str = f.read() - result = eval(result_str) - - return error, result - -def createExecutor(config): - return JobExecutor(config) diff --git a/src/pydefx/noyacs/mainjob.py b/src/pydefx/noyacs/mainjob.py deleted file mode 100644 index 2ff8bea..0000000 --- a/src/pydefx/noyacs/mainjob.py +++ /dev/null @@ -1,56 +0,0 @@ -#! /usr/bin/env python3 -import json -import importlib -from multiprocessing.dummy import Pool -import traceback - -class StartJob: - def __init__(self, executor): - self.executor = executor - - def __call__(self, idx, in_values): - error=None - out_values=None - try: - error, out_values = self.executor.evaluate(idx, in_values) - except Exception as e: - error=str(e) - traceback.print_exc() - return idx, in_values, out_values, error - -class TerminateJob: - def __init__(self, manager): - self.manager = manager - - def __call__(self, result): - # without try statement we may experience deadlock in case of error. - try: - idx, in_values, out_values, error = result - if not error: - error = None - self.manager.addResult(idx, in_values, out_values, error) - except Exception as e: - traceback.print_exc() - -if __name__ == '__main__': - with open("idefixconfig.json", "r") as f: - config = json.load(f) - plugin_module = importlib.import_module(config["plugin"]) - executor = plugin_module.createExecutor(config) - # global initialization - commun work for every evaluation. - executor.initialize() - - itModuleName = config["sampleIterator"] - itModule = importlib.import_module(itModuleName) - sampleManager = itModule.SampleIterator() - sampleManager.writeHeaders() - - nbbranches=config["nbbranches"] - pool = Pool(nbbranches) - runPoint = StartJob(executor) - endOk = TerminateJob(sampleManager) - for point in sampleManager: - pool.apply_async(runPoint, point, callback=endOk) - pool.close() - pool.join() - sampleManager.terminate() diff --git a/src/pydefx/noyacs/pointeval.py b/src/pydefx/noyacs/pointeval.py deleted file mode 100644 index c652b26..0000000 --- a/src/pydefx/noyacs/pointeval.py +++ /dev/null @@ -1,37 +0,0 @@ -#! /usr/bin/env python3 -import traceback -import os - -data_file_name = "idefixdata.csv" -study_module = "idefixstudy.py" -error_result = "idefixerror.txt" -value_result = "idefixresult.txt" -traceback_result = "idefixtraceback.txt" - -with open(data_file_name, "r") as f: - values = f.read() -inputvals = eval(values) - -error="" -result=None -old_dir = os.getcwd() - -try: - os.chdir("..") # go to commun root directory - with open(study_module, "r") as study_file: - study_string = study_file.read() - exec(study_string) - result = _exec(**inputvals) -except Exception as e: - error=str(e) - os.chdir(old_dir) # back to the current case job directory - with open(traceback_result, "w") as f: - traceback.print_exc(file=f) - -os.chdir(old_dir) # back to the current case job directory - -with open(error_result, "w") as f: - f.write(error) - -with open(value_result, "w") as f: - f.write(repr(result)) diff --git a/src/pydefx/noyacsbuilder.py b/src/pydefx/noyacsbuilder.py deleted file mode 100644 index 81ffe65..0000000 --- a/src/pydefx/noyacsbuilder.py +++ /dev/null @@ -1,56 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) 2019 EDF R&D -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com -# -import inspect -import pathlib -import os - -class NoYacsBuilder: - def __init__(self, executor = None, pointEval = None, mainJob = None): - filename = inspect.getframeinfo(inspect.currentframe()).filename - install_root_directory = pathlib.Path(filename).resolve().parent - install_files_directory = os.path.join(install_root_directory, "noyacs") - - if executor is None: - executor = os.path.join(install_files_directory, "executor.py") - self.executor = executor - - if pointEval is None: - pointEval = os.path.join(install_files_directory, "pointeval.py") - self.pointEval = pointEval - - if mainJob is None: - mainJob = os.path.join(install_files_directory, "mainjob.py") - self.mainJob = mainJob - - def getMainJob(self): - return self.mainJob - - def getExecutor(self): - return self.executor - - def getPointEval(self): - return self.pointEval - - def getPluginName(self): - basename = os.path.basename(self.executor) - if not basename.endswith(".py"): - raise Exception("File name {} does not end with '.py'.".format( - self.executor)) - return basename[:-3] diff --git a/src/pydefx/noyacsstudy.py b/src/pydefx/noyacsstudy.py deleted file mode 100644 index 91e1579..0000000 --- a/src/pydefx/noyacsstudy.py +++ /dev/null @@ -1,77 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) 2019 EDF R&D -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com -# -import copy -import os -import json -from . import pystudy -from . import noyacsbuilder -from . import salome_proxy - - -class NoYacsStudy(pystudy.PyStudy): - def __init__(self, sampleManager=None, schemaBuilder=None): - if schemaBuilder is None: - schemaBuilder = noyacsbuilder.NoYacsBuilder() - super().__init__(sampleManager, schemaBuilder) - - def createNewJob(self, script, sample, params): - # TODO: modifier le copier/coller - self._check(script,sample) - self.sample = sample - self.params = copy.deepcopy(params) - main_job_work_dir = self.params.salome_parameters.result_directory - if not os.path.exists(main_job_work_dir): - os.makedirs(main_job_work_dir) - # set the parameters of the local job - self.params.salome_parameters.job_type = self.jobType() - - result_directory = self.params.salome_parameters.result_directory - # export sample to result_directory - inputFiles = self.sampleManager.prepareRun(self.sample, result_directory) - inputFiles.extend([self.schemaBuilder.getExecutor(), - self.schemaBuilder.getPointEval()]) - self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob() - - # export config - configpath = os.path.join(result_directory, "idefixconfig.json") - dicconfig = {} - dicconfig["nbbranches"] = self.params.nb_branches - dicconfig["studymodule"] = "idefixstudy" - dicconfig["sampleIterator"] = self.sampleManager.getModuleName() - dicconfig["plugin"] = self.schemaBuilder.getPluginName() - with open(configpath, "w") as f: - json.dump(dicconfig, f, indent=2) - studypath = os.path.join(result_directory, "idefixstudy.py") - with open(studypath, "w") as f: - f.write(script.script) - - inputFiles.extend([configpath, studypath]) - - # this list manipulation is needed because in_files is not a python list - # if we don't use a salome session. In that case swig uses a python tuple - # in order to map a std::list as a parameter of a structure. - in_files_as_list = list(self.params.salome_parameters.in_files) - self.params.salome_parameters.in_files = in_files_as_list + inputFiles - launcher = salome_proxy.getLauncher() - self.job_id = launcher.createJob(self.params.salome_parameters) - return self.job_id - - def jobType(self): - return "command_salome" diff --git a/src/pydefx/slurm/CMakeLists.txt b/src/pydefx/slurm/CMakeLists.txt new file mode 100644 index 0000000..78b99a1 --- /dev/null +++ b/src/pydefx/slurm/CMakeLists.txt @@ -0,0 +1,7 @@ +SET(SCHEMA_FILES + executor.py + mainjob.py + pointeval.py + ) + +INSTALL(FILES ${SCHEMA_FILES} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx/slurm) diff --git a/src/pydefx/slurm/executor.py b/src/pydefx/slurm/executor.py new file mode 100644 index 0000000..a779c92 --- /dev/null +++ b/src/pydefx/slurm/executor.py @@ -0,0 +1,89 @@ +#import pydefx +import os +import pickle +import time +import traceback +import subprocess + +class Context: + def __init__(self): + #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher() + pass + +class JobExecutor: + def __init__(self, config): + self.config = config + + def initialize(self): + """ + Execute prescript. + """ + pointeval = os.path.join(os.getcwd(), "pointeval.py") + os.chmod(pointeval, 0o755) + + def evaluate(self, idx, point): + """ This is executed for every point to be evaluated. + """ + context = Context() + error = None + out_values = None + try: + self.prepare(idx, point, context) + if self.noRunFound(idx, point, context): + self.runjob(idx, point, context) + error, out_values = self.getResult(context) + except Exception as e: + error = str(e) + traceback.print_exc() + return error, out_values + + def prepare(self, idx, point, context): + """ + Define local and remote work directory. + Define job script. + """ + root_dir = os.getcwd() + point_name = "job_"+str(idx) + context.local_dir = os.path.join(root_dir, point_name) + if not os.path.exists(context.local_dir): + os.mkdir(context.local_dir) + # export the point to a file + data_file_name = "idefixdata.csv" + data_file_path = os.path.join(context.local_dir, data_file_name) + with open(data_file_path, "w") as f: + # explicit dict convertion is needed for compatibility between python versions + f.write(repr(dict(point))) + + + def noRunFound(self, idx, point, context): + return True + + def runjob(self, idx, point, context): + """ + Create, launch and wait for the end of the job. + """ + # srun + ntasks = self.config["tasksPerEval"] + pointeval = os.path.join(os.getcwd(), "pointeval.py") + command = "srun --ntasks={} --nodes=1 --chdir={} {} ".format( + str(ntasks), + context.local_dir, + pointeval) + return_code = subprocess.call(command, shell=True) + + def getResult(self, context): + """ + Check the job state, fetch the result file. + """ + error_file = os.path.join(context.local_dir, "idefixerror.txt") + result_file = os.path.join(context.local_dir, "idefixresult.txt") + with open(error_file, "r") as f: + error = f.read() + with open(result_file, "r") as f: + result_str = f.read() + result = eval(result_str) + + return error, result + +def createExecutor(config): + return JobExecutor(config) diff --git a/src/pydefx/slurm/mainjob.py b/src/pydefx/slurm/mainjob.py new file mode 100644 index 0000000..b9cfe3f --- /dev/null +++ b/src/pydefx/slurm/mainjob.py @@ -0,0 +1,56 @@ +#! /usr/bin/env python3 +import json +import importlib +from multiprocessing import Pool +import traceback + +class StartJob: + def __init__(self, executor): + self.executor = executor + + def __call__(self, idx, in_values): + error=None + out_values=None + try: + error, out_values = self.executor.evaluate(idx, in_values) + except Exception as e: + error=str(e) + traceback.print_exc() + return idx, in_values, out_values, error + +class TerminateJob: + def __init__(self, manager): + self.manager = manager + + def __call__(self, result): + # without try statement we may experience deadlock in case of error. + try: + idx, in_values, out_values, error = result + if not error: + error = None + self.manager.addResult(idx, in_values, out_values, error) + except Exception as e: + traceback.print_exc() + +if __name__ == '__main__': + with open("idefixconfig.json", "r") as f: + config = json.load(f) + plugin_module = importlib.import_module(config["plugin"]) + executor = plugin_module.createExecutor(config) + # global initialization - commun work for every evaluation. + executor.initialize() + + itModuleName = config["sampleIterator"] + itModule = importlib.import_module(itModuleName) + sampleManager = itModule.SampleIterator() + sampleManager.writeHeaders() + + nbbranches=config["nbbranches"] + pool = Pool(nbbranches) + runPoint = StartJob(executor) + endOk = TerminateJob(sampleManager) + for point in sampleManager: + pool.apply_async(runPoint, point, callback=endOk) + pool.close() + pool.join() + sampleManager.terminate() diff --git a/src/pydefx/slurm/pointeval.py b/src/pydefx/slurm/pointeval.py new file mode 100644 index 0000000..c652b26 --- /dev/null +++ b/src/pydefx/slurm/pointeval.py @@ -0,0 +1,37 @@ +#! /usr/bin/env python3 +import traceback +import os + +data_file_name = "idefixdata.csv" +study_module = "idefixstudy.py" +error_result = "idefixerror.txt" +value_result = "idefixresult.txt" +traceback_result = "idefixtraceback.txt" + +with open(data_file_name, "r") as f: + values = f.read() +inputvals = eval(values) + +error="" +result=None +old_dir = os.getcwd() + +try: + os.chdir("..") # go to commun root directory + with open(study_module, "r") as study_file: + study_string = study_file.read() + exec(study_string) + result = _exec(**inputvals) +except Exception as e: + error=str(e) + os.chdir(old_dir) # back to the current case job directory + with open(traceback_result, "w") as f: + traceback.print_exc(file=f) + +os.chdir(old_dir) # back to the current case job directory + +with open(error_result, "w") as f: + f.write(error) + +with open(value_result, "w") as f: + f.write(repr(result)) diff --git a/src/pydefx/slurmbuilder.py b/src/pydefx/slurmbuilder.py new file mode 100644 index 0000000..3f8eae2 --- /dev/null +++ b/src/pydefx/slurmbuilder.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +import inspect +import pathlib +import os + +class SlurmBuilder: + def __init__(self, executor = None, pointEval = None, mainJob = None): + filename = inspect.getframeinfo(inspect.currentframe()).filename + install_root_directory = pathlib.Path(filename).resolve().parent + install_files_directory = os.path.join(install_root_directory, "slurm") + + if executor is None: + executor = os.path.join(install_files_directory, "executor.py") + self.executor = executor + + if pointEval is None: + pointEval = os.path.join(install_files_directory, "pointeval.py") + self.pointEval = pointEval + + if mainJob is None: + mainJob = os.path.join(install_files_directory, "mainjob.py") + self.mainJob = mainJob + + def getMainJob(self): + return self.mainJob + + def getExecutor(self): + return self.executor + + def getPointEval(self): + return self.pointEval + + def getPluginName(self): + basename = os.path.basename(self.executor) + if not basename.endswith(".py"): + raise Exception("File name {} does not end with '.py'.".format( + self.executor)) + return basename[:-3] diff --git a/src/pydefx/slurmstudy.py b/src/pydefx/slurmstudy.py new file mode 100644 index 0000000..8be211b --- /dev/null +++ b/src/pydefx/slurmstudy.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +import copy +import os +import json +from . import pystudy +from . import slurmbuilder +from . import salome_proxy + + +class SlurmStudy(pystudy.PyStudy): + def __init__(self, sampleManager=None, schemaBuilder=None): + if schemaBuilder is None: + schemaBuilder = slurmbuilder.SlurmBuilder() + super().__init__(sampleManager, schemaBuilder) + + def createNewJob(self, script, sample, params): + # TODO: modifier le copier/coller + self._check(script,sample) + self.sample = sample + self.params = copy.deepcopy(params) + main_job_work_dir = self.params.salome_parameters.result_directory + if not os.path.exists(main_job_work_dir): + os.makedirs(main_job_work_dir) + # set the parameters of the local job + self.params.salome_parameters.job_type = self.jobType() + + result_directory = self.params.salome_parameters.result_directory + # export sample to result_directory + inputFiles = self.sampleManager.prepareRun(self.sample, result_directory) + inputFiles.extend([self.schemaBuilder.getExecutor(), + self.schemaBuilder.getPointEval()]) + self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob() + + # export config + configpath = os.path.join(result_directory, "idefixconfig.json") + dicconfig = {} + dicconfig["nbbranches"] = self.params.nb_branches + dicconfig["studymodule"] = "idefixstudy" + dicconfig["sampleIterator"] = self.sampleManager.getModuleName() + dicconfig["plugin"] = self.schemaBuilder.getPluginName() + nbproc = self.params.salome_parameters.resource_required.nb_proc + dicconfig["tasksPerEval"] = nbproc // self.params.nb_branches + with open(configpath, "w") as f: + json.dump(dicconfig, f, indent=2) + studypath = os.path.join(result_directory, "idefixstudy.py") + with open(studypath, "w") as f: + f.write(script.script) + + inputFiles.extend([configpath, studypath]) + + # this list manipulation is needed because in_files is not a python list + # if we don't use a salome session. In that case swig uses a python tuple + # in order to map a std::list as a parameter of a structure. + in_files_as_list = list(self.params.salome_parameters.in_files) + self.params.salome_parameters.in_files = in_files_as_list + inputFiles + launcher = salome_proxy.getLauncher() + self.job_id = launcher.createJob(self.params.salome_parameters) + return self.job_id + + def jobType(self): + return "command_salome"