1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2023 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
24 Standard Particle Swarm Optimization
26 __author__ = "Jean-Philippe ARGAUD"
28 import numpy, logging, copy
29 from daCore.NumericObjects import ApplyBounds, ForceNumericBounds
30 from numpy.random import uniform as rand
32 # ==============================================================================
def ecwnpso(selfA, Xb, Y, HO, R, B):
    """
    Standard Particle Swarm Optimization minimizing a data-assimilation
    cost function over a swarm of "insects" (candidate states).

    NOTE(review): this excerpt appears to be a lossy extract — several
    statements (an ``else:``, guard tests, ``Jb = 0.`` initializations,
    ``return`` lines, the ``step`` counter, and the ``Xa``/``HXa``/``HXb``
    assignments) seem to have been dropped. Only comments are added here;
    every remaining statement is kept verbatim, and each apparent gap is
    flagged with a NOTE(review) comment. Confirm against the full file.

    Arguments (as actually used by the visible code):
      selfA -- algorithm object exposing `_parameters`, `_name`,
               `StoredVariables` and `_toStore` (presumably an ADAO
               algorithm instance -- confirm against caller)
      Xb    -- background state, used in the Jb term and BMA storage
      Y     -- observations, used for OMA storage (an innovation term is
               also referenced, but its assignment is not visible here)
      HO    -- operator container; `HO["Direct"].appliedTo` is the direct
               observation operator H
      R, B  -- observation/background error covariance objects; their
               inverses RI/BI are referenced in CostFunction (the
               corresponding `getI()` calls are not visible here)
    """
    # Increment bounds (BoxBounds) are mandatory: they bound both the
    # initial positions and the insect velocities below.
    if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
        BoxBounds = selfA._parameters["BoxBounds"]
        logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(selfA._name,))
    # NOTE(review): the `else:` that should introduce this raise appears to
    # have been lost in this excerpt — as written it would always execute.
    raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
    BoxBounds = numpy.array(BoxBounds)
    if numpy.isnan(BoxBounds).any():
        # None/NaN entries are rejected: every increment needs finite bounds.
        raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
    # Replace infinite/None state bounds by numeric ones so ApplyBounds works.
    selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
    # Direct observation operator H.
    Hm = HO["Direct"].appliedTo
    # NOTE(review): the definitions of BI and RI (presumably B.getI() and
    # R.getI()) are not visible in this excerpt but are used below.
    # Starting point of the search.
    Xini = selfA._parameters["InitializationPoint"]

    def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
        # Return (J, Jb, Jo) as floats for state x under the chosen criterion.
        _X = numpy.asarray( x ).reshape((-1,1))
        _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
        # NOTE(review): `_Innovation = Y - _HX` appears elided here;
        # `_Innovation` is used below without a visible assignment.
        if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
            # Full 3D-Var-like cost: background term + observation term.
            if BI is None or RI is None:
                raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
            Jb = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
            Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
        elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
            # NOTE(review): the guard (presumably `if RI is None:`) and a
            # `Jb = 0.` line appear elided — as written this branch would
            # raise unconditionally.
            raise ValueError("Observation error covariance matrix has to be properly defined!")
            Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
        elif QualityMeasure in ["LeastSquares","LS","L2"]:
            # NOTE(review): `Jb = 0.` presumably elided in this branch too.
            Jo = 0.5 * _Innovation.T @ _Innovation
        elif QualityMeasure in ["AbsoluteValue","L1"]:
            # L1 criterion: sum of absolute innovations.
            Jo = numpy.sum( numpy.abs(_Innovation) )
        elif QualityMeasure in ["MaximumError","ME", "Linf"]:
            # L-infinity criterion: largest absolute innovation.
            Jo = numpy.max( numpy.abs(_Innovation) )
        J = float( Jb ) + float( Jo )
        return J, float( Jb ), float( Jo )

    def KeepRunningCondition(__step, __nbfct):
        # Loop-continuation test on the iteration and evaluation budgets.
        # NOTE(review): the `return False` lines after each debug message
        # and the final `else: return True` appear elided in this excerpt.
        if __step >= selfA._parameters["MaximumNumberOfIterations"]:
            logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
        elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
            logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))

    # Internal swarm parameters.
    __nbI = selfA._parameters["NumberOfInsects"]
    __nbP = len(Xini)  # Dimension, i.e. number of parameters
    __iw = float( selfA._parameters["InertiaWeight"] )
    __sa = float( selfA._parameters["SocialAcceleration"] )
    __ca = float( selfA._parameters["CognitiveAcceleration"] )
    __vc = float( selfA._parameters["VelocityClampingFactor"] )
    logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
    logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
    logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))

    # Swarm initialization
    # --------------------
    # Admissible position box around the initial point, and symmetric
    # velocity limits scaled by the clamping factor.
    LimitStep = Xini.reshape((-1,1)) + BoxBounds
    __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
    LimitSpeed = numpy.vstack((-__dx,__dx)).T

    # Cost at the initial point (counts as one function evaluation).
    NumberOfFunctionEvaluations = 1
    JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])

    Swarm = numpy.zeros((__nbI,3,__nbP))  # 3 because of (x, v, xbest)
    # Uniform random positions and velocities, drawn per parameter.
    for __p in range(__nbP) :
        Swarm[:,0,__p] = rand( low=LimitStep[__p,0], high=LimitStep[__p,1], size=__nbI)  # Position
        Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI)  # Velocity
    logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))

    qSwarm = JXini * numpy.ones((__nbI,3))  # Quality (J, Jb, Jo) per insect
    for __i in range(__nbI):
        NumberOfFunctionEvaluations += 1
        JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
        # NOTE(review): an `if JTest < JXini:` / `else:` pair around the two
        # assignment pairs below appears elided — as written, the Xini pair
        # would always overwrite the first one.
        Swarm[__i,2,:] = Swarm[__i,0,:]  # xBest
        qSwarm[__i,:] = (JTest, JbTest, JoTest)
        Swarm[__i,2,:] = Xini  # xBest
        qSwarm[__i,:] = (JXini, JbXini, JoXini)
    logging.debug("%s Initialisation of the best previous insects"%selfA._name)

    # Global best insect so far, and storage of the initial results.
    iBest = numpy.argmin(qSwarm[:,0])
    xBest = Swarm[iBest,2,:]
    if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
        selfA.StoredVariables["CurrentState"].store( xBest )
    selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
    selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
    selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
    selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )

    # Minimization of the cost functional
    # -----------------------------------
    # NOTE(review): `step = 0` before the loop and `step += 1` inside it
    # appear elided; `step` is used below without a visible assignment.
    while KeepRunningCondition(step, NumberOfFunctionEvaluations):
        for __i in range(__nbI):
            # Independent cognitive/social random draws per parameter.
            rct = rand(size=__nbP)
            rst = rand(size=__nbP)
            # Velocity update: inertia + recall to the insect's own best +
            # recall to the group best, then clamped to the speed limits.
            __velins = __iw * Swarm[__i,1,:] \
                + __ca * rct * (Swarm[__i,2,:] - Swarm[__i,0,:]) \
                + __sa * rst * (Swarm[iBest,2,:] - Swarm[__i,0,:])
            Swarm[__i,1,:] = ApplyBounds( __velins, LimitSpeed )
            # Position update, kept inside the state bounds.
            __velins = Swarm[__i,0,:] + Swarm[__i,1,:]
            Swarm[__i,0,:] = ApplyBounds( __velins, selfA._parameters["Bounds"] )
            # Evaluate the moved insect; update its personal best on improvement.
            NumberOfFunctionEvaluations += 1
            JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
            if JTest < qSwarm[__i,0]:
                Swarm[__i,2,:] = Swarm[__i,0,:]  # xBest
                qSwarm[__i,:] = (JTest, JbTest, JoTest)
        # Refresh the global best and store this iteration's results.
        iBest = numpy.argmin(qSwarm[:,0])
        xBest = Swarm[iBest,2,:]
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
        if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
            selfA.StoredVariables["CurrentState"].store( xBest )
        if selfA._toStore("SimulatedObservationAtCurrentState"):
            selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
        selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
        selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
        selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
        logging.debug("%s Step %i: insect %i is the better one with J =%.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))

    # Obtaining the analysis
    # ----------------------
    # NOTE(review): the assignment of Xa (presumably `Xa = xBest`) is not
    # visible in this excerpt.
    selfA.StoredVariables["Analysis"].store( Xa )

    # Additional calculations and/or storage
    # --------------------------------------
    # NOTE(review): the bodies of the two conditionals below — presumably
    # computing HXa (H applied at the optimum) and HXb/Innovation (H at the
    # background) — appear elided; those names are used afterwards.
    if selfA._toStore("OMA") or \
            selfA._toStore("SimulatedObservationAtOptimum"):
    if selfA._toStore("Innovation") or \
            selfA._toStore("OMB") or \
            selfA._toStore("SimulatedObservationAtBackground"):
    if selfA._toStore("Innovation"):
        selfA.StoredVariables["Innovation"].store( Innovation )
    if selfA._toStore("OMB"):
        selfA.StoredVariables["OMB"].store( Innovation )
    if selfA._toStore("BMA"):
        selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
    if selfA._toStore("OMA"):
        selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
    if selfA._toStore("SimulatedObservationAtBackground"):
        selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
    if selfA._toStore("SimulatedObservationAtOptimum"):
        selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
209 # ==============================================================================
if __name__ == "__main__":
    # Emitted only when the module is executed directly, never on import.
    banner = '\n AUTODIAGNOSTIC\n'
    print(banner)