Salome HOME
Documentation update with features and review corrections
[modules/adao.git] / src / daComposant / daAlgorithms / SamplingTest.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2024 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 import numpy, copy
24 from daCore import BasicObjects, NumericObjects, PlatformInfo
25 from daCore.PlatformInfo import vfloat
26 from daAlgorithms.Atoms import eosg
27 mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
28
29 # ==============================================================================
30 class ElementaryAlgorithm(BasicObjects.Algorithm):
31     def __init__(self):
32         BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
33         self.defineRequiredParameter(
34             name     = "EnsembleOfSnapshots",
35             default  = [],
36             typecast = numpy.array,
37             message  = "Ensemble de vecteurs d'état physique (snapshots), 1 état par colonne",
38         )
39         self.defineRequiredParameter(
40             name     = "SampleAsnUplet",
41             default  = [],
42             typecast = tuple,
43             message  = "Points de calcul définis par une liste de n-uplet",
44         )
45         self.defineRequiredParameter(
46             name     = "SampleAsExplicitHyperCube",
47             default  = [],
48             typecast = tuple,
49             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages explicites de chaque variable comme une liste",  # noqa: E501
50         )
51         self.defineRequiredParameter(
52             name     = "SampleAsMinMaxStepHyperCube",
53             default  = [],
54             typecast = tuple,
55             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages implicites de chaque variable par un triplet [min,max,step]",  # noqa: E501
56         )
57         self.defineRequiredParameter(
58             name     = "SampleAsMinMaxLatinHyperCube",
59             default  = [],
60             typecast = tuple,
61             message  = "Points de calcul définis par un hyper-cube Latin dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre de points demandés]",  # noqa: E501
62         )
63         self.defineRequiredParameter(
64             name     = "SampleAsMinMaxSobolSequence",
65             default  = [],
66             typecast = tuple,
67             message  = "Points de calcul définis par une séquence de Sobol dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre minimal de points demandés]",  # noqa: E501
68         )
69         self.defineRequiredParameter(
70             name     = "SampleAsIndependantRandomVariables",
71             default  = [],
72             typecast = tuple,
73             message  = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",  # noqa: E501
74         )
75         self.defineRequiredParameter(
76             name     = "QualityCriterion",
77             default  = "AugmentedWeightedLeastSquares",
78             typecast = str,
79             message  = "Critère de qualité utilisé",
80             listval  = [
81                 "DA",
82                 "AugmentedWeightedLeastSquares", "AWLS",
83                 "WeightedLeastSquares", "WLS",
84                 "LeastSquares", "LS", "L2",
85                 "AbsoluteValue", "L1",
86                 "MaximumError", "ME", "Linf",
87             ],
88             listadv  = [
89                 "AugmentedPonderatedLeastSquares", "APLS",
90                 "PonderatedLeastSquares", "PLS",
91             ],
92         )
93         self.defineRequiredParameter(
94             name     = "SetDebug",
95             default  = False,
96             typecast = bool,
97             message  = "Activation du mode debug lors de l'exécution",
98         )
99         self.defineRequiredParameter(
100             name     = "StoreSupplementaryCalculations",
101             default  = [],
102             typecast = tuple,
103             message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
104             listval  = [
105                 "CostFunctionJ",
106                 "CostFunctionJb",
107                 "CostFunctionJo",
108                 "CurrentState",
109                 "EnsembleOfSimulations",
110                 "EnsembleOfStates",
111                 "Innovation",
112                 "InnovationAtCurrentState",
113                 "SimulatedObservationAtCurrentState",
114             ]
115         )
116         self.defineRequiredParameter(
117             name     = "SetSeed",
118             typecast = numpy.random.seed,
119             message  = "Graine fixée pour le générateur aléatoire",
120         )
121         self.requireInputArguments(
122             mandatory= ("Xb", "Y", "R", "B"),
123             optional = ("HO"),
124         )
125         self.setAttributes(
126             tags=(
127                 "Checking",
128             ),
129             features=(
130                 "DerivativeFree",
131                 "ParallelAlgorithm",
132             ),
133         )
134
135     def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
136         self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
137         #
138         if hasattr(Y, "store"):
139             Yb = numpy.asarray( Y[-1] ).reshape((-1, 1))  # Y en Vector ou VectorSerie
140         else:
141             Yb = numpy.asarray( Y ).reshape((-1, 1))  # Y en Vector ou VectorSerie
142         BI = B.getI()
143         RI = R.getI()
144
145         def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
146             if numpy.any(numpy.isnan(HmX)):
147                 _X  = numpy.nan
148                 _HX = numpy.nan
149                 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
150             else:
151                 _X  = numpy.asarray( x ).reshape((-1, 1))
152                 _HX = numpy.asarray( HmX ).reshape((-1, 1))
153                 _Innovation = Yb - _HX
154                 assert Yb.size == _HX.size
155                 assert Yb.size == _Innovation.size
156                 if QualityMeasure in ["AugmentedWeightedLeastSquares", "AWLS", "AugmentedPonderatedLeastSquares", "APLS", "DA"]:  # noqa: E501
157                     if BI is None or RI is None:
158                         raise ValueError("Background and Observation error covariance matrix has to be properly defined!")  # noqa: E501
159                     Jb = vfloat( 0.5 * (_X - Xb).T * (BI * (_X - Xb))  )
160                     Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
161                 elif QualityMeasure in ["WeightedLeastSquares", "WLS", "PonderatedLeastSquares", "PLS"]:
162                     if RI is None:
163                         raise ValueError("Observation error covariance matrix has to be properly defined!")
164                     Jb  = 0.
165                     Jo  = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
166                 elif QualityMeasure in ["LeastSquares", "LS", "L2"]:
167                     Jb  = 0.
168                     Jo  = vfloat( 0.5 * _Innovation.T @ _Innovation )
169                 elif QualityMeasure in ["AbsoluteValue", "L1"]:
170                     Jb  = 0.
171                     Jo  = vfloat( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
172                 elif QualityMeasure in ["MaximumError", "ME", "Linf"]:
173                     Jb  = 0.
174                     Jo  = vfloat(numpy.max( numpy.abs(_Innovation) ))
175                 #
176                 J   = Jb + Jo
177             if self._toStore("Innovation"):
178                 self.StoredVariables["Innovation"].store( _Innovation )
179             if self._toStore("CurrentState"):
180                 self.StoredVariables["CurrentState"].store( _X )
181             if self._toStore("InnovationAtCurrentState"):
182                 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
183             if self._toStore("SimulatedObservationAtCurrentState"):
184                 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
185             self.StoredVariables["CostFunctionJb"].store( Jb )
186             self.StoredVariables["CostFunctionJo"].store( Jo )
187             self.StoredVariables["CostFunctionJ" ].store( J )
188             return J, Jb, Jo
189         #
190         # ----------
191         if len(self._parameters["EnsembleOfSnapshots"]) > 0:
192             sampleList = NumericObjects.BuildComplexSampleList(
193                 self._parameters["SampleAsnUplet"],
194                 self._parameters["SampleAsExplicitHyperCube"],
195                 self._parameters["SampleAsMinMaxStepHyperCube"],
196                 self._parameters["SampleAsMinMaxLatinHyperCube"],
197                 self._parameters["SampleAsMinMaxSobolSequence"],
198                 self._parameters["SampleAsIndependantRandomVariables"],
199                 Xb,
200                 self._parameters["SetSeed"],
201             )
202             if hasattr(sampleList, "__len__") and len(sampleList) == 0:
203                 EOX = numpy.array([[]])
204             else:
205                 EOX = numpy.stack(tuple(copy.copy(sampleList)), axis=1)
206             EOS = self._parameters["EnsembleOfSnapshots"]
207             if EOX.shape[1] != EOS.shape[1]:
208                 raise ValueError("Numbers of states (=%i) and snapshots (=%i) has to be the same!"%(EOX.shape[1], EOS.shape[1]))  # noqa: E501
209             #
210             if self._toStore("EnsembleOfStates"):
211                 self.StoredVariables["EnsembleOfStates"].store( EOX )
212             if self._toStore("EnsembleOfSimulations"):
213                 self.StoredVariables["EnsembleOfSimulations"].store( EOS )
214         else:
215             EOX, EOS = eosg.eosg(self, Xb, HO, True, False)
216         #
217         for i in range(EOS.shape[1]):
218             J, Jb, Jo = CostFunction( EOX[:, i], EOS[:, i], self._parameters["QualityCriterion"])
219         # ----------
220         #
221         self._post_run(HO, EM)
222         return 0
223
224 # ==============================================================================
225 if __name__ == "__main__":
226     print("\n AUTODIAGNOSTIC\n")