modules/adao.git: src/daComposant/daAlgorithms/DerivativeFreeOptimization.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2022 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

import numpy, logging, scipy.optimize
from daCore import BasicObjects, PlatformInfo

# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
    def __init__(self):
        BasicObjects.Algorithm.__init__(self, "DERIVATIVEFREEOPTIMIZATION")
        self.defineRequiredParameter(
            name     = "Minimizer",
            default  = "BOBYQA",
            typecast = str,
            message  = "Minimiseur utilisé",
            listval  = [
                "BOBYQA",
                "COBYLA",
                "NEWUOA",
                "POWELL",
                "SIMPLEX",
                "SUBPLEX",
                ],
            )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfIterations",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal de pas d'optimisation",
            minval   = -1,
            oldname  = "MaximumNumberOfSteps",
            )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfFunctionEvaluations",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal d'évaluations de la fonction",
            minval   = -1,
            )
        self.defineRequiredParameter(
            name     = "StateVariationTolerance",
            default  = 1.e-4,
            typecast = float,
            message  = "Variation relative maximale de l'état lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "CostDecrementTolerance",
            default  = 1.e-7,
            typecast = float,
            message  = "Diminution relative minimale du cout lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "QualityCriterion",
            default  = "AugmentedWeightedLeastSquares",
            typecast = str,
            message  = "Critère de qualité utilisé",
            listval  = [
                "AugmentedWeightedLeastSquares", "AWLS", "DA",
                "WeightedLeastSquares", "WLS",
                "LeastSquares", "LS", "L2",
                "AbsoluteValue", "L1",
                "MaximumError", "ME", "Linf",
                ],
            )
        self.defineRequiredParameter(
            name     = "StoreInternalVariables",
            default  = False,
            typecast = bool,
            message  = "Stockage des variables internes ou intermédiaires du calcul",
            )
        self.defineRequiredParameter(
            name     = "StoreSupplementaryCalculations",
            default  = [],
            typecast = tuple,
            message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
            listval  = [
                "Analysis",
                "BMA",
                "CostFunctionJ",
                "CostFunctionJb",
                "CostFunctionJo",
                "CostFunctionJAtCurrentOptimum",
                "CostFunctionJbAtCurrentOptimum",
                "CostFunctionJoAtCurrentOptimum",
                "CurrentIterationNumber",
                "CurrentOptimum",
                "CurrentState",
                "IndexOfOptimum",
                "Innovation",
                "InnovationAtCurrentState",
                "OMA",
                "OMB",
                "SimulatedObservationAtBackground",
                "SimulatedObservationAtCurrentOptimum",
                "SimulatedObservationAtCurrentState",
                "SimulatedObservationAtOptimum",
                ]
            )
        self.defineRequiredParameter( # No typecast imposed
            name     = "Bounds",
            message  = "Liste des valeurs de bornes",
            )
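        # Descriptive note: "Bounds" is expected as one [lower, upper] pair per
        # state component; undefined bounds may be given as NaN (treated as
        # unbounded by the NLopt minimizers), and bounds are mandatory for the
        # COBYLA minimizer when NLopt is not available.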
        self.requireInputArguments(
            mandatory= ("Xb", "Y", "HO", "R", "B"),
            )
        self.setAttributes(tags=(
            "Optimization",
            "NonLinear",
            "MetaHeuristic",
            ))

    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
        self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
        #
        if not PlatformInfo.has_nlopt and self._parameters["Minimizer"] not in ["COBYLA", "POWELL", "SIMPLEX"]:
            logging.warning("%s Minimization by SIMPLEX is forced because %s is unavailable (COBYLA, POWELL are also available)"%(self._name,self._parameters["Minimizer"]))
            self._parameters["Minimizer"] = "SIMPLEX"
        #
        Hm = HO["Direct"].appliedTo
        #
        BI = B.getI()
        RI = R.getI()
        #
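        # Cost function J(x) = Jb(x) + Jo(x) evaluated by the chosen minimizer.
        # For the default "AugmentedWeightedLeastSquares" (AWLS/DA) criterion:
        #   Jb = 1/2 (x - Xb)^T B^{-1} (x - Xb)      (background term)
        #   Jo = 1/2 (Y - H(x))^T R^{-1} (Y - H(x))  (observation term)
        # The other criteria set Jb = 0 and measure only the innovation Y - H(x):
        # R-weighted least squares (WLS), plain least squares (LS/L2), the sum of
        # absolute values (AbsoluteValue/L1) or the maximum error (MaximumError/Linf).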
        def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
            _X  = numpy.ravel( x ).reshape((-1,1))
            _HX = numpy.ravel( Hm( _X ) ).reshape((-1,1))
            _Innovation = Y - _HX
            self.StoredVariables["CurrentState"].store( _X )
            if self._toStore("SimulatedObservationAtCurrentState") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
            if self._toStore("InnovationAtCurrentState"):
                self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
            #
            if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
                if BI is None or RI is None:
                    raise ValueError("Background and Observation error covariance matrices have to be properly defined!")
                Jb  = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
                Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
            elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
                if RI is None:
                    raise ValueError("Observation error covariance matrix has to be properly defined!")
                Jb  = 0.
                Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
            elif QualityMeasure in ["LeastSquares","LS","L2"]:
                Jb  = 0.
                Jo  = 0.5 * _Innovation.T @ _Innovation
            elif QualityMeasure in ["AbsoluteValue","L1"]:
                Jb  = 0.
                Jo  = numpy.sum( numpy.abs(_Innovation) )
            elif QualityMeasure in ["MaximumError","ME", "Linf"]:
                Jb  = 0.
                Jo  = numpy.max( numpy.abs(_Innovation) )
            #
            J   = float( Jb ) + float( Jo )
            #
            self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
            self.StoredVariables["CostFunctionJb"].store( Jb )
            self.StoredVariables["CostFunctionJo"].store( Jo )
            self.StoredVariables["CostFunctionJ" ].store( J )
            if self._toStore("IndexOfOptimum") or \
                self._toStore("CurrentOptimum") or \
                self._toStore("CostFunctionJAtCurrentOptimum") or \
                self._toStore("CostFunctionJbAtCurrentOptimum") or \
                self._toStore("CostFunctionJoAtCurrentOptimum") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if self._toStore("IndexOfOptimum"):
                self.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if self._toStore("CurrentOptimum"):
                self.StoredVariables["CurrentOptimum"].store( self.StoredVariables["CurrentState"][IndexMin] )
            if self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin] )
            if self._toStore("CostFunctionJAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( self.StoredVariables["CostFunctionJ" ][IndexMin] )
            if self._toStore("CostFunctionJbAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJb"][IndexMin] )
            if self._toStore("CostFunctionJoAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJo"][IndexMin] )
            return J
        #
        Xini = numpy.ravel(Xb)
        if len(Xini) < 2 and self._parameters["Minimizer"] == "NEWUOA":
            raise ValueError("The minimizer %s cannot be used when the optimization state dimension is 1. Please choose another minimizer."%self._parameters["Minimizer"])
        #
        # Minimization of the functional
        # ------------------------------
        nbPreviousSteps = self.StoredVariables["CostFunctionJ"].stepnumber()
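        # Number of cost values already stored before this run: the optimum search
        # below is restricted to the states evaluated by the present minimization.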
        #
        if self._parameters["Minimizer"] == "POWELL":
            Minimum, J_optimal, direc, niter, nfeval, rc = scipy.optimize.fmin_powell(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and not PlatformInfo.has_nlopt:
            def make_constraints(bounds):
                # Build one lower and one upper inequality constraint per state
                # component; default arguments bind i, a and b at definition time,
                # otherwise every lambda would capture the last loop values.
                constraints = []
                for (i,(a,b)) in enumerate(bounds):
                    lower = lambda x, i=i, a=a: x[i] - a
                    upper = lambda x, i=i, b=b: b - x[i]
                    constraints = constraints + [lower] + [upper]
                return constraints
            if self._parameters["Bounds"] is None:
                raise ValueError("Bounds have to be given for all axes as a list of lower/upper pairs!")
            Minimum = scipy.optimize.fmin_cobyla(
                func        = CostFunction,
                x0          = Xini,
                cons        = make_constraints( self._parameters["Bounds"] ),
                args        = (self._parameters["QualityCriterion"],),
                consargs    = (), # To avoid extra-args
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                rhobeg      = 1.0,
                rhoend      = self._parameters["StateVariationTolerance"],
                catol       = 2.*self._parameters["StateVariationTolerance"],
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_COBYLA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
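                # One (lower, upper) pair is expected per state component; NaN
                # entries mean "no bound" and are mapped below to -inf/+inf.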
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SIMPLEX" and not PlatformInfo.has_nlopt:
            Minimum, J_optimal, niter, nfeval, rc = scipy.optimize.fmin(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfIterations"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "SIMPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NELDERMEAD, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "BOBYQA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_BOBYQA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "NEWUOA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NEWUOA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SUBPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_SBPLX, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        else:
            raise ValueError("Error in minimizer name: %s is unknown"%self._parameters["Minimizer"])
        #
        IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        MinJ     = self.StoredVariables["CostFunctionJ"][IndexMin]
        Minimum  = self.StoredVariables["CurrentState"][IndexMin]
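        # The retained analysis is the best state actually evaluated during this
        # run, which may differ from the last iterate returned by the minimizer.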
        #
        # Retrieval of the analysis
        # -------------------------
        Xa = Minimum
        #
        self.StoredVariables["Analysis"].store( Xa )
        #
        # Supplementary calculations and/or storage
        # ------------------------------------------
        if self._toStore("OMA") or \
            self._toStore("SimulatedObservationAtOptimum"):
            if self._toStore("SimulatedObservationAtCurrentState"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
            elif self._toStore("SimulatedObservationAtCurrentOptimum"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
            else:
                HXa = Hm(Xa)
            HXa = HXa.reshape((-1,1))
        if self._toStore("Innovation") or \
            self._toStore("OMB") or \
            self._toStore("SimulatedObservationAtBackground"):
            HXb = Hm(Xb).reshape((-1,1))
            Innovation = Y - HXb
        if self._toStore("Innovation"):
            self.StoredVariables["Innovation"].store( Innovation )
        if self._toStore("OMB"):
            self.StoredVariables["OMB"].store( Innovation )
        if self._toStore("BMA"):
            self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
        if self._toStore("OMA"):
            self.StoredVariables["OMA"].store( Y - HXa )
        if self._toStore("SimulatedObservationAtBackground"):
            self.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
        if self._toStore("SimulatedObservationAtOptimum"):
            self.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
        #
        self._post_run(HO)
        return 0

# ==============================================================================
if __name__ == "__main__":
    print('\n AUTODIAGNOSTIC\n')
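    #
    # Illustrative sketch only (assumption-based, not executed by the
    # autodiagnostic): this algorithm is normally driven through the ADAO
    # "adaoBuilder" interface rather than instantiated directly, roughly as:
    #
    #   from adao import adaoBuilder
    #   case = adaoBuilder.New()
    #   case.set( 'AlgorithmParameters',
    #       Algorithm  = 'DerivativeFreeOptimization',
    #       Parameters = {
    #           "Minimizer"                      : "SIMPLEX",
    #           "MaximumNumberOfIterations"      : 100,
    #           "Bounds"                         : [[0., 10.], [0., 10.], [0., 10.]],
    #           "StoreSupplementaryCalculations" : ["CurrentState", "CostFunctionJ"],
    #           },
    #       )
    #   case.set( 'Background',          Vector = [1., 1., 1.] )
    #   case.set( 'BackgroundError',     ScalarSparseMatrix = 1. )
    #   case.set( 'Observation',         Vector = [0.5, 1.5, 2.5] )
    #   case.set( 'ObservationError',    ScalarSparseMatrix = 1. )
    #   case.set( 'ObservationOperator', Matrix = "1 0 0;0 2 0;0 0 3" )
    #   case.execute()
    #   print( case.get('Analysis')[-1] )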