]> SALOME platform Git repositories - modules/adao.git/blob - src/daComposant/daAlgorithms/SamplingTest.py
Salome HOME
Extending sampling control and output
[modules/adao.git] / src / daComposant / daAlgorithms / SamplingTest.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2023 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 import numpy, logging
24 from daCore import BasicObjects, NumericObjects
25 from daAlgorithms.Atoms import eosg
26 from daCore.PlatformInfo import PlatformInfo
27 mfp = PlatformInfo().MaximumPrecision()
28
29 # ==============================================================================
30 class ElementaryAlgorithm(BasicObjects.Algorithm):
31     def __init__(self):
32         BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
33         self.defineRequiredParameter(
34             name     = "SampleAsnUplet",
35             default  = [],
36             typecast = tuple,
37             message  = "Points de calcul définis par une liste de n-uplet",
38             )
39         self.defineRequiredParameter(
40             name     = "SampleAsExplicitHyperCube",
41             default  = [],
42             typecast = tuple,
43             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable comme une liste",
44             )
45         self.defineRequiredParameter(
46             name     = "SampleAsMinMaxStepHyperCube",
47             default  = [],
48             typecast = tuple,
49             message  = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable par un triplet [min,max,step]",
50             )
51         self.defineRequiredParameter(
52             name     = "SampleAsIndependantRandomVariables",
53             default  = [],
54             typecast = tuple,
55             message  = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
56             )
57         self.defineRequiredParameter(
58             name     = "QualityCriterion",
59             default  = "AugmentedWeightedLeastSquares",
60             typecast = str,
61             message  = "Critère de qualité utilisé",
62             listval  = [
63                 "DA",
64                 "AugmentedWeightedLeastSquares", "AWLS",
65                 "WeightedLeastSquares","WLS",
66                 "LeastSquares", "LS", "L2",
67                 "AbsoluteValue", "L1",
68                 "MaximumError", "ME", "Linf",
69                 ],
70             listadv  = [
71                 "AugmentedPonderatedLeastSquares", "APLS",
72                 "PonderatedLeastSquares", "PLS",
73                 ],
74             )
75         self.defineRequiredParameter(
76             name     = "SetDebug",
77             default  = False,
78             typecast = bool,
79             message  = "Activation du mode debug lors de l'exécution",
80             )
81         self.defineRequiredParameter(
82             name     = "StoreSupplementaryCalculations",
83             default  = [],
84             typecast = tuple,
85             message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
86             listval  = [
87                 "CostFunctionJ",
88                 "CostFunctionJb",
89                 "CostFunctionJo",
90                 "CurrentState",
91                 "EnsembleOfSimulations",
92                 "EnsembleOfStates",
93                 "InnovationAtCurrentState",
94                 "SimulatedObservationAtCurrentState",
95                 ]
96             )
97         self.defineRequiredParameter(
98             name     = "SetSeed",
99             typecast = numpy.random.seed,
100             message  = "Graine fixée pour le générateur aléatoire",
101             )
102         self.requireInputArguments(
103             mandatory= ("Xb", "Y", "HO", "R", "B"),
104             )
105         self.setAttributes(tags=(
106             "Checking",
107             ))
108
109     def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
110         self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
111         #
112         if hasattr(Y,"store"):
113             Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y en Vector ou VectorSerie
114         else:
115             Yb = numpy.asarray( Y ).reshape((-1,1)) # Y en Vector ou VectorSerie
116         BI = B.getI()
117         RI = R.getI()
118         def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
119             if numpy.any(numpy.isnan(HmX)):
120                 _X  = numpy.nan
121                 _HX = numpy.nan
122                 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
123             else:
124                 _X  = numpy.asarray( x ).reshape((-1,1))
125                 _HX = numpy.asarray( HmX ).reshape((-1,1))
126                 _Innovation = Yb - _HX
127                 assert Yb.size == _HX.size
128                 assert Yb.size == _Innovation.size
129                 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
130                     if BI is None or RI is None:
131                         raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
132                     Jb  = float( 0.5 *  (_X - Xb).T * (BI * (_X - Xb))  )
133                     Jo  = float( 0.5 * _Innovation.T * (RI * _Innovation) )
134                 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
135                     if RI is None:
136                         raise ValueError("Observation error covariance matrix has to be properly defined!")
137                     Jb  = 0.
138                     Jo  = float( 0.5 * _Innovation.T * (RI * _Innovation) )
139                 elif QualityMeasure in ["LeastSquares","LS","L2"]:
140                     Jb  = 0.
141                     Jo  = float( 0.5 * _Innovation.T @ _Innovation )
142                 elif QualityMeasure in ["AbsoluteValue","L1"]:
143                     Jb  = 0.
144                     Jo  = float( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
145                 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
146                     Jb  = 0.
147                     Jo  = numpy.max( numpy.abs(_Innovation) )
148                 #
149                 J   = Jb + Jo
150             if self._toStore("CurrentState"):
151                 self.StoredVariables["CurrentState"].store( _X )
152             if self._toStore("InnovationAtCurrentState"):
153                 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
154             if self._toStore("SimulatedObservationAtCurrentState"):
155                 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
156             self.StoredVariables["CostFunctionJb"].store( Jb )
157             self.StoredVariables["CostFunctionJo"].store( Jo )
158             self.StoredVariables["CostFunctionJ" ].store( J )
159             return J, Jb, Jo
160         #
161         # ----------
162         EOX, EOS = eosg.eosg(self, Xb, HO, True)
163         #
164         for i in range(EOS.shape[1]):
165             J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i],  self._parameters["QualityCriterion"])
166         # ----------
167         #
168         self._post_run(HO)
169         return 0
170
171 # ==============================================================================
172 if __name__ == "__main__":
173     print('\n AUTODIAGNOSTIC\n')