From: Ovidiu Mircescu
Date: Fri, 28 Jun 2019 14:34:23 +0000 (+0200)
Subject: Use pickle serialization for objects which are not numbers or strings.
X-Git-Tag: V9_4_0rc1~11
X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=348d4136d07671ddebb28c24bc5944b4c9caebcf;p=tools%2Fydefx.git

Use pickle serialization for objects which are not numbers or strings.
---

diff --git a/src/cpp/TMonoPyJob.hxx b/src/cpp/TMonoPyJob.hxx
index 06ee22b..d5bb00b 100644
--- a/src/cpp/TMonoPyJob.hxx
+++ b/src/cpp/TMonoPyJob.hxx
@@ -20,10 +20,8 @@
 #define YDEFX_TMONOPYJOB_HXX
 #include "JobParametersProxy.hxx"
 #include "MonoPyJob.hxx"
-#include "Sample.hxx"
 #include "SamplePyConversions.hxx"
 #include "PyStudyFunction.hxx"
-#include
 
 namespace ydefx
 {
diff --git a/src/cpp/Test/StudyGeneralTest.cxx b/src/cpp/Test/StudyGeneralTest.cxx
index 49e25e4..2a3bf75 100644
--- a/src/cpp/Test/StudyGeneralTest.cxx
+++ b/src/cpp/Test/StudyGeneralTest.cxx
@@ -41,11 +41,13 @@ void SampleTest::fullStudy()
 
   ydefx::JobParametersProxy jobParams;
   jobParams.configureResource("localhost");
+  jobParams.work_directory(jobParams.work_directory() + "/GeneralTest");
   jobParams.createResultDirectory("/tmp");
   std::string pyScript =
 "def _exec(a, b):\n"
 "  d = a / b\n"
-"  return d\n";
+"  t = ['object which needs pickle protocol']\n"
+"  return d,t\n";
 
   ydefx::PyStudyFunction studyFunction;
   studyFunction.loadString(pyScript);
@@ -56,13 +58,16 @@ void SampleTest::fullStudy()
   const std::list& outputs = studyFunction.outputNames();
   CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "d")
                  != outputs.end());
-
-  ydefx::Sample sample;
+  CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "t")
+                 != outputs.end());
+
+  ydefx::Sample sample;
   std::vector a_vals = {1.1, 4.4, 9, 4};
   std::vector b_vals = {1.1, 2.2, 3, 1};
   sample.inputs().set("a", a_vals);
   sample.inputs().set("b", b_vals);
   sample.outputs().addName("d");
+  sample.outputs().addName("t");
 
   ydefx::Launcher l;
   ydefx::Job* myJob = l.submitMonoPyJob(studyFunction, sample, jobParams);
@@ -90,6 +95,10 @@ void SampleTest::fullStudy()
     std::vector expectedResult = {1,2,3,4};
     const std::vector& result = sample.outputs().get("d");
     CPPUNIT_ASSERT(expectedResult == result);
+    const std::vector& pyobjResult
+          = sample.outputs().get("t");
+    for(const py2cpp::PyPtr& obj : pyobjResult)
+      CPPUNIT_ASSERT(obj.repr() == "['object which needs pickle protocol']");
     delete myJob;
   }
   Py_Finalize();
diff --git a/src/cpp/Test/StudyRestartTest.cxx b/src/cpp/Test/StudyRestartTest.cxx
index 7636c65..354f483 100644
--- a/src/cpp/Test/StudyRestartTest.cxx
+++ b/src/cpp/Test/StudyRestartTest.cxx
@@ -45,6 +45,7 @@ void SampleTest::studyTest()
 {
   ydefx::JobParametersProxy jobParams;
   jobParams.configureResource("localhost");
+  jobParams.work_directory(jobParams.work_directory() + "/RestartTest");
   jobParams.createResultDirectory("/tmp");
 
   std::string pyScript =
diff --git a/src/pydefx/CMakeLists.txt b/src/pydefx/CMakeLists.txt
index 6d5d94c..e8b1d94 100644
--- a/src/pydefx/CMakeLists.txt
+++ b/src/pydefx/CMakeLists.txt
@@ -23,7 +23,6 @@ SET(SCRIPTS
   pyscript.py
   pystudy.py
   sample.py
-  samplecsviterator.py
   samplecsvmanager.py
   defaultschemabuilder.py
 )
diff --git a/src/pydefx/sample.py b/src/pydefx/sample.py
index 76297d4..6e6ead7 100644
--- a/src/pydefx/sample.py
+++ b/src/pydefx/sample.py
@@ -47,10 +47,10 @@ class Sample:
   # None = point not evaluated,
   # "" = results available, no error
   # "any string"= error message for that index.
- + def __iter__(self): return Dico_iter(self._input) - + def inputIterator(self): """ Iterate over input values. @@ -117,7 +117,6 @@ class Sample: foundId = curId break return foundId - def isValidId(self, idToCheck, inputToCheck): """ diff --git a/src/pydefx/samplecsviterator.py b/src/pydefx/samplecsviterator.py deleted file mode 100644 index 8e79dc0..0000000 --- a/src/pydefx/samplecsviterator.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright (C) 2019 EDF R&D -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com -# -import csv -import os - -def tofile(t): - result = {} - for k,v in t: - if isinstance(v, (int, float)): - result[k]=v - else: - result[k] = repr(e) - return result - -def fromfile(t): - result = {} - for k,v in t: - if isinstance(v, str): - result[k] = eval(e) - else: - result[k] = e - return result - -class SampleIterator: - DATAFILE = "idefixdata.csv" - OUTPUTNAMESFILE = "idefixoutputnames.csv" - RESULTFILE = "idefixresult.csv" - ERRORCOLUMN = "idefix_error" - IDCOLUMN ="idefix_id" - def __init__(self, directory=None): - if directory: - datapath = os.path.join(directory, SampleIterator.DATAFILE) - outputnamespath = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE) - resultpath = os.path.join(directory, SampleIterator.RESULTFILE) - else: - datapath = SampleIterator.DATAFILE - outputnamespath = SampleIterator.OUTPUTNAMESFILE - resultpath = SampleIterator.RESULTFILE - - self.iterNb = -1 - self.datafile = open(datapath, newline='') - self.data = csv.DictReader(self.datafile, quoting=csv.QUOTE_NONNUMERIC) - result_columns = [SampleIterator.IDCOLUMN] - result_columns.extend(self.data.fieldnames) - self.outputnames = _loadOutputNames(outputnamespath) - result_columns.extend(self.outputnames) - result_columns.append(SampleIterator.ERRORCOLUMN) - self.result_file = open(resultpath, 'w', newline='') - self.result_csv = csv.DictWriter( self.result_file, - fieldnames=result_columns, - quoting=csv.QUOTE_NONNUMERIC ) - self.result_csv.writeheader() - self.result_file.flush() - - def addResult(self, currentId, currentInput, currentOutput, currentError): - """ - currentId : int value - currentInput : dictionary {"input name":value} - currentOutput : result returned by _exec. Can be a tuple, a simple value or - None in case of error. 
- currentError : string or None if no error - """ - currentRecord = {} - currentRecord[SampleIterator.IDCOLUMN] = currentId - for name in self.data.fieldnames: - currentRecord[name] = currentInput[name] - if currentError is None: - if len(self.outputnames) == 1 : - outputname = self.outputnames[0] - currentRecord[outputname] = currentOutput - elif len(self.outputnames) > 1 : - outputIter = iter(currentOutput) - for name in self.outputnames: - currentRecord[name] = next(outputIter) - else: - for name in self.outputnames: - currentRecord[name] = None - currentRecord[SampleIterator.ERRORCOLUMN] = currentError - self.result_csv.writerow(currentRecord) - self.result_file.flush() - - def __next__(self): - self.iterNb += 1 - return self.iterNb, next(self.data) - #TODO: we need to close the files somewhere, but we cannot do it here - - def __iter__(self): - return self - -# Private functions -def _loadOutputNames(filepath): - outputnames = [] - with open(filepath, "r") as namesfile: - for line in namesfile: - line = line.rstrip() # remove whitespaces at the end - outputnames.append(line) - return outputnames diff --git a/src/pydefx/samplecsvmanager.py b/src/pydefx/samplecsvmanager.py index 819593f..6ffa78b 100644 --- a/src/pydefx/samplecsvmanager.py +++ b/src/pydefx/samplecsvmanager.py @@ -20,14 +20,26 @@ import csv import inspect import os import pathlib -from .samplecsviterator import SampleIterator -from . import samplecsviterator +import numbers +import pickle from . import sample class SampleManager: + DATAFILE = "idefixdata.csv" + OUTPUTNAMESFILE = "idefixoutputnames.csv" + RESULTDIR = "idefixresult" # directory which contains all the result files + RESULTFILE = "idefixresult.csv" # main result file - values for every point + GLOBALFILE = "idefixglobal" # global result - one value for the whole simulation + ERRORCOLUMN = "idefix_error" + IDCOLUMN ="idefix_id" + ESCAPE_CHAR = "@" # prefix a value that needs particular save/load procedure + PICK_CHAR = "p" # @p : csv value saved in another file using pickle def __init__(self): + self.datafile = None + self.result_file = None pass + # Functions used by the study def prepareRun(self, sample, directory): """ Create a dump of the sample in the given directory. @@ -36,7 +48,7 @@ class SampleManager: copied. This directory should be already created. Return a list of files to add to the input files list of the job. """ - datapath = os.path.join(directory, SampleIterator.DATAFILE) + datapath = os.path.join(directory, SampleManager.DATAFILE) with open(datapath, 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=sample.getInputNames(), @@ -44,41 +56,41 @@ class SampleManager: writer.writeheader() writer.writerows(sample.inputIterator()) - outnamespath = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE) + outnamespath = os.path.join(directory, SampleManager.OUTPUTNAMESFILE) with open(outnamespath, 'w') as outputfile: for v in sample.getOutputNames(): outputfile.write(v+'\n') filename = inspect.getframeinfo(inspect.currentframe()).filename - install_directory = pathlib.Path(filename).resolve().parent - iteratorFile = os.path.join(install_directory, "samplecsviterator.py") return [datapath, outnamespath, - iteratorFile + filename ] def loadResult(self, sample, directory): - """ The directory should contain a file with the name given by - getResultFileName. The results are loaded from that file to the sample. + """ + The directory should contain a RESULTDIR directory with the result files. 
+ The results are loaded into the sample. Return the modified sample. """ - datapath = os.path.join(directory, SampleIterator.RESULTFILE) + resultdir = os.path.join(directory, SampleManager.RESULTDIR) + datapath = os.path.join(resultdir, SampleManager.RESULTFILE) with open(datapath, newline='') as datafile: data = csv.DictReader(datafile, quoting=csv.QUOTE_NONNUMERIC) for elt in data: - index = int(elt[SampleIterator.IDCOLUMN]) # float to int + index = int(elt[SampleManager.IDCOLUMN]) # float to int input_vals = {} for name in sample.getInputNames(): input_vals[name] = elt[name] output_vals = {} for name in sample.getOutputNames(): - output_vals[name] = elt[name] + output_vals[name] = self.decodeOutput(elt[name], resultdir) try: sample.checkId(index, input_vals) except Exception as err: extraInfo = "Error on processing file {} index number {}:".format( datapath, str(index)) raise Exception(extraInfo + str(err)) - sample.addResult(index, output_vals, elt[SampleIterator.ERRORCOLUMN]) + sample.addResult(index, output_vals, elt[SampleManager.ERRORCOLUMN]) return sample def loadSample(self, directory): @@ -86,27 +98,190 @@ class SampleManager: sample object is created and returned from those files. This function is used to recover a previous run. """ - outputnamesfile = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE) - outputnames = samplecsviterator._loadOutputNames(outputnamesfile) - inputFilePath = os.path.join(directory, SampleIterator.DATAFILE) - with open(inputFilePath) as datafile: - data = csv.DictReader(datafile, quoting=csv.QUOTE_NONNUMERIC) - inputvalues = {} - for name in data.fieldnames: - inputvalues[name] = [] - for line in data: - for name in data.fieldnames: - inputvalues[name].append(line[name]) - result = sample.Sample(data.fieldnames, outputnames) + sampleIt = self.initInputIterator(directory) + inputvalues = {} + for name in self.inputnames: + inputvalues[name] = [] + for newid, values in sampleIt: + for name in self.inputnames: + inputvalues[name].append(values[name]) + + result = sample.Sample(self.inputnames, self.outputnames) result.setInputValues(inputvalues) + self.terminate() return result - def getModuleName(self): """ - Return the module name which contains the class SampleIterator. + Return the module name which contains the class SampleManager. """ - return "samplecsviterator" + return __name__ def getResultFileName(self): - return SampleIterator.RESULTFILE + """ + Name of the file or directory which contains the result and needs to be + copied from the remote computer. + """ + return SampleManager.RESULTDIR + + # Functions used by the optimizerloop plugin + def initInputIterator(self, directory=None): + """ + Iterate over the input values read from the csv file. + """ + if directory: + datapath = os.path.join(directory, SampleManager.DATAFILE) + outputnamespath = os.path.join(directory, SampleManager.OUTPUTNAMESFILE) + self.directory = directory + else: + datapath = SampleManager.DATAFILE + outputnamespath = SampleManager.OUTPUTNAMESFILE + self.directory = None + + self.datafile = open(datapath, newline='') + data = csv.DictReader(self.datafile, quoting=csv.QUOTE_NONNUMERIC) + self.inputnames = data.fieldnames + self.outputnames = _loadOutputNames(outputnamespath) + return InputSampleIterator(data) + + def writeHeaders(self): + """ + This function can be called after initInputIterator and before the first + call to addResult in order to write the names of the parameters in the + result file. 
+ """ + if self.directory: + resultdir = os.path.join(self.directory, SampleManager.RESULTDIR) + outputnamespath = os.path.join(self.directory, + SampleManager.OUTPUTNAMESFILE) + else: + resultdir = SampleManager.RESULTDIR + outputnamespath = SampleManager.OUTPUTNAMESFILE + os.makedirs(resultdir, exist_ok=True) + resultpath = os.path.join(resultdir, SampleManager.RESULTFILE) + result_columns = [SampleManager.IDCOLUMN] + result_columns.extend(self.inputnames) + result_columns.extend(self.outputnames) + result_columns.append(SampleManager.ERRORCOLUMN) + self.result_file = open(resultpath, 'w', newline='') + self.result_csv = csv.DictWriter( self.result_file, + fieldnames=result_columns, + quoting=csv.QUOTE_NONNUMERIC ) + self.result_csv.writeheader() + self.result_file.flush() + + def addResult(self, currentId, currentInput, currentOutput, currentError): + """ + You need to call initInputIterator and writeHeaders before the first call + of this function. + currentId : int value + currentInput : dictionary {"input name":value} + currentOutput : result returned by _exec. Can be a tuple, a simple value or + None in case of error. + currentError : string or None if no error + """ + currentRecord = {} + currentRecord[SampleManager.IDCOLUMN] = currentId + for name in self.inputnames: + currentRecord[name] = currentInput[name] + if currentError is None: + if len(self.outputnames) == 1 : + outputname = self.outputnames[0] + currentRecord[outputname] = self.codeOutput(currentOutput, + currentId, + outputname) + elif len(self.outputnames) > 1 : + outputIter = iter(currentOutput) + for name in self.outputnames: + currentRecord[name]=self.codeOutput(next(outputIter), currentId, name) + else: + for name in self.outputnames: + currentRecord[name] = None + currentRecord[SampleManager.ERRORCOLUMN] = currentError + self.result_csv.writerow(currentRecord) + self.result_file.flush() + + def terminate(self): + """ + Call this function at the end of the simulation in order to close every + open files. + """ + if not self.datafile is None: + self.datafile.close() + self.datafile = None + if not self.result_file is None: + self.result_file.close() + self.result_file = None + + # Read and write results (output parameters) + def codeOutput(self, value, currentId, name): + """ + Define how a value should be saved. + value: object to be saved - value of a parameter + currentId: number of the current line (current point). + name: name of the parameter (name of the column in the csv file). + return: string to be saved in the csv file. + """ + res = None + if isinstance(value, numbers.Number): + res = value + elif isinstance(value, str): + res = value + if res[0:1] == SampleManager.ESCAPE_CHAR : + res = SampleManager.ESCAPE_CHAR + res + else: + file_name = "idefixresult-{}-{}.pick".format(name, currentId) + res = SampleManager.ESCAPE_CHAR + SampleManager.PICK_CHAR + file_name + file_path = os.path.join(SampleManager.RESULTDIR, file_name) + if self.directory : + file_path = os.path.join(self.directory, file_path) + with open(file_path, "wb") as f: + pickle.dump(value, f) + return res + + def decodeOutput(self, obj, resultdir): + """ + Decode a value read from the csv file. + obj: object to decode (string or number). + resultdir : directory which contains the result files + return: decoded object. 
+    """
+    res = None
+    if isinstance(obj, numbers.Number):
+      res = obj
+    elif isinstance(obj, str):
+      res = obj
+      if res[0:1] == SampleManager.ESCAPE_CHAR :
+        res = res[1:]
+        if res[0:1] == SampleManager.ESCAPE_CHAR :# obj = @@string begins with@
+          pass
+        elif res[0:1] == SampleManager.PICK_CHAR:# obj = @pidefixresult-x-1.pick
+          file_path = os.path.join(resultdir, res[1:])
+          with open(file_path, "rb") as f:
+            res = pickle.load(f)
+        else:
+          raise Exception("Unknown escape value:" + obj)
+    return res
+
+class InputSampleIterator:
+  """
+  Iterator used to iterate over the input values of a sample, adding an order
+  number.
+  """
+  def __init__(self, iterable):
+    self.it = iter(iterable)
+    self.iterNb = -1
+  def __next__(self):
+    self.iterNb += 1
+    return self.iterNb, next(self.it)
+  def __iter__(self):
+    return self
+
+# Private functions
+def _loadOutputNames(filepath):
+  outputnames = []
+  with open(filepath, "r") as namesfile:
+    for line in namesfile:
+      line = line.rstrip() # remove whitespaces at the end
+      outputnames.append(line)
+  return outputnames
diff --git a/src/pydefx/schemas/plugin.py b/src/pydefx/schemas/plugin.py
index 3abc68f..a5fe5b5 100644
--- a/src/pydefx/schemas/plugin.py
+++ b/src/pydefx/schemas/plugin.py
@@ -38,11 +38,13 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
     """Start to fill the pool with samples to evaluate."""
     itModuleName = self.config["sampleIterator"]
     itModule = importlib.import_module(itModuleName)
-    self.data = itModule.SampleIterator()
+    self.manager = itModule.SampleManager()
+    self.iterator = self.manager.initInputIterator()
+    self.manager.writeHeaders()
     values=None
     for i in range(0, self.getNbOfBranches()):
       try:
-        newid, values = next(self.data)
+        newid, values = next(self.iterator)
         self.pool.pushInSample(newid, pickle.dumps(values, protocol=0).decode())
       except StopIteration:
         pass
@@ -57,9 +59,9 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
     sample = pickle.loads(samplebyte)
     resultbyte=self.pool.getCurrentOutSample().getStringValue().encode()
     error,result = pickle.loads(resultbyte)
-    self.data.addResult(currentId, sample, result, error)
+    self.manager.addResult(currentId, sample, result, error)
     try:
-      newid, values = next(self.data)
+      newid, values = next(self.iterator)
       self.pool.pushInSample(newid, pickle.dumps(values, protocol=0).decode())
     except StopIteration:
       pass
@@ -67,6 +69,12 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
   def finish(self):
     """Optional method called when the algorithm has finished, successfully
     or not, to perform any necessary clean up."""
+    # We need a try/except because finish is also called at the beginning of
+    # the algorithm, before any other call.
+    try:
+      self.manager.terminate()
+    except:
+      pass
     self.pool.destroyAll()
 
   def getAlgoResult(self):
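
Notes on the result serialization introduced by this patch.

The new SampleManager writes an output value directly into the result CSV when it is a number or a plain string; any other Python object is pickled to a side file named idefixresult-<output name>-<point id>.pick inside the idefixresult directory, and the CSV cell holds the escaped reference "@p<file name>". A literal string that already starts with "@" is protected by doubling the escape character. The standalone sketch below mirrors that scheme; the helper names encode_value and decode_value are illustrative and not part of the patch.

  import numbers
  import os
  import pickle

  ESCAPE_CHAR = "@"  # prefix for values that need a special save/load procedure
  PICK_CHAR = "p"    # "@p<file>": value stored in a side file using pickle

  def encode_value(value, result_dir, name, point_id):
    """Return what should be written in the CSV cell for this output value."""
    if isinstance(value, numbers.Number):
      return value
    if isinstance(value, str):
      # A string that already starts with "@" is escaped by doubling it.
      return ESCAPE_CHAR + value if value.startswith(ESCAPE_CHAR) else value
    # Any other object goes to a side file; the cell keeps a "@p" reference.
    file_name = "idefixresult-{}-{}.pick".format(name, point_id)
    with open(os.path.join(result_dir, file_name), "wb") as f:
      pickle.dump(value, f)
    return ESCAPE_CHAR + PICK_CHAR + file_name

  def decode_value(cell, result_dir):
    """Inverse operation, for a cell read back from the result CSV."""
    if not isinstance(cell, str) or not cell.startswith(ESCAPE_CHAR):
      return cell                        # plain number or plain string
    rest = cell[1:]
    if rest.startswith(ESCAPE_CHAR):     # "@@foo" was a literal "@foo"
      return rest
    if rest.startswith(PICK_CHAR):       # e.g. "@pidefixresult-t-0.pick"
      with open(os.path.join(result_dir, rest[1:]), "rb") as f:
        return pickle.load(f)
    raise ValueError("Unknown escape value: " + cell)

For the StudyGeneralTest case above, the row of point 0 would therefore keep the "d" column as a number, while the "t" column holds a reference such as "@pidefixresult-t-0.pick" pointing to the pickled list.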
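The optimizer plugin drives the manager through a fixed call sequence: initInputIterator opens idefixdata.csv and idefixoutputnames.csv, writeHeaders creates idefixresult/idefixresult.csv, addResult is called once per evaluated point, and terminate closes the files. A minimal sequential driver following that sequence could look like the sketch below; it assumes the pydefx package is importable, that the study directory was prepared beforehand (e.g. by prepareRun), and that the study function takes its inputs as keyword arguments. run_locally and exec_func are hypothetical names.

  from pydefx import samplecsvmanager

  def run_locally(study_dir, exec_func):
    """Evaluate every point of a prepared study directory in this process."""
    manager = samplecsvmanager.SampleManager()
    iterator = manager.initInputIterator(study_dir)  # reads idefixdata.csv
    manager.writeHeaders()           # creates idefixresult/idefixresult.csv
    for point_id, inputs in iterator:
      try:
        output = exec_func(**inputs) # plays the role of the _exec function
        error = None
      except Exception as exc:
        output, error = None, str(exc)
      manager.addResult(point_id, inputs, output, error)
    manager.terminate()              # close the input and result csv files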
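Inside plugin.py the input and output dictionaries still travel through the optimizer pool as strings, which is why pickle protocol 0 is used: its output is ASCII text, so it survives the decode()/encode() round trip on a string port. A quick illustration of that round trip:

  import pickle

  values = {"a": 1.1, "b": 2.2}
  text = pickle.dumps(values, protocol=0).decode()  # protocol 0 output is ASCII
  assert pickle.loads(text.encode()) == values      # lossless round trip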