salome_proxy.py
multijobbuilder.py
multijobstudy.py
- noyacsbuilder.py
- noyacsstudy.py
+ slurmbuilder.py
+ slurmstudy.py
)
INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx)
ADD_SUBDIRECTORY(schemas)
ADD_SUBDIRECTORY(multijob)
-ADD_SUBDIRECTORY(noyacs)
+ADD_SUBDIRECTORY(slurm)
from .salome_proxy import forceSalomeServers, forceNoSalomeServers
from .multijobstudy import MultiJobStudy
-from .noyacsstudy import NoYacsStudy
+from .slurmstudy import SlurmStudy
#! /usr/bin/env python3
import json
import importlib
-from multiprocessing.dummy import Pool
+from multiprocessing import Pool
import traceback
class StartJob:
+++ /dev/null
-SET(SCHEMA_FILES
- executor.py
- mainjob.py
- pointeval.py
- )
-
-INSTALL(FILES ${SCHEMA_FILES} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx/noyacs)
+++ /dev/null
-#import pydefx
-import os
-import pickle
-import time
-import traceback
-import subprocess
-
-class Context:
- def __init__(self):
- #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher()
- pass
-
-class JobExecutor:
- def __init__(self, config):
- self.config = config
-
- def initialize(self):
- """
- Execute prescript.
- """
- pointeval = os.path.join(os.getcwd(), "pointeval.py")
- os.chmod(pointeval, 0o755)
-
- def evaluate(self, idx, point):
- """ This is executed for every point to be evaluated.
- """
- context = Context()
- error = None
- out_values = None
- try:
- self.prepare(idx, point, context)
- if self.noRunFound(idx, point, context):
- self.runjob(idx, point, context)
- error, out_values = self.getResult(context)
- except Exception as e:
- error = str(e)
- traceback.print_exc()
- return error, out_values
-
- def prepare(self, idx, point, context):
- """
- Define local and remote work directory.
- Define job script.
- """
- root_dir = os.getcwd()
- point_name = "job_"+str(idx)
- context.local_dir = os.path.join(root_dir, point_name)
- if not os.path.exists(context.local_dir):
- os.mkdir(context.local_dir)
- # export the point to a file
- data_file_name = "idefixdata.csv"
- data_file_path = os.path.join(context.local_dir, data_file_name)
- with open(data_file_path, "w") as f:
- # explicit dict convertion is needed for compatibility between python versions
- f.write(repr(dict(point)))
-
-
- def noRunFound(self, idx, point, context):
- return True
-
- def runjob(self, idx, point, context):
- """
- Create, launch and wait for the end of the job.
- """
- # srun
- pointeval = os.path.join(os.getcwd(), "pointeval.py")
- command = "srun --ntasks=1 --nodes=1 --chdir={} {} ".format(context.local_dir,
- pointeval)
- return_code = subprocess.call(command, shell=True)
-
- def getResult(self, context):
- """
- Check the job state, fetch the result file.
- """
- error_file = os.path.join(context.local_dir, "idefixerror.txt")
- result_file = os.path.join(context.local_dir, "idefixresult.txt")
- with open(error_file, "r") as f:
- error = f.read()
- with open(result_file, "r") as f:
- result_str = f.read()
- result = eval(result_str)
-
- return error, result
-
-def createExecutor(config):
- return JobExecutor(config)
+++ /dev/null
-#! /usr/bin/env python3
-import json
-import importlib
-from multiprocessing.dummy import Pool
-import traceback
-
-class StartJob:
- def __init__(self, executor):
- self.executor = executor
-
- def __call__(self, idx, in_values):
- error=None
- out_values=None
- try:
- error, out_values = self.executor.evaluate(idx, in_values)
- except Exception as e:
- error=str(e)
- traceback.print_exc()
- return idx, in_values, out_values, error
-
-class TerminateJob:
- def __init__(self, manager):
- self.manager = manager
-
- def __call__(self, result):
- # without try statement we may experience deadlock in case of error.
- try:
- idx, in_values, out_values, error = result
- if not error:
- error = None
- self.manager.addResult(idx, in_values, out_values, error)
- except Exception as e:
- traceback.print_exc()
-
-if __name__ == '__main__':
- with open("idefixconfig.json", "r") as f:
- config = json.load(f)
- plugin_module = importlib.import_module(config["plugin"])
- executor = plugin_module.createExecutor(config)
- # global initialization - commun work for every evaluation.
- executor.initialize()
-
- itModuleName = config["sampleIterator"]
- itModule = importlib.import_module(itModuleName)
- sampleManager = itModule.SampleIterator()
- sampleManager.writeHeaders()
-
- nbbranches=config["nbbranches"]
- pool = Pool(nbbranches)
- runPoint = StartJob(executor)
- endOk = TerminateJob(sampleManager)
- for point in sampleManager:
- pool.apply_async(runPoint, point, callback=endOk)
- pool.close()
- pool.join()
- sampleManager.terminate()
+++ /dev/null
-#! /usr/bin/env python3
-import traceback
-import os
-
-data_file_name = "idefixdata.csv"
-study_module = "idefixstudy.py"
-error_result = "idefixerror.txt"
-value_result = "idefixresult.txt"
-traceback_result = "idefixtraceback.txt"
-
-with open(data_file_name, "r") as f:
- values = f.read()
-inputvals = eval(values)
-
-error=""
-result=None
-old_dir = os.getcwd()
-
-try:
- os.chdir("..") # go to commun root directory
- with open(study_module, "r") as study_file:
- study_string = study_file.read()
- exec(study_string)
- result = _exec(**inputvals)
-except Exception as e:
- error=str(e)
- os.chdir(old_dir) # back to the current case job directory
- with open(traceback_result, "w") as f:
- traceback.print_exc(file=f)
-
-os.chdir(old_dir) # back to the current case job directory
-
-with open(error_result, "w") as f:
- f.write(error)
-
-with open(value_result, "w") as f:
- f.write(repr(result))
+++ /dev/null
-# -*- coding: utf-8 -*-
-# Copyright (C) 2019 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-import inspect
-import pathlib
-import os
-
-class NoYacsBuilder:
- def __init__(self, executor = None, pointEval = None, mainJob = None):
- filename = inspect.getframeinfo(inspect.currentframe()).filename
- install_root_directory = pathlib.Path(filename).resolve().parent
- install_files_directory = os.path.join(install_root_directory, "noyacs")
-
- if executor is None:
- executor = os.path.join(install_files_directory, "executor.py")
- self.executor = executor
-
- if pointEval is None:
- pointEval = os.path.join(install_files_directory, "pointeval.py")
- self.pointEval = pointEval
-
- if mainJob is None:
- mainJob = os.path.join(install_files_directory, "mainjob.py")
- self.mainJob = mainJob
-
- def getMainJob(self):
- return self.mainJob
-
- def getExecutor(self):
- return self.executor
-
- def getPointEval(self):
- return self.pointEval
-
- def getPluginName(self):
- basename = os.path.basename(self.executor)
- if not basename.endswith(".py"):
- raise Exception("File name {} does not end with '.py'.".format(
- self.executor))
- return basename[:-3]
+++ /dev/null
-# -*- coding: utf-8 -*-
-# Copyright (C) 2019 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-import copy
-import os
-import json
-from . import pystudy
-from . import noyacsbuilder
-from . import salome_proxy
-
-
-class NoYacsStudy(pystudy.PyStudy):
- def __init__(self, sampleManager=None, schemaBuilder=None):
- if schemaBuilder is None:
- schemaBuilder = noyacsbuilder.NoYacsBuilder()
- super().__init__(sampleManager, schemaBuilder)
-
- def createNewJob(self, script, sample, params):
- # TODO: modifier le copier/coller
- self._check(script,sample)
- self.sample = sample
- self.params = copy.deepcopy(params)
- main_job_work_dir = self.params.salome_parameters.result_directory
- if not os.path.exists(main_job_work_dir):
- os.makedirs(main_job_work_dir)
- # set the parameters of the local job
- self.params.salome_parameters.job_type = self.jobType()
-
- result_directory = self.params.salome_parameters.result_directory
- # export sample to result_directory
- inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
- inputFiles.extend([self.schemaBuilder.getExecutor(),
- self.schemaBuilder.getPointEval()])
- self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob()
-
- # export config
- configpath = os.path.join(result_directory, "idefixconfig.json")
- dicconfig = {}
- dicconfig["nbbranches"] = self.params.nb_branches
- dicconfig["studymodule"] = "idefixstudy"
- dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
- dicconfig["plugin"] = self.schemaBuilder.getPluginName()
- with open(configpath, "w") as f:
- json.dump(dicconfig, f, indent=2)
- studypath = os.path.join(result_directory, "idefixstudy.py")
- with open(studypath, "w") as f:
- f.write(script.script)
-
- inputFiles.extend([configpath, studypath])
-
- # this list manipulation is needed because in_files is not a python list
- # if we don't use a salome session. In that case swig uses a python tuple
- # in order to map a std::list as a parameter of a structure.
- in_files_as_list = list(self.params.salome_parameters.in_files)
- self.params.salome_parameters.in_files = in_files_as_list + inputFiles
- launcher = salome_proxy.getLauncher()
- self.job_id = launcher.createJob(self.params.salome_parameters)
- return self.job_id
-
- def jobType(self):
- return "command_salome"
--- /dev/null
+SET(SCHEMA_FILES
+ executor.py
+ mainjob.py
+ pointeval.py
+ )
+
+INSTALL(FILES ${SCHEMA_FILES} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx/slurm)
--- /dev/null
+#import pydefx
+import os
+import pickle
+import time
+import traceback
+import subprocess
+
+class Context:
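+  # Minimal per-evaluation state holder; prepare() stores the point's local working directory here.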
+ def __init__(self):
+ #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher()
+ pass
+
+class JobExecutor:
+ def __init__(self, config):
+ self.config = config
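+    # config is the parsed idefixconfig.json (nbbranches, studymodule, sampleIterator, plugin, tasksPerEval).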
+
+ def initialize(self):
+ """
+    Global initialization, done once before any evaluation: make the point evaluation script executable.
+ """
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ os.chmod(pointeval, 0o755)
+
+ def evaluate(self, idx, point):
+ """ This is executed for every point to be evaluated.
+ """
+ context = Context()
+ error = None
+ out_values = None
+ try:
+ self.prepare(idx, point, context)
+ if self.noRunFound(idx, point, context):
+ self.runjob(idx, point, context)
+ error, out_values = self.getResult(context)
+ except Exception as e:
+ error = str(e)
+ traceback.print_exc()
+ return error, out_values
+
+ def prepare(self, idx, point, context):
+ """
+    Create the local working directory for this point and write the input values to idefixdata.csv.
+ """
+ root_dir = os.getcwd()
+ point_name = "job_"+str(idx)
+ context.local_dir = os.path.join(root_dir, point_name)
+ if not os.path.exists(context.local_dir):
+ os.mkdir(context.local_dir)
+ # export the point to a file
+ data_file_name = "idefixdata.csv"
+ data_file_path = os.path.join(context.local_dir, data_file_name)
+ with open(data_file_path, "w") as f:
+      # an explicit dict conversion is needed for compatibility between python versions
+ f.write(repr(dict(point)))
+
+
+ def noRunFound(self, idx, point, context):
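+    # Restart hook: returning False skips runjob() and getResult() reuses the files of a previous run.
+    # This executor always reruns the point.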
+ return True
+
+ def runjob(self, idx, point, context):
+ """
+ Create, launch and wait for the end of the job.
+ """
+ # srun
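+    # Launch the evaluation as a SLURM job step inside the main job's allocation:
+    # tasksPerEval tasks on a single node, run from the point's local directory.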
+ ntasks = self.config["tasksPerEval"]
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ command = "srun --ntasks={} --nodes=1 --chdir={} {} ".format(
+ str(ntasks),
+ context.local_dir,
+ pointeval)
+ return_code = subprocess.call(command, shell=True)
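+    # The return code is not checked here; a failed evaluation surfaces in getResult() when the result files are missing.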
+
+ def getResult(self, context):
+ """
+    Fetch the error and result files written by pointeval.py for this point.
+ """
+ error_file = os.path.join(context.local_dir, "idefixerror.txt")
+ result_file = os.path.join(context.local_dir, "idefixresult.txt")
+ with open(error_file, "r") as f:
+ error = f.read()
+ with open(result_file, "r") as f:
+ result_str = f.read()
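+    # the result file holds repr() of the value returned by _exec; eval() restores the Python object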
+ result = eval(result_str)
+
+ return error, result
+
+def createExecutor(config):
+ return JobExecutor(config)
--- /dev/null
+#! /usr/bin/env python3
+import json
+import importlib
+from multiprocessing.dummy import Pool
+import traceback
+
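+# Callable handed to the pool: runs one evaluation and returns everything the result callback needs.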
+class StartJob:
+ def __init__(self, executor):
+ self.executor = executor
+
+ def __call__(self, idx, in_values):
+ error=None
+ out_values=None
+ try:
+ error, out_values = self.executor.evaluate(idx, in_values)
+ except Exception as e:
+ error=str(e)
+ traceback.print_exc()
+ return idx, in_values, out_values, error
+
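+# Pool callback: records each finished evaluation through the sample manager.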
+class TerminateJob:
+ def __init__(self, manager):
+ self.manager = manager
+
+ def __call__(self, result):
+        # without a try statement we may experience a deadlock in case of error.
+ try:
+ idx, in_values, out_values, error = result
+ if not error:
+ error = None
+ self.manager.addResult(idx, in_values, out_values, error)
+ except Exception as e:
+ traceback.print_exc()
+
+if __name__ == '__main__':
+ with open("idefixconfig.json", "r") as f:
+ config = json.load(f)
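+    # idefixconfig.json is generated by SlurmStudy.createNewJob and shipped with the job input files.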
+ plugin_module = importlib.import_module(config["plugin"])
+ executor = plugin_module.createExecutor(config)
+    # global initialization - common work for every evaluation.
+ executor.initialize()
+
+ itModuleName = config["sampleIterator"]
+ itModule = importlib.import_module(itModuleName)
+ sampleManager = itModule.SampleIterator()
+ sampleManager.writeHeaders()
+
+ nbbranches=config["nbbranches"]
+ pool = Pool(nbbranches)
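+    # nbbranches evaluations run concurrently; each worker mostly blocks on its srun call, so a thread pool is sufficient.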
+ runPoint = StartJob(executor)
+ endOk = TerminateJob(sampleManager)
+ for point in sampleManager:
+ pool.apply_async(runPoint, point, callback=endOk)
+ pool.close()
+ pool.join()
+ sampleManager.terminate()
--- /dev/null
+#! /usr/bin/env python3
+import traceback
+import os
+
+data_file_name = "idefixdata.csv"
+study_module = "idefixstudy.py"
+error_result = "idefixerror.txt"
+value_result = "idefixresult.txt"
+traceback_result = "idefixtraceback.txt"
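+# Protocol: read the point from idefixdata.csv, call the study's _exec function with it,
+# then write idefixerror.txt / idefixresult.txt (and idefixtraceback.txt on failure) for the executor to collect.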
+
+with open(data_file_name, "r") as f:
+ values = f.read()
+inputvals = eval(values)
+
+error=""
+result=None
+old_dir = os.getcwd()
+
+try:
+ os.chdir("..") # go to commun root directory
+ with open(study_module, "r") as study_file:
+ study_string = study_file.read()
+ exec(study_string)
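+    # the study script must define a function named _exec; exec() above injects it into this namespace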
+ result = _exec(**inputvals)
+except Exception as e:
+ error=str(e)
+ os.chdir(old_dir) # back to the current case job directory
+ with open(traceback_result, "w") as f:
+ traceback.print_exc(file=f)
+
+os.chdir(old_dir) # back to the current case job directory
+
+with open(error_result, "w") as f:
+ f.write(error)
+
+with open(value_result, "w") as f:
+ f.write(repr(result))
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import inspect
+import pathlib
+import os
+
+class SlurmBuilder:
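+  # Locates the default executor.py, mainjob.py and pointeval.py installed under pydefx/slurm, unless custom paths are provided.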
+ def __init__(self, executor = None, pointEval = None, mainJob = None):
+ filename = inspect.getframeinfo(inspect.currentframe()).filename
+ install_root_directory = pathlib.Path(filename).resolve().parent
+ install_files_directory = os.path.join(install_root_directory, "slurm")
+
+ if executor is None:
+ executor = os.path.join(install_files_directory, "executor.py")
+ self.executor = executor
+
+ if pointEval is None:
+ pointEval = os.path.join(install_files_directory, "pointeval.py")
+ self.pointEval = pointEval
+
+ if mainJob is None:
+ mainJob = os.path.join(install_files_directory, "mainjob.py")
+ self.mainJob = mainJob
+
+ def getMainJob(self):
+ return self.mainJob
+
+ def getExecutor(self):
+ return self.executor
+
+ def getPointEval(self):
+ return self.pointEval
+
+ def getPluginName(self):
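+    # The plugin name is the executor module name ("executor" by default); mainjob.py imports it with importlib.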
+ basename = os.path.basename(self.executor)
+ if not basename.endswith(".py"):
+ raise Exception("File name {} does not end with '.py'.".format(
+ self.executor))
+ return basename[:-3]
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import copy
+import os
+import json
+from . import pystudy
+from . import slurmbuilder
+from . import salome_proxy
+
+
+class SlurmStudy(pystudy.PyStudy):
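+  # The whole study is submitted as one job through the SALOME launcher; on the cluster,
+  # mainjob.py evaluates the sample points concurrently as srun job steps inside that allocation.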
+ def __init__(self, sampleManager=None, schemaBuilder=None):
+ if schemaBuilder is None:
+ schemaBuilder = slurmbuilder.SlurmBuilder()
+ super().__init__(sampleManager, schemaBuilder)
+
+ def createNewJob(self, script, sample, params):
+    # TODO: rework the copied/pasted code
+ self._check(script,sample)
+ self.sample = sample
+ self.params = copy.deepcopy(params)
+ main_job_work_dir = self.params.salome_parameters.result_directory
+ if not os.path.exists(main_job_work_dir):
+ os.makedirs(main_job_work_dir)
+ # set the parameters of the local job
+ self.params.salome_parameters.job_type = self.jobType()
+
+ result_directory = self.params.salome_parameters.result_directory
+ # export sample to result_directory
+ inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
+ inputFiles.extend([self.schemaBuilder.getExecutor(),
+ self.schemaBuilder.getPointEval()])
+ self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob()
+
+ # export config
+ configpath = os.path.join(result_directory, "idefixconfig.json")
+ dicconfig = {}
+ dicconfig["nbbranches"] = self.params.nb_branches
+ dicconfig["studymodule"] = "idefixstudy"
+ dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
+ dicconfig["plugin"] = self.schemaBuilder.getPluginName()
+ nbproc = self.params.salome_parameters.resource_required.nb_proc
+ dicconfig["tasksPerEval"] = nbproc // self.params.nb_branches
+ with open(configpath, "w") as f:
+ json.dump(dicconfig, f, indent=2)
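+    # Illustrative content of the generated idefixconfig.json (values depend on the study parameters):
+    # { "nbbranches": 4, "studymodule": "idefixstudy", "sampleIterator": "<sample iterator module>",
+    #   "plugin": "executor", "tasksPerEval": 2 }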
+ studypath = os.path.join(result_directory, "idefixstudy.py")
+ with open(studypath, "w") as f:
+ f.write(script.script)
+
+ inputFiles.extend([configpath, studypath])
+
+ # this list manipulation is needed because in_files is not a python list
+ # if we don't use a salome session. In that case swig uses a python tuple
+ # in order to map a std::list as a parameter of a structure.
+ in_files_as_list = list(self.params.salome_parameters.in_files)
+ self.params.salome_parameters.in_files = in_files_as_list + inputFiles
+ launcher = salome_proxy.getLauncher()
+ self.job_id = launcher.createJob(self.params.salome_parameters)
+ return self.job_id
+
+ def jobType(self):
+ return "command_salome"