# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2024 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

import numpy, logging, scipy.optimize
from daCore import BasicObjects, PlatformInfo
from daCore.NumericObjects import ApplyBounds, ForceNumericBounds
from daCore.PlatformInfo import vfloat
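# NB: the optional "nlopt" package is deliberately not imported here; it is
# imported lazily inside run(), only in the branches guarded by
# PlatformInfo.has_nlopt.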

# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
    def __init__(self):
        BasicObjects.Algorithm.__init__(self, "DERIVATIVEFREEOPTIMIZATION")
        self.defineRequiredParameter(
            name     = "Minimizer",
            default  = "BOBYQA",
            typecast = str,
            message  = "Minimizer used",
            listval  = [
                "BOBYQA",
                "COBYLA",
                "NEWUOA",
                "POWELL",
                "SIMPLEX",
                "SUBPLEX",
            ],
        )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfIterations",
            default  = 15000,
            typecast = int,
            message  = "Maximum number of optimization steps",
            minval   = -1,
            oldname  = "MaximumNumberOfSteps",
        )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfFunctionEvaluations",
            default  = 15000,
            typecast = int,
            message  = "Maximum number of function evaluations",
            minval   = -1,
        )
        self.defineRequiredParameter(
            name     = "StateVariationTolerance",
            default  = 1.e-4,
            typecast = float,
            message  = "Maximum relative variation of the state at stopping",
        )
        self.defineRequiredParameter(
            name     = "CostDecrementTolerance",
            default  = 1.e-7,
            typecast = float,
            message  = "Minimum relative decrease of the cost at stopping",
        )
        self.defineRequiredParameter(
            name     = "QualityCriterion",
            default  = "AugmentedWeightedLeastSquares",
            typecast = str,
            message  = "Quality criterion used",
            listval  = [
                "AugmentedWeightedLeastSquares", "AWLS", "DA",
                "WeightedLeastSquares", "WLS",
                "LeastSquares", "LS", "L2",
                "AbsoluteValue", "L1",
                "MaximumError", "ME", "Linf",
            ],
        )
        self.defineRequiredParameter(
            name     = "StoreInternalVariables",
            default  = False,
            typecast = bool,
            message  = "Storage of internal or intermediate calculation variables",
        )
        self.defineRequiredParameter(
            name     = "StoreSupplementaryCalculations",
            default  = [],
            typecast = tuple,
            message  = "List of supplementary calculations to store and/or perform",
            listval  = [
                "Analysis",
                "BMA",
                "CostFunctionJ",
                "CostFunctionJb",
                "CostFunctionJo",
                "CostFunctionJAtCurrentOptimum",
                "CostFunctionJbAtCurrentOptimum",
                "CostFunctionJoAtCurrentOptimum",
                "CurrentIterationNumber",
                "CurrentOptimum",
                "CurrentState",
                "IndexOfOptimum",
                "Innovation",
                "InnovationAtCurrentState",
                "OMA",
                "OMB",
                "SimulatedObservationAtBackground",
                "SimulatedObservationAtCurrentOptimum",
                "SimulatedObservationAtCurrentState",
                "SimulatedObservationAtOptimum",
            ]
        )
        self.defineRequiredParameter(  # No type
            name     = "Bounds",
            message  = "List of bound values",
        )
        self.requireInputArguments(
            mandatory= ("Xb", "Y", "HO", "R", "B"),
        )
        self.setAttributes(
            tags=(
                "Optimization",
                "NonLinear",
                "MetaHeuristic",
            ),
            features=(
                "NonLocalOptimization",
                "DerivativeFree",
                "ParallelFree",
            ),
        )

    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
        self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
        #
        if not PlatformInfo.has_nlopt and self._parameters["Minimizer"] not in ["COBYLA", "POWELL", "SIMPLEX"]:
            logging.warning(
                "%s Minimization by SIMPLEX is forced because %s "%(self._name, self._parameters["Minimizer"]) +
                "is unavailable (COBYLA, POWELL are also available)")
            self._parameters["Minimizer"] = "SIMPLEX"
        #
        Hm = HO["Direct"].appliedTo
        #
        BI = B.getI()
        RI = R.getI()

        def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
            _X = numpy.ravel( x ).reshape((-1, 1))
            _HX = numpy.ravel( Hm( _X ) ).reshape((-1, 1))
            _Innovation = Y - _HX
            self.StoredVariables["CurrentState"].store( _X )
            if self._toStore("SimulatedObservationAtCurrentState") or \
                    self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
            if self._toStore("InnovationAtCurrentState"):
                self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
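            #
            # Quality measures computed by the branches below (Jb: background
            # term, Jo: observation term, J = Jb + Jo):
            #   AWLS/DA : J = 1/2 (x-xb)^T B^{-1} (x-xb) + 1/2 (y-Hx)^T R^{-1} (y-Hx)
            #   WLS     : J = 1/2 (y-Hx)^T R^{-1} (y-Hx)
            #   LS/L2   : J = 1/2 (y-Hx)^T (y-Hx)
            #   L1      : J = sum( |y-Hx| )
            #   Linf    : J = max( |y-Hx| )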
            if QualityMeasure in ["AugmentedWeightedLeastSquares", "AWLS", "DA"]:
                if BI is None or RI is None:
                    raise ValueError("Background and Observation error covariance matrices have to be properly defined!")
                Jb = vfloat(0.5 * (_X - Xb).T @ (BI @ (_X - Xb)))
                Jo = vfloat(0.5 * _Innovation.T @ (RI @ _Innovation))
            elif QualityMeasure in ["WeightedLeastSquares", "WLS"]:
                if RI is None:
                    raise ValueError("Observation error covariance matrix has to be properly defined!")
                Jb = 0.
                Jo = vfloat(0.5 * _Innovation.T @ (RI @ _Innovation))
            elif QualityMeasure in ["LeastSquares", "LS", "L2"]:
                Jb = 0.
                Jo = vfloat(0.5 * _Innovation.T @ _Innovation)
            elif QualityMeasure in ["AbsoluteValue", "L1"]:
                Jb = 0.
                Jo = vfloat(numpy.sum( numpy.abs(_Innovation) ))
            elif QualityMeasure in ["MaximumError", "ME", "Linf"]:
                Jb = 0.
                Jo = vfloat(numpy.max( numpy.abs(_Innovation) ))
            else:
                raise ValueError("Unknown quality measure: %s"%QualityMeasure)
            J = Jb + Jo
            #
            self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
            self.StoredVariables["CostFunctionJb"].store( Jb )
            self.StoredVariables["CostFunctionJo"].store( Jo )
            self.StoredVariables["CostFunctionJ" ].store( J )
            if self._toStore("IndexOfOptimum") or \
                    self._toStore("CurrentOptimum") or \
                    self._toStore("CostFunctionJAtCurrentOptimum") or \
                    self._toStore("CostFunctionJbAtCurrentOptimum") or \
                    self._toStore("CostFunctionJoAtCurrentOptimum") or \
                    self._toStore("SimulatedObservationAtCurrentOptimum"):
                IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if self._toStore("IndexOfOptimum"):
                self.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if self._toStore("CurrentOptimum"):
                self.StoredVariables["CurrentOptimum"].store(
                    self.StoredVariables["CurrentState"][IndexMin] )
            if self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentOptimum"].store(
                    self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
                )
            if self._toStore("CostFunctionJAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJAtCurrentOptimum" ].store(
                    self.StoredVariables["CostFunctionJ" ][IndexMin] )
            if self._toStore("CostFunctionJbAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJbAtCurrentOptimum"].store(
                    self.StoredVariables["CostFunctionJb"][IndexMin] )
            if self._toStore("CostFunctionJoAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJoAtCurrentOptimum"].store(
                    self.StoredVariables["CostFunctionJo"][IndexMin] )
            return J
        #
        Xini = numpy.ravel(Xb)
        if len(Xini) < 2 and self._parameters["Minimizer"] == "NEWUOA":
            raise ValueError(
                "The minimizer %s "%self._parameters["Minimizer"] +
                "cannot be used when the optimization state dimension " +
                "is 1. Please choose another minimizer.")
        #
        # Minimization of the cost functional
        # -----------------------------------
        nbPreviousSteps = self.StoredVariables["CostFunctionJ"].stepnumber()
        #
        if self._parameters["Minimizer"] == "POWELL":
            Minimum, J_optimal, direc, niter, nfeval, rc = scipy.optimize.fmin_powell(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"] - 1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
            )
        elif self._parameters["Minimizer"] == "COBYLA" and not PlatformInfo.has_nlopt:
            def make_constraints(bounds):
                # scipy's COBYLA has no native bound support: each (lower,
                # upper) pair becomes two inequality constraints g(x) >= 0.
                constraints = []
                for (i, (a, b)) in enumerate(bounds):
                    # Bind i, a, b as default arguments: a plain closure would
                    # capture the loop variables by reference, and every
                    # constraint would end up using the last bound pair.
                    lower = lambda x, i=i, a=a: x[i] - a  # noqa: E731
                    upper = lambda x, i=i, b=b: b - x[i]  # noqa: E731
                    constraints = constraints + [lower] + [upper]
                return constraints
            if self._parameters["Bounds"] is None:
                raise ValueError("Bounds have to be given for all axes as a list of lower/upper pairs!")
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            Minimum = scipy.optimize.fmin_cobyla(
                func     = CostFunction,
                x0       = Xini,
                cons     = make_constraints( self._parameters["Bounds"] ),
                args     = (self._parameters["QualityCriterion"],),
                consargs = (),  # To avoid extra-args
                maxfun   = self._parameters["MaximumNumberOfFunctionEvaluations"],
                rhobeg   = 1.0,
                rhoend   = self._parameters["StateVariationTolerance"],
                catol    = 2. * self._parameters["StateVariationTolerance"],
                disp     = self._parameters["optdisp"],
            )
        elif self._parameters["Minimizer"] == "COBYLA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_COBYLA, Xini.size)

            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"], dtype=float).reshape((Xini.size, 2))
                # nlopt requires numeric bounds: missing (NaN) bounds are
                # mapped to -/+ infinity.
                lb = lub[:, 0]; lb[numpy.isnan(lb)] = -float('inf')  # noqa: E702
                ub = lub[:, 1]; ub[numpy.isnan(ub)] = +float('inf')  # noqa: E702
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(), ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(), lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2. * self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(), Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(), opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(), opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SIMPLEX" and not PlatformInfo.has_nlopt:
            Minimum, J_optimal, niter, nfeval, rc = scipy.optimize.fmin(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"] - 1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
            )
        elif self._parameters["Minimizer"] == "SIMPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NELDERMEAD, Xini.size)

            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"], dtype=float).reshape((Xini.size, 2))
                lb = lub[:, 0]; lb[numpy.isnan(lb)] = -float('inf')  # noqa: E702
                ub = lub[:, 1]; ub[numpy.isnan(ub)] = +float('inf')  # noqa: E702
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(), ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(), lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2. * self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(), Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(), opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(), opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "BOBYQA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_BOBYQA, Xini.size)

            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"], dtype=float).reshape((Xini.size, 2))
                lb = lub[:, 0]; lb[numpy.isnan(lb)] = -float('inf')  # noqa: E702
                ub = lub[:, 1]; ub[numpy.isnan(ub)] = +float('inf')  # noqa: E702
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(), ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(), lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2. * self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(), Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(), opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(), opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "NEWUOA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NEWUOA, Xini.size)

            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"], dtype=float).reshape((Xini.size, 2))
                lb = lub[:, 0]; lb[numpy.isnan(lb)] = -float('inf')  # noqa: E702
                ub = lub[:, 1]; ub[numpy.isnan(ub)] = +float('inf')  # noqa: E702
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(), ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(), lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2. * self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(), Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(), opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(), opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SUBPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_SBPLX, Xini.size)

            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            self._parameters["Bounds"] = ForceNumericBounds( self._parameters["Bounds"] )
            Xini = ApplyBounds( Xini, self._parameters["Bounds"] )
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"], dtype=float).reshape((Xini.size, 2))
                lb = lub[:, 0]; lb[numpy.isnan(lb)] = -float('inf')  # noqa: E702
                ub = lub[:, 1]; ub[numpy.isnan(ub)] = +float('inf')  # noqa: E702
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(), ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(), lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2. * self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(), Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(), opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(), opt.last_optimize_result()))
        else:
            raise ValueError("Error in minimizer name: %s is unknown"%self._parameters["Minimizer"])
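        #
        # The analysis is taken as the best state actually evaluated during the
        # optimization, recovered from the stored cost history: the stopping
        # test can leave the optimizer's returned point away from that optimum.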
        IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        Minimum = self.StoredVariables["CurrentState"][IndexMin]
        #
        # Obtaining the analysis
        # ----------------------
        Xa = Minimum
        #
        self.StoredVariables["Analysis"].store( Xa )
        #
        # Supplementary calculations and/or storage
        # -----------------------------------------
        if self._toStore("OMA") or \
                self._toStore("SimulatedObservationAtOptimum"):
            if self._toStore("SimulatedObservationAtCurrentState"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
            elif self._toStore("SimulatedObservationAtCurrentOptimum"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
            else:
                HXa = Hm(Xa)
            HXa = HXa.reshape((-1, 1))
        if self._toStore("Innovation") or \
                self._toStore("OMB") or \
                self._toStore("SimulatedObservationAtBackground"):
            HXb = Hm(Xb).reshape((-1, 1))
            Innovation = Y - HXb
        if self._toStore("Innovation"):
            self.StoredVariables["Innovation"].store( Innovation )
        if self._toStore("OMB"):
            self.StoredVariables["OMB"].store( Innovation )
        if self._toStore("BMA"):
            self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
        if self._toStore("OMA"):
            self.StoredVariables["OMA"].store( Y - HXa )
        if self._toStore("SimulatedObservationAtBackground"):
            self.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
        if self._toStore("SimulatedObservationAtOptimum"):
            self.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
        #
        self._post_run(HO, EM)
        return 0

# ==============================================================================
if __name__ == "__main__":
    print("\n AUTODIAGNOSTIC\n")
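
    # Minimal usage sketch (illustrative only): this class is normally driven
    # through the ADAO user front-end "adaoBuilder"; the numeric values below
    # are placeholder assumptions, not part of this module.
    #
    #   from adao import adaoBuilder
    #   case = adaoBuilder.New()
    #   case.set('AlgorithmParameters',
    #            Algorithm='DerivativeFreeOptimization',
    #            Parameters={'Minimizer': 'BOBYQA'})
    #   case.set('Background',          Vector=[0., 0.])
    #   case.set('Observation',         Vector=[1., 2.])
    #   case.set('BackgroundError',     ScalarSparseMatrix=1.)
    #   case.set('ObservationError',    ScalarSparseMatrix=1.)
    #   case.set('ObservationOperator', Matrix=[[1., 0.], [0., 1.]])
    #   case.execute()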