1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2023 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
23 import numpy, logging, copy
24 from daCore import BasicObjects, NumericObjects, PlatformInfo
25 from daCore.PlatformInfo import PlatformInfo, vfloat
26 from daAlgorithms.Atoms import eosg
27 mfp = PlatformInfo().MaximumPrecision()
29 # ==============================================================================
30 class ElementaryAlgorithm(BasicObjects.Algorithm):
32 BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
33 self.defineRequiredParameter(
34 name = "EnsembleOfSnapshots",
36 typecast = numpy.array,
37 message = "Ensemble de vecteurs d'état physique (snapshots), 1 état par colonne",
39 self.defineRequiredParameter(
40 name = "SampleAsnUplet",
43 message = "Points de calcul définis par une liste de n-uplet",
45 self.defineRequiredParameter(
46 name = "SampleAsExplicitHyperCube",
49 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages explicites de chaque variable comme une liste",
51 self.defineRequiredParameter(
52 name = "SampleAsMinMaxStepHyperCube",
55 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages implicites de chaque variable par un triplet [min,max,step]",
57 self.defineRequiredParameter(
58 name = "SampleAsMinMaxLatinHyperCube",
61 message = "Points de calcul définis par un hyper-cube Latin dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre de points demandés]",
63 self.defineRequiredParameter(
64 name = "SampleAsIndependantRandomVariables",
67 message = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
69 self.defineRequiredParameter(
70 name = "QualityCriterion",
71 default = "AugmentedWeightedLeastSquares",
73 message = "Critère de qualité utilisé",
76 "AugmentedWeightedLeastSquares", "AWLS",
77 "WeightedLeastSquares","WLS",
78 "LeastSquares", "LS", "L2",
79 "AbsoluteValue", "L1",
80 "MaximumError", "ME", "Linf",
83 "AugmentedPonderatedLeastSquares", "APLS",
84 "PonderatedLeastSquares", "PLS",
87 self.defineRequiredParameter(
91 message = "Activation du mode debug lors de l'exécution",
93 self.defineRequiredParameter(
94 name = "StoreSupplementaryCalculations",
97 message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
103 "EnsembleOfSimulations",
106 "InnovationAtCurrentState",
107 "SimulatedObservationAtCurrentState",
110 self.defineRequiredParameter(
112 typecast = numpy.random.seed,
113 message = "Graine fixée pour le générateur aléatoire",
115 self.requireInputArguments(
116 mandatory= ("Xb", "Y", "R", "B"),
119 self.setAttributes(tags=(
123 def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
124 self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
126 if hasattr(Y,"store"):
127 Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y en Vector ou VectorSerie
129 Yb = numpy.asarray( Y ).reshape((-1,1)) # Y en Vector ou VectorSerie
132 def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
133 if numpy.any(numpy.isnan(HmX)):
136 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
138 _X = numpy.asarray( x ).reshape((-1,1))
139 _HX = numpy.asarray( HmX ).reshape((-1,1))
140 _Innovation = Yb - _HX
141 assert Yb.size == _HX.size
142 assert Yb.size == _Innovation.size
143 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
144 if BI is None or RI is None:
145 raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
146 Jb = vfloat( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
147 Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
148 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
150 raise ValueError("Observation error covariance matrix has to be properly defined!")
152 Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
153 elif QualityMeasure in ["LeastSquares","LS","L2"]:
155 Jo = vfloat( 0.5 * _Innovation.T @ _Innovation )
156 elif QualityMeasure in ["AbsoluteValue","L1"]:
158 Jo = vfloat( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
159 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
161 Jo = vfloat(numpy.max( numpy.abs(_Innovation) ))
164 if self._toStore("Innovation"):
165 self.StoredVariables["Innovation"].store( _Innovation )
166 if self._toStore("CurrentState"):
167 self.StoredVariables["CurrentState"].store( _X )
168 if self._toStore("InnovationAtCurrentState"):
169 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
170 if self._toStore("SimulatedObservationAtCurrentState"):
171 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
172 self.StoredVariables["CostFunctionJb"].store( Jb )
173 self.StoredVariables["CostFunctionJo"].store( Jo )
174 self.StoredVariables["CostFunctionJ" ].store( J )
178 if len(self._parameters["EnsembleOfSnapshots"]) > 0:
179 sampleList = NumericObjects.BuildComplexSampleList(
180 self._parameters["SampleAsnUplet"],
181 self._parameters["SampleAsExplicitHyperCube"],
182 self._parameters["SampleAsMinMaxStepHyperCube"],
183 self._parameters["SampleAsMinMaxLatinHyperCube"],
184 self._parameters["SampleAsIndependantRandomVariables"],
186 self._parameters["SetSeed"],
188 if hasattr(sampleList,"__len__") and len(sampleList) == 0:
189 EOX = numpy.array([[]])
191 EOX = numpy.stack(tuple(copy.copy(sampleList)), axis=1)
192 EOS = self._parameters["EnsembleOfSnapshots"]
193 if EOX.shape[1] != EOS.shape[1]:
194 raise ValueError("Numbers of states (=%i) and snapshots (=%i) has to be the same!"%(EOX.shape[1], EOS.shape[1]))
196 if self._toStore("EnsembleOfStates"):
197 self.StoredVariables["EnsembleOfStates"].store( EOX )
198 if self._toStore("EnsembleOfSimulations"):
199 self.StoredVariables["EnsembleOfSimulations"].store( EOS )
201 EOX, EOS = eosg.eosg(self, Xb, HO, True, False)
203 for i in range(EOS.shape[1]):
204 J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i], self._parameters["QualityCriterion"])
210 # ==============================================================================
211 if __name__ == "__main__":
212 print("\n AUTODIAGNOSTIC\n")