# [modules/adao.git] src/daComposant/daAlgorithms/DerivativeFreeOptimization.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2022 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

import logging
from daCore import BasicObjects, PlatformInfo
import numpy, scipy.optimize
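# This algorithm carries out a derivative-free minimization of a quality
# criterion J(x) measuring the misfit between the observations Y and the
# simulated observations H(x), starting from the background Xb. The local
# search is delegated to SciPy or, when available, to the NLopt library.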

# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
    def __init__(self):
        BasicObjects.Algorithm.__init__(self, "DERIVATIVEFREEOPTIMIZATION")
        self.defineRequiredParameter(
            name     = "Minimizer",
            default  = "BOBYQA",
            typecast = str,
            message  = "Minimiseur utilisé",
            listval  = [
                "BOBYQA",
                "COBYLA",
                "NEWUOA",
                "POWELL",
                "SIMPLEX",
                "SUBPLEX",
                ],
            )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfSteps",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal de pas d'optimisation",
            minval   = -1,
            )
        self.defineRequiredParameter(
            name     = "MaximumNumberOfFunctionEvaluations",
            default  = 15000,
            typecast = int,
            message  = "Nombre maximal d'évaluations de la fonction",
            minval   = -1,
            )
        self.defineRequiredParameter(
            name     = "StateVariationTolerance",
            default  = 1.e-4,
            typecast = float,
            message  = "Variation relative maximale de l'état lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "CostDecrementTolerance",
            default  = 1.e-7,
            typecast = float,
            message  = "Diminution relative minimale du cout lors de l'arrêt",
            )
        self.defineRequiredParameter(
            name     = "QualityCriterion",
            default  = "AugmentedWeightedLeastSquares",
            typecast = str,
            message  = "Critère de qualité utilisé",
            listval  = [
                "AugmentedWeightedLeastSquares","AWLS","DA",
                "WeightedLeastSquares","WLS",
                "LeastSquares","LS","L2",
                "AbsoluteValue","L1",
                "MaximumError","ME",
                ],
            )
        self.defineRequiredParameter(
            name     = "StoreInternalVariables",
            default  = False,
            typecast = bool,
            message  = "Stockage des variables internes ou intermédiaires du calcul",
            )
        self.defineRequiredParameter(
            name     = "StoreSupplementaryCalculations",
            default  = [],
            typecast = tuple,
            message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
            listval  = [
                "Analysis",
                "BMA",
                "CostFunctionJ",
                "CostFunctionJb",
                "CostFunctionJo",
                "CostFunctionJAtCurrentOptimum",
                "CostFunctionJbAtCurrentOptimum",
                "CostFunctionJoAtCurrentOptimum",
                "CurrentIterationNumber",
                "CurrentOptimum",
                "CurrentState",
                "IndexOfOptimum",
                "Innovation",
                "InnovationAtCurrentState",
                "OMA",
                "OMB",
                "SimulatedObservationAtBackground",
                "SimulatedObservationAtCurrentOptimum",
                "SimulatedObservationAtCurrentState",
                "SimulatedObservationAtOptimum",
                ]
            )
        self.defineRequiredParameter( # No type
            name     = "Bounds",
            message  = "Liste des valeurs de bornes",
            )
        self.requireInputArguments(
            mandatory= ("Xb", "Y", "HO", "R", "B" ),
            )
        self.setAttributes(tags=(
            "Optimization",
            "NonLinear",
            "MetaHeuristic",
            ))

    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
        self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
        #
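        # Fall back to a SciPy-based minimizer when the NLopt library is not
        # available, since BOBYQA, NEWUOA and SUBPLEX are only provided by NLopt.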
        if not PlatformInfo.has_nlopt and self._parameters["Minimizer"] not in ["COBYLA", "POWELL", "SIMPLEX"]:
            logging.warning("%s Minimization by SIMPLEX is forced because %s is unavailable (COBYLA, POWELL are also available)"%(self._name,self._parameters["Minimizer"]))
            self._parameters["Minimizer"] = "SIMPLEX"
        #
        # Operators
        # ---------
        Hm = HO["Direct"].appliedTo
        #
        # Precompute the inverses of B and R
        # ----------------------------------
        BI = B.getI()
        RI = R.getI()
        #
        # Definition of the cost function
        # -------------------------------
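        # The cost function evaluates J(x) = Jb(x) + Jo(x) for the selected
        # quality criterion; every evaluated state and cost value is stored,
        # since the derivative-free minimizers only return their final iterate.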
        def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
            _X  = numpy.asmatrix(numpy.ravel( x )).T
            self.StoredVariables["CurrentState"].store( _X )
            _HX = Hm( _X )
            _HX = numpy.asmatrix(numpy.ravel( _HX )).T
            _Innovation = Y - _HX
            if self._toStore("SimulatedObservationAtCurrentState") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
            if self._toStore("InnovationAtCurrentState"):
                self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
            #
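            # Quality criteria (with Hx = H(x), xb = background, y = observations):
            #   AugmentedWeightedLeastSquares : Jb = 1/2 (x-xb)' B^-1 (x-xb), Jo = 1/2 (y-Hx)' R^-1 (y-Hx)
            #   WeightedLeastSquares          : Jb = 0, Jo = 1/2 (y-Hx)' R^-1 (y-Hx)
            #   LeastSquares                  : Jb = 0, Jo = 1/2 (y-Hx)' (y-Hx)
            #   AbsoluteValue                 : Jb = 0, Jo = sum(|y-Hx|)
            #   MaximumError                  : Jb = 0, Jo = max(|y-Hx|)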
            if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
                if BI is None or RI is None:
                    raise ValueError("Background and Observation error covariance matrices have to be properly defined!")
                Jb  = 0.5 * (_X - Xb).T * (BI * (_X - Xb))
                Jo  = 0.5 * _Innovation.T * (RI * _Innovation)
            elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
                if RI is None:
                    raise ValueError("Observation error covariance matrix has to be properly defined!")
                Jb  = 0.
                Jo  = 0.5 * (_Innovation).T * RI * (_Innovation)
            elif QualityMeasure in ["LeastSquares","LS","L2"]:
                Jb  = 0.
                Jo  = 0.5 * (_Innovation).T * (_Innovation)
            elif QualityMeasure in ["AbsoluteValue","L1"]:
                Jb  = 0.
                Jo  = numpy.sum( numpy.abs(_Innovation) )
            elif QualityMeasure in ["MaximumError","ME"]:
                Jb  = 0.
                Jo  = numpy.max( numpy.abs(_Innovation) )
            #
            J   = float( Jb ) + float( Jo )
            #
            self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
            self.StoredVariables["CostFunctionJb"].store( Jb )
            self.StoredVariables["CostFunctionJo"].store( Jo )
            self.StoredVariables["CostFunctionJ" ].store( J )
            if self._toStore("IndexOfOptimum") or \
                self._toStore("CurrentOptimum") or \
                self._toStore("CostFunctionJAtCurrentOptimum") or \
                self._toStore("CostFunctionJbAtCurrentOptimum") or \
                self._toStore("CostFunctionJoAtCurrentOptimum") or \
                self._toStore("SimulatedObservationAtCurrentOptimum"):
                IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if self._toStore("IndexOfOptimum"):
                self.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if self._toStore("CurrentOptimum"):
                self.StoredVariables["CurrentOptimum"].store( self.StoredVariables["CurrentState"][IndexMin] )
            if self._toStore("SimulatedObservationAtCurrentOptimum"):
                self.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin] )
            if self._toStore("CostFunctionJAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( self.StoredVariables["CostFunctionJ" ][IndexMin] )
            if self._toStore("CostFunctionJbAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJb"][IndexMin] )
            if self._toStore("CostFunctionJoAtCurrentOptimum"):
                self.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJo"][IndexMin] )
            return J
        #
        # Starting point of the optimization: Xini = Xb
        # ----------------------------------------------
        Xini = numpy.ravel(Xb)
        if len(Xini) < 2 and self._parameters["Minimizer"] == "NEWUOA":
            raise ValueError("The minimizer %s cannot be used when the optimization state dimension is 1. Please choose another minimizer."%self._parameters["Minimizer"])
        #
        # Minimization of the functional
        # ------------------------------
        nbPreviousSteps = self.StoredVariables["CostFunctionJ"].stepnumber()
        #
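        # Each branch below dispatches to a SciPy or NLopt backend according to
        # the requested minimizer and to the availability of NLopt; the cost
        # values of this run start at index nbPreviousSteps in the storage.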
        if self._parameters["Minimizer"] == "POWELL":
            Minimum, J_optimal, direc, niter, nfeval, rc = scipy.optimize.fmin_powell(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfSteps"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and not PlatformInfo.has_nlopt:
            def make_constraints(bounds):
                # Convert the (lower, upper) bound pairs into the g(x) >= 0
                # inequality constraints expected by fmin_cobyla. Default
                # arguments bind the current i, a, b values so that each lambda
                # keeps its own axis (avoids the late-binding closure pitfall).
                constraints = []
                for (i,(a,b)) in enumerate(bounds):
                    lower = lambda x, i=i, a=a: x[i] - a
                    upper = lambda x, i=i, b=b: b - x[i]
                    constraints = constraints + [lower] + [upper]
                return constraints
            if self._parameters["Bounds"] is None:
                raise ValueError("Bounds have to be given for all axes as a list of lower/upper pairs!")
            Minimum = scipy.optimize.fmin_cobyla(
                func        = CostFunction,
                x0          = Xini,
                cons        = make_constraints( self._parameters["Bounds"] ),
                args        = (self._parameters["QualityCriterion"],),
                consargs    = (), # To avoid extra-args
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                rhobeg      = 1.0,
                rhoend      = self._parameters["StateVariationTolerance"],
                catol       = 2.*self._parameters["StateVariationTolerance"],
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "COBYLA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_COBYLA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
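                # Bounds are given as (min, max) pairs; missing values (NaN)
                # are mapped to -inf/+inf so that NLopt treats them as unbounded.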
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SIMPLEX" and not PlatformInfo.has_nlopt:
            Minimum, J_optimal, niter, nfeval, rc = scipy.optimize.fmin(
                func        = CostFunction,
                x0          = Xini,
                args        = (self._parameters["QualityCriterion"],),
                maxiter     = self._parameters["MaximumNumberOfSteps"]-1,
                maxfun      = self._parameters["MaximumNumberOfFunctionEvaluations"],
                xtol        = self._parameters["StateVariationTolerance"],
                ftol        = self._parameters["CostDecrementTolerance"],
                full_output = True,
                disp        = self._parameters["optdisp"],
                )
        elif self._parameters["Minimizer"] == "SIMPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NELDERMEAD, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "BOBYQA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_BOBYQA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "NEWUOA" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_NEWUOA, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        elif self._parameters["Minimizer"] == "SUBPLEX" and PlatformInfo.has_nlopt:
            import nlopt
            opt = nlopt.opt(nlopt.LN_SBPLX, Xini.size)
            def _f(_Xx, Grad):
                # DFO, so no gradient
                return CostFunction(_Xx, self._parameters["QualityCriterion"])
            opt.set_min_objective(_f)
            if self._parameters["Bounds"] is not None:
                lub = numpy.array(self._parameters["Bounds"],dtype=float).reshape((Xini.size,2))
                lb = lub[:,0] ; lb[numpy.isnan(lb)] = -float('inf')
                ub = lub[:,1] ; ub[numpy.isnan(ub)] = +float('inf')
                if self._parameters["optdisp"]:
                    print("%s: upper bounds %s"%(opt.get_algorithm_name(),ub))
                    print("%s: lower bounds %s"%(opt.get_algorithm_name(),lb))
                opt.set_upper_bounds(ub)
                opt.set_lower_bounds(lb)
            opt.set_ftol_rel(self._parameters["CostDecrementTolerance"])
            opt.set_xtol_rel(2.*self._parameters["StateVariationTolerance"])
            opt.set_maxeval(self._parameters["MaximumNumberOfFunctionEvaluations"])
            Minimum = opt.optimize( Xini )
            if self._parameters["optdisp"]:
                print("%s: optimal state: %s"%(opt.get_algorithm_name(),Minimum))
                print("%s: minimum of J: %s"%(opt.get_algorithm_name(),opt.last_optimum_value()))
                print("%s: return code: %i"%(opt.get_algorithm_name(),opt.last_optimize_result()))
        else:
            raise ValueError("Error in Minimizer name: %s"%self._parameters["Minimizer"])
        #
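        # The analysis is taken as the best state stored during this run rather
        # than the raw minimizer output, since some backends only return their
        # last evaluated iterate.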
        IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
        MinJ     = self.StoredVariables["CostFunctionJ"][IndexMin]
        Minimum  = self.StoredVariables["CurrentState"][IndexMin]
        #
        # Retrieve the analysis
        # ---------------------
        Xa = numpy.ravel( Minimum )
        #
        self.StoredVariables["Analysis"].store( Xa )
        #
        # Supplementary calculations and/or storage
        # -----------------------------------------
        if self._toStore("OMA") or \
            self._toStore("SimulatedObservationAtOptimum"):
            if self._toStore("SimulatedObservationAtCurrentState"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
            elif self._toStore("SimulatedObservationAtCurrentOptimum"):
                HXa = self.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
            else:
                HXa = Hm(Xa)
        if self._toStore("Innovation") or \
            self._toStore("OMB") or \
            self._toStore("SimulatedObservationAtBackground"):
            HXb = Hm(Xb)
            Innovation  = Y - HXb
        if self._toStore("Innovation"):
            self.StoredVariables["Innovation"].store( Innovation )
        if self._toStore("OMB"):
            self.StoredVariables["OMB"].store( Innovation )
        if self._toStore("BMA"):
            self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
        if self._toStore("OMA"):
            self.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
        if self._toStore("SimulatedObservationAtBackground"):
            self.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
        if self._toStore("SimulatedObservationAtOptimum"):
            self.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
        #
        self._post_run()
        return 0

# ==============================================================================
if __name__ == "__main__":
    print('\n AUTODIAGNOSTIC\n')
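
# A minimal usage sketch (not part of the module), assuming the standard ADAO
# textual interface (adaoBuilder): the algorithm is selected through
# "AlgorithmParameters", and the mandatory inputs Xb, Y, HO, R, B are provided
# through the case. The names and values below are illustrative only.
#
#   from adao import adaoBuilder
#   case = adaoBuilder.New()
#   case.set( 'AlgorithmParameters',
#             Algorithm  = 'DerivativeFreeOptimization',
#             Parameters = {"Minimizer":"SIMPLEX", "MaximumNumberOfFunctionEvaluations":1000} )
#   case.set( 'Background',          Vector = [0., 0., 0.] )
#   case.set( 'BackgroundError',     ScalarSparseMatrix = 1. )
#   case.set( 'Observation',         Vector = [0.5, 1.5, 2.5] )
#   case.set( 'ObservationError',    ScalarSparseMatrix = 1. )
#   case.set( 'ObservationOperator', Matrix = [[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]] )
#   case.execute()
#   print(case.get('Analysis')[-1])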