return numpy.asarray(__Ensemble).mean(axis=1, dtype=mfp).astype('float').reshape((-1,1))
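For reference, a minimal standalone sketch of the ensemble layout these helpers assume: states are stored as columns, one column per member (hypothetical data; the dtype=mfp accumulator precision used above is omitted here):

import numpy

__E  = numpy.array([[1., 2., 3.],
                    [4., 5., 6.]])          # n = 2 state components, m = 3 members
__Em = __E.mean(axis=1).reshape((-1,1))     # the (n,1) column that EnsembleMean returns
assert numpy.allclose(__Em, [[2.], [5.]])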
# ==============================================================================
-def EnsembleOfAnomalies( Ensemble, OptMean = None, Normalisation = 1.):
+def EnsembleOfAnomalies( __Ensemble, __OptMean = None, __Normalisation = 1.):
"Renvoie les anomalies centrées à partir d'un ensemble"
- if OptMean is None:
- __Em = EnsembleMean( Ensemble )
+ if __OptMean is None:
+ __Em = EnsembleMean( __Ensemble )
else:
- __Em = numpy.ravel(OptMean).reshape((-1,1))
+ __Em = numpy.ravel( __OptMean ).reshape((-1,1))
#
- return Normalisation * (numpy.asarray(Ensemble) - __Em)
+ return __Normalisation * (numpy.asarray( __Ensemble ) - __Em)
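As a sanity check, what EnsembleOfAnomalies produces with the default arguments, recomputed by hand on hypothetical data: the anomalies are centered, so every row sums to zero.

import numpy

__E  = numpy.array([[1., 2., 3.],
                    [4., 5., 6.]])
__Em = __E.mean(axis=1).reshape((-1,1))
__A  = 1. * (__E - __Em)                    # EnsembleOfAnomalies(__E) with defaults
assert numpy.allclose(__A.sum(axis=1), 0.)  # centered anomalies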
# ==============================================================================
-def EnsembleErrorCovariance( Ensemble, __quick = False ):
+def EnsembleErrorCovariance( __Ensemble, __quick = False ):
"Renvoie l'estimation empirique de la covariance d'ensemble"
if __quick:
# Fast covariance estimate, but rarely positive definite
- __Covariance = numpy.cov(Ensemble)
+ __Covariance = numpy.cov( __Ensemble )
else:
# Result often identical to numpy.cov, but more robust
- __n, __m = numpy.asarray(Ensemble).shape
- __Anomalies = EnsembleOfAnomalies( Ensemble )
+ __n, __m = numpy.asarray( __Ensemble ).shape
+ __Anomalies = EnsembleOfAnomalies( __Ensemble )
# Empirical estimate
- __Covariance = (__Anomalies @ __Anomalies.T) / (__m-1)
+ __Covariance = ( __Anomalies @ __Anomalies.T ) / (__m-1)
# Enforce symmetry
- __Covariance = (__Covariance + __Covariance.T) * 0.5
+ __Covariance = ( __Covariance + __Covariance.T ) * 0.5
# Enforce positive definiteness
- __epsilon = mpr*numpy.trace(__Covariance)
+ __epsilon = mpr*numpy.trace( __Covariance )
__Covariance = __Covariance + __epsilon * numpy.identity(__n)
#
return __Covariance
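A sketch of the robust branch on synthetic data: the anomaly product reproduces numpy.cov, then symmetrization and a trace-scaled jitter (1.e-12 stands in here for the module's mpr constant) keep the result numerically positive definite.

import numpy

__E = numpy.random.default_rng(0).normal(size=(4, 10))     # n = 4, m = 10
__A = __E - __E.mean(axis=1).reshape((-1,1))
__C = (__A @ __A.T) / (__E.shape[1] - 1)
assert numpy.allclose(__C, numpy.cov(__E))                 # matches the "quick" branch
__C = (__C + __C.T) * 0.5                                  # enforce symmetry
__C = __C + 1.e-12 * numpy.trace(__C) * numpy.identity(4)  # enforce positivity
assert numpy.all(numpy.linalg.eigvalsh(__C) > 0.)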
else:
LBounds = None
if LBounds is not None:
- def NoneRemove(paire):
- bmin, bmax = paire
- if bmin is None: bmin = numpy.finfo('float').min
- if bmax is None: bmax = numpy.finfo('float').max
- return [bmin, bmax]
- LBounds = numpy.matrix( [NoneRemove(paire) for paire in LBounds] )
+ LBounds = numpy.asmatrix(ForceNumericBounds( LBounds ))
#
# Sampling of the states
YfQ = None
#
return 0
+# ==============================================================================
+def ForceNumericBounds( __Bounds ):
+ "Force les bornes à être des valeurs numériques, sauf si None"
+ # Keep the default value None when no bounds are given
+ if __Bounds is None: return None
+ # Convert each individual None bound to +/- the largest finite float
+ __Bounds = numpy.asarray( __Bounds, dtype=float )
+ if len(__Bounds.shape) != 2 or min(__Bounds.shape) <= 0:
+ raise ValueError("Incorrectly defined bounds data")
+ __Bounds[numpy.isnan(__Bounds)[:,0],0] = -sys.float_info.max
+ __Bounds[numpy.isnan(__Bounds)[:,1],1] = sys.float_info.max
+ return __Bounds
+
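A quick check of the intended behaviour, which also covers what the removed NoneRemove helper used to do: each individual None bound becomes +/- sys.float_info.max while numeric bounds pass through unchanged (hypothetical values):

import sys

__B = ForceNumericBounds( [[None, 5.], [-2., None]] )
assert __B[0, 0] == -sys.float_info.max and __B[1, 1] == sys.float_info.max
assert __B[0, 1] == 5. and __B[1, 0] == -2.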
+# ==============================================================================
+def RecentredBounds( __Bounds, __Center ):
+ "Recenter the bounds around 0, unless None"
+ # Keep the default value None when no bounds are given
+ if __Bounds is None: return None
+ # Recenter the numeric bound values around the given center
+ return ForceNumericBounds( __Bounds ) - numpy.ravel( __Center ).reshape((-1,1))
+
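RecentredBounds serves the minimizations below, whose control variable is the increment dx = x - Xb rather than the state x itself; the box on x must therefore be shifted by the center. A hand-check with hypothetical values:

import numpy

__B  = numpy.array([[0., 10.], [-5., 5.]])      # bounds on the state x
__Xb = numpy.array([2., -1.])                   # background, used as the center
assert numpy.allclose(RecentredBounds(__B, __Xb), [[-2., 8.], [-4., 6.]])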
# ==============================================================================
def enks(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="EnKS16-KalmanFilterFormula"):
"""
"""
if selfA._parameters["EstimationOf"] == "Parameters":
selfA._parameters["StoreInternalVariables"] = True
+ selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
#
# Operators
H = HO["Direct"].appliedControledFormTo
else:
Un = None
#
+ if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
+ Xn = numpy.max(numpy.hstack((Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,0])),axis=1)
+ Xn = numpy.min(numpy.hstack((Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,1])),axis=1)
+ #
if selfA._parameters["EstimationOf"] == "State": # Forecast + Q and observation of forecast
Xn_predicted = numpy.asmatrix(numpy.ravel( M( (Xn, Un) ) )).T
if Cm is not None and Un is not None: # Caution: if Cm is also included in M, it is counted twice!
Xn = Xn_predicted + Kn * _Innovation
Pn = Pn_predicted - Kn * Ht * Pn_predicted
#
+ if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
+ Xn = numpy.max(numpy.hstack((Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,0])),axis=1)
+ Xn = numpy.min(numpy.hstack((Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,1])),axis=1)
+ #
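The two hstack/max/min lines above are a per-component projection of the state onto the bound box (equivalent to a clip); a standalone sketch with hypothetical values:

import numpy

Xn = numpy.asmatrix([[-3.], [12.]])                    # state outside the box
LB = numpy.asmatrix([[0., 10.], [0., 10.]])            # [lower, upper] per component
Xn = numpy.max(numpy.hstack((Xn, LB[:, 0])), axis=1)   # raise to the lower bounds
Xn = numpy.min(numpy.hstack((Xn, LB[:, 1])), axis=1)   # cap at the upper bounds
assert numpy.allclose(Xn, [[0.], [10.]])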
Xa = Xn # Pointers
#--------------------------
selfA._setInternalState("Xn", Xn)
x0 = numpy.zeros(Xini.size),
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"]-1,
factr = selfA._parameters["CostDecrementTolerance"]*1.e14,
pgtol = selfA._parameters["ProjectedGradientTolerance"],
x0 = numpy.zeros(Xini.size),
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"],
pgtol = selfA._parameters["ProjectedGradientTolerance"],
ftol = selfA._parameters["CostDecrementTolerance"],
x0 = Xini,
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"]-1,
factr = selfA._parameters["CostDecrementTolerance"]*1.e14,
pgtol = selfA._parameters["ProjectedGradientTolerance"],
x0 = Xini,
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"],
pgtol = selfA._parameters["ProjectedGradientTolerance"],
ftol = selfA._parameters["CostDecrementTolerance"],
pass
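Why the bounds are recentred in these calls: the optimizer works on a variable expressed relative to the background Xb, so bounds stated for the full state must be shifted before being passed in. A toy sketch, assuming scipy.optimize.fmin_l_bfgs_b and a hypothetical quadratic cost:

import numpy, scipy.optimize

Xb = numpy.array([2., -1.])
B  = numpy.array([[0., 2.5], [-5., 5.]])          # bounds on the full state x
J  = lambda dx: numpy.sum((dx - 1.)**2)           # toy cost in the increment dx = x - Xb
gJ = lambda dx: 2. * (dx - 1.)
RB = B - Xb.reshape((-1,1))                       # = RecentredBounds(B, Xb)
dxa, _, _ = scipy.optimize.fmin_l_bfgs_b(
    J, x0=numpy.zeros(2), fprime=gJ, bounds=[tuple(b) for b in RB],
)
Xa = Xb + dxa                                     # back to the full state
assert numpy.allclose(Xa, [2.5, 0.], atol=1.e-6)  # component 0 hits its upper bound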
#
if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
- _Xn = numpy.max(numpy.hstack((_Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,0])),axis=1)
- _Xn = numpy.min(numpy.hstack((_Xn,numpy.asmatrix(selfA._parameters["Bounds"])[:,1])),axis=1)
+ __Bounds = ForceNumericBounds( selfA._parameters["Bounds"] )
+ _Xn = numpy.max(numpy.hstack((_Xn,numpy.asmatrix(__Bounds)[:,0])),axis=1)
+ _Xn = numpy.min(numpy.hstack((_Xn,numpy.asmatrix(__Bounds)[:,1])),axis=1)
#
# Difference-to-observations step
if selfA._parameters["EstimationOf"] == "State":
"""
if selfA._parameters["EstimationOf"] == "Parameters":
selfA._parameters["StoreInternalVariables"] = True
+ selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
#
L = Xb.size
Alpha = selfA._parameters["Alpha"]
x0 = Xini,
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"]-1,
factr = selfA._parameters["CostDecrementTolerance"]*1.e14,
pgtol = selfA._parameters["ProjectedGradientTolerance"],
x0 = Xini,
fprime = GradientOfCostFunction,
args = (),
- bounds = selfA._parameters["Bounds"],
+ bounds = RecentredBounds(selfA._parameters["Bounds"], Xb),
maxfun = selfA._parameters["MaximumNumberOfSteps"],
pgtol = selfA._parameters["ProjectedGradientTolerance"],
ftol = selfA._parameters["CostDecrementTolerance"],