# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2024 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
__author__ = "Jean-Philippe ARGAUD"

import numpy, scipy, scipy.optimize, scipy.version
from daCore.NumericObjects import ForceNumericBounds, ApplyBounds
from daCore.PlatformInfo import PlatformInfo, vt, vfloat
mpr = PlatformInfo().MachinePrecision()
mfp = PlatformInfo().MaximumPrecision()
# ==============================================================================
def std4dvar(selfA, Xb, Y, U, HO, EM, CM, R, B, Q):
    """
    4DVAR (four-dimensional variational data assimilation)
    """
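    #
    # Summary: the routine minimizes the strong-constraint 4DVAR cost
    #     J(x) = 1/2 (x - Xb)^T B^{-1} (x - Xb)
    #          + 1/2 sum_{k>=1} d_k^T R^{-1} d_k,  d_k = y_k - H(M_{0->k}(x)),
    # where M is the evolution model (EM) and H the observation operator (HO);
    # the gradient is computed by the adjoint sweep in GradientOfCostFunction.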
44 Hm = HO["Direct"].appliedControledFormTo
45 Mm = EM["Direct"].appliedControledFormTo
    #
    if CM is not None and "Tangent" in CM and U is not None:
        Cm = CM["Tangent"].asMatrix(Xb)
    else:
        Cm = None
    #
    def Un(_step):
        # Control at a given step: stored sequence, single stored value,
        # constant vector, or no control at all
        if U is not None:
            if hasattr(U, "store") and 1 <= _step < len(U):
                _Un = numpy.ravel( U[_step] ).reshape((-1, 1))
            elif hasattr(U, "store") and len(U) == 1:
                _Un = numpy.ravel( U[0] ).reshape((-1, 1))
            else:
                _Un = numpy.ravel( U ).reshape((-1, 1))
        else:
            _Un = None
        return _Un
    def CmUn(_xn, _un):
        # Contribution of the control through the tangent control operator
        if Cm is not None and _un is not None:  # Caution: if Cm is also included in M, it is counted twice!
            _Cm = Cm.reshape(_xn.size, _un.size)  # ADAO & check shape
            _CmUn = (_Cm @ _un).reshape((-1, 1))
        else:
            _CmUn = 0.
        return _CmUn
    #
    # Note: observations are used from time step number 1 onwards, and are
    # stored in Yo indexed accordingly. Step 0 is therefore never used, since
    # the first assimilation stage starts with the observation of step 1
    # (e.g. with duration = 3, steps 1 and 2 are compared to Y[1] and Y[2]).
    #
    # The number of steps is identical to the number of observation steps
    if hasattr(Y, "stepnumber"):
        duration = Y.stepnumber()
    else:
        duration = 2  # a single observation implies a single evolution step
    #
    # Precompute the inverses of B and R
    BI = B.getI()
    RI = R.getI()
    #
    # Starting point of the optimization
    Xini = selfA._parameters["InitializationPoint"]
    #
    # Definition of the cost function
    # -------------------------------
    selfA.DirectCalculation = [None,]  # Step 0 is not observed
    selfA.DirectInnovation  = [None,]  # Step 0 is not observed
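    # The forward sweep (CostFunction) stores the propagated states and the
    # innovations in these two lists; the backward sweep
    # (GradientOfCostFunction) pops them in reverse order, which is why both
    # lists are reset at the start of every cost evaluation.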

    def CostFunction(x):
        _X = numpy.asarray(x).reshape((-1, 1))
        if selfA._parameters["StoreInternalVariables"] or \
                selfA._toStore("CurrentState") or \
                selfA._toStore("CurrentOptimum"):
            selfA.StoredVariables["CurrentState"].store( _X )
        Jb = vfloat( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
        selfA.DirectCalculation = [None,]
        selfA.DirectInnovation  = [None,]
        Jo = 0.
        _Xn = _X
        for step in range(0, duration - 1):
            if hasattr(Y, "store"):
                _Ynpu = numpy.ravel( Y[step + 1] ).reshape((-1, 1))
            else:
                _Ynpu = numpy.ravel( Y ).reshape((-1, 1))
            _Un = Un(step)
            #
            # Evolution step
            if selfA._parameters["EstimationOf"] == "State":
                _Xn = Mm( (_Xn, _Un) ).reshape((-1, 1)) + CmUn(_Xn, _Un)
            elif selfA._parameters["EstimationOf"] == "Parameters":
                pass  # the state (here, the parameters) is not propagated
            #
            if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
                _Xn = ApplyBounds( _Xn, ForceNumericBounds(selfA._parameters["Bounds"]) )
            #
            # Difference-to-observations step
            if selfA._parameters["EstimationOf"] == "State":
                _YmHMX = _Ynpu - numpy.ravel( Hm( (_Xn, None) ) ).reshape((-1, 1))
            elif selfA._parameters["EstimationOf"] == "Parameters":
                _YmHMX = _Ynpu - numpy.ravel( Hm( (_Xn, None) ) ).reshape((-1, 1)) - CmUn(_Xn, _Un)
            #
            # Store the state and the innovation for the backward adjoint sweep
            selfA.DirectCalculation.append( _Xn )
            selfA.DirectInnovation.append( _YmHMX )
            #
            # Accumulate into the observation part of the cost function
            Jo = Jo + 0.5 * vfloat( _YmHMX.T * (RI * _YmHMX) )
        #
        J = Jb + Jo
        #
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
        selfA.StoredVariables["CostFunctionJb"].store( Jb )
        selfA.StoredVariables["CostFunctionJo"].store( Jo )
        selfA.StoredVariables["CostFunctionJ" ].store( J )
        if selfA._toStore("IndexOfOptimum") or \
                selfA._toStore("CurrentOptimum") or \
                selfA._toStore("CostFunctionJAtCurrentOptimum") or \
                selfA._toStore("CostFunctionJbAtCurrentOptimum") or \
                selfA._toStore("CostFunctionJoAtCurrentOptimum"):
            IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        if selfA._toStore("IndexOfOptimum"):
            selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
        if selfA._toStore("CurrentOptimum"):
            selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["CurrentState"][IndexMin] )
        if selfA._toStore("CostFunctionJAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )  # noqa: E501
        if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )  # noqa: E501
        if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )  # noqa: E501
        return J

    def GradientOfCostFunction(x):
        _X = numpy.asarray(x).reshape((-1, 1))
        GradJb = BI * (_X - Xb)
        GradJo = 0.
        for step in range(duration - 1, 0, -1):
            # Retrieve the last stored state of the evolution
            _Xn = selfA.DirectCalculation.pop()
            # Retrieve the last stored innovation
            _YmHMX = selfA.DirectInnovation.pop()
            # Compute the adjoints
            Ha = HO["Adjoint"].asMatrix(ValueForMethodForm = _Xn)
            Ha = Ha.reshape(_Xn.size, _YmHMX.size)  # ADAO & check shape
            Ma = EM["Adjoint"].asMatrix(ValueForMethodForm = _Xn)
            Ma = Ma.reshape(_Xn.size, _Xn.size)  # ADAO & check shape
            # Gradient computation by the adjoint state
            GradJo = GradJo + Ha * (RI * _YmHMX)  # For a linear Ha, equivalent to: Ha( (_Xn, RI * _YmHMX) )
            GradJo = Ma * GradJo                  # For a linear Ma, equivalent to: Ma( (_Xn, GradJo) )
        GradJ = numpy.ravel( GradJb ) - numpy.ravel( GradJo )
        return GradJ
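    # The backward loop above implements the discrete adjoint recursion
    #     GradJo = M_k^T ( GradJo + H_k^T R^{-1} d_k ),  k = K, ..., 1,
    # so that GradJ = B^{-1} (x - Xb) - sum_k M_{0->k}^T H_k^T R^{-1} d_k,
    # the exact gradient of J when Ma and Ha are the adjoint operators.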
    #
    # Minimization of the cost function
    # ---------------------------------
    nbPreviousSteps = selfA.StoredVariables["CostFunctionJ"].stepnumber()
    #
    if selfA._parameters["Minimizer"] == "LBFGSB":
        if vt("0.19") <= vt(scipy.version.version) <= vt("1.4.99"):
            import daAlgorithms.Atoms.lbfgsb14hlt as optimiseur
        elif vt("1.5.0") <= vt(scipy.version.version) <= vt("1.7.99"):
            import daAlgorithms.Atoms.lbfgsb17hlt as optimiseur
        elif vt("1.8.0") <= vt(scipy.version.version) <= vt("1.8.99"):
            import daAlgorithms.Atoms.lbfgsb18hlt as optimiseur
        elif vt("1.9.0") <= vt(scipy.version.version) <= vt("1.10.99"):
            import daAlgorithms.Atoms.lbfgsb19hlt as optimiseur
        elif vt("1.11.0") <= vt(scipy.version.version) <= vt("1.11.99"):
            import daAlgorithms.Atoms.lbfgsb111hlt as optimiseur
        elif vt("1.12.0") <= vt(scipy.version.version) <= vt("1.12.99"):
            import daAlgorithms.Atoms.lbfgsb112hlt as optimiseur
        elif vt("1.13.0") <= vt(scipy.version.version) <= vt("1.13.99"):
            import daAlgorithms.Atoms.lbfgsb113hlt as optimiseur
        elif vt("1.14.0") <= vt(scipy.version.version) <= vt("1.14.99"):
            import daAlgorithms.Atoms.lbfgsb114hlt as optimiseur
        else:
            import scipy.optimize as optimiseur
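        # The daAlgorithms.Atoms.lbfgsb*hlt modules appear to be SciPy-version-
        # pinned copies of the L-BFGS-B driver, presumably kept so the stopping
        # behaviour stays stable across SciPy releases; an unlisted (newer)
        # SciPy falls back to the stock scipy.optimize implementation.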
        Minimum, J_optimal, Informations = optimiseur.fmin_l_bfgs_b(
            func   = CostFunction,
            x0     = Xini,
            fprime = GradientOfCostFunction,
            args   = (),
            bounds = selfA._parameters["Bounds"],
            maxfun = selfA._parameters["MaximumNumberOfIterations"] - 1,
            factr  = selfA._parameters["CostDecrementTolerance"] * 1.e14,
            pgtol  = selfA._parameters["ProjectedGradientTolerance"],
            iprint = selfA._parameters["optiprint"],
        )
        # nfeval = Informations['funcalls']
        # rc     = Informations['warnflag']
    elif selfA._parameters["Minimizer"] == "TNC":
        Minimum, nfeval, rc = scipy.optimize.fmin_tnc(
            func     = CostFunction,
            x0       = Xini,
            fprime   = GradientOfCostFunction,
            args     = (),
            bounds   = selfA._parameters["Bounds"],
            maxfun   = selfA._parameters["MaximumNumberOfIterations"],
            pgtol    = selfA._parameters["ProjectedGradientTolerance"],
            ftol     = selfA._parameters["CostDecrementTolerance"],
            messages = selfA._parameters["optmessages"],
        )
    elif selfA._parameters["Minimizer"] == "CG":
        Minimum, fopt, nfeval, grad_calls, rc = scipy.optimize.fmin_cg(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            gtol        = selfA._parameters["GradientNormTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
        )
    elif selfA._parameters["Minimizer"] == "NCG":
        Minimum, fopt, nfeval, grad_calls, hcalls, rc = scipy.optimize.fmin_ncg(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            avextol     = selfA._parameters["CostDecrementTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
        )
    elif selfA._parameters["Minimizer"] == "BFGS":
        Minimum, fopt, gopt, Hopt, nfeval, grad_calls, rc = scipy.optimize.fmin_bfgs(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            gtol        = selfA._parameters["GradientNormTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
        )
    else:
        raise ValueError("Error in minimizer name: %s is unknown"%selfA._parameters["Minimizer"])
    #
    IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
    #
    # Correction to work around a TNC bug on the returned Minimum
    # -----------------------------------------------------------
    if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
        Minimum = selfA.StoredVariables["CurrentState"][IndexMin]
    #
    # Obtaining the analysis
    # ----------------------
    Xa = Minimum
    #
    selfA.StoredVariables["Analysis"].store( Xa )
    #
    # Additional computations and/or storage
    # --------------------------------------
    if selfA._toStore("BMA"):
        selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
    #
    return 0

# ==============================================================================
if __name__ == "__main__":
    print('\n AUTODIAGNOSTIC\n')