# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2024 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

import numpy, logging, scipy.optimize
from daCore import BasicObjects, PlatformInfo
from daCore.PlatformInfo import vfloat

# ==============================================================================
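# Derivative-free optimization of a data assimilation quality criterion: the
# algorithm searches for the state minimizing the selected cost function J,
# using gradient-free minimizers from SciPy or, when available, from NLopt.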
class ElementaryAlgorithm(BasicObjects.Algorithm):
    def __init__(self):
        BasicObjects.Algorithm.__init__(self, "DERIVATIVEFREEOPTIMIZATION")
        self.defineRequiredParameter(
            name     = "Minimizer",
            default  = "BOBYQA",
            typecast = str,
            message  = "Minimiseur utilisé",
            listval  = [
                "BOBYQA",
                "COBYLA",
                "NEWUOA",
                "POWELL",
                "SIMPLEX",
                "SUBPLEX",
                ],
            )
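        # BOBYQA, NEWUOA and SUBPLEX require the NLopt library; COBYLA and
        # SIMPLEX use NLopt when available and SciPy otherwise; POWELL always
        # relies on SciPy (see the dispatch in the run() method below).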
        self.defineRequiredParameter(
            name     = "MaximumNumberOfIterations",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal de pas d'optimisation",
            minval   = -1,
            oldname  = "MaximumNumberOfSteps",
            )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfFunctionEvaluations",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal d'évaluations de la fonction",
            minval   = -1,
            )
        self.defineRequiredParameter(
            name     = "StateVariationTolerance",
            default  = 1.e-4,
            typecast = float,
            message  = "Variation relative maximale de l'état lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "CostDecrementTolerance",
            default  = 1.e-7,
            typecast = float,
            message  = "Diminution relative minimale du cout lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "QualityCriterion",
            default  = "AugmentedWeightedLeastSquares",
            typecast = str,
            message  = "Critère de qualité utilisé",
            listval  = [
                "AugmentedWeightedLeastSquares", "AWLS", "DA",
                "WeightedLeastSquares", "WLS",
                "LeastSquares", "LS", "L2",
                "AbsoluteValue", "L1",
                "MaximumError", "ME", "Linf",
                ],
            )
        self.defineRequiredParameter(
            name     = "StoreInternalVariables",
            default  = False,
            typecast = bool,
            message  = "Stockage des variables internes ou intermédiaires du calcul",
            )
        self.defineRequiredParameter(
            name     = "StoreSupplementaryCalculations",
            default  = [],
            typecast = tuple,
            message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
            listval  = [
                "Analysis",
                "BMA",
                "CostFunctionJ",
                "CostFunctionJb",
                "CostFunctionJo",
                "CostFunctionJAtCurrentOptimum",
                "CostFunctionJbAtCurrentOptimum",
                "CostFunctionJoAtCurrentOptimum",
                "CurrentIterationNumber",
                "CurrentOptimum",
                "CurrentState",
                "IndexOfOptimum",
                "Innovation",
                "InnovationAtCurrentState",
                "OMA",
                "OMB",
                "SimulatedObservationAtBackground",
                "SimulatedObservationAtCurrentOptimum",
                "SimulatedObservationAtCurrentState",
                "SimulatedObservationAtOptimum",
                ]
            )
        self.defineRequiredParameter( # No type
            name     = "Bounds",
            message  = "Liste des valeurs de bornes",
            )
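        # Bounds are expected as one (lower, upper) pair per state component;
        # for the NLopt minimizers, a NaN value leaves the corresponding side
        # unbounded (see the handling in the run() method below).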
        self.requireInputArguments(
            mandatory= ("Xb", "Y", "HO", "R", "B"),
            )
        self.setAttributes(tags=(
            "Optimization",
            "NonLinear",
            "MetaHeuristic",
            ))

    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
        self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
        #
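        # Fall back to the SciPy SIMPLEX (Nelder-Mead) minimizer when the
        # requested minimizer needs the NLopt library and NLopt is missing.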
        if not PlatformInfo.has_nlopt and self._parameters["Minimizer"] not in ["COBYLA", "POWELL", "SIMPLEX"]:
            logging.warning("%s Minimization by SIMPLEX is forced because %s is unavailable (COBYLA, POWELL are also available)"%(self._name,self._parameters["Minimizer"]))
            self._parameters["Minimizer"] = "SIMPLEX"
        #
        Hm = HO["Direct"].appliedTo
        #
        BI = B.getI()
        RI = R.getI()
        #
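        # Cost function J(x) = Jb + Jo evaluated by the minimizers: it applies
        # the observation operator to the candidate state, computes the terms
        # of the selected quality criterion, and stores the requested
        # intermediate quantities at each evaluation.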
        def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
            _X  = numpy.ravel( x ).reshape((-1,1))
            _HX = numpy.ravel( Hm( _X ) ).reshape((-1,1))
            _Innovation = Y - _HX
            self.StoredVariables["CurrentState"].store( _X )
            if self._toStore("SimulatedObservationAtCurrentState") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
            if self._toStore("InnovationAtCurrentState"):
                self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
            #
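            # Quality criteria:
            #   AWLS/DA : J = 1/2 (x-xb)^T B^{-1} (x-xb) + 1/2 (y-Hx)^T R^{-1} (y-Hx)
            #   WLS     : J = 1/2 (y-Hx)^T R^{-1} (y-Hx)
            #   LS/L2   : J = 1/2 (y-Hx)^T (y-Hx)
            #   L1      : J = sum( |y-Hx| )
            #   ME/Linf : J = max( |y-Hx| )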
            if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
                if BI is None or RI is None:
                    raise ValueError("Background and Observation error covariance matrices have to be properly defined!")
                Jb  = vfloat(0.5 * (_X - Xb).T @ (BI @ (_X - Xb)))
                Jo  = vfloat(0.5 * _Innovation.T @ (RI @ _Innovation))
            elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
                if RI is None:
                    raise ValueError("Observation error covariance matrix has to be properly defined!")
                Jb  = 0.
                Jo  = vfloat(0.5 * _Innovation.T @ (RI @ _Innovation))
            elif QualityMeasure in ["LeastSquares","LS","L2"]:
                Jb  = 0.
                Jo  = vfloat(0.5 * _Innovation.T @ _Innovation)
            elif QualityMeasure in ["AbsoluteValue","L1"]:
                Jb  = 0.
                Jo  = vfloat(numpy.sum( numpy.abs(_Innovation) ))
            elif QualityMeasure in ["MaximumError","ME", "Linf"]:
                Jb  = 0.
                Jo  = vfloat(numpy.max( numpy.abs(_Innovation) ))
            #
            J   = Jb + Jo
            #
            self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
            self.StoredVariables["CostFunctionJb"].store( Jb )
            self.StoredVariables["CostFunctionJo"].store( Jo )
            self.StoredVariables["CostFunctionJ" ].store( J )
            if self._toStore("IndexOfOptimum") or \
                self._toStore("CurrentOptimum") or \
                self._toStore("CostFunctionJAtCurrentOptimum") or \
                self._toStore("CostFunctionJbAtCurrentOptimum") or \
                self._toStore("CostFunctionJoAtCurrentOptimum") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if self._toStore("IndexOfOptimum"):
                self.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if self._toStore("CurrentOptimum"):
                self.StoredVariables["CurrentOptimum"].store( self.StoredVariables["CurrentState"][IndexMin] )
            if self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin] )
            if self._toStore("CostFunctionJAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( self.StoredVariables["CostFunctionJ" ][IndexMin] )
            if self._toStore("CostFunctionJbAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJb"][IndexMin] )
            if self._toStore("CostFunctionJoAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJo"][IndexMin] )
            return J
        #
        Xini = numpy.ravel(Xb)
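        # The optimization starts from the background state Xb.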
        if len(Xini) < 2 and self._parameters["Minimizer"] == "NEWUOA":
            raise ValueError("The minimizer %s cannot be used when the optimisation state dimension is 1. Please choose another minimizer."%self._parameters["Minimizer"])
        #
        # Minimization of the cost functional
        # -----------------------------------
        nbPreviousSteps = self.StoredVariables["CostFunctionJ"].stepnumber()
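        # The optimum is later searched among the evaluations of this run only,
        # hence the offset by the number of previously stored values of J.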
        #
        if self._parameters["Minimizer"] == "POWELL":
            Minimum, J_optimal, direc, niter, nfeval, rc = scipy.optimize.fmin_powell(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and not PlatformInfo.has_nlopt:
            def make_constraints(bounds):
                # COBYLA inequality constraints have to be non-negative at
                # feasible points. Bind the loop variables as default
                # arguments, otherwise every lambda would capture only the
                # last bound (Python late binding).
                constraints = []
                for (i,(a,b)) in enumerate(bounds):
                    lower = lambda x, i=i, a=a: x[i] - a
                    upper = lambda x, i=i, b=b: b - x[i]
                    constraints = constraints + [lower] + [upper]
                return constraints
            if self._parameters["Bounds"] is None:
                raise ValueError("Bounds have to be given for all axes as a list of lower/upper pairs!")
            Minimum = scipy.optimize.fmin_cobyla(
                func        = CostFunction,
                x0          = Xini,
                cons        = make_constraints( self._parameters["Bounds"] ),
                args        = (self._parameters["QualityCriterion"],),
                consargs    = (), # To avoid extra-args
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                rhobeg      = 1.0,
                rhoend      = self._parameters["StateVariationTolerance"],
                catol       = 2.*self._parameters["StateVariationTolerance"],
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_COBYLA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SIMPLEX" and not PlatformInfo.has_nlopt:
            Minimum, J_optimal, niter, nfeval, rc = scipy.optimize.fmin(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "SIMPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NELDERMEAD, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "BOBYQA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_BOBYQA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "NEWUOA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NEWUOA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SUBPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_SBPLX, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        else:
            raise ValueError("Error in minimizer name: %s is unknown"%self._parameters["Minimizer"])
        #
        IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        MinJ     = self.StoredVariables["CostFunctionJ"][IndexMin]
        Minimum  = self.StoredVariables["CurrentState"][IndexMin]
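        # The analysis is the best state evaluated during this run (smallest
        # stored J), not necessarily the point returned by the minimizer.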
        #
        # Computation of the analysis
        # ---------------------------
        Xa = Minimum
        #
        self.StoredVariables["Analysis"].store( Xa )
        #
        # Supplementary calculations and/or storage
        # -----------------------------------------
        if self._toStore("OMA") or \
            self._toStore("SimulatedObservationAtOptimum"):
            if self._toStore("SimulatedObservationAtCurrentState"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
            elif self._toStore("SimulatedObservationAtCurrentOptimum"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
            else:
                HXa = Hm(Xa)
            HXa = HXa.reshape((-1,1))
        if self._toStore("Innovation") or \
            self._toStore("OMB") or \
            self._toStore("SimulatedObservationAtBackground"):
            HXb = Hm(Xb).reshape((-1,1))
            Innovation = Y - HXb
        if self._toStore("Innovation"):
            self.StoredVariables["Innovation"].store( Innovation )
        if self._toStore("OMB"):
            self.StoredVariables["OMB"].store( Innovation )
        if self._toStore("BMA"):
            self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
        if self._toStore("OMA"):
            self.StoredVariables["OMA"].store( Y - HXa )
        if self._toStore("SimulatedObservationAtBackground"):
            self.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
        if self._toStore("SimulatedObservationAtOptimum"):
            self.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
        #
        self._post_run(HO)
        return 0

# ==============================================================================
if __name__ == "__main__":
    print("\n AUTODIAGNOSTIC\n")
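
# A minimal usage sketch (illustrative only, assuming the standard ADAO textual
# interface "adaoBuilder"; the values and parameters below are arbitrary):
#
#   from adao import adaoBuilder
#   case = adaoBuilder.New()
#   case.set( 'AlgorithmParameters', Algorithm='DerivativeFreeOptimization',
#             Parameters={'Minimizer':'SIMPLEX', 'MaximumNumberOfIterations':100} )
#   case.set( 'Background',          Vector=[0., 1., 2.] )
#   case.set( 'BackgroundError',     ScalarSparseMatrix=1. )
#   case.set( 'Observation',         Vector=[0.5, 1.5, 2.5] )
#   case.set( 'ObservationError',    ScalarSparseMatrix=1. )
#   case.set( 'ObservationOperator', Matrix=[[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]] )
#   case.execute()
#   print( case.get('Analysis')[-1] )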