name = "Bounds",
message = "Liste des valeurs de bornes",
)
+ self.defineRequiredParameter(
+ name = "InitializationPoint",
+ typecast = numpy.ravel,
+ message = "État initial imposé (par défaut, c'est l'ébauche si None)",
+ )
self.requireInputArguments(
mandatory= ("Xb", "Y", "HO" ),
)
# Utilisation éventuelle d'un vecteur H(Xb) précalculé
# ----------------------------------------------------
if HO["AppliedInX"] is not None and "HXb" in HO["AppliedInX"]:
- HXb = Hm( Xb, HO["AppliedInX"]["HXb"])
+ HXb = Hm( Xb, HO["AppliedInX"]["HXb"] )
else:
HXb = Hm( Xb )
HXb = numpy.asmatrix(numpy.ravel( HXb )).T
- #
- # Calcul de l'innovation
- # ----------------------
if Y.size != HXb.size:
raise ValueError("The size %i of observations Y and %i of observed calculation H(X) are different, they have to be identical."%(Y.size,HXb.size))
if max(Y.shape) != max(HXb.shape):
# ------------------------------
def CostFunction(x):
_X = numpy.asmatrix(numpy.ravel( x )).T
- if self._parameters["StoreInternalVariables"] or self._toStore("CurrentState"):
+ if self._parameters["StoreInternalVariables"] or \
+ self._toStore("CurrentState"):
self.StoredVariables["CurrentState"].store( _X )
_HX = Hm( _X )
_HX = numpy.asmatrix(numpy.ravel( _HX )).T
Hg = HO["Tangent"].asMatrix( _X )
return Hg
#
- # Point de démarrage de l'optimisation : Xini = Xb
+ # Point de démarrage de l'optimisation
# ------------------------------------
- if isinstance(Xb, type(numpy.matrix([]))):
- Xini = Xb.A1.tolist()
+ if self._parameters["InitializationPoint"] is not None:
+ __ipt = numpy.ravel(self._parameters["InitializationPoint"])
+ if __ipt.size != numpy.ravel(Xb).size:
+ raise ValueError("Incompatible size %i of forced initial point to replace the Xb of size %i" \
+ %(__ipt.size,numpy.ravel(Xb).size))
+ else:
+ Xini = __ipt
else:
- Xini = list(Xb)
+ Xini = numpy.ravel(Xb)
#
# Minimisation de la fonctionnelle
# --------------------------------
#
if self._toStore("OMA") or \
self._toStore("SimulatedObservationAtOptimum"):
- HXa = Hm(Xa)
+ HXa = Hm( Xa )
#
# Calculs et/ou stockages supplémentaires
# ---------------------------------------
if self._toStore("Innovation"):
self.StoredVariables["Innovation"].store( numpy.ravel(d) )
if self._toStore("BMA"):
- self.StoredVariables["BMA"].store( numpy.ravel(Xb - Xa) )
+ self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
if self._toStore("OMA"):
- self.StoredVariables["OMA"].store( numpy.ravel(Y - HXa) )
+ self.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
if self._toStore("OMB"):
self.StoredVariables["OMB"].store( numpy.ravel(d) )
if self._toStore("SimulatedObservationAtBackground"):
def clearCache(self):
"Vide le cache"
- self.__listOPCV = [] # Previous Calculated Points, Results, Point Norms, Operator
+ self.__listOPCV = []
self.__seenNames = []
- # logging.debug("CM Tolerance de determination des doublons : %.2e", self.__tolerBP)
- def wasCalculatedIn(self, xValue, oName="" ): #, info="" ):
+ def wasCalculatedIn(self, xValue, oName="" ):
"Vérifie l'existence d'un calcul correspondant à la valeur"
__alc = False
__HxV = None
if self.__enabled:
for i in range(min(len(self.__listOPCV),self.__lenghtOR)-1,-1,-1):
- if not hasattr(xValue, 'size') or (str(oName) != self.__listOPCV[i][3]) or (xValue.size != self.__listOPCV[i][0].size):
- # logging.debug("CM Différence de la taille %s de X et de celle %s du point %i déjà calculé", xValue.shape,i,self.__listOPCP[i].shape)
+ if not hasattr(xValue, 'size'):
pass
- elif numpy.linalg.norm(numpy.ravel(xValue) - self.__listOPCV[i][0]) < self.__tolerBP * self.__listOPCV[i][2]:
+ elif (str(oName) != self.__listOPCV[i][3]):
+ pass
+ elif (xValue.size != self.__listOPCV[i][0].size):
+ pass
+ elif (numpy.ravel(xValue)[0] - self.__listOPCV[i][0][0]) > (self.__tolerBP * self.__listOPCV[i][2] / self.__listOPCV[i][0].size):
+ pass
+ elif numpy.linalg.norm(numpy.ravel(xValue) - self.__listOPCV[i][0]) < (self.__tolerBP * self.__listOPCV[i][2]):
__alc = True
__HxV = self.__listOPCV[i][1]
- # logging.debug("CM Cas%s déja calculé, portant le numéro %i", info, i)
break
return __alc, __HxV
def storeValueInX(self, xValue, HxValue, oName="" ):
"Stocke pour un opérateur o un calcul Hx correspondant à la valeur x"
if self.__lenghtOR < 0:
- self.__lenghtOR = 2 * xValue.size + 2
+ self.__lenghtOR = 2 * min(xValue.size, 50) + 2 # 2 * xValue.size + 2
self.__initlnOR = self.__lenghtOR
self.__seenNames.append(str(oName))
if str(oName) not in self.__seenNames: # Etend la liste si nouveau
- self.__lenghtOR += 2 * xValue.size + 2
+ self.__lenghtOR += 2 * min(xValue.size, 50) + 2 # 2 * xValue.size + 2
self.__initlnOR += self.__lenghtOR
self.__seenNames.append(str(oName))
while len(self.__listOPCV) > self.__lenghtOR:
- # logging.debug("CM Réduction de la liste des cas à %i éléments par suppression du premier", self.__lenghtOR)
self.__listOPCV.pop(0)
self.__listOPCV.append( (
- copy.copy(numpy.ravel(xValue)),
- copy.copy(HxValue),
- numpy.linalg.norm(xValue),
- str(oName),
+ copy.copy(numpy.ravel(xValue)), # 0 Previous point
+ copy.copy(HxValue), # 1 Previous value
+ numpy.linalg.norm(xValue), # 2 Norm
+ str(oName), # 3 Operator name
) )
def disable(self):