Salome HOME
Minor documentation and code review corrections (40)
[modules/adao.git] / src / daComposant / daAlgorithms / SamplingTest.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2023 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 import numpy, logging
24 from daCore import BasicObjects, NumericObjects, PlatformInfo
25 from daAlgorithms.Atoms import eosg
26 mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
27
28 # ==============================================================================
29 class ElementaryAlgorithm(BasicObjects.Algorithm):
30     def __init__(self):
31         BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
32         self.defineRequiredParameter(
33             name     = "SampleAsnUplet",
34             default  = [],
35             typecast = tuple,
36             message  = "Points de calcul définis par une liste de n-uplet",
37             )
38         self.defineRequiredParameter(
39             name     = "SampleAsExplicitHyperCube",
40             default  = [],
41             typecast = tuple,
42             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable comme une liste",
43             )
44         self.defineRequiredParameter(
45             name     = "SampleAsMinMaxStepHyperCube",
46             default  = [],
47             typecast = tuple,
48             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable par un triplet [min,max,step]",
49             )
50         self.defineRequiredParameter(
51             name     = "SampleAsIndependantRandomVariables",
52             default  = [],
53             typecast = tuple,
54             message  = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
55             )
56         self.defineRequiredParameter(
57             name     = "QualityCriterion",
58             default  = "AugmentedWeightedLeastSquares",
59             typecast = str,
60             message  = "Critère de qualité utilisé",
61             listval  = [
62                 "DA",
63                 "AugmentedWeightedLeastSquares", "AWLS",
64                 "WeightedLeastSquares","WLS",
65                 "LeastSquares", "LS", "L2",
66                 "AbsoluteValue", "L1",
67                 "MaximumError", "ME", "Linf",
68                 ],
69             listadv  = [
70                 "AugmentedPonderatedLeastSquares", "APLS",
71                 "PonderatedLeastSquares", "PLS",
72                 ],
73             )
74         self.defineRequiredParameter(
75             name     = "SetDebug",
76             default  = False,
77             typecast = bool,
78             message  = "Activation du mode debug lors de l'exécution",
79             )
80         self.defineRequiredParameter(
81             name     = "StoreSupplementaryCalculations",
82             default  = [],
83             typecast = tuple,
84             message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
85             listval  = [
86                 "CostFunctionJ",
87                 "CostFunctionJb",
88                 "CostFunctionJo",
89                 "CurrentState",
90                 "EnsembleOfSimulations",
91                 "EnsembleOfStates",
92                 "Innovation",
93                 "InnovationAtCurrentState",
94                 "SimulatedObservationAtCurrentState",
95                 ]
96             )
97         self.defineRequiredParameter(
98             name     = "SetSeed",
99             typecast = numpy.random.seed,
100             message  = "Graine fixée pour le générateur aléatoire",
101             )
102         self.requireInputArguments(
103             mandatory= ("Xb", "Y", "HO", "R", "B"),
104             )
105         self.setAttributes(tags=(
106             "Checking",
107             ))
108
109     def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
110         self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
111         #
112         if hasattr(Y,"store"):
113             Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y en Vector ou VectorSerie
114         else:
115             Yb = numpy.asarray( Y ).reshape((-1,1)) # Y en Vector ou VectorSerie
116         BI = B.getI()
117         RI = R.getI()
118         def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
119             if numpy.any(numpy.isnan(HmX)):
120                 _X  = numpy.nan
121                 _HX = numpy.nan
122                 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
123             else:
124                 _X  = numpy.asarray( x ).reshape((-1,1))
125                 _HX = numpy.asarray( HmX ).reshape((-1,1))
126                 _Innovation = Yb - _HX
127                 assert Yb.size == _HX.size
128                 assert Yb.size == _Innovation.size
129                 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
130                     if BI is None or RI is None:
131                         raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
132                     Jb  = float( 0.5 *  (_X - Xb).T * (BI * (_X - Xb))  )
133                     Jo  = float( 0.5 * _Innovation.T * (RI * _Innovation) )
134                 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
135                     if RI is None:
136                         raise ValueError("Observation error covariance matrix has to be properly defined!")
137                     Jb  = 0.
138                     Jo  = float( 0.5 * _Innovation.T * (RI * _Innovation) )
139                 elif QualityMeasure in ["LeastSquares","LS","L2"]:
140                     Jb  = 0.
141                     Jo  = float( 0.5 * _Innovation.T @ _Innovation )
142                 elif QualityMeasure in ["AbsoluteValue","L1"]:
143                     Jb  = 0.
144                     Jo  = float( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
145                 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
146                     Jb  = 0.
147                     Jo  = numpy.max( numpy.abs(_Innovation) )
148                 #
149                 J   = Jb + Jo
150             if self._toStore("Innovation"):
151                 self.StoredVariables["Innovation"].store( _Innovation )
152             if self._toStore("CurrentState"):
153                 self.StoredVariables["CurrentState"].store( _X )
154             if self._toStore("InnovationAtCurrentState"):
155                 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
156             if self._toStore("SimulatedObservationAtCurrentState"):
157                 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
158             self.StoredVariables["CostFunctionJb"].store( Jb )
159             self.StoredVariables["CostFunctionJo"].store( Jo )
160             self.StoredVariables["CostFunctionJ" ].store( J )
161             return J, Jb, Jo
162         #
163         # ----------
164         EOX, EOS = eosg.eosg(self, Xb, HO, True)
165         #
166         for i in range(EOS.shape[1]):
167             J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i],  self._parameters["QualityCriterion"])
168         # ----------
169         #
170         self._post_run(HO)
171         return 0
172
173 # ==============================================================================
174 if __name__ == "__main__":
175     print('\n AUTODIAGNOSTIC\n')