1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2022 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
23 import numpy, logging, itertools
24 from daCore import BasicObjects, NumericObjects
25 from daCore.PlatformInfo import PlatformInfo
# Maximum usable floating-point precision reported by the platform; used
# further down as the summation dtype of the L1 quality criterion
# (`numpy.sum(..., dtype=mfp)` in CostFunction).
26 mfp = PlatformInfo().MaximumPrecision()
28 # ==============================================================================
29 class ElementaryAlgorithm(BasicObjects.Algorithm):
# NOTE(review): this extract is lossy — each line carries its original
# file line number, indentation has been stripped, and gaps in that
# numbering are dropped lines. In particular the constructor header
# (`def __init__(self):`, original line 30) is absent, as are most of the
# `default=`, `typecast=` and `listval=` keyword lines of the
# defineRequiredParameter(...) calls below, so this block is not runnable
# as shown. Recover the full text from the upstream ADAO sources before
# editing; comments below describe only what is visible here.
#
# Registers the algorithm under the name "SAMPLINGTEST" with the parent
# BasicObjects.Algorithm machinery.
31 BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
# Four alternative ways to specify the set of calculation points
# (the `message` strings are user-facing and deliberately kept in French):
# 1) an explicit list of n-uples,
32 self.defineRequiredParameter(
33 name = "SampleAsnUplet",
36 message = "Points de calcul définis par une liste de n-uplet",
# 2) a hyper-cube given by the explicit sampling list of each variable,
38 self.defineRequiredParameter(
39 name = "SampleAsExplicitHyperCube",
42 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable comme une liste",
# 3) a hyper-cube given per variable as a [min,max,step] triple,
44 self.defineRequiredParameter(
45 name = "SampleAsMinMaxStepHyperCube",
48 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable par un triplet [min,max,step]",
# 4) independent random sampling of each axis, specified as
#    ['distribution', [parameters], count].
50 self.defineRequiredParameter(
51 name = "SampleAsIndependantRandomVariables",
54 message = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
# Quality criterion comparing simulated observations with Y; the values
# below are the visible entries of its `listval` (the surrounding
# `listval = [` / `]` lines were dropped). The "...Ponderated..." names
# mirror the "...Weighted..." ones — presumably legacy aliases kept for
# backward compatibility; TODO confirm against upstream.
56 self.defineRequiredParameter(
57 name = "QualityCriterion",
58 default = "AugmentedWeightedLeastSquares",
60 message = "Critère de qualité utilisé",
63 "AugmentedWeightedLeastSquares", "AWLS",
64 "WeightedLeastSquares","WLS",
65 "LeastSquares", "LS", "L2",
66 "AbsoluteValue", "L1",
67 "MaximumError", "ME", "Linf",
70 "AugmentedPonderatedLeastSquares", "APLS",
71 "PonderatedLeastSquares", "PLS",
# Debug switch read in run() to raise the logging level during evaluation
# (its `name = "SetDebug"` / `default` lines were dropped here).
74 self.defineRequiredParameter(
78 message = "Activation du mode debug lors de l'exécution",
# Optional quantities to store; the two visible entries match the
# self._toStore(...) checks inside run()'s CostFunction.
80 self.defineRequiredParameter(
81 name = "StoreSupplementaryCalculations",
84 message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
90 "InnovationAtCurrentState",
91 "SimulatedObservationAtCurrentState",
# Random seed parameter: the typecast applies the seed directly via
# numpy.random.seed (its `name = "SetSeed"` line was dropped here —
# TODO confirm the name against upstream).
94 self.defineRequiredParameter(
96 typecast = numpy.random.seed,
97 message = "Graine fixée pour le générateur aléatoire",
# Only the background Xb and the observation operator HO are mandatory
# inputs; Y, R, B are optional (their requirement lines, if any, were
# dropped).
99 self.requireInputArguments(
100 mandatory= ("Xb", "HO"),
# Algorithm classification tags (the tag tuple contents were dropped).
102 self.setAttributes(tags=(
106 def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
# Purpose (from the visible code): build the list of sample points from the
# four "SampleAs*" parameters, evaluate the direct operator Hm on each
# point, and score each simulated observation against Y with the selected
# QualityCriterion, storing Jb/Jo/J (and optionally the current state,
# innovation and simulated observation) in self.StoredVariables.
#
# NOTE(review): lossy extract — embedded original line numbers show
# dropped lines throughout this method (120-124, 127-130 partially,
# 139-153 partially, 163-164, 175-182, 187-190). In particular:
#   * the closing of the BuildComplexSampleList(...) call is missing,
#   * BI and RI (inverse background/observation covariances, tested at
#     original line 134) are bound in dropped lines,
#   * the `else:` of the NaN branch, the `Jb = 0.` assignments, the final
#     `J = Jb + Jo` and CostFunction's `return` are missing,
#   * the (presumable) try/except protection around the Hm evaluation and
#     the method's closing `self._post_run(...)`/`return` are missing.
# Do not treat this block as runnable; recover the full text upstream.
107 self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
# Direct (non-linear) observation operator.
109 Hm = HO["Direct"].appliedTo
# Flattened background state and observation vector, used as reference
# points by CostFunction below.
111 X0 = numpy.ravel( Xb )
112 Y0 = numpy.ravel( Y )
114 # ---------------------------
# Builds the iterable of sample states from whichever of the four
# sampling specifications the user provided (call left unclosed here —
# its trailing arguments/closing paren are in dropped lines 120-124).
115 sampleList = NumericObjects.BuildComplexSampleList(
116 self._parameters["SampleAsnUplet"],
117 self._parameters["SampleAsExplicitHyperCube"],
118 self._parameters["SampleAsMinMaxStepHyperCube"],
119 self._parameters["SampleAsIndependantRandomVariables"],
# Quality measure of a state x whose simulation is HmX; any NaN in the
# simulation marks the whole point as failed (Jb = Jo = J = NaN).
125 def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
126 if numpy.any(numpy.isnan(HmX)):
129 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
131 _X = numpy.ravel( x )
132 _HX = numpy.ravel( HmX )
# AWLS/DA: full 3D-Var-like functional, needs both BI and RI
# (bound in dropped lines above — not visible in this extract).
133 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
134 if BI is None or RI is None:
135 raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
136 Jb = float( 0.5 * (_X - X0).T * (BI * (_X - X0)) )
137 Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
# WLS: observation-only weighted misfit (needs RI; the `if RI is
# None:` guard line and `Jb = 0.` line are in dropped lines).
138 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
140 raise ValueError("Observation error covariance matrix has to be properly defined!")
142 Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
# LS/L2: plain unweighted least squares.
143 elif QualityMeasure in ["LeastSquares","LS","L2"]:
145 Jo = float( 0.5 * (Y0 - _HX).T @ (Y0 - _HX) )
# L1: sum of absolute deviations, accumulated at maximum platform
# precision (mfp, set at module import).
146 elif QualityMeasure in ["AbsoluteValue","L1"]:
148 Jo = float( numpy.sum( numpy.abs(Y0 - _HX), dtype=mfp ) )
# Linf: maximum absolute deviation.
149 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
151 Jo = numpy.max( numpy.abs(Y0 - _HX) )
# Optional per-point storage, driven by StoreSupplementaryCalculations.
154 if self._toStore("CurrentState"):
155 self.StoredVariables["CurrentState"].store( _X )
156 if self._toStore("InnovationAtCurrentState"):
157 self.StoredVariables["InnovationAtCurrentState"].store( Y0 - _HX )
158 if self._toStore("SimulatedObservationAtCurrentState"):
159 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
# Jb/Jo/J are always stored for every sample point.
160 self.StoredVariables["CostFunctionJb"].store( Jb )
161 self.StoredVariables["CostFunctionJo"].store( Jo )
162 self.StoredVariables["CostFunctionJ" ].store( J )
# SetDebug: temporarily force DEBUG logging for the whole evaluation
# loop; the previous level is saved and restored afterwards.
165 if self._parameters["SetDebug"]:
166 CUR_LEVEL = logging.getLogger().getEffectiveLevel()
167 logging.getLogger().setLevel(logging.DEBUG)
168 print("===> Beginning of evaluation, activating debug\n")
169 print(" %s\n"%("-"*75,))
# Main loop: evaluate each sample state and score it. NOTE(review): the
# dropped lines 175-182 presumably wrap the Hm call in a try/except so a
# failed evaluation yields NaN instead of aborting — TODO confirm.
172 for i,Xx in enumerate(sampleList):
173 if self._parameters["SetDebug"]:
174 print("===> Launching evaluation for state %i"%i)
176 Yy = Hm( numpy.ravel( Xx ) )
180 J, Jb, Jo = CostFunction( Xx, Yy, self._parameters["QualityCriterion"])
183 if self._parameters["SetDebug"]:
184 print("\n %s\n"%("-"*75,))
185 print("===> End evaluation, deactivating debug if necessary\n")
# Restore the logging level saved before the loop (only reached when
# SetDebug is true, so CUR_LEVEL is always bound here).
186 logging.getLogger().setLevel(CUR_LEVEL)
191 # ==============================================================================
192 if __name__ == "__main__":
# Direct execution only prints a self-identification banner; any further
# autodiagnostic statements (original lines after 193, if any) are not
# part of this extract.
193 print('\n AUTODIAGNOSTIC\n')