1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2024 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
24 from daCore import BasicObjects, NumericObjects, PlatformInfo
25 from daCore.PlatformInfo import vfloat
26 from daAlgorithms.Atoms import eosg
# Maximum numeric precision reported by the platform; used below as the
# accumulation dtype when summing absolute innovations (L1 criterion).
mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
29 # ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
    # Sampling test algorithm ("SAMPLINGTEST"): evaluates a quality criterion
    # J (background part Jb + observation part Jo) for every state of an
    # ensemble, either provided as precomputed snapshots or generated here.
    # NOTE(review): this chunk has elided lines throughout (e.g. the
    # "def __init__(self):" header is not visible); code is kept as-is.
        BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
        # Ensemble of physical state vectors (snapshots), one state per column.
        self.defineRequiredParameter(
            name = "EnsembleOfSnapshots",
            typecast = numpy.array,
            message = "Ensemble de vecteurs d'état physique (snapshots), 1 état par colonne",
        # Calculation points given explicitly as a list of n-uples.
        self.defineRequiredParameter(
            name = "SampleAsnUplet",
            message = "Points de calcul définis par une liste de n-uplet",
        # Hyper-cube defined by an explicit sample list for each variable.
        self.defineRequiredParameter(
            name = "SampleAsExplicitHyperCube",
            message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages explicites de chaque variable comme une liste", # noqa: E501
        # Hyper-cube defined implicitly by a [min, max, step] triple per variable.
        self.defineRequiredParameter(
            name = "SampleAsMinMaxStepHyperCube",
            message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages implicites de chaque variable par un triplet [min,max,step]", # noqa: E501
        # Latin hyper-cube: [min, max] bounds per variable, then [dimension, point count].
        self.defineRequiredParameter(
            name = "SampleAsMinMaxLatinHyperCube",
            message = "Points de calcul définis par un hyper-cube Latin dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre de points demandés]", # noqa: E501
        # Sobol sequence: [min, max] bounds per variable, then [dimension, minimal point count].
        self.defineRequiredParameter(
            name = "SampleAsMinMaxSobolSequence",
            message = "Points de calcul définis par une séquence de Sobol dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre minimal de points demandés]", # noqa: E501
        # Independent per-axis random sampling: ['distribution', [parameters], count].
        self.defineRequiredParameter(
            name = "SampleAsIndependantRandomVariables",
            message = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]", # noqa: E501
        # Quality criterion used to compare each simulation to the observations.
        self.defineRequiredParameter(
            name = "QualityCriterion",
            default = "AugmentedWeightedLeastSquares",
            message = "Critère de qualité utilisé",
            # Accepted criterion names with their short aliases:
                "AugmentedWeightedLeastSquares", "AWLS",
                "WeightedLeastSquares", "WLS",
                "LeastSquares", "LS", "L2",
                "AbsoluteValue", "L1",
                "MaximumError", "ME", "Linf",
            # NOTE(review): the list header above these next entries is elided;
            # presumably an advanced/legacy alias list — confirm upstream.
                "AugmentedPonderatedLeastSquares", "APLS",
                "PonderatedLeastSquares", "PLS",
        # Debug-mode activation flag.
        self.defineRequiredParameter(
            # NOTE(review): the "name = ..." line for this parameter is elided.
            message = "Activation du mode debug lors de l'exécution",
        # List of supplementary quantities to compute and/or store.
        self.defineRequiredParameter(
            name = "StoreSupplementaryCalculations",
            message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
            # Visible subset of the storable quantity names:
                "EnsembleOfSimulations",
                "InnovationAtCurrentState",
                "SimulatedObservationAtCurrentState",
        # Fixed seed for the random generator (applied via numpy.random.seed).
        self.defineRequiredParameter(
            # NOTE(review): the "name = ..." line is elided; referenced later
            # as self._parameters["SetSeed"].
            typecast = numpy.random.seed,
            message = "Graine fixée pour le générateur aléatoire",
        # Xb (background), Y (observations), R and B (error covariances)
        # are mandatory inputs of this algorithm.
        self.requireInputArguments(
            mandatory= ("Xb", "Y", "R", "B"),

    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
        # Run the sampling test: obtain the ensemble of states (EOX) and of
        # simulations (EOS), then evaluate the quality criterion per column.
        self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
        #
        if hasattr(Y, "store"):
            Yb = numpy.asarray( Y[-1] ).reshape((-1, 1)) # Y as Vector or VectorSerie
        # NOTE(review): an "else:" branch header appears elided here.
            Yb = numpy.asarray( Y ).reshape((-1, 1)) # Y as Vector or VectorSerie

        def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
            # Evaluate the selected quality measure for one state x and its
            # simulation HmX, then store J, Jb, Jo and optional diagnostics.
            if numpy.any(numpy.isnan(HmX)):
                # Failed simulation (NaN output): quality is undefined here.
                Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
            # NOTE(review): the matching "else:" header appears elided here.
                _X = numpy.asarray( x ).reshape((-1, 1))
                _HX = numpy.asarray( HmX ).reshape((-1, 1))
                _Innovation = Yb - _HX
                assert Yb.size == _HX.size
                assert Yb.size == _Innovation.size
                if QualityMeasure in ["AugmentedWeightedLeastSquares", "AWLS", "AugmentedPonderatedLeastSquares", "APLS", "DA"]: # noqa: E501
                    # NOTE(review): BI and RI are used but not defined in this
                    # chunk — presumably inverses of B and R; confirm upstream.
                    if BI is None or RI is None:
                        raise ValueError("Background and Observation error covariance matrix has to be properly defined!") # noqa: E501
                    Jb = vfloat( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
                    Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
                elif QualityMeasure in ["WeightedLeastSquares", "WLS", "PonderatedLeastSquares", "PLS"]:
                    # NOTE(review): the guard (presumably "if RI is None:") is elided.
                        raise ValueError("Observation error covariance matrix has to be properly defined!")
                    Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
                elif QualityMeasure in ["LeastSquares", "LS", "L2"]:
                    Jo = vfloat( 0.5 * _Innovation.T @ _Innovation )
                elif QualityMeasure in ["AbsoluteValue", "L1"]:
                    # Sum of absolute innovations, accumulated at maximum precision.
                    Jo = vfloat( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
                elif QualityMeasure in ["MaximumError", "ME", "Linf"]:
                    Jo = vfloat(numpy.max( numpy.abs(_Innovation) ))
            # Optional per-sample storage, driven by StoreSupplementaryCalculations.
            if self._toStore("Innovation"):
                self.StoredVariables["Innovation"].store( _Innovation )
            if self._toStore("CurrentState"):
                self.StoredVariables["CurrentState"].store( _X )
            if self._toStore("InnovationAtCurrentState"):
                self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
            if self._toStore("SimulatedObservationAtCurrentState"):
                self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
            # NOTE(review): the line combining J from Jb and Jo is elided;
            # J is stored below alongside its two components.
            self.StoredVariables["CostFunctionJb"].store( Jb )
            self.StoredVariables["CostFunctionJo"].store( Jo )
            self.StoredVariables["CostFunctionJ" ].store( J )

        # Use a precomputed ensemble of snapshots when provided; otherwise
        # generate the states and simulations via eosg (see final branch).
        if len(self._parameters["EnsembleOfSnapshots"]) > 0:
            sampleList = NumericObjects.BuildComplexSampleList(
                self._parameters["SampleAsnUplet"],
                self._parameters["SampleAsExplicitHyperCube"],
                self._parameters["SampleAsMinMaxStepHyperCube"],
                self._parameters["SampleAsMinMaxLatinHyperCube"],
                self._parameters["SampleAsMinMaxSobolSequence"],
                self._parameters["SampleAsIndependantRandomVariables"],
                self._parameters["SetSeed"],
            if hasattr(sampleList, "__len__") and len(sampleList) == 0:
                EOX = numpy.array([[]])
            # NOTE(review): an "else:" branch header appears elided here.
                EOX = numpy.stack(tuple(copy.copy(sampleList)), axis=1)
            EOS = self._parameters["EnsembleOfSnapshots"]
            # Each state column must correspond to exactly one snapshot column.
            if EOX.shape[1] != EOS.shape[1]:
                raise ValueError("Numbers of states (=%i) and snapshots (=%i) has to be the same!"%(EOX.shape[1], EOS.shape[1])) # noqa: E501
            #
            if self._toStore("EnsembleOfStates"):
                self.StoredVariables["EnsembleOfStates"].store( EOX )
            if self._toStore("EnsembleOfSimulations"):
                self.StoredVariables["EnsembleOfSimulations"].store( EOS )
        # NOTE(review): the alternative branch header (else/elif) is elided;
        # this path generates both EOX and EOS through the operator HO.
            EOX, EOS = eosg.eosg(self, Xb, HO, True, False)
        #
        # Evaluate the quality criterion on every column (sample) of EOS.
        for i in range(EOS.shape[1]):
            J, Jb, Jo = CostFunction( EOX[:, i], EOS[:, i], self._parameters["QualityCriterion"])
        #
        self._post_run(HO, EM)
224 # ==============================================================================
# Entry-point guard: running this module directly prints a self-diagnostic
# banner. NOTE(review): further statements of this guard appear elided in
# this chunk.
if __name__ == "__main__":
    print("\n AUTODIAGNOSTIC\n")