Salome HOME
Minor documentation and code review corrections (33)
[modules/adao.git] / src / daComposant / daAlgorithms / SamplingTest.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2022 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 import numpy, logging, itertools
24 from daCore import BasicObjects, NumericObjects
25 from daCore.PlatformInfo import PlatformInfo
26 mfp = PlatformInfo().MaximumPrecision()
27
28 # ==============================================================================
29 class ElementaryAlgorithm(BasicObjects.Algorithm):
30     def __init__(self):
31         BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
32         self.defineRequiredParameter(
33             name     = "SampleAsnUplet",
34             default  = [],
35             typecast = tuple,
36             message  = "Points de calcul définis par une liste de n-uplet",
37             )
38         self.defineRequiredParameter(
39             name     = "SampleAsExplicitHyperCube",
40             default  = [],
41             typecast = tuple,
42             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable comme une liste",
43             )
44         self.defineRequiredParameter(
45             name     = "SampleAsMinMaxStepHyperCube",
46             default  = [],
47             typecast = tuple,
48             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable par un triplet [min,max,step]",
49             )
50         self.defineRequiredParameter(
51             name     = "SampleAsIndependantRandomVariables",
52             default  = [],
53             typecast = tuple,
54             message  = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
55             )
56         self.defineRequiredParameter(
57             name     = "QualityCriterion",
58             default  = "AugmentedWeightedLeastSquares",
59             typecast = str,
60             message  = "Critère de qualité utilisé",
61             listval  = [
62                 "DA",
63                 "AugmentedWeightedLeastSquares", "AWLS",
64                 "WeightedLeastSquares","WLS",
65                 "LeastSquares", "LS", "L2",
66                 "AbsoluteValue", "L1",
67                 "MaximumError", "ME", "Linf",
68                 ],
69             listadv  = [
70                 "AugmentedPonderatedLeastSquares", "APLS",
71                 "PonderatedLeastSquares", "PLS",
72                 ],
73             )
74         self.defineRequiredParameter(
75             name     = "SetDebug",
76             default  = False,
77             typecast = bool,
78             message  = "Activation du mode debug lors de l'exécution",
79             )
80         self.defineRequiredParameter(
81             name     = "StoreSupplementaryCalculations",
82             default  = [],
83             typecast = tuple,
84             message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
85             listval  = [
86                 "CostFunctionJ",
87                 "CostFunctionJb",
88                 "CostFunctionJo",
89                 "CurrentState",
90                 "InnovationAtCurrentState",
91                 "SimulatedObservationAtCurrentState",
92                 ]
93             )
94         self.defineRequiredParameter(
95             name     = "SetSeed",
96             typecast = numpy.random.seed,
97             message  = "Graine fixée pour le générateur aléatoire",
98             )
99         self.requireInputArguments(
100             mandatory= ("Xb", "HO"),
101             )
102         self.setAttributes(tags=(
103             "Checking",
104             ))
105
106     def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
107         self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
108         #
109         Hm = HO["Direct"].appliedTo
110         #
111         X0 = numpy.ravel( Xb )
112         Y0 = numpy.ravel( Y  )
113         #
114         # ---------------------------
115         sampleList = NumericObjects.BuildComplexSampleList(
116             self._parameters["SampleAsnUplet"],
117             self._parameters["SampleAsExplicitHyperCube"],
118             self._parameters["SampleAsMinMaxStepHyperCube"],
119             self._parameters["SampleAsIndependantRandomVariables"],
120             X0,
121             )
122         # ----------
123         BI = B.getI()
124         RI = R.getI()
125         def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
126             if numpy.any(numpy.isnan(HmX)):
127                 _X  = numpy.nan
128                 _HX = numpy.nan
129                 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
130             else:
131                 _X  = numpy.ravel( x )
132                 _HX = numpy.ravel( HmX )
133                 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
134                     if BI is None or RI is None:
135                         raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
136                     Jb  = float( 0.5 *  (_X - X0).T * (BI * (_X - X0))  )
137                     Jo  = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
138                 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
139                     if RI is None:
140                         raise ValueError("Observation error covariance matrix has to be properly defined!")
141                     Jb  = 0.
142                     Jo  = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
143                 elif QualityMeasure in ["LeastSquares","LS","L2"]:
144                     Jb  = 0.
145                     Jo  = float( 0.5 * (Y0 - _HX).T @ (Y0 - _HX) )
146                 elif QualityMeasure in ["AbsoluteValue","L1"]:
147                     Jb  = 0.
148                     Jo  = float( numpy.sum( numpy.abs(Y0 - _HX), dtype=mfp ) )
149                 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
150                     Jb  = 0.
151                     Jo  = numpy.max( numpy.abs(Y0 - _HX) )
152                 #
153                 J   = Jb + Jo
154             if self._toStore("CurrentState"):
155                 self.StoredVariables["CurrentState"].store( _X )
156             if self._toStore("InnovationAtCurrentState"):
157                 self.StoredVariables["InnovationAtCurrentState"].store( Y0 - _HX )
158             if self._toStore("SimulatedObservationAtCurrentState"):
159                 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
160             self.StoredVariables["CostFunctionJb"].store( Jb )
161             self.StoredVariables["CostFunctionJo"].store( Jo )
162             self.StoredVariables["CostFunctionJ" ].store( J )
163             return J, Jb, Jo
164         # ----------
165         if self._parameters["SetDebug"]:
166             CUR_LEVEL = logging.getLogger().getEffectiveLevel()
167             logging.getLogger().setLevel(logging.DEBUG)
168             print("===> Beginning of evaluation, activating debug\n")
169             print("     %s\n"%("-"*75,))
170         #
171         # ----------
172         for i,Xx in enumerate(sampleList):
173             if self._parameters["SetDebug"]:
174                 print("===> Launching evaluation for state %i"%i)
175             try:
176                 Yy = Hm( numpy.ravel( Xx ) )
177             except:
178                 Yy = numpy.nan
179             #
180             J, Jb, Jo = CostFunction( Xx, Yy,  self._parameters["QualityCriterion"])
181         # ----------
182         #
183         if self._parameters["SetDebug"]:
184             print("\n     %s\n"%("-"*75,))
185             print("===> End evaluation, deactivating debug if necessary\n")
186             logging.getLogger().setLevel(CUR_LEVEL)
187         #
188         self._post_run(HO)
189         return 0
190
191 # ==============================================================================
192 if __name__ == "__main__":
193     print('\n AUTODIAGNOSTIC\n')