Salome HOME
Use pickel serialization for objects which are not numbers or strings.
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 28 Jun 2019 14:34:23 +0000 (16:34 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 28 Jun 2019 14:34:23 +0000 (16:34 +0200)
src/cpp/TMonoPyJob.hxx
src/cpp/Test/StudyGeneralTest.cxx
src/cpp/Test/StudyRestartTest.cxx
src/pydefx/CMakeLists.txt
src/pydefx/sample.py
src/pydefx/samplecsviterator.py [deleted file]
src/pydefx/samplecsvmanager.py
src/pydefx/schemas/plugin.py

index 06ee22b9bed4a75abca53557830329fbe4081bc2..d5bb00b164094702bec5a0ca7d907af9af29dc7d 100644 (file)
 #define YDEFX_TMONOPYJOB_HXX
 #include "JobParametersProxy.hxx"
 #include "MonoPyJob.hxx"
-#include "Sample.hxx"
 #include "SamplePyConversions.hxx"
 #include "PyStudyFunction.hxx"
-#include <py2cpp/py2cpp.hxx>
 
 namespace ydefx
 {
index 49e25e4061517dda7b6800deb699cd80b82c99c3..2a3bf7572725dd82d8b7e0c2fa45e0190ae54a21 100644 (file)
@@ -41,11 +41,13 @@ void SampleTest::fullStudy()
 
     ydefx::JobParametersProxy jobParams;
     jobParams.configureResource("localhost");
+    jobParams.work_directory(jobParams.work_directory() + "/GeneralTest");
     jobParams.createResultDirectory("/tmp");
     std::string pyScript = 
 "def _exec(a, b):\n"
 "  d = a / b\n"
-"  return d\n";
+"  t = ['object which needs pickel protocol']\n"
+"  return d,t\n";
 
     ydefx::PyStudyFunction studyFunction;
     studyFunction.loadString(pyScript);
@@ -56,13 +58,16 @@ void SampleTest::fullStudy()
     const std::list<std::string>& outputs = studyFunction.outputNames();
     CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "d")
                                                               != outputs.end());
-    
-    ydefx::Sample<double> sample;
+    CPPUNIT_ASSERT(std::find(outputs.begin(), outputs.end(), "t")
+                                                              != outputs.end());
+
+    ydefx::Sample<double, py2cpp::PyPtr > sample;
     std::vector<double> a_vals = {1.1, 4.4, 9, 4};
     std::vector<double> b_vals = {1.1, 2.2, 3, 1};
     sample.inputs<double>().set("a", a_vals);
     sample.inputs<double>().set("b", b_vals);
     sample.outputs<double>().addName("d");
+    sample.outputs<py2cpp::PyPtr >().addName("t");
 
     ydefx::Launcher l;
     ydefx::Job* myJob = l.submitMonoPyJob(studyFunction, sample, jobParams);
@@ -90,6 +95,10 @@ void SampleTest::fullStudy()
     std::vector<double> expectedResult = {1,2,3,4};
     const std::vector<double>& result = sample.outputs<double>().get("d");
     CPPUNIT_ASSERT(expectedResult == result);
+    const std::vector<py2cpp::PyPtr>& pyobjResult
+                                     = sample.outputs<py2cpp::PyPtr>().get("t");
+    for(const py2cpp::PyPtr& obj : pyobjResult)
+      CPPUNIT_ASSERT(obj.repr() == "['object which needs pickel protocol']");
     delete myJob;
   }
   Py_Finalize();
