.. [WikipediaUKF] Wikipedia, *Unscented Kalman Filter*, https://en.wikipedia.org/wiki/Unscented_Kalman_filter
+.. [ZambranoBigiarini13] Zambrano-Bigiarini M., Clerc M., Rojas R., *Standard Particle Swarm Optimisation 2011 at CEC-2013: A baseline for future PSO improvements*, 2013 IEEE Congress on Evolutionary Computation, pp.2337-2344, 2013
+
.. [Zhu97] Zhu C., Byrd R. H., Nocedal J., *L-BFGS-B: Algorithm 778: L-BFGS-B, FORTRAN routines for large scale bound constrained optimization*, ACM Transactions on Mathematical Software, 23(4), pp.550-560, 1997
.. [Zupanski05] Zupanski M., *Maximum likelihood ensemble filter: Theoretical aspects*, Monthly Weather Review, 133(6), pp.1710-1726, 2005
of a cost function :math:`J` by using an evolutionary strategy of particle
swarm. It is a method that does not use the derivatives of the cost function.
It is based on the evolution of a population (called a "swarm") of states (each
-state is called a "particle"). It falls in the same category than the
+state is called a "particle" or an "insect"). It falls in the same category
+as the
:ref:`section_ref_algorithm_DerivativeFreeOptimization`, the
:ref:`section_ref_algorithm_DifferentialEvolution` or the
:ref:`section_ref_algorithm_TabuSearch`.
:ref:`section_theory_optimization`. The default error function is the augmented
weighted least squares function, classically used in data assimilation.
-.. ------------------------------------ ..
+There exist various variants of this algorithm. The following stable and
+robust formulations are proposed here:
+
+.. index::
+ pair: Variant ; PSO
+ pair: Variant ; CanonicalPSO
+ pair: Variant ; OGCR
+
+- "PSO" (Canonical PSO, see [ZambranoBigiarini13]_), canonical algorithm of particle swarm, robust and defining the reference for particle swarm algorithms,
+- "OGCR" (Simple PSO with no bounds on insects or velocities), simplified algorithm of particle swarm, not recommanded because less robust, but sometimes a lot more efficient.
+
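The canonical update behind these variants combines three recall terms: inertia on the insect's own velocity, a cognitive recall toward its best known position, and a social recall toward the best insect of the swarm. As a minimal standalone sketch (plain NumPy, with illustrative values for the parameters, not ADAO's actual API):

```python
import numpy as np

rng = np.random.default_rng(42)

# Assumed illustrative values for InertiaWeight, CognitiveAcceleration, SocialAcceleration
iw, ca, sa = 0.7, 0.5, 0.5

x  = np.array([0.5, -1.0])   # current position of one insect
v  = np.array([0.1,  0.2])   # current velocity
xp = np.array([0.4, -0.8])   # best position previously found by this insect
xg = np.array([0.0,  0.0])   # best position found by the whole swarm

# Canonical velocity update: inertia + cognitive recall + social recall,
# each recall weighted by a uniform random draw per component
v = iw * v \
    + ca * rng.uniform(size=x.size) * (xp - x) \
    + sa * rng.uniform(size=x.size) * (xg - x)
x = x + v
```

The "PSO" variant additionally clamps `v` and `x` into their admissible intervals, which the "OGCR" variant skips.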
+. ------------------------------------ ..
.. include:: snippets/Header2Algo02.rst
.. include:: snippets/Background.rst
.. ------------------------------------ ..
.. include:: snippets/Header2Algo03AdOp.rst
-.. include:: snippets/MaximumNumberOfIterations_50.rst
+.. include:: snippets/BoundsWithNone.rst
-.. include:: snippets/MaximumNumberOfFunctionEvaluations.rst
+.. include:: snippets/BoxBounds.rst
-.. include:: snippets/QualityCriterion.rst
+.. include:: snippets/CognitiveAcceleration.rst
-.. include:: snippets/NumberOfInsects.rst
+.. include:: snippets/InertiaWeight.rst
-.. include:: snippets/SwarmVelocity.rst
+.. include:: snippets/InitializationPoint.rst
-.. include:: snippets/GroupRecallRate.rst
+.. include:: snippets/MaximumNumberOfFunctionEvaluations.rst
-.. include:: snippets/BoxBounds.rst
+.. include:: snippets/MaximumNumberOfIterations_50.rst
+
+.. include:: snippets/NumberOfInsects.rst
+
+.. include:: snippets/QualityCriterion.rst
.. include:: snippets/SetSeed.rst
+.. include:: snippets/SocialAcceleration.rst
+
StoreSupplementaryCalculations
.. index:: single: StoreSupplementaryCalculations
Example :
``{"StoreSupplementaryCalculations":["CurrentState", "Residu"]}``
+.. include:: snippets/VelocityClampingFactor.rst
+
.. ------------------------------------ ..
.. include:: snippets/Header2Algo04.rst
.. include:: snippets/Header2Algo07.rst
- [WikipediaPSO]_
+- [ZambranoBigiarini13]_
--- /dev/null
+.. index:: single: CognitiveAcceleration
+
+CognitiveAcceleration
+ *Real value*. This key indicates the recall rate toward the best previously known
+ value of the current insect. It is a floating point value between 0 and 1.
+ The default value is 0.5.
+
+ Example :
+ ``{"CognitiveAcceleration":0.5}``
+++ /dev/null
-.. index:: single: GroupRecallRate
-
-GroupRecallRate
- *Real value*. This key indicates the recall rate at the best swarm insect. It
- is a floating point value between 0 and 1. The default value is 0.5.
-
- Example :
- ``{"GroupRecallRate":0.5}``
--- /dev/null
+.. index:: single: InertiaWeight
+
+InertiaWeight
+ *Real value*. This key indicates the part of the insect velocity which is
+ imposed by the swarm, named "inertia weight". It is a floating point value
+ between 0 and 1. The default value is 1.
+
+ Example :
+ ``{"InertiaWeight":1.}``
--- /dev/null
+.. index:: single: SocialAcceleration
+
+SocialAcceleration
+ *Real value*. This key indicates the recall rate toward the best swarm insect in
+ the neighbourhood of the current insect, which is by default the whole swarm.
+ It is a floating point value between 0 and 1. The default value is 0.5.
+
+ Example :
+ ``{"SocialAcceleration":0.5}``
+++ /dev/null
-.. index:: single: SwarmVelocity
-
-SwarmVelocity
- *Real value*. This key indicates the part of the insect velocity which is
- imposed by the swarm, named "group velocity". It is a positive floating point
- value. The default value is 1.
-
- Example :
- ``{"SwarmVelocity":1.}``
--- /dev/null
+.. index:: single: VelocityClampingFactor
+
+VelocityClampingFactor
+ *Real value*. This key indicates the rate of group velocity attenuation in
+ the update for each insect, useful to avoid swarm explosion, i.e.
+ uncontrolled growth of insect velocity. It is a floating point value strictly
+ above 0 and at most 1. The default value is 0.3.
+
+ Example :
+ ``{"VelocityClampingFactor":1.0}``
.. [WikipediaUKF] Wikipedia, *Unscented Kalman Filter*, https://en.wikipedia.org/wiki/Unscented_Kalman_filter
+.. [ZambranoBigiarini13] Zambrano-Bigiarini M., Clerc M., Rojas R., *Standard Particle Swarm Optimisation 2011 at CEC-2013: A baseline for future PSO improvements*, 2013 IEEE Congress on Evolutionary Computation, pp.2337-2344, 2013
+
.. [Zhu97] Zhu C., Byrd R. H., Nocedal J., *L-BFGS-B: Algorithm 778: L-BFGS-B, FORTRAN routines for large scale bound constrained optimization*, ACM Transactions on Mathematical Software, 23(4), pp.550-560, 1997
.. [Zupanski05] Zupanski M., *Maximum likelihood ensemble filter: Theoretical aspects*, Monthly Weather Review, 133(6), pp.1710-1726, 2005
d'une fonctionnelle d'écart :math:`J` en utilisant une méthode évolutionnaire
d'essaim particulaire. C'est une méthode qui n'utilise pas les dérivées de la
fonctionnelle d'écart. Elle est basée sur l'évolution d'une population (appelée
-"essaim") d'états (chaque individu étant appelé une "particule"). Elle entre
-dans la même catégorie que les
+"essaim") d'états (chaque individu étant appelé une "particule" ou un insecte).
+Elle entre dans la même catégorie que les
:ref:`section_ref_algorithm_DerivativeFreeOptimization`,
:ref:`section_ref_algorithm_DifferentialEvolution` ou
:ref:`section_ref_algorithm_TabuSearch`.
par défaut est celle de moindres carrés pondérés augmentés, classiquement
utilisée en assimilation de données.
+Il existe diverses variantes de cet algorithme. On propose ici les formulations
+stables et robustes suivantes :
+
+.. index::
+ pair: Variant ; PSO
+ pair: Variant ; CanonicalPSO
+ pair: Variant ; OGCR
+
+- "PSO" (Canonical PSO, voir [ZambranoBigiarini13]_), algorithme canonique d'essaim particulaire, robuste et défini comme la référence des algorithmes d'essaims particulaires,
+- "OGCR" (Simple PSO sans bornes sur les insectes ou les vitesses), algorithme simplifié d'essaim particulaire, déconseillé car peu robuste, mais parfois beaucoup plus rapide.
+
.. ------------------------------------ ..
.. include:: snippets/Header2Algo02.rst
.. ------------------------------------ ..
.. include:: snippets/Header2Algo03AdOp.rst
-.. include:: snippets/MaximumNumberOfIterations_50.rst
+.. include:: snippets/BoundsWithNone.rst
-.. include:: snippets/MaximumNumberOfFunctionEvaluations.rst
+.. include:: snippets/BoxBounds.rst
-.. include:: snippets/QualityCriterion.rst
+.. include:: snippets/CognitiveAcceleration.rst
-.. include:: snippets/NumberOfInsects.rst
+.. include:: snippets/InertiaWeight.rst
-.. include:: snippets/SwarmVelocity.rst
+.. include:: snippets/InitializationPoint.rst
-.. include:: snippets/GroupRecallRate.rst
+.. include:: snippets/MaximumNumberOfFunctionEvaluations.rst
-.. include:: snippets/BoxBounds.rst
+.. include:: snippets/MaximumNumberOfIterations_50.rst
+
+.. include:: snippets/NumberOfInsects.rst
+
+.. include:: snippets/QualityCriterion.rst
.. include:: snippets/SetSeed.rst
+.. include:: snippets/SocialAcceleration.rst
+
StoreSupplementaryCalculations
.. index:: single: StoreSupplementaryCalculations
Exemple :
``{"StoreSupplementaryCalculations":["CurrentState", "Residu"]}``
+.. include:: snippets/VelocityClampingFactor.rst
+
.. ------------------------------------ ..
.. include:: snippets/Header2Algo04.rst
.. include:: snippets/Header2Algo07.rst
- [WikipediaPSO]_
+- [ZambranoBigiarini13]_
--- /dev/null
+.. index:: single: CognitiveAcceleration
+
+CognitiveAcceleration
+ *Valeur réelle*. Cette clé indique le taux de rappel vers la meilleure valeur
+ connue précédemment de l'insecte courant. C'est une valeur réelle comprise
+ entre 0 et 1. Le défaut est de 0.5.
+
+ Exemple :
+ ``{"CognitiveAcceleration":0.5}``
+
+++ /dev/null
-.. index:: single: GroupRecallRate
-
-GroupRecallRate
- *Valeur réelle*. Cette clé indique le taux de rappel vers le meilleur insecte
- de l'essaim. C'est une valeur réelle comprise entre 0 et 1. Le défaut est de
- 0.5.
-
- Exemple :
- ``{"GroupRecallRate":0.5}``
--- /dev/null
+.. index:: single: InertiaWeight
+
+InertiaWeight
+ *Valeur réelle*. Cette clé indique la part de la vitesse de l'insecte qui est
+ imposée par l'essaim, dite "poids de l'inertie". C'est une valeur réelle
+ comprise entre 0 et 1. Le défaut est de 1.
+
+ Exemple :
+ ``{"InertiaWeight":1.}``
--- /dev/null
+.. index:: single: SocialAcceleration
+
+SocialAcceleration
+ *Valeur réelle*. Cette clé indique le taux de rappel vers le meilleur insecte
+ du voisinage de l'insecte courant, qui est par défaut l'essaim complet. C'est
+ une valeur réelle comprise entre 0 et 1. Le défaut est de 0.5.
+
+ Exemple :
+ ``{"SocialAcceleration":0.5}``
+++ /dev/null
-.. index:: single: SwarmVelocity
-
-SwarmVelocity
- *Valeur réelle*. Cette clé indique la part de la vitesse d'insecte qui est
- imposée par l'essaim, dite "vitesse de groupe". C'est une valeur réelle
- positive. Le défaut est de 1.
-
- Exemple :
- ``{"SwarmVelocity":1.}``
--- /dev/null
+.. index:: single: VelocityClampingFactor
+
+VelocityClampingFactor
+ *Valeur réelle*. Cette clé indique le taux d'atténuation de la vitesse de
+ groupe dans la mise à jour pour chaque insecte, utile pour éviter l'explosion
+ de l'essaim, c'est-à-dire une croissance incontrôlée de la vitesse des
+ insectes. C'est une valeur réelle strictement supérieure à 0 et inférieure ou
+ égale à 1. Le défaut est de 0.3.
+
+ Exemple :
+ ``{"VelocityClampingFactor":1.0}``
+
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2023 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
+
+__doc__ = """
+ Standard Particle Swarm Optimization
+"""
+__author__ = "Jean-Philippe ARGAUD"
+
+import numpy, logging, copy
+from daCore.NumericObjects import ApplyBounds, ForceNumericBounds
+from numpy.random import uniform as rand
+
+# ==============================================================================
+def ecwnpso(selfA, Xb, Y, HO, R, B):
+ #
+ if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
+ BoxBounds = selfA._parameters["BoxBounds"]
+ logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(selfA._name,))
+ else:
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
+ BoxBounds = numpy.array(BoxBounds)
+ if numpy.isnan(BoxBounds).any():
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
+ #
+ selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
+ #
+ Hm = HO["Direct"].appliedTo
+ #
+ BI = B.getI()
+ RI = R.getI()
+ #
+ Xini = selfA._parameters["InitializationPoint"]
+ #
+ def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
+ _X = numpy.asarray( x ).reshape((-1,1))
+ _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
+ _Innovation = Y - _HX
+ #
+ if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
+ if BI is None or RI is None:
+ raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
+ Jb = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
+ Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
+ elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
+ if RI is None:
+ raise ValueError("Observation error covariance matrix has to be properly defined!")
+ Jb = 0.
+ Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
+ elif QualityMeasure in ["LeastSquares","LS","L2"]:
+ Jb = 0.
+ Jo = 0.5 * _Innovation.T @ _Innovation
+ elif QualityMeasure in ["AbsoluteValue","L1"]:
+ Jb = 0.
+ Jo = numpy.sum( numpy.abs(_Innovation) )
+ elif QualityMeasure in ["MaximumError","ME", "Linf"]:
+ Jb = 0.
+ Jo = numpy.max( numpy.abs(_Innovation) )
+ #
+ J = float( Jb ) + float( Jo )
+ #
+ return J, float( Jb ), float( Jo )
+ #
+ def KeepRunningCondition(__step, __nbfct):
+ if __step >= selfA._parameters["MaximumNumberOfIterations"]:
+ logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
+ return False
+ elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
+ logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))
+ return False
+ else:
+ return True
+ #
+ # Paramètres internes
+ # -------------------
+ __nbI = selfA._parameters["NumberOfInsects"]
+ __nbP = len(Xini) # Dimension ou nombre de paramètres
+ #
+ __iw = float( selfA._parameters["InertiaWeight"] )
+ __sa = float( selfA._parameters["SocialAcceleration"] )
+ __ca = float( selfA._parameters["CognitiveAcceleration"] )
+ __vc = float( selfA._parameters["VelocityClampingFactor"] )
+ logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
+ logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
+ logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))
+ #
+ # Initialisation de l'essaim
+ # --------------------------
+ LimitStep = Xini.reshape((-1,1)) + BoxBounds
+ __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
+ LimitSpeed = numpy.vstack((-__dx,__dx)).T
+ #
+ NumberOfFunctionEvaluations = 1
+ JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])
+ #
+ Swarm = numpy.zeros((__nbI,3,__nbP)) # 3 car (x,v,xbest)
+ for __p in range(__nbP) :
+ Swarm[:,0,__p] = rand( low=LimitStep[__p,0], high=LimitStep[__p,1], size=__nbI) # Position
+ Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI) # Velocity
+ logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))
+ #
+ qSwarm = JXini * numpy.ones((__nbI,3)) # Qualité (J, Jb, Jo) par insecte
+ for __i in range(__nbI):
+ NumberOfFunctionEvaluations += 1
+ JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
+ if JTest < JXini:
+ Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
+ qSwarm[__i,:] = (JTest, JbTest, JoTest)
+ else:
+ Swarm[__i,2,:] = Xini # xBest
+ qSwarm[__i,:] = (JXini, JbXini, JoXini)
+ logging.debug("%s Initialisation of the best previous insects"%selfA._name)
+ #
+ iBest = numpy.argmin(qSwarm[:,0])
+ xBest = Swarm[iBest,2,:]
+ if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
+ selfA.StoredVariables["CurrentState"].store( xBest )
+ selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
+ selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
+ selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
+ #
+ selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
+ #
+ # Minimisation de la fonctionnelle
+ # --------------------------------
+ step = 0
+ while KeepRunningCondition(step, NumberOfFunctionEvaluations):
+ step += 1
+ for __i in range(__nbI):
+ rct = rand(size=__nbP)
+ rst = rand(size=__nbP)
+ # Vitesse
+ __velins = __iw * Swarm[__i,1,:] \
+ + __ca * rct * (Swarm[__i,2,:] - Swarm[__i,0,:]) \
+ + __sa * rst * (Swarm[iBest,2,:] - Swarm[__i,0,:])
+ Swarm[__i,1,:] = ApplyBounds( __velins, LimitSpeed )
+ # Position
+ __velins = Swarm[__i,0,:] + Swarm[__i,1,:]
+ Swarm[__i,0,:] = ApplyBounds( __velins, selfA._parameters["Bounds"] )
+ #
+ NumberOfFunctionEvaluations += 1
+ JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
+ if JTest < qSwarm[__i,0]:
+ Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
+ qSwarm[__i,:] = (JTest, JbTest, JoTest)
+ #
+ iBest = numpy.argmin(qSwarm[:,0])
+ xBest = Swarm[iBest,2,:]
+ selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
+ if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
+ selfA.StoredVariables["CurrentState"].store( xBest )
+ if selfA._toStore("SimulatedObservationAtCurrentState"):
+ selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
+ selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
+ selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
+ selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
+ logging.debug("%s Step %i: insect %i is the better one with J =%.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))
+ #
+ # Obtention de l'analyse
+ # ----------------------
+ Xa = xBest
+ #
+ selfA.StoredVariables["Analysis"].store( Xa )
+ #
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if selfA._toStore("OMA") or \
+ selfA._toStore("SimulatedObservationAtOptimum"):
+ HXa = Hm(Xa)
+ if selfA._toStore("Innovation") or \
+ selfA._toStore("OMB") or \
+ selfA._toStore("SimulatedObservationAtBackground"):
+ HXb = Hm(Xb)
+ Innovation = Y - HXb
+ if selfA._toStore("Innovation"):
+ selfA.StoredVariables["Innovation"].store( Innovation )
+ if selfA._toStore("OMB"):
+ selfA.StoredVariables["OMB"].store( Innovation )
+ if selfA._toStore("BMA"):
+ selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
+ if selfA._toStore("OMA"):
+ selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
+ if selfA._toStore("SimulatedObservationAtBackground"):
+ selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
+ if selfA._toStore("SimulatedObservationAtOptimum"):
+ selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
+ #
+ selfA._post_run(HO)
+ return 0
+
+# ==============================================================================
+if __name__ == "__main__":
+ print('\n AUTODIAGNOSTIC\n')
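Outside of ADAO, the overall structure of the `ecwnpso` loop above (swarm initialisation, per-insect velocity/position update, personal-best bookkeeping) can be exercised on a toy problem. The following reduced, self-contained sketch uses plain NumPy, a simple least-squares cost standing in for the quality criterion, and assumed parameter values; it is an illustration of the technique, not the ADAO entry point:

```python
import numpy as np

rng = np.random.default_rng(0)
target = np.array([1.0, -2.0])

def cost(x):
    # simple least-squares quality criterion (stands in for "LeastSquares")
    return float(np.sum((np.asarray(x) - target) ** 2))

nbI, nbP = 20, 2                      # NumberOfInsects, state dimension
iw, ca, sa = 0.7, 0.5, 0.5            # assumed InertiaWeight, Cognitive/SocialAcceleration

pos = rng.uniform(-5.0, 5.0, size=(nbI, nbP))    # insect positions
vel = rng.uniform(-1.0, 1.0, size=(nbI, nbP))    # insect velocities
pbest = pos.copy()                               # best position per insect
pq = np.array([cost(p) for p in pos])            # quality of each personal best
J0 = float(np.min(pq))

for step in range(100):
    g = pbest[np.argmin(pq)]          # best insect of the whole swarm
    for i in range(nbI):
        vel[i] = iw * vel[i] \
            + ca * rng.uniform(size=nbP) * (pbest[i] - pos[i]) \
            + sa * rng.uniform(size=nbP) * (g - pos[i])
        pos[i] = pos[i] + vel[i]
        J = cost(pos[i])
        if J < pq[i]:                 # keep the personal best up to date
            pbest[i] = pos[i].copy()
            pq[i] = J

xBest = pbest[np.argmin(pq)]          # the analysis, in ADAO terms
```

Because personal bests are only replaced on improvement, the best quality in the swarm is monotonically non-increasing, which is the invariant the stored `CostFunctionJ` series relies on.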
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2023 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
+
+__doc__ = """
+ Standard Particle Swarm Optimization
+"""
+__author__ = "Jean-Philippe ARGAUD"
+
+import numpy, logging, copy
+from numpy.random import uniform as rand
+
+# ==============================================================================
+def ecwopso(selfA, Xb, Y, HO, R, B):
+ #
+ if ("BoxBounds" in selfA._parameters) and isinstance(selfA._parameters["BoxBounds"], (list, tuple)) and (len(selfA._parameters["BoxBounds"]) > 0):
+ BoxBounds = selfA._parameters["BoxBounds"]
+ logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(selfA._name,))
+ else:
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
+ BoxBounds = numpy.array(BoxBounds)
+ if numpy.isnan(BoxBounds).any():
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
+ #
+ Hm = HO["Direct"].appliedTo
+ #
+ BI = B.getI()
+ RI = R.getI()
+ #
+ Xini = selfA._parameters["InitializationPoint"]
+ #
+ def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
+ _X = numpy.asarray( x ).reshape((-1,1))
+ _HX = numpy.asarray( Hm( _X ) ).reshape((-1,1))
+ _Innovation = Y - _HX
+ #
+ if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
+ if BI is None or RI is None:
+ raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
+ Jb = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
+ Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
+ elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
+ if RI is None:
+ raise ValueError("Observation error covariance matrix has to be properly defined!")
+ Jb = 0.
+ Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
+ elif QualityMeasure in ["LeastSquares","LS","L2"]:
+ Jb = 0.
+ Jo = 0.5 * _Innovation.T @ _Innovation
+ elif QualityMeasure in ["AbsoluteValue","L1"]:
+ Jb = 0.
+ Jo = numpy.sum( numpy.abs(_Innovation) )
+ elif QualityMeasure in ["MaximumError","ME", "Linf"]:
+ Jb = 0.
+ Jo = numpy.max( numpy.abs(_Innovation) )
+ #
+ J = float( Jb ) + float( Jo )
+ #
+ return J, float( Jb ), float( Jo )
+ #
+ def KeepRunningCondition(__step, __nbfct):
+ if __step >= selfA._parameters["MaximumNumberOfIterations"]:
+ logging.debug("%s Stopping search because the number %i of evolving iterations is exceeding the maximum %i."%(selfA._name, __step, selfA._parameters["MaximumNumberOfIterations"]))
+ return False
+ elif __nbfct >= selfA._parameters["MaximumNumberOfFunctionEvaluations"]:
+ logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(selfA._name, __nbfct, selfA._parameters["MaximumNumberOfFunctionEvaluations"]))
+ return False
+ else:
+ return True
+ #
+ # Paramètres internes
+ # -------------------
+ __nbI = selfA._parameters["NumberOfInsects"]
+ __nbP = len(Xini) # Dimension ou nombre de paramètres
+ #
+ __iw = float( selfA._parameters["InertiaWeight"] )
+ __sa = float( selfA._parameters["SocialAcceleration"] )
+ __ca = float( selfA._parameters["CognitiveAcceleration"] )
+ __vc = float( selfA._parameters["VelocityClampingFactor"] )
+ logging.debug("%s Cognitive acceleration (recall to the best previously known value of the insect) = %s"%(selfA._name, str(__ca)))
+ logging.debug("%s Social acceleration (recall to the best insect value of the group) = %s"%(selfA._name, str(__sa)))
+ logging.debug("%s Velocity clamping factor = %s"%(selfA._name, str(__vc)))
+ #
+ # Initialisation de l'essaim
+ # --------------------------
+ LimitStep = Xini.reshape((-1,1)) + BoxBounds
+ __dx = __vc * numpy.abs(BoxBounds[:,1]-BoxBounds[:,0])
+ LimitSpeed = numpy.vstack((-__dx,__dx)).T
+ #
+ NumberOfFunctionEvaluations = 1
+ JXini, JbXini, JoXini = CostFunction(Xini,selfA._parameters["QualityCriterion"])
+ #
+ Swarm = numpy.zeros((__nbI,3,__nbP)) # 3 car (x,v,xbest)
+ for __p in range(__nbP) :
+ Swarm[:,0,__p] = rand( low=LimitStep[__p,0], high=LimitStep[__p,1], size=__nbI) # Position
+ Swarm[:,1,__p] = rand( low=LimitSpeed[__p,0], high=LimitSpeed[__p,1], size=__nbI) # Velocity
+ logging.debug("%s Initialisation of the swarm with %i insects of size %i "%(selfA._name,Swarm.shape[0],Swarm.shape[2]))
+ #
+ qSwarm = JXini * numpy.ones((__nbI,3)) # Qualité (J, Jb, Jo) par insecte
+ for __i in range(__nbI):
+ NumberOfFunctionEvaluations += 1
+ JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
+ if JTest < JXini:
+ Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
+ qSwarm[__i,:] = (JTest, JbTest, JoTest)
+ else:
+ Swarm[__i,2,:] = Xini # xBest
+ qSwarm[__i,:] = (JXini, JbXini, JoXini)
+ logging.debug("%s Initialisation of the best previous insects"%selfA._name)
+ #
+ iBest = numpy.argmin(qSwarm[:,0])
+ xBest = Swarm[iBest,2,:]
+ if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
+ selfA.StoredVariables["CurrentState"].store( xBest )
+ selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
+ selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
+ selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
+ #
+ selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
+ #
+ # Minimisation de la fonctionnelle
+ # --------------------------------
+ step = 0
+ while KeepRunningCondition(step, NumberOfFunctionEvaluations):
+ step += 1
+ for __i in range(__nbI):
+ for __p in range(__nbP):
+ # Vitesse
+ Swarm[__i,1,__p] = __iw * Swarm[__i,1,__p] \
+ + __ca * rand() * (Swarm[__i,2,__p] - Swarm[__i,0,__p]) \
+ + __sa * rand() * (Swarm[iBest,2,__p] - Swarm[__i,0,__p])
+ # Position
+ Swarm[__i,0,__p] = Swarm[__i,0,__p] + Swarm[__i,1,__p]
+ #
+ NumberOfFunctionEvaluations += 1
+ JTest, JbTest, JoTest = CostFunction(Swarm[__i,0,:],selfA._parameters["QualityCriterion"])
+ if JTest < qSwarm[__i,0]:
+ Swarm[__i,2,:] = Swarm[__i,0,:] # xBest
+ qSwarm[__i,:] = (JTest, JbTest, JoTest)
+ #
+ iBest = numpy.argmin(qSwarm[:,0])
+ xBest = Swarm[iBest,2,:]
+ selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
+ if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
+ selfA.StoredVariables["CurrentState"].store( xBest )
+ if selfA._toStore("SimulatedObservationAtCurrentState"):
+ selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( xBest ) )
+ selfA.StoredVariables["CostFunctionJ" ].store( qSwarm[iBest,0] )
+ selfA.StoredVariables["CostFunctionJb"].store( qSwarm[iBest,1] )
+ selfA.StoredVariables["CostFunctionJo"].store( qSwarm[iBest,2] )
+ logging.debug("%s Step %i: insect %i is the better one with J =%.7f"%(selfA._name,step,iBest,qSwarm[iBest,0]))
+ #
+ # Obtention de l'analyse
+ # ----------------------
+ Xa = xBest
+ #
+ selfA.StoredVariables["Analysis"].store( Xa )
+ #
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if selfA._toStore("OMA") or \
+ selfA._toStore("SimulatedObservationAtOptimum"):
+ HXa = Hm(Xa)
+ if selfA._toStore("Innovation") or \
+ selfA._toStore("OMB") or \
+ selfA._toStore("SimulatedObservationAtBackground"):
+ HXb = Hm(Xb)
+ Innovation = Y - HXb
+ if selfA._toStore("Innovation"):
+ selfA.StoredVariables["Innovation"].store( Innovation )
+ if selfA._toStore("OMB"):
+ selfA.StoredVariables["OMB"].store( Innovation )
+ if selfA._toStore("BMA"):
+ selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
+ if selfA._toStore("OMA"):
+ selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
+ if selfA._toStore("SimulatedObservationAtBackground"):
+ selfA.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
+ if selfA._toStore("SimulatedObservationAtOptimum"):
+ selfA.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
+ #
+ selfA._post_run(HO)
+ return 0
+
+# ==============================================================================
+if __name__ == "__main__":
+ print('\n AUTODIAGNOSTIC\n')
import numpy, logging, copy
from daCore import BasicObjects
+from daAlgorithms.Atoms import ecwnpso, ecwopso
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
def __init__(self):
BasicObjects.Algorithm.__init__(self, "PARTICLESWARMOPTIMIZATION")
+ self.defineRequiredParameter(
+ name = "Variant",
+ default = "PSO",
+ typecast = str,
+ message = "Variant ou formulation de la méthode",
+ listval = [
+ "PSO",
+ "OGCR",
+ ],
+ listadv = [
+ "CanonicalPSO",
+ "SPSO-2011",
+ ],
+ )
self.defineRequiredParameter(
name = "MaximumNumberOfIterations",
default = 50,
minval = -1,
)
self.defineRequiredParameter(
- name = "SwarmVelocity",
+ name = "InertiaWeight",
default = 1.,
typecast = float,
- message = "Vitesse de groupe imposée par l'essaim",
+ message = "Part de la vitesse de l'essaim qui est imposée à l'insecte, ou poids de l'inertie (entre 0 et 1)",
+ minval = 0.,
+ maxval = 1.,
+ oldname = "SwarmVelocity",
+ )
+ self.defineRequiredParameter(
+ name = "CognitiveAcceleration",
+ default = 0.5,
+ typecast = float,
+ message = "Taux de rappel à la meilleure position de l'insecte précédemment connue (entre 0 et 1)",
minval = 0.,
+ maxval = 1.,
)
self.defineRequiredParameter(
- name = "GroupRecallRate",
+ name = "SocialAcceleration",
default = 0.5,
typecast = float,
- message = "Taux de rappel au meilleur insecte du groupe (entre 0 et 1)",
+ message = "Taux de rappel au meilleur insecte du groupe local (entre 0 et 1)",
minval = 0.,
maxval = 1.,
+ oldname = "GroupRecallRate",
+ )
+ self.defineRequiredParameter(
+ name = "VelocityClampingFactor",
+ default = 0.3,
+ typecast = float,
+ message = "Facteur de réduction de l'amplitude de variation des vitesses (entre 0 et 1)",
+ minval = 0.0001,
+ maxval = 1.,
)
self.defineRequiredParameter(
name = "QualityCriterion",
name = "BoxBounds",
message = "Liste des valeurs de bornes d'incréments de paramètres",
)
+ self.defineRequiredParameter( # Pas de type
+ name = "Bounds",
+ message = "Liste des paires de bornes",
+ )
+ self.defineRequiredParameter(
+ name = "InitializationPoint",
+ typecast = numpy.ravel,
+ message = "État initial imposé (par défaut, c'est l'ébauche si None)",
+ )
self.requireInputArguments(
mandatory= ("Xb", "Y", "HO", "R", "B"),
)
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
- if ("BoxBounds" in self._parameters) and isinstance(self._parameters["BoxBounds"], (list, tuple)) and (len(self._parameters["BoxBounds"]) > 0):
- BoxBounds = self._parameters["BoxBounds"]
- logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(self._name,))
- else:
- raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
- BoxBounds = numpy.array(BoxBounds)
- if numpy.isnan(BoxBounds).any():
- raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
- #
- Phig = float( self._parameters["GroupRecallRate"] )
- Phip = 1. - Phig
- logging.debug("%s Taux de rappel au meilleur insecte du groupe (entre 0 et 1) = %s et à la meilleure position précédente (son complémentaire à 1) = %s"%(self._name, str(Phig), str(Phip)))
+ #--------------------------
+ if self._parameters["Variant"] in ["CanonicalPSO", "PSO"]:
+ ecwnpso.ecwnpso(self, Xb, Y, HO, R, B)
#
- Hm = HO["Direct"].appliedTo
+ elif self._parameters["Variant"] in ["OGCR"]:
+ ecwopso.ecwopso(self, Xb, Y, HO, R, B)
#
- BI = B.getI()
- RI = R.getI()
- #
- def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
- _X = numpy.ravel( x ).reshape((-1,1))
- _HX = numpy.ravel( Hm( _X ) ).reshape((-1,1))
- _Innovation = Y - _HX
- #
- if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","DA"]:
- if BI is None or RI is None:
- raise ValueError("Background and Observation error covariance matrices has to be properly defined!")
- Jb = 0.5 * (_X - Xb).T @ (BI @ (_X - Xb))
- Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
- elif QualityMeasure in ["WeightedLeastSquares","WLS"]:
- if RI is None:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
- Jb = 0.
- Jo = 0.5 * _Innovation.T @ (RI @ _Innovation)
- elif QualityMeasure in ["LeastSquares","LS","L2"]:
- Jb = 0.
- Jo = 0.5 * _Innovation.T @ _Innovation
- elif QualityMeasure in ["AbsoluteValue","L1"]:
- Jb = 0.
- Jo = numpy.sum( numpy.abs(_Innovation) )
- elif QualityMeasure in ["MaximumError","ME", "Linf"]:
- Jb = 0.
- Jo = numpy.max( numpy.abs(_Innovation) )
- #
- J = float( Jb ) + float( Jo )
- #
- return J
- #
- if Xb is not None:
- Xini = numpy.ravel(Xb)
+ #--------------------------
else:
- Xini = numpy.zeros(len(BoxBounds[:,0]))
- #
- SpaceUp = BoxBounds[:,1] + Xini
- SpaceLow = BoxBounds[:,0] + Xini
- nbparam = len(SpaceUp)
- #
- # Initialisation de l'essaim
- # --------------------------
- NumberOfFunctionEvaluations = 0
- LimitVelocity = numpy.abs(SpaceUp-SpaceLow)
- #
- PosInsect = []
- VelocityInsect = []
- for i in range(nbparam) :
- PosInsect.append(numpy.random.uniform(low=SpaceLow[i], high=SpaceUp[i], size=self._parameters["NumberOfInsects"]))
- VelocityInsect.append(numpy.random.uniform(low=-LimitVelocity[i], high=LimitVelocity[i], size=self._parameters["NumberOfInsects"]))
- VelocityInsect = numpy.array(VelocityInsect)
- PosInsect = numpy.array(PosInsect)
- #
- BestPosInsect = numpy.array(PosInsect)
- qBestPosInsect = []
- _Best = copy.copy(SpaceLow)
- _qualityBest = CostFunction(_Best,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- #
- for i in range(self._parameters["NumberOfInsects"]):
- insect = numpy.ravel(PosInsect[:,i])
- quality = CostFunction(insect,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- qBestPosInsect.append(quality)
- if quality < _qualityBest:
- _Best = copy.copy( insect )
- _qualityBest = copy.copy( quality )
- logging.debug("%s Initialisation, Insecte = %s, Qualité = %s"%(self._name, str(_Best), str(_qualityBest)))
- #
- self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
- if self._parameters["StoreInternalVariables"] or self._toStore("CurrentState"):
- self.StoredVariables["CurrentState"].store( _Best )
- self.StoredVariables["CostFunctionJb"].store( 0. )
- self.StoredVariables["CostFunctionJo"].store( 0. )
- self.StoredVariables["CostFunctionJ" ].store( _qualityBest )
- #
- # Minimisation de la fonctionnelle
- # --------------------------------
- for n in range(self._parameters["MaximumNumberOfIterations"]):
- for i in range(self._parameters["NumberOfInsects"]) :
- insect = numpy.ravel(PosInsect[:,i])
- rp = numpy.random.uniform(size=nbparam)
- rg = numpy.random.uniform(size=nbparam)
- for j in range(nbparam) :
- VelocityInsect[j,i] = self._parameters["SwarmVelocity"]*VelocityInsect[j,i] + Phip*rp[j]*(BestPosInsect[j,i]-PosInsect[j,i]) + Phig*rg[j]*(_Best[j]-PosInsect[j,i])
- PosInsect[j,i] = PosInsect[j,i]+VelocityInsect[j,i]
- quality = CostFunction(insect,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- if quality < qBestPosInsect[i]:
- BestPosInsect[:,i] = copy.copy( insect )
- qBestPosInsect[i] = copy.copy( quality )
- if quality < _qualityBest :
- _Best = copy.copy( insect )
- _qualityBest = copy.copy( quality )
- logging.debug("%s Etape %i, Insecte = %s, Qualité = %s"%(self._name, n, str(_Best), str(_qualityBest)))
- #
- self.StoredVariables["CurrentIterationNumber"].store( len(self.StoredVariables["CostFunctionJ"]) )
- if self._parameters["StoreInternalVariables"] or self._toStore("CurrentState"):
- self.StoredVariables["CurrentState"].store( _Best )
- if self._toStore("SimulatedObservationAtCurrentState"):
- _HmX = Hm( _Best )
- self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HmX )
- self.StoredVariables["CostFunctionJb"].store( 0. )
- self.StoredVariables["CostFunctionJo"].store( 0. )
- self.StoredVariables["CostFunctionJ" ].store( _qualityBest )
- if NumberOfFunctionEvaluations > self._parameters["MaximumNumberOfFunctionEvaluations"]:
- logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(self._name, NumberOfFunctionEvaluations, self._parameters["MaximumNumberOfFunctionEvaluations"]))
- break
- #
- # Obtention de l'analyse
- # ----------------------
- Xa = _Best
- #
- self.StoredVariables["Analysis"].store( Xa )
- #
- # Calculs et/ou stockages supplémentaires
- # ---------------------------------------
- if self._toStore("OMA") or \
- self._toStore("SimulatedObservationAtOptimum"):
- HXa = Hm(Xa)
- if self._toStore("Innovation") or \
- self._toStore("OMB") or \
- self._toStore("SimulatedObservationAtBackground"):
- HXb = Hm(Xb)
- Innovation = Y - HXb
- if self._toStore("Innovation"):
- self.StoredVariables["Innovation"].store( Innovation )
- if self._toStore("OMB"):
- self.StoredVariables["OMB"].store( Innovation )
- if self._toStore("BMA"):
- self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
- if self._toStore("OMA"):
- self.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
- if self._toStore("SimulatedObservationAtBackground"):
- self.StoredVariables["SimulatedObservationAtBackground"].store( HXb )
- if self._toStore("SimulatedObservationAtOptimum"):
- self.StoredVariables["SimulatedObservationAtOptimum"].store( HXa )
+ raise ValueError("Error in Variant name: %s"%self._parameters["Variant"])
#
self._post_run(HO)
return 0
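For reference, the update rule that the parameters added above (inertia weight, cognitive and social accelerations, velocity clamping) control can be sketched in plain numpy. This is an illustrative sketch of one canonical PSO step, not the ADAO implementation: the function name `pso_step` is hypothetical, and the default coefficients are the usual SPSO 2011 values (w = 1/(2 ln 2), c = 0.5 + ln 2) from [ZambranoBigiarini13]_.

```python
import numpy

def pso_step(pos, vel, best_pos, best_swarm, bounds,
             inertia=0.72135, phi_p=1.19315, phi_g=1.19315, clamping=0.3):
    # pos, vel, best_pos: arrays of shape (insects, parameters);
    # best_swarm: best known position of the whole swarm, shape (parameters,);
    # bounds: array of (low, high) pairs, shape (parameters, 2).
    nbinsect, nbparam = pos.shape
    rp = numpy.random.uniform(size=(nbinsect, nbparam))  # cognitive draw
    rg = numpy.random.uniform(size=(nbinsect, nbparam))  # social draw
    vel = ( inertia * vel
            + phi_p * rp * (best_pos - pos)      # recall to own best position
            + phi_g * rg * (best_swarm - pos) )  # recall to swarm best position
    # Clamp each velocity component to a fraction of the search interval,
    # as the VelocityClampingFactor parameter does.
    vmax = clamping * (bounds[:, 1] - bounds[:, 0])
    vel = numpy.clip(vel, -vmax, vmax)
    # Keep the insects inside the bounds after the move.
    pos = numpy.clip(pos + vel, bounds[:, 0], bounds[:, 1])
    return pos, vel
```

The "OGCR" variant described above corresponds to dropping both `numpy.clip` calls: without bounds on insects or velocities the swarm can diverge, which is why that variant is noted as less robust.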
The documentation of the module is also covered by the license and the
requirement of quoting.
"""
+__author__ = "Jean-Philippe ARGAUD"
+__all__ = ["adaoBuilder"]
import os, sys, logging
adao_py_dir = os.path.abspath(os.path.dirname(__file__))