]> SALOME platform Git repositories - modules/adao.git/blob - src/daComposant/daAlgorithms/SamplingTest.py
Salome HOME
Documentation update and method improvement
[modules/adao.git] / src / daComposant / daAlgorithms / SamplingTest.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2023 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 import numpy, logging, copy
24 from daCore import BasicObjects, NumericObjects, PlatformInfo
25 from daCore.PlatformInfo import PlatformInfo, vfloat
26 from daAlgorithms.Atoms import eosg
27 mfp = PlatformInfo().MaximumPrecision()
28
29 # ==============================================================================
30 class ElementaryAlgorithm(BasicObjects.Algorithm):
31     def __init__(self):
32         BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
33         self.defineRequiredParameter(
34             name     = "EnsembleOfSnapshots",
35             default  = [],
36             typecast = numpy.array,
37             message  = "Ensemble de vecteurs d'état physique (snapshots), 1 état par colonne",
38             )
39         self.defineRequiredParameter(
40             name     = "SampleAsnUplet",
41             default  = [],
42             typecast = tuple,
43             message  = "Points de calcul définis par une liste de n-uplet",
44             )
45         self.defineRequiredParameter(
46             name     = "SampleAsExplicitHyperCube",
47             default  = [],
48             typecast = tuple,
49             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages explicites de chaque variable comme une liste",
50             )
51         self.defineRequiredParameter(
52             name     = "SampleAsMinMaxStepHyperCube",
53             default  = [],
54             typecast = tuple,
55             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages implicites de chaque variable par un triplet [min,max,step]",
56             )
57         self.defineRequiredParameter(
58             name     = "SampleAsMinMaxLatinHyperCube",
59             default  = [],
60             typecast = tuple,
61             message  = "Points de calcul définis par un hyper-cube Latin dont on donne les bornes de chaque variable par une paire [min,max], suivi de la paire [dimension, nombre de points demandés]",
62             )
63         self.defineRequiredParameter(
64             name     = "SampleAsIndependantRandomVariables",
65             default  = [],
66             typecast = tuple,
67             message  = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
68             )
69         self.defineRequiredParameter(
70             name     = "QualityCriterion",
71             default  = "AugmentedWeightedLeastSquares",
72             typecast = str,
73             message  = "Critère de qualité utilisé",
74             listval  = [
75                 "DA",
76                 "AugmentedWeightedLeastSquares", "AWLS",
77                 "WeightedLeastSquares","WLS",
78                 "LeastSquares", "LS", "L2",
79                 "AbsoluteValue", "L1",
80                 "MaximumError", "ME", "Linf",
81                 ],
82             listadv  = [
83                 "AugmentedPonderatedLeastSquares", "APLS",
84                 "PonderatedLeastSquares", "PLS",
85                 ],
86             )
87         self.defineRequiredParameter(
88             name     = "SetDebug",
89             default  = False,
90             typecast = bool,
91             message  = "Activation du mode debug lors de l'exécution",
92             )
93         self.defineRequiredParameter(
94             name     = "StoreSupplementaryCalculations",
95             default  = [],
96             typecast = tuple,
97             message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
98             listval  = [
99                 "CostFunctionJ",
100                 "CostFunctionJb",
101                 "CostFunctionJo",
102                 "CurrentState",
103                 "EnsembleOfSimulations",
104                 "EnsembleOfStates",
105                 "Innovation",
106                 "InnovationAtCurrentState",
107                 "SimulatedObservationAtCurrentState",
108                 ]
109             )
110         self.defineRequiredParameter(
111             name     = "SetSeed",
112             typecast = numpy.random.seed,
113             message  = "Graine fixée pour le générateur aléatoire",
114             )
115         self.requireInputArguments(
116             mandatory= ("Xb", "Y", "R", "B"),
117             optional = ("HO"),
118             )
119         self.setAttributes(tags=(
120             "Checking",
121             ))
122
123     def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
124         self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
125         #
126         if hasattr(Y,"store"):
127             Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y en Vector ou VectorSerie
128         else:
129             Yb = numpy.asarray( Y ).reshape((-1,1)) # Y en Vector ou VectorSerie
130         BI = B.getI()
131         RI = R.getI()
132         def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
133             if numpy.any(numpy.isnan(HmX)):
134                 _X  = numpy.nan
135                 _HX = numpy.nan
136                 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
137             else:
138                 _X  = numpy.asarray( x ).reshape((-1,1))
139                 _HX = numpy.asarray( HmX ).reshape((-1,1))
140                 _Innovation = Yb - _HX
141                 assert Yb.size == _HX.size
142                 assert Yb.size == _Innovation.size
143                 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
144                     if BI is None or RI is None:
145                         raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
146                     Jb  = vfloat( 0.5 *  (_X - Xb).T * (BI * (_X - Xb))  )
147                     Jo  = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
148                 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
149                     if RI is None:
150                         raise ValueError("Observation error covariance matrix has to be properly defined!")
151                     Jb  = 0.
152                     Jo  = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
153                 elif QualityMeasure in ["LeastSquares","LS","L2"]:
154                     Jb  = 0.
155                     Jo  = vfloat( 0.5 * _Innovation.T @ _Innovation )
156                 elif QualityMeasure in ["AbsoluteValue","L1"]:
157                     Jb  = 0.
158                     Jo  = vfloat( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
159                 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
160                     Jb  = 0.
161                     Jo  = vfloat(numpy.max( numpy.abs(_Innovation) ))
162                 #
163                 J   = Jb + Jo
164             if self._toStore("Innovation"):
165                 self.StoredVariables["Innovation"].store( _Innovation )
166             if self._toStore("CurrentState"):
167                 self.StoredVariables["CurrentState"].store( _X )
168             if self._toStore("InnovationAtCurrentState"):
169                 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
170             if self._toStore("SimulatedObservationAtCurrentState"):
171                 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
172             self.StoredVariables["CostFunctionJb"].store( Jb )
173             self.StoredVariables["CostFunctionJo"].store( Jo )
174             self.StoredVariables["CostFunctionJ" ].store( J )
175             return J, Jb, Jo
176         #
177         # ----------
178         if len(self._parameters["EnsembleOfSnapshots"]) > 0:
179             sampleList = NumericObjects.BuildComplexSampleList(
180                 self._parameters["SampleAsnUplet"],
181                 self._parameters["SampleAsExplicitHyperCube"],
182                 self._parameters["SampleAsMinMaxStepHyperCube"],
183                 self._parameters["SampleAsMinMaxLatinHyperCube"],
184                 self._parameters["SampleAsIndependantRandomVariables"],
185                 Xb,
186                 self._parameters["SetSeed"],
187                 )
188             if hasattr(sampleList,"__len__") and len(sampleList) == 0:
189                 EOX = numpy.array([[]])
190             else:
191                 EOX = numpy.stack(tuple(copy.copy(sampleList)), axis=1)
192             EOS = self._parameters["EnsembleOfSnapshots"]
193             if EOX.shape[1] != EOS.shape[1]:
194                 raise ValueError("Numbers of states (=%i) and snapshots (=%i) has to be the same!"%(EOX.shape[1], EOS.shape[1]))
195             #
196             if self._toStore("EnsembleOfStates"):
197                 self.StoredVariables["EnsembleOfStates"].store( EOX )
198             if self._toStore("EnsembleOfSimulations"):
199                 self.StoredVariables["EnsembleOfSimulations"].store( EOS )
200         else:
201             EOX, EOS = eosg.eosg(self, Xb, HO, True, False)
202         #
203         for i in range(EOS.shape[1]):
204             J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i],  self._parameters["QualityCriterion"])
205         # ----------
206         #
207         self._post_run(HO)
208         return 0
209
210 # ==============================================================================
211 if __name__ == "__main__":
212     print("\n AUTODIAGNOSTIC\n")