index 7636c6598874e23b234c0dba5e1b6a3a45b7465b..354f4833846e84112377b0025f991c3da4b3e3e0 100644 (file)
@@ -45,6 +45,7 @@ void SampleTest::studyTest()
   {
     ydefx::JobParametersProxy jobParams;
     jobParams.configureResource("localhost");
+    jobParams.work_directory(jobParams.work_directory() + "/RestartTest");
     jobParams.createResultDirectory("/tmp");
 
     std::string pyScript = 
index 6d5d94c894427c21285cd363b04a7a3b45de752e..e8b1d94f41db0b3160bc3d54ff668e7e57f8ebd6 100644 (file)
@@ -23,7 +23,6 @@ SET(SCRIPTS
   pyscript.py
   pystudy.py
   sample.py
-  samplecsviterator.py
   samplecsvmanager.py
   defaultschemabuilder.py
   )
index 76297d4eb029a8c2ce0f985753f7c51a209e4ed6..6e6ead7d634214298c154a38dc04f9b27f636ccf 100644 (file)
@@ -47,10 +47,10 @@ class Sample:
                       #        None = point not evaluated,
                       #          "" = results available, no error
                       # "any string"= error message for that index.
-  
+
   def __iter__(self):
     return Dico_iter(self._input)
-  
+
   def inputIterator(self):
     """
     Iterate over input values.
@@ -117,7 +117,6 @@ class Sample:
         foundId = curId
         break
     return foundId
-    
 
   def isValidId(self, idToCheck, inputToCheck):
     """
diff --git a/src/pydefx/samplecsviterator.py b/src/pydefx/samplecsviterator.py
deleted file mode 100644 (file)
index 8e79dc0..0000000
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (C) 2019  EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-import csv
-import os
-
-def tofile(t):
-  result = {}
-  for k,v in t:
-    if isinstance(v, (int, float)):
-      result[k]=v
-    else:
-      result[k] = repr(e)
-  return result
-
-def fromfile(t):
-  result = {}
-  for k,v in t:
-    if isinstance(v, str):
-      result[k] = eval(e)
-    else:
-      result[k] = e
-  return result
-
-class SampleIterator:
-  DATAFILE = "idefixdata.csv"
-  OUTPUTNAMESFILE = "idefixoutputnames.csv"
-  RESULTFILE = "idefixresult.csv"
-  ERRORCOLUMN = "idefix_error"
-  IDCOLUMN ="idefix_id"
-  def __init__(self, directory=None):
-    if directory:
-      datapath = os.path.join(directory, SampleIterator.DATAFILE)
-      outputnamespath = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE)
-      resultpath = os.path.join(directory, SampleIterator.RESULTFILE)
-    else:
-      datapath = SampleIterator.DATAFILE
-      outputnamespath = SampleIterator.OUTPUTNAMESFILE
-      resultpath = SampleIterator.RESULTFILE
-
-    self.iterNb   = -1
-    self.datafile = open(datapath, newline='')
-    self.data     = csv.DictReader(self.datafile, quoting=csv.QUOTE_NONNUMERIC)
-    result_columns = [SampleIterator.IDCOLUMN]
-    result_columns.extend(self.data.fieldnames)
-    self.outputnames = _loadOutputNames(outputnamespath)
-    result_columns.extend(self.outputnames)
-    result_columns.append(SampleIterator.ERRORCOLUMN)
-    self.result_file = open(resultpath, 'w', newline='')
-    self.result_csv = csv.DictWriter( self.result_file,
-                                      fieldnames=result_columns,
-                                      quoting=csv.QUOTE_NONNUMERIC )
-    self.result_csv.writeheader()
-    self.result_file.flush()
-
-  def addResult(self, currentId, currentInput, currentOutput, currentError):
-    """
-    currentId : int value
-    currentInput : dictionary {"input name":value}
-    currentOutput : result returned by _exec.  Can be a tuple, a simple value or
-    None in case of error.
-    currentError : string or None if no error
-    """
-    currentRecord = {}
-    currentRecord[SampleIterator.IDCOLUMN] = currentId
-    for name in self.data.fieldnames:
-      currentRecord[name] = currentInput[name]
-    if currentError is None:
-      if len(self.outputnames) == 1 :
-        outputname = self.outputnames[0]
-        currentRecord[outputname] = currentOutput
-      elif len(self.outputnames) > 1 :
-        outputIter = iter(currentOutput)
-        for name in self.outputnames:
-          currentRecord[name] = next(outputIter)
-    else:
-      for name in self.outputnames:
-        currentRecord[name] = None
-    currentRecord[SampleIterator.ERRORCOLUMN] = currentError
-    self.result_csv.writerow(currentRecord)
-    self.result_file.flush()
-
-  def __next__(self):
-    self.iterNb += 1
-    return self.iterNb, next(self.data)
-    #TODO: we need to close the files somewhere, but we cannot do it here
-
-  def __iter__(self):
-    return self
-
-# Private functions
-def _loadOutputNames(filepath):
-    outputnames = []
-    with open(filepath, "r") as namesfile:
-      for line in namesfile:
-        line = line.rstrip() # remove whitespaces at the end
-        outputnames.append(line)
-    return outputnames
index 819593f667333ac318cda919287cde40d8e743ff..6ffa78befa41343a239666861feca6210fef741a 100644 (file)
@@ -20,14 +20,26 @@ import csv
 import inspect
 import os
 import pathlib
