1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2023 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
24 from daCore import BasicObjects, NumericObjects
25 from daAlgorithms.Atoms import eosg
26 from daCore.PlatformInfo import PlatformInfo
27 mfp = PlatformInfo().MaximumPrecision()
29 # ==============================================================================
30 class ElementaryAlgorithm(BasicObjects.Algorithm):
# ------------------------------------------------------------------------------
# NOTE(review): this chunk is a lossy extraction — each line still carries its
# original file line number, and that numbering jumps (e.g. 34 -> 37, 105 -> 109,
# 158 -> 162), so interior lines of this class are missing (the "def __init__"
# header, parameter defaults/typecasts, closing parentheses, else-branches, the
# definitions of BI/RI, and CostFunction's return). Code is left byte-identical;
# comments below state only what the visible lines themselves establish.
#
# Checking algorithm "SAMPLINGTEST": evaluates the observation operator on a
# sample of states and stores quality-criterion values (Jb, Jo, J) of the
# simulations against the observations Y.
# ------------------------------------------------------------------------------
# --- Constructor body (the "def __init__(self):" line is not visible here) ---
32 BasicObjects.Algorithm.__init__(self, "SAMPLINGTEST")
# Sample given as an explicit list of state n-uples.
33 self.defineRequiredParameter(
34 name = "SampleAsnUplet",
37 message = "Points de calcul définis par une liste de n-uplet",
# Sample given as a hyper-cube: explicit list of sampling points per variable.
39 self.defineRequiredParameter(
40 name = "SampleAsExplicitHyperCube",
43 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable comme une liste",
# Sample given as a hyper-cube: [min, max, step] triple per variable.
45 self.defineRequiredParameter(
46 name = "SampleAsMinMaxStepHyperCube",
49 message = "Points de calcul définis par un hyper-cube dont on donne la liste des échantillonnages de chaque variable par un triplet [min,max,step]",
# Sample built axis-by-axis from independent random draws, each axis specified
# as ['distribution', [parameters], number].
51 self.defineRequiredParameter(
52 name = "SampleAsIndependantRandomVariables",
55 message = "Points de calcul définis par un hyper-cube dont les points sur chaque axe proviennent de l'échantillonnage indépendant de la variable selon la spécification ['distribution',[parametres],nombre]",
# Quality criterion used to compare simulations with observations; the visible
# entries below are the accepted aliases (AWLS / WLS / LS / L1 / Linf families).
57 self.defineRequiredParameter(
58 name = "QualityCriterion",
59 default = "AugmentedWeightedLeastSquares",
61 message = "Critère de qualité utilisé",
64 "AugmentedWeightedLeastSquares", "AWLS",
65 "WeightedLeastSquares","WLS",
66 "LeastSquares", "LS", "L2",
67 "AbsoluteValue", "L1",
68 "MaximumError", "ME", "Linf",
# Older "Ponderated" spellings, presumably kept as backward-compatible
# aliases — TODO confirm against the full listval (partially missing here).
71 "AugmentedPonderatedLeastSquares", "APLS",
72 "PonderatedLeastSquares", "PLS",
# Debug-mode switch (its name/default line is not visible in this extraction).
75 self.defineRequiredParameter(
79 message = "Activation du mode debug lors de l'exécution",
# Supplementary results to store; only part of the allowed values is visible.
81 self.defineRequiredParameter(
82 name = "StoreSupplementaryCalculations",
85 message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
91 "EnsembleOfSimulations",
93 "InnovationAtCurrentState",
94 "SimulatedObservationAtCurrentState",
# Fixed seed for the random generator; the value is fed through
# numpy.random.seed (the parameter's name line is not visible here).
97 self.defineRequiredParameter(
99 typecast = numpy.random.seed,
100 message = "Graine fixée pour le générateur aléatoire",
# Mandatory inputs: background Xb, observations Y, observation operator HO,
# and the R and B error covariances.
102 self.requireInputArguments(
103 mandatory= ("Xb", "Y", "HO", "R", "B"),
105 self.setAttributes(tags=(
# --- Main execution: evaluate the sample and store quality values ---
109 def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
110 self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
# Flatten the observations to a column vector, taking the last stored item
# when Y is a storing series (the else-branch line between 113 and 115 is
# missing from this extraction).
112 if hasattr(Y,"store"):
113 Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y as Vector or VectorSerie
115 Yb = numpy.asarray( Y ).reshape((-1,1)) # Y as Vector or VectorSerie
# Per-point quality measure: computes Jb (background term), Jo (observation
# term) and stores them; a NaN simulation yields NaN costs (line 122).
# NOTE(review): the return statement (~line 159-161) is missing here, but the
# caller on line 165 unpacks (J, Jb, Jo).
118 def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
119 if numpy.any(numpy.isnan(HmX)):
122 Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
124 _X = numpy.asarray( x ).reshape((-1,1))
125 _HX = numpy.asarray( HmX ).reshape((-1,1))
126 _Innovation = Yb - _HX
127 assert Yb.size == _HX.size
128 assert Yb.size == _Innovation.size
# AWLS/"DA": background term weighted by BI plus observation term weighted
# by RI; BI and RI are defined on lines missing from this extraction
# (presumably the inverses of B and R — TODO confirm).
129 if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
130 if BI is None or RI is None:
131 raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
132 Jb = float( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
133 Jo = float( 0.5 * _Innovation.T * (RI * _Innovation) )
# WLS: observation term only, weighted by RI.
134 elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
136 raise ValueError("Observation error covariance matrix has to be properly defined!")
138 Jo = float( 0.5 * _Innovation.T * (RI * _Innovation) )
# LS/L2: unweighted half squared error of the innovation.
139 elif QualityMeasure in ["LeastSquares","LS","L2"]:
141 Jo = float( 0.5 * _Innovation.T @ _Innovation )
# L1: sum of absolute innovations, accumulated at mfp precision.
142 elif QualityMeasure in ["AbsoluteValue","L1"]:
144 Jo = float( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
# Linf: maximum absolute innovation.
145 elif QualityMeasure in ["MaximumError","ME", "Linf"]:
147 Jo = numpy.max( numpy.abs(_Innovation) )
# Optional per-point storage of state, innovation and simulation.
150 if self._toStore("CurrentState"):
151 self.StoredVariables["CurrentState"].store( _X )
152 if self._toStore("InnovationAtCurrentState"):
153 self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
154 if self._toStore("SimulatedObservationAtCurrentState"):
155 self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
# The three cost components are stored for every sample point.
156 self.StoredVariables["CostFunctionJb"].store( Jb )
157 self.StoredVariables["CostFunctionJo"].store( Jo )
158 self.StoredVariables["CostFunctionJ" ].store( J )
# Build the ensemble of sampled states EOX and of simulations EOS, then
# evaluate the requested quality criterion column by column.
162 EOX, EOS = eosg.eosg(self, Xb, HO, True)
164 for i in range(EOS.shape[1]):
165 J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i], self._parameters["QualityCriterion"])
171 # ==============================================================================
172 if __name__ == "__main__":
# Self-diagnostic entry point: only prints a banner when run as a script.
173 print('\n AUTODIAGNOSTIC\n')