From cd9612fd2542d1bc371c34f2c0ea32375e13ad8b Mon Sep 17 00:00:00 2001 From: Jean-Philippe ARGAUD Date: Thu, 6 Jun 2024 18:37:34 +0200 Subject: [PATCH] Documentation and models update, pst4mod --- .../TwoDimensionalInverseDistanceCS2010.py | 14 +- .../TwoDimensionalRosenbrockFunctionR1960.py | 113 + .../daNumerics/pst4mod/__init__.py | 3 + .../pst4mod/modelica_calibration/__init__.py | 257 ++ .../pst4mod/modelica_calibration/case.py | 2721 +++++++++++++++++ .../pst4mod/modelica_libraries/__init__.py | 21 + .../modelica_libraries/around_simulation.py | 865 ++++++ .../automatic_simulation.py | 447 +++ .../change_working_point.py | 769 +++++ .../pst4mod/modelica_libraries/functions.py | 66 + .../modelica_libraries/write_in_dsin.py | 201 ++ 11 files changed, 5470 insertions(+), 7 deletions(-) create mode 100644 src/daComposant/daNumerics/Models/TwoDimensionalRosenbrockFunctionR1960.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_calibration/__init__.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_calibration/case.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/__init__.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/around_simulation.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/automatic_simulation.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/change_working_point.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/functions.py create mode 100644 src/daComposant/daNumerics/pst4mod/modelica_libraries/write_in_dsin.py diff --git a/src/daComposant/daNumerics/Models/TwoDimensionalInverseDistanceCS2010.py b/src/daComposant/daNumerics/Models/TwoDimensionalInverseDistanceCS2010.py index c86e9e0..052d3eb 100644 --- a/src/daComposant/daNumerics/Models/TwoDimensionalInverseDistanceCS2010.py +++ b/src/daComposant/daNumerics/Models/TwoDimensionalInverseDistanceCS2010.py @@ -29,7 +29,7 @@ class TwoDimensionalInverseDistanceCS2010: s(x,y,µ) = 1 / sqrt( (x-µ1)² + (y-µ2)² + 0.1² ) - avec (x,y) ∈ [0.1,0.9]² et µ=(µ1,µ2) ∈ [-1,-0.01]². + with (x,y) ∈ [0.1,0.9]² and µ=(µ1,µ2) ∈ [-1,-0.01]². This is the non-linear parametric function (3.38) of the reference: Chaturantabut, S., Sorensen, D. C., @@ -43,14 +43,14 @@ class TwoDimensionalInverseDistanceCS2010: self.x = numpy.linspace(0.1, 0.9, self.nx, dtype=float) self.y = numpy.linspace(0.1, 0.9, self.ny, dtype=float) - def FunctionH(self, XX ): + def FieldG(self, mu ): "Fonction simulation pour un paramètre donné" - __mu1, __mu2 = numpy.ravel(XX) + mu1, mu2 = numpy.ravel( mu ) # - __x, __y = numpy.meshgrid( self.x, self.y ) - __sxymu = 1. / numpy.sqrt( (__x - __mu1)**2 + (__y - __mu2)**2 + 0.1**2 ) + x, y = numpy.meshgrid( self.x, self.y ) + sxymu = 1. / numpy.sqrt( (x - mu1)**2 + (y - mu2)**2 + 0.1**2 ) # - return __sxymu + return sxymu def get_x(self): "Renvoie le maillage spatial" @@ -81,7 +81,7 @@ class TwoDimensionalInverseDistanceCS2010: "Renvoie les bornes sur le maillage paramétrique" return [[-1, -0.01]] * 2 - OneRealisation = FunctionH + OneRealisation = FieldG # ============================================================================== class LocalTest(unittest.TestCase): diff --git a/src/daComposant/daNumerics/Models/TwoDimensionalRosenbrockFunctionR1960.py b/src/daComposant/daNumerics/Models/TwoDimensionalRosenbrockFunctionR1960.py new file mode 100644 index 0000000..d7c36e2 --- /dev/null +++ b/src/daComposant/daNumerics/Models/TwoDimensionalRosenbrockFunctionR1960.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2008-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D + +import sys, unittest, math, numpy + +# ============================================================================== +class TwoDimensionalRosenbrockFunctionR1960: + """ + Two-dimensional spatial function of µ=(a,b): + + f(x,y) = (a - x)² + b (y -x²)² + + with (x,y) ∈ [-2,2]x[-1,3]² and usually a=1, b=100. There is a + global minimum at (x,y) = (a,a²) for which f(x,y) = 0. + + This is the non-linear non-convex parametric function of the reference: + Rosenbrock, H. H., An Automatic Method for Finding the Greatest or + Least Value of a Function, The Computer Journal, 3(3), pp.175–184, + (1960) + """ + def __init__(self, nx: int = 40, ny: int = 40): + "Définition du maillage spatial" + self.nx = max(1, nx) + self.ny = max(1, ny) + self.x = numpy.linspace(-2, 2, self.nx, dtype=float) + self.y = numpy.linspace(-1, 3, self.ny, dtype=float) + + def FieldZ(self, mu ): + "Fonction simulation pour un paramètre donné" + a, b = numpy.ravel( mu ) + # + x, y = numpy.meshgrid( self.x, self.y ) + sxymu = (a - x)**2 + b * (y - x**2)**2 + # + return sxymu + + def FunctionH(self, xy, a = 1, b = 100): + "Construit la fonction de Rosenbrock en L2 (Scipy 1.8.1 p.1322)" + xy = numpy.ravel( xy ).reshape((-1, 2)) # Deux colonnes + x = xy[:, 0] + y = xy[:, 1] + return numpy.array([(a - x), math.sqrt(b) * (y - x**2)]) + + def get_x(self): + "Renvoie le maillage spatial" + return self.x, self.y + + def get_sample_of_mu(self, ns1: int = 20, ns2: int = 20): + "Renvoie l'échantillonnage paramétrique régulier" + sa = numpy.linspace(0, 2, ns1, dtype=float) + sb = numpy.linspace(1, 200, ns2, dtype=float) + smu = numpy.array([(a, b) for a in sa for b in sb]) + return smu + + def get_random_sample_of_mu(self, ns1: int = 1, ns2: int = 1): + "Renvoie l'échantillonnage paramétrique aléatoire" + smu = [] + for i in range(ns1 * ns2): + smu1 = numpy.random.uniform(0, 2) + smu2 = numpy.random.uniform(1, 200) + smu.append((smu1, smu2)) + smu = numpy.array(smu) + return smu + + def get_bounds_on_space(self): + "Renvoie les bornes sur le maillage spatial" + return [[min(self.x), max(self.x)], [min(self.y), max(self.y)]] + + def get_bounds_on_parameter(self): + "Renvoie les bornes sur le maillage paramétrique" + return [[0, 2], [1, 200]] + + OneRealisation = FieldZ + +# ============================================================================== +class LocalTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + print('\nAUTODIAGNOSTIC\n==============\n') + print(" " + TwoDimensionalRosenbrockFunctionR1960().__doc__.strip()) + + def test001(self): + numpy.random.seed(123456789) + Equation = TwoDimensionalRosenbrockFunctionR1960() + optimum = Equation.ValueZ( [1, 1] ) + self.assertTrue( max(optimum) <= 0.) + + def tearDown(cls): + print("\n Tests OK\n") + +# ============================================================================== +if __name__ == "__main__": + sys.stderr = sys.stdout + unittest.main(verbosity=0) diff --git a/src/daComposant/daNumerics/pst4mod/__init__.py b/src/daComposant/daNumerics/pst4mod/__init__.py index a32af42..351a873 100644 --- a/src/daComposant/daNumerics/pst4mod/__init__.py +++ b/src/daComposant/daNumerics/pst4mod/__init__.py @@ -19,3 +19,6 @@ # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com # # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D +__doc__ = """ +Python Support Tools for Modelica +""" diff --git a/src/daComposant/daNumerics/pst4mod/modelica_calibration/__init__.py b/src/daComposant/daNumerics/pst4mod/modelica_calibration/__init__.py new file mode 100644 index 0000000..ee4b328 --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_calibration/__init__.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D +# Author: Luis Corona Mesa-Molles, luis.corona-mesa-molles@edf.fr, EDF R&D + +__doc__ = """ +This module leads to easy calibration of a dynamical model from DYMOLA or +Modelica, with respect to experimental measurements, with the help of +optimization algorithms available in ADAO tool. + +SYNOPSIS +-------- + +The most simple use of the module, using default arguments and files living in +the current directory, is the following: + + from modelica_calibration.case import Calibration + import os + newcase = Calibration() + resultats = newcase.calibrate( + DataName = "measures.txt", + ModelName = os.path.join("Models","dymosim.exe"), + MethodName = "parameters.py", + VariablesToCalibrate = ["COP_val","hVE_val","PEE_val","hEE_val"], + OutputVariables = ["proeeF.T","Ev.P","prospC.T","Sp.Q","proseF.T"], + ) + print() + for k,v in resultats.items(): + print("%18s = %s"%(k,v)) + print() + +ARGUMENTS TO BE GIVEN AS INPUT FOR "CALIBRATE" TASK +--------------------------------------------------- + +The "calibrate" method is used to set and realize a calibration (optimal +estimation task) of model parameters using measures data. It can be directly +called with all the input arguments to set the full calibration problem. + +The input arguments are the following named ones. They are not all required, +and can be divided in category. + +Category: dynamic model description + + ModelName............: required file (or directory name that contains the + files) for the dynamic model. + (default None) + + ModelFormat..........: this indicates the format of the dynamic model. It + can be given as a Dymola executable pair "dymosim.exe/dsin.txt". + (default "GUESS", allowed "DYMOSIM", "PYSIM", "GUESS") + +Category: measures description + + DataName.............: required file name for measures + (default None) + + DataFormat...........: this indicates the format of the measures, given as + columns of chronological series, with the same column names that are + used for variables in the output of the model simulation. + (default "GUESS", allowed "TXT", "CSV", "TSV", "GUESS") + +Category: initial/background values description + + BackgroundName.......: file name for initial/background. If no file name is + given, it is assumed that the initial value will come from model. + (default None) + + BackgroundFormat.....: this indicates the source of the initial values used + as background ones. The background values can be read in ADAO, DSIN or + USER format. The priority is "USER", then "ADAO", then "DSIN". + (default "GUESS", allowed "ADAO", "DSIN", "USER", "GUESS") + +Category: calibration method description + + MethodName...........: file name for ADAO parameters + (default None) + + MethodFormat.........: this indicates the format of the assimilation + parameters, excluding background. + (default "ADAO", allowed "ADAO") + + VariablesToCalibrate.: model names to calibrate, that have to exist as + input variables in the model description. + (default None) + + OutputVariables......: model names of result to compare to measures, that + have to exist as output variables in the model description. + (default None) + +Category: results description + + ResultName...........: required file name for writing results. + (default "results.txt") + + ResultFormat.........: this indicates the format of the result output. + (default "GUESS", allowed "TXT", "PY", "GUESS") + + ResultLevel..........: quantity of results that will be retrieved. By + default, it will always write at least the calibration results. + (default None, allowed None="Final", "Final", "IntermediaryFinal") + + Verbose..............: True or False + (default False) + +ARGUMENTS OUTPUT THAT CAN BE OBTAINDED FOR "CALIBRATE" TASK +----------------------------------------------------------- + +The output results are returned at the end of the calibration process, and +evantually during it. The output will be at least written in a file, and in the +same time returned in a python dictionary containing result information. The +dictionnary entries are the following ones: + + NamesOfParameters....: names of the individual parameter variables to be + calibrated. + + InitialParameters....: initial or background parameter set, as known before + calibration. If it is given by the user, it can differ from the initial + model values embedded in the model. + + OptimalParameters....: optimal parameter set, as known after calibration. + + + NamesOfMeasures......: names of the measures subsets used for the + calibration. + + Measures.............: measures subsets used for calibration + + OptimalSimulation....: simulation results obtained with optimal parameter + set, using the model. It can be directly compared to measures. + +HOW TO DESCRIBE A DYNAMIC MODEL? +-------------------------------- + +Model can be given either by the name of the model parameter file (e.g. +"dsin.txt") with absolute or relative path, or by the directory where live the +DYMOLA compiled executable "dymosim.exe" and the parameters file "dsin.txt". + +HOW TO DESCRIBE THE MEASURES? +----------------------------- + +Measures can be given by colums files, with named colums, in TXT, CVS or TSV +file types (with respective delimiters "space", ";" and "tab" and respective +file extension ".txt", ".csv" and ".tsv"). Beginning lines with "#" indicates a +comment line, with the first uncommented line having to present the names of the +columns variables if available. + +Example of a TXT file: + # Measures text file + # + Date Heure Variable1 Variable2 + 22/04/2018 8:0:0 295 0.01 + 23/04/2018 8:0:0 296 0.01 + 24/04/2018 8:0:0 294 0.01 + 25/04/2018 8:0:0 293 0.001 + 26/04/2018 8:0:0 295 0.01 + 27/04/2018 8:0:0 297 0.5 + +Example of a CSV file: + # Measures text file + # + Date,Heure,Variable1,Variable2 + 22/04/2018,8:0:0,295,0.01 + 23/04/2018,8:0:0,296,0.01 + 24/04/2018,8:0:0,294,0.01 + 25/04/2018,8:0:0,293,0.001 + 26/04/2018,8:0:0,295,0.01 + 27/04/2018,8:0:0,297,0.5 + +HOW TO SET CALIBRATION PARAMETERS? +---------------------------------- + +ADAO parameters can be given by Python file, with named standard variables (see +ADAO documentation for details). All parameters have wise defaults). Error +covariance matrices can be set also in this parameter file. + +The variables, that can (but has not to) be set (see ADAO documentation +for details), are: + Algorithm........: calibration algorithm (default "3DVAR") + Parameters.......: dictionary of optimization parameters and of + optimization bounds (default Model ones) + BackgroundError..: a priori errors covariance matrix (default 1.) + ObservationError.: a priori model-observation errors covariance + matrix (default 1.) + Background.......: see below. + +Initial values and bounds of the variables to calibrate can be read from user +file, ADAO parameters file or model parameters file, allowing to superseed the +model initial default values. The priority is "USER", then "ADAO", then "DSIN", +and the default one correspond to "DSIN". So the background can be set using +three alternative ways: + + - by reading named parameters in a user file, with 2 (or 4) colums + describing pairs (or quadruplets) for each variable with the Name, the + Values (and Min/Max bounds if required). The file can be in CSV or TXT + format. + + Choice: + BackgroundFormat = "USER" + BackgroundName = ... + + Example of CSV file content: + Name,Value,Minimum,Maximum + COP_val,1.4,0.2,1 + hVE_val,2321820,0,None + PEE_val,8.3226E06,0,None + hEE_val,759483,None,None + + - by reading values and bounds as series in the calibration parameters file + with the ADAO standard syntax. + + Choice: + BackgroundFormat = "ADAO" + + Example of subset of Python parameters calibration file: + Parameters={ + "Bounds":[[0.2,1],[0,None],[0,None],[None,None]], + } + Background=[1.4,2321820,8.3226E06,759483] + + - by reading initial values in a model description file as the "dsin.txt" + for Dymola. In this case, the only information required is the names of + the calibrated variables as known in Dymola "dsin.txt" file. + + Choice: + BackgroundFormat = "DSIN" + +LICENSE +------- +The license for this tool and its documentation is the GNU Lesser General +Public License (Lesser GPL). + +REFERENCES +---------- +For more information, see: + * DYMOLA User documentation + * ADAO, a module for Data Assimilation and Optimization, + http://www.salome-platform.org/ + +""" +__author__ = "Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D" +__all__ = ["case"] diff --git a/src/daComposant/daNumerics/pst4mod/modelica_calibration/case.py b/src/daComposant/daNumerics/pst4mod/modelica_calibration/case.py new file mode 100644 index 0000000..65b921c --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_calibration/case.py @@ -0,0 +1,2721 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D +# Author: Luis Corona Mesa-Molles, luis.corona-mesa-molles@edf.fr, EDF R&D + +__all__ = ["Calibration", "name", "version", "year"] + +# ============================================================================== + +name = "Modelica and Dymola Calibration Tools" +version = "1.0.9.7.0" # "x.x"+"adao version" +year = "2021" + +# ============================================================================== +# Default configuration +# --------------------- +import os, sys, io, shutil, numpy, scipy, tempfile, time, logging, pandas, subprocess, copy, csv +from datetime import timedelta +from datetime import datetime + +try: + import adao + from adao import adaoBuilder + from daCore.Interfaces import ImportFromFile, ImportScalarLinesFromFile +except ImportError: + raise ImportError("ADAO module not found, please install it first.") + +try: + from buildingspy.io.outputfile import Reader +except ImportError: + raise ImportError("buildingspy module not found, please install it first.") + +try: + from pst4mod.modelica_libraries.automatic_simulation import Automatic_Simulation + from pst4mod.modelica_libraries.around_simulation import Around_Simulation + from pst4mod.modelica_libraries import write_in_dsin + from pst4mod.modelica_libraries.change_working_point import Dict_Var_To_Fix + from pst4mod.modelica_libraries.change_working_point import Working_Point_Modification +except ImportError: + raise ImportError("modelica_libraries module not found, please install it first.") + +try: + from fmpy import simulate_fmu + from fmpy.fmi1 import FMICallException +# except ImportError: +# raise ImportError("fmpy library not found, please install it first") +except: + pass + +_configuration = { + "DirectoryModels" : "Models", + "DirectoryMeasures" : "Measures", + "DirectoryMethods" : "Methods", + "DirectoryResults" : "Results", + "Launcher" : "configuration.py", + } +__paths = [] +__paths.append( os.path.abspath(os.path.join(os.path.dirname(__file__), '..','..','buildingspy-1.5.0')) ) +__paths.append( os.path.abspath(os.path.join(os.path.dirname(__file__), '..','..','Packages')) ) +for __path in __paths: + if os.path.exists(__path): sys.path.append(__path) + +# ============================================================================== +class Calibration(object): + "ADAO case based parameters inverse estimation tools" + def __init__(self, Name = "Calibration case", SaveStdoutIn = None, Verbose = False): + """ + Name : name as a string + SaveAllStdoutIn: filename to be used for stdout stream + Verbose : verbose output + """ + self.__name = str(Name) + self.__measures = {} + self.__model = {} + self.__link = {} + self.__method = {} + self.__result = {} + self._setConfigurationDefaults() + self.__verbose = bool(Verbose) + self.__stdoutid = sys.stdout + if SaveStdoutIn is not None: + sys.stdout = open(SaveStdoutIn,"w") + if self.__verbose: + __msg = "[VERBOSE] %s"%self.__name + print("") + print(" %s"%__msg) + print(" %s"%("-"*len(__msg),)) + print("") + VersionsLogicielles() + + def _setConfigurationDefaults(self, Configuration = None): + """ + Impose case directory and files structure configuration defaults based + on argument dictionary + """ + if Configuration is None or type(Configuration) is not dict: + Configuration = _configuration + for __k,__v in Configuration.items(): + setattr(self, __k, __v) + + def _setModelTmpDir(self, __model_name="dsin.txt", __model_format="DYMOSIM", __model_dest = "dsin.txt", __before_tmp = None, __suffix=None): + if __model_name is None: + raise ValueError('Model file or directory has to be set and can not be None') + elif os.path.isfile(__model_name): + __model_nam = os.path.basename(__model_name) + __model_dir = os.path.abspath(os.path.dirname(__model_name)) + __model_dst = __model_dest + elif os.path.isdir(__model_name): + __model_nam = "dsin.txt" + __model_dir = os.path.abspath(__model_name) + __model_dst = __model_dest + else: + raise ValueError('Model file or directory not found using %s'%str(__model_name)) + # + if __before_tmp is not None: # Mettre "../.." si nécessaire + __mtmp = os.path.join(__model_dir, __before_tmp, "tmp") + else: + __mtmp = os.path.join(__model_dir, "tmp") + if not os.path.exists(__mtmp): + os.mkdir(__mtmp) + __prefix = time.strftime('%Y%m%d_%Hh%Mm%Ss_tmp_',time.localtime()) + if __suffix is not None: + __prefix = __prefix + __suffix + "_" + __ltmp = tempfile.mkdtemp( prefix=__prefix, dir=__mtmp ) + # + for sim, pmf, dst in ( + (__model_nam,__model_format,__model_dst), + ("dymosim.exe","DYMOSIM","dymosim.exe"), + ("pythonsim.exe","PYSIM","pythonsim.exe"), + ): + # Recherche un fichier de données ou un simulateur dans cet ordre + + if os.path.isfile(os.path.join(__model_dir, sim)) and (pmf.upper() in [__model_format, "GUESS"]): + shutil.copy( + os.path.join(__model_dir, sim), + os.path.join(__ltmp, dst) + ) + + # _secure_copy_textfile( + # os.path.join(__model_dir, sim), + # os.path.join(__ltmp, dst) + # ) + + break + return __ltmp + + def _setModelTmpDir_simple(self, __model_name="dsin.txt", __model_format="DYMOSIM", __model_dest = "dsin.txt"): + if __model_name is None: + raise ValueError('Model file or directory has to be set and can not be None') + elif os.path.isfile(__model_name): + if __model_format == "OPENMODELICA": #same structure as in the directory case + __model_dir = os.path.abspath(os.path.dirname(__model_name)) + + __model_nam_1 = os.path.basename(__model_name) + __model_nam_2 = __model_nam_1[:-9]+'.exe' + __model_nam = [__model_nam_1, __model_nam_2] #the first found model is kept + + __model_nam_3 = __model_nam_1[:-9]+'_info.json' #check if this file exists as well + if os.path.exists(os.path.join(__model_dir,__model_nam_3)): + __model_nam.append(__model_nam_3) #get the three files necessar for simulation + __model_dst = __model_nam #the same file name is kept + + else: #cas classique + __model_nam = os.path.basename(__model_name) + __model_dir = os.path.abspath(os.path.dirname(__model_name)) + __model_dst = os.path.basename(__model_dest) + elif os.path.isdir(__model_name): + if __model_format == "DYMOSIM": + __model_nam = "dsin.txt" + __model_dir = os.path.abspath(__model_name) + __model_dst = os.path.basename(__model_dest) + + elif __model_format == "FMI" or __model_format == "FMU": + __model_dir = os.path.abspath(__model_name) + __model_nam = [files for files in os.listdir(__model_dir) if files[-4:] =='.fmu'][0] #the first found model is kept + __model_dst = __model_nam #the same file name is kept + + elif __model_format == "OPENMODELICA": + __model_dir = os.path.abspath(__model_name) + __model_nam_1 = [files for files in os.listdir(__model_dir) if files[-4:] =='.xml'][0] #the first found model is kept, .xml extension is considered + __model_nam_2 = __model_nam_1[:-9]+'.exe' + __model_nam = [__model_nam_1, __model_nam_2] #the first found model is kept + + __model_nam_3 = __model_nam_1[:-9]+'_info.json' #check if this file exists as well + if os.path.exists(os.path.join(__model_dir,__model_nam_3)): + __model_nam.append(__model_nam_3) #get the three files necessar for simulation + __model_dst = __model_nam #the same file name is kept + + else : + raise ValueError('If the model is not provided in DYMOSIM, OpenModelica or FMI format, the name of the model must refer to a file name (in a relative way with respect to the configuration file)') + else: + raise ValueError('Model file or directory not found using %s'%str(__model_name)) + # + __mtmp = os.path.join(__model_dir, "tmp") + if not os.path.exists(__mtmp): + os.mkdir(__mtmp) + __ltmp = __mtmp + # + if type(__model_nam) == list: #it means that it is an OpenModelica model + for sim_element in __model_nam: + shutil.copy( + os.path.join(__model_dir, sim_element), + os.path.join(__ltmp, sim_element) #the same name is kept for the files + ) + else: + for sim, pmf, dst in ( + (__model_nam,__model_format,__model_dst), + ("dsin.txt","DYMOSIM","dsin.txt"), + ("pythonsim.exe","PYSIM","pythonsim.exe"), + ): + # Recherche un fichier de données ou un simulateur dans cet ordre + if os.path.isfile(os.path.join(__model_dir, sim)) and (pmf.upper() in [__model_format, "GUESS"]): + + shutil.copy( + os.path.join(__model_dir, sim), + os.path.join(__ltmp, dst) + ) + # _secure_copy_textfile( + # os.path.join(__model_dir, sim), + # os.path.join(__ltmp, dst) + # ) + + break + return __ltmp + + def _get_Name_ModelTmpDir_simple(self, __model_name="dsin.txt"): + if __model_name is None: + raise ValueError('Model file or directory has to be set and can not be None') + elif os.path.isfile(__model_name): + __model_dir = os.path.abspath(os.path.dirname(__model_name)) + elif os.path.isdir(__model_name): + __model_dir = os.path.abspath(__model_name) + else: + raise ValueError('Model file or directory not found using %s'%str(__model_name)) + # + __mtmp = os.path.join(__model_dir, "tmp") + __ltmp = __mtmp + return __ltmp + + def _setModelTmpDir_REF_CALCULATION(self, __model_name="dsin.txt", __model_format="DYMOSIM", __suffix=None, __model_dest = "dsin.txt"): + if __model_name is None: + raise ValueError('Model file or directory has to be set and can not be None') + elif os.path.isfile(__model_name): + __model_dir = os.path.abspath(os.path.dirname(__model_name)) + __model_nam = os.path.basename(__model_name) + elif os.path.isdir(__model_name): + __model_dir = os.path.abspath(__model_name) + __model_nam = "dsin.txt" + else: + raise ValueError('Model file or directory not found using %s'%str(__model_name)) + if __suffix is None: + raise ValueError('Observation files must be named') + # + __mtmp = os.path.join(__model_dir, "tmp") + if not os.path.exists(__mtmp): + os.mkdir(__mtmp) + for sim, pmf in ((__model_nam,__model_format), ("dymosim.exe","DYMOSIM"), ("pythonsim.exe","PYSIM")): + # Recherche un simulateur dans cet ordre + if os.path.isfile(os.path.join(__model_dir, sim)) and (pmf.upper() in [__model_format, "GUESS"]): + __pref = time.strftime('%Y%m%d_%Hh%Mm%Ss_REF_CALCULATION_',time.localtime()) + if __suffix is not None: + __pref = __pref+__suffix+"_" + __ltmp = tempfile.mkdtemp( prefix=__pref, dir=__mtmp ) + shutil.copy( + os.path.join(__model_dir, sim), + os.path.join(__ltmp, __model_dest) + ) + # _secure_copy_textfile( + # os.path.join(__model_dir, sim), + # os.path.join(__ltmp, __model_dest) + # ) + + __ok = True + break + else: + __ok = False + if not __ok: + raise ValueError("Model simulator not found using \"%s\" path and \"%s\" format"%(__model_name,__model_format)) + return __ltmp + + def describeMeasures(self, SourceName, Variables, Format="Guess"): + "To store description of measures" + self.__measures["SourceName"] = str(SourceName) # File or data name + self.__measures["Variables"] = tuple(Variables) # Name of variables + self.__measures["Format"] = str(Format) # File or data format + if self.__verbose: print(" [VERBOSE] Measures described") + return self.__measures + + def describeMultiMeasures(self, SourceNames, Variables, Format="Guess"): + "To store description of measures" + self.__measures["SourceName"] = tuple(SourceNames) # List of file or data name + self.__measures["Variables"] = tuple(Variables) # Name of variables + self.__measures["Format"] = str(Format) # File or data format + if self.__verbose: print(" [VERBOSE] Measures described") + return self.__measures + + def describeModel(self, SourceName, VariablesToCalibrate, OutputVariables, Format="Guess"): + "To store description of model" + self.__model["SourceName"] = str(SourceName) # Model command file + self.__model["VariablesToCalibrate"] = tuple(VariablesToCalibrate) # Name of variables + self.__model["OutputVariables"] = tuple(OutputVariables) # Name of variables + self.__model["Format"] = str(Format) # File or model format + if os.path.isfile(SourceName): + self.__model["Verbose_printing_name"] = os.path.basename(SourceName) + elif os.path.isdir(SourceName): + if self.__model["Format"] == "DYMOSIM": + self.__model["Verbose_printing_name"] = "dsin.txt + dymosim.exe in " + os.path.basename(SourceName) + elif self.__model["Format"] == "FMI" or self.__model["Format"] == "FMU": + self.__model["Verbose_printing_name"] = [files for files in os.listdir(os.path.abspath(SourceName)) if files[-4:] =='.fmu'][0] + elif self.__model["Format"] == "OPENMODELICA": + xml_file_name = [files for files in os.listdir(os.path.abspath(SourceName)) if files[-4:] =='.xml'][0] + exe_file_name = xml_file_name[:-9] + '.exe' + json_file_name = xml_file_name[:-9] + '_info.json' + if os.path.exists(os.path.join(os.path.abspath(SourceName),json_file_name)): + self.__model["Verbose_printing_name"] = xml_file_name + ' ' + exe_file_name + ' ' + json_file_name + else: + self.__model["Verbose_printing_name"] = xml_file_name + ' ' + exe_file_name + ' ' + else: + print(" [VERBOSE] Model described: ", "/!\\ Unknown ModelFormat /!\\ ") + exit() + if self.__verbose: print(" [VERBOSE] Model described: ", self.__model["Verbose_printing_name"] ) + return self.__model + + def describeLink(self, MeasuresNames, ModelName, LinkName, LinkVariable, Format="Guess"): + "To store description of link between measures and model input data" + self.__link["SourceName"] = str(LinkName) # File or link name + self.__link["MeasuresNames"] = tuple(MeasuresNames) # List of file or name + self.__link["ModelName"] = tuple(ModelName) # File or model name + self.__link["Variable"] = str(LinkVariable) # Name of variable names column + self.__link["Format"] = str(Format) # File or model format + if self.__verbose: print(" [VERBOSE] Link between model and multiple measures described") + return self.__link + + def describeMethod(self, SourceName, MethodFormat="PY", BackgroundName=None, BackgroundFormat="Guess"): + "To store description of calibration method" + self.__method["SourceName"] = str(SourceName) # File or method name + self.__method["Format"] = str(MethodFormat) # File or model format + self.__method["BackgroundName"] = str(BackgroundName) # File or background name + self.__method["BackgroundFormat"] = str(BackgroundFormat) # File or model format + if self.__verbose: print(" [VERBOSE] Optimization method settings described") + return self.__method + + def describeResult(self, ResultName, Level=None, Format="TXT"): + "To store description of results" + self.__result["SourceName"] = str(ResultName) # File or method name + self.__result["Level"] = str(Level) # Level = "Final", "IntermediaryFinal" + self.__result["Format"] = str(Format) # File or model format + if self.__verbose: print(" [VERBOSE] Results settings described") + return self.__result + + def calibrate(self, + ModelName = None, # Model command file + ModelFormat = "Guess", + DataName = None, # Measures + DataFormat = "Guess", + BackgroundName = None, # Background + BackgroundFormat = "Guess", + MethodName = None, # Assimilation + MethodFormat = "Guess", + VariablesToCalibrate = None, + OutputVariables = None, + ResultName = "results.txt", # Results + ResultFormat = "Guess", + ResultLevel = None, + Verbose = False, + ): + """ + General method to set and realize a calibration (optimal estimation + task) of model parameters using measures data. + """ + if Verbose: + self.__verbose = True + print(" [VERBOSE] Verbose mode activated") + else: + self.__verbose = False + if DataName is not None: + # On force l'utilisateur à nommer dans son fichier de mesures + # les variables avec le même nom que des sorties dans le modèle + self.describeMeasures(DataName, OutputVariables, DataFormat) + if ModelName is not None: + self.describeModel(ModelName, VariablesToCalibrate, OutputVariables, ModelFormat) + if MethodName is not None: + self.describeMethod(MethodName, MethodFormat, BackgroundName, BackgroundFormat) + if ResultName is not None: + self.describeResult(ResultName, ResultLevel, ResultFormat) + # + ONames, Observations = _readMeasures( + self.__measures["SourceName"], + self.__measures["Variables"], + self.__measures["Format"], + ) + Algo, Params, CovB, CovR = _readMethod( + self.__method["SourceName"], + self.__method["Format"], + ) + __bgfile = None + if self.__method["BackgroundFormat"] == "DSIN": __bgfile = self.__model["SourceName"] + if self.__method["BackgroundFormat"] == "ADAO": __bgfile = self.__method["SourceName"] + if self.__method["BackgroundFormat"] == "USER": __bgfile = self.__method["BackgroundName"] + if self.__method["BackgroundFormat"] == "Guess": + if self.__method["BackgroundName"] is not None: __bgfile = self.__method["BackgroundName"] + if self.__method["SourceName"] is not None: __bgfile = self.__method["SourceName"] + if self.__model["SourceName"] is not None: __bgfile = self.__model["SourceName"] + BNames, Background, Bounds = _readBackground( + __bgfile, + self.__model["VariablesToCalibrate"], + self.__method["BackgroundFormat"], + ) + if "Bounds" not in Params: # On force la priorité aux bornes utilisateur + Params["Bounds"] = Bounds + if self.__verbose: + print(" [VERBOSE] Measures read") + print(" [VERBOSE] Optimization information read") + if "MaximumNumberOfSteps" in Params: + print(" [VERBOSE] Maximum possible number of iteration:",Params['MaximumNumberOfSteps']) + print(" [VERBOSE] Background read:",Background) + print(" [VERBOSE] Bounds read:",Params['Bounds']) + # + if "DifferentialIncrement" not in Params: + Params["DifferentialIncrement"] = 0.001 + if "StoreSupplementaryCalculations" not in Params: + Params["StoreSupplementaryCalculations"] = ["SimulatedObservationAtOptimum",] + if "StoreSupplementaryCalculations" in Params and \ + "SimulatedObservationAtOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("SimulatedObservationAtOptimum") + if self.__verbose and "StoreSupplementaryCalculations" in Params and \ + "CurrentOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("CurrentOptimum") + if self.__verbose and "StoreSupplementaryCalculations" in Params and \ + "CostFunctionJAtCurrentOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("CostFunctionJAtCurrentOptimum") + # + def exedymosim( x_values_matrix ): + "Appel du modèle et restitution des résultats" + from buildingspy.io.outputfile import Reader + from pst4mod.modelica_libraries.automatic_simulation import Automatic_Simulation + # + # x_values = list(numpy.ravel(x_values_matrix) * Background) + x_values = list(numpy.ravel(x_values_matrix)) + dict_inputs=dict(zip(VariablesToCalibrate,x_values)) + # if Verbose: print(" Simulation for %s"%numpy.ravel(x_values)) + # + simudir = self._setModelTmpDir(ModelName, ModelFormat) + auto_simul = Automatic_Simulation( + simu_dir=simudir, + dict_inputs=dict_inputs, + logfile=True, + timeout=60, + without_modelicares=True) #linux=true is removed + auto_simul.single_simulation() + reader = Reader(os.path.join(simudir,'dsres.mat'),'dymola') + y_values = [reader.values(y_name)[1][-1] for y_name in OutputVariables] + y_values_matrix = numpy.asarray(y_values) + if not Verbose: + shutil.rmtree(simudir, ignore_errors=True) + return y_values_matrix + # + def exepython( x_values_matrix ): + "Appel du modèle et restitution des résultats" + # x_values = list(numpy.ravel(x_values_matrix)) + # dict_inputs=dict(zip(VariablesToCalibrate,x_values)) + # + simudir = self._setModelTmpDir(ModelName, ModelFormat) + auto_simul = Python_Simulation( + simu_dir=simudir, + val_inputs=x_values_matrix, + logfile=True, + timeout=60, + verbose=self.__verbose) + y_values_matrix = auto_simul.single_simulation() + if not Verbose: + shutil.rmtree(simudir, ignore_errors=True) + return y_values_matrix + # + if self.__model["Format"].upper() in ["DYMOSIM", "GUESS"]: + simulateur = exedymosim + elif self.__model["Format"].upper() == "PYSIM": + simulateur = exepython + else: + raise ValueError("Unknown model format \"%s\""%self.__model["Format"].upper()) + # + __adaocase = adaoBuilder.New() + __adaocase.set( 'AlgorithmParameters', Algorithm=Algo, Parameters=Params ) + __adaocase.set( 'Background', Vector=Background) + __adaocase.set( 'Observation', Vector=Observations ) + if type(CovB) is float: + __adaocase.set( 'BackgroundError', ScalarSparseMatrix=CovB ) + else: + __adaocase.set( 'BackgroundError', Matrix=CovB ) + if type(CovR) is float: + __adaocase.set( 'ObservationError', ScalarSparseMatrix=CovR ) + else: + __adaocase.set( 'ObservationError', Matrix=CovR ) + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]} ) + if self.__verbose: + __adaocase.set( 'Observer', Variable="CostFunctionJAtCurrentOptimum", String="print('\\n -----------------------------------\\n [VERBOSE] Current iteration: %i'%(len(var),))" ) + __adaocase.set( 'Observer', Variable="CostFunctionJAtCurrentOptimum", Template="ValuePrinter", Info="\n [VERBOSE] Current best cost value:" ) + __adaocase.set( 'Observer', Variable="CurrentOptimum", Template="ValuePrinter", Info="\n [VERBOSE] Current optimal state..:" ) + __adaocase.execute() + # + __resultats = dict( + InitialParameters = numpy.asarray(Background), + OptimalParameters = __adaocase.get("Analysis")[-1], # * Background, + OptimalSimulation = __adaocase.get("SimulatedObservationAtOptimum")[-1], + Measures = Observations, + NamesOfParameters = BNames, + NamesOfMeasures = ONames, + ) + if self.__verbose: print("") + del __adaocase + # + _saveResults( + __resultats, + self.__result["SourceName"], + self.__result["Level"], + self.__result["Format"], + self.__verbose, + ) + # + return __resultats + + + + def calibrateMultiObs(self, + ModelName = None, # Model command file + ModelFormat = "Guess", + DataNames = None, # Multiple measures + DataFormat = "Guess", + LinkName = None, # CL with data names + LinkVariable = "Variable", # Name of variable names column + LinkFormat = "Guess", + BackgroundName = None, # Background + BackgroundFormat = "Guess", + MethodName = None, # Assimilation + MethodFormat = "Guess", + VariablesToCalibrate = None, + OutputVariables = None, + ResultName = "results.txt", # Results + ResultFormat = "Guess", + ResultLevel = None, + Verbose = False, + IntermediateInformation = False, + ComplexModel = False, + FMUInput = None, + NeedInitVariables = False, + KeepCalculationFolders = False, + Linux = False, + CreateOptimaldsin = False, + InitialSimulation = False, + VerifyFunction = False, + GlobalSensitivityCheck = False, + ParamSensitivityCheck = False, + CreateCSVoutputResults = False, + AdaptObservationError = False, + ResultsSummary = False, + AdvancedDebugModel = False, + TimeoutModelExecution = 300, + ModelStabilityCheck = False, + ModelStabilityCheckingPoints = 10 + ): + """ + General method to set and realize a calibration (optimal estimation + task) of model parameters using multiple measures data, with an + explicit link between measures and data for the model. + """ + if Verbose: + self.__verbose = True + print(" [VERBOSE] Verbose mode activated") + if IntermediateInformation: + if self.__verbose: + self.__IntermediateInformation = True + print(" [VERBOSE] IntermediateInformation provided") + else: + self.__IntermediateInformation = False + print(" [VERBOSE] IntermediateInformation requires Verbose mode activated") + else: + self.__IntermediateInformation = False + + if TimeoutModelExecution < 0: + raise ValueError("TimeoutModelExecution must be positive") + + if DataNames is not None: + # On force l'utilisateur à nommer dans son fichier de mesures + # les variables avec le même nom que des sorties dans le modèle + self.describeMultiMeasures(DataNames, OutputVariables, DataFormat) + # + if self.__verbose: + print(" [VERBOSE] Timeoout for model execution is:", TimeoutModelExecution, "seconds") + # + if ModelFormat is not None: #included sot that the model format is allways in upper format + ModelFormat = ModelFormat.upper() + if ModelFormat == "DYMOLA": #to handle situations in which name Dymola is indicated + ModelFormat = "DYMOSIM" + if ModelName is not None: + self.describeModel(ModelName, VariablesToCalibrate, OutputVariables, ModelFormat) + if LinkName is not None: + # On force l'utilisateur à utiliser les fichiers de mesures requis + # et non pas tous ceux présents dans le Link + self.describeLink(DataNames, ModelName, LinkName, LinkVariable, LinkFormat) + if MethodName is not None: + self.describeMethod(MethodName, MethodFormat, BackgroundName, BackgroundFormat) + if ResultName is not None: + self.describeResult(ResultName, ResultLevel, ResultFormat) + # + #handling of the new option for complex models while keeping the previous key word in the code + if ComplexModel: + print(" The option ComplexModel is deprecated and has to be replaced by NeedInitVariables. Please update your code.") + NeedInitVariables = ComplexModel + ComplexModel = NeedInitVariables + # + + ONames, Observations = _readMultiMeasures( + self.__measures["SourceName"], + self.__measures["Variables"], + self.__measures["Format"], + ) + + + Algo, Params, CovB, CovR = _readMethod( + self.__method["SourceName"], + self.__method["Format"], + ) + __bgfile = None + if self.__method["BackgroundFormat"] == "DSIN": __bgfile = self.__model["SourceName"] + if self.__method["BackgroundFormat"] == "ADAO": __bgfile = self.__method["SourceName"] + if self.__method["BackgroundFormat"] == "USER": __bgfile = self.__method["BackgroundName"] + if self.__method["BackgroundFormat"] == "Guess": + if self.__method["BackgroundName"] is not None: __bgfile = self.__method["BackgroundName"] + if self.__method["SourceName"] is not None: __bgfile = self.__method["SourceName"] + if self.__model["SourceName"] is not None: __bgfile = self.__model["SourceName"] + BNames, Background, Bounds = _readBackground( + __bgfile, + self.__model["VariablesToCalibrate"], + self.__method["BackgroundFormat"], + ) + if "Bounds" not in Params: # On force la priorité aux bornes utilisateur + Params["Bounds"] = Bounds + LNames, LColumns, LVariablesToChange = _readLink( + self.__link["SourceName"], + self.__link["MeasuresNames"], + self.__link["Variable"], + self.__link["Format"], + ) + if self.__verbose: + print(" [VERBOSE] Measures read") + print(" [VERBOSE] Links read") + print(" [VERBOSE] Optimization information read") + print(" [VERBOSE] Background read:",Background) + print(" [VERBOSE] Bounds read:",Params['Bounds']) + # + if "DifferentialIncrement" not in Params: + Params["DifferentialIncrement"] = 0.001 + if "EnableMultiProcessing" not in Params: + Params["EnableMultiProcessing"] = 0 + if "NumberOfProcesses" not in Params: + Params["NumberOfProcesses"] = 0 + if "StoreSupplementaryCalculations" not in Params: + Params["StoreSupplementaryCalculations"] = ["SimulatedObservationAtOptimum",] + if "StoreSupplementaryCalculations" in Params and \ + "SimulatedObservationAtOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("SimulatedObservationAtOptimum") + if self.__verbose and "StoreSupplementaryCalculations" in Params and \ + "CurrentOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("CurrentOptimum") + if self.__verbose and "StoreSupplementaryCalculations" in Params and \ + "CostFunctionJAtCurrentOptimum" not in Params["StoreSupplementaryCalculations"]: + Params["StoreSupplementaryCalculations"].append("CostFunctionJAtCurrentOptimum") + + dict_dsin_path_ref ={} #creation of the dict_dsin_ref (for storing the path of the dsfinal file for each dataset + # + + VariablesToCalibrate = list(BNames) + + def exedymosimMultiobs_REF_CALCULATION( x_values_matrix, dict_dsin_paths = dict_dsin_path_ref): #2ème argument à ne pas modifier + "Appel du modèle et restitution des résultats" + + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs = dict(zip(VariablesToCalibrate,x_values)) + dict_inputs_for_CWP = {} + +# multi_y_values_matrix = [] + for etat in range(len(LNames)): + simudir = self._setModelTmpDir_REF_CALCULATION(ModelName, ModelFormat, LNames[etat]) + + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + dict_inputs = dict(zip(LVariablesToChange, var_int)) + + dict_inputs.update( x_inputs ) + + auto_simul = Automatic_Simulation( + simu_dir=simudir, + dymosim_path = os.path.join(simudir, os.pardir, os.pardir) , + dsres_path = os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")), + dict_inputs=dict_inputs, + logfile=True, + timeout=TimeoutModelExecution, + without_modelicares=True, + linux = Linux) + auto_simul.single_simulation() + + + + if AdvancedDebugModel: + dslog_file = os.path.join(simudir, 'dslog.txt') + debug_model_file = os.path.join(simudir, os.pardir, 'log_debug_model.txt') + try: + shutil.copy(dslog_file,debug_model_file) + except: + pass + + if auto_simul.success_code == 2 : + raise ValueError("The simulation falis after initialization, please check the log file and your model in order to be sure that the whole simulation can be performed (specially for dynamic models)") + + if auto_simul.success_code == 3 : + raise ValueError("The simulation did not reach the final time, probably due to a lack of time, please increase the time for model execution by modifying the TimeoutModelExecution value in configuration.py") + + if auto_simul.success_code == 0: + + path_around_simu_no_CWP = os.path.join(simudir, os.pardir, os.pardir) + around_simu_no_CWP = Around_Simulation(dymola_version='2012', + curdir=path_around_simu_no_CWP, + source_list_iter_var = 'from_dymtranslog', + copy_from_dym_trans_log= os.path.join(path_around_simu_no_CWP,'ini.txt'), + mat = os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")), + iter_var_values_options={'source':'from_mat', 'moment':'initial'}, + verbose = False) + around_simu_no_CWP.set_list_iter_var() + around_simu_no_CWP.set_dict_iter_var() + + writer_no_CWP = write_in_dsin.Write_in_dsin(dict_inputs =around_simu_no_CWP.dict_iter_var,filedir=simudir,dsin_name='dsin.txt',old_file_name='old_dsin.txt',new_file_name=str("dsin_updated_" +LNames[etat]+ ".txt")) + writer_no_CWP.write_in_dsin + writer_no_CWP.write_in_dsin() + + dict_dsin_paths[LNames[etat]] = os.path.join(simudir,str("dsin_updated_" +LNames[etat]+ ".txt")) + + else: + from pst4mod.modelica_libraries.change_working_point import Dict_Var_To_Fix + from pst4mod.modelica_libraries.change_working_point import Working_Point_Modification + + path_around_simu = os.path.join(simudir,os.path.pardir,os.path.pardir) + + if not(os.path.exists(os.path.join(path_around_simu, str("ref" + ".mat")))): + auto_simul_ref = Automatic_Simulation( + simu_dir=path_around_simu, + dymosim_path = os.path.join(simudir, os.pardir, os.pardir) , + dict_inputs={}, + logfile=True, + dsres_path = os.path.join(path_around_simu, str("ref" + ".mat")), + timeout=TimeoutModelExecution, + without_modelicares=True, + linux = Linux) + auto_simul_ref.single_simulation() + + if not(auto_simul_ref.success_code == 0): + raise ValueError("A working dsin.txt file must be provided, provide a dsin.txt that makes it possible for the initial simulation to converge") + + temporary_files_to_remove = [os.path.join(path_around_simu,'dsfinal.txt'), + os.path.join(path_around_simu,'dsin_old.txt'), + os.path.join(path_around_simu,'dslog.txt'), + os.path.join(path_around_simu,'status'), + os.path.join(path_around_simu,'success') + ] + for path_to_remove in temporary_files_to_remove: + if os.path.exists(path_to_remove): + os.remove(path_to_remove) + + around_simu = Around_Simulation(dymola_version='2012', + curdir=path_around_simu, + source_list_iter_var = 'from_dymtranslog', + copy_from_dym_trans_log= os.path.join(path_around_simu,'ini.txt'), + mat = os.path.join(path_around_simu, str("ref" + ".mat")), + iter_var_values_options={'source':'from_mat'}, + verbose = False) + around_simu.set_list_iter_var() + around_simu.set_dict_iter_var() + + reader_CWP = Reader(os.path.join(path_around_simu, str("ref" + ".mat")),'dymola') + x_values_intemediary = [reader_CWP.values(x_name)[1][-1] for x_name in VariablesToCalibrate] + x_values_from_ref_calculation = numpy.asarray(x_values_intemediary) + x_inputs_from_ref_calculation = dict(zip(VariablesToCalibrate,x_values_from_ref_calculation)) + + cl_values_intemediary = [reader_CWP.values(cl_name)[1][-1] for cl_name in LVariablesToChange] + cl_values_from_ref_calculation = numpy.asarray(cl_values_intemediary) + cl_inputs_from_ref_calculation = dict(zip(LVariablesToChange,cl_values_from_ref_calculation)) + + + for var_calib in x_inputs_from_ref_calculation.keys(): + dict_inputs_for_CWP[var_calib] = (x_inputs_from_ref_calculation[var_calib], x_inputs[var_calib]) + + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + + dict_inputs = dict(zip(LVariablesToChange, var_int)) + + for var_cl in cl_inputs_from_ref_calculation.keys(): + dict_inputs_for_CWP[var_cl] = (cl_inputs_from_ref_calculation[var_cl], dict_inputs[var_cl]) + + dict_var_to_fix2 = Dict_Var_To_Fix(option='automatic', + dict_auto_var_to_fix=dict_inputs_for_CWP) + + dict_var_to_fix2.set_dict_var_to_fix() + + if Verbose: + LOG_FILENAME = os.path.join(simudir,'change_working_point.log') + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG,filemode='w') + + working_point_modif = Working_Point_Modification(main_dir = simudir, + simu_material_dir = simudir, + dymosim_path = os.path.join(simudir, os.pardir, os.pardir), + simu_dir = 'SIMUS', + store_res_dir = 'RES', + dict_var_to_fix = dict_var_to_fix2.dict_var_to_fix, + around_simulation0 = around_simu, + var_to_follow_path= os.path.join(simudir,'var_to_follow.csv'), + gen_scripts_ini= False, + nit_max= 1000000000000000, + min_step_val = 0.00000000000005, + timeout = TimeoutModelExecution, + linux = Linux) + + working_point_modif.create_working_directory() + working_point_modif.working_point_modification(skip_reference_simulation=True) + + if AdvancedDebugModel: + dslog_file = os.path.join(simudir, 'SIMUS', 'dslog.txt') + debug_model_file = os.path.join(simudir, os.pardir, 'log_debug_model.txt') + try: + shutil.copy(dslog_file,debug_model_file) + except: + pass + + if not(os.path.exists(os.path.join(simudir, 'RES','1.0.mat'))): + raise ValueError(str("Simulation with Background values does not converge automatically, try to give a dsin.txt file that makes it possible to run a simulation with boundary conditions of " + LNames[etat])) + + shutil.copy( + os.path.join(simudir, 'RES','1.0.mat'), + os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")) + ) + + path_around_simu_after_CWP = os.path.join(simudir,os.path.pardir,os.path.pardir) + around_simu_after_CWP = Around_Simulation(dymola_version='2012', + curdir=path_around_simu_after_CWP, + source_list_iter_var = 'from_dymtranslog', + copy_from_dym_trans_log= os.path.join(path_around_simu_after_CWP,'ini.txt'), + mat = os.path.join(simudir, 'RES','1.0.mat'), #Le fichier .mat qui vient d'être créé et pour lequel on atteint les valeurs cibles de la variable + iter_var_values_options={'source':'from_mat', 'moment':'initial'}, + verbose = False) + around_simu_after_CWP.set_list_iter_var() + around_simu_after_CWP.set_dict_iter_var() + + writer_after_CWP = write_in_dsin.Write_in_dsin(dict_inputs =around_simu_after_CWP.dict_iter_var,filedir=os.path.join(simudir, 'SIMUS'),dsin_name='dsin.txt',old_file_name='old_dsin.txt',new_file_name='dsin_updated.txt') + writer_after_CWP.write_in_dsin + writer_after_CWP.write_in_dsin() + + shutil.copy( + os.path.join(simudir, 'SIMUS','dsin_updated.txt'), + os.path.join(simudir, str("dsin_updated_" +LNames[etat]+ ".txt")) + ) + dict_dsin_paths[LNames[etat]] = os.path.join(simudir,str("dsin_updated_" +LNames[etat]+ ".txt")) + + os.rename( + os.path.join(os.path.dirname(dict_dsin_paths[LNames[etat]]), str("dsres" +LNames[etat]+ ".mat")), + os.path.join(os.path.dirname(dict_dsin_paths[LNames[etat]]), str("REF_for_CWP" + ".mat")) + ) + + return + + + + def preparation_exedymosimMultiobs_simple(): + from pst4mod.modelica_libraries import write_in_dsin + + for etat in range(len(LNames)): + simudir = self._setModelTmpDir_simple(ModelName, ModelFormat, str("dsin_" +LNames[etat]+ ".txt")) + # + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + dict_inputs = dict(zip(LVariablesToChange, var_int)) + writer_no_CWP = write_in_dsin.Write_in_dsin(dict_inputs = dict_inputs, filedir=simudir, dsin_name=str("dsin_" +LNames[etat]+ ".txt"), old_file_name='old_dsin.txt',new_file_name = str("dsin_" +LNames[etat]+ ".txt")) + writer_no_CWP.write_in_dsin + writer_no_CWP.write_in_dsin() + return + + def preparation_exefmu_om_Multiobs(): + self._setModelTmpDir_simple(ModelName, ModelFormat, ModelName) + + def exepythonMultiobs( x_values_matrix ): + "Appel du modèle et restitution des résultats" + # + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs = dict(zip(VariablesToCalibrate,x_values)) + # + multi_y_values_matrix = [] + for etat in range(len(LNames)): + simudir = self._setModelTmpDir(ModelName, ModelFormat) + # + dict_inputs = dict(zip(LVariablesToChange, numpy.ravel(LColumns[:,etat]))) + dict_inputs.update( x_inputs ) + # + auto_simul = Python_Simulation( + simu_dir=simudir, + val_inputs=x_values_matrix, + logfile=True, + timeout=TimeoutModelExecution, + verbose=self.__verbose) + y_values_matrix = auto_simul.single_simulation() + multi_y_values_matrix.append( y_values_matrix ) + if not Verbose: + shutil.rmtree(simudir, ignore_errors=True) + y_values_matrix = numpy.ravel(numpy.array(multi_y_values_matrix)) + return y_values_matrix + # + + + #Test for output variables - not repetition - This test must be before the redefinition of OutputVariables + OutputVariables_set = set(OutputVariables) + if len(OutputVariables) != len(OutputVariables_set): + diff_number_values = len(OutputVariables) - len(OutputVariables_set) + raise ValueError("There are " + str(diff_number_values) + " repeated output variables in the definition of OutputVariables, please remove repeated names.") + + #Handle several times same obs -START + OutputVariables_new_tmp = [] + + whole_obs_in_fileobs = readObsnamesfile(self.__measures["SourceName"]) + + for outputname in OutputVariables: + if outputname not in whole_obs_in_fileobs: + print(" [VERBOSE] /!\\ ", outputname," is not defined in the measurements file") + for obsname in whole_obs_in_fileobs: + if obsname in OutputVariables: + OutputVariables_new_tmp.append(obsname) + + OutputVariables_new = [] + for obs_name in OutputVariables: #to get the same order as for measures + count_obs_name = OutputVariables_new_tmp.count(obs_name) + for i in range(count_obs_name): + OutputVariables_new.append(obs_name) + OutputVariables = OutputVariables_new + + ONames = OutputVariables + #Handle several times same obs - END + + + ODates,OHours = readMultiDatesHours(self.__measures["SourceName"]) + + List_Multideltatime =[] + + for etat in range(len(ODates)): + time_initial = datetime.strptime(ODates[etat][0]+ ' ' + OHours[etat][0], '%d/%m/%Y %H:%M:%S') + list_deltatime = [] + + if len(ODates[etat])>1: + list_deltatime.append(0) + for index_tmp in range(len(ODates[etat])-1): #the first date is considered as reference + time_index_tmp = datetime.strptime(ODates[etat][index_tmp+1] + ' ' + OHours[etat][index_tmp+1], '%d/%m/%Y %H:%M:%S') + list_deltatime.append((time_index_tmp-time_initial).total_seconds()) + + if (time_index_tmp-time_initial).total_seconds() < 0: + raise ValueError('The initial date is not chronological for state %s'%str(etat)) + + List_Multideltatime.append(list_deltatime) + + + #Bricolage Debut + Observations_new =[] + number_observations_by_cl = [] #to be used in the test function, number of measurements by boundary condition (i.e. number of lines in the measurements file) + + for etat in range(len(ODates)): #correspond au nombre de conditions aux limites + Obs_etat = Observations[etat].tolist() + list_obs_etat = [] + for obs in range(len(list(ONames))): + if len(ODates[etat])==1: #cas classique, statique + list_obs_etat = Obs_etat + if not(isinstance(list_obs_etat, list)): + list_obs_etat = [list_obs_etat] + + else: + for time_etat in range(len(Obs_etat)): + if not(isinstance(Obs_etat[time_etat], list)): #une seule grandeur observée + list_obs_etat.append(Obs_etat[time_etat]) + else: + list_obs_etat.append(Obs_etat[time_etat][obs]) + + #EXTRA for verify tests - START + if len(ODates[etat])==1: + number_observations_by_cl.append(1) #in this case only one line in the measurements file, this is just to be used as input of the test functions + else: + number_observations_by_cl.append(len(Obs_etat)) #we get the number of lines of the measurements file, this is just to be used as input of the test functions + #EXTRA for verify tests - END + + Observations_new = Observations_new + list_obs_etat + Observations = Observations_new + #Bricolage Fin + + + + + def prev_adao_version_TOP_LEVEL_exedymosimMultiobs( x_values_matrix): + y_values_matrix = TOP_LEVEL_exedymosimMultiobs( x_values_matrix=x_values_matrix, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, ModelFormat = ModelFormat, KeepCalculationFolders = KeepCalculationFolders, Verbose = Verbose, dict_dsin_paths = dict_dsin_path_ref, Linux = Linux, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + return y_values_matrix + + def prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values_matrix): + y_values_matrix = TOPLEVEL_exedymosimMultiobs_simple(x_values_matrix = x_values_matrix, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), KeepCalculationFolders = KeepCalculationFolders, Linux = Linux, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + return y_values_matrix + + if self.__model["Format"].upper() in ["DYMOSIM", "GUESS"]: + + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': #to handle differences in the way directoperator is mangaged after modifications brought to direct operator + if ComplexModel: + simulateur = prev_adao_version_TOP_LEVEL_exedymosimMultiobs + exedymosimMultiobs_REF_CALCULATION(Background) + else: + simulateur = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple + preparation_exedymosimMultiobs_simple() + + else: + if ComplexModel: + simulateur = TOP_LEVEL_exedymosimMultiobs #to handle parallel computing + exedymosimMultiobs_REF_CALCULATION(Background) + else: + simulateur = TOPLEVEL_exedymosimMultiobs_simple #to handle parallel computing + preparation_exedymosimMultiobs_simple() + + elif self.__model["Format"].upper() == "PYSIM": + simulateur = exepythonMultiobs + elif self.__model["Format"].upper() == "FMI" or self.__model["Format"].upper() == "FMU": + simulateur = TOP_LEVEL_exefmuMultiobs + preparation_exefmu_om_Multiobs() + elif self.__model["Format"].upper() == "OPENMODELICA": + preparation_exefmu_om_Multiobs() + simulateur = TOP_LEVEL_exeOpenModelicaMultiobs + + else: + raise ValueError("Unknown model format \"%s\""%self.__model["Format"].upper()) + + def verify_function(x_values, model_format): + + + simulation_results_all = [] + list_simulation_time = [] + for i in list(range(3)): #we consider only three iterations + start_time_verify = time.time() + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_values) + else: + simulation_results = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values) + elif model_format in ["FMI","FMU"]: + simulation_results = TOP_LEVEL_exefmuMultiobs(x_values, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results = TOP_LEVEL_exeOpenModelicaMultiobs(x_values, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + end_time_verify = time.time() + + list_simulation_time.append(round(end_time_verify - start_time_verify,3)) + simulation_results_all.append(simulation_results) + + + elapsed_time = round(sum(list_simulation_time)/len(list_simulation_time),3) + #not necessary, already in good format + # numpy.set_printoptions(precision=2,linewidth=5000, threshold = 10000) + # reshaped_simulation_results_all = numpy.array(simulation_results_all).reshape((len(OutputVariables),-1)).transpose() + + # numpy.set_printoptions(precision=2,linewidth=5000) + + list_check_output = [] + list_false_output = [] + simulation_results_all_one_boundary_condition =[] + + for simulation_output in simulation_results_all: #iteration over the three simulations performed + # THIS DOES NOT WORK IF OBSERVATIONS FILES ARE OF DIFFERENT SIZE# simultation_ouput_reshaped = simulation_output.reshape((len(LNames),-1))[0] #consideration of only one set of boundary conditions (otherwise outputs could be modified by boundary condition modifications) + total_number_measurements_first_simulation = number_observations_by_cl[0]*len(OutputVariables) + first_simulation_results = simulation_output[:total_number_measurements_first_simulation] + simulation_results_all_one_boundary_condition.append(first_simulation_results) + + for index_line_time_simulation in range(len(simulation_results_all_one_boundary_condition[0].reshape(len(OutputVariables),-1).transpose())): #iteration over each time results individually + for i in range(len(OutputVariables)): + if (simulation_results_all_one_boundary_condition[0].reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i] == simulation_results_all_one_boundary_condition[1].reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i]) & (simulation_results_all_one_boundary_condition[0].reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i] == simulation_results_all_one_boundary_condition[2].reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i]): + list_check_output.append("True") + + else: + list_check_output.append("False") + if OutputVariables[i] in list_false_output: #problematic variable already included + pass + else: + list_false_output.append(OutputVariables[i]) + + if "False" in list_check_output: + verify_function_result = "False" + else: + verify_function_result = "True" + return verify_function_result, list_false_output, elapsed_time + + def verify_function_sensitivity(x_values, increment, model_format): + + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_values) + else: + simulation_results = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values) + elif model_format in ["FMI","FMU"]: + simulation_results = TOP_LEVEL_exefmuMultiobs(x_values, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results = TOP_LEVEL_exeOpenModelicaMultiobs(x_values, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + + + x_values_modif = [x*(1+increment) for x in x_values] + + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results_modif = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_values_modif) + else: + simulation_results_modif = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values_modif) + elif model_format in ["FMI","FMU"]: + simulation_results_modif = TOP_LEVEL_exefmuMultiobs(x_values_modif, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results_modif = TOP_LEVEL_exeOpenModelicaMultiobs(x_values_modif, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + + list_check_output = ["False" for i in OutputVariables] #everything initialized as not good and then if something changes (at least one point), then it is OK + list_not_modified_variables =[] + + total_number_measurements_first_simulation = number_observations_by_cl[0]*len(OutputVariables) + + # THIS DOES NOT WORK IF OBSERVATIONS FILES ARE OF DIFFERENT SIZE# simulation_results_one_boundary_condition = simulation_results.reshape((len(LNames),-1))[0] + # THIS DOES NOT WORK IF OBSERVATIONS FILES ARE OF DIFFERENT SIZE# simulation_results_modif_one_boundary_condition = simulation_results_modif.reshape((len(LNames),-1))[0] + simulation_results_one_boundary_condition = simulation_results[:total_number_measurements_first_simulation] + simulation_results_modif_one_boundary_condition = simulation_results_modif[:total_number_measurements_first_simulation] + + for index_line_time_simulation in range(len(simulation_results_one_boundary_condition.reshape(len(OutputVariables),-1).transpose())): + + for i in range(len(OutputVariables)): + if simulation_results_one_boundary_condition.reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i] != simulation_results_modif_one_boundary_condition.reshape(len(OutputVariables),-1).transpose()[index_line_time_simulation][i] : + list_check_output[i] = "True" + + + for i in range(len(OutputVariables)): + if list_check_output[i] == "False": #it means the variable has not changed + list_not_modified_variables.append(OutputVariables[i]) + + if "False" in list_check_output: + verify_function_sensitivity_result = "False" + else: + verify_function_sensitivity_result = "True" + + return verify_function_sensitivity_result, list_not_modified_variables + + def verify_function_sensitivity_parameter(x_values, increment, model_format): + + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results_ref = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_values) + else: + simulation_results_ref = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values) + elif model_format in ["FMI","FMU"]: + simulation_results_ref = TOP_LEVEL_exefmuMultiobs(x_values, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results_ref = TOP_LEVEL_exeOpenModelicaMultiobs(x_values, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + + list_param_with_no_impact = [] + Check_params_impact = "True" + + x_values_modif = [x*(1+increment) for x in x_values] + + + for param_index in range(len(x_values)): #the goal is to modify each param individually and check if it is possible to observe variations + x_values_param = list(x_values) #avoid memory surprises + x_values_param[param_index] = x_values_modif[param_index] + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results_param = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_values_param) + else: + simulation_results_param = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_values_param) + elif model_format in ["FMI","FMU"]: + simulation_results_param = TOP_LEVEL_exefmuMultiobs(x_values_param, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results_param = TOP_LEVEL_exeOpenModelicaMultiobs(x_values_param, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + + if numpy.array_equal(simulation_results_param,simulation_results_ref): #there is not impact on the whole simulation + list_param_with_no_impact.append(VariablesToCalibrate[param_index]) + Check_params_impact = "False" + + return Check_params_impact, list_param_with_no_impact + + + def check_model_stability(x_values_names, background_values, bounds, number_of_tests, model_format): + + dict_check_model_stability = {} + for index_x in range(len(x_values_names)): + x_bg_tested = x_values_names[index_x] + + print(" [VERBOSE] Model stability check is being performed for the following variable to be calibrated: ", x_bg_tested) + + if bounds[index_x][0] == None or bounds[index_x][0] == "None": + bounds_inf_x_bg_tested = background_values[index_x] - abs(background_values[index_x]*100) + else: + bounds_inf_x_bg_tested = bounds[index_x][0] + + if bounds[index_x][1] == None or bounds[index_x][1] == "None": + bounds_sup_x_bg_tested = background_values[index_x] + abs(background_values[index_x]*100) + else: + bounds_sup_x_bg_tested = bounds[index_x][1] + + #avoid problems with values to be tested hen the bounds are strictly equal to 0 + if bounds_inf_x_bg_tested == 0: #which means that bounds_sup_x_bg_tested > 0 + bounds_inf_x_bg_tested = min(1e-9,bounds_sup_x_bg_tested*1e-9) + if bounds_sup_x_bg_tested == 0: #which means that bounds_inf_x_bg_tested < 0 + bounds_sup_x_bg_tested = max(-1e-9,bounds_inf_x_bg_tested*1e-9) + + list_x_bg_tested = [] + + if bounds_inf_x_bg_tested < 0: + if bounds_sup_x_bg_tested<0: + list_x_bg_tested = list(numpy.geomspace(bounds_inf_x_bg_tested,bounds_sup_x_bg_tested,int(number_of_tests))) + else: + list_x_bg_tested = list(numpy.geomspace(bounds_inf_x_bg_tested,bounds_inf_x_bg_tested*1e-4,int(number_of_tests/2))) + list_x_bg_tested = list_x_bg_tested + list((numpy.geomspace(bounds_sup_x_bg_tested*1e-4,bounds_sup_x_bg_tested,int(number_of_tests/2)))) + else: #both bounds are >0 + list_x_bg_tested = list(numpy.geomspace(bounds_inf_x_bg_tested,bounds_sup_x_bg_tested,int(number_of_tests))) + + #for a linear partition, we consider a log repartition better + # for index_partition in range(number_of_tests): + # list_x_bg_tested.append(bounds_inf_x_bg_tested + index_partition*(bounds_sup_x_bg_tested-bounds_inf_x_bg_tested)/(number_of_tests-1)) + + list_failed_model_evaluation = [] + for x_value_bg_tested in list_x_bg_tested: + + x_whole_values_with_bg = copy.deepcopy(background_values) #avoid modifiction of backgroundvalues + x_whole_values_with_bg[index_x] = x_value_bg_tested + try: + if model_format in ["DYMOSIM"]: + if ComplexModel: + simulation_results = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(x_whole_values_with_bg) + else: + simulation_results = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(x_whole_values_with_bg) + elif model_format in ["FMI","FMU"]: + simulation_results = TOP_LEVEL_exefmuMultiobs(x_whole_values_with_bg, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + elif model_format in ["OPENMODELICA"]: + simulation_results = TOP_LEVEL_exeOpenModelicaMultiobs(x_whole_values_with_bg, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + else: + raise NotImplementedError("Not yet implemented for current model format: ", model_format) + except: + list_failed_model_evaluation.append(x_value_bg_tested) + + dict_check_model_stability[x_bg_tested] = list_failed_model_evaluation + return dict_check_model_stability + + if VerifyFunction: + check_function, list_not_same_model_outputs, elapsed_time_for_simu = verify_function(x_values = Background, model_format = self.__model["Format"].upper()) + if check_function == "True": + print(" [VERBOSE] Results of function checking: ", "OK for all model outputs, ","simulation time (including all boundary conditions) is ",elapsed_time_for_simu, " seconds" ) + else: + print(" [VERBOSE] Results of function checking: ", "NOT OK for all model outputs") + print(" [VERBOSE] The following model outputs are not the same when repeating a simulation: ", list_not_same_model_outputs ) + + if GlobalSensitivityCheck: + check_sensitivity, list_same_model_outputs = verify_function_sensitivity(x_values = Background, increment = Params["DifferentialIncrement"], model_format = self.__model["Format"].upper()) + if VerifyFunction == False: + print(" [VERBOSE] /!\\ Activate VerifyFunction option and check that the result of the check is OK to get reliable results from GlobalSensitivityCheck option") + if check_sensitivity == "True": + print(" [VERBOSE] Results of function global sensitivity analysis: ", "All model outputs vary when background values are modified") + else: + print(" [VERBOSE] Results of function global sensitivity analysis: ", "Some model outputs DO NOT vary when background values are modified") + print(" [VERBOSE] The following model outputs DO NOT vary when background values are modified: ", list_same_model_outputs ) + + if ParamSensitivityCheck: + check_function, list_params_without_impact = verify_function_sensitivity_parameter(x_values = Background, increment = Params["DifferentialIncrement"], model_format = self.__model["Format"].upper()) + if VerifyFunction == False: + print(" [VERBOSE] /!\\ Activate VerifyFunction option and check that the result of the check is OK to get reliable results from ParamSensitivityCheck option") + if check_function == "True": + print(" [VERBOSE] Results of parameter sensitivity checking: ", "OK, all parameters have an impact on model outputs") + else: + print(" [VERBOSE] Results of parameter sensitivity checking: ", "NOT OK, some parameters do not have an impact on model outputs") + print(" [VERBOSE] The following parameters do not have an impact on model outputs: ", list_params_without_impact ) + + if ModelStabilityCheck: + print(" [VERBOSE] Model stability check is being performed, this check might take several minutes depending on the model and number of variables to calibrate (you can reduce the value of ModelStabilityCheckingPoints option to reduce checking time, by default its value is 10)") + check_stability = check_model_stability(x_values_names = VariablesToCalibrate, background_values = Background, bounds = Params['Bounds'], number_of_tests = ModelStabilityCheckingPoints, model_format = self.__model["Format"].upper()) + if check_stability == dict(zip(VariablesToCalibrate,[list() for i in VariablesToCalibrate])): + print(" [VERBOSE] Results of Model stability check: ", "The model simulates correctly when modifying individually the values of the variables to calibrate within the range in which their optimal value should be found") + else: + print(" [VERBOSE] Results of Model stability check: ", "The model FAILS to converge with the following values of variables to calibrate", check_stability) + print(" [VERBOSE] The data assimilation procedure might not succeed if the model fails to converge, please revise the model and/or the range in which the optimal value of the variables to calibrate should be found") + + # + + __adaocase = adaoBuilder.New() + __adaocase.set( 'AlgorithmParameters', Algorithm=Algo, Parameters=Params ) + __adaocase.set( 'Background', Vector=Background) + __adaocase.set( 'Observation', Vector=Observations ) + if type(CovB) is float: + __adaocase.set( 'BackgroundError', ScalarSparseMatrix=CovB ) + else: + __adaocase.set( 'BackgroundError', Matrix=CovB ) + + if AdaptObservationError: + Square_Observations = [x*x for x in Observations] + + # if (type(CovR) is float and CovR!= 1.0 ):# by default we still consider the square, better not to modify it + if type(CovR) is float: + ObsError = CovR*numpy.diag(Square_Observations) + __adaocase.set( 'ObservationError', Matrix = ObsError ) + else: + ObsError = 1e-15*numpy.diag(Square_Observations) #arbitary low value to eventually handle pressures in Pa + __adaocase.set( 'ObservationError', Matrix = ObsError ) + print(" [VERBOSE] AdaptObservationError was set to True, the following ObservationError matrix has been considered: ") + print(ObsError) + else: #classical situation + if type(CovR) is float: + __adaocase.set( 'ObservationError', ScalarSparseMatrix=CovR ) + else: + __adaocase.set( 'ObservationError', Matrix=CovR ) + + if Params["EnableMultiProcessing"]==1 : + ParallelComputationGradient = True + else: + ParallelComputationGradient = False + + + if self.__model["Format"].upper() in ["DYMOSIM"]: + if ParallelComputationGradient: + + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + raise ValueError("ADAO version must be at least 9.7.0 to support parallel computation of the gradient, current version is %s"%str(adao.version[:5])) + + if ComplexModel: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"], "EnableMultiProcessing":Params["EnableMultiProcessing"], "NumberOfProcesses" : Params["NumberOfProcesses"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'ModelFormat': ModelFormat, 'KeepCalculationFolders': KeepCalculationFolders, 'Verbose' : Verbose, 'dict_dsin_paths' : dict_dsin_path_ref, 'Linux': Linux, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution }, + ) + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"], "EnableMultiProcessing":Params["EnableMultiProcessing"], "NumberOfProcesses" : Params["NumberOfProcesses"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'KeepCalculationFolders': KeepCalculationFolders, 'Linux': Linux, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution }, + ) + + else: + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}) + + else: + if ComplexModel: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'ModelFormat': ModelFormat, 'KeepCalculationFolders': KeepCalculationFolders, 'Verbose' : Verbose, 'dict_dsin_paths' : dict_dsin_path_ref, 'Linux': Linux, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution }, + ) + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'KeepCalculationFolders': KeepCalculationFolders, 'Linux': Linux, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution }) + + elif self.__model["Format"].upper() in ["OPENMODELICA"]: #OPENMODELICA (different from FMI since there are different keywords for the function: Linux and KeepCalculationFolders) + + if ComplexModel: + print("ComplexModel option is only valid for DYMOSIM format (it has no effect for the current calculation)") + + if ParallelComputationGradient: + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + raise ValueError("ADAO version must be at least 9.7.0 to support parallel computation of the gradient, current version is %s"%str(adao.version[:5])) + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'LColumns':LColumns, 'LVariablesToChange':LVariablesToChange, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'ModelName': ModelName, 'List_Multideltatime' : List_Multideltatime, 'Linux': Linux, 'KeepCalculationFolders': KeepCalculationFolders, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution}) + + else: + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + raise ValueError("ADAO version must be at least 9.7.0 to support other formats than DYMOSIM") + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'LColumns':LColumns, 'LVariablesToChange':LVariablesToChange, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'ModelName': ModelName, 'List_Multideltatime' : List_Multideltatime, 'Linux': Linux, 'KeepCalculationFolders': KeepCalculationFolders, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution}) + + else: #FMI or GUESS format + + if ComplexModel: + print("ComplexModel option is only valid for DYMOSIM format (it has no effect for the current calculation)") + + if ParallelComputationGradient: + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + raise ValueError("ADAO version must be at least 9.7.0 to support parallel computation of the gradient, current version is %s"%str(adao.version[:5])) + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'LColumns':LColumns, 'LVariablesToChange':LVariablesToChange, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'ModelName': ModelName, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution, 'FMUInput':FMUInput}) + + else: + if adao.version[:5] < '9.7.0' and adao.version[5]=='.': + raise ValueError("ADAO version must be at least 9.7.0 to support other formats than DYMOSIM") + else: + __adaocase.set( 'ObservationOperator', OneFunction=simulateur, + Parameters = {"DifferentialIncrement":Params["DifferentialIncrement"]}, + ExtraArguments = {'VariablesToCalibrate':VariablesToCalibrate, 'OutputVariables' : OutputVariables, 'LNames': LNames, 'LColumns':LColumns, 'LVariablesToChange':LVariablesToChange, 'ref_simudir': self._get_Name_ModelTmpDir_simple(ModelName), 'ModelName': ModelName, 'List_Multideltatime' : List_Multideltatime, 'AdvancedDebugModel':AdvancedDebugModel, 'TimeoutModelExecution':TimeoutModelExecution, 'FMUInput':FMUInput}) + + + # + if self.__verbose: + __adaocase.set( 'Observer', Variable="CostFunctionJAtCurrentOptimum", String="print('\\n -----------------------------------\\n [VERBOSE] Current iteration number: %i'%len(var))" ) + __adaocase.set( 'Observer', Variable="CostFunctionJAtCurrentOptimum", Template="ValuePrinter", Info="\n [VERBOSE] Current optimal cost value.:" ) + __adaocase.set( 'Observer', Variable="CurrentOptimum", Template="ValuePrinter", Info="\n [VERBOSE] Current optimal state......:" ) # + + if self.__IntermediateInformation: + __adaocase.set( 'Observer', Variable="CostFunctionJ", Template="ValuePrinter", Info="\n [VERBOSE] Current cost value.:" ) + __adaocase.set( 'Observer', Variable="CurrentState", Template="ValuePrinter", Info="\n [VERBOSE] Current state......:" ) + + __adaocase.execute() + # + + if InitialSimulation: #the prev_adao_version_TOP_LEVEL_XXX is already defined with proper arguments (boundary conditions etc.) + + if self.__model["Format"].upper() in ["DYMOSIM"]: + if ComplexModel: + initialsimulation_results = prev_adao_version_TOP_LEVEL_exedymosimMultiobs(Background) + else: + initialsimulation_results = prev_adao_version_TOPLEVEL_exedymosimMultiobs_simple(Background) + elif self.__model["Format"].upper() in ["FMI", "FMU"]: + initialsimulation_results = TOP_LEVEL_exefmuMultiobs(Background, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution, FMUInput = FMUInput) + + elif self.__model["Format"].upper() in ["OPENMODELICA"]: + initialsimulation_results = TOP_LEVEL_exeOpenModelicaMultiobs(Background, KeepCalculationFolders = KeepCalculationFolders, VariablesToCalibrate=VariablesToCalibrate, OutputVariables=OutputVariables, LNames = LNames, LColumns = LColumns, LVariablesToChange=LVariablesToChange, ref_simudir = self._get_Name_ModelTmpDir_simple(ModelName), ModelName = ModelName, List_Multideltatime = List_Multideltatime, Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution ) + + __resultats = dict( + InitialParameters = numpy.asarray(Background), + OptimalParameters = __adaocase.get("Analysis")[-1], # * Background, + InitialSimulation = numpy.asarray(initialsimulation_results), #the simulation results are given for backgroung values, can be used to evaluate the overall performance of the calibration procedure + OptimalSimulation = __adaocase.get("SimulatedObservationAtOptimum")[-1], + Measures = Observations, + NamesOfParameters = BNames, + NamesOfMeasures = ONames, + ) + else: + __resultats = dict( + InitialParameters = numpy.asarray(Background), + OptimalParameters = __adaocase.get("Analysis")[-1], # * Background, + OptimalSimulation = __adaocase.get("SimulatedObservationAtOptimum")[-1], + Measures = Observations, + NamesOfParameters = BNames, + NamesOfMeasures = ONames, + ) + + if "APosterioriCovariance" in Params["StoreSupplementaryCalculations"]: + __resultats["APosterioriCovariance"] = __adaocase.get("APosterioriCovariance")[-1] + + if ResultsSummary: + if InitialSimulation: + measures_input = __resultats ["Measures"] + initial_simu = __resultats ["InitialSimulation"] + optimal_simu = __resultats ["OptimalSimulation"] + + diff_measures_initial_rmse = numpy.array([(measures_input[i]-initial_simu[i])**2 for i in range(len(measures_input))]) + diff_measures_initial_reldiff = numpy.array([abs((measures_input[i]-initial_simu[i])/measures_input[i]) for i in range(len(measures_input)) if measures_input[i] !=0]) + + diff_measures_optimal_rmse = numpy.array([(measures_input[i]-optimal_simu[i])**2 for i in range(len(measures_input))]) + diff_measures_optimal_reldiff = numpy.array([abs((measures_input[i]-optimal_simu[i])/measures_input[i]) for i in range(len(measures_input)) if measures_input[i] !=0]) + + diff_measures_initial_rmse_res = str(numpy.format_float_scientific(numpy.sqrt(diff_measures_initial_rmse.mean()), precision = 3)) + diff_measures_initial_reldiff_res = str(numpy.round(diff_measures_initial_reldiff.mean()*100,decimals =2)) + + diff_measures_optimal_rmse_res = str(numpy.format_float_scientific(numpy.sqrt(diff_measures_optimal_rmse.mean()), precision = 3)) + diff_measures_optimal_reldiff_res = str(numpy.round(diff_measures_optimal_reldiff.mean()*100,decimals =2)) + + __resultats["ResultsSummary_RMSE"] = ["Average_RMSE_INITIAL = " + diff_measures_initial_rmse_res, "Average_RMSE_OPTIMAL = " + diff_measures_optimal_rmse_res ] + __resultats["ResultsSummary_RelativeDifference"] = ["Average_RelativeDifference_INITIAL = " + diff_measures_initial_reldiff_res + "%", "Average_RelativeDifference_OPTIMAL = " + diff_measures_optimal_reldiff_res + "%" ] + else: + measures_input = __resultats ["Measures"] + optimal_simu = __resultats ["OptimalSimulation"] + + diff_measures_optimal_rmse = numpy.array([(measures_input[i]-optimal_simu[i])**2 for i in range(len(measures_input))]) + diff_measures_optimal_reldiff = numpy.array([abs((measures_input[i]-optimal_simu[i])/measures_input[i]) for i in range(len(measures_input)) if measures_input[i] !=0]) + + + diff_measures_optimal_rmse_res = str(numpy.format_float_scientific(numpy.sqrt(diff_measures_optimal_rmse.mean()), precision = 3)) + diff_measures_optimal_reldiff_res = str(numpy.round(diff_measures_optimal_reldiff.mean()*100,decimals =2)) + + __resultats["ResultsSummary_RMSE"] = ["Average_RMSE_OPTIMAL = " + diff_measures_optimal_rmse_res ] + __resultats["ResultsSummary_RelativeDifference"] = ["Average_RelativeDifference_OPTIMAL = " + diff_measures_optimal_reldiff_res + "%" ] + + # + _saveResults( + __resultats, + self.__result["SourceName"], + self.__result["Level"], + self.__result["Format"], + self.__verbose, + ) + # + + if CreateOptimaldsin: + if self.__model["Format"].upper() in ["DYMOSIM"]: + for etat in range(len(LNames)): + dict_optimal_params = dict(zip(VariablesToCalibrate, __adaocase.get("Analysis")[-1])) #dans la boucle, sinon il se vide à chaque fois que l'on fait itère + if not(ComplexModel): + dir_for_dsin_opti = os.path.abspath(os.path.join(self._get_Name_ModelTmpDir_simple(ModelName),os.pardir)) + simu_dir_opti = os.path.abspath(self._get_Name_ModelTmpDir_simple(ModelName)) + + writer_opti = write_in_dsin.Write_in_dsin(dict_inputs =dict_optimal_params,filedir=simu_dir_opti,dsin_name=str("dsin_" +LNames[etat]+ ".txt"),old_file_name='dsin.txt',new_file_name= str("dsin_" +LNames[etat]+ "_optimal.txt")) + writer_opti.write_in_dsin + + writer_opti.write_in_dsin() + + try: + os.remove(os.path.join(dir_for_dsin_opti,str("dsin_" +LNames[etat]+ "_optimal.txt"))) + except: + pass + + shutil.copyfile( + os.path.join(simu_dir_opti, str("dsin_" +LNames[etat]+ "_optimal.txt")), + os.path.join(dir_for_dsin_opti, str("dsin_" +LNames[etat]+ "_optimal.txt")) + ) + + auto_simul_opti = Automatic_Simulation( + simu_dir=simu_dir_opti, + dsin_name= str("dsin_" +LNames[etat]+ "_optimal.txt"), + dymosim_path = os.path.join(simu_dir_opti, os.pardir) , + dsres_path = str("dsres_" +LNames[etat]+ "_opti.mat"), + dict_inputs= {}, + logfile=False, + timeout=TimeoutModelExecution, + without_modelicares=True, + linux=Linux + ) + auto_simul_opti.single_simulation() + + if not(auto_simul_opti.success_code == 0): + print("The dsin.txt with the optimal values does not converge directly, please update the value of iteration variables using Change_working_Point script (consider both calibrated paramaters and boundary conditions in the procedure)") + else: + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + dict_inputs_boundary_conditions_optimal_params = dict(zip(LVariablesToChange, var_int)) + dict_inputs_boundary_conditions_optimal_params.update(dict_optimal_params) + + try: + os.remove(os.path.join(dir_for_dsin_opti,str("dsin_" +LNames[etat]+ "_optimal.txt"))) + except: + pass + dir_for_dsin_opti = os.path.abspath(os.path.join(self._get_Name_ModelTmpDir_simple(ModelName),os.pardir)) + shutil.copyfile( + os.path.join(dir_for_dsin_opti, str("dsin.txt")), + os.path.join(dir_for_dsin_opti, str("dsin_" +LNames[etat]+ "_optimal.txt")) + ) + writer_opti = write_in_dsin.Write_in_dsin(dict_inputs =dict_inputs_boundary_conditions_optimal_params,filedir=dir_for_dsin_opti,dsin_name=str("dsin_" +LNames[etat]+ "_optimal.txt"),old_file_name=str("dsin_" +LNames[etat]+ "_old.txt"),new_file_name= str("dsin_" +LNames[etat]+ "_optimal.txt")) + writer_opti.write_in_dsin + writer_opti.write_in_dsin() + os.remove(os.path.join(dir_for_dsin_opti,str("dsin_" +LNames[etat]+ "_old.txt"))) + print("The dsin.txt with the optimal values has not been tested, updating the values of iteration variables might be necessary: this can be done by using Change_working_Point script (consider both calibrated paramaters and boundary conditions in the procedure)") + + else: #FMI, OM or GUESS format + print("CreateOptimaldsin option is only valid for DYMOSIM format (it has no effect for the current calculation)") + + if self.__verbose: print("") + del __adaocase + + if not(KeepCalculationFolders): + shutil.rmtree(self._get_Name_ModelTmpDir_simple(ModelName), ignore_errors=True) + + if CreateCSVoutputResults: + print("Creation of CSV output results") + + + optimal_results = __resultats["OptimalSimulation"] + for obs_file_index in range(len(self.__measures["SourceName"])): + obs_filename = self.__measures["SourceName"][obs_file_index] #for the name of the outputresult + + time_measures_number = len(OHours[obs_file_index]) #get the number of lines in the obs file + total_measures_number_per_cl = time_measures_number*len(__resultats["NamesOfMeasures"]) #size of the list to be removed from the whole results file + optimal_results_cl = optimal_results[:total_measures_number_per_cl] # output results for the current cl + optimal_results = optimal_results[total_measures_number_per_cl:] # update of the whole results file (we remove te results already treated) + + optimal_results_cl_to_csv = optimal_results_cl.reshape((len(__resultats["NamesOfMeasures"]),-1)).transpose() #reshape to get one line per timedate + df_optimal_results_cl_to_csv = pandas.DataFrame(optimal_results_cl_to_csv, columns = __resultats["NamesOfMeasures"]) #creation of the df with the names of measures + df_optimal_results_cl_to_csv.insert(0,"Hour", OHours[obs_file_index]) #add Hour column + df_optimal_results_cl_to_csv.insert(0,"Date", ODates[obs_file_index]) # add Date column + + optimal_file_results_name_cl_tmp = "Optimal_simulation_tmp.csv" + df_optimal_results_cl_to_csv.to_csv(optimal_file_results_name_cl_tmp, sep = ";", index = False) + + optimal_file_results_name_cl = "Optimal_simulation_"+obs_filename + with open(optimal_file_results_name_cl_tmp, 'r') as infile: + readie=csv.reader(infile, delimiter=';') + with open(optimal_file_results_name_cl, 'wt', newline='') as output: + outwriter=csv.writer(output, delimiter=';') + outwriter.writerow(["#"]) + for row in readie: + outwriter.writerow(row) + try: + os.remove(optimal_file_results_name_cl_tmp) + except: + pass + + if InitialSimulation: #If initial simulation is required as well, the Initial results files are given as well + initial_results = __resultats["InitialSimulation"] + for obs_file_index in range(len(self.__measures["SourceName"])): + obs_filename = self.__measures["SourceName"][obs_file_index] #for the name of the outputresult + + time_measures_number = len(OHours[obs_file_index]) #get the number of lines in the obs file + total_measures_number_per_cl = time_measures_number*len(__resultats["NamesOfMeasures"]) #size of the list to be removed from the whole results file + initial_results_cl = initial_results[:total_measures_number_per_cl] # output results for the current cl + initial_results = initial_results[total_measures_number_per_cl:] # update of the whole results file (we remove te results already treated) + + initial_results_cl_to_csv = initial_results_cl.reshape((len(__resultats["NamesOfMeasures"]),-1)).transpose() #reshape to get one line per timedate + df_initial_results_cl_to_csv = pandas.DataFrame(initial_results_cl_to_csv, columns = __resultats["NamesOfMeasures"]) #creation of the df with the names of measures + df_initial_results_cl_to_csv.insert(0,"Hour", OHours[obs_file_index]) #add Hour column + df_initial_results_cl_to_csv.insert(0,"Date", ODates[obs_file_index]) # add Date column + + initial_file_results_name_cl_tmp = "Initial_simulation_tmp.csv" + df_initial_results_cl_to_csv.to_csv(initial_file_results_name_cl_tmp, sep = ";", index = False) + + initial_file_results_name_cl = "Initial_simulation_"+obs_filename + with open(initial_file_results_name_cl_tmp, 'r') as infile: + readie=csv.reader(infile, delimiter=';') + with open(initial_file_results_name_cl, 'wt', newline='') as output: + outwriter=csv.writer(output, delimiter=';') + outwriter.writerow(["#"]) + for row in readie: + outwriter.writerow(row) + try: + os.remove(initial_file_results_name_cl_tmp) + except: + pass + + return __resultats + + + def verify_gradient(self): + raise NotImplementedError("Not yet implemented") + +# ============================================================================== +def _readLink(__filename=None, __colnames=None, __indexname="Variable", __format="Guess"): + """ + Read file of link between model and measures + + Arguments: + - File name + - Names of the columns to read, known by their variable header + - Name of the column containing the variables to adapt + - Format of the data (CSV, TSV... or can be guessed) + """ + # + if __colnames is None: + raise + if __indexname is None: __indexname="Variable" + + try: #Solution provisoire pou gérer les cas sans CL précisées par l'utilisateur + with ImportFromFile(__filename, __colnames, __indexname, __format, False) as reading: + colnames, columns, indexname, variablestochange = reading.getvalue() + except: + colnames =__colnames + columns =[] + variablestochange =[] + print("/!\\ Boundary conditions indicated in ", __filename, " are not considered for calculation. Indicate at least two boundary conditions so that they are considered. /!\\ ") + # + variablestochange = tuple([v.strip() for v in variablestochange]) + return (colnames, columns, variablestochange) + +# ============================================================================== +def _readMultiMeasures(__filenames=None, __colnames=None, __format="Guess"): + """ + Read files of measures, using only some named variables by columns + + Arguments: + - File name + - Names of the columns to read, known by their variable header + - Format of the data (CSV, TSV... or can be guessed) + """ + # + MultiMeasures = [] + for __filename in __filenames: + colnames, columns = _readMeasures(__filename, __colnames, __format) + MultiMeasures.append( columns ) + # + return (colnames, MultiMeasures) + +# ============================================================================== +def _readMeasures(__filename=None, __colnames=None, __format="Guess"): + """ + Read files of measures, using only some named variables by columns + + Arguments: + - File name + - Names of the columns to read, known by their variable header + - Format of the data (CSV, TSV... or can be guessed) + """ + # + with ImportFromFile(__filename, __colnames, None, __format) as reading: + colnames, columns, indexname, index = reading.getvalue() + # + return (colnames, columns) + +# ============================================================================== +def _readBackground(__filename="dsin.txt", __varnames=None, __backgroundformat="Guess"): + """ + Read background in model definition. The priority is "USER", then "ADAO", + then "DSIN", and the default one correspond to "DSIN". + + Arguments: + - File name + - Names of the variables to be found in the model data + - Format of the data (can be guessed) + """ + __format = None + if __backgroundformat.upper() in ["GUESS", "DSIN"]: + __format = "DSIN" + if __filename is not None and os.path.isfile(__filename): + __model_dir = os.path.dirname(__filename) + __model_fil = os.path.basename(__filename) + elif __filename is not None and os.path.isdir(__filename) and os.path.isfile(os.path.join(__filename, "dsin.txt")): + __model_dir = os.path.abspath(__filename) + __model_fil = "dsin.txt" + else: + raise ValueError('No such file or directory: %s'%str(__filename)) + __bgfile = os.path.abspath(os.path.join(__model_dir, __model_fil)) + if (len(__bgfile)<7) and (__bgfile[-8:].lower() != "dsin.txt" ): + raise ValueError("Model definition file \"dsin.txt\" can not be found.") + if __varnames is None: + raise ValueError("Model variables to be read has to be given") + if __backgroundformat.upper() == "ADAO": + __format = "ADAO" + if __filename is None or not os.path.isfile(__filename): + raise ValueError('No such file or directory: %s'%str(__filename)) + __bgfile = os.path.abspath(__filename) + if not( len(__bgfile)>4 ): + raise ValueError("User file name \"%s\" is too short and seems not to be valid."%__bgfile) + if __backgroundformat.upper() == "USER": + __format = "USER" + if __filename is None or not os.path.isfile(__filename): + raise ValueError('No such file or directory: %s'%str(__filename)) + __bgfile = os.path.abspath(__filename) + if not( len(__bgfile)>5 ): + raise ValueError("User file name \"%s\" is too short and seems not to be valid."%__bgfile) + if __format not in ["DSIN", "ADAO", "USER"]: + raise ValueError("Background initial values asked format \"%s\" is not valid."%__format) + # + #--------------------------------------------- + if __format == "DSIN": + if __varnames is None: + raise ValueError("Names of variables to read has to be given") + # + __names, __background, __bounds = [], [], [] + with open(__bgfile, 'r') as fid: + __content = fid.readlines() + for v in tuple(__varnames): + if _verify_existence_of_variable_in_dsin(v, __content): + __value = _read_existing_variable_in_dsin(v, __content) + __names.append(v) + __background.append(__value[0]) + __bounds.append(__value[1:3]) + # print "%s = "%v, __value + __names = tuple(__names) + __background = numpy.ravel(__background) + __bounds = tuple(__bounds) + #--------------------------------------------- + elif __format.upper() == "ADAO": + with open(__bgfile, 'r') as fid: + exec(fid.read()) + # + __background = locals().get("Background", None) + if __background is None: + raise ValueError("Background is not defined") + else: + __background = numpy.ravel(__background) + __Params = locals().get("Parameters", {}) + if "Bounds" not in __Params: + raise ValueError("Bounds can not be find in file \"%s\""%str(__filename)) + else: + __bounds = tuple(__Params["Bounds"]) + if len(__bounds) != len(__background): + raise ValueError("Initial background length does not match bounds number") + # + if __varnames is None: + __names = () + else: + __names = tuple(__varnames) + if len(__names) != len(__background): + raise ValueError("Initial background length does not match names number") + #--------------------------------------------- + elif __format.upper() == "USER": + __names, __background, __bounds = ImportScalarLinesFromFile(__bgfile).getvalue(__varnames) + #--------------------------------------------- + for i,b in enumerate(__bounds) : + if b[0] is not None and b[1] is not None and not (b[0] < b[1]) : + raise ValueError(f'Inconsistent boundaries values for "{__names[i]}": {b[0]} not < {b[1]}') + + return (__names, __background, __bounds) + +def _verify_existence_of_variable_in_dsin(__variable, __content, __number = 2): + "Verify if the variable exist in the model file content" + if "".join(__content).count(__variable) >= __number: + return True + else: + return False + +def _read_existing_variable_in_dsin(__variable, __content): + "Return the value of the real variable" + __previousline, __currentline = "", "" + internalOrder = 0 + initialValuePart = False + for line in __content: + if not initialValuePart and "initialValue" not in line: continue + if not initialValuePart and "initialValue" in line: initialValuePart = True + __previousline = __currentline + __currentline = line + if __variable in __currentline and "#" in __currentline and "#" not in __previousline: + # Informations ecrites sur deux lignes successives + vini, value, vmin, vmax = __previousline.split() + # vcat, vtype, diez, vnam = __currentline.split() + value = float(value) + if float(vmin) >= float(vmax): + vmin, vmax = None, None + else: + vmin, vmax = float(vmin), float(vmax) + return (value, vmin, vmax) + elif __variable in __currentline and "#" in __currentline and "#" in __previousline: + # Informations ecrites sur une seule ligne + vini, value, vmin, vmax, reste = __previousline.split(maxsplit=5) + value = float(value) + if float(vmin) >= float(vmax): + vmin, vmax = None, None + else: + vmin, vmax = float(vmin), float(vmax) + return (value, vmin, vmax) + # + raise ValueError("Value of variable %s not found in the file content"%__variable) + +# ============================================================================== +def _readMethod(__filename="parameters.py", __format="ADAO"): + """ + Read data assimilation method parameters + + Arguments: + - File name + - Format of the data (can be guessed) + """ + if not os.path.isfile(__filename): + raise ValueError('No such file or directory: %s'%str(__filename)) + # + #--------------------------------------------- + if __format.upper() in ["GUESS", "ADAO"]: + with open(__filename, 'r') as fid: + exec(fid.read()) + __Algo = locals().get("Algorithm", "3DVAR") + __Params = locals().get("Parameters", {}) + __CovB = locals().get("BackgroundError", 1.) + __CovR = locals().get("ObservationError", 1.) + # __Xb = locals().get("Background", __background) + # if not isinstance(__Params, dict): __Params = {"Bounds":__bounds} + # if "Bounds" not in __Params: __Params["Bounds"] = __bounds + #--------------------------------------------- + else: + __Algo, __Params, __CovB, __CovR = "3DVAR", {}, 1., 1. + #--------------------------------------------- + # if __Xb is None: + # raise ValueError("Background is not defined") + # else: + # __Xb = numpy.ravel(__Xb) + # + return (__Algo, __Params, __CovB, __CovR) + +# ============================================================================== +def _saveResults(__resultats, __filename=None, __level=None, __format="Guess", __verbose=False): + """ + Save results relatively to the level required + + Arguments! + - File name + - Level of saving : final output only or intermediary with final output + - Format of the data + """ + if __filename is None: + raise ValueError('A file to save results has to be named, please specify one') + if __format.upper() == "GUESS": + if __filename.split(".")[-1].lower() == "txt": + __format = "TXT" + elif __filename.split(".")[-1].lower() == "py": + __format = "PY" + else: + raise ValueError("Can not guess the file format of \"%s\", please specify the good one"%__filename) + else: + __format = str(__format).upper() + if __format not in ["TXT", "PY"]: + raise ValueError("Result file format \"%s\" is not valid."%__format) + if __format == "PY" and os.path.splitext(__filename)[1] != ".py": + __filename = os.path.splitext(__filename)[0] + ".py" + # + #--------------------------------------------- + if __format.upper() == "TXT": + output = ["# Final results",] + keys = list(__resultats.keys()) + keys.sort() + for k in keys: + output.append("%18s = %s"%(k,tuple(__resultats[k]))) + output.append("") + with open(__filename, 'w') as fid: + fid.write( "\n".join(output) ) + fid.flush() + #--------------------------------------------- + elif __format.upper() == "PY": + output = ["# Final results",] + keys = list(__resultats.keys()) + keys.sort() + for k in keys: + output.append("%s = %s"%(k,tuple(__resultats[k]))) + output.append("") + with open(__filename, 'w') as fid: + fid.write( "\n".join(output) ) + fid.flush() + #--------------------------------------------- + elif __format.upper() == "CSV": + raise NotImplementedError("Not yet implemented") + #--------------------------------------------- + elif __format.upper() == "DICT": + raise NotImplementedError("Not yet implemented") + #--------------------------------------------- + if __verbose: # Format TXT + output = ["# Final results",] + keys = list(__resultats.keys()) + keys.sort() + for k in keys: + output.append("%18s = %s"%(k,tuple(__resultats[k]))) + output.append("") + print( "\n".join(output) ) + #--------------------------------------------- + return True + +def _secure_copy_textfile(__original_file, __destination_file): + "Copy the file guranteeing that it is copied" + + + shutil.copy( + __original_file, + __destination_file + ) + shutil.copystat( # commande pour assurer que le fichier soit bien copié + __original_file, + __destination_file + ) + os.path.getsize(__destination_file)# commande bis pour assurer que le fichier soit bien copié + + def file_len(fname): + count = 0 + with open(fname) as f: + for line in f: + count += 1 + return count + + while file_len(__original_file)!= file_len(__destination_file): + shutil.copy( + __original_file, + __destination_file + ) + +# ============================================================================== +class Python_Simulation(object): + def __init__(self, simu_dir=".", val_inputs=(), logfile=True, timeout=60, verbose=True): + self.__simu_dir = os.path.abspath(simu_dir) + self.__inputs = numpy.ravel(val_inputs) + if verbose: + print() + print(" [VERBOSE] Input values %s"%self.__inputs) + + def single_simulation(self, __inputs = None): + with open(os.path.join(self.__simu_dir,"pythonsim.exe"), 'r') as fid: + exec(fid.read()) + __directoperator = locals().get("DirectOperator", None) + if __inputs is None: __inputs = self.__inputs + return __directoperator(__inputs) + +# ============================================================================== +class VersionsLogicielles (object): + "Modules version information" + def __init__(self): + print(" [VERBOSE] System configuration:") + print(" - Python...:",sys.version.split()[0]) + print(" - Numpy....:",numpy.version.version) + print(" - Scipy....:",scipy.version.version) + print(" - ADAO.....:",adao.version) + print("") + +def TOP_LEVEL_exefmuMultiobs_ref( x_values_matrix , VariablesToCalibrate=None, OutputVariables=None, ref_simudir = None, ModelName = None ): + "Appel du modèle en format FMU et restitution des résultats" + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs=dict(zip(VariablesToCalibrate,x_values)) + + fmuname = os.path.basename(ModelName) #in case ModelName is given as a path (works also if it is a file name) + fmu = os.path.join(ref_simudir,fmuname) + + reader = simulate_fmu(fmu, output = OutputVariables, start_values = x_inputs) + y_values = [reader[y_name][-1] for y_name in OutputVariables] + y_values_matrix = numpy.asarray(y_values) + + return y_values_matrix + +def TOP_LEVEL_exefmuMultiobs( x_values_matrix , VariablesToCalibrate=None, OutputVariables=None, LNames = None, LColumns = None, LVariablesToChange=None, ref_simudir = None, ModelName = None, List_Multideltatime = None, AdvancedDebugModel = None, TimeoutModelExecution = None, FMUInput = None): + "Appel du modèle en format FMU et restitution des résultats" + + if VariablesToCalibrate is None: + raise ValueError("VariablesToCalibrate") + + if OutputVariables is None: + raise ValueError("OutputVariables") + + if LNames is None: + raise ValueError("LNames") + + if LColumns is None: + raise ValueError("LColumns") + + if LVariablesToChange is None: + raise ValueError("LVariablesToChange") + + if ref_simudir is None: + raise ValueError("ref_simudir") + + if List_Multideltatime is None: + raise ValueError("Problem defining simulation output results") + + if AdvancedDebugModel is None: + raise ValueError("AdvancedDebugModel") + + if TimeoutModelExecution is None: + raise ValueError("TimeoutModelExecution") + + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs=dict(zip(VariablesToCalibrate,x_values)) + + if os.path.isfile(ModelName): + fmuname = os.path.basename(ModelName) + elif os.path.isdir(ModelName): + fmuname = [files for files in os.listdir(os.path.abspath(ModelName)) if files[-4:] =='.fmu'][0] + + fmu = os.path.join(ref_simudir,fmuname) + + multi_y_values_matrix = [] + + for etat in range(len(LNames)): + y_values =[] + + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + + dict_inputs = dict(zip(LVariablesToChange, var_int)) + # + dict_inputs.update(x_inputs) + + if FMUInput: + fmu_inputs = FMUInput[LNames[etat]] + timestep = fmu_inputs[1][0] - fmu_inputs[0][0] #Assuming constant timestep + if AdvancedDebugModel: print(f'The timestep for {LNames[etat]} is {timestep} seconds') + else: + fmu_inputs = None + timestep = None + + try: + stoptime_fmu = List_Multideltatime[etat][-1] + + except IndexError: + stoptime_fmu = None + + + if AdvancedDebugModel == True: + old_stdout = sys.stdout + new_stdout = io.StringIO() + sys.stdout = new_stdout + + start_time_simulation_fmi = time.time()#timeout manangement since fmpy does not raise an error for this, it just ends the simulations and continues + + try : + reader = simulate_fmu(fmu, output = OutputVariables, start_values = dict_inputs, debug_logging = True, timeout = TimeoutModelExecution, input = fmu_inputs, stop_time= stoptime_fmu, output_interval= timestep) + except FMICallException as e: + output = new_stdout.getvalue() + sys.stdout = old_stdout + print('[ERROR] Failed simulation with the following input:\n',dict_inputs) + raise e + + elapsed_time_simulation_fmi = time.time() - start_time_simulation_fmi + if elapsed_time_simulation_fmi > TimeoutModelExecution: + raise TimeoutError("Timeout for simulation reached, please increase it in order to be able to simulate your model and/or check if your model is correct (use TimeoutModelExecution option in configuration.py file)") + + output = new_stdout.getvalue() + sys.stdout = old_stdout + + dir_run=ref_simudir + log_file=os.path.join(dir_run, + 'log_debug_model.txt') + try: #try toremove the previous file + os.remove(log_file) + except: + pass + + f=open(log_file,'a') + for line in output: + f.write(line) + + else: + start_time_simulation_fmi = time.time() + + try : + reader = simulate_fmu(fmu, output = OutputVariables, start_values = dict_inputs, timeout = TimeoutModelExecution, input = fmu_inputs, stop_time= stoptime_fmu, output_interval= timestep) + except FMICallException as e: + print('[ERROR] Failed simulation with the following input:\n',dict_inputs) + raise e + + elapsed_time_simulation_fmi = time.time() - start_time_simulation_fmi + if elapsed_time_simulation_fmi > TimeoutModelExecution: + raise TimeoutError("Timeout for simulation reached, please increase it in order to be able to simulate your model and/or check if your model is correct (use TimeoutModelExecution option in configuration.py file)") + + y_whole = [reader[y_name] for y_name in OutputVariables] + + for y_ind in range(len(OutputVariables)): + y_ind_whole_values = y_whole[y_ind] + y_ind_whole_time = reader['time'] #standard output of fmu + + if len(List_Multideltatime[etat])>1: + index_y_values = [find_nearest_index(y_ind_whole_time,time)[0] for time in List_Multideltatime[etat]] + else: + index_y_values = [-1] #we only take the last point if one measure point + y_ind_values = [y_ind_whole_values[i] for i in index_y_values] + y_values = y_values + y_ind_values + + y_values_matrix = y_values #pbs in results management + multi_y_values_matrix = multi_y_values_matrix + y_values_matrix + + y_values_matrix_def = numpy.ravel(numpy.array(multi_y_values_matrix)) + return y_values_matrix_def + +def TOP_LEVEL_exeOpenModelicaMultiobs( x_values_matrix , KeepCalculationFolders = None, VariablesToCalibrate=None, OutputVariables=None, LNames = None, LColumns = None, LVariablesToChange=None, ref_simudir = None, ModelName = None, List_Multideltatime = None, Linux = None, AdvancedDebugModel = None, TimeoutModelExecution = None): + "Appel du modèle en format OpenModelica et restitution des résultats" + + if VariablesToCalibrate is None: + raise ValueError("VariablesToCalibrate") + + if OutputVariables is None: + raise ValueError("OutputVariables") + + if LNames is None: + raise ValueError("LNames") + + if LColumns is None: + raise ValueError("LColumns") + + if LVariablesToChange is None: + raise ValueError("LVariablesToChange") + + if ref_simudir is None: + raise ValueError("ref_simudir") + + if KeepCalculationFolders is None: + raise ValueError("KeepCalculationFolders") + + if Linux is None: + raise ValueError("Linux") + + if List_Multideltatime is None: + raise ValueError("Problem defining simulation output results") + + if AdvancedDebugModel is None: + raise ValueError("AdvancedDebugModel") + + if TimeoutModelExecution is None: + raise ValueError("TimeoutModelExecution") + + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs=dict(zip(VariablesToCalibrate,x_values)) + + if os.path.isfile(ModelName): + om_xml = os.path.basename(ModelName) + elif os.path.isdir(ModelName): + om_xml = [files for files in os.listdir(os.path.abspath(ModelName)) if files[-4:] =='.xml'][0] + + om_xml = os.path.abspath(os.path.join(ref_simudir,om_xml)) + + multi_y_values_matrix = [] + + for etat in range(len(LNames)): + y_values =[] + + try: #to handle situations in which there is only one CL (boundary conditions) file + var_int = numpy.ravel(LColumns[:,etat]) + except : + var_int = numpy.ravel(LColumns) + + dict_inputs = dict(zip(LVariablesToChange, var_int)) + # + + dict_inputs.update(x_inputs) + + + results_dir = tempfile.mkdtemp( dir=os.path.dirname(om_xml) ) #to handle parallel computation + + results_file_name = os.path.join(results_dir,os.path.basename(om_xml)[:-9] + '_' + LNames[etat].replace('.','_') + ".mat") + + OM_execution = run_OM_model(om_xml, dict_inputs = dict_inputs, result_file_name = results_file_name , Linux = Linux, AdvancedDebugModel = AdvancedDebugModel, TimeoutModelExecution = TimeoutModelExecution) + + try: + reader = Reader(results_file_name,'dymola') #dymola even if it is OpenModelica + except: + raise ValueError("Simulation cannot be performed: reduce the number of parameters to calibrate and/or the range in which their optimal value should be found (or modify and simplify the model to make it easier to simulate)" ) + + y_whole = [reader.values(y_name) for y_name in OutputVariables] + + for y_ind in range(len(OutputVariables)): + y_ind_whole_values = y_whole[y_ind][1] + y_ind_whole_time = y_whole[y_ind][0] + + if len(List_Multideltatime[etat])>1: + index_y_values = [find_nearest_index(y_ind_whole_time,time)[0] for time in List_Multideltatime[etat]] + else: + index_y_values = [-1] #we only take the last point if one measure point + + y_ind_values = [y_ind_whole_values[i] for i in index_y_values] + y_values = y_values + y_ind_values +# y_values_matrix = numpy.asarray(y_values) + y_values_matrix = y_values #pbs in results management + multi_y_values_matrix = multi_y_values_matrix + y_values_matrix + + if not KeepCalculationFolders: + shutil.rmtree(results_dir, ignore_errors=True) + + y_values_matrix_def = numpy.ravel(numpy.array(multi_y_values_matrix)) + + return y_values_matrix_def + +def TOPLEVEL_exedymosimMultiobs_simple( x_values_matrix , VariablesToCalibrate=None, OutputVariables=None, LNames = None, ref_simudir = None, KeepCalculationFolders = None, Linux = None, List_Multideltatime = None, AdvancedDebugModel = None, TimeoutModelExecution = None): + "Appel du modèle en format DYMOSIM et restitution des résultats" + + + if VariablesToCalibrate is None: + raise ValueError("VariablesToCalibrate") + + if OutputVariables is None: + raise ValueError("OutputVariables") + + if LNames is None: + raise ValueError("LNames") + + if ref_simudir is None: + raise ValueError("ref_simudir") + + if KeepCalculationFolders is None: + raise ValueError("KeepCalculationFolders") + + if Linux is None: + raise ValueError("Linux") + + if List_Multideltatime is None: + raise ValueError("Problem defining simulation output results") + + if AdvancedDebugModel is None: + raise ValueError("AdvancedDebugModel") + + if TimeoutModelExecution is None: + raise ValueError("TimeoutModelExecution") + # + # x_values = list(numpy.ravel(x_values_matrix) * Background) + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs=dict(zip(VariablesToCalibrate,x_values)) + # if Verbose: print(" Simulation for %s"%numpy.ravel(x_values)) + # + + multi_y_values_matrix = [] + + for etat in range(len(LNames)): + y_values =[] + + simudir = tempfile.mkdtemp( dir=ref_simudir ) + # + dict_inputs ={} + dict_inputs.update( x_inputs ) + # + shutil.copy( + os.path.join(ref_simudir, str("dsin_" +LNames[etat]+ ".txt")), + os.path.join(simudir, str("dsin_" +LNames[etat]+ ".txt")) + ) + _secure_copy_textfile( + os.path.join(ref_simudir, str("dsin_" +LNames[etat]+ ".txt")), + os.path.join(simudir, str("dsin_" +LNames[etat]+ ".txt")) + ) + + auto_simul = Automatic_Simulation( + simu_dir = simudir, + dymosim_path = os.path.join(ref_simudir,os.pardir) , + dsin_name = str("dsin_" +LNames[etat]+ ".txt"), + dsres_path = os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")), + dict_inputs = dict_inputs, + logfile = True, + timeout = TimeoutModelExecution, + without_modelicares = True, + linux=Linux, + ) + auto_simul.single_simulation() + + if AdvancedDebugModel: + dslog_file = os.path.join(simudir, 'dslog.txt') + debug_model_file = os.path.join(ref_simudir,'log_debug_model.txt') + try: + shutil.copy(dslog_file,debug_model_file) + except: + pass + + if auto_simul.success_code == 2 : + raise ValueError("The simulation falis after initialization, please check the log file and your model in order to be sure that the whole simulation can be performed (specially for dynamic models)") + + if auto_simul.success_code == 3 : + raise ValueError("The simulation did not reach the final time, probably due to a lack of time, please increase the time for model execution by modifying the TimeoutModelExecution value in configuration.py") + + if auto_simul.success_code == 0: + pass #means that everything OK, we keep going + else : + raise ValueError("The NeedInitVariables iteration should be set to True, convergence issues found for %s" % str(LNames[etat])) #means this value is 1--> not correct initialization + + reader = Reader(os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")),'dymola') + + y_whole = [reader.values(y_name) for y_name in OutputVariables] + + for y_ind in range(len(OutputVariables)): + y_ind_whole_values = y_whole[y_ind][1] + y_ind_whole_time = y_whole[y_ind][0] + + if len(List_Multideltatime[etat])>1: + index_y_values = [find_nearest_index(y_ind_whole_time,time)[0] for time in List_Multideltatime[etat]] + else: + index_y_values = [-1] #we only take the last point if one measure point + + y_ind_values = [y_ind_whole_values[i] for i in index_y_values] + y_values = y_values + y_ind_values +# y_values_matrix = numpy.asarray(y_values) + y_values_matrix = y_values #pbs in results management + multi_y_values_matrix = multi_y_values_matrix + y_values_matrix + + if not KeepCalculationFolders: + shutil.rmtree(simudir, ignore_errors=True) + + y_values_matrix_def = numpy.ravel(numpy.array(multi_y_values_matrix)) + return y_values_matrix_def + +def TOP_LEVEL_exedymosimMultiobs( x_values_matrix , VariablesToCalibrate=None, OutputVariables=None, LNames = None, ModelFormat = None, KeepCalculationFolders = None, Verbose = None, dict_dsin_paths = None, Linux = None, List_Multideltatime = None, AdvancedDebugModel = None, TimeoutModelExecution = None): #2ème argument à ne pas modifier + "Appel du modèle en format DYMOSIM et restitution des résultats" + + if VariablesToCalibrate is None: + raise ValueError("VariablesToCalibrate") + + if OutputVariables is None: + raise ValueError("OutputVariables") + + if LNames is None: + raise ValueError("LNames") + + if ModelFormat is None: + raise ValueError("ModelFormat") + + if KeepCalculationFolders is None: + raise ValueError("KeepCalculationFolders") + + if Verbose is None: + raise ValueError("Verbose") + + if dict_dsin_paths is None: + raise ValueError("dict_dsin_paths") + + if Linux is None: + raise ValueError("Linux") + + if List_Multideltatime is None: + raise ValueError("Problem defining simulation output results") + + if AdvancedDebugModel is None: + raise ValueError("AdvancedDebugModel") + + if TimeoutModelExecution is None: + raise ValueError("TimeoutModelExecution") +# + # x_values = list(numpy.ravel(x_values_matrix) * Background) + x_values = list(numpy.ravel(x_values_matrix)) + x_inputs = dict(zip(VariablesToCalibrate,x_values)) + x_inputs_for_CWP = {} + # if Verbose: print(" Simulation for %s"%numpy.ravel(x_values)) + + multi_y_values_matrix = [] + + + + for etat in range(len(LNames)): + y_values =[] + + ref_simudir = Calibration._setModelTmpDir( None,dict_dsin_paths[LNames[etat]], ModelFormat, 'dsin.txt', os.path.join(os.path.pardir,os.path.pardir), LNames[etat]) + simudir = ref_simudir + dict_inputs={} + dict_inputs.update(x_inputs) + + + auto_simul = Automatic_Simulation( + simu_dir=simudir, + dymosim_path = os.path.join(simudir, os.pardir, os.pardir) , + dsres_path = os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")), + dict_inputs=dict_inputs, + logfile=True, + timeout=TimeoutModelExecution, + without_modelicares=True, + linux=Linux + ) + auto_simul.single_simulation() + + if AdvancedDebugModel: + dslog_file = os.path.join(simudir, 'dslog.txt') + debug_model_file = os.path.join(simudir, os.pardir, 'log_debug_model.txt') + try: + shutil.copy(dslog_file,debug_model_file) + except: + pass + + if auto_simul.success_code == 2 : + raise ValueError("The simulation falis after initialization, please check the log file and your model in order to be sure that the whole simulation can be performed (specially for dynamic models)") + + if auto_simul.success_code == 3 : + raise ValueError("The simulation did not reach the final time, probably due to a lack of time, please increase the time for model execution by modifying the TimeoutModelExecution value in configuration.py") + + + if auto_simul.success_code == 0: + reader = Reader(os.path.join(simudir, str("dsres" +LNames[etat]+ ".mat")),'dymola') + + else: + + path_around_simu = os.path.join(simudir,os.path.pardir,os.path.pardir) + around_simu = Around_Simulation(dymola_version='2012', + curdir=path_around_simu, + source_list_iter_var = 'from_dymtranslog', + copy_from_dym_trans_log= os.path.join(path_around_simu,'ini.txt'), + mat = os.path.join(os.path.dirname(dict_dsin_paths[LNames[etat]]), str("REF_for_CWP" + ".mat")), + iter_var_values_options={'source':'from_mat','moment':'initial'}, + verbose = False) + around_simu.set_list_iter_var() + around_simu.set_dict_iter_var() + + reader_CWP = Reader(os.path.join(os.path.dirname(dict_dsin_paths[LNames[etat]]), str("REF_for_CWP" + ".mat")),'dymola') + x_values_intemediary = [reader_CWP.values(x_name)[1][-1] for x_name in VariablesToCalibrate] + x_values_from_last_calculation = numpy.asarray(x_values_intemediary) + x_inputs_from_last_calculation = dict(zip(VariablesToCalibrate,x_values_from_last_calculation)) + + + for var_calib in x_inputs_from_last_calculation.keys(): + x_inputs_for_CWP[var_calib] = (x_inputs_from_last_calculation[var_calib], x_inputs[var_calib]) + + dict_var_to_fix2 = Dict_Var_To_Fix(option='automatic', + dict_auto_var_to_fix=x_inputs_for_CWP) + + dict_var_to_fix2.set_dict_var_to_fix() + + if Verbose: + LOG_FILENAME = os.path.join(simudir,'change_working_point.log') + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG,filemode='w') + + working_point_modif = Working_Point_Modification(main_dir = simudir, + simu_material_dir = simudir, + dymosim_path = os.path.join(simudir, os.pardir, os.pardir), + simu_dir = 'SIMUS', + store_res_dir = 'RES', + dict_var_to_fix = dict_var_to_fix2.dict_var_to_fix, + around_simulation0 = around_simu, + var_to_follow_path= os.path.join(simudir,'var_to_follow.csv'), + gen_scripts_ini= False, + nit_max= 1000000000000000, + min_step_val = 0.00000000000005, + timeout = TimeoutModelExecution, + linux=Linux) + + working_point_modif.create_working_directory() + working_point_modif.working_point_modification(skip_reference_simulation=True) + + if AdvancedDebugModel: + dslog_file = os.path.join(simudir,'SIMUS', 'dslog.txt') + debug_model_file = os.path.join(simudir, os.pardir, 'log_debug_model.txt') + try: + shutil.copy(dslog_file,debug_model_file) + except: + pass + try: + reader = Reader(os.path.join(simudir, 'RES','1.0.mat'),'dymola') + except: + raise ValueError("Simulation cannot be performed: reduce the number of parameters to calibrate and/or the range in which their optimal value should be found (or modify and simplify the model to make it easier to simulate)" ) + + + y_whole = [reader.values(y_name) for y_name in OutputVariables] + + for y_ind in range(len(OutputVariables)): + y_ind_whole_values = y_whole[y_ind][1] + y_ind_whole_time = y_whole[y_ind][0] + if len(List_Multideltatime[etat])>1: + index_y_values = [find_nearest_index(y_ind_whole_time,time)[0] for time in List_Multideltatime[etat]] + else: + index_y_values = [-1] #we only take the last point if one measure point + + y_ind_values = [y_ind_whole_values[i] for i in index_y_values] + y_values = y_values + y_ind_values +# y_values_matrix = numpy.asarray(y_values) + y_values_matrix = y_values #pbs in results management + multi_y_values_matrix = multi_y_values_matrix + y_values_matrix + + if not KeepCalculationFolders: + shutil.rmtree(simudir, ignore_errors=True) + y_values_matrix_def = numpy.ravel(numpy.array(multi_y_values_matrix)) + return y_values_matrix_def + +def run_OM_model(xml_file, #req arg, file path to _init.xml file + result_file_name = None, #file path for storing results + dict_inputs = None, #optionnal argument : for overriding paramters + Linux = False, + AdvancedDebugModel = None, + TimeoutModelExecution = None): + + Command=[] #initialize command with list of argument + + xml_file = os.path.abspath(xml_file) + + #set main command to be runned + if Linux: + OM_binary = xml_file.replace('_init.xml','') # not tested + else: #WIndows + OM_binary = xml_file.replace('_init.xml','.exe') + Command.append(OM_binary) + + #Get base path for binaries and input + + + inputPath='-inputPath='+os.path.dirname(xml_file) + Command.append(inputPath) + + #Generate override command + if dict_inputs !=None: + Override='-override=' + for elt in dict_inputs.items(): + Override=Override+elt[0]+'='+str(elt[1])+',' + Override=Override.rstrip(',') #remove last ',' if any + Command.append(Override) + + #Generate name of result file + if result_file_name != None: + result='-r='+result_file_name + Command.append(result) + else: + result='-r='+OM_binary+'.mat' #by default result is stored next to _init and bin file + Command.append(result) + + result='-w' #by default result is stored next to _init and bin file + Command.append(result) + + result='-lv='+'LOG_STATS,LOG_NLS_V' #by default result is stored next to _init and bin file + Command.append(result) + + inputPath='-outputPath='+os.path.dirname(xml_file) + Command.append(inputPath) + + #launch calculation + proc = subprocess.Popen(Command, #Command in the form of a text list + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + universal_newlines=True) + # for line in proc.stdout.readlines(): + # print(line) + + if AdvancedDebugModel == True: + dir_run=os.path.join(os.path.dirname(result_file_name),os.pardir) + log_file=os.path.join(dir_run, + 'log_debug_model.txt') + + try: + os.remove(log_file) + except: + pass + try: + f=open(log_file,'a') + for line in proc.stdout.readlines(): + f.write(line) + f.close() + proc.communicate(timeout = TimeoutModelExecution) + except subprocess.TimeoutExpired: + raise ValueError("Timeout for simulation reached, please increase it in order to be able to simulate your model and/or check if your model is correct (use TimeoutModelExecution option in configuration.py file)") + + else: + try: + proc.communicate(timeout = TimeoutModelExecution) + except: + raise ValueError("Timeout for simulation reached, please increase it in order to be able to simulate your model and/or check if your model is correct (use TimeoutModelExecution option in configuration.py file)") + + # proc.communicate() + + # r = proc.poll() + # while r !=0: + # r = proc.poll() + + # if proc.returncode !=0: + # raise ValueError("Simulation not ended") + + + # sys.stdout.flush() + # sys.stderr.flush() + + # dir_run=os.path.dirname(result_file_name) + # log_file=os.path.join(dir_run, + # 'log.txt') + # try: + # proc.wait(timeout=50) + + # f=open(log_file,'a') + # for line in proc.stdout.readlines(): + # f.write(line) + + # f.close() + # return_code=proc.returncode + + # except subprocess.TimeoutExpired: + + # f=open(log_file,'a') + # for line in proc.stdout.readlines(): + # f.write(line) + + # f.write('Simulation stoppe because of TimeOut') + # f.close() + + # return_code=2 + + return #return_code + +def readObsnamesfile(__filenames=None): + "Provisionnel pour lire les noms des observations d'un fichier" + for __filename in __filenames: + try: + df = pandas.read_csv(__filename, sep = ";", header =0) #so that repeated names are not modified + obsnames_infile = df.loc[0, :].values.tolist()[2:] + except: + df_tmp = pandas.read_csv(__filename, sep = ";", header =1) + with open(__filename, 'r') as infile: + readie=csv.reader(infile, delimiter=';') + with open('tmp_obs_file.csv', 'wt', newline='') as output: + outwriter=csv.writer(output, delimiter=';') + list_row = ['#'] + df_tmp.columns + outwriter.writerow(list_row) + for row in readie: + outwriter.writerow(row) + + df = pandas.read_csv('tmp_obs_file.csv', sep = ";", header =0) #so that repeated names are not modified + df = df.iloc[1: , :] + obsnames_infile = df.iloc[0, :].values.tolist()[2:] + + try: + os.remove('tmp_obs_file.csv') + except: + pass + return obsnames_infile + +def readDatesHours(__filename=None): + "Provisionnel pour lire les heures et les dates d'une simulation dynamique" + df = pandas.read_csv(__filename, sep = ";", header =1) + dates = numpy.array(df['Date']) + hours = numpy.array(df['Hour']) + return (dates,hours) + +def readMultiDatesHours(__filenames=None): + "Provisionnel pour lire les heures et les dates d'une simulation dynamique dans le cas multi-observations" + Multidates =[] + Multihours = [] + for __filename in __filenames: + df = pandas.read_csv(__filename, sep = ";", header =1) + dates = numpy.array(df[df.columns[0]]) + hours = numpy.array(df[df.columns[1]]) + Multidates.append( dates ) + Multihours.append( hours ) + return (Multidates,Multihours) + +def find_nearest_index(array, value): + "Trouver le point de la simulation qui se rapproche le plus aux observations" + array = numpy.asarray(array) + idx = (numpy.abs(array - value)).argmin() + return [idx, array[idx]] +# ============================================================================== +if __name__ == "__main__": + print('\n AUTODIAGNOSTIC\n ==============\n') + print(" Module name..............: %s"%name) + print(" Module version...........: %s"%version) + print(" Module year..............: %s"%year) + print("") + print(" Configuration attributes.:") + for __k, __v in _configuration.items(): + print("%26s : %s"%(__k,__v)) + print("") diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/__init__.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/__init__.py new file mode 100644 index 0000000..906b510 --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/__init__.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D +# Author: Luis Corona Mesa-Molles, luis.corona-mesa-molles@edf.fr, EDF R&D +# Author: Julien Lagarde, julien.lagarde@edf.fr, EDF R&D diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/around_simulation.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/around_simulation.py new file mode 100644 index 0000000..3785b4d --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/around_simulation.py @@ -0,0 +1,865 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +############################################################################### +# LIBRARIES IMPORT # +############################################################################### +import re +from buildingspy.io.outputfile import Reader # To read dsres.mat +from os import path,getcwd,pardir,mkdir +from numpy import mean +from sys import exit +import pandas as pd +import numpy as np +import os + +from .functions import get_cur_time, Edit_File + +class NotImplementedError(Exception) : + """ + This exception is raised when there is only one alive (not enough to continue) + """ + pass + + +############################################################################### +# CLASS Simulation_Files # +############################################################################### + +class Around_Simulation(object): + """ + Aim: creating an efficient and user-friendly software dedicated to creating + initialization scripts for Modelica based simulation tools. + + Notes: + - Commentaries are written directly in the code + - Examples are provided with the code to make it easier to use + - It is recommended to create the build the list of the iteration variables from + the information that is displayed in the log printed by Dymola during the + translation (ie: choose source_list_iter_var = 'from_dymtranslog' when it is possible) + - The derivative of the variables are never considered as iteration variables + - It should be possible to create the list of the iteration variables from file + dslog.txt and this script gives an option to do it, but this option has never been + tested and may also require some modifications. + + Required libraries that are not in the standard distribution: + - buildingspy + """ + + def __init__(self,dymola_version='Unknown',curdir=getcwd(),source_list_iter_var=None,\ + mo=None,mos=None,copy_from_dym_trans_log=None,dslog_txt=None,mat=None,\ + iter_var_values_options=None,generated_script_options=None,alphabetical_order=True,verbose=True): + + self.verbose = verbose #Boolean, if set to True, some informations are printed to the stdout. + + if iter_var_values_options == None: + iter_var_values_options={'source':'from_mat','moment':'initial','display':False,'updated_vars':False,'GUI':'Dymola'} + + if generated_script_options == None: + generated_script_options={'file_name':'Script_ini.mos','dest_dir':None,'pref':'','filters':{},'renamed':None,'compute':{}} + + #Attributes whose values are setted without specific setters + self.__vdymola = dymola_version + self.__curdir = path.abspath(curdir) + + #Call setters + self.source_list_iter_var = source_list_iter_var + self.mo = mo #The mo file could be used to check that no fixed value has been added to the list of the iteration variables + self.mos = mos + self.copy_from_dym_trans_log = copy_from_dym_trans_log + self.dslog_txt = dslog_txt + self.mat = mat + self.iter_var_values_options = iter_var_values_options + self.generated_script_options = generated_script_options + self.__alphabetical_order = alphabetical_order + #Attributes that cannot be given a value at the creation of the object + self.__not_found_iter_var = [] + self.__not_found_complex_iter_var = [] + self.__list_iter_var = [] + self.__dict_iter_var = {} + self.__list_complex_iter_var = [] + self.__dict_complex_iter_var = {} + self.__dict_iter_var_from_ini = {} # pour récupérer la valeur du fichier ini.txt + self.computed_variables = [] #List to store nale of variable computed by user defined function (if any) + + +############################################################################### +# GETTERS # +############################################################################### + + @property + def vdymola(self): + """ + Dymola version + """ + return self.__vdymola + + @property + def curdir(self): + """ + Directory in which the files are contained + """ + return self.__curdir + + @property + def source_list_iter_var(self): + """ + Type of the file in which the iteration variables will be extracted. + Its values is setted by set_source_list_iter_var + """ + return self.__source_list_iter_var + + @property + def mo(self): + """ + Path of the mo file + """ + return self.__mo + + @property + def mos(self): + """ + Path of the mos file + """ + return self.__mos + + @property + def copy_from_dym_trans_log(self): + """ + Path of the file which contains a copy of the text automatically + written by Dymola in the translation part of the log window when + Advanced.LogStartValuesForIteration = true. + """ + return self.__copy_from_dym_trans_log + + @property + def dslog_txt(self): + """ + Path of dslog.txt + """ + return self.__dslog_txt + + @property + def mat(self): + """ + Path of the mat file or list of paths of mat file + """ + return self.__mat + + @property + def not_found_iter_var(self): + """ + List of the iteration variables whose values have been searched and not + found + """ + return self.__not_found_iter_var + + @property + def not_found_complex_iter_var(self): + """ + List of the images of complex iteration variables whose values has not + been found + """ + return self.__not_found_complex_iter_var + + @property + def list_iter_var(self): + """ + List of the iteration variables + """ + return self.__list_iter_var + + @property + def list_complex_iter_var(self): + """ + List of iteration variables which are given the value of another variable. + Classic example: Tp is given the value of T0. + These values must be identified because the variable that is to be written + in the initialization script in the case of the example would be T0 and not Tp. + This list is included in list_iter_var + """ + return self.__list_complex_iter_var + + @property + def dict_iter_var(self): + """ + Dictionary which associates the name of an iteration variable to its value + """ + return self.__dict_iter_var + + @property + def dict_iter_var_from_ini(self): # pour récupérer la valeur du fichier ini.txt + """ + Dictionary which associates the name of an iteration variable to its value in ini.txt file + """ + return self.__dict_iter_var_from_ini + + @property + def dict_complex_iter_var(self): + """ + Dictionary that is necessary to initialize the values of the iteration + variables that are in list_complex_iter_var + Dictionary that associates the value + key Variable which should be given a value so that the associated + iteration variables are initialized + value Dictionary which associates + key Associated iteration variable + value Value of the associated iteration variables + Example: + 'Tp1' is initialized with the value 'T0' + 'Tp2' is initialized with the value 'T0' + dict_complex_iter_var = {'T0':{'Tp1':Tp1_value,'Tp2':Tp2_value}} + In the initialization script, the mean of Tp1_value and Tp2_value will be written + + This method can only work when the list of the iteration variables has been + built with the text written in the log during translation (source_list_iter_var = 'from_dymtranslog') + """ + return self.__dict_complex_iter_var + + @property + def iter_var_values_options(self): + """ + Dictionary containing the options that will be used to create the dictionary + containing the values of the iteration variables + """ + return self.__iter_var_values_options + + @property + def generated_script_options(self): + """ + Dictionary continaing the options that will be used to generate the + .mos script file + """ + return self.__generated_script_options + + @property + def alphabetical_order(self): + """ + If True, the list of the iteration variables is sorted according to the + alphabetical order. If False, it is sorted as in the source which is used + to create the list of the iteration variables. + """ + return self.__alphabetical_order + +############################################################################### +# SETTERS # +############################################################################### + @source_list_iter_var.setter + def source_list_iter_var(self,source_list_iter_var): + """ + Set the value of attribute source_list_iter_var. + + Available values are: + 'from_mos' The list is created from a .mos file + The mos file is supposed to be written as follows + nom_var1 = ...; + nom_var2 = ...; + 'from_dymtranslog" The list is created from a .txt file which contains + the text automatically written by Dymola in the translation + part of the log window when + Advanced.LogStartValuesForIteration = true + 'from_dslog' The list is created from a dslog.txt file + """ + available_sources = ['from_mos','from_dymtranslog','from_dslog'] + if source_list_iter_var == None: + print('No value given to self.__source_list_iter_var') + self.__source_list_iter_var = None + elif source_list_iter_var in available_sources: + self.__source_list_iter_var = source_list_iter_var + else: + print('The chosen value for source_list_iter_var is not valid. The only valid values are\ + from_mos, from_dymtranslog, from_dslog') + + @mo.setter + def mo(self,mo_file_name): + """ + Set the path of the mo file + """ + if mo_file_name == None: + self.__mo = None + else: + self.__mo = path.join(self.curdir,mo_file_name) + + @mos.setter + def mos(self,mos_file_name): + """ + Set the path of the mos file + """ + if mos_file_name == None: + self.__mos = None + else: + self.__mos = path.join(self.curdir,mos_file_name) + + @copy_from_dym_trans_log.setter + def copy_from_dym_trans_log(self,copy_from_dym_trans_log): + """ + Set the path of the file which contains a copy of the text automatically + written by Dymola in the translation part of the log window when + Advanced.LogStartValuesForIteration = true. + """ + if copy_from_dym_trans_log == None: + self.__copy_from_dym_trans_log = None + else: + self.__copy_from_dym_trans_log = path.join(self.curdir,copy_from_dym_trans_log) + + @dslog_txt.setter + def dslog_txt(self,dslog_txt): + """ + Set the path of dslog.txt + """ + if dslog_txt == None: + self.__dslog_txt = None + else: + self.__dslog_txt = path.join(self.curdir,dslog_txt) + + @mat.setter + def mat(self,mat_file_name): + """ + Set the path of the mat file + """ + if not(isinstance(mat_file_name,list)): + if mat_file_name == None: + self.__mat = None + else: + self.__mat = [path.join(self.curdir,mat_file_name)] + else: + self.__mat = [] + for mat_file_name_elem in mat_file_name: + self.__mat.append(path.join(self.curdir,mat_file_name_elem)) + + + @iter_var_values_options.setter + def iter_var_values_options(self,iter_var_values_options): + """ + Set the value of attribute iter_var_values_options. + This attribute has to be a dictionary conataining the following keys: + source (compulsory) + Available values for source are "from_mat" and "from_mos" + moment (optional) + Available values for moment are "initial" and "final" + display (optional) + Available values for display are False and True + updated_vars (optional) + Available values for updated_vars are False and True + """ + iter_var_values_options_base = {'source':'from_mat','moment':'initial','display':False,'updated_vars':False,'GUI':'Dymola'} + + for key in list(iter_var_values_options_base.keys()): + try: + iter_var_values_options_base[key] = iter_var_values_options[key] + except: + if self.verbose : + print('Default value used for generated_script_options[\''+str(key)+'\']') + + iter_var_values_options_base['updated_vars'] = iter_var_values_options_base['updated_vars'] and iter_var_values_options_base['display'] #Comparison is only possible if the file is changed on the fly + + try: + for key in list(iter_var_values_options.keys()): + if not key in list(iter_var_values_options_base.keys()): + print('iter_var_values_options[\''+str(key)+'\'] will not be used: only the following keys are allowed:') + for ki in iter_var_values_options_base.keys() : + print(ki) + except: + print('iter_var_values_options should be a dictionary') + + self.__iter_var_values_options = iter_var_values_options_base + + @generated_script_options.setter + def generated_script_options(self,generated_script_options): + #Default value. It garantees that the structure will always be the same + generated_script_options_base={'file_name':'Script_ini.mos','dest_dir':None,'pref':'','filters':{},'renamed':None,'compute':{}} + + for key in list(generated_script_options_base.keys()): + try: + generated_script_options_base[key] = generated_script_options[key] + except: + if self.verbose : + print('Default value used for generated_script_options[\''+str(key)+'\']') + + try: + for key in list(generated_script_options.keys()): + if not key in list(generated_script_options_base.keys()): + print('generated_script_options[\''+str(key)+'\'] will not be used:only values associated to "file_name", "dest_dir", "pref", "filters", "compute" and "renamed" can be taken into account') + + except: + print('generated_script_options should be a dictionary') + + self.__generated_script_options = generated_script_options_base +############################################################################### +# RESETTERS # +############################################################################### + + def reset_list_iter_var(self): + """ + Reset the value of attributes list_iter_var and list_complex_iter_var + """ + self.__list_iter_var = [] + self.__list_complex_iter_var = [] + + def reset_not_found_iter_var(self): + """ + Reset the value of attributes not_found_iter_var and not_found_images_of_complex_iter_var + """ + self.__not_found_iter_var = [] + self.__not_found_complex_iter_var = [] + +############################################################################### +# MAIN METHODS # +############################################################################### + + def set_list_iter_var(self): + """ + Set the value of attribute list_iter_var, which contains the list of the iteration variables. + This list can be created from + - file dslog.txt (created in case of a translation error) - to be used carefully + - a .mos file that contains the iteration variables + - a .txt file that contains the text written by Dymola if option + Advanced.LogStartValuesForIteration is activated (text written in + the translation part of the log window) + + This method will look for the iteration variables in a source defined by + self.source_list_iter_var. + """ + self.reset_list_iter_var() + + if self.source_list_iter_var == 'from_mos': + + #Regular expression to find the interesting lines - be careful: it does not return the whole line + input_line_regex = re.compile(r'^[^\\][A-Za-z0-9_.[\]\s]+=\s*-?[0-9]+') + + with open(self.mos,"r") as lines: + for line in lines: + if input_line_regex.search(line): + words = [word.strip() for word in line.split('=')] + #Add the name of the current iteration variable to the list of iteration variables + self.list_iter_var.append(words[0]) + self.__dict_iter_var_from_ini[words[0]] = words[1] # valeur du .mos brute_init + + #Only option that can handle the iteration variables that are given the value of other variables + elif self.source_list_iter_var == 'from_dymtranslog': + if self.iter_var_values_options['updated_vars'] : + old_list = self.dymtranslog_read(oldlist=True) + if self.iter_var_values_options['display'] : #If required, open the 'ini.txt' file to allow modifications (copy) + #file_display = Popen("Notepad " + self.copy_from_dym_trans_log) + #file_display.wait() + Edit_File(self.copy_from_dym_trans_log) + self.dymtranslog_read() + if self.iter_var_values_options['updated_vars'] : + print('Exiting variables: %s' % ' ,'.join(set(old_list).difference(self.list_iter_var))) + print('Entering variables: %s' % ' ,'.join(set(self.list_iter_var).difference(old_list))) + + elif self.source_list_iter_var == 'from_dslog': + if self.iter_var_values_options['GUI'] == 'OM' : + raise NotImplementedError("Automatic reading of the log is still not implemented for OpenModelica") from None + input_line_regex = re.compile(r'^[^\\][A-Za-z0-9_.[\]\s]+=\s*-?[0-9]+') + read = False + with open(self.dslog_txt,'r') as lines: + for line in lines: + if ('Last value of the solution:' in line) or ('Last values of solution vector' in line): + read = True + elif ('Last value of the solution:' in line) or ('Last values of solution vector' in line): + read = False + if read and input_line_regex.search(line): + words = [word.strip() for word in line.split('=')] + #Add the name of the current iteration variable to the list of iteration variables + self.list_iter_var.append(words[0]) + self.__dict_iter_var_from_ini[words[0]] = words[1] # valeur du dslog.txt brute_init + else: + exit('Option given to method set_list_iter_var is not defined. \n\ + Available options are "from_mos", "from_dymtranslog" and "from_dslog".') + + #Alphabetical order + if self.alphabetical_order: + self.list_iter_var.sort() + + def dymtranslog_read(self,oldlist=False): + """ + Method dedicated to the dymtranslog reading + """ + if oldlist: + out = [] + else: + out = self.list_iter_var + #Regular expression to find the interesting lines + var_and_start_regex = re.compile(r'[A-Za-z]+[A-Za-z0-9_.[,\s\]]*') + with open(self.copy_from_dym_trans_log,'r') as lines: + for line in lines: + if ('=' in line) and not ('der(' == line[0:4] or 'der (' == line[0:5]) : #Should be a mathematical expression; then, no derivative. + unknown_length=False + tmp_words = var_and_start_regex.findall(line) + words = [word.strip() for word in tmp_words] + + #Elimination of start + try: + del words[words.index('start')] + except ValueError : #The line does not contain "start" word : this line does not talk about initialization variable -> skip + try : + del words[words.index('each start')] + except ValueError : + continue + + if self.iter_var_values_options['GUI'] == 'OM' : + del words[words.index('nominal')] #Added for OpenModelica + + #If 'E' appears in the right part of a line, it should not be considered as a variable name + #because it is almost certainly the 'E' of the scientific writing + #Do not call 'E' any of your variable!! + if any(s in words[1:] for s in ['E','e']) : + words.reverse() + try: + words.pop(words.index('E')) + except ValueError : + words.pop(words.index('e')) #Added for OpenModelica + words.reverse() + + #Add the name of the current iteration variable to the list of iteration variables + if self.iter_var_values_options['GUI'] == 'OM' : + out.append(words[0].split(' ',1)[1]) #OpenModelica : Errors lines copied from the compilation log contains the 'unit' as first word (Integer, Real...) + else: #Dymola, default case + out.append(words[0]) + self.__dict_iter_var_from_ini[words[0]] =[word.strip() for word in line.split('=')][1][:-1] # valeur du ini.txt brute_init + + + #Detection of the iteration variables that are given the values of other variables + if not(oldlist) and len(words) >1: + self.list_complex_iter_var.append(words[0]) + self.dict_complex_iter_var[words[1]] = {words[0]:None} + if oldlist: + return out + + def set_dict_iter_var(self): + """ + Set the value of attribute dict_iter_var (and not_found_iter_var). + dict_iter_var contains the values of the iteration variables that should + be given to Dymola to help the simulation to initialize. + These values can be found in a .mat file (containing the result of a simulation) + or in a .mos file (which has been constructed either manually or automatically). + + This method will use the source specified in self.iter_var_values_options['source'] + to find the values of the iteration variables. It will use the final value of each + variable if self.iter_var_values_options['source']== 'final' and the initial + value of each variable if self.iter_var_values_options['source']== 'initial' + + self.iter_var_values_options is a dictionary whose entries must be the following ones: + 'source' (compulsory) + Possible values: + - 'from_mat' + - 'from_mos' + + 'moment' (compulsory if the value associated to key 'source' is 'from_mat') + Possible values: + - 'initial' + - 'final' + + 'display' (optional) + Possible values: + - False + - True + """ + #Get special_options + pref = self.generated_script_options['pref'] + + #Reset the list of the iteration variables that have not been found + self.reset_not_found_iter_var() + + #Create a dictionary to store the intermediate variable values to handle + #the iteration variables that are given values of other variables + dict_intermediate_values = {} + + #Get value of filters + filters = self.generated_script_options['filters'] + + if self.iter_var_values_options['source'] == 'from_mat':#It is recommended to use this option + + #Start or final value to be taken from the mat file + if self.iter_var_values_options['moment'] == 'initial': + mom = 0 + elif self.iter_var_values_options['moment'] == 'final': + mom = -1 + else : + exit("'moment' option can be set only to 'initial' or 'final'") + + # Read file .mat which contains simulation results + var_to_drop = [] #Because of array expansion + for mat_file in self.mat: + structData = Reader(mat_file,'dymola') + + for name_var in self.list_iter_var: + varTmp = None #Initialisation + res_name_var = name_var + + if pref != '': + if name_var[:len(pref)] == pref: + #+1 because of the '.' that is next to pref + res_name_var = name_var[len(pref)+1:] + + # Only if pref is not used (conservative, even if not necessary...) + elif not(self.generated_script_options['renamed'] is None): + for newname in self.generated_script_options['renamed'].keys() : #For each of renamed component + if res_name_var.startswith(newname) : #Check whether the new name in the dictionary matches the current variable + res_name_var = self.generated_script_options['renamed'][newname] + res_name_var[len(newname):] #Substitute the new name with the old one (who has to be looked for in the .mat file) + + for cur_filter in filters: + #If the end of the variable name is in list(filters.keys()) -> ex: h_vol, + #the corresponding part of the variable is replaced by filers[key] -> ex:h. + #The value which will be looked for will therefore be module1.h if the iteration + #variable is module1.h_vol. In the script, the name of the iteration variable + #(module1.h_vol) will also be given the value calculated for module1.h + if cur_filter == name_var[-len(cur_filter):]: + res_name_var = res_name_var[:-len(cur_filter)]+filters[cur_filter] + + try: + varTmp = structData.values(res_name_var)[1][mom] + except KeyError: #The variable is not found + + try: + node_pos = re.search("\[\d+\]",res_name_var).span() #Position of the node number if any (in case the error is due to a remesh of the model) + except AttributeError: #No node number found + + try: + node_pos = re.search("\[\]",res_name_var).span() #Position of empty brakets (all array values are due) + except AttributeError : + pass + else: + if res_name_var != name_var : + print('Filters and prefixes still have to be implemented for arrays of unknown size') + else : + print(f'{name_var}: unknown size -> Taking all values in .mat (if any)') + tmp = f'{res_name_var[:node_pos[0]]}\[\d\{res_name_var[node_pos[0]+1:]}' + expanded_vars = structData.varNames(tmp) + for tmp_var in expanded_vars : + self.__dict_iter_var[tmp_var] = structData.values(tmp_var)[1][mom] + if expanded_vars : + var_to_drop.append(name_var) + continue + + else: #If a node number is found + name_start = res_name_var[:node_pos[0]+1] + name_end = res_name_var[node_pos[1]-1:] + node = int(res_name_var[node_pos[0]+1:node_pos[1]-1]) #Requested node + for ii in range(node): + try: + varTmp = structData.values(name_start + str(node-ii-1) + name_end)[1][mom] #Look for an upstream node + break + except KeyError: + pass + + if varTmp is None : #If varTmp not defined (name_var not found, not due to remesh) + #Try to compute it + for var,f in self.generated_script_options['compute'].items() : + if res_name_var.endswith(var) : + #Collect function inputs + root = res_name_var[:-len(var)] #Root path of the variable + f_inputs = {} + for k,v in f['f_inputs'].items() : + f_inputs[k] = structData.values(root+v)[1][mom] + varTmp = f['user_f'](f_inputs) + self.computed_variables.append(name_var) + + if varTmp is None : #If varTmp not defined (name_var not found, not due to remesh, not computed) + self.not_found_iter_var.append(name_var) + else : + + #Retrieve the initialization value from the .mat + if name_var in self.list_complex_iter_var: + dict_intermediate_values[name_var] = varTmp + else: + self.__dict_iter_var[name_var] = varTmp + + for image_of_complex_var in list(self.dict_complex_iter_var.keys()): + for complex_iter_var in list(self.dict_complex_iter_var[image_of_complex_var].keys()): + try: + self.dict_complex_iter_var[image_of_complex_var][complex_iter_var] = dict_intermediate_values[complex_iter_var] + except:#In case the value of variable complex_iter_var has not been found in the mat file + if complex_iter_var not in self.not_found_complex_iter_var: + self.not_found_complex_iter_var.append(complex_iter_var) + else: + pass + + for i in var_to_drop : + self.list_iter_var.remove(i) + + elif self.iter_var_values_options['source'] == 'from_mos': + #Regular expression to find the interesting lines - be careful: it does not return the whole line + input_line_regex = re.compile(r'^[^\\][A-Za-z0-9_.[\]\s]+=\s*-?[0-9]+') + value_regex = re.compile(r'[0-9e+-.]*') + + with open(self.mos,'r') as lines: + for line in lines: + if input_line_regex.search(line): + words = [word.strip() for word in line.split("=")] + name_var = words[0] + if pref+name_var in self.list_iter_var: + #The following lines eliminates spaces, ;, and other non-numerical elements + value = float(value_regex.findall(words[1])[0]) + #Add the name of the current iteration variable to the list of iteration variables + self.__dict_iter_var[name_var] = value + + #All the iteration variables in self.list_iter_var are added to self.not_found_iter_var if their value has not been found + for name_var in self.list_iter_var: + if not name_var in self.dict_iter_var: + self.not_found_iter_var.append(name_var) + + else: + print('Option given to method set_list_iter_var is not defined. \n\ + Available options are "from_mat" and "from_mos".') + + #Sort the not found iteration variables lists + #Warning: these sorting method puts all the variables whose first letter + #is uppercase before the variables whose first letter is lowercase. + self.not_found_iter_var.sort() + self.not_found_complex_iter_var.sort() + + + def create_script_ini(self): + """ + Create a .mos initialization script. + The list of the iteration variables must have been created before using this method + with set_list_iter_var. The values of the iteration variables must have been found + before using this method with set_dict_iter_var. + This whole process can be done in one single step with method create_quickly_script_ini. + + self.generated_script_options is a dictionary whose entries must be the following ones: + file_name String + Name of the .mos file that will be generated + dest_dir String + path of the directory in which the file should be saved + pref String + pref + '.' will be added at the beginning of all variables. + It is useful if a model has been inserted in a module of a new model + filters Dictionary + Modifications that have to be applied to the names of the iteration variables + Example: {'h_vol':'h'} -> the variable whose name finishes by h_vol will be given the value of the variable whose name finishes by h. Be careful with the use of this filter + """ + #Put the values of self.generated_script_options in short varaibles to make it easier to write the following function + file_name,dest_dir = self.generated_script_options['file_name'],self.generated_script_options['dest_dir'] + if dest_dir == None: + dest_dir=self.curdir + + dest_file_path = path.join(dest_dir,file_name) + + with open(dest_file_path,'w') as output: + output.write('///////////////////////////////////////////////////////////////////\n') + output.write('// Script automatically generated by Python ') + + #Date of the creation of the file + output.write('on '+get_cur_time()+'\n') + + #Dymola version + output.write('// Dymola version: '+self.vdymola+'\n') + + #Source of the list of iteration variables + output.write('// List of the iteration variables from file ') + if self.source_list_iter_var == 'from_mos': + output.write(self.mos+'\n') + elif self.source_list_iter_var == 'from_dymtranslog': + output.write(self.copy_from_dym_trans_log+'\n') + elif self.source_list_iter_var == 'from_dslog': + output.write(self.dslog_txt+'\n') + + #Source of the values of the iteration variables + output.write('// Values of the iteration variables from file ') + if self.iter_var_values_options['source'] == 'from_mos': + output.write(self.mos+'\n') + elif self.iter_var_values_options['source'] == 'from_mat': + output.write(self.mat[0]+'\n') #The name of the first mat file is kept if several are given + output.write('///////////////////////////////////////////////////////////////////\n\n\n') + + #Iteration variables whose values have not been found + if len(self.not_found_iter_var) > 0: + output.write('///////////////////////////////////////////////////////////////////\n') + output.write('// Iteration variables whose values have not been found:\n') + for var_name in self.not_found_iter_var: + if var_name not in self.not_found_complex_iter_var: + output.write('// '+var_name+'\n') + output.write('///////////////////////////////////////////////////////////////////\n\n') + + #Iteration variables whose values have not been found + if len(self.not_found_complex_iter_var) > 0: + output.write('///////////////////////////////////////////////////////////////////\n') + output.write('// Complex iteration variables whose values have not been found:\n') + for var_name in self.not_found_complex_iter_var: + output.write('// '+var_name+'\n') + + #Explain the user which values he should give to which variables + output.write('//It would have no effect to give these complex variables a value in an initialization script.\n') + output.write('//The initialization should be done by giving the following values to the following variables:\n') + for complex_iter_var in self.not_found_complex_iter_var: + for image_of_complex_iter_var in list(self.dict_complex_iter_var.keys()): + if complex_iter_var in list(self.dict_complex_iter_var[image_of_complex_iter_var].keys()): + current_iter_vars = list(self.dict_complex_iter_var[image_of_complex_iter_var].keys()) + output.write('//'+image_of_complex_iter_var+' should be given the mean of the following variables:\n') + for cur_var in current_iter_vars: + output.write('//- '+cur_var+'\n') + output.write('///////////////////////////////////////////////////////////////////\n\n') + + #Values of the iteration variables + output.write('// Iteration variables values:\n') + #for var_name in self.__list_iter_var: + #Appending the additional elements to the end of the list + tmp_not_originally_present = set(self.__list_iter_var) ^ set(self.__dict_iter_var.keys()) + print(f'Variables from expansion: {tmp_not_originally_present}') + tmp_list = self.__list_iter_var + list(tmp_not_originally_present) + for var_name in tmp_list : + if not var_name in self.not_found_iter_var: + if not var_name in self.list_complex_iter_var: + cmp_str = '' if var_name not in self.computed_variables else '\t//Computed by user defined function' + var_value = self.__dict_iter_var[var_name] + output.write(var_name.ljust(40)+'\t\t=\t\t'+str(var_value)+';'+cmp_str+'\n') + + #Variables that are given the value of other variables + if len(list(self.dict_complex_iter_var.keys())) > 0: + if self.verbose : + print("WARNING: Some of the initialising values depends on model's parameters values : automatic changes are unsafe!") + print("WARNING: However, some proposals based on previous results are wrote to the script, but commented.") + print("WARNING: If required to successfully initialize, the user is invited to check the proposed values and, potentially, uncomment them.") + output.write('\n// Complex iteration variables values:\n') + output.write('// WARNING: Some of the following variables may be key parameters of your model.\n') + output.write('// WARNING: To change their values automatically may be dangerous since it may change the model itself.\n') + output.write('// WARNING: That is why the following lines are disabled (commented) by default.\n') + output.write('// WARNING: However, some of them may be initialization parameters (Tstart, ...) and may be safely set.\n') + output.write('// WARNING: In this case, if you still have initialisation issues, you can safely change their values (uncomment the lines).\n\n') + for var_name in list(self.dict_complex_iter_var.keys()): + #Get the list of values associated to var_name + intermediate_list = [self.dict_complex_iter_var[var_name][key] for key in list(self.dict_complex_iter_var[var_name].keys())] + #Eliminate values None + intermediate_list2 = [elem for elem in intermediate_list if elem != None] + #If the list contains at least one value + if intermediate_list2 != []: + #The mean of the values of the list is calculated + var_value = mean([elem for elem in intermediate_list if elem != None]) + #The mean value is written in the initialization script + #Check if any of the values is computed by user function + computed = [i for i in intermediate_list if i in self.computed_variables] + cmp_str = '' if not computed else '\t//Computed by user defined function' + output.write('//'+var_name.ljust(40)+'\t\t=\t\t'+str(var_value)+';'+cmp_str+'\n') + + def create_quickly_script_ini(self): + """ + Method that: + - looks for the list of iteration variables with set_list_iter_var; + - looks for the values of the iteration variables with set_dict_iter_var; + - crearte a initialization script with create_script_ini. + + """ + self.set_list_iter_var() + self.set_dict_iter_var() + self.create_script_ini() + +if __name__ == '__main__': + print('\n AUTODIAGNOSTIC\n ==============\n') diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/automatic_simulation.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/automatic_simulation.py new file mode 100644 index 0000000..88b5b55 --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/automatic_simulation.py @@ -0,0 +1,447 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +############################################################################### +# LIBRARIES IMPORT # +############################################################################### +import sys +import logging +from os import path,remove,getcwd,chdir,pardir,mkdir +from shutil import copyfile,move +from buildingspy.io.outputfile import Reader + +if sys.version_info[0]<3: + import subprocess32 as subprocess +else: + import subprocess + +from .functions import get_cur_time, tol_equality + +#The library to write in dsin.txt is imported directly in method single_simulation +#It can be either modelicares or write_in_dsin + +############################################################################### +# CLASS Automatic_Simulation # +############################################################################### + +class DiscardedInput(Exception): + pass + +############################################################################### +# CLASS Automatic_Simulation # +############################################################################### + +class Automatic_Simulation(object): + """ + Aim: making it easier to run automatic Dymola simulations. + + Notes: + - Commentaries are written directly in the code + - Examples are provided with the code to make it easier to use + - There is a slight difference between the Python2 and Python3 implementation + of method singleSimulation + + Required libraries that are not in the standard distribution: + - modelicares + - subprocess32 (only for Python2) + - subprocess (only for Python3) + + New features introduced 02/2019: + - possibility of performing simulations with dsint.txt and dymosim.exe files + not nessarily in the same folder + - possibility of creating a results' folder in another folder + - possibility of using an equivalent file to dsin.txt but with a different name + - possibility of modifying the results file name + - the previous use of this module should remain unchanged compared to the previous version + - Not yet checked for Linux but should work, Python within a Linux environment is not yet implemented. + + Update 06/2019: + - correction of set_success_code function in order to consider the directory in which the results mat file is created (and not the default one) + + Update 03/2021: + - better handling of linux simulations (definition dymosim_path) + + Update 09/2022: + - bug correction in copy_dsres_to and check_inputs + """ + + def __init__(self,simu_dir,dict_inputs, + dymosim_path = None, dsin_name = 'dsin.txt', dsres_path = None, + logfile=None,timeout=60,copy_dsres_to=None, + copy_initial_dsin=None,without_modelicares=False,linux=False,check_inputs=[]): + self.__simu_dir = path.abspath(simu_dir) + self.__dsin_name = path.join(path.abspath(simu_dir),dsin_name) + if dsres_path is None: + self.__dsres_path = path.join(path.abspath(simu_dir),'dsres.mat') + elif path.basename(dsres_path)[-4:] =='.mat': + self.__dsres_path = dsres_path + else: + self.__dsres_path = path.join(path.abspath(dsres_path),'dsres.mat') + self.__logfile = logfile + self.__timeout = timeout + self.__copy_dsres_to = copy_dsres_to + self.__copy_initial_dsin = copy_initial_dsin + self.__without_modelicares = without_modelicares + self.__linux = linux + + if dymosim_path is None: + if self.linux: + self.__dymosim_path = path.join(path.abspath(simu_dir),'dymosim') + else: + self.__dymosim_path = path.join(path.abspath(simu_dir),'dymosim.exe') + elif path.isdir(dymosim_path): + if self.linux: + self.__dymosim_path = path.join(path.abspath(dymosim_path),'dymosim') + else: + self.__dymosim_path = path.join(path.abspath(dymosim_path),'dymosim.exe') + elif path.isfile(dymosim_path): + self.__dymosim_path = dymosim_path + else: + raise ValueError('Model file or directory not found using %s'%str(dymosim_path)) + + self.__success_code = 99 + + #Potential separation of standard inputs parameters from time varying inputs (dsu.txt) + if 'Time' in dict_inputs.keys() : #Reserved keyword for time variation + logging.info("Reserved variable Time found -> dsu.txt creation") + dsu_keys = [] + for key in dict_inputs : + #if hasattr(dict_inputs[key],'__iter__') and not isinstance(dict_inputs[key],str) : #Identifying iterables not string + if isinstance(dict_inputs[key],(list,tuple)) : #Identifying list or tuples + dsu_keys.append(key) + self.dsu_inputs = {key:dict_inputs[key] for key in dsu_keys} #Dictionary restricted to time varying inputs + for key in dsu_keys : del dict_inputs[key] #Removing the variable from the standard dictionary + else : + self.dsu_inputs = None + self.__dict_inputs = dict_inputs + + if check_inputs is True: + self.__check_inputs = dict_inputs.keys() + else: + self.__check_inputs = check_inputs + +############################################################################### +# GETTERS # +############################################################################### + @property + def simu_dir(self): + """ + Path of the directory in which the simulation will be done (directory which + contains file dsin.txt or equivalent) + """ + return self.__simu_dir + + @property + def check_inputs(self): + """ + Variable used to select inputs that have to be checked: set value vs. used + Possible values: + - True : all variables in dict_inputs are checked + - List of variables name to be checked + """ + return self.__check_inputs + + @property + def dict_inputs(self): + """ + Dictionary associating to each variable whose value has to be given to + dsin.txt (inputs + iteration variables if necessary) its value. + """ + return self.__dict_inputs + + @property + def logfile(self): + """ + log file + """ + return self.__logfile + + @property + def timeout(self): + """ + If a simulation has not converged after timeout, the siumlation is stopped. + """ + return self.__timeout + + @property + def copy_dsres_to(self): + """ + Path of the file where dsres.mat should be copied if simulation converges + """ + return self.__copy_dsres_to + + @property + def success_code(self): + """ + Type: Integer + If a simulation converges, this attribute is setted to 0 + If a simulation fails to initialize, this attribute is setted to 1 + If a simulation fails after the initialization, this attribute is setted to 2 + If a simulation does not fail but does not converge either, this attribute is setted to 3 + Default value: 99 + """ + return self.__success_code + + @property + def copy_initial_dsin_to(self): + """ + Path of the file where dsin.txt should be copied before the first simulation is run. + """ + return self.__copy_initial_dsin + + @property + def without_modelicares(self): + """ + If True, modelicares is not imported. + Instead, the script named "writer" is imported. A method of class Writer + is used to write in dsin.txt. + """ + return self.__without_modelicares + + @property + def linux(self): + """ + If the script has to be run on Linux, this attribute should be setted to True. + On Linux, the exe file is indeed named "dymosim" instead of "dymosim.exe". + """ + return self.__linux + + + def dymosim_path(self): + """ + Path of the directory that contains the dymosim.exe (or equivalent) file, + can be different of simu_dir + """ + return self.__dymosim_path + + def dsin_name(self): + """ + Name of the dsint.txt (or equivalent) file, must be inclueded in simu_dir + """ + return self.__dsin_name + + def dsres_path(self): + """ + Path of the file in which the results mat file will be created + """ + return self.__dsres_path +############################################################################### +# SETTERS # +############################################################################### + def set_success_code(self): + """ + This function sets the values of the following attributes: + "success": + True if simulation has converged (a file named success exists (success. on Linux)) + False if simulation has failed (a file named failure exists (failure. on Linux)) + None if simulation has not converged (For instance if the simulation was not given the time to converge) + "initsuccess": + True if the simulation succeeded to initialise (a dsres.mat file is found) + False if the initialization failed (no dsres.mat file is found) + """ + if path.exists(self.__dsres_path): + if self.linux: + if path.exists(path.join(path.dirname(self.__dsres_path),'success.')): + self.__success_code = 0 + elif path.exists(path.join(path.dirname(self.__dsres_path),'failure.')): + self.__success_code = 2 + else: + self.__success_code = 3 + else: + if path.exists(path.join(path.dirname(self.__dsres_path),'success')): + self.__success_code = 0 + elif path.exists(path.join(path.dirname(self.__dsres_path),'failure')): + self.__success_code = 2 + else: + self.__success_code = 3 + else: + self.__success_code = 1 #Suggestion: We should probably also consider timeout happens durng the initialization phase (neither 'dsres.mat' nor 'failure') + +############################################################################### +# RESETTERS # +############################################################################### + def reset_success_code(self): + """ + Reset value of attribute success + """ + self.__success_code = 99 +############################################################################### +# MAIN METHODS # +############################################################################### + + def cleaning(self): + """ + Remove the files that are automatically created during a simulation + """ + if path.exists(path.join(self.simu_dir,"success")): + remove(path.join(self.simu_dir,"success")) + if path.exists(path.join(self.simu_dir,"failure")): + remove(path.join(self.simu_dir,"failure")) + if path.exists(path.join(self.simu_dir,"null")): + remove(path.join(self.simu_dir,"null")) + if path.exists(path.join(self.simu_dir,"dsfinal.txt")): + remove(path.join(self.simu_dir,"dsfinal.txt")) + if path.exists(path.join(self.simu_dir,"status")): + remove(path.join(self.simu_dir,"status")) + if path.exists(path.join(self.simu_dir,"dslog.txt")): + remove(path.join(self.simu_dir,"dslog.txt")) + if path.exists(self.__dsres_path): + remove(self.__dsres_path) + + def single_simulation(self): + """ + Run an simulation using dymosim.exe after inserting the values that are + contained in self.dict_inputs in dsin.txt + """ + #Import a library to write in dsin.txt + if self.without_modelicares == True: + from .write_in_dsin import Write_in_dsin + else: + import modelicares + + #Save the value of the current working directory in CUR_DIR + CUR_DIR = getcwd() + + #It is necessary to change the working directory before running dymosim.exe + chdir(self.simu_dir) + if self.logfile: + logging.info('Time = '+get_cur_time()+' \t'+'Script working in directory '+self.simu_dir) + + #Cleaning the working directory (mandatory to check the intialisation success) + self.cleaning() + + #File dsin.txt is copied before being changed if it has been given a value + if self.copy_initial_dsin_to: + dir_copy_dsin = path.abspath(path.join(self.copy_initial_dsin_to,pardir)) + if not(path.exists(dir_copy_dsin)): + mkdir(dir_copy_dsin) + copyfile(src=path.basename(self.__dsin_name),dst=self.copy_initial_dsin_to) + + # Change file dsin.txt + if self.logfile: + #print('Changing dsin.txt') + logging.info('Time = '+get_cur_time()+' \t'+'Changing dsin.txt') + if self.without_modelicares: + writer = Write_in_dsin(dict_inputs = self.dict_inputs,filedir = self.simu_dir, dsin_name = path.basename(self.__dsin_name), new_file_name = path.basename(self.__dsin_name)) + writer.write_in_dsin() + else: + modelicares.exps.write_params(self.dict_inputs,path.basename(self.__dsin_name)) + + #A folder for results file is created if it does not exist (usually for dsres.mat) + if self.dsres_path: + dir_dsres_path = path.abspath(path.join(self.__dsres_path,pardir)) + if not(path.exists(dir_dsres_path)): + mkdir(dir_dsres_path) + + # Simulation launching : execution of dymosim.exe + if self.logfile: + #print('Launching simulation') + logging.info('Time = '+get_cur_time()+' \t'+"Launching simulation") +############################################################################### +# WARNING: DIFFERENT SOLUTIONS FOR PYTHON 2 AND PYTHON 3 # +############################################################################### +#Python2 solution - working solution + if sys.version_info[0]<3: + try: + if self.linux: + #print('Running dymosim') + subprocess.call(str(self.__dymosim_path + ' ' + self.__dsin_name + ' ' + self.__dsres_path),timeout=self.timeout) #NOT TESTED + else: + #print('Running dymosim.exe') + subprocess.call(str(self.__dymosim_path + ' ' + self.__dsin_name + ' ' + self.__dsres_path),timeout=self.timeout) #NOT TESTED + + except subprocess.TimeoutExpired: + if self.logfile: + logging.info('Time = '+get_cur_time()+' \t'+'Time limit reached. Simulation is stopped.') + #Add information about the simulation time when the simulation is stopped + try: + with open('status','r') as lines: + for line in lines: + words = line.split() + if len(words) > 0: + stop_time = line.split()[-1] + logging.info('Simulation time when simulation is stopped: '+str(stop_time)) + except: + pass + +#Python3 solution - best solution. Cannot be easily implemented on Python2 because +#the TimeoutExpired exception creates an error (apparently because of subprocess32) + else: + try: + #print('Running dymosim.exe') + proc = subprocess.Popen([str(arg) for arg in [self.__dymosim_path,self.__dsin_name,self.__dsres_path]], + stderr=subprocess.STDOUT,stdout=subprocess.PIPE,universal_newlines=True) + dymout, _ = proc.communicate(timeout=self.timeout) + if self.logfile: + logging.debug(dymout) + + except subprocess.TimeoutExpired: + proc.kill() + dymout, _ = proc.communicate() #Finalize communication (python guidelines) + if self.logfile: + logging.debug(dymout) + logging.info('Time = '+get_cur_time()+' \t'+'Time limit reached. Simulation is stopped.') + try: + with open('status','r') as lines: + for line in lines: + words = line.split() + if len(words) > 0: + stop_time = line.split()[-1] + logging.info('Simulation time when simulation is stopped: '+str(stop_time)) + except: + pass +############################################################################### +# END OF WARNING # +############################################################################### + + #Set the value of attribute success + self.set_success_code() + + #Copy dsres.mat if attribute copy_dsres_to is not None + if self.success_code == 0: + logging.info('Time = '+get_cur_time()+' \t'+'Simulation has converged') + if self.copy_dsres_to: + move(self.__dsres_path,self.copy_dsres_to) #Test + + elif self.success_code == 1: + logging.info('Time = '+get_cur_time()+' \t'+'Simulation has failed - Error during initialization') + + elif self.success_code == 2: + logging.info('Time = '+get_cur_time()+' \t'+'Simulation has failed - Error after the initialization') + + elif self.success_code == 3: + #The message is written in the TimeoutExpired Exception + pass + # Check whether the inputs have been taken into account + if self.success_code != 1 : + for key in self.check_inputs : + if self.copy_dsres_to: + outfile = Reader(self.copy_dsres_to,'dymola') + else: + outfile = Reader(self.__dsres_path,'dymola') + used_val = outfile.values(key)[1][0] + if self.logfile: + logging.debug('Checking %s : %s (set) vs. %s (used)' % (key,self.dict_inputs[key],used_val)) + if not tol_equality((self.dict_inputs[key],outfile.values(key)[1][0])): + raise DiscardedInput('Parameter %s: %s set but %s used' % (key,self.dict_inputs[key],used_val)) + + #Go back to CUR_DIR + chdir(CUR_DIR) + +if __name__ == '__main__': + print('\n AUTODIAGNOSTIC\n ==============\n') diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/change_working_point.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/change_working_point.py new file mode 100644 index 0000000..e46775b --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/change_working_point.py @@ -0,0 +1,769 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +############################################################################### +# LIBRARIES IMPORT # +############################################################################### +import logging +from copy import copy +from os import path,mkdir,remove +from shutil import copytree,rmtree + +from .automatic_simulation import Automatic_Simulation +from .functions import get_cur_time +############################################################################### +# CLASS Working_Point_Modification # +############################################################################### + +class Working_Point_Modification(object): + """ + Class Working_Point_Modification aims at making it easier to make a Modelica model + converge when one (or several) of its variables has (have) to be changed. Depending on the impact of + the variable, changing the value by a few percent can indeed be very hard. + + Generally, when the initial state is too far from the reference one for Dymola + to be able to converge towards the solution with its reference initialisation + script, it is possible to give ramps to Dymola (or other software) to help + it converging. Yet, if the model contains inverted values that should be + inverted only in a stationary state, giving ramps to help the model + converging does not work (the parameters that have been calculated at t=0 + would be wrong). + + Class Working_Point_Modification aims at helping solving this difficulty. + It can also be used for a model which does not contain any inverted values, in + case the modeler does not want to change his model so that the parameters could + be given as ramps. + + It runs several iterations, converging iteratively from the reference initial + state to the desired state. + + Dymola must be able to simulate the Modelica model in the reference state. + + Required material: + main_dir String + Path of the directory in which directories will be created + simu_material_dir String + Name of the subdirectory of main_dir which contains all the required material to run a simulation (dymosim.exe, dsin.txt, .dll files, files that are necesssary to create an initialization script) + around_simulation0 Object of type Around_Simulation (class defined in around_simulation) + Contains the material to create an initialization script + dict_var_to_fix Dictionary + Contains the names of the variables that must be imposed to + different values between reference state and the target state, + and the values they must be given at each state between the + reference and target states (information contained by a function) + The other arguments are optional, but more information can be found in the attributes getters + + New features introduced 02/2019: + - the possibility of using a dymosim situated in a given folder (featured introduced in automatic_simulation.py) + is introduced + - the previous use of this module should remain unchanged compared to previous version + - possibility of running this script in Linux (not tested) + """ + def __init__(self,main_dir,simu_material_dir, + around_simulation0,dict_var_to_fix, + dymosim_path=None, + simu_dir='SIMUS_DIR',store_res_dir='Working_point_change', + linux=False, + timeout=60,cur_step=1.,nit_max=20,var_to_follow_path='var_to_follow.csv', + gen_scripts_ini=False,pause=5.,min_step_val = 0.01,check_inputs=[]): + + self.__main_dir = path.abspath(main_dir) + self.__simu_material_dir = simu_material_dir + + self.__store_res_dir = store_res_dir + around_simulation0.verbose = False #To avoid repetitive messages + self.__around_simulation0 = around_simulation0 + self.__dict_var_to_fix = dict_var_to_fix + self.__simu_dir = simu_dir + self.__linux = linux + self.__timeout = timeout + self.__cur_step = cur_step + self.__nit_max = nit_max + self.__var_to_follow_path = var_to_follow_path + self.__gen_scripts_ini = gen_scripts_ini + self.__pause = pause + self.__min_step_val = min_step_val + + self.__around_simulation = None + self.__dict_inputs = {} + self.__val = 0. + self.__ref_success_code = 99 + self.__success_code = 99 + self.__nit = 0 + self.__converging_val = None + self.__list_inputs = [] + self.__res = None + + if dymosim_path is None: + self.__dymosim_path = None + elif path.isdir(dymosim_path): + if self.linux: + self.__dymosim_path = path.join(path.abspath(dymosim_path),'dymosim') + else: + self.__dymosim_path = path.join(path.abspath(dymosim_path),'dymosim.exe') + elif path.isfile(dymosim_path): + self.__dymosim_path = dymosim_path + else: + raise ValueError('Model file or directory not found using %s'%str(dymosim_path)) + + if check_inputs is True: + self.__check_inputs = dict_var_to_fix.keys() + else: + self.__check_inputs = check_inputs + +############################################################################### +# GETTERS # +############################################################################### + @property + def main_dir(self): + """ + Type: String + Path of the main directory - relative path from the path where the script + is executed or absolute path. + """ + return self.__main_dir + + @property + def check_inputs(self): + """ + Variable used to select inputs that have to be checked: set value vs. used + Possible values: + - True : all variables in dict_inputs are checked + - List of variables name to be checked + """ + return self.__check_inputs + + @property + def simu_material_dir(self): + """ + Type: String + Path of the main directory which contains all the files that are required to + simulate the model. These files will be copied and this directory will + not be changed - relative path from main_dir + """ + return self.__simu_material_dir + + @property + def store_res_dir(self): + """ + Type: String + Name of the directory where the simulation results will be stored + """ + return self.__store_res_dir + + @property + def around_simulation0(self): + """ + Type: Around_Simulation + around_simulation0 must contain enough information to create a script that + can make the simulation converge in the reference state + More information about object of type Around_Simulation in file + around_simulation.py. + """ + return self.__around_simulation0 + + @property + def dict_var_to_fix(self): + """ + Type: Dictionary + Dictionary which associates + key (Modelica) name of the variable whose value must be fixed + value Function which returns the value of variable key (0-> ref_val, + 1 -> target_val) + dict_var_to_fix can be generated using class Dict_Var_To_Fix. + """ + return self.__dict_var_to_fix + + @property + def simu_dir(self): + """ + Type: String + Path of the directory in which all the simulations will be run + """ + return self.__simu_dir + + def dymosim_path(self): + """ + Path of the directory that contains the dymosim.exe (or equivalent) file, + can be different of simu_dir + """ + return self.__dymosim_path + + @property + def linux(self): + """ + If the script has to be run on Linux, this attribute should be setted to True. + On Linux, the exe file is indeed named "dymosim" instead of "dymosim.exe". + """ + return self.__linux + + @property + def timeout(self): + """ + Type: Integer + If a simulation has not converged after timeout, the siumlation is stopped. + """ + return self.__timeout + + @property + def cur_step(self): + """ + Type: Float + Current step (0: reference value;1: target value) + -> with cur_step=0.5,the simulation will try to converge in 2 steps. + """ + return self.__cur_step + + @property + def nit_max(self): + """ + Type: Integer + Maximum number of iterations + """ + return self.__nit_max + + @property + def var_to_follow_path(self): + """ + Type: String + Path of the file containing the values of the interesting + variables which are saved each time a simulation converges + """ + return self.__var_to_follow_path + + @property + def gen_scripts_ini(self): + """ + Type: Boolean + If True, each time a simulation has converged, the associated + initialization script is generated. + """ + return self.__gen_scripts_ini + + @property + def pause(self): + """ + Type: Float + Number of seconds that are given to the system to copy the files + I could try to replace it by calling a subprocess and waiting for the end + of this subprocess + """ + return self.__pause + + @property + def around_simulation(self): + """ + Type: Around_Simulation + around_simulation contains enough information to generated initialization + scripts. + + More information about object of type Around_Simulation in file + around_simulation.py. + """ + return self.__around_simulation + + @property + def dict_inputs(self): + """ + Type: Dictionary + Dictionary associating to each variable whose value has to be given to + dsin.txt (variables whose values must be fixed + iteration variables) its value. + """ + return self.__dict_inputs + + @property + def val(self): + """ + Type: float + Caracterise the distance from the reference state and to the target state. + 0. in the reference state + 1. in the final state when the working point change has reached its target + """ + return self.__val + + @property + def success_code(self): + """ + Type: Integer + If a simulation converges, this attribute is setted to 0 + If a simulation fails to initialize, this attribute is setted to 1 + If a simulation fails after the initialization, this attribute is setted to 2 + If a simulation does not fail but does not converge either, this attribute is setted to 3 + Default value: 99 + """ + return self.__success_code + + @property + def ref_success_code(self): + """ + Type: Integer + Before trying to reach the target state, the user should try to simulate + the reference state. + If this "reference simulation" converges, this attribute is setted to 0. + If this "reference simulation" fails to initialize, this attribute is setted to 1 + If this "reference simulation" fails after the initialization, this attribute is setted to 2 + If this "reference simulation" does not fail but does not converge either, this attribute is setted to 3 + Default value: 99 + + If the value of this attribute is 1, no further calculation will be made. + """ + return self.__ref_success_code + + @property + def nit(self): + """ + Type: Integer + Number of simulation that have been run + """ + return self.__nit + + @property + def converging_val(self): + """ + Type: Float + Highest value for which the simulation has converged + 0 -> reference state + 1 -> target state + """ + return self.__converging_val + + @property + def min_step_val(self): + """ + Type: Float + Mininum allowed value for the step. If the script tries to use a value + lower than min_step_val, it will stop. + """ + return self.__min_step_val + + @property + def list_inputs(self): + """ + Type: List + List of the input variable (Modelica) names + """ + return self.__list_inputs + + @property + def res(self): + """ + Type: Dictionary + It is a dictionary containing the values of self.list_inputs at the end + of a simulation (the values of the iteration variables in this attribute + will therefore be injected in the next simulation) + """ + return self.__res +############################################################################### +# SETTERS # +############################################################################### + + def set_success_code(self,value): + """ + Inputs + value Integer + + See the description of attribute success_code for more information + """ + self.__success_code = value + + def set_ref_success_code(self,value): + """ + Inputs + value Integer + See the description of attribute success_code for more information + """ + self.__ref_success_code = value + + def set_converging_val(self,value): + """ + Inputs + value Float + Highest value for which the simulation has converged up to the moment + this method is called + """ + self.__converging_val = value + +############################################################################### +# RESETTERS # +############################################################################### + def reset_val(self): + """ + Reset value of attribute val to its reference value: 0. + """ + self.__val = 0. + +############################################################################### +# MAIN METHODS # +############################################################################### + + def increase_nit(self): + """ + Increase value of attribute nit. + This method must be called when a simulation is run. + """ + self.__nit += 1 + + def get_path_store_res_dir(self): + """ + Return the path in which the results are saved. + """ + return path.abspath(path.join(self.main_dir,self.store_res_dir)) + + def create_working_directory(self): + """ + A directory in which all the simulations will be made is created. + The files that are in self.simu_material_dir are copie into this new directory. + If self.store_res_dir does not exist in main_dir, a directory named self.store_res_dir is created + """ + if path.exists(path.join(self.main_dir,self.simu_dir)): + rmtree(path.join(self.main_dir,self.simu_dir), ignore_errors=True) #added 19_07_2018 + copytree(path.abspath(path.join(self.main_dir,self.simu_material_dir)),path.join(self.main_dir,self.simu_dir)) + + if path.exists(path.join(self.main_dir,self.store_res_dir)): + rmtree(path.join(self.main_dir,self.store_res_dir), ignore_errors=True) #added 19_07_2018 + mkdir(path.join(self.main_dir,self.store_res_dir)) + + def reset_res(self): + """ + Reset value of attribute res to {} (=a dictionary without any keys) + """ + self.__res = {} + + def set_res(self): + """ + Set the value of attribute res. + This method has te be called when a simulation has converged and before + the result is written into a file by method write_res + """ + self.reset_res() + + for var in self.list_inputs: + if var in self.dict_var_to_fix: + self.__res[var] = self.dict_inputs[var] + else: + #After a simulation, self.dict_inputs is not modified (it would + #be a nonsense to change self.dict_inputs if no simulation has + #to be done), but self.around_simulation.dict_iter_var is modified, + #that is the reason why it is necessary to extract the values + #in self.around_simulation.dict_iter_var. + self.__res[var] = self.around_simulation.dict_iter_var[var] + + + def set_dict_inputs(self,ref=False): + """ + Set the value of attribute dict_inputs + The values of the iterations variables comes from the last simulation + which has converged. The values of the parameters that have to be + fixed are calculated with the functions of self.dict_var_to_fix. + + If ref=True, val is given the value 0 and the iteration variables are + given the values they have in self.simulation0.dict_iter_var. + """ + if ref: + self.reset_val() + self.__dict_inputs = copy(self.around_simulation0.dict_iter_var) + + else: + #Get the iteration variables and their values + self.__dict_inputs = copy(self.around_simulation.dict_iter_var) + + #Get the values of the variables that have to be fixed + for var_to_fix in self.dict_var_to_fix: + self.dict_inputs[var_to_fix] = self.dict_var_to_fix[var_to_fix](self.val) + + #Set the value of attribute list_inputs if it has no value + if self.list_inputs == []: + self.__list_inputs = list(self.dict_inputs.keys()) + self.list_inputs.sort() + + def write_res(self,ref=False): + """ + Write in self.var_to_follow_path the values of the inputs at the end + of the simulation + """ + with open(self.var_to_follow_path,'a') as output: + if ref: + output.write('val'.ljust(50)+';') + for var in self.list_inputs[:-1]: + output.write(var.ljust(50)+';') + output.write(self.list_inputs[-1]+'\n') + + output.write(str(self.val).ljust(50)+';') + for var in self.list_inputs[:-1]: + output.write(str(self.res[var]).ljust(50)+';') + output.write(str(self.res[var])+'\n') + + def clean_var_to_follow(self): + """ + If the file self.var_to_follow_path exists, it is erased when this + method is called + """ + if path.exists(self.var_to_follow_path): + remove(self.var_to_follow_path) + + def fauto_simu(self,ref=False,final_cleaning=False): + """ + Erase the old simulation files from the simulation directory and run a + simulation with the current values of the iteration variables + """ + #Create an object of type Automatic_Simulation, which will make it easy + #to run the simulation + #Check inputs only for the target case + logging.debug('Val %s' % self.val) + if (not ref) and (self.val == 1.) : + tmp_check = self.check_inputs + logging.debug('Performing input check on the target case (run %s)' % self.nit) + else : + tmp_check = [] + + logging.debug('Input of the current case:') + for key in self.dict_inputs.keys(): + logging.debug('%s = %s' % (key,self.dict_inputs[key])) + + auto_simu = Automatic_Simulation(simu_dir=path.join(self.main_dir,self.simu_dir), + dict_inputs=self.dict_inputs, + dymosim_path = self.__dymosim_path, + logfile=True, + timeout=self.timeout, + copy_dsres_to=path.join(self.get_path_store_res_dir(),str(self.val)+'.mat'), + check_inputs=tmp_check, + linux = self.linux, + without_modelicares = True) # A garder à False car sinon write_in_dsin efface dic_inputs... update 12/2022: writ_in_dsin modifié pour ne pas vider dict_inputs + + #Write in log file + logging.info('Time = '+get_cur_time()+' \t'+'Current value: '+str(self.val)) + + self.set_success_code(99) + self.increase_nit() + + #Run the simulation + auto_simu.single_simulation() + + + #If the simulation has converged + if auto_simu.success_code == 0: + + #If an initialization script has to be created + if self.gen_scripts_ini: + #If the simulation state is the reference state + if ref: + self.around_simulation0.generated_script_options={'file_name':path.join(self.get_path_store_res_dir(),str(self.val)+'.mos')} + self.around_simulation0.create_script_ini() + else: + # -----> Option should be added to the dictionary and not reset !!!!! + self.around_simulation.generated_script_options={'file_name':path.join(self.get_path_store_res_dir(),str(self.val)+'.mos')} + self.around_simulation.create_script_ini() + + if ref: + logging.info('Time = '+get_cur_time()+' \t'+'Reference simulation has converged') + self.set_ref_success_code(0) + self.__around_simulation = copy(self.around_simulation0) + else: + logging.info('Time = '+get_cur_time()+' \t'+'Simulation has converged') + + #If this simulation is the first to converge, the next values of the iteration variables will be built from the mat result file. + if self.converging_val == None: + self.around_simulation.iter_var_values_options['source'] = 'from_mat' + + + self.set_success_code(0) + self.set_converging_val(self.val) + self.around_simulation.mat = path.join(self.get_path_store_res_dir(),str(self.val)+".mat") + self.around_simulation.set_dict_iter_var() + self.set_res() + self.write_res(ref=ref) + + else: + self.set_success_code(auto_simu.success_code) + + if ref: + self.set_ref_success_code(auto_simu.success_code) + logging.info('Time = '+get_cur_time()+' \t'+'Reference simulation has not converged. Success_code = '+str(self.ref_success_code)) + else: + logging.info('Time = '+get_cur_time()+' \t'+'Simulation has not converged. Success_code = '+str(self.success_code)) + if final_cleaning: + auto_simu.cleaning() + + def check_ref_simulation(self): + """ + Run the simulation in the reference state + """ + #Erase the file in which the values of the inputs at the end of the + #simulations are saved + self.clean_var_to_follow() + + #reset the values of the inputs + self.set_dict_inputs(ref=True) + + #Run the simulation + self.fauto_simu(ref=True) + + def change_val(self): + """ + Adapt the step if it is necessary and change the value of attribute val. + The step is divided by 2 when a simulation does not converge. + """ + if self.success_code == 1: + self.__cur_step = float(self.cur_step)/2. + elif self.success_code in [2,3]: + logging.info('Time = '+get_cur_time()+' \t'+'Method change_val has been called whereas the success_code is in [2,3] which \ + is not normal. The methods designed in this script can only help to initialize the simulations. If the simulation fails \ + whereas the initialization has been a success, it is not useful to decrease the step') + self.__cur_step = 0 + else: + if self.val+self.cur_step > 1: + self.__cur_step = 1.-self.val + if self.converging_val == None: + self.__val = self.cur_step + else: + self.__val = self.converging_val + self.cur_step + + def working_point_modification(self,final_cleaning=False,skip_reference_simulation=False): + """ + Main method of this class. + It tries to reach the target state from the reference state. + The step is adapted with change_val depending on the success of the simulations + Before a simulation is run, the variables are initialized to + Their calculated value if it is a variable whose value should be imposed + The value calculated by Dymola in the last simulation that has converged if it is an iteration variable + """ + if skip_reference_simulation: + self.skip_reference_simulation() + + if self.ref_success_code == 0 or skip_reference_simulation: + while self.nit < self.nit_max and (self.converging_val == None or self.converging_val < 1) and self.success_code not in [2,3] : + self.change_val() + if self.crit_check() : + break + if self.cur_step < self.min_step_val : #Check after change_val call + break + self.set_dict_inputs(ref=False) + logging.debug(".nit : %s\n.converging_val : %s\n.around_simulation.mat : %s\n.around_simulation._source_ : %s" %(self.nit,self.converging_val,self.around_simulation.mat,self.around_simulation.iter_var_values_options['source'])) + self.fauto_simu(ref=False,final_cleaning=final_cleaning) + + if not(self.nit < self.nit_max): + logging.info('Time = '+get_cur_time()+' \t'+'STOP: maximum number of iterations reached.') + if self.converging_val != None and not(self.converging_val < 1): + logging.info('Time = '+get_cur_time()+' \t'+'FINISHED: Target values reached.') + if self.success_code in [2,3]: + logging.info('Time = '+get_cur_time()+' \t'+'STOP: Simulation initialization has succeeded, but simulation has failed or was not given enough time to converge') + + def crit_check(self): + """ + Check whether the step-size and the max-number-of-iterations criteria has been met + """ + check = False + if self.success_code == 1 : #Check criteria only if the previous iteration did not initialize + if (self.cur_step < self.min_step_val) : #Minimum step size criterium + logging.info('Time = '+get_cur_time()+' \t'+'STOP: The step ('+str(self.cur_step)+') has crossed the minimum allowed value: '+str(self.min_step_val)) + check = True + elif ( (self.val + self.cur_step*(self.nit_max - self.nit - 1)) < 1. ) : #Max number of step criterium (with "forecast") + logging.info('Time = '+get_cur_time()+' \t'+'STOP: Impossible to success before hitting the number of iterations limit: '+str(self.nit_max)) + logging.info('Time = '+get_cur_time()+' \t'+'STOP: Completed iterations: '+str(self.nit)+'; next value: '+str(self.val)+'; current step: '+str(self.cur_step)) + check = True + return check + + def skip_reference_simulation(self): + logging.info('Time = '+get_cur_time()+' \t'+'Reference simulation is not launched. It is safer to choose skip_reference_simulation=False') + self.__around_simulation = copy(self.around_simulation0) + +############################################################################### +# CLASS Dict_Var_To_Fix # +############################################################################### + +class Dict_Var_To_Fix(object): + """ + This class makes it easier to create the attribute dict_var_to_fix of the objects + of class Working_Point_Modification in the case there are only a ref value and + a target value. + """ + def __init__(self,option,dict_var_to_fix=None,dict_auto_var_to_fix=None): + if dict_var_to_fix == None: + dict_var_to_fix = {} + + self.__option = option + self.__dict_var_to_fix = dict_var_to_fix + self.__dict_auto_var_to_fix = dict_auto_var_to_fix + + @property + def option(self): + """ + Type: String + Available values for option are + 'manual' -> dict_var_to_fix is directly used + 'automatic' -> dict_var_to_fix is built from dict_auto_var_to_fix + """ + return self.__option + + @property + def dict_var_to_fix(self): + """ + Type: Dictionary + Dict_var_to_fix is a dictionary which associates + key name of the variable whose value must be fixed + value Function which returns the value of variable key when val is x + 0 -> reference value of key + 1 -> target value of key + """ + return self.__dict_var_to_fix + + @property + def dict_auto_var_to_fix(self): + """ + Type: Dictionary + This dictionary is used if option == 'automatic' + It associates + key name of the variable whose value must be fixed + value (ref_val,target_val) : A 2-tuple composed of the value of + the variable key in the reference state and the target value + of the variable key + """ + return self.__dict_auto_var_to_fix + + def set_dict_var_to_fix(self): + """ + Set the value of attribute dict_var_to_fix when option is 'automatic'. + If option is 'manual', attribute dict_var_to_fix already has a value. + + Create a function for each variable, which returns + function(0) -> ref_val + function(1) -> target_val + """ + if self.option == 'automatic': + for var in self.dict_auto_var_to_fix: + ref_val = self.dict_auto_var_to_fix[var][0] + target_val = self.dict_auto_var_to_fix[var][1] + self.__dict_var_to_fix[var] = self.f_creator(ref_val,target_val) + + def f_creator(self,ref_val,target_val): + """ + Return the linear function returning + ref_val if it is given 0 + target_val if ti is given 1 + This function is used in method set_dict_var_to_fix of class Dict_Var_To_Fix + when option == 'automatic' + """ + def f(x): + if x<0 or x>1: + exit("x has to be between 0 and 1") + else: + return ref_val+x*(target_val-ref_val) + return f + +if __name__ == '__main__': + print('\n AUTODIAGNOSTIC\n ==============\n') diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/functions.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/functions.py new file mode 100644 index 0000000..c02a3c0 --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/functions.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +############################################################################### +# LIBRARIES IMPORT # +############################################################################### +from time import localtime +import os +from subprocess import Popen + +############################################################################### +# USEFUL FUNCTIONS # +############################################################################### + +def get_cur_time(): + """ + Return a string in the format : year/month/day hour:min:sec + """ + real_time = localtime() + str_real_time = \ + '{0:04d}'.format(real_time.tm_year) + '/' + \ + '{0:02d}'.format(real_time.tm_mon) + '/' + \ + '{0:02d}'.format(real_time.tm_mday) + ' ' + \ + '{0:02d}'.format(real_time.tm_hour) + ':' + \ + '{0:02d}'.format(real_time.tm_min) + ':' + \ + '{0:02d}'.format(real_time.tm_sec) + + return str_real_time + + +def tol_equality(vars, tol=1.e-4): + """ + Check whether two numbers (vars[0] and vars[1]) are approximately equal (with 'tol' tollerance) + """ + try: + return abs(float(vars[1]) / float(vars[0]) - 1.) < tol + except ZeroDivisionError: + return abs(vars[1] - vars[0]) < tol + + +def Edit_File(filename): + if os.name == 'nt': # Windows + editor = ['Notepad'] + else: + editor = ['gedit', '-s'] + file_display = Popen(editor + [f'{filename}']) + file_display.wait() + + +if __name__ == '__main__': + print('\n AUTODIAGNOSTIC\n ==============\n') diff --git a/src/daComposant/daNumerics/pst4mod/modelica_libraries/write_in_dsin.py b/src/daComposant/daNumerics/pst4mod/modelica_libraries/write_in_dsin.py new file mode 100644 index 0000000..ef89936 --- /dev/null +++ b/src/daComposant/daNumerics/pst4mod/modelica_libraries/write_in_dsin.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016-2024 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +from os import path,getcwd,rename,remove,replace +from sys import exit + +class Write_in_dsin: + """ + This class has been written to change dsin.txt (a file that is generated by + Dymola and that contains the values of the parameters of a simulation). Changing + the values of parameters in dsin.txt lets the user launch a different simulation. + This can also be done with method modelicares.exps.write_params. + The present class has been written in case in order to make it possible to + change the parameters values in dsin.txt without importing modelicares. + + New features introduced 02/2019: + - possibility of change a dsin.txt type file having a different name + - by default, the previous use of this module should remain unchanged + + New features introduced 09/2022: + - script adapted to the situations in which the variables are described on + a single line (instead of 2 traditionnally) in the dsin file + """ + + def __init__(self,dict_inputs,filedir=path.abspath(getcwd()),dsin_name='dsin.txt',old_file_name='dsin_old.txt',new_file_name='dsin.txt'): + self.__dict_inputs = dict_inputs.copy() #this way the dictionary is not emptied + self.__filedir = filedir + self.__dsin_name = dsin_name + self.__old_file_name = old_file_name + self.__new_file_name = new_file_name + self.__maybe_par_dict = {} + + @property + def dict_inputs(self): + """ + Dictionary associating to each variable whose value has to be given to + dsin.txt (inputs + iteration variables if necessary) its value. + """ + return self.__dict_inputs + + @property + def dsin_name(self): + """ + Name of the dsin.txt file to change, it might have a different name + """ + return self.__dsin_name + + @property + def maybe_par_dict(self): + """ + List of variables to be changed which are not identified as 'parameter' in the dsin.txt. + """ + return self.__maybe_par_dict + + @property + def filedir(self): + """ + Path of the directory in which the file dsin.txt is located. + The new version of this file will also be saved in this directory. + """ + return self.__filedir + + @property + def old_file_name(self): + """ + The original file dsin.txt that is modified is saved under a name that can be chosen. + dsin_old.txt seems appropriate + """ + return self.__old_file_name + + @property + def new_file_name(self): + """ + The new version of dsin.txt is given a name that can be chosen through + this attribute. In order to be able to launch a simulation, this name has + to be dsin.txt. + """ + return self.__new_file_name + +######################################################################## +# MAIN METHODS # +######################################################################## + def write_in_dsin(self): + """ + Main method of this class. + This method creates a new dsin.txt file. + The original file is renamed 'dsin_old.txt'. + The new file is named 'dsin.txt' + """ + #reading has value 1 while the part of the file currently being + #read is the part of the file in which the variables are given + #values. Only this part of the file has to be changed in order + #to change the parameters values. + reading=0 + in_line1,in_line2=None,None + with open(path.join(self.filedir,self.dsin_name),'r') as lines: + with open(path.join(self.filedir,'dsin_tmp.txt'),'w') as output: + for line in lines: + if reading==1 and len(line)>1 and not line[0] == '#': + if in_line1 == None: + in_line1 = line + else: + in_line2 = line + else: + output.write(line) + + if 'initialValue' in line: + reading=1 + #If the structure of file dsin.txt generated by + #Dymola was changed, the following line could have + #to be changed + if '# Matrix with 6 columns' in line: + reading=0 + + # - START - Take into account the possibility of handling dsin.txt files with one single line for describin variables + if in_line1 != None and len(in_line1.split())>4 and reading==1: #Case where there is one line per parameter rather than two in the dsin file + + line_list = in_line1.split() #The variable name may contain spaces... + valtype1,val,elem13,elem14,valtype2,elem16,elem17 = line_list[0],line_list[1],line_list[2],line_list[3],line_list[4],line_list[5],line_list[6] + + var = '' + for reste in line_list[7:]: + var = var + reste + + if var in self.dict_inputs.keys(): + val = self.dict_inputs[var] + #The format is not the same depending on the length of some variables + if len(str(val))<=7 and len(elem14)<=7: + txt ='{elem11:>3} {elem12:>7} {elem13:>18} {elem14:>7} {elem15:>18} {elem16:>5} {elem17:>3} {elem18:<}\n' + if len(str(val))<=7 and len(elem14)>7: + txt ='{elem11:>3} {elem12:>7} {elem13:>18} {elem14:>23} {elem15:>2} {elem16:>5} {elem17:>3} {elem18:<}\n' + if len(str(val))>7 and len(elem14)<=7: + txt ='{elem11:>3} {elem12:>23} {elem13:>2} {elem14:>7} {elem15:>18} {elem16:>5} {elem17:>3} {elem18:<}\n' + if len(str(val))>7 and len(elem14)>7: + txt ='{elem11:>3} {elem12:>23} {elem13:>2} {elem14:>23} {elem15:>2} {elem16:>5} {elem17:>3} {elem18:<}\n' + + out_line = txt.format(elem11=valtype1,elem12=val,elem13=elem13,elem14=elem14,elem15=valtype2,elem16=elem16,elem17=elem17,elem18=var) + + if valtype1 != '-1' or (valtype2 != '1' and valtype2 != '2') : #(0,6) combination looks like the only case to look into + self.__maybe_par_dict[var] = val + self.__dict_inputs.pop(var) #Deleting the variable from the list of 'to do' + + else: + out_line = in_line1 + + output.write(out_line) + + in_line1=None + # - END - Take into account the possibility of handling dsin.txt files with one single line for describin variables + + + elif in_line2 != None: + valtype1,val,elem13,elem14 = in_line1.split() + #valtype2,elem22,elem23,var = in_line2.split() + line2_list = in_line2.split() #The variable name may contain spaces... + valtype2 = line2_list[0] + var = '' + for reste in line2_list[3:]: + var = var + reste + + if var in self.dict_inputs.keys(): + val = self.dict_inputs[var] + out_line1 = '{elem11:>3} {elem12:>23} {elem13:23} {elem14:>23}\n'.format(elem11=valtype1,elem12=val,elem13=elem13,elem14=elem14) + if valtype1 != '-1' or (valtype2 != '1' and valtype2 != '2') : #(0,6) combination looks like the only case to look into + self.__maybe_par_dict[var] = val + self.__dict_inputs.pop(var) #Deleting the variable from the list of 'to do' + else: + out_line1 = in_line1 + out_line2 = in_line2 + + output.write(out_line1) + output.write(out_line2) + + in_line1,in_line2=None,None + +# rename(path.join(self.filedir,'dsin.txt'),path.join(self.filedir,self.old_file_name)) + replace(path.join(self.filedir,self.dsin_name),path.join(self.filedir,self.old_file_name)) # resolve the conflit with old version + + if len(self.dict_inputs) > 0 : #At least one variable was not changed + remove(path.join(self.filedir,'dsin_tmp.txt')) + raise KeyError("The following variables were not found in dsin.txt:\n %s" % list(self.dict_inputs.keys())) + + rename(path.join(self.filedir,'dsin_tmp.txt'),path.join(self.filedir,self.new_file_name)) + +if __name__ == '__main__': + print('\n AUTODIAGNOSTIC\n ==============\n') -- 2.30.2