-from .samplecsviterator import SampleIterator
-from . import samplecsviterator
+import numbers
+import pickle
 from . import sample
 
 class SampleManager:
+  DATAFILE = "idefixdata.csv"
+  OUTPUTNAMESFILE = "idefixoutputnames.csv"
+  RESULTDIR = "idefixresult" # directory which contains all the result files
+  RESULTFILE = "idefixresult.csv" # main result file - values for every point
+  GLOBALFILE = "idefixglobal"     # global result - one value for the whole simulation
+  ERRORCOLUMN = "idefix_error"
+  IDCOLUMN ="idefix_id"
+  ESCAPE_CHAR = "@"   # prefix a value that needs particular save/load procedure
+  PICK_CHAR = "p"     # @p : csv value saved in another file using pickle
   def __init__(self):
+    self.datafile = None
+    self.result_file = None
     pass
 
+  # Functions used by the study
   def prepareRun(self, sample, directory):
     """
     Create a dump of the sample in the given directory.
@@ -36,7 +48,7 @@ class SampleManager:
                copied. This directory should be already created.
     Return a list of files to add to the input files list of the job.
     """
-    datapath = os.path.join(directory, SampleIterator.DATAFILE)
+    datapath = os.path.join(directory, SampleManager.DATAFILE)
     with open(datapath, 'w', newline='') as csvfile:
       writer = csv.DictWriter(csvfile,
                               fieldnames=sample.getInputNames(),
@@ -44,41 +56,41 @@ class SampleManager:
       writer.writeheader()
       writer.writerows(sample.inputIterator())
 
-    outnamespath = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE)
+    outnamespath = os.path.join(directory, SampleManager.OUTPUTNAMESFILE)
     with open(outnamespath, 'w') as outputfile:
       for v in sample.getOutputNames():
         outputfile.write(v+'\n')
     filename = inspect.getframeinfo(inspect.currentframe()).filename
-    install_directory = pathlib.Path(filename).resolve().parent
-    iteratorFile = os.path.join(install_directory, "samplecsviterator.py")
     return [datapath,
             outnamespath,
-            iteratorFile
+            filename
             ]
 
   def loadResult(self, sample, directory):
-    """ The directory should contain a file with the name given by
-    getResultFileName. The results are loaded from that file to the sample.
+    """
+    The directory should contain a RESULTDIR directory with the result files.
+    The results are loaded into the sample.
     Return the modified sample.
     """
-    datapath = os.path.join(directory, SampleIterator.RESULTFILE)
+    resultdir = os.path.join(directory, SampleManager.RESULTDIR)
+    datapath = os.path.join(resultdir, SampleManager.RESULTFILE)
     with open(datapath, newline='') as datafile:
       data = csv.DictReader(datafile, quoting=csv.QUOTE_NONNUMERIC)
       for elt in data:
-        index = int(elt[SampleIterator.IDCOLUMN]) # float to int
+        index = int(elt[SampleManager.IDCOLUMN]) # float to int
         input_vals = {}
         for name in sample.getInputNames():
           input_vals[name] = elt[name]
         output_vals = {}
         for name in sample.getOutputNames():
-          output_vals[name] = elt[name]
+          output_vals[name] = self.decodeOutput(elt[name], resultdir)
         try:
           sample.checkId(index, input_vals)
         except Exception as err:
           extraInfo = "Error on processing file {} index number {}:".format(
                                                 datapath,       str(index))
           raise Exception(extraInfo + str(err))
-        sample.addResult(index, output_vals, elt[SampleIterator.ERRORCOLUMN])
+        sample.addResult(index, output_vals, elt[SampleManager.ERRORCOLUMN])
     return sample
 
   def loadSample(self, directory):
@@ -86,27 +98,190 @@ class SampleManager:
     sample object is created and returned from those files.
     This function is used to recover a previous run.
     """
-    outputnamesfile = os.path.join(directory, SampleIterator.OUTPUTNAMESFILE)
-    outputnames = samplecsviterator._loadOutputNames(outputnamesfile)
-    inputFilePath = os.path.join(directory, SampleIterator.DATAFILE)
-    with open(inputFilePath) as datafile:
-      data = csv.DictReader(datafile, quoting=csv.QUOTE_NONNUMERIC)
-      inputvalues = {}
-      for name in data.fieldnames:
-        inputvalues[name] = []
-      for line in data:
-        for name in data.fieldnames:
-          inputvalues[name].append(line[name])
-    result = sample.Sample(data.fieldnames, outputnames)
+    sampleIt = self.initInputIterator(directory)
+    inputvalues = {}
+    for name in self.inputnames:
+      inputvalues[name] = []
+    for newid, values in sampleIt:
+      for name in self.inputnames:
+        inputvalues[name].append(values[name])
+    
+    result = sample.Sample(self.inputnames, self.outputnames)
     result.setInputValues(inputvalues)
