From 5464fabc0216852c342838f67eafd69e505c245b Mon Sep 17 00:00:00 2001 From: Jean-Philippe ARGAUD Date: Wed, 13 Jan 2021 21:20:46 +0100 Subject: [PATCH] Improvement and extension of EnKF algorithm (MLEF-B) --- .../daAlgorithms/EnsembleKalmanFilter.py | 8 + src/daComposant/daCore/NumericObjects.py | 225 ++++++++++++++++++ 2 files changed, 233 insertions(+) diff --git a/src/daComposant/daAlgorithms/EnsembleKalmanFilter.py b/src/daComposant/daAlgorithms/EnsembleKalmanFilter.py index ead0c30..313ea75 100644 --- a/src/daComposant/daAlgorithms/EnsembleKalmanFilter.py +++ b/src/daComposant/daAlgorithms/EnsembleKalmanFilter.py @@ -42,6 +42,8 @@ class ElementaryAlgorithm(BasicObjects.Algorithm): "ETKF-N-11", "ETKF-N-15", "ETKF-N-16", + "MLEF", + "MLEF-B", ], ) self.defineRequiredParameter( @@ -182,6 +184,12 @@ class ElementaryAlgorithm(BasicObjects.Algorithm): elif self._parameters["Minimizer"] in ["ETKF-N-16", "ETKF-N"]: NumericObjects.etkf(self, Xb, Y, U, HO, EM, CM, R, B, Q, KorV="FiniteSize16") # + #-------------------------- + # Default MLEF = MLEF-B + elif self._parameters["Minimizer"] in ["MLEF-B", "MLEF"]: + NumericObjects.mlef(self, Xb, Y, U, HO, EM, CM, R, B, Q, BnotT=False) + # + #-------------------------- else: raise ValueError("Error in Minimizer name: %s"%self._parameters["Minimizer"]) # diff --git a/src/daComposant/daCore/NumericObjects.py b/src/daComposant/daCore/NumericObjects.py index 402000a..9d6ed56 100644 --- a/src/daComposant/daCore/NumericObjects.py +++ b/src/daComposant/daCore/NumericObjects.py @@ -1151,6 +1151,231 @@ def etkf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, KorV="KalmanFilterFormula"): # return 0 +# ============================================================================== +def mlef(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, BnotT=False, _epsilon=1.e-1, _e=1.e-7, _jmax=15000): + """ + Maximum Likelihood Ensemble Filter (EnKF/MLEF Zupanski 2005, Bocquet 2013) + + selfA est identique au "self" d'algorithme appelant et contient les + valeurs. + """ + if selfA._parameters["EstimationOf"] == "Parameters": + selfA._parameters["StoreInternalVariables"] = True + # + # Opérateurs + # ---------- + H = HO["Direct"].appliedControledFormTo + # + if selfA._parameters["EstimationOf"] == "State": + M = EM["Direct"].appliedControledFormTo + # + if CM is not None and "Tangent" in CM and U is not None: + Cm = CM["Tangent"].asMatrix(Xb) + else: + Cm = None + # + # Nombre de pas identique au nombre de pas d'observations + # ------------------------------------------------------- + if hasattr(Y,"stepnumber"): + duration = Y.stepnumber() + __p = numpy.cumprod(Y.shape())[-1] + else: + duration = 2 + __p = numpy.array(Y).size + # + # Précalcul des inversions de B et R + # ---------------------------------- + if selfA._parameters["StoreInternalVariables"] \ + or selfA._toStore("CostFunctionJ") \ + or selfA._toStore("CostFunctionJb") \ + or selfA._toStore("CostFunctionJo") \ + or selfA._toStore("CurrentOptimum") \ + or selfA._toStore("APosterioriCovariance"): + BI = B.getI() + RI = R.getI() + # + # Initialisation + # -------------- + __n = Xb.size + __m = selfA._parameters["NumberOfMembers"] + Xn = numpy.asmatrix(numpy.dot( Xb.reshape(__n,1), numpy.ones((1,__m)) )) + if hasattr(B,"asfullmatrix"): Pn = B.asfullmatrix(__n) + else: Pn = B + if hasattr(R,"asfullmatrix"): Rn = R.asfullmatrix(__p) + else: Rn = R + if hasattr(Q,"asfullmatrix"): Qn = Q.asfullmatrix(__n) + else: Qn = Q + # + if len(selfA.StoredVariables["Analysis"])==0 or not selfA._parameters["nextStep"]: + selfA.StoredVariables["Analysis"].store( numpy.ravel(Xb) ) + if selfA._toStore("APosterioriCovariance"): + selfA.StoredVariables["APosterioriCovariance"].store( Pn ) + covarianceXa = Pn + # + previousJMinimum = numpy.finfo(float).max + # + # Predimensionnement + Xn_predicted = numpy.asmatrix(numpy.zeros((__n,__m))) + # + for step in range(duration-1): + if hasattr(Y,"store"): + Ynpu = numpy.asmatrix(numpy.ravel( Y[step+1] )).T + else: + Ynpu = numpy.asmatrix(numpy.ravel( Y )).T + # + if U is not None: + if hasattr(U,"store") and len(U)>1: + Un = numpy.asmatrix(numpy.ravel( U[step] )).T + elif hasattr(U,"store") and len(U)==1: + Un = numpy.asmatrix(numpy.ravel( U[0] )).T + else: + Un = numpy.asmatrix(numpy.ravel( U )).T + else: + Un = None + # + if selfA._parameters["EstimationOf"] == "State": # Forecast + Q and observation of forecast + EMX = M( [(Xn[:,i], Un) for i in range(__m)], argsAsSerie = True ) + for i in range(__m): + qi = numpy.random.multivariate_normal(numpy.zeros(__n), Qn) + Xn_predicted[:,i] = (numpy.ravel( EMX[i] ) + qi).reshape((__n,-1)) + if Cm is not None and Un is not None: # Attention : si Cm est aussi dans M, doublon ! + Cm = Cm.reshape(__n,Un.size) # ADAO & check shape + Xn_predicted = Xn_predicted + Cm * Un + elif selfA._parameters["EstimationOf"] == "Parameters": # Observation of forecast + # --- > Par principe, M = Id, Q = 0 + Xn_predicted = Xn + # + # Mean of forecast and observation of forecast + Xfm = Xn_predicted.mean(axis=1, dtype=mfp).astype('float') + # + EaX = (Xn_predicted - Xfm.reshape((__n,-1))) / numpy.sqrt(__m-1) + # + #-------------------------- + Ua = numpy.eye(__m) + Ta = numpy.eye(__m) + # + __j = 0 # 4: + vw = numpy.zeros(__m) # 4: + Deltaw = 1 + while numpy.linalg.norm(Deltaw) >= _e or __j >= _jmax: # 5: et 19: + vx = numpy.ravel(Xfm) + EaX @ vw # 6: + # + if BnotT: + EE = vx.reshape((__n,-1)) + _epsilon * EaX # 7: + # + EZ = H( [(EE[:,i], Un) for i in range(__m)], + argsAsSerie = True, + returnSerieAsArrayMatrix = True ) + # + ybar = EZ.mean(axis=1, dtype=mfp).astype('float').reshape((__p,-1)) # 10: Observation mean + # + if BnotT: + EY = (EZ - ybar) / _epsilon # 11: + # + GradJ = numpy.ravel(vw.reshape((__m,1)) - EY.transpose() @ (RI * (Ynpu - ybar))) # 13: + mH = numpy.eye(__m) + EY.transpose() @ (RI * EY) # 14: + Deltaw = numpy.linalg.solve(mH,GradJ) # 15: + vw = vw - Deltaw # 16: + if not BnotT: + Ta = numpy.linalg.inv(numpy.real(scipy.linalg.sqrtm( mH ))) # 17: + __j = __j + 1 # 18: + # + if BnotT: + Ta = numpy.linalg.inv(numpy.real(scipy.linalg.sqrtm( mH ))) # 20: + # + Xn = vx.reshape((__n,-1)) + numpy.sqrt(__m-1) * EaX @ Ta @ Ua # 21: + # + Xa = Xn.mean(axis=1, dtype=mfp).astype('float') + #-------------------------- + # + if selfA._parameters["StoreInternalVariables"] \ + or selfA._toStore("CostFunctionJ") \ + or selfA._toStore("CostFunctionJb") \ + or selfA._toStore("CostFunctionJo") \ + or selfA._toStore("APosterioriCovariance") \ + or selfA._toStore("InnovationAtCurrentAnalysis") \ + or selfA._toStore("SimulatedObservationAtCurrentAnalysis") \ + or selfA._toStore("SimulatedObservationAtCurrentOptimum"): + _HXa = numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T + _Innovation = Ynpu - _HXa + # + selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) ) + # ---> avec analysis + selfA.StoredVariables["Analysis"].store( Xa ) + if selfA._toStore("SimulatedObservationAtCurrentAnalysis"): + selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( _HXa ) + if selfA._toStore("InnovationAtCurrentAnalysis"): + selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation ) + # ---> avec current state + if selfA._parameters["StoreInternalVariables"] \ + or selfA._toStore("CurrentState"): + selfA.StoredVariables["CurrentState"].store( Xn ) + if selfA._toStore("ForecastState"): + selfA.StoredVariables["ForecastState"].store( Xn_predicted ) + if selfA._toStore("BMA"): + selfA.StoredVariables["BMA"].store( Xn_predicted - Xa ) + #~ if selfA._toStore("InnovationAtCurrentState"): + #~ selfA.StoredVariables["InnovationAtCurrentState"].store( - HX_predicted + Ynpu ) + #~ if selfA._toStore("SimulatedObservationAtCurrentState") \ + #~ or selfA._toStore("SimulatedObservationAtCurrentOptimum"): + #~ selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( HX_predicted ) + # ---> autres + if selfA._parameters["StoreInternalVariables"] \ + or selfA._toStore("CostFunctionJ") \ + or selfA._toStore("CostFunctionJb") \ + or selfA._toStore("CostFunctionJo") \ + or selfA._toStore("CurrentOptimum") \ + or selfA._toStore("APosterioriCovariance"): + Jb = float( 0.5 * (Xa - Xb).T * BI * (Xa - Xb) ) + Jo = float( 0.5 * _Innovation.T * RI * _Innovation ) + J = Jb + Jo + selfA.StoredVariables["CostFunctionJb"].store( Jb ) + selfA.StoredVariables["CostFunctionJo"].store( Jo ) + selfA.StoredVariables["CostFunctionJ" ].store( J ) + # + if selfA._toStore("IndexOfOptimum") \ + or selfA._toStore("CurrentOptimum") \ + or selfA._toStore("CostFunctionJAtCurrentOptimum") \ + or selfA._toStore("CostFunctionJbAtCurrentOptimum") \ + or selfA._toStore("CostFunctionJoAtCurrentOptimum") \ + or selfA._toStore("SimulatedObservationAtCurrentOptimum"): + IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps + if selfA._toStore("IndexOfOptimum"): + selfA.StoredVariables["IndexOfOptimum"].store( IndexMin ) + if selfA._toStore("CurrentOptimum"): + selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] ) + if selfA._toStore("SimulatedObservationAtCurrentOptimum"): + selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] ) + if selfA._toStore("CostFunctionJbAtCurrentOptimum"): + selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] ) + if selfA._toStore("CostFunctionJoAtCurrentOptimum"): + selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] ) + if selfA._toStore("CostFunctionJAtCurrentOptimum"): + selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] ) + if selfA._toStore("APosterioriCovariance"): + Eai = (1/numpy.sqrt(__m-1)) * (Xn - Xa.reshape((__n,-1))) # Anomalies + Pn = Eai @ Eai.T + Pn = 0.5 * (Pn + Pn.T) + selfA.StoredVariables["APosterioriCovariance"].store( Pn ) + if selfA._parameters["EstimationOf"] == "Parameters" \ + and J < previousJMinimum: + previousJMinimum = J + XaMin = Xa + if selfA._toStore("APosterioriCovariance"): + covarianceXaMin = Pn + # + # Stockage final supplémentaire de l'optimum en estimation de paramètres + # ---------------------------------------------------------------------- + if selfA._parameters["EstimationOf"] == "Parameters": + selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) ) + selfA.StoredVariables["Analysis"].store( XaMin ) + if selfA._toStore("APosterioriCovariance"): + selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin ) + if selfA._toStore("BMA"): + selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) ) + # + return 0 + # ============================================================================== if __name__ == "__main__": print('\n AUTODIAGNOSTIC\n') -- 2.39.2