# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2022 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

__author__ = "Jean-Philippe ARGAUD"

import numpy, scipy, scipy.optimize, scipy.version
from daCore.NumericObjects import ForceNumericBounds, ApplyBounds
from daCore.PlatformInfo import PlatformInfo
mpr = PlatformInfo().MachinePrecision()
mfp = PlatformInfo().MaximumPrecision()
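# (mpr/mfp above: the machine precision and the maximum usable precision,
# queried once and kept at module level for reuse.)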

# ==============================================================================
def std4dvar(selfA, Xb, Y, U, HO, EM, CM, R, B, Q):
    """
    4DVAR
    """
    #
    # Initializations
    # ---------------
    Hm = HO["Direct"].appliedControledFormTo
    Mm = EM["Direct"].appliedControledFormTo
    #
    if CM is not None and "Tangent" in CM and U is not None:
        Cm = CM["Tangent"].asMatrix(Xb)
    else:
        Cm = None
    #
    def Un(_step):
        if U is not None:
            if hasattr(U,"store") and 1<=_step<len(U):
                _Un = numpy.ravel( U[_step] ).reshape((-1,1))
            elif hasattr(U,"store") and len(U)==1:
                _Un = numpy.ravel( U[0] ).reshape((-1,1))
            else:
                _Un = numpy.ravel( U ).reshape((-1,1))
        else:
            _Un = None
        return _Un
    def CmUn(_xn,_un):
        if Cm is not None and _un is not None: # Warning: if Cm is also embedded in M, it is counted twice!
            _Cm   = Cm.reshape(_xn.size,_un.size) # ADAO & check shape
            _CmUn = (_Cm @ _un).reshape((-1,1))
        else:
            _CmUn = 0.
        return _CmUn
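    #
    # With these helpers, one controlled evolution step below reads, as a
    # sketch: x_{n+1} = M(x_n, u_n) + Cm @ u_n, where Cm maps the control
    # u_n into the state space.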
    #
    # Remark: the observations are used starting from time step number 1,
    # and are stored in Yo indexed by these step numbers. Step 0 is thus
    # not used, since the first stage starts with the observation of step 1.
    #
    # Number of steps identical to the number of observation steps
    if hasattr(Y,"stepnumber"):
        duration = Y.stepnumber()
    else:
        duration = 2
    #
    # Precompute the inverses of B and R
    BI = B.getI()
    RI = R.getI()
    #
    # Starting point of the optimization
    Xini = selfA._parameters["InitializationPoint"]
    #
    # Definition of the cost function
    # -------------------------------
    selfA.DirectCalculation = [None,] # Step 0 is not observed
    selfA.DirectInnovation  = [None,] # Step 0 is not observed
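    # The strong-constraint 4DVAR functional evaluated below can be written,
    # as a sketch of the math matching the discrete sums computed in
    # CostFunction (with x the control state at the initial time):
    #
    #   J(x) = (1/2) (x - xb)^T B^{-1} (x - xb)
    #        + (1/2) sum_{k=1..K} d_k^T R^{-1} d_k,  d_k = y_k - H(M_{0->k}(x))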
    def CostFunction(x):
        _X = numpy.asarray(x).reshape((-1,1))
        if selfA._parameters["StoreInternalVariables"] or \
                selfA._toStore("CurrentState") or \
                selfA._toStore("CurrentOptimum"):
            selfA.StoredVariables["CurrentState"].store( _X )
        Jb = float( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
        selfA.DirectCalculation = [None,]
        selfA.DirectInnovation  = [None,]
        Jo  = 0.
        _Xn = _X
        for step in range(0,duration-1):
            if hasattr(Y,"store"):
                _Ynpu = numpy.ravel( Y[step+1] ).reshape((-1,1))
            else:
                _Ynpu = numpy.ravel( Y ).reshape((-1,1))
            _Un = Un(step)
            #
            # Evolution step
            if selfA._parameters["EstimationOf"] == "State":
                _Xn = Mm( (_Xn, _Un) ).reshape((-1,1)) + CmUn(_Xn, _Un)
            elif selfA._parameters["EstimationOf"] == "Parameters":
                pass # the state is the parameter vector: no evolution step
            #
            if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
                _Xn = ApplyBounds( _Xn, ForceNumericBounds(selfA._parameters["Bounds"]) )
            #
            # Difference-to-observations step
            if selfA._parameters["EstimationOf"] == "State":
                _YmHMX = _Ynpu - numpy.ravel( Hm( (_Xn, None) ) ).reshape((-1,1))
            elif selfA._parameters["EstimationOf"] == "Parameters":
                _YmHMX = _Ynpu - numpy.ravel( Hm( (_Xn, _Un) ) ).reshape((-1,1)) - CmUn(_Xn, _Un)
            #
            # Storage of the state and innovation for the adjoint sweep
            selfA.DirectCalculation.append( _Xn )
            selfA.DirectInnovation.append( _YmHMX )
            #
            # Accumulation into the observation term of the cost function
            Jo = Jo + 0.5 * float( _YmHMX.T * (RI * _YmHMX) )
        J = Jb + Jo
        #
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
        selfA.StoredVariables["CostFunctionJb"].store( Jb )
        selfA.StoredVariables["CostFunctionJo"].store( Jo )
        selfA.StoredVariables["CostFunctionJ" ].store( J )
        if selfA._toStore("IndexOfOptimum") or \
                selfA._toStore("CurrentOptimum") or \
                selfA._toStore("CostFunctionJAtCurrentOptimum") or \
                selfA._toStore("CostFunctionJbAtCurrentOptimum") or \
                selfA._toStore("CostFunctionJoAtCurrentOptimum"):
            IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        if selfA._toStore("IndexOfOptimum"):
            selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
        if selfA._toStore("CurrentOptimum"):
            selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["CurrentState"][IndexMin] )
        if selfA._toStore("CostFunctionJAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
        if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
        if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
            selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
        return J
    def GradientOfCostFunction(x):
        _X     = numpy.asarray(x).reshape((-1,1))
        GradJb = BI * (_X - Xb)
        GradJo = 0.
        for step in range(duration-1,0,-1):
            # Retrieve the last stored state of the evolution
            _Xn = selfA.DirectCalculation.pop()
            # Retrieve the last stored innovation
            _YmHMX = selfA.DirectInnovation.pop()
            # Adjoint operators
            Ha = HO["Adjoint"].asMatrix(ValueForMethodForm = _Xn)
            Ha = Ha.reshape(_Xn.size,_YmHMX.size) # ADAO & check shape
            Ma = EM["Adjoint"].asMatrix(ValueForMethodForm = _Xn)
            Ma = Ma.reshape(_Xn.size,_Xn.size) # ADAO & check shape
            # Gradient computation by the adjoint state
            GradJo = GradJo + Ha * (RI * _YmHMX) # For linear Ha, equivalent to: Ha( (_Xn, RI * _YmHMX) )
            GradJo = Ma * GradJo                 # For linear Ma, equivalent to: Ma( (_Xn, GradJo) )
        GradJ = numpy.ravel( GradJb ) - numpy.ravel( GradJo )
        return GradJ
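    # A sketch of what the backward sweep above computes: each innovation
    # d_k is mapped back to the initial time through the adjoint chain,
    # giving
    #   grad J(x) = B^{-1} (x - xb) - sum_k M_1^T ... M_k^T H_k^T R^{-1} d_k
    # without ever forming the tangent of the full trajectory.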
    #
    # Minimization of the cost function
    # ---------------------------------
    nbPreviousSteps = selfA.StoredVariables["CostFunctionJ"].stepnumber()
    #
    if selfA._parameters["Minimizer"] == "LBFGSB":
        if "0.19" <= scipy.version.version <= "1.4.99":
            import daAlgorithms.Atoms.lbfgsb14hlt as optimiseur
        elif "1.5.0" <= scipy.version.version <= "1.7.99":
            import daAlgorithms.Atoms.lbfgsb17hlt as optimiseur
        elif "1.8.0" <= scipy.version.version <= "1.8.99":
            import daAlgorithms.Atoms.lbfgsb18hlt as optimiseur
        elif "1.9.0" <= scipy.version.version <= "1.9.99":
            import daAlgorithms.Atoms.lbfgsb19hlt as optimiseur
        else:
            import scipy.optimize as optimiseur
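        # Depending on the installed SciPy release, a matching bundled
        # L-BFGS-B driver is selected above; otherwise the stock
        # scipy.optimize entry point is used through the same interface.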
        Minimum, J_optimal, Informations = optimiseur.fmin_l_bfgs_b(
            func    = CostFunction,
            x0      = Xini,
            fprime  = GradientOfCostFunction,
            args    = (),
            bounds  = selfA._parameters["Bounds"],
            maxfun  = selfA._parameters["MaximumNumberOfIterations"]-1,
            factr   = selfA._parameters["CostDecrementTolerance"]*1.e14,
            pgtol   = selfA._parameters["ProjectedGradientTolerance"],
            iprint  = selfA._parameters["optiprint"],
            )
        # nfeval = Informations['funcalls']
        # rc     = Informations['warnflag']
    elif selfA._parameters["Minimizer"] == "TNC":
        Minimum, nfeval, rc = scipy.optimize.fmin_tnc(
            func     = CostFunction,
            x0       = Xini,
            fprime   = GradientOfCostFunction,
            args     = (),
            bounds   = selfA._parameters["Bounds"],
            maxfun   = selfA._parameters["MaximumNumberOfIterations"],
            pgtol    = selfA._parameters["ProjectedGradientTolerance"],
            ftol     = selfA._parameters["CostDecrementTolerance"],
            messages = selfA._parameters["optmessages"],
            )
    elif selfA._parameters["Minimizer"] == "CG":
        Minimum, fopt, nfeval, grad_calls, rc = scipy.optimize.fmin_cg(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            gtol        = selfA._parameters["GradientNormTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
            )
    elif selfA._parameters["Minimizer"] == "NCG":
        Minimum, fopt, nfeval, grad_calls, hcalls, rc = scipy.optimize.fmin_ncg(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            avextol     = selfA._parameters["CostDecrementTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
            )
    elif selfA._parameters["Minimizer"] == "BFGS":
        Minimum, fopt, gopt, Hopt, nfeval, grad_calls, rc = scipy.optimize.fmin_bfgs(
            f           = CostFunction,
            x0          = Xini,
            fprime      = GradientOfCostFunction,
            args        = (),
            maxiter     = selfA._parameters["MaximumNumberOfIterations"],
            gtol        = selfA._parameters["GradientNormTolerance"],
            disp        = selfA._parameters["optdisp"],
            full_output = True,
            )
    else:
        raise ValueError("Error in minimizer name: %s is unknown"%selfA._parameters["Minimizer"])
    #
    IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
    #
    # Workaround for a TNC bug affecting the returned Minimum
    # --------------------------------------------------------
    if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
        Minimum = selfA.StoredVariables["CurrentState"][IndexMin]
    #
    # Retrieving the analysis
    # -----------------------
    Xa = Minimum
    #
    selfA.StoredVariables["Analysis"].store( Xa )
    #
    # Additional computations and/or storage
    # --------------------------------------
    if selfA._toStore("BMA"):
        selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
    #
    return 0
# ==============================================================================
if __name__ == "__main__":
    print('\n AUTODIAGNOSTIC\n')
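    #
    # A minimal standalone sketch (not part of ADAO) of the optimizer
    # interface used above: minimizing a quadratic J(x) = 0.5*||x - c||^2
    # with its exact gradient, through the same fmin_l_bfgs_b entry point.
    _c = numpy.array([1., 2., 3.])
    _xopt, _jopt, _info = scipy.optimize.fmin_l_bfgs_b(
        func   = lambda x: 0.5 * numpy.sum((x - _c)**2),
        x0     = numpy.zeros(3),
        fprime = lambda x: x - _c,
        )
    print("Example L-BFGS-B quadratic minimization:", _xopt)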