+    self.terminate()
     return result
-        
 
   def getModuleName(self):
     """
-    Return the module name which contains the class SampleIterator.
+    Return the module name which contains the class SampleManager.
     """
-    return "samplecsviterator"
+    return __name__
   
   def getResultFileName(self):
-    return SampleIterator.RESULTFILE
+    """
+    Name of the file or directory which contains the result and needs to be
+    copied from the remote computer.
+    """
+    return SampleManager.RESULTDIR
+
+  # Functions used by the optimizerloop plugin
+  def initInputIterator(self, directory=None):
+    """
+    Iterate over the input values read from the csv file.
+    """
+    if directory:
+      datapath = os.path.join(directory, SampleManager.DATAFILE)
+      outputnamespath = os.path.join(directory, SampleManager.OUTPUTNAMESFILE)
+      self.directory = directory
+    else:
+      datapath = SampleManager.DATAFILE
+      outputnamespath = SampleManager.OUTPUTNAMESFILE
+      self.directory = None
+
+    self.datafile = open(datapath, newline='')
+    data     = csv.DictReader(self.datafile, quoting=csv.QUOTE_NONNUMERIC)
+    self.inputnames = data.fieldnames
+    self.outputnames = _loadOutputNames(outputnamespath)
+    return InputSampleIterator(data)
+
+  def writeHeaders(self):
+    """
+    This function can be called after initInputIterator and before the first
+    call to addResult in order to write the names of the parameters in the
+    result file.
+    """
+    if self.directory:
+      resultdir = os.path.join(self.directory, SampleManager.RESULTDIR)
+      outputnamespath = os.path.join(self.directory,
+                                     SampleManager.OUTPUTNAMESFILE)
+    else:
+      resultdir = SampleManager.RESULTDIR
+      outputnamespath = SampleManager.OUTPUTNAMESFILE
+    os.makedirs(resultdir, exist_ok=True)
+    resultpath = os.path.join(resultdir, SampleManager.RESULTFILE)
+    result_columns = [SampleManager.IDCOLUMN]
+    result_columns.extend(self.inputnames)
+    result_columns.extend(self.outputnames)
+    result_columns.append(SampleManager.ERRORCOLUMN)
+    self.result_file = open(resultpath, 'w', newline='')
+    self.result_csv = csv.DictWriter( self.result_file,
+                                      fieldnames=result_columns,
+                                      quoting=csv.QUOTE_NONNUMERIC )
+    self.result_csv.writeheader()
+    self.result_file.flush()
+
+  def addResult(self, currentId, currentInput, currentOutput, currentError):
+    """
+    You need to call initInputIterator and writeHeaders before the first call
+    of this function.
+    currentId : int value
+    currentInput : dictionary {"input name":value}
+    currentOutput : result returned by _exec.  Can be a tuple, a simple value or
+    None in case of error.
+    currentError : string or None if no error
+    """
+    currentRecord = {}
+    currentRecord[SampleManager.IDCOLUMN] = currentId
+    for name in self.inputnames:
+      currentRecord[name] = currentInput[name]
+    if currentError is None:
+      if len(self.outputnames) == 1 :
+        outputname = self.outputnames[0]
+        currentRecord[outputname] = self.codeOutput(currentOutput,
+                                                    currentId,
+                                                    outputname)
+      elif len(self.outputnames) > 1 :
+        outputIter = iter(currentOutput)
+        for name in self.outputnames:
+          currentRecord[name]=self.codeOutput(next(outputIter), currentId, name)
+    else:
+      for name in self.outputnames:
+        currentRecord[name] = None
+    currentRecord[SampleManager.ERRORCOLUMN] = currentError
+    self.result_csv.writerow(currentRecord)
+    self.result_file.flush()
+
+  def terminate(self):
+    """
+    Call this function at the end of the simulation in order to close every
+    open files.
+    """
+    if not self.datafile is None:
+      self.datafile.close()
+      self.datafile = None
+    if not self.result_file is None:
+      self.result_file.close()
+      self.result_file = None
+
+  # Read and write results (output parameters)
+  def codeOutput(self, value, currentId, name):
+    """
+    Define how a value should be saved.
+    value: object to be saved - value of a parameter
+    currentId: number of the current line (current point).
+    name: name of the parameter (name of the column in the csv file).
+    return: string to be saved in the csv file.
+    """
+    res = None
+    if isinstance(value, numbers.Number):
+      res = value
+    elif isinstance(value, str):
+      res = value
+      if res[0:1] == SampleManager.ESCAPE_CHAR :
+        res = SampleManager.ESCAPE_CHAR + res
+    else:
+      file_name = "idefixresult-{}-{}.pick".format(name, currentId)
+      res = SampleManager.ESCAPE_CHAR + SampleManager.PICK_CHAR + file_name
+      file_path = os.path.join(SampleManager.RESULTDIR, file_name)
+      if self.directory :
+        file_path = os.path.join(self.directory, file_path)
+      with open(file_path, "wb") as f:
+        pickle.dump(value, f)
+    return res
+
+  def decodeOutput(self, obj, resultdir):
+    """
+    Decode a value read from the csv file.
+    obj: object to decode (string or number).
+    resultdir : directory which contains the result files
+    return: decoded object.
+    """
+    res = None
+    if isinstance(obj, numbers.Number):
+      res = obj
+    elif isinstance(obj, str):
+      res = obj
+      if res[0:1] == SampleManager.ESCAPE_CHAR :
+        res = res[1:]
+        if res[0:1] == SampleManager.ESCAPE_CHAR :# obj = @@string begins with@
+          pass
+        elif res[0:1] == SampleManager.PICK_CHAR:# obj = @pidefixresult-x-1.pick
+          file_path = os.path.join(resultdir, res[1:])
+          with open(file_path, "rb") as f:
+            res = pickle.load(f)
+        else:
+          raise Exception("Unknown escape value:" + obj)
+    return res
+
+class InputSampleIterator:
+  """
+  Iterator used to iterate over the input values of a sample, adding an order
+  number.
+  """
+  def __init__(self, iterable):
+    self.it = iter(iterable)
+    self.iterNb = -1
+  def __next__(self):
+    self.iterNb += 1
+    return self.iterNb, next(self.it)
+  def __iter__(self):
+    return self
+
+# Private functions
+def _loadOutputNames(filepath):
+    outputnames = []
+    with open(filepath, "r") as namesfile:
+      for line in namesfile:
+        line = line.rstrip() # remove whitespaces at the end
+        outputnames.append(line)
+    return outputnames
index 3abc68f0c154443c19d6186d739ab2820e0fc6df..a5fe5b58f98793b5ba993a08418fecec470882a3 100644 (file)
@@ -38,11 +38,13 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
     """Start to fill the pool with samples to evaluate."""
     itModuleName = self.config["sampleIterator"]
     itModule = importlib.import_module(itModuleName)
