JobParametersProxy.cxx
Exceptions.cxx
MonoPyJob.cxx
+ PyStudyJob.cxx
)
SET(ydefx_HEADERS
TMonoPyJob.hxx
Job.hxx
Launcher.hxx
+ PyStudyJob.hxx
+ TPyStudyJob.hxx
)
SET(ydefx_LINK
#define YDEFX_LAUNCHER_H
#include "TMonoPyJob.hxx"
+#include "TPyStudyJob.hxx"
namespace ydefx
{
Sample<Ts...>& sample,
const JobParametersProxy& params);
+ template <class ...Ts>
+ Job* submitPyStudyJob(py2cpp::PyPtr& pyStudyObj,
+ const PyStudyFunction& fnScript,
+ Sample<Ts...>& sample,
+ const JobParametersProxy& params);
+
/*!
* Connect to an already created job.
* Return nullptr in case of failure. Check the error with lastError().
return result;
}
+template <class ...Ts>
+Job* Launcher::submitPyStudyJob(py2cpp::PyPtr& pyStudyObj,
+ const PyStudyFunction& fnScript,
+ Sample<Ts...>& sample,
+ const JobParametersProxy& params)
+{
+ Job* result = nullptr;
+ _lastError = "";
+ try
+ {
+ result = new TPyStudyJob<Ts...>(pyStudyObj, fnScript, sample, params);
+ }
+ catch(std::exception& e)
+ {
+ if(result != nullptr)
+ delete result;
+ result = nullptr;
+ _lastError = e.what();
+ return result;
+ }
+
+ if(!result->lastError().empty())
+ {
+ _lastError = result->lastError();
+ delete result;
+ result = nullptr;
+ return result;
+ }
+
+ if(!result->launch())
+ {
+ _lastError = "Failed to submit job.\n";
+ _lastError += result->lastError();
+ delete result;
+ result = nullptr;
+ }
+ return result;
+}
+
template <class ...Ts>
Job* Launcher::connectJob(const std::string& jobDump,
Sample<Ts...>& sample)
--- /dev/null
+// Copyright (C) 2019 EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include "PyStudyJob.hxx"
+#include <py2cpp/py2cpp.hxx>
+
+namespace ydefx
+{
+PyStudyJob::PyStudyJob()
+: _pyStudy()
+, _lastError()
+, _waitDelay(10)
+{
+ py2cpp::PyFunction objConstructor;
+ objConstructor.loadExp("pydefx", "PyStudy");
+ _pyStudy = objConstructor();
+}
+
+PyStudyJob::PyStudyJob(const std::string& pymodule_name, const std::string& pyclass_name)
+: _pyStudy()
+, _lastError()
+, _waitDelay(10)
+{
+ py2cpp::PyFunction objConstructor;
+ objConstructor.loadExp(pymodule_name, pyclass_name);
+ _pyStudy = objConstructor();
+}
+
+PyStudyJob::PyStudyJob(py2cpp::PyPtr& pyStudyObj)
+: _pyStudy(pyStudyObj)
+, _lastError()
+, _waitDelay(10)
+{
+}
+
+PyStudyJob::~PyStudyJob()
+{
+}
+
+std::string PyStudyJob::state()
+{
+ std::string result;
+ _lastError = "";
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "getJobState");
+ py2cpp::pyResult(result) = pyFn();
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while retrieving job's state.\n";
+ _lastError += e.what();
+ }
+ return result;
+}
+
+double PyStudyJob::progress()
+{
+ double result;
+ py2cpp::PyFunction pyFn;
+ _lastError = "";
+ try
+ {
+ pyFn.loadExp(_pyStudy, "getProgress");
+ py2cpp::pyResult(result) = pyFn();
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while retrieving job's progress.\n";
+ _lastError += e.what();
+ }
+ return result;
+}
+
+std::string PyStudyJob::dump()
+{
+ std::string result;
+ _lastError = "";
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "dump");
+ py2cpp::pyResult(result) = pyFn();
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while dumping the job.\n";
+ _lastError += e.what();
+ }
+ return result;
+}
+
+bool PyStudyJob::launch()
+{
+ _lastError = "";
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "launch");
+ pyFn();
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while launching the job.\n";
+ _lastError += e.what();
+ }
+ return _lastError.empty();
+}
+
+const std::string& PyStudyJob::lastError()
+{
+ return _lastError;
+}
+
+bool PyStudyJob::wait()
+{
+ _lastError = "";
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "wait");
+ pyFn(_waitDelay);
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while waiting the end of the job.\n";
+ _lastError += e.what();
+ }
+ return _lastError.empty();
+}
+
+void PyStudyJob::configureWaitDelay(int seconds)
+{
+ _waitDelay = seconds;
+}
+
+}
--- /dev/null
+// Copyright (C) 2019 EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef YDEFX_PYSTUDYJOB_HXX
+#define YDEFX_PYSTUDYJOB_HXX
+#include "Job.hxx"
+#include <py2cpp/PyPtr.hxx>
+
+namespace ydefx
+{
+class PyStudyJob : public Job
+{
+public:
+ PyStudyJob(const std::string& pymodule_name, const std::string& pyclass_name);
+ PyStudyJob(py2cpp::PyPtr& pyStudyObj);
+ PyStudyJob();
+ virtual ~PyStudyJob();
+ virtual std::string state();
+ virtual double progress();
+ virtual std::string dump();
+ virtual bool launch(); // return false when it fails
+ virtual bool fetch()=0; // return false when it fails
+ virtual const std::string& lastError();
+ virtual bool wait(); // Wait for the end of the job. Return false when it fails.
+ virtual void configureWaitDelay(int seconds);
+protected:
+ py2cpp::PyPtr _pyStudy;
+ std::string _lastError;
+ int _waitDelay;
+};
+
+}
+
+#endif //YDEFX_PYSTUDYJOB_HXX
--- /dev/null
+// Copyright (C) 2019 EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#ifndef YDEFX_TPYSTUDYJOB_HXX
+#define YDEFX_TPYSTUDYJOB_HXX
+#include "JobParametersProxy.hxx"
+#include "PyStudyJob.hxx"
+#include "SamplePyConversions.hxx"
+#include "PyStudyFunction.hxx"
+
+namespace ydefx
+{
+template <class ...Ts>
+class TPyStudyJob : public PyStudyJob
+{
+public:
+ //! Create a new job using the default pystudy class.
+ TPyStudyJob(const PyStudyFunction& fnScript,
+ Sample<Ts...>& sample,
+ const JobParametersProxy& params)
+ : PyStudyJob()
+ , _sample(sample)
+ {
+ createNewJob(fnScript, params);
+ }
+
+ TPyStudyJob(py2cpp::PyPtr& pyStudyObj,
+ const PyStudyFunction& fnScript,
+ Sample<Ts...>& sample,
+ const JobParametersProxy& params)
+ : PyStudyJob(pyStudyObj)
+ , _sample(sample)
+ {
+ createNewJob(fnScript, params);
+ }
+
+ //! Connect to an existing job.
+ TPyStudyJob(const std::string& jobDump, Sample<Ts...>& sample)
+ : PyStudyJob()
+ , _sample(sample)
+ {
+ if(_lastError.empty()) // no errors during parent construction
+ {
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "loadFromString");
+ pyFn(jobDump);
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while creating the job.\n";
+ _lastError += e.what();
+ }
+ }
+ }
+
+ virtual ~TPyStudyJob(){}
+ virtual bool fetch()
+ {
+ _lastError = "";
+ try
+ {
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "getResult");
+ pyFn(); // python call: _pyStudy.getResult()
+ fetchResults(_pyStudy.getAttr("sample"), _sample);
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while fetching the results.\n";
+ _lastError += e.what();
+ }
+ return _lastError.empty();
+ }
+
+ const Sample<Ts...>& getSample()const{return _sample;}
+
+private:
+ void createNewJob(const PyStudyFunction& fnScript, const JobParametersProxy& params)
+ {
+ if(_lastError.empty()) // no errors during parent construction
+ {
+ try
+ {
+ py2cpp::PyPtr pySample = createPySample(_sample);
+ py2cpp::PyFunction pyFn;
+ pyFn.loadExp(_pyStudy, "createNewJob");
+ pyFn(fnScript, pySample, params);
+ }
+ catch(std::exception& e)
+ {
+ _lastError = "An error occured while creating the job.\n";
+ _lastError += e.what();
+ }
+ }
+ }
+
+private:
+ Sample<Ts...>& _sample;
+};
+
+}
+
+#endif //YDEFX_TPYSTUDYJOB_HXX
--- /dev/null
+// Copyright (C) 2019 EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/TestRunner.h>
+#include <cppunit/TextTestRunner.h>
+#include <stdexcept>
+
+#include <iostream>
+#include <fstream>
+#include <stdlib.h>
+#include <Python.h>
+
+// ============================================================================
+/*!
+ * Main program source for Unit Tests with cppunit package does not depend
+ * on actual tests, so we use the same for all partial unit tests.
+ * This version of TestMain initializes the python library and it can be used
+ * if you have several tests which need Py_Initialize and salome_init.
+ */
+// ============================================================================
+
+int main(int argc, char* argv[])
+{
+ Py_Initialize();
+ // --- Create the event manager and test controller
+ CPPUNIT_NS::TestResult controller;
+
+ // --- Add a listener that collects test result
+ CPPUNIT_NS::TestResultCollector result;
+ controller.addListener( &result );
+
+ // --- Add a listener that print dots as test run.
+#ifdef WIN32
+ CPPUNIT_NS::TextTestProgressListener progress;
+#else
+ CPPUNIT_NS::BriefTestProgressListener progress;
+#endif
+ controller.addListener( &progress );
+
+ // --- Get the top level suite from the registry
+
+ CPPUNIT_NS::Test *suite =
+ CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest();
+
+ // --- Adds the test to the list of test to run
+
+ CPPUNIT_NS::TestRunner runner;
+ runner.addTest( suite );
+ runner.run( controller);
+
+ // --- Print test in a compiler compatible format.
+ std::ofstream testFile;
+ testFile.open("test.log", std::ios::out | std::ios::app);
+ testFile << "------ Idefix test log:" << std::endl;
+ CPPUNIT_NS::CompilerOutputter outputter( &result, testFile );
+ outputter.write();
+
+ // --- Run the tests.
+
+ bool wasSucessful = result.wasSuccessful();
+ testFile.close();
+ Py_Finalize();
+
+ // --- Return error code 1 if the one of test failed.
+
+ return wasSucessful ? 0 : 1;
+}
void SampleTest::fullStudy()
{
- Py_Initialize();
- {
std::list<std::string> resources = ydefx::JobParametersProxy::AvailableResources();
CPPUNIT_ASSERT(resources.size() > 0);
jobParams.configureResource("localhost");
jobParams.work_directory(jobParams.work_directory() + "/GeneralTest");
jobParams.createResultDirectory("/tmp");
- std::string pyScript =
+ std::string pyScript =
"def _exec(a, b):\n"
" d = a / b\n"
" t = ['object which needs pickel protocol']\n"
myJob = l.submitMonoPyJob(wrongStudy, sample, jobParams);
CPPUNIT_ASSERT(myJob == nullptr);
CPPUNIT_ASSERT(l.lastError().find("SyntaxError") != std::string::npos);
- }
- Py_Finalize();
+}
+
+void SampleTest::genericStudy()
+{
+ std::list<std::string> resources = ydefx::JobParametersProxy::AvailableResources();
+ CPPUNIT_ASSERT(resources.size() > 0);
+
+ ydefx::JobParametersProxy jobParams;
+ jobParams.configureResource("localhost");
+ jobParams.work_directory(jobParams.work_directory() + "/GenericTest");
+ jobParams.createResultDirectory("/tmp");
+ std::string pyScript =
+"def _exec(a, b):\n"
+" d = a / b\n"
+" t = ['object which needs pickel protocol']\n"
+" return d,t\n";
+
+ ydefx::PyStudyFunction studyFunction;
+ studyFunction.loadString(pyScript);
+ CPPUNIT_ASSERT(studyFunction.isValid());
+ const std::list<std::string>& inputs = studyFunction.inputNames();
+ CPPUNIT_ASSERT(std::find(inputs.begin(), inputs.end(), "a")!=inputs.end());
+ CPPUNIT_ASSERT(std::find(inputs.begin(), inputs.end(), "b")!=inputs.end());
+ const std::list<std::string>& outputs = studyFunction.outputNames();
+ CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "d")
+ != outputs.end());
+ CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "t")
+ != outputs.end());
+
+ ydefx::Sample<double, py2cpp::PyPtr > sample;
+ std::vector<double> a_vals = {1.1, 4.4, 9, 4};
+ std::vector<double> b_vals = {1.1, 2.2, 3, 1};
+ sample.inputs<double>().set("a", a_vals);
+ sample.inputs<double>().set("b", b_vals);
+ sample.outputs<double>().addName("d");
+ sample.outputs<py2cpp::PyPtr >().addName("t");
+
+ py2cpp::PyFunction objConstructor;
+ objConstructor.loadExp("pydefx", "PyStudy");
+ py2cpp::PyPtr pyStudy = objConstructor();
+
+ ydefx::Launcher l;
+ ydefx::Job* myJob = l.submitPyStudyJob(pyStudy, studyFunction, sample, jobParams);
+ CPPUNIT_ASSERT(myJob);
+ CPPUNIT_ASSERT(l.lastError().empty());
+ std::string jobDump = myJob->dump();
+ CPPUNIT_ASSERT(myJob->lastError().empty());
+ std::string jobState = myJob->state();
+ CPPUNIT_ASSERT(myJob->lastError().empty());
+ CPPUNIT_ASSERT(jobState == "QUEUED" || jobState == "RUNNING"
+ || jobState == "FINISHED");
+ double progress = myJob->progress();
+ CPPUNIT_ASSERT(progress >= 0.0 && progress <= 1.0 );
+ CPPUNIT_ASSERT(myJob->lastError().empty());
+ bool ok = myJob->wait();
+ CPPUNIT_ASSERT(ok);
+ CPPUNIT_ASSERT(myJob->lastError().empty());
+ jobState = myJob->state();
+ CPPUNIT_ASSERT(jobState == "FINISHED");
+ progress = myJob->progress();
+ CPPUNIT_ASSERT(progress == 1.0);
+ ok = myJob->fetch();
+ CPPUNIT_ASSERT(ok);
+ CPPUNIT_ASSERT(myJob->lastError().empty());
+ std::vector<double> expectedResult = {1,2,3,4};
+ const std::vector<double>& result = sample.outputs<double>().get("d");
+ CPPUNIT_ASSERT(expectedResult == result);
+ const std::vector<py2cpp::PyPtr>& pyobjResult
+ = sample.outputs<py2cpp::PyPtr>().get("t");
+ for(const py2cpp::PyPtr& obj : pyobjResult)
+ CPPUNIT_ASSERT(obj.repr() == "['object which needs pickel protocol']");
+ delete myJob;
+
+ // test a case of error
+ std::string wrongScript = "wrong 'script";
+ ydefx::PyStudyFunction wrongStudy;
+ wrongStudy.loadString(wrongScript);
+ CPPUNIT_ASSERT(!wrongStudy.isValid());
+ myJob = l.submitPyStudyJob(pyStudy, wrongStudy, sample, jobParams);
+ CPPUNIT_ASSERT(myJob == nullptr);
+ CPPUNIT_ASSERT(l.lastError().find("SyntaxError") != std::string::npos);
}
CPPUNIT_TEST_SUITE_REGISTRATION( SampleTest );
-#include "TestMain.cxx"
+#include "PyTestMain.cxx"
{
CPPUNIT_TEST_SUITE(SampleTest);
CPPUNIT_TEST(fullStudy);
+ CPPUNIT_TEST(genericStudy);
CPPUNIT_TEST_SUITE_END();
public:
void setUp();
void tearDown();
void cleanUp();
void fullStudy();
+ void genericStudy();
};
#endif // YDEFX_SAMPLETEST_HXX
#
SET(SCRIPTS
__init__.py
+ allpurposebuilder.py
configuration.py
parameters.py
pyscript.py
studyexception.py
studyresult.py
salome_proxy.py
+ multijobbuilder.py
+ multijobstudy.py
+ slurmbuilder.py
+ slurmstudy.py
+ localbuilder.py
+ localstudy.py
)
INSTALL(FILES ${SCRIPTS} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx)
ADD_SUBDIRECTORY(schemas)
+ADD_SUBDIRECTORY(plugins)
from .pystudy import PyStudy
from .sample import Sample
from .defaultschemabuilder import DefaultSchemaBuilder
+from .allpurposebuilder import AllPurposeBuilder
+from .localbuilder import LocalBuilder
+from .multijobbuilder import MultiJobBuilder
+from .slurmbuilder import SlurmBuilder
from .salome_proxy import forceSalomeServers, forceNoSalomeServers
+from .multijobstudy import MultiJobStudy
+from .slurmstudy import SlurmStudy
+from .localstudy import LocalStudy
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import inspect
+import pathlib
+import os
+
+class AllPurposeBuilder:
+ def __init__(self, executor = None, pointEval = None, mainJob = None):
+ filename = inspect.getframeinfo(inspect.currentframe()).filename
+ install_root_directory = pathlib.Path(filename).resolve().parent
+ install_files_directory = os.path.join(install_root_directory, "plugins")
+
+ if executor is None:
+ raise TypeError("Parameter executor should not be None.")
+ self.executor = executor
+
+ if pointEval is None:
+ pointEval = os.path.join(install_files_directory, "pointeval.py")
+ self.pointEval = pointEval
+
+ if mainJob is None:
+ mainJob = os.path.join(install_files_directory, "mainjob.py")
+ self.mainJob = mainJob
+
+ def getMainJob(self):
+ return self.mainJob
+
+ def getExecutor(self):
+ return self.executor
+
+ def getPointEval(self):
+ return self.pointEval
+
+ def getPluginName(self):
+ basename = os.path.basename(self.executor)
+ if not basename.endswith(".py"):
+ raise ValueError("File name {} does not end with '.py'.".format(
+ self.executor))
+ return basename[:-3]
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
from . import salome_proxy
+from . import parameters
import tempfile
import pathlib
+import os
+import json
def defaultWorkingDir(resource):
resManager = salome_proxy.getResourcesManager()
# GetFittingResources returns a tuple if in no salome session mode.
# Force to list for uniformity between the two modes.
return list(resManager.GetFittingResources(params.resource_required))
+
+def exportConfig(dicconfig, directory = None):
+ """ Save the configuration to a directory.
+ dicconfig is a dictionary which contains the parameters to be saved.
+ If directory is None, the configuration is saved to the current directory.
+ Return the path to the configuration file.
+ """
+ if directory is None:
+ directory = os.getcwd()
+ configpath = os.path.join(directory, "idefixconfig.json")
+ with open(configpath, "w") as f:
+ json.dump(dicconfig, f, indent=2)
+ return configpath
+
+def loadConfig(directory = None):
+ """ Return the configuration dictionary from a directory.
+ If the directory is None, use the current directory.
+ """
+ if directory is None:
+ directory = os.getcwd()
+ configpath = os.path.join(directory, "idefixconfig.json")
+ if not pathlib.Path(configpath).is_file():
+ configpath = os.path.join(directory, "..", "idefixconfig.json")
+ if not pathlib.Path(configpath).is_file():
+ message = "Configuration file not found in directory " + str(directory)
+ raise FileNotFoundError(message)
+ with open(configpath, "r") as f:
+ config = json.load(f)
+ return config
+
+def loadJobConfig(directory = None):
+ """ Return the salome job parameters loaded from a directory which contains
+ a idefixconfig.json file.
+ If the directory is None, use the current directory.
+ """
+ config = loadConfig(directory)
+ params = parameters.Parameters()
+ params.loadDict(config["params"])
+ result = params.salome_parameters
+ return result
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import inspect
+import pathlib
+import os
+from .allpurposebuilder import AllPurposeBuilder
+
+class LocalBuilder(AllPurposeBuilder):
+ def __init__(self, executor = None, pointEval = None, mainJob = None):
+ filename = inspect.getframeinfo(inspect.currentframe()).filename
+ install_root_directory = pathlib.Path(filename).resolve().parent
+ install_files_directory = os.path.join(install_root_directory, "plugins")
+ if executor is None:
+ executor = os.path.join(install_files_directory, "localexecutor.py")
+ elif executor == "localexecutor" or executor == "localexecutor.py":
+ executor = os.path.join(install_files_directory, "localexecutor.py")
+ elif executor == "lightexecutor" or executor == "lightexecutor.py":
+ executor = os.path.join(install_files_directory, "lightexecutor.py")
+ super().__init__(executor, pointEval, mainJob)
+
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import copy
+import os
+import json
+from . import pystudy
+from . import localbuilder
+from . import salome_proxy
+from . import configuration
+
+class LocalStudy(pystudy.PyStudy):
+ """
+ This study is always localy evaluated.
+ """
+ def __init__(self, sampleManager=None, schemaBuilder=None):
+ if schemaBuilder is None:
+ schemaBuilder = localbuilder.LocalBuilder()
+ super().__init__(sampleManager, schemaBuilder)
+
+ def createNewJob(self, script, sample, params):
+ self._check(script,sample)
+ self.sample = sample
+ self.params = copy.deepcopy(params)
+ # dump the remote jobs parameters to the configuration file
+ params_dic = params.dumpDict()
+ # modify the parameters for the local loop job
+ self.params.salome_parameters.resource_required.name = "localhost"
+ self.params.salome_parameters.job_type = "command_salome" #"python_salome"
+ self.params.createTmpResultDirectory()
+ result_directory = self.params.salome_parameters.result_directory
+ # export sample to result_directory
+ inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
+ inputFiles.extend([self.schemaBuilder.getExecutor(),
+ self.schemaBuilder.getPointEval()])
+ self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob()
+
+ # export config
+ dicconfig = {}
+ dicconfig["nbbranches"] = self.params.nb_branches
+ dicconfig["studymodule"] = "idefixstudy"
+ dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
+ dicconfig["params"] = params_dic
+ dicconfig["plugin"] = self.schemaBuilder.getPluginName()
+ configpath = configuration.exportConfig(dicconfig, result_directory)
+ studypath = os.path.join(result_directory, "idefixstudy.py")
+ with open(studypath, "w") as f:
+ f.write(script.script)
+
+ inputFiles.extend([configpath, studypath])
+
+ # this list manipulation is needed because in_files is not a python list
+ # if we don't use a salome session. In that case swig uses a python tuple
+ # in order to map a std::list as a parameter of a structure.
+ in_files_as_list = list(self.params.salome_parameters.in_files)
+ self.params.salome_parameters.in_files = in_files_as_list + inputFiles
+ launcher = salome_proxy.getLauncher()
+ self.job_id = launcher.createJob(self.params.salome_parameters)
+ return self.job_id
+
+ def jobType(self):
+ return "command_salome"
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import inspect
+import pathlib
+import os
+from .allpurposebuilder import AllPurposeBuilder
+
+class MultiJobBuilder(AllPurposeBuilder):
+ def __init__(self, executor = None, pointEval = None, mainJob = None):
+ filename = inspect.getframeinfo(inspect.currentframe()).filename
+ install_root_directory = pathlib.Path(filename).resolve().parent
+ install_files_directory = os.path.join(install_root_directory, "plugins")
+
+ if executor is None:
+ executor = os.path.join(install_files_directory, "jobexecutor.py")
+ super().__init__(executor, pointEval, mainJob)
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import copy
+import os
+import json
+from . import pystudy
+from . import multijobbuilder
+from . import salome_proxy
+from . import configuration
+
+
+class MultiJobStudy(pystudy.PyStudy):
+ """
+ This study uses one different job for each evaluation.
+ """
+ def __init__(self, sampleManager=None, schemaBuilder=None):
+ if schemaBuilder is None:
+ schemaBuilder = multijobbuilder.MultiJobBuilder()
+ super().__init__(sampleManager, schemaBuilder)
+
+ def createNewJob(self, script, sample, params):
+ self._check(script,sample)
+ self.sample = sample
+ self.params = copy.deepcopy(params)
+ main_job_work_dir = self.params.salome_parameters.result_directory
+ params_dic = params.dumpDict()
+ params_dic["salome_parameters"]["job_type"] = "command_salome"
+ params_dic["salome_parameters"]["job_file"] = self.schemaBuilder.getPointEval()
+ params_dic["salome_parameters"]["local_directory"] = main_job_work_dir
+ # set the parameters of the local job
+ self.params.salome_parameters.resource_required.name = "localhost"
+ self.params.salome_parameters.job_type = "command_salome" #"python_salome"
+
+ self.params.salome_parameters.work_directory = main_job_work_dir
+ self.params.createTmpResultDirectory()
+ result_directory = self.params.salome_parameters.result_directory
+ # export sample to result_directory
+ inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
+ inputFiles.extend([self.schemaBuilder.getExecutor(),
+ self.schemaBuilder.getPointEval()])
+ self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob()
+
+ # export config
+ dicconfig = {}
+ dicconfig["nbbranches"] = self.params.nb_branches
+ dicconfig["studymodule"] = "idefixstudy"
+ dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
+ dicconfig["params"] = params_dic
+ dicconfig["plugin"] = self.schemaBuilder.getPluginName()
+ configpath = configuration.exportConfig(dicconfig, result_directory)
+ studypath = os.path.join(result_directory, "idefixstudy.py")
+ with open(studypath, "w") as f:
+ f.write(script.script)
+
+ inputFiles.extend([configpath, studypath])
+
+ # this list manipulation is needed because in_files is not a python list
+ # if we don't use a salome session. In that case swig uses a python tuple
+ # in order to map a std::list as a parameter of a structure.
+ in_files_as_list = list(self.params.salome_parameters.in_files)
+ self.params.salome_parameters.in_files = in_files_as_list + inputFiles
+ launcher = salome_proxy.getLauncher()
+ self.job_id = launcher.createJob(self.params.salome_parameters)
+ return self.job_id
+
+ def jobType(self):
+ return "command_salome"
newobj.salome_parameters.mem_per_cpu = self.salome_parameters.mem_per_cpu
newobj.salome_parameters.wckey = self.salome_parameters.wckey
newobj.salome_parameters.extra_params = self.salome_parameters.extra_params
- newobj.salome_parameters.specific_parameters = self.salome_parameters.specific_parameters
+ #newobj.salome_parameters.specific_parameters = self.salome_parameters.specific_parameters
newobj.salome_parameters.resource_required.name = self.salome_parameters.resource_required.name
newobj.salome_parameters.resource_required.hostname = self.salome_parameters.resource_required.hostname
newobj.salome_parameters.resource_required.can_launch_batch_jobs = self.salome_parameters.resource_required.can_launch_batch_jobs
newobj.salome_parameters.resource_required.nb_proc_per_node = self.salome_parameters.resource_required.nb_proc_per_node
return newobj
+
+ def dumpDict(self):
+ """Create a dictionary with all the properties.
+ Can be used for serialization with json."""
+ newdict = {
+ "nb_branches" : self.nb_branches,
+ "salome_parameters" : {
+ "job_name" : self.salome_parameters.job_name,
+ "job_type" : self.salome_parameters.job_type,
+ "job_file" : self.salome_parameters.job_file,
+ "pre_command" : self.salome_parameters.pre_command,
+ "env_file" : self.salome_parameters.env_file,
+ "in_files" : list(self.salome_parameters.in_files),
+ "out_files" : list(self.salome_parameters.out_files),
+ "work_directory" : self.salome_parameters.work_directory,
+ "local_directory" : self.salome_parameters.local_directory,
+ "result_directory" : self.salome_parameters.result_directory,
+ "maximum_duration" : self.salome_parameters.maximum_duration,
+ "queue" : self.salome_parameters.queue,
+ "partition" : self.salome_parameters.partition,
+ "exclusive" : self.salome_parameters.exclusive,
+ "mem_per_cpu" : self.salome_parameters.mem_per_cpu,
+ "wckey" : self.salome_parameters.wckey,
+ "extra_params" : self.salome_parameters.extra_params,
+ #"specific_parameters" : str(self.salome_parameters.specific_parameters),
+ "resource_required" : {
+ "name" : self.salome_parameters.resource_required.name,
+ "hostname" : self.salome_parameters.resource_required.hostname,
+ "can_launch_batch_jobs" : self.salome_parameters.resource_required.can_launch_batch_jobs,
+ "can_run_containers" : self.salome_parameters.resource_required.can_run_containers,
+ "OS" : self.salome_parameters.resource_required.OS,
+ "nb_proc" : self.salome_parameters.resource_required.nb_proc,
+ "mem_mb" : self.salome_parameters.resource_required.mem_mb,
+ "cpu_clock" : self.salome_parameters.resource_required.cpu_clock,
+ "nb_node" : self.salome_parameters.resource_required.nb_node,
+ "nb_proc_per_node" : self.salome_parameters.resource_required.nb_proc_per_node
+ }
+ }
+ }
+ return newdict
+
+ def loadDict(self, dico):
+ self.nb_branches = dico["nb_branches"]
+ #self.salome_parameters = salome_proxy.createSalomeParameters()
+ self.salome_parameters.job_name = dico["salome_parameters"]["job_name"]
+ self.salome_parameters.job_type = dico["salome_parameters"]["job_type"]
+ self.salome_parameters.job_file = dico["salome_parameters"]["job_file"]
+ self.salome_parameters.pre_command = dico["salome_parameters"]["pre_command"]
+ self.salome_parameters.env_file = dico["salome_parameters"]["env_file"]
+ self.salome_parameters.in_files = dico["salome_parameters"]["in_files"]
+ self.salome_parameters.out_files = dico["salome_parameters"]["out_files"]
+ self.salome_parameters.work_directory = dico["salome_parameters"]["work_directory"]
+ self.salome_parameters.local_directory = dico["salome_parameters"]["local_directory"]
+ self.salome_parameters.result_directory = dico["salome_parameters"]["result_directory"]
+ self.salome_parameters.maximum_duration = dico["salome_parameters"]["maximum_duration"]
+ self.salome_parameters.queue = dico["salome_parameters"]["queue"]
+ self.salome_parameters.partition = dico["salome_parameters"]["partition"]
+ self.salome_parameters.exclusive = dico["salome_parameters"]["exclusive"]
+ self.salome_parameters.mem_per_cpu = dico["salome_parameters"]["mem_per_cpu"]
+ self.salome_parameters.wckey = dico["salome_parameters"]["wckey"]
+ self.salome_parameters.extra_params = dico["salome_parameters"]["extra_params"]
+ self.salome_parameters.resource_required.name = dico["salome_parameters"]["resource_required"]["name"]
+ self.salome_parameters.resource_required.hostname = dico["salome_parameters"]["resource_required"]["hostname"]
+ self.salome_parameters.resource_required.can_launch_batch_jobs = dico["salome_parameters"]["resource_required"]["can_launch_batch_jobs"]
+ self.salome_parameters.resource_required.can_run_containers = dico["salome_parameters"]["resource_required"]["can_run_containers"]
+ self.salome_parameters.resource_required.OS = dico["salome_parameters"]["resource_required"]["OS"]
+ self.salome_parameters.resource_required.nb_proc = dico["salome_parameters"]["resource_required"]["nb_proc"]
+ self.salome_parameters.resource_required.mem_mb = dico["salome_parameters"]["resource_required"]["mem_mb"]
+ self.salome_parameters.resource_required.cpu_clock = dico["salome_parameters"]["resource_required"]["cpu_clock"]
+ self.salome_parameters.resource_required.nb_node = dico["salome_parameters"]["resource_required"]["nb_node"]
+ self.salome_parameters.resource_required.nb_proc_per_node = dico["salome_parameters"]["resource_required"]["nb_proc_per_node"]
--- /dev/null
+SET(SCHEMA_FILES
+ jobexecutor.py
+ lightexecutor.py
+ localexecutor.py
+ srunexecutor.py
+ mainjob.py
+ pointeval.py
+ )
+
+INSTALL(FILES ${SCHEMA_FILES} DESTINATION ${SALOME_INSTALL_PYTHON}/pydefx/plugins)
--- /dev/null
+import pydefx
+import os
+import pickle
+import time
+import traceback
+
+pydefx.forceNoSalomeServers()
+class Context:
+ def __init__(self):
+ self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher()
+ pass
+
+class JobExecutor:
+ def __init__(self, config):
+ self.config = config
+
+ def initialize(self):
+ """ This is executed before the first evaluation.
+ Put here global processing needed by all the evaluations like the copy of
+ commun files.
+ """
+ # Copy the commun files to the root work directory
+ params = pydefx.Parameters() # global parameters
+ params.loadDict(self.config["params"])
+ # use a fake empty command.
+ # Using launcher to copy some files on the remote file system,
+ # without launching a job.
+ command = os.path.join(os.getcwd(), "empty.sh")
+ open(command, "w").close()
+ params.salome_parameters.job_file = command
+ params.salome_parameters.job_type = "command"
+ study_module = os.path.join(os.getcwd(), self.config["studymodule"]+".py")
+ infiles = list(params.salome_parameters.in_files)
+ params.salome_parameters.in_files = infiles + [study_module]
+ launcher = pydefx.salome_proxy.getLauncher()
+ job_id = launcher.createJob(params.salome_parameters)
+ launcher.exportInputFiles(job_id)
+
+ def evaluate(self, idx, point):
+ """ This is executed for every point to be evaluated.
+ """
+ context = Context()
+ error = None
+ out_values = None
+ try:
+ self.prepare(idx, point, context)
+ if self.noRunFound(idx, point, context):
+ self.runjob(idx, point, context)
+ error, out_values = self.getResult(context)
+ except Exception as e:
+ error = str(e)
+ traceback.print_exc()
+ return error, out_values
+
+ def prepare(self, idx, point, context):
+ """
+ Define local and remote work directory.
+ Define job script.
+ """
+ context.params = pydefx.Parameters()
+ context.params.loadDict(self.config["params"])
+ salome_parameters = context.params.salome_parameters
+ root_local_dir = salome_parameters.result_directory
+ root_remote_dir = salome_parameters.work_directory
+ input_files = [] # commun files are already copied to the root directory
+ point_name = "job_"+str(idx)
+ context.local_dir = os.path.join(root_local_dir, point_name)
+ point_remote_dir = os.path.join(root_remote_dir, point_name)
+ if not os.path.exists(context.local_dir):
+ os.mkdir(context.local_dir)
+ # export the point to a file
+ data_file_name = "idefixdata.csv"
+ data_file_path = os.path.join(context.local_dir, data_file_name)
+ with open(data_file_path, "w") as f:
+ # explicit dict convertion is needed for compatibility between python versions
+ f.write(repr(dict(point)))
+ input_files.append(data_file_path)
+
+ #command_path = os.path.join(root_local_dir, "command.py")
+ #salome_parameters.job_type = "command_salome"
+ #salome_parameters.job_file = command_path
+
+ salome_parameters.in_files = input_files
+ salome_parameters.out_files = ["idefixresult.txt", "idefixerror.txt"]
+ salome_parameters.work_directory = point_remote_dir
+ salome_parameters.result_directory = context.local_dir
+
+ def noRunFound(self, idx, point, context):
+ return True
+
+ def runjob(self, idx, point, context):
+ """
+ Create, launch and wait for the end of the job.
+ """
+ import random
+ sleep_delay = random.randint(5, 15) #10
+ #launcher = pydefx.salome_proxy.getLauncher()
+ launcher = context.launcher
+ context.job_id = launcher.createJob(context.params.salome_parameters)
+ launcher.launchJob(context.job_id)
+ jobState = launcher.getJobState(context.job_id)
+ while jobState=="QUEUED" or jobState=="IN_PROCESS" or jobState=="RUNNING" :
+ time.sleep(sleep_delay)
+ jobState = launcher.getJobState(context.job_id)
+
+ def getResult(self, context):
+ """
+ Check the job state, fetch the result file.
+ """
+ #launcher = pydefx.salome_proxy.getLauncher()
+ launcher = context.launcher
+ jobState = launcher.getJobState(context.job_id)
+ error=""
+ result=None
+ if jobState != "FINISHED" :
+ error = "Job has not finished correctly."
+ else:
+ launcher.getJobResults(context.job_id, "")
+ error_file = os.path.join(context.local_dir, "idefixerror.txt")
+ result_file = os.path.join(context.local_dir, "idefixresult.txt")
+ with open(error_file, "r") as f:
+ error = f.read()
+ with open(result_file, "r") as f:
+ result_str = f.read()
+ result = eval(result_str)
+
+ return error, result
+
+def createExecutor(config):
+ return JobExecutor(config)
--- /dev/null
+import os
+import pickle
+import time
+import traceback
+import subprocess
+
+class Context:
+ def __init__(self):
+ #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher()
+ pass
+
+class JobExecutor:
+ def __init__(self, config):
+ self.config = config
+
+ def initialize(self):
+ """ This is executed before the first evaluation.
+ Put here global processing needed by all the evaluations like the copy of
+ commun files.
+ """
+ pass
+
+ def evaluate(self, idx, point):
+ """ This is executed for every point to be evaluated.
+ """
+ context = Context()
+ error = None
+ out_values = None
+ studymodule=self.config["studymodule"]
+ import importlib
+ try:
+ idefixstudy=importlib.import_module(studymodule)
+ out_values=idefixstudy._exec(**point)
+ except Exception as e:
+ error=str(e)
+ traceback.print_exc()
+ return error, out_values
+
+def createExecutor(config):
+ return JobExecutor(config)
--- /dev/null
+import os
+import pickle
+import time
+import traceback
+import subprocess
+
+class Context:
+ def __init__(self):
+ pass
+
+class JobExecutor:
+ def __init__(self, config):
+ self.config = config
+
+ def initialize(self):
+ """ This is executed before the first evaluation.
+ Put here global processing needed by all the evaluations like the copy of
+ commun files.
+ """
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ os.chmod(pointeval, 0o755)
+
+ def evaluate(self, idx, point):
+ """ This is executed for every point to be evaluated.
+ """
+ context = Context()
+ error = None
+ out_values = None
+ try:
+ self.prepare(idx, point, context)
+ if self.noRunFound(idx, point, context):
+ self.runjob(idx, point, context)
+ error, out_values = self.getResult(context)
+ except Exception as e:
+ error = str(e)
+ traceback.print_exc()
+ return error, out_values
+
+ def prepare(self, idx, point, context):
+ """
+ Define local and remote work directory.
+ Define job script.
+ """
+ root_dir = os.getcwd()
+ point_name = "job_"+str(idx)
+ context.local_dir = os.path.join(root_dir, point_name)
+ if not os.path.exists(context.local_dir):
+ os.mkdir(context.local_dir)
+ # export the point to a file
+ data_file_name = "idefixdata.csv"
+ data_file_path = os.path.join(context.local_dir, data_file_name)
+ with open(data_file_path, "w") as f:
+ # explicit dict convertion is needed for compatibility between python versions
+ f.write(repr(dict(point)))
+
+ def noRunFound(self, idx, point, context):
+ return True
+
+ def runjob(self, idx, point, context):
+ """
+ Create, launch and wait for the end of the job.
+ """
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ return_code = subprocess.check_call(pointeval, shell=True, cwd=context.local_dir)
+
+ def getResult(self, context):
+ """
+ Check the job state, fetch the result file.
+ """
+ error_file = os.path.join(context.local_dir, "idefixerror.txt")
+ result_file = os.path.join(context.local_dir, "idefixresult.txt")
+ with open(error_file, "r") as f:
+ error = f.read()
+ with open(result_file, "r") as f:
+ result_str = f.read()
+ result = eval(result_str)
+
+ return error, result
+
+def createExecutor(config):
+ return JobExecutor(config)
--- /dev/null
+#! /usr/bin/env python3
+import json
+import importlib
+from multiprocessing import Pool
+import traceback
+
+class StartJob:
+ def __init__(self, executor):
+ self.executor = executor
+
+ def __call__(self, idx, in_values):
+ error=None
+ out_values=None
+ try:
+ error, out_values = self.executor.evaluate(idx, in_values)
+ except Exception as e:
+ error=str(e)
+ traceback.print_exc()
+ return idx, in_values, out_values, error
+
+class TerminateJob:
+ def __init__(self, manager):
+ self.manager = manager
+
+ def __call__(self, result):
+ # without try statement we may experience deadlock in case of error.
+ try:
+ idx, in_values, out_values, error = result
+ if not error:
+ error = None
+ self.manager.addResult(idx, in_values, out_values, error)
+ except Exception as e:
+ traceback.print_exc()
+
+if __name__ == '__main__':
+ with open("idefixconfig.json", "r") as f:
+ config = json.load(f)
+ plugin_module = importlib.import_module(config["plugin"])
+ executor = plugin_module.createExecutor(config)
+ # global initialization - commun work for every evaluation.
+ executor.initialize()
+
+ itModuleName = config["sampleIterator"]
+ itModule = importlib.import_module(itModuleName)
+ sampleManager = itModule.SampleIterator()
+ sampleManager.writeHeaders()
+
+ nbbranches=config["nbbranches"]
+ pool = Pool(nbbranches)
+ runPoint = StartJob(executor)
+ endOk = TerminateJob(sampleManager)
+ for point in sampleManager:
+ pool.apply_async(runPoint, point, callback=endOk)
+ pool.close()
+ pool.join()
+ sampleManager.terminate()
--- /dev/null
+#! /usr/bin/env python3
+import traceback
+import os
+
+data_file_name = "idefixdata.csv"
+study_module = "idefixstudy.py"
+error_result = "idefixerror.txt"
+value_result = "idefixresult.txt"
+traceback_result = "idefixtraceback.txt"
+
+with open(data_file_name, "r") as f:
+ values = f.read()
+inputvals = eval(values)
+
+error=""
+result=None
+old_dir = os.getcwd()
+
+try:
+ os.chdir("..") # go to commun root directory
+ with open(study_module, "r") as study_file:
+ study_string = study_file.read()
+ exec(study_string)
+ result = _exec(**inputvals)
+except Exception as e:
+ error=str(e)
+ os.chdir(old_dir) # back to the current case job directory
+ with open(traceback_result, "w") as f:
+ traceback.print_exc(file=f)
+
+os.chdir(old_dir) # back to the current case job directory
+
+with open(error_result, "w") as f:
+ f.write(error)
+
+with open(value_result, "w") as f:
+ f.write(repr(result))
--- /dev/null
+import os
+import pickle
+import time
+import traceback
+import subprocess
+
+class Context:
+ def __init__(self):
+ #self.launcher = pydefx.salome_proxy.getLauncher() # getLauncher()
+ pass
+
+class JobExecutor:
+ def __init__(self, config):
+ self.config = config
+
+ def initialize(self):
+ """
+ Execute prescript.
+ """
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ os.chmod(pointeval, 0o755)
+
+ def evaluate(self, idx, point):
+ """ This is executed for every point to be evaluated.
+ """
+ context = Context()
+ error = None
+ out_values = None
+ try:
+ self.prepare(idx, point, context)
+ if self.noRunFound(idx, point, context):
+ self.runjob(idx, point, context)
+ error, out_values = self.getResult(context)
+ except Exception as e:
+ error = str(e)
+ traceback.print_exc()
+ return error, out_values
+
+ def prepare(self, idx, point, context):
+ """
+ Define local and remote work directory.
+ Define job script.
+ """
+ root_dir = os.getcwd()
+ point_name = "job_"+str(idx)
+ context.local_dir = os.path.join(root_dir, point_name)
+ if not os.path.exists(context.local_dir):
+ os.mkdir(context.local_dir)
+ # export the point to a file
+ data_file_name = "idefixdata.csv"
+ data_file_path = os.path.join(context.local_dir, data_file_name)
+ with open(data_file_path, "w") as f:
+ # explicit dict convertion is needed for compatibility between python versions
+ f.write(repr(dict(point)))
+
+ def noRunFound(self, idx, point, context):
+ return True
+
+ def runjob(self, idx, point, context):
+ """
+ Create, launch and wait for the end of the job.
+ """
+ # srun
+ ntasks = self.config["tasksPerEval"]
+ pointeval = os.path.join(os.getcwd(), "pointeval.py")
+ command = "srun --ntasks={} --nodes=1 --chdir={} {} ".format(
+ str(ntasks),
+ context.local_dir,
+ pointeval)
+ return_code = subprocess.call(command, shell=True)
+
+ def getResult(self, context):
+ """
+ Check the job state, fetch the result file.
+ """
+ error_file = os.path.join(context.local_dir, "idefixerror.txt")
+ result_file = os.path.join(context.local_dir, "idefixresult.txt")
+ with open(error_file, "r") as f:
+ error = f.read()
+ with open(result_file, "r") as f:
+ result_str = f.read()
+ result = eval(result_str)
+
+ return error, result
+
+def createExecutor(config):
+ return JobExecutor(config)
job_string = loadJobString(path)
launcher = salome_proxy.getLauncher()
self.job_id = launcher.restoreJob(job_string)
- if job_id >= 0:
- salome_params = launcher.getJobParameters(job_id)
+ if self.job_id >= 0:
+ salome_params = launcher.getJobParameters(self.job_id)
self.params = parameters.Parameters(salome_parameters=salome_params)
self.getResult()
return self.job_id
if exit_code == "0" :
errorIfNoResults = True # we expect to have full results
else:
- errorMessage = "An error occured during the execution of the YACS schema."
+ errorMessage = "An error occured during the execution of the job."
else:
- errorMessage = "Failed to get the exit code of the YACS schema execution."
+ errorMessage = "Failed to get the exit code of the job."
elif state == "RUNNING" or state == "PAUSED" or state == "ERROR" :
# partial results may be available
inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
# export nbbranches
- configpath = os.path.join(result_directory, "idefixconfig.json")
dicconfig = {}
dicconfig["nbbranches"] = self.params.nb_branches
dicconfig["studymodule"] = "idefixstudy"
dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
- with open(configpath, "w") as f:
- json.dump(dicconfig, f, indent=2)
+ configpath = configuration.exportConfig(dicconfig, result_directory)
studypath = os.path.join(result_directory, "idefixstudy.py")
with open(studypath, "w") as f:
f.write(script.script)
raise StudyUseException("Parameter {} not found in the sample.".format(nm))
### Deprecated!!!!
-def dumpJob(result_directory, jobString):
- """
- Save the jobString to a file into result_directory.
- result_directory is a string representing a path to a directory.
- jobString is a string representing the serialization of a job.
- Use loadJobString for reloading the string saved here.
- """
- jobDumpPath = os.path.join(result_directory, PyStudy.JOB_DUMP_NAME)
- with open(jobDumpPath, "w") as f:
- f.write(job_string)
-
def loadJobString(result_directory):
"""
Return the jobString saved by the dumpJob function into a directory.
- Use dumpJob for saving a the string.
+ Use dumpJob for saving the string.
"""
jobDumpPath = os.path.join(result_directory, PyStudy.JOB_DUMP_NAME)
with open(jobDumpPath, "r") as f:
def writeHeaders(self):
"""
- This function can be called after initInputIterator and before the first
- call to addResult in order to write the names of the parameters in the
- result file.
+ This function can be called before the first call to addResult in order to
+ write the names of the parameters in the result file.
"""
if self.directory:
outputnamespath = os.path.join(self.directory,
def addResult(self, currentId, currentInput, currentOutput, currentError):
"""
- You need to call initInputIterator and writeHeaders before the first call
- of this function.
+ You need to call writeHeaders before the first call of this function.
currentId : int value
currentInput : dictionary {"input name":value}
currentOutput : result returned by _exec. Can be a tuple, a simple value or
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import inspect
+import pathlib
+import os
+from .allpurposebuilder import AllPurposeBuilder
+
+class SlurmBuilder(AllPurposeBuilder):
+ def __init__(self, executor = None, pointEval = None, mainJob = None):
+ filename = inspect.getframeinfo(inspect.currentframe()).filename
+ install_root_directory = pathlib.Path(filename).resolve().parent
+ install_files_directory = os.path.join(install_root_directory, "plugins")
+
+ if executor is None:
+ executor = os.path.join(install_files_directory, "srunexecutor.py")
+ super().__init__(executor, pointEval, mainJob)
--- /dev/null
+# -*- coding: utf-8 -*-
+# Copyright (C) 2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+import copy
+import os
+import json
+from . import pystudy
+from . import slurmbuilder
+from . import salome_proxy
+from . import configuration
+
+class SlurmStudy(pystudy.PyStudy):
+ def __init__(self, sampleManager=None, schemaBuilder=None):
+ if schemaBuilder is None:
+ schemaBuilder = slurmbuilder.SlurmBuilder()
+ super().__init__(sampleManager, schemaBuilder)
+
+ def createNewJob(self, script, sample, params):
+ # TODO: modifier le copier/coller
+ self._check(script,sample)
+ self.sample = sample
+ self.params = copy.deepcopy(params)
+ main_job_work_dir = self.params.salome_parameters.result_directory
+ if not os.path.exists(main_job_work_dir):
+ os.makedirs(main_job_work_dir)
+ # set the parameters of the local job
+ self.params.salome_parameters.job_type = self.jobType()
+
+ result_directory = self.params.salome_parameters.result_directory
+ # export sample to result_directory
+ inputFiles = self.sampleManager.prepareRun(self.sample, result_directory)
+ inputFiles.extend([self.schemaBuilder.getExecutor(),
+ self.schemaBuilder.getPointEval()])
+ self.params.salome_parameters.job_file = self.schemaBuilder.getMainJob()
+
+ # export config
+ dicconfig = {}
+ dicconfig["nbbranches"] = self.params.nb_branches
+ dicconfig["studymodule"] = "idefixstudy"
+ dicconfig["sampleIterator"] = self.sampleManager.getModuleName()
+ dicconfig["plugin"] = self.schemaBuilder.getPluginName()
+ nbproc = self.params.salome_parameters.resource_required.nb_proc
+ dicconfig["tasksPerEval"] = nbproc // self.params.nb_branches
+ configpath = configuration.exportConfig(dicconfig, result_directory)
+ studypath = os.path.join(result_directory, "idefixstudy.py")
+ with open(studypath, "w") as f:
+ f.write(script.script)
+
+ inputFiles.extend([configpath, studypath])
+
+ # this list manipulation is needed because in_files is not a python list
+ # if we don't use a salome session. In that case swig uses a python tuple
+ # in order to map a std::list as a parameter of a structure.
+ in_files_as_list = list(self.params.salome_parameters.in_files)
+ self.params.salome_parameters.in_files = in_files_as_list + inputFiles
+ launcher = salome_proxy.getLauncher()
+ self.job_id = launcher.createJob(self.params.salome_parameters)
+ return self.job_id
+
+ def jobType(self):
+ return "command_salome"
--- /dev/null
+import pydefx.configuration
+import pydefx.salome_proxy
+import os
+import time
+
+def _exec(n):
+ # get the job parameters
+ salome_parameters = pydefx.configuration.loadJobConfig()
+
+ launcher = pydefx.salome_proxy.getLauncher() # CORBA or not CORBA
+
+ # have a different working directory for each computation
+ resource = salome_parameters.resource_required.name
+ default_wd = pydefx.configuration.defaultWorkingDir(resource)
+ new_wd = os.path.join(default_wd, "myjob_"+str(n))
+ salome_parameters.work_directory = new_wd
+
+ # create and launch the job
+ job_id = launcher.createJob(salome_parameters)
+ launcher.launchJob(job_id)
+
+ # wait for the end of the job
+ jobState = launcher.getJobState(job_id)
+ while jobState != "FINISHED" and jobState != "FAILED" :
+ time.sleep(5)
+ jobState = launcher.getJobState(job_id)
+ return jobState
--- /dev/null
+import pydefx
+import os
+
+myParams = pydefx.Parameters()
+myParams.configureResource("eole")
+#myParams.createResultDirectory("/tmp")
+myParams.nb_branches = 4
+myParams.salome_parameters.resource_required.nb_proc = 4
+myParams.salome_parameters.result_directory=os.path.join(os.getcwd(),"rundir")
+myParams.salome_parameters.work_directory="/scratch/I35256/workingdir/testjob/"
+myParams.salome_parameters.local_directory = os.getcwd()
+myParams.salome_parameters.in_files=["template_jdd.txt", "mysolver.py"]
+
+pyScript = os.path.join(os.getcwd(), "mystudy.py")
+
+myScript = pydefx.PyScript()
+myScript.loadFile(pyScript)
+
+mySample = myScript.CreateEmptySample()
+mydata = {"x":range(10)}
+mySample.setInputValues(mydata)
+
+myStudy = pydefx.PyStudy()
+myStudy.createNewJob(myScript, mySample, myParams)
+myStudy.launch()
+
+myStudy.getJobState()
+myStudy.wait()
+print(myStudy.getResult())
+print(myStudy.sample)
+#print(myStudy.global_result)
--- /dev/null
+import pydefx
+import os
+
+myParams = pydefx.Parameters()
+myParams.configureResource("localhost")
+myParams.nb_branches = 4
+myParams.salome_parameters.resource_required.nb_proc = 1
+myParams.salome_parameters.work_directory=os.path.join(os.getcwd(),"runbasic")
+myParams.salome_parameters.local_directory = os.getcwd()
+
+pyScript = """
+def _exec(a,b):
+ d = a / b
+ return d
+"""
+
+myScript = pydefx.PyScript()
+myScript.loadString(pyScript)
+
+mySample = myScript.CreateEmptySample()
+mydata = {"a":[x // 10 for x in range(100)],
+ "b":[x % 10 for x in range(100)]}
+mySample.setInputValues(mydata)
+
+myStudy = pydefx.LocalStudy(schemaBuilder=pydefx.LocalBuilder("lightexecutor"))
+myStudy.createNewJob(myScript, mySample, myParams)
+myStudy.launch()
+
+myStudy.getJobState()
+myStudy.wait()
+print(myStudy.getResult())
+print(myStudy.sample)
--- /dev/null
+import pydefx
+import os
+
+myParams = pydefx.Parameters()
+myParams.nb_branches = 4
+myParams.salome_parameters.work_directory=os.path.join(os.getcwd(),"runbasic")
+myParams.salome_parameters.local_directory = os.getcwd()
+myParams.salome_parameters.resource_required.nb_proc = 1
+myParams.salome_parameters.job_name = "basic_job"
+myParams.salome_parameters.job_type = "command"
+myParams.salome_parameters.job_file = os.path.join(os.getcwd(), "simple_command.sh")
+myParams.salome_parameters.resource_required.name = "eole"
+myParams.salome_parameters.wckey = "P11N0:SALOME"
+
+myScript = pydefx.PyScript()
+
+pyScript = os.path.join(os.getcwd(), "jobstudy.py")
+myScript.loadFile(pyScript)
+
+mySample = myScript.CreateEmptySample()
+mydata = {"n":range(10)}
+mySample.setInputValues(mydata)
+
+myStudy = pydefx.LocalStudy(schemaBuilder=pydefx.LocalBuilder("lightexecutor"))
+myStudy.createNewJob(myScript, mySample, myParams)
+myStudy.launch()
+
+myStudy.getJobState()
+myStudy.wait()
+print(myStudy.getResult())
+print(myStudy.sample)
--- /dev/null
+import pydefx
+import os
+
+myParams = pydefx.Parameters()
+myParams.configureResource("eole")
+#myParams.createResultDirectory("/tmp")
+myParams.nb_branches = 4
+myParams.salome_parameters.resource_required.nb_proc = 1
+myParams.salome_parameters.result_directory=os.path.join(os.getcwd(),"runmulti")
+myParams.salome_parameters.work_directory="/scratch/I35256/workingdir/test_multijob/"
+myParams.salome_parameters.local_directory = os.getcwd()
+myParams.salome_parameters.in_files=["template_jdd.txt", "mysolver.py"]
+
+pyScript = os.path.join(os.getcwd(), "mystudy.py")
+
+myScript = pydefx.PyScript()
+myScript.loadFile(pyScript)
+
+mySample = myScript.CreateEmptySample()
+mydata = {"x":range(10)}
+mySample.setInputValues(mydata)
+
+myStudy = pydefx.MultiJobStudy()
+myStudy.createNewJob(myScript, mySample, myParams)
+myStudy.launch()
+
+myStudy.getJobState()
+myStudy.wait()
+print(myStudy.getResult())
+print(myStudy.sample)
+#print(myStudy.global_result)
--- /dev/null
+import pydefx
+import os
+
+myParams = pydefx.Parameters()
+myParams.configureResource("eole")
+#myParams.createResultDirectory("/tmp")
+myParams.nb_branches = 4
+myParams.salome_parameters.resource_required.nb_proc = 4
+myParams.salome_parameters.result_directory=os.path.join(os.getcwd(),"runsrun")
+myParams.salome_parameters.work_directory="/scratch/I35256/workingdir/test_srunjob/"
+myParams.salome_parameters.local_directory = os.getcwd()
+myParams.salome_parameters.in_files=["template_jdd.txt", "mysolver.py"]
+
+pyScript = os.path.join(os.getcwd(), "mystudy.py")
+
+myScript = pydefx.PyScript()
+myScript.loadFile(pyScript)
+
+mySample = myScript.CreateEmptySample()
+mydata = {"x":range(10)}
+mySample.setInputValues(mydata)
+
+myStudy = pydefx.SlurmStudy()
+myStudy.createNewJob(myScript, mySample, myParams)
+myStudy.launch()
+
+myStudy.getJobState()
+myStudy.wait()
+print(myStudy.getResult())
+print(myStudy.sample)
+#print(myStudy.global_result)
--- /dev/null
+#! /usr/bin/env python3
+import argparse
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description="Fake solver.")
+ parser.add_argument("jdd", help="Input file.")
+ parser.add_argument("resultat", help="Output file.")
+ args = parser.parse_args()
+ with open(args.jdd, 'r') as f:
+ in_value = float(f.read())
+ with open(args.resultat, 'w') as f:
+ f.write(str(in_value * in_value))
--- /dev/null
+import os
+def root_dir():
+ return os.getcwd()
+
+#def case_dir(*args):
+ #import hashlib
+ #h = hashlib.md5(repr(args).encode('utf-8'))
+ #return os.path.join(root_dir(), h.hexdigest())
+
+def case_dir(v):
+ case_name = "c_"+repr(v)
+ return os.path.join(root_dir(), case_name)
+
+class Study:
+ def __init__(self, value):
+ self.value = value
+ self.caseDir = case_dir(self.value)
+ self.rootDir = root_dir()
+
+ def getResults(self):
+ result_file = os.path.join(self.caseDir, "result.txt")
+ with open(result_file, 'r') as f:
+ result = float(f.read())
+ return result
+
+ def caseExists(self):
+ ok = True
+ if os.path.isdir(self.caseDir):
+ try:
+ self.getResults()
+ ok = True
+ except:
+ ok = False
+ else:
+ ok = False
+ return ok
+
+ def prepareCase(self):
+ if not os.path.isdir(self.caseDir):
+ os.mkdir(self.caseDir)
+ template_file = os.path.join(self.rootDir, "template_jdd.txt")
+ case_file = os.path.join(self.caseDir, "jdd.txt")
+ with open(template_file,'r') as f:
+ filedata = f.read()
+ filedata = filedata.format(**{'value':repr(self.value)})
+ with open(case_file,'w') as f:
+ f.write(filedata)
+
+ def runCase(self):
+ import subprocess
+ command = "{} {} {}".format(
+ os.path.join(self.rootDir, "mysolver.py"),
+ os.path.join(self.caseDir, "jdd.txt"),
+ os.path.join(self.caseDir, "result.txt"))
+ subprocess.run(command, shell=True)
+
+def _exec(x):
+ e = Study(x)
+ if not e.caseExists():
+ e.prepareCase()
+ e.runCase()
+ r = e.getResults()
+ return r