# -*- coding: utf-8 -*-
#
-# Copyright (C) 2008-2018 EDF R&D
+# Copyright (C) 2008-2023 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-import logging
+import numpy, logging, copy
from daCore import BasicObjects
-import numpy, copy
+from daAlgorithms.Atoms import ecwnpso, ecwopso, ecwapso, ecwspso, ecwpspso
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
def __init__(self):
BasicObjects.Algorithm.__init__(self, "PARTICLESWARMOPTIMIZATION")
self.defineRequiredParameter(
- name = "MaximumNumberOfSteps",
+ name = "Variant",
+ default = "CanonicalPSO",
+ typecast = str,
+ message = "Variant ou formulation de la méthode",
+ listval = [
+ "CanonicalPSO",
+ "OGCR",
+ "SPSO-2011",
+ "SPSO-2011-AIS",
+ "SPSO-2011-SIS",
+ "SPSO-2011-PSIS",
+ ],
+ listadv = [
+ "PSO",
+ ],
+ )
+ self.defineRequiredParameter(
+ name = "MaximumNumberOfIterations",
default = 50,
typecast = int,
message = "Nombre maximal de pas d'optimisation",
minval = 0,
+ oldname = "MaximumNumberOfSteps",
)
self.defineRequiredParameter(
name = "MaximumNumberOfFunctionEvaluations",
)
self.defineRequiredParameter(
name = "NumberOfInsects",
- default = 100,
+ default = 40,
typecast = int,
message = "Nombre d'insectes dans l'essaim",
minval = -1,
)
self.defineRequiredParameter(
- name = "SwarmVelocity",
- default = 1.,
+ name = "SwarmTopology",
+ default = "FullyConnectedNeighborhood",
+ typecast = str,
+ message = "Mode de définition du voisinage de chaque particule",
+ listval = [
+ "FullyConnectedNeighborhood", "FullyConnectedNeighbourhood", "gbest",
+ "RingNeighborhoodWithRadius1", "RingNeighbourhoodWithRadius1", "lbest",
+ "RingNeighborhoodWithRadius2", "RingNeighbourhoodWithRadius2",
+ "AdaptativeRandomWith3Neighbors", "AdaptativeRandomWith3Neighbours", "abest",
+ "AdaptativeRandomWith5Neighbors", "AdaptativeRandomWith5Neighbours",
+ ],
+ listadv = [
+ "VonNeumannNeighborhood", "VonNeumannNeighbourhood",
+ ],
+ )
+ self.defineRequiredParameter(
+ name = "InertiaWeight",
+ default = 0.72135, # 1/(2*ln(2))
+ typecast = float,
+ message = "Part de la vitesse de l'essaim qui est imposée à l'insecte, ou poids de l'inertie (entre 0 et 1)",
+ minval = 0.,
+ maxval = 1.,
+ oldname = "SwarmVelocity",
+ )
+ self.defineRequiredParameter(
+ name = "CognitiveAcceleration",
+ default = 1.19315, # 1/2+ln(2)
typecast = float,
- message = "Vitesse de groupe imposée par l'essaim",
+ message = "Taux de rappel à la meilleure position de l'insecte précédemment connue (positif)",
minval = 0.,
)
self.defineRequiredParameter(
- name = "GroupRecallRate",
- default = 0.5,
+ name = "SocialAcceleration",
+ default = 1.19315, # 1/2+ln(2)
typecast = float,
- message = "Taux de rappel au meilleur insecte du groupe (entre 0 et 1)",
+ message = "Taux de rappel au meilleur insecte du groupe local (positif)",
minval = 0.,
+ oldname = "GroupRecallRate",
+ )
+ self.defineRequiredParameter(
+ name = "VelocityClampingFactor",
+ default = 0.3,
+ typecast = float,
+ message = "Facteur de réduction de l'amplitude de variation des vitesses (entre 0 et 1)",
+ minval = 0.0001,
maxval = 1.,
)
self.defineRequiredParameter(
default = "AugmentedWeightedLeastSquares",
typecast = str,
message = "Critère de qualité utilisé",
- listval = ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA",
- "WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS",
- "LeastSquares","LS","L2",
- "AbsoluteValue","L1",
- "MaximumError","ME"],
+ listval = [
+ "AugmentedWeightedLeastSquares", "AWLS", "DA",
+ "WeightedLeastSquares", "WLS",
+ "LeastSquares", "LS", "L2",
+ "AbsoluteValue", "L1",
+ "MaximumError", "ME", "Linf",
+ ],
)
self.defineRequiredParameter(
name = "StoreInternalVariables",
typecast = tuple,
message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
listval = [
+ "Analysis",
"BMA",
- "OMA",
- "OMB",
- "CurrentState",
"CostFunctionJ",
"CostFunctionJb",
"CostFunctionJo",
+ "CurrentIterationNumber",
+ "CurrentState",
"Innovation",
+ "InternalCostFunctionJ",
+ "InternalCostFunctionJb",
+ "InternalCostFunctionJo",
+ "InternalStates",
+ "OMA",
+ "OMB",
"SimulatedObservationAtBackground",
"SimulatedObservationAtCurrentState",
"SimulatedObservationAtOptimum",
]
)
+ self.defineRequiredParameter( # Pas de type
+ name = "Bounds",
+ message = "Liste des paires de bornes",
+ )
self.defineRequiredParameter( # Pas de type
name = "BoxBounds",
- message = "Liste des valeurs de bornes d'incréments de paramètres",
+ message = "Liste des paires de bornes d'incréments",
+ )
+ self.defineRequiredParameter(
+ name = "InitializationPoint",
+ typecast = numpy.ravel,
+ message = "État initial imposé (par défaut, c'est l'ébauche si None)",
)
self.requireInputArguments(
mandatory= ("Xb", "Y", "HO", "R", "B"),
)
+ self.setAttributes(tags=(
+ "Optimization",
+ "NonLinear",
+ "MetaHeuristic",
+ "Population",
+ ))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
- if ("BoxBounds" in self._parameters) and isinstance(self._parameters["BoxBounds"], (list, tuple)) and (len(self._parameters["BoxBounds"]) > 0):
- BoxBounds = self._parameters["BoxBounds"]
- logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuee"%(self._name,))
- else:
- raise ValueError("Particle Swarm Optimization requires bounds on all variables to be given.")
- BoxBounds = numpy.array(BoxBounds)
- if numpy.isnan(BoxBounds).any():
- raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given, \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
+ #--------------------------
+ if self._parameters["Variant"] in ["CanonicalPSO", "PSO"]:
+ ecwnpso.ecwnpso(self, Xb, Y, HO, R, B)
#
- Phig = float( self._parameters["GroupRecallRate"] )
- Phip = 1. - Phig
- logging.debug("%s Taux de rappel au meilleur insecte du groupe (entre 0 et 1) = %s et à la meilleure position précédente (son complémentaire à 1) = %s"%(self._name, str(Phig), str(Phip)))
+ elif self._parameters["Variant"] in ["OGCR"]:
+ ecwopso.ecwopso(self, Xb, Y, HO, R, B)
#
- # Opérateur d'observation
- # -----------------------
- Hm = HO["Direct"].appliedTo
+ # Default SPSO-2011 = SPSO-2011-AIS
+ elif self._parameters["Variant"] in ["SPSO-2011", "SPSO-2011-AIS"]:
+ ecwapso.ecwapso(self, Xb, Y, HO, R, B)
#
- # Précalcul des inversions de B et R
- # ----------------------------------
- BI = B.getI()
- RI = R.getI()
+ elif self._parameters["Variant"] in ["SPSO-2011-SIS"]:
+ ecwspso.ecwspso(self, Xb, Y, HO, R, B)
#
- # Définition de la fonction-coût
- # ------------------------------
- def CostFunction(x, QualityMeasure="AugmentedWeightedLeastSquares"):
- _X = numpy.asmatrix(numpy.ravel( x )).T
- _HX = Hm( _X )
- _HX = numpy.asmatrix(numpy.ravel( _HX )).T
- #
- if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
- if BI is None or RI is None:
- raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
- Jb = 0.5 * (_X - Xb).T * BI * (_X - Xb)
- Jo = 0.5 * (Y - _HX).T * RI * (Y - _HX)
- elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
- if RI is None:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
- Jb = 0.
- Jo = 0.5 * (Y - _HX).T * RI * (Y - _HX)
- elif QualityMeasure in ["LeastSquares","LS","L2"]:
- Jb = 0.
- Jo = 0.5 * (Y - _HX).T * (Y - _HX)
- elif QualityMeasure in ["AbsoluteValue","L1"]:
- Jb = 0.
- Jo = numpy.sum( numpy.abs(Y - _HX) )
- elif QualityMeasure in ["MaximumError","ME"]:
- Jb = 0.
- Jo = numpy.max( numpy.abs(Y - _HX) )
- #
- J = float( Jb ) + float( Jo )
- #
- return J
+ elif self._parameters["Variant"] in ["SPSO-2011-PSIS"]:
+ ecwpspso.ecwpspso(self, Xb, Y, HO, R, B)
#
- # Point de démarrage de l'optimisation : Xini = Xb
- # ------------------------------------
- if isinstance(Xb, type(numpy.matrix([]))):
- Xini = Xb.A1.tolist()
- elif Xb is not None:
- Xini = list(Xb)
+ #--------------------------
else:
- Xini = numpy.zeros(len(BoxBounds[:,0]))
- #
- # Initialisation des bornes
- # -------------------------
- SpaceUp = BoxBounds[:,1] + Xini
- SpaceLow = BoxBounds[:,0] + Xini
- nbparam = len(SpaceUp)
- #
- # Initialisation de l'essaim
- # --------------------------
- NumberOfFunctionEvaluations = 0
- LimitVelocity = numpy.abs(SpaceUp-SpaceLow)
- #
- PosInsect = []
- VelocityInsect = []
- for i in range(nbparam) :
- PosInsect.append(numpy.random.uniform(low=SpaceLow[i], high=SpaceUp[i], size=self._parameters["NumberOfInsects"]))
- VelocityInsect.append(numpy.random.uniform(low=-LimitVelocity[i], high=LimitVelocity[i], size=self._parameters["NumberOfInsects"]))
- VelocityInsect = numpy.matrix(VelocityInsect)
- PosInsect = numpy.matrix(PosInsect)
- #
- BestPosInsect = numpy.array(PosInsect)
- qBestPosInsect = []
- Best = copy.copy(SpaceLow)
- qBest = CostFunction(Best,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- #
- for i in range(self._parameters["NumberOfInsects"]):
- insect = numpy.ravel(PosInsect[:,i])
- quality = CostFunction(insect,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- qBestPosInsect.append(quality)
- if quality < qBest:
- Best = copy.copy( insect )
- qBest = copy.copy( quality )
- logging.debug("%s Initialisation, Insecte = %s, Qualité = %s"%(self._name, str(Best), str(qBest)))
- #
- if self._parameters["StoreInternalVariables"] or "CurrentState" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["CurrentState"].store( Best )
- self.StoredVariables["CostFunctionJb"].store( 0. )
- self.StoredVariables["CostFunctionJo"].store( 0. )
- self.StoredVariables["CostFunctionJ" ].store( qBest )
- #
- # Minimisation de la fonctionnelle
- # --------------------------------
- for n in range(self._parameters["MaximumNumberOfSteps"]):
- for i in range(self._parameters["NumberOfInsects"]) :
- insect = numpy.ravel(PosInsect[:,i])
- rp = numpy.random.uniform(size=nbparam)
- rg = numpy.random.uniform(size=nbparam)
- for j in range(nbparam) :
- VelocityInsect[j,i] = self._parameters["SwarmVelocity"]*VelocityInsect[j,i] + Phip*rp[j]*(BestPosInsect[j,i]-PosInsect[j,i]) + Phig*rg[j]*(Best[j]-PosInsect[j,i])
- PosInsect[j,i] = PosInsect[j,i]+VelocityInsect[j,i]
- quality = CostFunction(insect,self._parameters["QualityCriterion"])
- NumberOfFunctionEvaluations += 1
- if quality < qBestPosInsect[i]:
- BestPosInsect[:,i] = copy.copy( insect )
- qBestPosInsect[i] = copy.copy( quality )
- if quality < qBest :
- Best = copy.copy( insect )
- qBest = copy.copy( quality )
- logging.debug("%s Etape %i, Insecte = %s, Qualité = %s"%(self._name, n, str(Best), str(qBest)))
- #
- if self._parameters["StoreInternalVariables"] or "CurrentState" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["CurrentState"].store( Best )
- if "SimulatedObservationAtCurrentState" in self._parameters["StoreSupplementaryCalculations"]:
- _HmX = Hm( numpy.asmatrix(numpy.ravel( Best )).T )
- _HmX = numpy.asmatrix(numpy.ravel( _HmX )).T
- self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HmX )
- self.StoredVariables["CostFunctionJb"].store( 0. )
- self.StoredVariables["CostFunctionJo"].store( 0. )
- self.StoredVariables["CostFunctionJ" ].store( qBest )
- if NumberOfFunctionEvaluations > self._parameters["MaximumNumberOfFunctionEvaluations"]:
- logging.debug("%s Stopping search because the number %i of function evaluations is exceeding the maximum %i."%(self._name, NumberOfFunctionEvaluations, self._parameters["MaximumNumberOfFunctionEvaluations"]))
- break
- #
- # Obtention de l'analyse
- # ----------------------
- Xa = numpy.asmatrix(numpy.ravel( Best )).T
- #
- self.StoredVariables["Analysis"].store( Xa.A1 )
- #
- if "Innovation" in self._parameters["StoreSupplementaryCalculations"] or \
- "OMB" in self._parameters["StoreSupplementaryCalculations"] or \
- "SimulatedObservationAtBackground" in self._parameters["StoreSupplementaryCalculations"]:
- HXb = Hm(Xb)
- d = Y - HXb
- if "OMA" in self._parameters["StoreSupplementaryCalculations"] or \
- "SimulatedObservationAtOptimum" in self._parameters["StoreSupplementaryCalculations"]:
- HXa = Hm(Xa)
- #
- # Calculs et/ou stockages supplémentaires
- # ---------------------------------------
- if "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["Innovation"].store( numpy.ravel(d) )
- if "BMA" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
- if "OMA" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
- if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["OMB"].store( numpy.ravel(d) )
- if "SimulatedObservationAtBackground" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["SimulatedObservationAtBackground"].store( numpy.ravel(HXb) )
- if "SimulatedObservationAtOptimum" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["SimulatedObservationAtOptimum"].store( numpy.ravel(HXa) )
+ raise ValueError("Error in Variant name: %s"%self._parameters["Variant"])
#
self._post_run(HO)
return 0
# ==============================================================================
if __name__ == "__main__":
- print('\n AUTODIAGNOSTIC \n')
+ print("\n AUTODIAGNOSTIC\n")