Salome HOME
Documentation and code update for PSO
[modules/adao.git] / src / daComposant / daAlgorithms / Atoms / ecwnpso.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2023 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 __doc__ = """
24     Standard Particle Swarm Optimization
25 """
26 __author__ = "Jean-Philippe ARGAUD"
27
28 import numpy, logging, copy
29 from daCore.NumericObjects import ApplyBounds, ForceNumericBounds
30 from numpy.random import uniform as rand
31
32 # ==============================================================================
33 def ecwnpso(selfA, Xb, Y, HO, R, B):
34     #
35     if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
36         BoxBounds = selfA._parameters["BoxBounds"]
37         logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(selfA._name,))
38     else:
39         raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
40     BoxBounds   = numpy.array(BoxBounds)
41     if numpy.isnan(BoxBounds).any():
42         raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
43     #
44     selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
45     #
46     Hm = HO["Direct"].appliedTo
47     #
48     BI = B.getI()
49     RI = R.getI()
50     #
51     Xini = selfA._parameters["InitializationPoint"]
52     #
53     def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
54         _X  = numpy.asarray( x ).reshape((-1,1))
55         _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
56         _Innovation = Y - _HX
57         #
58         if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
59             if BI is None or RI is None:
60                 raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
61             Jb  = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
62             Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
63         elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
64             if RI is None:
65                 raise ValueError("Observation error covariance matrix has to be properly defined!")
66             Jb  = 0.
67             Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
68         elif QualityMeasure in ["LeastSquares","LS","L2"]:
69             Jb  = 0.
70             Jo  = 0.5 * _Innovation.T @ _Innovation
71         elif QualityMeasure in ["AbsoluteValue","L1"]:
72             Jb  = 0.
73             Jo  = numpy.sum( numpy.abs(_Innovation) )
74         elif QualityMeasure in ["MaximumError","ME", "Linf"]:
75             Jb  = 0.
76             Jo  = numpy.max( numpy.abs(_Innovation) )
77         #
78         J   = float( Jb ) + float( Jo )
79         #
80         return J, float( Jb ), float( Jo )
81     #
82     def KeepRunningCondition(__step, __nbfct):
83         if __step >= selfA._parameters["MaximumNumberOfIterations"]:
84             logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
85             return False
86         elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
87             logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))
88             return False
89         else:
90             return True
91     #
92     # Paramètres internes
93     # -------------------
94     __nbI = selfA._parameters["NumberOfInsects"]
95     __nbP = len(Xini) # Dimension ou nombre de paramètres
96     #
97     __iw = float( selfA._parameters["InertiaWeight"] )
98     __sa = float( selfA._parameters["SocialAcceleration"] )
99     __ca = float( selfA._parameters["CognitiveAcceleration"] )
100     __vc = float( selfA._parameters["VelocityClampingFactor"] )
101     logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
102     logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
103     logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))
104     #
105     # Initialisation de l'essaim
106     # --------------------------
107     LimitStep  = Xini.reshape((-1,1)) + BoxBounds
108     __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
109     LimitSpeed = numpy.vstack((-__dx,__dx)).T
110     #
111     NumberOfFunctionEvaluations = 1
112     JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])
113     #
114     Swarm  = numpy.zeros((__nbI,3,__nbP)) # 3 car (x,v,xbest)
115     for __p in range(__nbP) :
116         Swarm[:,0,__p] = rand( low=LimitStep[__p,0],  high=LimitStep[__p,1],  size=__nbI) # Position
117         Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI) # Velocity
118     logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))
119     #
120     qSwarm = JXini * numpy.ones((__nbI,3)) # Qualité (J, Jb, Jo) par insecte
121     for __i in range(__nbI):
122         NumberOfFunctionEvaluations += 1
123         JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
124         if JTest < JXini:
125             Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
126             qSwarm[__i,:]  = (JTest, JbTest, JoTest)
127         else:
128             Swarm[__i,2,:] = Xini # xBest
129             qSwarm[__i,:]  = (JXini, JbXini, JoXini)
130     logging.debug("%s Initialisation of the best previous insects"%selfA._name)
131     #
132     iBest = numpy.argmin(qSwarm[:,0])
133     xBest = Swarm[iBest,2,:]
134     if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
135         selfA.StoredVariables["CurrentState"].store( xBest )
136     selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0]  )
137     selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
138     selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
139     #
140     selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
141     #
142     # Minimisation de la fonctionnelle
143     # --------------------------------
144     step = 0
145     while KeepRunningCondition(step, NumberOfFunctionEvaluations):
146         step += 1
147         for __i in range(__nbI):
148             rct = rand(size=__nbP)
149             rst = rand(size=__nbP)
150             # Vitesse
151             __velins = __iw * Swarm[__i,1,:] \
152                      + __ca * rct * (Swarm[__i,2,:]   - Swarm[__i,0,:]) \
153                      + __sa * rst * (Swarm[iBest,2,:] - Swarm[__i,0,:])
154             Swarm[__i,1,:] = ApplyBounds( __velins, LimitSpeed )
155             # Position
156             __velins  = Swarm[__i,0,:] + Swarm[__i,1,:]
157             Swarm[__i,0,:] = ApplyBounds( __velins, selfA._parameters["Bounds"] )
158             #
159             NumberOfFunctionEvaluations += 1
160             JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
161             if JTest < qSwarm[__i,0]:
162                 Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
163                 qSwarm[__i,:]  = (JTest, JbTest, JoTest)
164         #
165         iBest = numpy.argmin(qSwarm[:,0])
166         xBest = Swarm[iBest,2,:]
167         selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
168         if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
169             selfA.StoredVariables["CurrentState"].store( xBest )
170         if selfA._toStore("SimulatedObservationAtCurrentState"):
171             selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
172         selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0]  )
173         selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
174         selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
175         logging.debug("%s Step %i: insect %i is the better one with J =%.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))
176     #
177     # Obtention de l'analyse
178     # ----------------------
179     Xa = xBest
180     #
181     selfA.StoredVariables["Analysis"].store( Xa )
182     #
183     # Calculs et/ou stockages supplémentaires
184     # ---------------------------------------
185     if selfA._toStore("OMA") or \
186         selfA._toStore("SimulatedObservationAtOptimum"):
187         HXa = Hm(Xa)
188     if selfA._toStore("Innovation") or \
189         selfA._toStore("OMB") or \
190         selfA._toStore("SimulatedObservationAtBackground"):
191         HXb = Hm(Xb)
192         Innovation = Y - HXb
193     if selfA._toStore("Innovation"):
194         selfA.StoredVariables["Innovation"].store( Innovation )
195     if selfA._toStore("OMB"):
196         selfA.StoredVariables["OMB"].store( Innovation )
197     if selfA._toStore("BMA"):
198         selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
199     if selfA._toStore("OMA"):
200         selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
201     if selfA._toStore("SimulatedObservationAtBackground"):
202         selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
203     if selfA._toStore("SimulatedObservationAtOptimum"):
204         selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
205     #
206     selfA._post_run(HO)
207     return 0
208
209 # ==============================================================================
210 if __name__ == "__main__":
211     print('\n AUTODIAGNOSTIC\n')