-    self.data = itModule.SampleIterator()
+    self.manager = itModule.SampleManager()
+    self.iterator = self.manager.initInputIterator()
+    self.manager.writeHeaders()
     values=None
     for i in range(0, self.getNbOfBranches()):
       try:
-        newid, values = next(self.data)
+        newid, values = next(self.iterator)
         self.pool.pushInSample(newid, pickle.dumps(values, protocol=0).decode())
       except StopIteration:
         pass
@@ -57,9 +59,9 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
     sample = pickle.loads(samplebyte)
     resultbyte=self.pool.getCurrentOutSample().getStringValue().encode()
     error,result = pickle.loads(resultbyte)
-    self.data.addResult(currentId, sample, result, error)
+    self.manager.addResult(currentId, sample, result, error)
     try:
-      newid, values = next(self.data)
+      newid, values = next(self.iterator)
       self.pool.pushInSample(newid, pickle.dumps(values, protocol=0).decode())
     except StopIteration:
       pass
@@ -67,6 +69,12 @@ class myalgosync(SALOMERuntime.OptimizerAlgSync):
   def finish(self):
     """Optional method called when the algorithm has finished, successfully
        or not, to perform any necessary clean up."""
+    # We need a try catch because finish is also called at the beginning of
+    # the algorthm, before any other call.
+    try:
+      self.manager.terminate()
+    except:
+      pass
     self.pool.destroyAll()
 
   def getAlgoResult(self):