- #
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("APosterioriCovariance") \
- or selfA._toStore("InnovationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- _HXa = numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T
- _Innovation = Ynpu - _HXa
- #
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- # ---> with analysis
- selfA.StoredVariables["Analysis"].store( Xa )
- if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
- selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( _HXa )
- if selfA._toStore("InnovationAtCurrentAnalysis"):
- selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
- # ---> with current state
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CurrentState"):
- selfA.StoredVariables["CurrentState"].store( Xn )
- if selfA._toStore("ForecastState"):
- selfA.StoredVariables["ForecastState"].store( E2 )
- if selfA._toStore("ForecastCovariance"):
- selfA.StoredVariables["ForecastCovariance"].store( EnsembleErrorCovariance(E2) )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( E2 - Xa )
- if selfA._toStore("InnovationAtCurrentState"):
- selfA.StoredVariables["InnovationAtCurrentState"].store( - HE2 + Ynpu )
- if selfA._toStore("SimulatedObservationAtCurrentState") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( HE2 )
- # ---> others
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- Jb = float( 0.5 * (Xa - Xb).T * BI * (Xa - Xb) )
- Jo = float( 0.5 * _Innovation.T * RI * _Innovation )
- J = Jb + Jo
- selfA.StoredVariables["CostFunctionJb"].store( Jb )
- selfA.StoredVariables["CostFunctionJo"].store( Jo )
- selfA.StoredVariables["CostFunctionJ" ].store( J )
- #
- if selfA._toStore("IndexOfOptimum") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("CostFunctionJAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- if selfA._toStore("IndexOfOptimum"):
- selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
- if selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
- if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )
- if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
- if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
- if selfA._toStore("CostFunctionJAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( EnsembleErrorCovariance(Xn) )
- if selfA._parameters["EstimationOf"] == "Parameters" \
- and J < previousJMinimum:
- previousJMinimum = J
- XaMin = Xa
- if selfA._toStore("APosterioriCovariance"):
- covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
- #
- # Final additional storage of the optimum for parameter estimation
- # ----------------------------------------------------------------
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- selfA.StoredVariables["Analysis"].store( XaMin )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
- #
- return 0
-
-# ==============================================================================
-def incr3dvar(selfA, Xb, Y, U, HO, EM, CM, R, B, Q):
- """
- Incremental 3DVAR
- """
- #
- # Initializations
- # ---------------
- #
- # Non-linear operator for the outer loop
- Hm = HO["Direct"].appliedTo
- #
- # Precompute the inverses of B and R
- BI = B.getI()
- RI = R.getI()
- #
- # Starting point of the optimization
- Xini = selfA._parameters["InitializationPoint"]
- #
- HXb = numpy.asmatrix(numpy.ravel( Hm( Xb ) )).T
- Innovation = Y - HXb
- #
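- # The incremental formulation splits the minimization into two nested loops:
- # the outer loop relinearizes the observation operator around the current
- # iterate Xr, and the inner loop minimizes the quadratic cost in the
- # increment dx (counted from the background Xb):
- #     J(dx) = 0.5 * dx' B^{-1} dx + 0.5 * (d - Ht.dx)' R^{-1} (d - Ht.dx)
- # with d = Y - H(Xb) the innovation and Ht the tangent of H at Xr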
- # Outer Loop
- # ----------
- iOuter = 0
- J = 1./mpr
- DeltaJ = 1./mpr
- Xr = Xini.reshape((-1,1))
- while abs(DeltaJ) >= selfA._parameters["CostDecrementTolerance"] and iOuter <= selfA._parameters["MaximumNumberOfSteps"]:
- #
- # Inner Loop
- # ----------
- Ht = HO["Tangent"].asMatrix(Xr)
- Ht = Ht.reshape(Y.size,Xr.size) # ADAO & check shape
- #
- # Definition of the cost function
- # -------------------------------
- def CostFunction(dx):
- _dX = numpy.asmatrix(numpy.ravel( dx )).T
- if selfA._parameters["StoreInternalVariables"] or \
- selfA._toStore("CurrentState") or \
- selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentState"].store( Xb + _dX )
- _HdX = Ht * _dX
- _HdX = numpy.asmatrix(numpy.ravel( _HdX )).T
- _dInnovation = Innovation - _HdX
- if selfA._toStore("SimulatedObservationAtCurrentState") or \
- selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( HXb + _HdX )
- if selfA._toStore("InnovationAtCurrentState"):
- selfA.StoredVariables["InnovationAtCurrentState"].store( _dInnovation )
- #
- Jb = float( 0.5 * _dX.T * BI * _dX )
- Jo = float( 0.5 * _dInnovation.T * RI * _dInnovation )
- J = Jb + Jo
- #
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
- selfA.StoredVariables["CostFunctionJb"].store( Jb )
- selfA.StoredVariables["CostFunctionJo"].store( Jo )
- selfA.StoredVariables["CostFunctionJ" ].store( J )
- if selfA._toStore("IndexOfOptimum") or \
- selfA._toStore("CurrentOptimum") or \
- selfA._toStore("CostFunctionJAtCurrentOptimum") or \
- selfA._toStore("CostFunctionJbAtCurrentOptimum") or \
- selfA._toStore("CostFunctionJoAtCurrentOptimum") or \
- selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- if selfA._toStore("IndexOfOptimum"):
- selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
- if selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["CurrentState"][IndexMin] )
- if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin] )
- if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
- if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
- if selfA._toStore("CostFunctionJAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
- return J
- #
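- # Gradient of the inner quadratic cost with respect to the increment:
- #     grad J(dx) = B^{-1} dx - Ht' R^{-1} (d - Ht.dx)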
- def GradientOfCostFunction(dx):
- _dX = numpy.asmatrix(numpy.ravel( dx )).T
- _HdX = Ht * _dX
- _HdX = numpy.asmatrix(numpy.ravel( _HdX )).T
- _dInnovation = Innovation - _HdX
- GradJb = BI * _dX
- GradJo = - Ht.T @ (RI * _dInnovation)
- GradJ = numpy.ravel( GradJb ) + numpy.ravel( GradJo )
- return GradJ
- #
- # Minimization of the functional
- # ------------------------------
- nbPreviousSteps = selfA.StoredVariables["CostFunctionJ"].stepnumber()
- #
- if selfA._parameters["Minimizer"] == "LBFGSB":
- # Use the locally provided L-BFGS-B routine (lbfgsbhlt) for mid-range scipy versions
- if "0.19" <= scipy.version.version <= "1.1.0":
- import lbfgsbhlt as optimiseur
- else:
- import scipy.optimize as optimiseur
- Minimum, J_optimal, Informations = optimiseur.fmin_l_bfgs_b(
- func = CostFunction,
- x0 = numpy.zeros(Xini.size),
- fprime = GradientOfCostFunction,
- args = (),
- bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
- maxfun = selfA._parameters["MaximumNumberOfSteps"]-1,
- factr = selfA._parameters["CostDecrementTolerance"]*1.e14,
- pgtol = selfA._parameters["ProjectedGradientTolerance"],
- iprint = selfA._parameters["optiprint"],
- )
- nfeval = Informations['funcalls']
- rc = Informations['warnflag']
- elif selfA._parameters["Minimizer"] == "TNC":
- Minimum, nfeval, rc = scipy.optimize.fmin_tnc(
- func = CostFunction,
- x0 = numpy.zeros(Xini.size),
- fprime = GradientOfCostFunction,
- args = (),
- bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
- maxfun = selfA._parameters["MaximumNumberOfSteps"],
- pgtol = selfA._parameters["ProjectedGradientTolerance"],
- ftol = selfA._parameters["CostDecrementTolerance"],
- messages = selfA._parameters["optmessages"],
- )
- elif selfA._parameters["Minimizer"] == "CG":
- Minimum, fopt, nfeval, grad_calls, rc = scipy.optimize.fmin_cg(
- f = CostFunction,
- x0 = numpy.zeros(Xini.size),
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- gtol = selfA._parameters["GradientNormTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- elif selfA._parameters["Minimizer"] == "NCG":
- Minimum, fopt, nfeval, grad_calls, hcalls, rc = scipy.optimize.fmin_ncg(
- f = CostFunction,
- x0 = numpy.zeros(Xini.size),
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- avextol = selfA._parameters["CostDecrementTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- elif selfA._parameters["Minimizer"] == "BFGS":
- Minimum, fopt, gopt, Hopt, nfeval, grad_calls, rc = scipy.optimize.fmin_bfgs(
- f = CostFunction,
- x0 = numpy.zeros(Xini.size),
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- gtol = selfA._parameters["GradientNormTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- else:
- raise ValueError("Error in Minimizer name: %s"%selfA._parameters["Minimizer"])
- #
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- MinJ = selfA.StoredVariables["CostFunctionJ"][IndexMin]
- #
- if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
- Minimum = selfA.StoredVariables["CurrentState"][IndexMin]
- Minimum = numpy.asmatrix(numpy.ravel( Minimum )).T
- else:
- Minimum = Xb + numpy.asmatrix(numpy.ravel( Minimum )).T
- #
- Xr = Minimum
- DeltaJ = selfA.StoredVariables["CostFunctionJ" ][-1] - J
- J = selfA.StoredVariables["CostFunctionJ" ][-1] # Update J so that DeltaJ measures the decrease between two successive outer iterations
- iOuter = selfA.StoredVariables["CurrentIterationNumber"][-1]
- #
- # Computing the analysis
- # ----------------------
- Xa = Xr
- #
- selfA.StoredVariables["Analysis"].store( Xa )
- #
- if selfA._toStore("OMA") or \
- selfA._toStore("SigmaObs2") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("SimulatedObservationAtOptimum"):
- if selfA._toStore("SimulatedObservationAtCurrentState"):
- HXa = selfA.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
- elif selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- HXa = selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
- else:
- HXa = Hm( Xa )
- #
- # Computing the analysis covariance
- # ---------------------------------
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("JacobianMatrixAtOptimum") or \
- selfA._toStore("KalmanGainAtOptimum"):
- HtM = HO["Tangent"].asMatrix(ValueForMethodForm = Xa)
- HtM = HtM.reshape(Y.size,Xa.size) # ADAO & check shape
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("KalmanGainAtOptimum"):
- HaM = HO["Adjoint"].asMatrix(ValueForMethodForm = Xa)
- HaM = HaM.reshape(Xa.size,Y.size) # ADAO & check shape
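- # The a posteriori covariance A = (B^{-1} + H' R^{-1} H)^{-1} is obtained by
- # assembling the Hessian of the 3DVAR cost function column by column, through
- # its application to each canonical basis vector, and then inverting it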
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles"):
- HessienneI = []
- nb = Xa.size
- for i in range(nb):
- _ee = numpy.matrix(numpy.zeros(nb)).T
- _ee[i] = 1.
- _HtEE = numpy.dot(HtM,_ee)
- _HtEE = numpy.asmatrix(numpy.ravel( _HtEE )).T
- HessienneI.append( numpy.ravel( BI*_ee + HaM * (RI * _HtEE) ) )
- HessienneI = numpy.matrix( HessienneI )
- A = HessienneI.I
- if min(A.shape) != max(A.shape):
- raise ValueError("The %s a posteriori covariance matrix A is of shape %s, despites it has to be a squared matrix. There is an error in the observation operator, please check it."%(selfA._name,str(A.shape)))
- if (numpy.diag(A) < 0).any():
- raise ValueError("The %s a posteriori covariance matrix A has at least one negative value on its diagonal. There is an error in the observation operator, please check it."%(selfA._name,))
- if logging.getLogger().level < logging.WARNING: # The check is only performed in debug mode
- try:
- numpy.linalg.cholesky( A )
- except numpy.linalg.LinAlgError:
- raise ValueError("The %s a posteriori covariance matrix A is not symmetric positive-definite. Please check your a priori covariances and your observation operator."%(selfA._name,))
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( A )
- if selfA._toStore("JacobianMatrixAtOptimum"):
- selfA.StoredVariables["JacobianMatrixAtOptimum"].store( HtM )
- if selfA._toStore("KalmanGainAtOptimum"):
- if (Y.size <= Xb.size): KG = B * HaM * (R + numpy.dot(HtM, B * HaM)).I
- elif (Y.size > Xb.size): KG = (BI + numpy.dot(HaM, RI * HtM)).I * HaM * RI
- selfA.StoredVariables["KalmanGainAtOptimum"].store( KG )
- #
- # Additional computations and/or storage
- # --------------------------------------
- if selfA._toStore("Innovation") or \
- selfA._toStore("SigmaObs2") or \
- selfA._toStore("MahalanobisConsistency") or \
- selfA._toStore("OMB"):
- d = Y - HXb
- if selfA._toStore("Innovation"):
- selfA.StoredVariables["Innovation"].store( numpy.ravel(d) )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
- if selfA._toStore("OMA"):
- selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
- if selfA._toStore("OMB"):
- selfA.StoredVariables["OMB"].store( numpy.ravel(d) )
- if selfA._toStore("SigmaObs2"):
- TraceR = R.trace(Y.size)
- selfA.StoredVariables["SigmaObs2"].store( float( (d.T * (numpy.asmatrix(numpy.ravel(Y)).T-numpy.asmatrix(numpy.ravel(HXa)).T)) ) / TraceR )
- if selfA._toStore("MahalanobisConsistency"):
- selfA.StoredVariables["MahalanobisConsistency"].store( float( 2.*MinJ/d.size ) )
- if selfA._toStore("SimulationQuantiles"):
- QuantilesEstimations(selfA, A, Xa, HXa, Hm, HtM)
- if selfA._toStore("SimulatedObservationAtBackground"):
- selfA.StoredVariables["SimulatedObservationAtBackground"].store( numpy.ravel(HXb) )
- if selfA._toStore("SimulatedObservationAtOptimum"):
- selfA.StoredVariables["SimulatedObservationAtOptimum"].store( numpy.ravel(HXa) )
- #
- return 0
-
-# ==============================================================================
-def mlef(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="MLEF13",
- BnotT=False, _epsilon=1.e-3, _e=1.e-7, _jmax=15000):
- """
- Maximum Likelihood Ensemble Filter
- """
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA._parameters["StoreInternalVariables"] = True
- #
- # Operators
- H = HO["Direct"].appliedControledFormTo
- #
- if selfA._parameters["EstimationOf"] == "State":
- M = EM["Direct"].appliedControledFormTo
- #
- if CM is not None and "Tangent" in CM and U is not None:
- Cm = CM["Tangent"].asMatrix(Xb)
- else:
- Cm = None
- #
- # Observation duration and sizes
- if hasattr(Y,"stepnumber"):
- duration = Y.stepnumber()
- __p = numpy.cumprod(Y.shape())[-1]
- else:
- duration = 2
- __p = numpy.array(Y).size
- #
- # Precompute the inverses of B and R
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- BI = B.getI()
- RI = R.getI()
- #
- __n = Xb.size
- __m = selfA._parameters["NumberOfMembers"]
- #
- if len(selfA.StoredVariables["Analysis"])==0 or not selfA._parameters["nextStep"]:
- Xn = EnsembleOfBackgroundPerturbations( Xb, None, __m )
- selfA.StoredVariables["Analysis"].store( Xb )
- if selfA._toStore("APosterioriCovariance"):
- if hasattr(B,"asfullmatrix"):
- selfA.StoredVariables["APosterioriCovariance"].store( B.asfullmatrix(__n) )
- else:
- selfA.StoredVariables["APosterioriCovariance"].store( B )
- selfA._setInternalState("seed", numpy.random.get_state())
- elif selfA._parameters["nextStep"]:
- Xn = selfA._getInternalState("Xn")
- #
- previousJMinimum = numpy.finfo(float).max
- #
- for step in range(duration-1):
- numpy.random.set_state(selfA._getInternalState("seed"))
- if hasattr(Y,"store"):
- Ynpu = numpy.ravel( Y[step+1] ).reshape((__p,1))
- else:
- Ynpu = numpy.ravel( Y ).reshape((__p,1))
- #
- if U is not None:
- if hasattr(U,"store") and len(U)>1:
- Un = numpy.asmatrix(numpy.ravel( U[step] )).T
- elif hasattr(U,"store") and len(U)==1:
- Un = numpy.asmatrix(numpy.ravel( U[0] )).T
- else:
- Un = numpy.asmatrix(numpy.ravel( U )).T
- else:
- Un = None
- #
- if selfA._parameters["InflationType"] == "MultiplicativeOnBackgroundAnomalies":
- Xn = CovarianceInflation( Xn,
- selfA._parameters["InflationType"],
- selfA._parameters["InflationFactor"],
- )
- #
- if selfA._parameters["EstimationOf"] == "State": # Forecast + Q and observation of forecast
- EMX = M( [(Xn[:,i], Un) for i in range(__m)],
- argsAsSerie = True,
- returnSerieAsArrayMatrix = True )
- Xn_predicted = EnsemblePerturbationWithGivenCovariance( EMX, Q )
- if Cm is not None and Un is not None: # Warning: if Cm is also included in M, it is applied twice!
- Cm = Cm.reshape(__n,Un.size) # ADAO & check shape
- Xn_predicted = Xn_predicted + Cm * Un
- elif selfA._parameters["EstimationOf"] == "Parameters": # Observation of forecast
- # ---> By principle, M = Id, Q = 0
- Xn_predicted = EMX = Xn
- #
- #--------------------------
- if VariantM == "MLEF13":
- Xfm = numpy.ravel(Xn_predicted.mean(axis=1, dtype=mfp).astype('float'))
- EaX = EnsembleOfAnomalies( Xn_predicted, Xfm, 1./math.sqrt(__m-1) )
- Ua = numpy.identity(__m)
- __j = 0
- Deltaw = 1
- if not BnotT:
- Ta = numpy.identity(__m)
- vw = numpy.zeros(__m)
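- # Newton loop in ensemble-weight space: at each iteration the ensemble is
- # regenerated around the current estimate vx1, the observed anomalies EaY are
- # evaluated with the non-linear operator H, and the weights are updated by
- # solving mH.Deltaw = -GradJ, where mH = I + EaY' R^{-1} EaY approximates the
- # Hessian of the cost function in w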
- while numpy.linalg.norm(Deltaw) >= _e and __j <= _jmax:
- vx1 = (Xfm + EaX @ vw).reshape((__n,1))
- #
- if BnotT:
- E1 = vx1 + _epsilon * EaX
- else:
- E1 = vx1 + math.sqrt(__m-1) * EaX @ Ta
- #
- HE2 = H( [(E1[:,i,numpy.newaxis], Un) for i in range(__m)],
- argsAsSerie = True,
- returnSerieAsArrayMatrix = True )
- vy2 = HE2.mean(axis=1, dtype=mfp).astype('float').reshape((__p,1))
- #
- if BnotT:
- EaY = (HE2 - vy2) / _epsilon
- else:
- EaY = ( (HE2 - vy2) @ numpy.linalg.inv(Ta) ) / math.sqrt(__m-1)
- #
- GradJ = numpy.ravel(vw[:,None] - EaY.transpose() @ (RI * ( Ynpu - vy2 )))
- mH = numpy.identity(__m) + EaY.transpose() @ (RI * EaY).reshape((-1,__m))
- Deltaw = - numpy.linalg.solve(mH,GradJ)
- #
- vw = vw + Deltaw
- #
- if not BnotT:
- Ta = numpy.real(scipy.linalg.sqrtm(numpy.linalg.inv( mH )))
- #
- __j = __j + 1
- #
- if BnotT:
- Ta = numpy.real(scipy.linalg.sqrtm(numpy.linalg.inv( mH )))
- #
- Xn = vx1 + math.sqrt(__m-1) * EaX @ Ta @ Ua
- #--------------------------
- else:
- raise ValueError("VariantM has to be chosen in the authorized methods list.")
- #
- if selfA._parameters["InflationType"] == "MultiplicativeOnAnalysisAnomalies":
- Xn = CovarianceInflation( Xn,
- selfA._parameters["InflationType"],
- selfA._parameters["InflationFactor"],
- )
- #
- Xa = Xn.mean(axis=1, dtype=mfp).astype('float').reshape((__n,1))
- #--------------------------
- selfA._setInternalState("Xn", Xn)
- selfA._setInternalState("seed", numpy.random.get_state())
- #--------------------------
- #
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("APosterioriCovariance") \
- or selfA._toStore("InnovationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- _HXa = numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T
- _Innovation = Ynpu - _HXa
- #
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- # ---> with analysis
- selfA.StoredVariables["Analysis"].store( Xa )
- if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
- selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( _HXa )
- if selfA._toStore("InnovationAtCurrentAnalysis"):
- selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
- # ---> with current state
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CurrentState"):
- selfA.StoredVariables["CurrentState"].store( Xn )
- if selfA._toStore("ForecastState"):
- selfA.StoredVariables["ForecastState"].store( EMX )
- if selfA._toStore("ForecastCovariance"):
- selfA.StoredVariables["ForecastCovariance"].store( EnsembleErrorCovariance(EMX) )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( EMX - Xa )
- if selfA._toStore("InnovationAtCurrentState"):
- selfA.StoredVariables["InnovationAtCurrentState"].store( - HE2 + Ynpu )
- if selfA._toStore("SimulatedObservationAtCurrentState") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( HE2 )
- # ---> others
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- Jb = float( 0.5 * (Xa - Xb).T * BI * (Xa - Xb) )
- Jo = float( 0.5 * _Innovation.T * RI * _Innovation )
- J = Jb + Jo
- selfA.StoredVariables["CostFunctionJb"].store( Jb )
- selfA.StoredVariables["CostFunctionJo"].store( Jo )
- selfA.StoredVariables["CostFunctionJ" ].store( J )
- #
- if selfA._toStore("IndexOfOptimum") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("CostFunctionJAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- if selfA._toStore("IndexOfOptimum"):
- selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
- if selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
- if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )
- if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
- if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
- if selfA._toStore("CostFunctionJAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( EnsembleErrorCovariance(Xn) )
- if selfA._parameters["EstimationOf"] == "Parameters" \
- and J < previousJMinimum:
- previousJMinimum = J
- XaMin = Xa
- if selfA._toStore("APosterioriCovariance"):
- covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
- #
- # Final additional storage of the optimum for parameter estimation
- # ----------------------------------------------------------------
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- selfA.StoredVariables["Analysis"].store( XaMin )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
- #
- return 0
-
-# ==============================================================================
-def mmqr(
- func = None,
- x0 = None,
- fprime = None,
- bounds = None,
- quantile = 0.5,
- maxfun = 15000,
- toler = 1.e-06,
- y = None,
- ):
- """
- Computer implementation of the MMQR algorithm, based on the publication:
- David R. Hunter, Kenneth Lange, "Quantile Regression via an MM Algorithm",
- Journal of Computational and Graphical Statistics, 9, 1, pp.60-77, 2000.
- """
- #
- # Retrieval of the initial data and information
- # ---------------------------------------------
- variables = numpy.ravel( x0 )
- mesures = numpy.ravel( y )
- increment = sys.float_info.max # Largest representable float, so that the first loop test succeeds
- p = variables.size
- n = mesures.size
- quantile = float(quantile)
- #
- # Computation of the MM parameters
- # --------------------------------
- tn = float(toler) / n
- e0 = -tn / math.log(tn)
- epsilon = (e0-tn)/(1+math.log(e0))
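- # The MM (majorize-minimize) iteration replaces the non-smooth quantile check
- # function by a quadratic surrogate controlled by the smoothing parameter
- # epsilon: each residual r_i receives the weight 1/(epsilon+|r_i|), and
- # minimizing the resulting weighted least-squares surrogate decreases the
- # original criterion at every iteration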
- #
- # Initialization computations
- # ---------------------------
- residus = mesures - numpy.ravel( func( variables ) )
- poids = 1./(epsilon+numpy.abs(residus))
- veps = 1. - 2. * quantile - residus * poids
- lastsurrogate = -numpy.sum(residus*veps) - (1.-2.*quantile)*numpy.sum(residus)
- iteration = 0
- #
- # Iterative search
- # ----------------
- while (increment > toler) and (iteration < maxfun) :
- iteration += 1
- #
- Derivees = numpy.array(fprime(variables))
- Derivees = Derivees.reshape(n,p) # Necessary to restore the matrix shape if it has gone through YACS pipes
- DeriveesT = Derivees.transpose()
- M = numpy.dot( DeriveesT , (poids.reshape((n,1)) * Derivees) ) # M = D' diag(poids) D
- SM = numpy.transpose(numpy.dot( DeriveesT , veps ))
- step = - numpy.linalg.lstsq( M, SM, rcond=-1 )[0]
- #
- variables = variables + step
- if bounds is not None:
- # Warning: an infinite loop has to be avoided if an interval is too small
- while( (variables < numpy.ravel(numpy.asmatrix(bounds)[:,0])).any() or (variables > numpy.ravel(numpy.asmatrix(bounds)[:,1])).any() ):
- step = step/2.
- variables = variables - step
- residus = mesures - numpy.ravel( func(variables) )
- surrogate = numpy.sum(residus**2 * poids) + (4.*quantile-2.) * numpy.sum(residus)
- #
- while ( (surrogate > lastsurrogate) and ( max(list(numpy.abs(step))) > 1.e-16 ) ) :
- step = step/2.
- variables = variables - step
- residus = mesures - numpy.ravel( func(variables) )
- surrogate = numpy.sum(residus**2 * poids) + (4.*quantile-2.) * numpy.sum(residus)
- #
- increment = lastsurrogate-surrogate
- poids = 1./(epsilon+numpy.abs(residus))
- veps = 1. - 2. * quantile - residus * poids
- lastsurrogate = -numpy.sum(residus * veps) - (1.-2.*quantile)*numpy.sum(residus)
- #
- # Discrepancy measure
- # -------------------
- Ecart = quantile * numpy.sum(residus) - numpy.sum( residus[residus<0] )
- #
- return variables, Ecart, [n,p,iteration,increment,0]
-
-# ==============================================================================
-def multi3dvar(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, oneCycle):
- """
- Multi-step and multi-method 3DVAR
- """
- #
- # Initialization
- if selfA._parameters["EstimationOf"] == "State":
- M = EM["Direct"].appliedTo
- #
- if len(selfA.StoredVariables["Analysis"])==0 or not selfA._parameters["nextStep"]:
- Xn = numpy.ravel(Xb).reshape((-1,1))
- selfA.StoredVariables["Analysis"].store( Xn )
- if selfA._toStore("APosterioriCovariance"):
- if hasattr(B,"asfullmatrix"):
- selfA.StoredVariables["APosterioriCovariance"].store( B.asfullmatrix(Xn.size) )
- else:
- selfA.StoredVariables["APosterioriCovariance"].store( B )
- if selfA._toStore("ForecastState"):
- selfA.StoredVariables["ForecastState"].store( Xn )
- elif selfA._parameters["nextStep"]:
- Xn = selfA._getInternalState("Xn")
- else:
- Xn = numpy.ravel(Xb).reshape((-1,1))
- #
- if hasattr(Y,"stepnumber"):
- duration = Y.stepnumber()
- else:
- duration = 2
- #
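- # Each observation step is processed by a forecast with the evolution model M
- # (for state estimation), followed by one full call to the mono-step
- # assimilation method passed as the oneCycle argument, whose last analysis
- # becomes the initial state of the next step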
- # Multi-step
- for step in range(duration-1):
- if hasattr(Y,"store"):
- Ynpu = numpy.ravel( Y[step+1] ).reshape((-1,1))
- else:
- Ynpu = numpy.ravel( Y ).reshape((-1,1))
- #
- if selfA._parameters["EstimationOf"] == "State": # Forecast
- Xn_predicted = M( Xn )
- if selfA._toStore("ForecastState"):
- selfA.StoredVariables["ForecastState"].store( Xn_predicted )
- elif selfA._parameters["EstimationOf"] == "Parameters": # No forecast
- # ---> By principle, M = Id, Q = 0
- Xn_predicted = Xn
- Xn_predicted = numpy.ravel(Xn_predicted).reshape((-1,1))
- #
- oneCycle(selfA, Xn_predicted, Ynpu, U, HO, None, None, R, B, None)
- #
- Xn = selfA.StoredVariables["Analysis"][-1]
- #--------------------------
- selfA._setInternalState("Xn", Xn)
- #
- return 0
-
-# ==============================================================================
-def psas3dvar(selfA, Xb, Y, U, HO, EM, CM, R, B, Q):
- """
- 3DVAR PSAS
- """
- #
- # Initializations
- # ---------------
- #
- # Operators
- Hm = HO["Direct"].appliedTo
- #
- # Optional use of a precomputed H(Xb) vector
- if HO["AppliedInX"] is not None and "HXb" in HO["AppliedInX"]:
- HXb = Hm( Xb, HO["AppliedInX"]["HXb"] )
- else:
- HXb = Hm( Xb )
- HXb = numpy.asmatrix(numpy.ravel( HXb )).T
- if Y.size != HXb.size:
- raise ValueError("The size %i of observations Y and %i of observed calculation H(X) are different, they have to be identical."%(Y.size,HXb.size))
- if max(Y.shape) != max(HXb.shape):
- raise ValueError("The shapes %s of observations Y and %s of observed calculation H(X) are different, they have to be identical."%(Y.shape,HXb.shape))
- #
- if selfA._toStore("JacobianMatrixAtBackground"):
- HtMb = HO["Tangent"].asMatrix(ValueForMethodForm = Xb)
- HtMb = HtMb.reshape(Y.size,Xb.size) # ADAO & check shape
- selfA.StoredVariables["JacobianMatrixAtBackground"].store( HtMb )
- #
- Ht = HO["Tangent"].asMatrix(Xb)
- BHT = B * Ht.T
- HBHTpR = R + Ht * BHT
- Innovation = Y - HXb
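- # PSAS (Physical-space Statistical Analysis System) solves the dual problem
- # in observation space: the control variable w has size p, the analysis is
- # recovered as x = Xb + B H' w, and the cost function reads
- #     J(w) = 0.5 * w' (H B H' + R) w - w' d
- # with d = Y - H(Xb), so that its gradient is (H B H' + R) w - d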
- #
- # Starting point of the optimization (in observation space, of size p)
- Xini = numpy.zeros(Y.size)
- #
- # Definition of the cost function
- # -------------------------------
- def CostFunction(w):
- _W = numpy.asmatrix(numpy.ravel( w )).T
- if selfA._parameters["StoreInternalVariables"] or \
- selfA._toStore("CurrentState") or \
- selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentState"].store( Xb + BHT * _W )
- if selfA._toStore("SimulatedObservationAtCurrentState") or \
- selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Hm( Xb + BHT * _W ) )
- if selfA._toStore("InnovationAtCurrentState"):
- selfA.StoredVariables["InnovationAtCurrentState"].store( Innovation )
- #
- Jb = float( 0.5 * _W.T * HBHTpR * _W )
- Jo = float( - _W.T * Innovation )
- J = Jb + Jo
- #
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["CostFunctionJ"]) )
- selfA.StoredVariables["CostFunctionJb"].store( Jb )
- selfA.StoredVariables["CostFunctionJo"].store( Jo )
- selfA.StoredVariables["CostFunctionJ" ].store( J )
- if selfA._toStore("IndexOfOptimum") or \
- selfA._toStore("CurrentOptimum") or \
- selfA._toStore("CostFunctionJAtCurrentOptimum") or \
- selfA._toStore("CostFunctionJbAtCurrentOptimum") or \
- selfA._toStore("CostFunctionJoAtCurrentOptimum") or \
- selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- if selfA._toStore("IndexOfOptimum"):
- selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
- if selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["CurrentState"][IndexMin] )
- if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin] )
- if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
- if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
- if selfA._toStore("CostFunctionJAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
- return J
- #
- def GradientOfCostFunction(w):
- _W = numpy.asmatrix(numpy.ravel( w )).T
- GradJb = HBHTpR * _W
- GradJo = - Innovation
- GradJ = numpy.ravel( GradJb ) + numpy.ravel( GradJo )
- return GradJ
- #
- # Minimization of the functional
- # ------------------------------
- nbPreviousSteps = selfA.StoredVariables["CostFunctionJ"].stepnumber()
- #
- if selfA._parameters["Minimizer"] == "LBFGSB":
- if "0.19" <= scipy.version.version <= "1.1.0":
- import lbfgsbhlt as optimiseur
- else:
- import scipy.optimize as optimiseur
- Minimum, J_optimal, Informations = optimiseur.fmin_l_bfgs_b(
- func = CostFunction,
- x0 = Xini,
- fprime = GradientOfCostFunction,
- args = (),
- bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
- maxfun = selfA._parameters["MaximumNumberOfSteps"]-1,
- factr = selfA._parameters["CostDecrementTolerance"]*1.e14,
- pgtol = selfA._parameters["ProjectedGradientTolerance"],
- iprint = selfA._parameters["optiprint"],
- )
- nfeval = Informations['funcalls']
- rc = Informations['warnflag']
- elif selfA._parameters["Minimizer"] == "TNC":
- Minimum, nfeval, rc = scipy.optimize.fmin_tnc(
- func = CostFunction,
- x0 = Xini,
- fprime = GradientOfCostFunction,
- args = (),
- bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
- maxfun = selfA._parameters["MaximumNumberOfSteps"],
- pgtol = selfA._parameters["ProjectedGradientTolerance"],
- ftol = selfA._parameters["CostDecrementTolerance"],
- messages = selfA._parameters["optmessages"],
- )
- elif selfA._parameters["Minimizer"] == "CG":
- Minimum, fopt, nfeval, grad_calls, rc = scipy.optimize.fmin_cg(
- f = CostFunction,
- x0 = Xini,
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- gtol = selfA._parameters["GradientNormTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- elif selfA._parameters["Minimizer"] == "NCG":
- Minimum, fopt, nfeval, grad_calls, hcalls, rc = scipy.optimize.fmin_ncg(
- f = CostFunction,
- x0 = Xini,
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- avextol = selfA._parameters["CostDecrementTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- elif selfA._parameters["Minimizer"] == "BFGS":
- Minimum, fopt, gopt, Hopt, nfeval, grad_calls, rc = scipy.optimize.fmin_bfgs(
- f = CostFunction,
- x0 = Xini,
- fprime = GradientOfCostFunction,
- args = (),
- maxiter = selfA._parameters["MaximumNumberOfSteps"],
- gtol = selfA._parameters["GradientNormTolerance"],
- disp = selfA._parameters["optdisp"],
- full_output = True,
- )
- else:
- raise ValueError("Error in Minimizer name: %s"%selfA._parameters["Minimizer"])
- #
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- MinJ = selfA.StoredVariables["CostFunctionJ"][IndexMin]
- #
- # Correction to work around a TNC bug on the returned Minimum
- # -----------------------------------------------------------
- if selfA._parameters["StoreInternalVariables"] or selfA._toStore("CurrentState"):
- Minimum = selfA.StoredVariables["CurrentState"][IndexMin]
- Minimum = numpy.asmatrix(numpy.ravel( Minimum )).T
- else:
- Minimum = Xb + BHT * numpy.asmatrix(numpy.ravel( Minimum )).T
- #
- # Computing the analysis
- # ----------------------
- Xa = Minimum
- #
- selfA.StoredVariables["Analysis"].store( Xa )
- #
- if selfA._toStore("OMA") or \
- selfA._toStore("SigmaObs2") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("SimulatedObservationAtOptimum"):
- if selfA._toStore("SimulatedObservationAtCurrentState"):
- HXa = selfA.StoredVariables["SimulatedObservationAtCurrentState"][IndexMin]
- elif selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- HXa = selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"][-1]
- else:
- HXa = Hm( Xa )
- #
- # Computing the analysis covariance
- # ---------------------------------
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("JacobianMatrixAtOptimum") or \
- selfA._toStore("KalmanGainAtOptimum"):
- HtM = HO["Tangent"].asMatrix(ValueForMethodForm = Xa)
- HtM = HtM.reshape(Y.size,Xa.size) # ADAO & check shape
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles") or \
- selfA._toStore("KalmanGainAtOptimum"):
- HaM = HO["Adjoint"].asMatrix(ValueForMethodForm = Xa)
- HaM = HaM.reshape(Xa.size,Y.size) # ADAO & check shape
- if selfA._toStore("APosterioriCovariance") or \
- selfA._toStore("SimulationQuantiles"):
- BI = B.getI()
- RI = R.getI()
- HessienneI = []
- nb = Xa.size
- for i in range(nb):
- _ee = numpy.matrix(numpy.zeros(nb)).T
- _ee[i] = 1.
- _HtEE = numpy.dot(HtM,_ee)
- _HtEE = numpy.asmatrix(numpy.ravel( _HtEE )).T
- HessienneI.append( numpy.ravel( BI*_ee + HaM * (RI * _HtEE) ) )
- HessienneI = numpy.matrix( HessienneI )
- A = HessienneI.I
- if min(A.shape) != max(A.shape):
- raise ValueError("The %s a posteriori covariance matrix A is of shape %s, despites it has to be a squared matrix. There is an error in the observation operator, please check it."%(selfA._name,str(A.shape)))
- if (numpy.diag(A) < 0).any():
- raise ValueError("The %s a posteriori covariance matrix A has at least one negative value on its diagonal. There is an error in the observation operator, please check it."%(selfA._name,))
- if logging.getLogger().level < logging.WARNING: # The check is only performed in debug mode
- try:
- numpy.linalg.cholesky( A )
- except numpy.linalg.LinAlgError:
- raise ValueError("The %s a posteriori covariance matrix A is not symmetric positive-definite. Please check your a priori covariances and your observation operator."%(selfA._name,))
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( A )
- if selfA._toStore("JacobianMatrixAtOptimum"):
- selfA.StoredVariables["JacobianMatrixAtOptimum"].store( HtM )
- if selfA._toStore("KalmanGainAtOptimum"):
- if (Y.size <= Xb.size): KG = B * HaM * (R + numpy.dot(HtM, B * HaM)).I
- elif (Y.size > Xb.size): KG = (BI + numpy.dot(HaM, RI * HtM)).I * HaM * RI
- selfA.StoredVariables["KalmanGainAtOptimum"].store( KG )
- #
- # Additional computations and/or storage
- # --------------------------------------
- if selfA._toStore("Innovation") or \
- selfA._toStore("SigmaObs2") or \
- selfA._toStore("MahalanobisConsistency") or \
- selfA._toStore("OMB"):
- d = Y - HXb
- if selfA._toStore("Innovation"):
- selfA.StoredVariables["Innovation"].store( numpy.ravel(d) )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(Xa) )
- if selfA._toStore("OMA"):
- selfA.StoredVariables["OMA"].store( numpy.ravel(Y) - numpy.ravel(HXa) )
- if selfA._toStore("OMB"):
- selfA.StoredVariables["OMB"].store( numpy.ravel(d) )
- if selfA._toStore("SigmaObs2"):
- TraceR = R.trace(Y.size)
- selfA.StoredVariables["SigmaObs2"].store( float( (d.T * (numpy.asmatrix(numpy.ravel(Y)).T-numpy.asmatrix(numpy.ravel(HXa)).T)) ) / TraceR )
- if selfA._toStore("MahalanobisConsistency"):
- selfA.StoredVariables["MahalanobisConsistency"].store( float( 2.*MinJ/d.size ) )
- if selfA._toStore("SimulationQuantiles"):
- QuantilesEstimations(selfA, A, Xa, HXa, Hm, HtM)
- if selfA._toStore("SimulatedObservationAtBackground"):
- selfA.StoredVariables["SimulatedObservationAtBackground"].store( numpy.ravel(HXb) )
- if selfA._toStore("SimulatedObservationAtOptimum"):
- selfA.StoredVariables["SimulatedObservationAtOptimum"].store( numpy.ravel(HXa) )
- #
- return 0
-
-# ==============================================================================
-def senkf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="KalmanFilterFormula16"):
- """
- Stochastic EnKF
- """
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA._parameters["StoreInternalVariables"] = True
- #
- # Operators
- H = HO["Direct"].appliedControledFormTo
- #
- if selfA._parameters["EstimationOf"] == "State":
- M = EM["Direct"].appliedControledFormTo
- #
- if CM is not None and "Tangent" in CM and U is not None:
- Cm = CM["Tangent"].asMatrix(Xb)
- else:
- Cm = None
- #
- # Observation duration and sizes
- if hasattr(Y,"stepnumber"):
- duration = Y.stepnumber()
- __p = numpy.cumprod(Y.shape())[-1]
- else:
- duration = 2
- __p = numpy.array(Y).size
- #
- # Precompute the inverses of B and R
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- BI = B.getI()
- RI = R.getI()
- #
- __n = Xb.size
- __m = selfA._parameters["NumberOfMembers"]
- #
- if hasattr(R,"asfullmatrix"): Rn = R.asfullmatrix(__p)
- else: Rn = R
- #
- if len(selfA.StoredVariables["Analysis"])==0 or not selfA._parameters["nextStep"]:
- Xn = EnsembleOfBackgroundPerturbations( Xb, None, __m )
- selfA.StoredVariables["Analysis"].store( Xb )
- if selfA._toStore("APosterioriCovariance"):
- if hasattr(B,"asfullmatrix"):
- selfA.StoredVariables["APosterioriCovariance"].store( B.asfullmatrix(__n) )
- else:
- selfA.StoredVariables["APosterioriCovariance"].store( B )
- selfA._setInternalState("seed", numpy.random.get_state())
- elif selfA._parameters["nextStep"]:
- Xn = selfA._getInternalState("Xn")
- #
- previousJMinimum = numpy.finfo(float).max
- #
- for step in range(duration-1):
- numpy.random.set_state(selfA._getInternalState("seed"))
- if hasattr(Y,"store"):
- Ynpu = numpy.ravel( Y[step+1] ).reshape((__p,1))
- else:
- Ynpu = numpy.ravel( Y ).reshape((__p,1))
- #
- if U is not None:
- if hasattr(U,"store") and len(U)>1:
- Un = numpy.asmatrix(numpy.ravel( U[step] )).T
- elif hasattr(U,"store") and len(U)==1:
- Un = numpy.asmatrix(numpy.ravel( U[0] )).T
- else:
- Un = numpy.asmatrix(numpy.ravel( U )).T
- else:
- Un = None
- #
- if selfA._parameters["InflationType"] == "MultiplicativeOnBackgroundAnomalies":
- Xn = CovarianceInflation( Xn,
- selfA._parameters["InflationType"],
- selfA._parameters["InflationFactor"],
- )
- #
- if selfA._parameters["EstimationOf"] == "State": # Forecast + Q and observation of forecast
- EMX = M( [(Xn[:,i], Un) for i in range(__m)],
- argsAsSerie = True,
- returnSerieAsArrayMatrix = True )
- Xn_predicted = EnsemblePerturbationWithGivenCovariance( EMX, Q )
- HX_predicted = H( [(Xn_predicted[:,i], Un) for i in range(__m)],
- argsAsSerie = True,
- returnSerieAsArrayMatrix = True )
- if Cm is not None and Un is not None: # Warning: if Cm is also included in M, it is applied twice!
- Cm = Cm.reshape(__n,Un.size) # ADAO & check shape
- Xn_predicted = Xn_predicted + Cm * Un
- elif selfA._parameters["EstimationOf"] == "Parameters": # Observation of forecast
- # ---> By principle, M = Id, Q = 0
- Xn_predicted = EMX = Xn
- HX_predicted = H( [(Xn_predicted[:,i], Un) for i in range(__m)],
- argsAsSerie = True,
- returnSerieAsArrayMatrix = True )
- #
- # Mean of forecast and observation of forecast
- Xfm = Xn_predicted.mean(axis=1, dtype=mfp).astype('float').reshape((__n,1))
- Hfm = HX_predicted.mean(axis=1, dtype=mfp).astype('float').reshape((__p,1))
- #
- #--------------------------
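- # "KalmanFilterFormula05": stochastic EnKF with an explicitly assembled gain
- # K = Pf H' (H Pf H' + R)^{-1}, the covariances being estimated from the
- # ensemble, and with observations perturbed by draws from N(0,R) for each
- # member (Burgers, van Leeuwen and Evensen, 1998)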
- if VariantM == "KalmanFilterFormula05":
- PfHT, HPfHT = 0., 0.
- for i in range(__m):
- Exfi = Xn_predicted[:,i].reshape((__n,1)) - Xfm
- Eyfi = HX_predicted[:,i].reshape((__p,1)) - Hfm
- PfHT += Exfi * Eyfi.T
- HPfHT += Eyfi * Eyfi.T
- PfHT = (1./(__m-1)) * PfHT
- HPfHT = (1./(__m-1)) * HPfHT
- Kn = PfHT * ( R + HPfHT ).I
- del PfHT, HPfHT
- #
- for i in range(__m):
- ri = numpy.random.multivariate_normal(numpy.zeros(__p), Rn)
- Xn[:,i] = numpy.ravel(Xn_predicted[:,i]) + Kn @ (numpy.ravel(Ynpu) + ri - HX_predicted[:,i])
- #--------------------------
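- # "KalmanFilterFormula16": anomaly-based form of the stochastic EnKF; the
- # gain is computed from the scaled forecast anomalies EaX and the observed
- # anomalies EaY, which include the centered observation perturbations EpY,
- # so that the n x n forecast covariance is never explicitly assembled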
- elif VariantM == "KalmanFilterFormula16":
- EpY = EnsembleOfCenteredPerturbations(Ynpu, Rn, __m)
- EpYm = EpY.mean(axis=1, dtype=mfp).astype('float').reshape((__p,1))
- #
- EaX = EnsembleOfAnomalies( Xn_predicted ) / math.sqrt(__m-1)
- EaY = (HX_predicted - Hfm - EpY + EpYm) / math.sqrt(__m-1)
- #
- Kn = EaX @ EaY.T @ numpy.linalg.inv( EaY @ EaY.T)
- #
- for i in range(__m):
- Xn[:,i] = numpy.ravel(Xn_predicted[:,i]) + Kn @ (numpy.ravel(EpY[:,i]) - HX_predicted[:,i])
- #--------------------------
- else:
- raise ValueError("VariantM has to be chosen in the authorized methods list.")
- #
- if selfA._parameters["InflationType"] == "MultiplicativeOnAnalysisAnomalies":
- Xn = CovarianceInflation( Xn,
- selfA._parameters["InflationType"],
- selfA._parameters["InflationFactor"],
- )
- #
- Xa = Xn.mean(axis=1, dtype=mfp).astype('float').reshape((__n,1))
- #--------------------------
- selfA._setInternalState("Xn", Xn)
- selfA._setInternalState("seed", numpy.random.get_state())
- #--------------------------
- #
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("APosterioriCovariance") \
- or selfA._toStore("InnovationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentAnalysis") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- _HXa = numpy.asmatrix(numpy.ravel( H((Xa, Un)) )).T
- _Innovation = Ynpu - _HXa
- #
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- # ---> with analysis
- selfA.StoredVariables["Analysis"].store( Xa )
- if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
- selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( _HXa )
- if selfA._toStore("InnovationAtCurrentAnalysis"):
- selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
- # ---> with current state
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CurrentState"):
- selfA.StoredVariables["CurrentState"].store( Xn )
- if selfA._toStore("ForecastState"):
- selfA.StoredVariables["ForecastState"].store( EMX )
- if selfA._toStore("ForecastCovariance"):
- selfA.StoredVariables["ForecastCovariance"].store( EnsembleErrorCovariance(EMX) )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( EMX - Xa )
- if selfA._toStore("InnovationAtCurrentState"):
- selfA.StoredVariables["InnovationAtCurrentState"].store( - HX_predicted + Ynpu )
- if selfA._toStore("SimulatedObservationAtCurrentState") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( HX_predicted )
- # ---> others
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- Jb = float( 0.5 * (Xa - Xb).T * BI * (Xa - Xb) )
- Jo = float( 0.5 * _Innovation.T * RI * _Innovation )
- J = Jb + Jo
- selfA.StoredVariables["CostFunctionJb"].store( Jb )
- selfA.StoredVariables["CostFunctionJo"].store( Jo )
- selfA.StoredVariables["CostFunctionJ" ].store( J )
- #
- if selfA._toStore("IndexOfOptimum") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("CostFunctionJAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
- or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
- or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- if selfA._toStore("IndexOfOptimum"):
- selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
- if selfA._toStore("CurrentOptimum"):
- selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
- if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
- selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )
- if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )
- if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )
- if selfA._toStore("CostFunctionJAtCurrentOptimum"):
- selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( EnsembleErrorCovariance(Xn) )
- if selfA._parameters["EstimationOf"] == "Parameters" \
- and J < previousJMinimum:
- previousJMinimum = J
- XaMin = Xa
- if selfA._toStore("APosterioriCovariance"):
- covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
- #
- # Final additional storage of the optimum for parameter estimation
- # ----------------------------------------------------------------
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- selfA.StoredVariables["Analysis"].store( XaMin )
- if selfA._toStore("APosterioriCovariance"):
- selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
- if selfA._toStore("BMA"):
- selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
- #
- return 0
-
-# ==============================================================================
-def stdkf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q):
- """
- Standard Kalman Filter
- """
- if selfA._parameters["EstimationOf"] == "Parameters":
- selfA._parameters["StoreInternalVariables"] = True
- #
- # Operators
- # ---------
- Ht = HO["Tangent"].asMatrix(Xb)
- Ha = HO["Adjoint"].asMatrix(Xb)
- #
- if selfA._parameters["EstimationOf"] == "State":
- Mt = EM["Tangent"].asMatrix(Xb)
- Ma = EM["Adjoint"].asMatrix(Xb)
- #
- if CM is not None and "Tangent" in CM and U is not None:
- Cm = CM["Tangent"].asMatrix(Xb)
- else:
- Cm = None
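- # (Descriptive note) In this standard linear Kalman filter, each observation
- # step consists of a prediction
- #     Xf = Mt.Xn (+ Cm.Un),   Pf = Mt.Pn.Ma + Q
- # followed by a correction with the gain K = Pf.Ha (Ht.Pf.Ha + R)^{-1}:
- #     Xa = Xf + K.(Ynpu - Ht.Xf),   Pa = Pf - K.Ht.Pf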
- #
- # Observation duration and sizes
- if hasattr(Y,"stepnumber"):
- duration = Y.stepnumber()
- __p = numpy.cumprod(Y.shape())[-1]
- else:
- duration = 2
- __p = numpy.array(Y).size
- #
- # Precompute the inverses of B and R
- if selfA._parameters["StoreInternalVariables"] \
- or selfA._toStore("CostFunctionJ") \
- or selfA._toStore("CostFunctionJb") \
- or selfA._toStore("CostFunctionJo") \
- or selfA._toStore("CurrentOptimum") \
- or selfA._toStore("APosterioriCovariance"):
- BI = B.getI()
- RI = R.getI()
- #
- __n = Xb.size
- #
- if len(selfA.StoredVariables["Analysis"])==0 or not selfA._parameters["nextStep"]:
- Xn = Xb
- Pn = B
- selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
- selfA.StoredVariables["Analysis"].store( Xb )
- if selfA._toStore("APosterioriCovariance"):
- if hasattr(B,"asfullmatrix"):
- selfA.StoredVariables["APosterioriCovariance"].store( B.asfullmatrix(__n) )
- else:
- selfA.StoredVariables["APosterioriCovariance"].store( B )
- selfA._setInternalState("seed", numpy.random.get_state())
- elif selfA._parameters["nextStep"]: