# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2023 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D

__doc__ = """
    Standard Particle Swarm Optimization
"""
__author__ = "Jean-Philippe ARGAUD"

import numpy, logging, copy
from numpy.random import uniform as rand

# ==============================================================================
def ecwopso(selfA, Xb, Y, HO, R, B):
    #
    if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
        BoxBounds = selfA._parameters["BoxBounds"]
        logging.debug("%s Taking into account the given bounds on parameter increments"%(selfA._name,))
    else:
        raise ValueError("Particle Swarm Optimization requires bounds on all variable increments to be explicitly given (BoxBounds).")
    BoxBounds = numpy.array(BoxBounds)
    if numpy.isnan(BoxBounds).any():
        raise ValueError("Particle Swarm Optimization requires bounds on all variable increments to be explicitly given (BoxBounds), \"None\" is not allowed. The actual increment bounds are:\n%s"%BoxBounds)
    #
    Hm = HO["Direct"].appliedTo
    #
    BI = B.getI()
    RI = R.getI()
    #
    Xini = selfA._parameters["InitializationPoint"]
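    #
    # Quality measure J(x) = Jb(x) + Jo(x) to be minimized: Jb penalizes the
    # departure from the background Xb, Jo the misfit between the simulated
    # observations Hm(x) and the observations Y, for the chosen QualityMeasure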
    def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
        _X = numpy.asarray( x ).reshape((-1,1))
        _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
        _Innovation = Y - _HX
        #
        if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
            if BI is None or RI is None:
                raise ValueError("Background and Observation error covariance matrices have to be properly defined!")
            Jb = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
            Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
        elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
            if RI is None:
                raise ValueError("Observation error covariance matrix has to be properly defined!")
            Jb = 0.
            Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
        elif QualityMeasure in ["LeastSquares","LS","L2"]:
            Jb = 0.
            Jo = 0.5 * _Innovation.T @ _Innovation
        elif QualityMeasure in ["AbsoluteValue","L1"]:
            Jb = 0.
            Jo = numpy.sum( numpy.abs(_Innovation) )
        elif QualityMeasure in ["MaximumError","ME", "Linf"]:
            Jb = 0.
            Jo = numpy.max( numpy.abs(_Innovation) )
        #
        J = float( Jb ) + float( Jo )
        #
        return J, float( Jb ), float( Jo )
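    #
    # The swarm keeps evolving until either the iteration count or the total
    # number of cost function evaluations exceeds its configured maximum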
    def KeepRunningCondition(__step, __nbfct):
        if __step >= selfA._parameters["MaximumNumberOfIterations"]:
            logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
            return False
        elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
            logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))
            return False
        else:
            return True
    #
    # Internal parameters
    # -------------------
    __nbI = selfA._parameters["NumberOfInsects"]
    __nbP = len(Xini) # Dimension, i.e. number of parameters
    #
    __iw = float( selfA._parameters["InertiaWeight"] )
    __sa = float( selfA._parameters["SocialAcceleration"] )
    __ca = float( selfA._parameters["CognitiveAcceleration"] )
    __vc = float( selfA._parameters["VelocityClampingFactor"] )
    logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
    logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
    logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))
    #
    # Swarm initialization
    # --------------------
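    # Each insect position is drawn uniformly in the box Xini + BoxBounds, and
    # each initial velocity uniformly in a box clamped, per component, to the
    # fraction __vc of the width of the increment bounds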
    LimitStep = Xini.reshape((-1,1)) + BoxBounds
    __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
    LimitSpeed = numpy.vstack((-__dx,__dx)).T
    #
    NumberOfFunctionEvaluations = 1
    JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])
    #
    Swarm = numpy.zeros((__nbI,3,__nbP)) # 3 slots per insect: (x, v, xbest)
    for __p in range(__nbP):
        Swarm[:,0,__p] = rand( low=LimitStep[__p,0], high=LimitStep[__p,1], size=__nbI) # Position
        Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI) # Velocity
    logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))
    #
    qSwarm = JXini * numpy.ones((__nbI,3)) # Quality (J, Jb, Jo) per insect
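    # Evaluate every insect at its initial position and keep, as its personal
    # best, the better of this position and the common starting point Xini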
    for __i in range(__nbI):
        NumberOfFunctionEvaluations += 1
        JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
        if JTest < JXini:
            Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
            qSwarm[__i,:] = (JTest, JbTest, JoTest)
        else:
            Swarm[__i,2,:] = Xini # xBest
            qSwarm[__i,:] = (JXini, JbXini, JoXini)
    logging.debug("%s Initialisation of the best previous insects"%selfA._name)
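    #
    # Global best: index and position of the insect with the lowest quality J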
    iBest = numpy.argmin(qSwarm[:,0])
    xBest = Swarm[iBest,2,:]
    if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
        selfA.StoredVariables["CurrentState"].store( xBest )
    selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
    selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
    selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
    #
    selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
    #
    # Minimization of the cost function
    # ---------------------------------
    step = 0
    while KeepRunningCondition(step, NumberOfFunctionEvaluations):
        step += 1
        for __i in range(__nbI):
            for __p in range(__nbP):
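                # Standard PSO velocity update: the new velocity blends
                # inertia (__iw), attraction toward the insect's own best
                # (__ca, cognitive) and toward the swarm best (__sa, social),
                # each weighted by an independent uniform draw in [0,1[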
                Swarm[__i,1,__p] = __iw * Swarm[__i,1,__p] \
                    + __ca * rand() * (Swarm[__i,2,__p] - Swarm[__i,0,__p]) \
                    + __sa * rand() * (Swarm[iBest,2,__p] - Swarm[__i,0,__p])
                # Position update by advection with the new velocity
                Swarm[__i,0,__p] = Swarm[__i,0,__p] + Swarm[__i,1,__p]
            #
            NumberOfFunctionEvaluations += 1
            JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
            if JTest < qSwarm[__i,0]:
                Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
                qSwarm[__i,:] = (JTest, JbTest, JoTest)
        #
        iBest = numpy.argmin(qSwarm[:,0])
        xBest = Swarm[iBest,2,:]
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
        if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
            selfA.StoredVariables["CurrentState"].store( xBest )
        if selfA._toStore("SimulatedObservationAtCurrentState"):
            selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
        selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
        selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
        selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
        logging.debug("%s Step %i: insect %i is the best one with J = %.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))
    #
    # Obtaining the analysis
    # ----------------------
    Xa = xBest
    #
    selfA.StoredVariables["Analysis"].store( Xa )
    #
    # Additional calculations and/or storage
    # --------------------------------------
    if selfA._toStore("OMA") or \
            selfA._toStore("SimulatedObservationAtOptimum"):
        HXa = Hm(Xa)
    if selfA._toStore("Innovation") or \
            selfA._toStore("OMB") or \
            selfA._toStore("SimulatedObservationAtBackground"):
        HXb = Hm(Xb)
        Innovation = numpy.ravel(Y) - numpy.ravel(HXb)
    if selfA._toStore("Innovation"):
        selfA.StoredVariables["Innovation"].store( Innovation )
    if selfA._toStore("OMB"):
        selfA.StoredVariables["OMB"].store( Innovation )
    if selfA._toStore("BMA"):
        selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
    if selfA._toStore("OMA"):
        selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
    if selfA._toStore("SimulatedObservationAtBackground"):
        selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
    if selfA._toStore("SimulatedObservationAtOptimum"):
        selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
    #
    return 0

# ==============================================================================
if __name__ == "__main__":
    print('\n AUTODIAGNOSTIC\n')
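    #
    # Illustrative self-check only, not part of the ADAO API: a minimal
    # standalone sketch of the same standard PSO update rule, minimizing a
    # simple paraboloid. All names and tunings below (cost, bounds, weights)
    # are local assumptions chosen for this demonstration, not values taken
    # from the module above.
    def cost(x):
        return float(numpy.sum(x**2))
    nbI, nbP, iw, ca, sa = 20, 2, 0.7, 1.5, 1.5
    pos = rand(low=-10., high=10., size=(nbI, nbP))  # initial positions
    vel = rand(low=-1., high=1., size=(nbI, nbP))    # initial velocities
    best = pos.copy()                                # personal best positions
    qbest = numpy.array([cost(p) for p in pos])      # personal best qualities
    for _ in range(100):
        g = best[numpy.argmin(qbest)]                # current swarm best
        vel = iw * vel \
            + ca * rand(size=(nbI, nbP)) * (best - pos) \
            + sa * rand(size=(nbI, nbP)) * (g - pos)
        pos = pos + vel
        for i in range(nbI):
            q = cost(pos[i])
            if q < qbest[i]:
                best[i], qbest[i] = pos[i].copy(), q
    print("  Best J found on the x**2 paraboloid: %.3e"%qbest.min())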