default = [],
typecast = tuple,
message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
- listval = ["APosterioriCovariance", "Innovation"]
+ listval = ["APosterioriCovariance", "CostFunctionJ", "Innovation"]
+ )
+ self.defineRequiredParameter(
+ name = "EstimationType",
+ default = "State",
+ typecast = str,
+ message = "Estimation d'etat ou de parametres",
+ listval = ["State", "Parameters"],
)
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
if R is None:
raise ValueError("Observation error covariance matrix has to be properly defined!")
#
- H = HO["Direct"].appliedTo
+ H = HO["Direct"].appliedControledFormTo
#
- M = EM["Direct"].appliedTo
+ M = EM["Direct"].appliedControledFormTo
#
# Nombre de pas du Kalman identique au nombre de pas d'observations
# -----------------------------------------------------------------
- duration = Y.stepnumber()
+ if hasattr(Y,"stepnumber"):
+ duration = Y.stepnumber()
+ else:
+ duration = 2
+ #
+ # Précalcul des inversions de B et R
+ # ----------------------------------
+ if "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"]:
+ if B is not None:
+ BI = B.I
+ elif self._parameters["B_scalar"] is not None:
+ BI = 1.0 / self._parameters["B_scalar"]
+ #
+ if R is not None:
+ RI = R.I
+ elif self._parameters["R_scalar"] is not None:
+ RI = 1.0 / self._parameters["R_scalar"]
#
# Initialisation
# --------------
self.StoredVariables["APosterioriCovariance"].store( Pn )
#
for step in range(duration-1):
- Ynpu = numpy.asmatrix(numpy.ravel( Y[step+1] )).T
+ if hasattr(Y,"store"):
+ Ynpu = numpy.asmatrix(numpy.ravel( Y[step+1] )).T
+ else:
+ Ynpu = numpy.asmatrix(numpy.ravel( Y )).T
#
Ht = HO["Tangent"].asMatrix(ValueForMethodForm = Xn)
Ht = Ht.reshape(Ynpu.size,Xn.size) # ADAO & check shape
else:
Un = None
#
- Xn_predicted = M( (Xn, Un) )
+ if self._parameters["EstimationType"] == "State":
+ Xn_predicted = numpy.asmatrix(numpy.ravel( M( (Xn, Un) ) )).T
+ else:
+ Xn_predicted = numpy.asmatrix(numpy.ravel( M( (Xn, None) ) )).T
Pn_predicted = Mt * Pn * Ma + Q
#
- d = Ynpu - H( Xn_predicted )
+ if self._parameters["EstimationType"] == "Parameters":
+ d = Ynpu - numpy.asmatrix(numpy.ravel( H( (Xn_predicted, Un) ) )).T
+ else:
+ d = Ynpu - numpy.asmatrix(numpy.ravel( H( (Xn_predicted, None) ) )).T
K = Pn_predicted * Ha * (Ht * Pn_predicted * Ha + R).I
Xn = Xn_predicted + K * d
Pn = Pn_predicted - K * Ht * Pn_predicted
#
self.StoredVariables["Analysis"].store( Xn.A1 )
+ #
if "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["Innovation"].store( numpy.ravel( d.A1 ) )
if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["APosterioriCovariance"].store( Pn )
+ if "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"]:
+ Jb = 0.5 * (Xn - Xb).T * BI * (Xn - Xb)
+ Jo = 0.5 * d.T * RI * d
+ J = float( Jb ) + float( Jo )
+ self.StoredVariables["CostFunctionJb"].store( Jb )
+ self.StoredVariables["CostFunctionJo"].store( Jo )
+ self.StoredVariables["CostFunctionJ" ].store( J )
#
logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
default = [],
typecast = tuple,
message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
- listval = ["APosterioriCovariance", "Innovation"]
+ listval = ["APosterioriCovariance", "CostFunctionJ", "Innovation"]
+ )
+ self.defineRequiredParameter(
+ name = "EstimationType",
+ default = "State",
+ typecast = str,
+ message = "Estimation d'etat ou de parametres",
+ listval = ["State", "Parameters"],
)
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
#
# Nombre de pas du Kalman identique au nombre de pas d'observations
# -----------------------------------------------------------------
- duration = Y.stepnumber()
+ if hasattr(Y,"stepnumber"):
+ duration = Y.stepnumber()
+ else:
+ duration = 2
+ #
+ # Précalcul des inversions de B et R
+ # ----------------------------------
+ if "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"]:
+ if B is not None:
+ BI = B.I
+ elif self._parameters["B_scalar"] is not None:
+ BI = 1.0 / self._parameters["B_scalar"]
+ #
+ if R is not None:
+ RI = R.I
+ elif self._parameters["R_scalar"] is not None:
+ RI = 1.0 / self._parameters["R_scalar"]
#
# Initialisation
# --------------
for step in range(duration-1):
Ynpu = numpy.asmatrix(numpy.ravel( Y[step+1] )).T
#
- if Cm is not None:
+ if U is not None:
if hasattr(U,"store") and len(U)>1:
Un = numpy.asmatrix(numpy.ravel( U[step] )).T
elif hasattr(U,"store") and len(U)==1:
Un = numpy.asmatrix(numpy.ravel( U[0] )).T
else:
Un = numpy.asmatrix(numpy.ravel( U )).T
- #
- Xn_predicted = Mt * Xn + Cm * Un
else:
Un = None
- #
+ #
+ if self._parameters["EstimationType"] == "State" and Cm is not None and Un is not None:
+ Xn_predicted = Mt * Xn + Cm * Un
+ else:
Xn_predicted = Mt * Xn
Pn_predicted = Mt * Pn * Ma + Q
#
- d = Ynpu - Ht * Xn_predicted
+ if self._parameters["EstimationType"] == "Parameters" and Cm is not None and Un is not None:
+ d = Ynpu - Ht * Xn_predicted - Cm * Un
+ else:
+ d = Ynpu - Ht * Xn_predicted
K = Pn_predicted * Ha * (Ht * Pn_predicted * Ha + R).I
Xn = Xn_predicted + K * d
Pn = Pn_predicted - K * Ht * Pn_predicted
self.StoredVariables["Innovation"].store( numpy.ravel( d.A1 ) )
if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["APosterioriCovariance"].store( Pn )
+ if "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"]:
+ Jb = 0.5 * (Xn - Xb).T * BI * (Xn - Xb)
+ Jo = 0.5 * d.T * RI * d
+ J = float( Jb ) + float( Jo )
+ self.StoredVariables["CostFunctionJb"].store( Jb )
+ self.StoredVariables["CostFunctionJo"].store( Jo )
+ self.StoredVariables["CostFunctionJ" ].store( J )
#
logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
else:
return self.__Method( xValue )
+ def appliedControledFormTo(self, (xValue, uValue) ):
+ """
+ Permet de restituer le résultat de l'application de l'opérateur à une
+ paire (xValue, uValue). Cette méthode se contente d'appliquer, son
+ argument devant a priori être du bon type. Si la uValue est None,
+ on suppose que l'opérateur ne s'applique qu'à xValue.
+ Arguments :
+ - xValue : argument X adapté pour appliquer l'opérateur
+ - uValue : argument U adapté pour appliquer l'opérateur
+ """
+ if self.__Matrix is not None:
+ return self.__Matrix * xValue
+ elif uValue is not None:
+ return self.__Method( (xValue, uValue) )
+ else:
+ return self.__Method( xValue )
+
def appliedInXTo(self, (xNominal, xValue) ):
"""
Permet de restituer le résultat de l'application de l'opérateur à un