import numpy, math
mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
-# Using "Ensemble Kalman filtering", L. HOUTEKAMER and HERSCHEL L. MITCHELL, QJRMS (2005), 131, pp. 3269–3289
-
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
def __init__(self):
message = "Nombre de membres dans l'ensemble",
minval = -1,
)
- self.defineRequiredParameter(
- name = "Minimizer",
- default = "EnKF",
- typecast = str,
- message = "Schéma de mise a jour des informations d'ensemble",
- listval = ["EnKF", "ETKF", "DEnKF"],
- )
- self.defineRequiredParameter(
- name = "ConstrainedBy",
- default = "EstimateProjection",
- typecast = str,
- message = "Prise en compte des contraintes",
- listval = ["EstimateProjection"],
- )
self.defineRequiredParameter(
name = "EstimationOf",
default = "State",
message = "Estimation d'etat ou de parametres",
listval = ["State", "Parameters"],
)
+ self.defineRequiredParameter(
+ name = "SetSeed",
+ typecast = numpy.random.seed,
+ message = "Graine fixée pour le générateur aléatoire",
+ )
self.defineRequiredParameter(
name = "StoreInternalVariables",
default = False,
default = [],
typecast = tuple,
message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
- listval = ["APosterioriCorrelations", "APosterioriCovariance", "APosterioriStandardDeviations", "APosterioriVariances", "BMA", "CurrentState", "CostFunctionJ", "CostFunctionJb", "CostFunctionJo", "Innovation"]
- )
- self.defineRequiredParameter( # Pas de type
- name = "Bounds",
- message = "Liste des valeurs de bornes",
- )
- self.defineRequiredParameter(
- name = "SetSeed",
- typecast = numpy.random.seed,
- message = "Graine fixée pour le générateur aléatoire",
+ listval = [
+ "APosterioriCorrelations",
+ "APosterioriCovariance",
+ "APosterioriStandardDeviations",
+ "APosterioriVariances",
+ "BMA",
+ "CostFunctionJ",
+ "CostFunctionJb",
+ "CostFunctionJo",
+ "CurrentState",
+ "Innovation",
+ ]
)
self.requireInputArguments(
mandatory= ("Xb", "Y", "HO", "R", "B"),
#
# Précalcul des inversions de B et R
# ----------------------------------
- if self._parameters["StoreInternalVariables"]:
+ if self._parameters["StoreInternalVariables"] \
+ or "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJb" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJo" in self._parameters["StoreSupplementaryCalculations"] \
+ or "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
BI = B.getI()
RI = R.getI()
BIdemi = B.choleskyI()
self.StoredVariables["Analysis"].store( Xa )
#
del Yo, PfHT, HPfHT
+ if self._parameters["StoreInternalVariables"] \
+ or "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJb" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJo" in self._parameters["StoreSupplementaryCalculations"] \
+ or "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"] \
+ or "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
+ d = Ynpu - numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T
+ self.StoredVariables["Innovation"].store( d )
+ if self._parameters["StoreInternalVariables"] \
+ or "CurrentState" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["CurrentState"].store( Xn )
+ if self._parameters["StoreInternalVariables"] \
+ or "CostFunctionJ" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJb" in self._parameters["StoreSupplementaryCalculations"] \
+ or "CostFunctionJo" in self._parameters["StoreSupplementaryCalculations"] \
+ or "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
+ Jb = 0.5 * (Xa - Xb).T * BI * (Xa - Xb)
+ Jo = 0.5 * d.T * RI * d
+ J = float( Jb ) + float( Jo )
+ self.StoredVariables["CostFunctionJb"].store( Jb )
+ self.StoredVariables["CostFunctionJo"].store( Jo )
+ self.StoredVariables["CostFunctionJ" ].store( J )
if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
Ht = HO["Tangent"].asMatrix(ValueForMethodForm = Xa)
Ht = Ht.reshape(__p,__n) # ADAO & check shape
Pf = (1./(__m-1)) * Pf
Pn = (1. - K * Ht) * Pf
self.StoredVariables["APosterioriCovariance"].store( Pn )
+ if J < previousJMinimum:
+ previousJMinimum = J
+ Xa = Xn
+ covarianceXa = Pn
+ #
+ # Stockage supplementaire de l'optimum en estimation de parametres
+ # ----------------------------------------------------------------
+ if self._parameters["EstimationOf"] == "Parameters":
+ self.StoredVariables["Analysis"].store( Xa.A1 )
+ if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["APosterioriCovariance"].store( covarianceXa )
+ #
+ if "BMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
#
self._post_run(HO)
return 0