# -*- coding: utf-8 -*-
#
-# Copyright (C) 2008-2018 EDF R&D
+# Copyright (C) 2008-2020 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
import numpy, math
mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
-# Using "Ensemble Kalman filtering", P. L. HOUTEKAMER and H. L. MITCHELL, QJRMS (2005), 131, pp. 3269–3289
-
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
def __init__(self):
message = "Nombre de membres dans l'ensemble",
minval = -1,
)
- self.defineRequiredParameter(
- name = "Minimizer",
- default = "EnKF",
- typecast = str,
- message = "Schéma de mise a jour des informations d'ensemble",
- listval = ["EnKF", "ETKF", "DEnKF"],
- )
- self.defineRequiredParameter(
- name = "ConstrainedBy",
- default = "EstimateProjection",
- typecast = str,
- message = "Prise en compte des contraintes",
- listval = ["EstimateProjection"],
- )
self.defineRequiredParameter(
name = "EstimationOf",
default = "State",
message = "Estimation d'etat ou de parametres",
listval = ["State", "Parameters"],
)
+ self.defineRequiredParameter(
+ name = "SetSeed",
+ typecast = numpy.random.seed,
+ message = "Graine fixée pour le générateur aléatoire",
+ )
self.defineRequiredParameter(
name = "StoreInternalVariables",
default = False,
default = [],
typecast = tuple,
message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
- listval = ["APosterioriCorrelations", "APosterioriCovariance", "APosterioriStandardDeviations", "APosterioriVariances", "BMA", "CurrentState", "CostFunctionJ", "CostFunctionJb", "CostFunctionJo", "Innovation"]
- )
- self.defineRequiredParameter( # Pas de type
- name = "Bounds",
- message = "Liste des valeurs de bornes",
- )
- self.defineRequiredParameter(
- name = "SetSeed",
- typecast = numpy.random.seed,
- message = "Graine fixée pour le générateur aléatoire",
+ listval = [
+ "Analysis",
+ "APosterioriCorrelations",
+ "APosterioriCovariance",
+ "APosterioriStandardDeviations",
+ "APosterioriVariances",
+ "BMA",
+ "CostFunctionJ",
+ "CostFunctionJAtCurrentOptimum",
+ "CostFunctionJb",
+ "CostFunctionJbAtCurrentOptimum",
+ "CostFunctionJo",
+ "CostFunctionJoAtCurrentOptimum",
+ "CurrentOptimum",
+ "CurrentState",
+ "ForecastState",
+ "IndexOfOptimum",
+ "InnovationAtCurrentAnalysis",
+ "InnovationAtCurrentState",
+ "SimulatedObservationAtCurrentAnalysis",
+ "SimulatedObservationAtCurrentOptimum",
+ "SimulatedObservationAtCurrentState",
+ ]
)
self.requireInputArguments(
mandatory= ("Xb", "Y", "HO", "R", "B"),
#
# Précalcul des inversions de B et R
# ----------------------------------
- if self._parameters["StoreInternalVariables"]:
+ if self._parameters["StoreInternalVariables"] \
+ or self._toStore("CostFunctionJ") \
+ or self._toStore("CostFunctionJb") \
+ or self._toStore("CostFunctionJo") \
+ or self._toStore("CurrentOptimum") \
+ or self._toStore("APosterioriCovariance"):
BI = B.getI()
RI = R.getI()
- BIdemi = B.choleskyI()
- RIdemi = R.choleskyI()
+ # BIdemi = B.choleskyI()
+ # RIdemi = R.choleskyI()
#
# Initialisation
# --------------
if hasattr(Q,"asfullmatrix"): Qn = Q.asfullmatrix(__n)
else: Qn = Q
#
- self.StoredVariables["Analysis"].store( Xb.A1 )
- if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
- self.StoredVariables["APosterioriCovariance"].store( Pn )
- covarianceXa = Pn
+ if len(self.StoredVariables["Analysis"])==0 or not self._parameters["nextStep"]:
+ self.StoredVariables["Analysis"].store( numpy.ravel(Xb) )
+ if self._toStore("APosterioriCovariance"):
+ self.StoredVariables["APosterioriCovariance"].store( Pn )
+ covarianceXa = Pn
+ #
Xa = Xb
+ XaMin = Xb
previousJMinimum = numpy.finfo(float).max
#
    # Prédimensionnement
#
if self._parameters["EstimationOf"] == "State":
for i in range(__m):
- qi = numpy.asmatrix(numpy.random.multivariate_normal(numpy.zeros(__n), Qn)).T
+ qi = numpy.asmatrix(numpy.random.multivariate_normal(numpy.zeros(__n), Qn, (1,1,1))).T
Xn_predicted[:,i] = numpy.asmatrix(numpy.ravel( M((Xn[:,i], Un)) )).T + qi
HX_predicted[:,i] = numpy.asmatrix(numpy.ravel( H((Xn_predicted[:,i], Un)) )).T
if Cm is not None and Un is not None: # Attention : si Cm est aussi dans M, doublon !
#
Xfm = numpy.asmatrix(numpy.ravel(Xn_predicted.mean(axis=1, dtype=mfp))).T
Hfm = numpy.asmatrix(numpy.ravel(HX_predicted.mean(axis=1, dtype=mfp))).T
- Af = Xn_predicted - Xfm
- Hf = HX_predicted - Hfm
#
PfHT, HPfHT = 0., 0.
for i in range(__m):
- PfHT += Af[:,i] * Hf[:,i].T
- HPfHT += Hf[:,i] * Hf[:,i].T
+ Exfi = Xn_predicted[:,i] - Xfm
+ Eyfi = HX_predicted[:,i] - Hfm
+ PfHT += Exfi * Eyfi.T
+ HPfHT += Eyfi * Eyfi.T
PfHT = (1./(__m-1)) * PfHT
HPfHT = (1./(__m-1)) * HPfHT
+ K = PfHT * ( R + HPfHT ).I
+ del PfHT, HPfHT
#
- K = PfHT * ( R + HPfHT ).I
- #
- Yo = numpy.asmatrix(numpy.zeros((__p,__m)))
for i in range(__m):
- ri = numpy.asmatrix(numpy.random.multivariate_normal(numpy.zeros(__p), Rn)).T
- Yo[:,i] = Ynpu + ri
- #
- for i in range(__m):
- Xn[:,i] = Xn_predicted[:,i] + K * (Yo[:,i] - HX_predicted[:,i])
+ ri = numpy.asmatrix(numpy.random.multivariate_normal(numpy.zeros(__p), Rn, (1,1,1))).T
+ Xn[:,i] = Xn_predicted[:,i] + K * (Ynpu + ri - HX_predicted[:,i])
#
Xa = Xn.mean(axis=1, dtype=mfp)
- self.StoredVariables["Analysis"].store( Xa )
#
- del Yo, PfHT, HPfHT
- if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
- Ht = HO["Tangent"].asMatrix(ValueForMethodForm = Xa)
- Ht = Ht.reshape(__p,__n) # ADAO & check shape
- Pf = 0.
+ if self._parameters["StoreInternalVariables"] \
+ or self._toStore("CostFunctionJ") \
+ or self._toStore("CostFunctionJb") \
+ or self._toStore("CostFunctionJo") \
+ or self._toStore("APosterioriCovariance") \
+ or self._toStore("InnovationAtCurrentAnalysis") \
+ or self._toStore("SimulatedObservationAtCurrentAnalysis") \
+ or self._toStore("SimulatedObservationAtCurrentOptimum"):
+ _HXa = numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T
+ _Innovation = Ynpu - _HXa
+ #
+ # ---> avec analysis
+ self.StoredVariables["Analysis"].store( Xa )
+ if self._toStore("SimulatedObservationAtCurrentAnalysis"):
+ self.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( _HXa )
+ if self._toStore("InnovationAtCurrentAnalysis"):
+ self.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
+ # ---> avec current state
+ if self._parameters["StoreInternalVariables"] \
+ or self._toStore("CurrentState"):
+ self.StoredVariables["CurrentState"].store( Xn )
+ if self._toStore("ForecastState"):
+ self.StoredVariables["ForecastState"].store( Xn_predicted )
+ if self._toStore("BMA"):
+ self.StoredVariables["BMA"].store( Xn_predicted - Xa )
+ if self._toStore("InnovationAtCurrentState"):
+ self.StoredVariables["InnovationAtCurrentState"].store( - HX_predicted + Ynpu )
+ if self._toStore("SimulatedObservationAtCurrentState") \
+ or self._toStore("SimulatedObservationAtCurrentOptimum"):
+ self.StoredVariables["SimulatedObservationAtCurrentState"].store( HX_predicted )
+ # ---> autres
+ if self._parameters["StoreInternalVariables"] \
+ or self._toStore("CostFunctionJ") \
+ or self._toStore("CostFunctionJb") \
+ or self._toStore("CostFunctionJo") \
+ or self._toStore("CurrentOptimum") \
+ or self._toStore("APosterioriCovariance"):
+ Jb = float( 0.5 * (Xa - Xb).T * BI * (Xa - Xb) )
+ Jo = float( 0.5 * _Innovation.T * RI * _Innovation )
+ J = Jb + Jo
+ self.StoredVariables["CostFunctionJb"].store( Jb )
+ self.StoredVariables["CostFunctionJo"].store( Jo )
+ self.StoredVariables["CostFunctionJ" ].store( J )
+ #
+ if self._toStore("IndexOfOptimum") \
+ or self._toStore("CurrentOptimum") \
+ or self._toStore("CostFunctionJAtCurrentOptimum") \
+ or self._toStore("CostFunctionJbAtCurrentOptimum") \
+ or self._toStore("CostFunctionJoAtCurrentOptimum") \
+ or self._toStore("SimulatedObservationAtCurrentOptimum"):
+ IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
+ if self._toStore("IndexOfOptimum"):
+ self.StoredVariables["IndexOfOptimum"].store( IndexMin )
+ if self._toStore("CurrentOptimum"):
+ self.StoredVariables["CurrentOptimum"].store( self.StoredVariables["Analysis"][IndexMin] )
+ if self._toStore("SimulatedObservationAtCurrentOptimum"):
+ self.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( self.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )
+ if self._toStore("CostFunctionJbAtCurrentOptimum"):
+ self.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJb"][IndexMin] )
+ if self._toStore("CostFunctionJoAtCurrentOptimum"):
+ self.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( self.StoredVariables["CostFunctionJo"][IndexMin] )
+ if self._toStore("CostFunctionJAtCurrentOptimum"):
+ self.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( self.StoredVariables["CostFunctionJ" ][IndexMin] )
+ if self._toStore("APosterioriCovariance"):
+ Pn = 0.
for i in range(__m):
- Pf += Af[:,i] * Af[:,i].T
- Pf = (1./(__m-1)) * Pf
- Pn = (1. - K * Ht) * Pf
+ Eai = Xn[:,i] - Xa
+ Pn += Eai * Eai.T
+ Pn = (1./(__m-1)) * Pn
self.StoredVariables["APosterioriCovariance"].store( Pn )
+ if self._parameters["EstimationOf"] == "Parameters" \
+ and J < previousJMinimum:
+ previousJMinimum = J
+ XaMin = Xa
+ if self._toStore("APosterioriCovariance"):
+ covarianceXaMin = Pn
+ #
+ # Stockage final supplémentaire de l'optimum en estimation de paramètres
+ # ----------------------------------------------------------------------
+ if self._parameters["EstimationOf"] == "Parameters":
+ self.StoredVariables["Analysis"].store( XaMin )
+ if self._toStore("APosterioriCovariance"):
+ self.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
+ if self._toStore("BMA"):
+ self.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
#
self._post_run(HO)
return 0
# ==============================================================================
if __name__ == "__main__":
- print('\n AUTODIAGNOSTIC \n')
+ print('\n AUTODIAGNOSTIC\n')