Salome HOME
Documentation and code update for PSO
[modules/adao.git] / src / daComposant / daAlgorithms / Atoms / ecwopso.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2023 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 __doc__ = """
24     Standard Particle Swarm Optimization
25 """
26 __author__ = "Jean-Philippe ARGAUD"
27
28 import numpy, logging, copy
29 from numpy.random import uniform as rand
30
31 # ==============================================================================
32 def ecwopso(selfA, Xb, Y, HO, R, B):
33     #
34     if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
35         BoxBounds = selfA._parameters["BoxBounds"]
36         logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(selfA._name,))
37     else:
38         raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
39     BoxBounds   = numpy.array(BoxBounds)
40     if numpy.isnan(BoxBounds).any():
41         raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
42     #
43     Hm = HO["Direct"].appliedTo
44     #
45     BI = B.getI()
46     RI = R.getI()
47     #
48     Xini = selfA._parameters["InitializationPoint"]
49     #
50     def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
51         _X  = numpy.asarray( x ).reshape((-1,1))
52         _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
53         _Innovation = Y - _HX
54         #
55         if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
56             if BI is None or RI is None:
57                 raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
58             Jb  = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
59             Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
60         elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
61             if RI is None:
62                 raise ValueError("Observation error covariance matrix has to be properly defined!")
63             Jb  = 0.
64             Jo  = 0.5 * _Innovation.T @ (RI @ _Innovation)
65         elif QualityMeasure in ["LeastSquares","LS","L2"]:
66             Jb  = 0.
67             Jo  = 0.5 * _Innovation.T @ _Innovation
68         elif QualityMeasure in ["AbsoluteValue","L1"]:
69             Jb  = 0.
70             Jo  = numpy.sum( numpy.abs(_Innovation) )
71         elif QualityMeasure in ["MaximumError","ME", "Linf"]:
72             Jb  = 0.
73             Jo  = numpy.max( numpy.abs(_Innovation) )
74         #
75         J   = float( Jb ) + float( Jo )
76         #
77         return J, float( Jb ), float( Jo )
78     #
79     def KeepRunningCondition(__step, __nbfct):
80         if __step >= selfA._parameters["MaximumNumberOfIterations"]:
81             logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
82             return False
83         elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
84             logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))
85             return False
86         else:
87             return True
88     #
89     # Paramètres internes
90     # -------------------
91     __nbI = selfA._parameters["NumberOfInsects"]
92     __nbP = len(Xini) # Dimension ou nombre de paramètres
93     #
94     __iw = float( selfA._parameters["InertiaWeight"] )
95     __sa = float( selfA._parameters["SocialAcceleration"] )
96     __ca = float( selfA._parameters["CognitiveAcceleration"] )
97     __vc = float( selfA._parameters["VelocityClampingFactor"] )
98     logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
99     logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
100     logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))
101     #
102     # Initialisation de l'essaim
103     # --------------------------
104     LimitStep  = Xini.reshape((-1,1)) + BoxBounds
105     __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
106     LimitSpeed = numpy.vstack((-__dx,__dx)).T
107     #
108     NumberOfFunctionEvaluations = 1
109     JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])
110     #
111     Swarm  = numpy.zeros((__nbI,3,__nbP)) # 3 car (x,v,xbest)
112     for __p in range(__nbP) :
113         Swarm[:,0,__p] = rand( low=LimitStep[__p,0],  high=LimitStep[__p,1],  size=__nbI) # Position
114         Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI) # Velocity
115     logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))
116     #
117     qSwarm = JXini * numpy.ones((__nbI,3)) # Qualité (J, Jb, Jo) par insecte
118     for __i in range(__nbI):
119         NumberOfFunctionEvaluations += 1
120         JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
121         if JTest < JXini:
122             Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
123             qSwarm[__i,:]  = (JTest, JbTest, JoTest)
124         else:
125             Swarm[__i,2,:] = Xini # xBest
126             qSwarm[__i,:]  = (JXini, JbXini, JoXini)
127     logging.debug("%s Initialisation of the best previous insects"%selfA._name)
128     #
129     iBest = numpy.argmin(qSwarm[:,0])
130     xBest = Swarm[iBest,2,:]
131     if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
132         selfA.StoredVariables["CurrentState"].store( xBest )
133     selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0]  )
134     selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
135     selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
136     #
137     selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
138     #
139     # Minimisation de la fonctionnelle
140     # --------------------------------
141     step = 0
142     while KeepRunningCondition(step, NumberOfFunctionEvaluations):
143         step += 1
144         for __i in range(__nbI):
145             for __p in range(__nbP):
146                 # Vitesse
147                 Swarm[__i,1,__p] = __iw * Swarm[__i,1,__p] \
148                                  + __ca * rand() * (Swarm[__i,2,__p]   - Swarm[__i,0,__p]) \
149                                  + __sa * rand() * (Swarm[iBest,2,__p] - Swarm[__i,0,__p])
150                 # Position
151                 Swarm[__i,0,__p]  = Swarm[__i,0,__p] + Swarm[__i,1,__p]
152                 #
153             NumberOfFunctionEvaluations += 1
154             JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
155             if JTest < qSwarm[__i,0]:
156                 Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
157                 qSwarm[__i,:]  = (JTest, JbTest, JoTest)
158         #
159         iBest = numpy.argmin(qSwarm[:,0])
160         xBest = Swarm[iBest,2,:]
161         selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
162         if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
163             selfA.StoredVariables["CurrentState"].store( xBest )
164         if selfA._toStore("SimulatedObservationAtCurrentState"):
165             selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
166         selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0]  )
167         selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
168         selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
169         logging.debug("%s Step %i: insect %i is the better one with J =%.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))
170     #
171     # Obtention de l'analyse
172     # ----------------------
173     Xa = xBest
174     #
175     selfA.StoredVariables["Analysis"].store( Xa )
176     #
177     # Calculs et/ou stockages supplémentaires
178     # ---------------------------------------
179     if selfA._toStore("OMA") or \
180         selfA._toStore("SimulatedObservationAtOptimum"):
181         HXa = Hm(Xa)
182     if selfA._toStore("Innovation") or \
183         selfA._toStore("OMB") or \
184         selfA._toStore("SimulatedObservationAtBackground"):
185         HXb = Hm(Xb)
186         Innovation = Y - HXb
187     if selfA._toStore("Innovation"):
188         selfA.StoredVariables["Innovation"].store( Innovation )
189     if selfA._toStore("OMB"):
190         selfA.StoredVariables["OMB"].store( Innovation )
191     if selfA._toStore("BMA"):
192         selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
193     if selfA._toStore("OMA"):
194         selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
195     if selfA._toStore("SimulatedObservationAtBackground"):
196         selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
197     if selfA._toStore("SimulatedObservationAtOptimum"):
198         selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
199     #
200     selfA._post_run(HO)
201     return 0
202
203 # ==============================================================================
204 if __name__ == "__main__":
205     print('\n AUTODIAGNOSTIC\n')