élémentaire. Ces variables sont ensuite disponibles pour implémenter un
algorithme élémentaire particulier.
- Background............: vecteur Xb
- Observation...........: vecteur Y (potentiellement temporel)
- d'observations
- State.................: vecteur d'état dont une partie est le vecteur de
- contrôle. Cette information n'est utile que si l'on veut faire des
- calculs sur l'état complet, mais elle n'est pas indispensable pour
- l'assimilation.
- Control...............: vecteur X contenant toutes les variables de
- contrôle, i.e. les paramètres ou l'état dont on veut estimer la
- valeur pour obtenir les observations
- ObservationOperator...: opérateur d'observation H
+ - Background : vecteur Xb
+ - Observation : vecteur Y (potentiellement temporel) d'observations
+ - State : vecteur d'état dont une partie est le vecteur de contrôle.
+ Cette information n'est utile que si l'on veut faire des calculs sur
+ l'état complet, mais elle n'est pas indispensable pour l'assimilation.
+ - Control : vecteur X contenant toutes les variables de contrôle, i.e.
+ les paramètres ou l'état dont on veut estimer la valeur pour obtenir
+ les observations
+ - ObservationOperator...: opérateur d'observation H
Les observations présentent une erreur dont la matrice de covariance est
R. L'ébauche du vecteur de contrôle présente une erreur dont la matrice
return 0
def setObservationOperator(self,
- asFunction = {"Direct":None, "Tangent":None, "Adjoint":None,
- "useApproximatedDerivatives":False,
- "withCenteredDF" :False,
- "withIncrement" :0.01,
- "withdX" :None,
- },
+ asFunction = None,
asMatrix = None,
appliedToX = None,
toBeStored = False,
L'opérateur doit néanmoins déjà avoir été défini comme d'habitude.
- toBeStored : booléen indiquant si la donnée d'entrée est sauvée pour
être rendue disponible au même titre que les variables de calcul
+ L'argument "asFunction" peut prendre la forme complète suivante, avec
+ les valeurs par défaut standards :
+ asFunction = {"Direct":None, "Tangent":None, "Adjoint":None,
+ "useApproximatedDerivatives":False,
+ "withCenteredDF" :False,
+ "withIncrement" :0.01,
+ "withdX" :None,
+ "withAvoidingRedundancy" :True,
+ "withToleranceInRedundancy" :1.e-15,
+ "withLenghtOfRedundancy" :-1,
+ }
"""
if (type(asFunction) is type({})) and \
asFunction.has_key("useApproximatedDerivatives") and bool(asFunction["useApproximatedDerivatives"]) and \
asFunction.has_key("Direct") and (asFunction["Direct"] is not None):
- if not asFunction.has_key("withCenteredDF"): asFunction["withCenteredDF"] = False
- if not asFunction.has_key("withIncrement"): asFunction["withIncrement"] = 0.01
- if not asFunction.has_key("withdX"): asFunction["withdX"] = None
+ if not asFunction.has_key("withCenteredDF"): asFunction["withCenteredDF"] = False
+ if not asFunction.has_key("withIncrement"): asFunction["withIncrement"] = 0.01
+ if not asFunction.has_key("withdX"): asFunction["withdX"] = None
+ if not asFunction.has_key("withAvoidingRedundancy"): asFunction["withAvoidingRedundancy"] = True
+ if not asFunction.has_key("withToleranceInRedundancy"): asFunction["withToleranceInRedundancy"] = 1.e-15
+ if not asFunction.has_key("withLenghtOfRedundancy"): asFunction["withLenghtOfRedundancy"] = -1
from daNumerics.ApproximatedDerivatives import FDApproximation
FDA = FDApproximation(
- Function = asFunction["Direct"],
- centeredDF = asFunction["withCenteredDF"],
- increment = asFunction["withIncrement"],
- dX = asFunction["withdX"] )
+ Function = asFunction["Direct"],
+ centeredDF = asFunction["withCenteredDF"],
+ increment = asFunction["withIncrement"],
+ dX = asFunction["withdX"],
+ avoidingRedundancy = asFunction["withAvoidingRedundancy"],
+ toleranceInRedundancy = asFunction["withToleranceInRedundancy"],
+ lenghtOfRedundancy = asFunction["withLenghtOfRedundancy"],
+ )
self.__HO["Direct"] = Operator( fromMethod = FDA.DirectOperator )
self.__HO["Tangent"] = Operator( fromMethod = FDA.TangentOperator )
self.__HO["Adjoint"] = Operator( fromMethod = FDA.AdjointOperator )
# -----------------------------------------------------------
def setEvolutionModel(self,
- asFunction = {"Direct":None, "Tangent":None, "Adjoint":None,
- "useApproximatedDerivatives":False,
- "withCenteredDF" :False,
- "withIncrement" :0.01,
- "withdX" :None,
- },
+ asFunction = None,
asMatrix = None,
Scheduler = None,
toBeStored = False,
constructeur de numpy.matrix.
- toBeStored : booléen indiquant si la donnée d'entrée est sauvée pour
être rendue disponible au même titre que les variables de calcul
+ L'argument "asFunction" peut prendre la forme complète suivante, avec
+ les valeurs par défaut standards :
+ asFunction = {"Direct":None, "Tangent":None, "Adjoint":None,
+ "useApproximatedDerivatives":False,
+ "withCenteredDF" :False,
+ "withIncrement" :0.01,
+ "withdX" :None,
+ "withAvoidingRedundancy" :True,
+ "withToleranceInRedundancy" :1.e-15,
+ "withLenghtOfRedundancy" :-1,
+ }
"""
if (type(asFunction) is type({})) and \
asFunction.has_key("useApproximatedDerivatives") and bool(asFunction["useApproximatedDerivatives"]) and \
asFunction.has_key("Direct") and (asFunction["Direct"] is not None):
- if not asFunction.has_key("withCenteredDF"): asFunction["withCenteredDF"] = False
- if not asFunction.has_key("withIncrement"): asFunction["withIncrement"] = 0.01
- if not asFunction.has_key("withdX"): asFunction["withdX"] = None
+ if not asFunction.has_key("withCenteredDF"): asFunction["withCenteredDF"] = False
+ if not asFunction.has_key("withIncrement"): asFunction["withIncrement"] = 0.01
+ if not asFunction.has_key("withdX"): asFunction["withdX"] = None
+ if not asFunction.has_key("withAvoidingRedundancy"): asFunction["withAvoidingRedundancy"] = True
+ if not asFunction.has_key("withToleranceInRedundancy"): asFunction["withToleranceInRedundancy"] = 1.e-15
+ if not asFunction.has_key("withLenghtOfRedundancy"): asFunction["withLenghtOfRedundancy"] = -1
from daNumerics.ApproximatedDerivatives import FDApproximation
FDA = FDApproximation(
- Function = asFunction["Direct"],
- centeredDF = asFunction["withCenteredDF"],
- increment = asFunction["withIncrement"],
- dX = asFunction["withdX"] )
+ Function = asFunction["Direct"],
+ centeredDF = asFunction["withCenteredDF"],
+ increment = asFunction["withIncrement"],
+ dX = asFunction["withdX"],
+ avoidingRedundancy = asFunction["withAvoidingRedundancy"],
+ toleranceInRedundancy = asFunction["withToleranceInRedundancy"],
+ lenghtOfRedundancy = asFunction["withLenghtOfRedundancy"],
+ )
self.__EM["Direct"] = Operator( fromMethod = FDA.DirectOperator )
self.__EM["Tangent"] = Operator( fromMethod = FDA.TangentOperator )
self.__EM["Adjoint"] = Operator( fromMethod = FDA.AdjointOperator )
import os, numpy, time
import logging
-# logging.getLogger().setLevel(logging.DEBUG)
+# logging.getLogger().setLevel(logging.DEBUG)
# ==============================================================================
class FDApproximation:
"dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF
centrées si le booléen "centeredDF" est vrai.
"""
- def __init__(self, Function = None, centeredDF = False, increment = 0.01, dX = None):
+ def __init__(self,
+ Function = None,
+ centeredDF = False,
+ increment = 0.01,
+ dX = None,
+ avoidingRedundancy = True,
+ toleranceInRedundancy = 1.e-15,
+ lenghtOfRedundancy = -1,
+ ):
self.__userFunction = Function
self.__centeredDF = bool(centeredDF)
+ if avoidingRedundancy:
+ self.__avoidRC = True
+ self.__tolerBP = float(toleranceInRedundancy)
+ self.__lenghtR = int(lenghtOfRedundancy)
+ self.__listDPCP = [] # Direct Operator Previous Calculated Points
+ self.__listDPCR = [] # Direct Operator Previous Calculated Results
+ self.__listDPCN = [] # Direct Operator Previous Calculated Point Norms
+ self.__listJPCP = [] # Jacobian Previous Calculated Points
+ self.__listJPCI = [] # Jacobian Previous Calculated Increment
+ self.__listJPCR = [] # Jacobian Previous Calculated Results
+ self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
+ self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
+ else:
+ self.__avoidRC = False
if float(increment) <> 0.:
self.__increment = float(increment)
else:
self.__dX = None
else:
self.__dX = numpy.asmatrix(numpy.ravel( dX )).T
+ logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC)
+ if self.__avoidRC:
+ logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP)
+
+ # ---------------------------------------------------------
+ def __doublon__(self, e, l, n, v=""):
+ __ac, __i = False, -1
+ for i in xrange(len(l)-1,-1,-1):
+ if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]:
+ __ac, __i = True, i
+ if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon"%v)
+ break
+ return __ac, __i
# ---------------------------------------------------------
def DirectOperator(self, X ):
Calcul du direct à l'aide de la fonction fournie.
"""
_X = numpy.asmatrix(numpy.ravel( X )).T
- _HX = self.__userFunction( _X )
+ #
+ __alreadyCalculated = False
+ if self.__avoidRC:
+ __alreadyCalculated, __i = self.__doublon__(_X, self.__listDPCP, self.__listDPCN, " H")
+ #
+ if __alreadyCalculated:
+ logging.debug("FDA Calcul DirectOperator (par récupération)")
+ _HX = self.__listDPCR[__i]
+ else:
+ logging.debug("FDA Calcul DirectOperator (explicite)")
+ _HX = self.__userFunction( _X )
+ if self.__avoidRC:
+ if self.__lenghtR < 0: self.__lenghtR = 2 * _X.size
+ if len(self.__listDPCP) > self.__lenghtR:
+ self.__listDPCP.pop(0)
+ self.__listDPCR.pop(0)
+ self.__listDPCN.pop(0)
+ self.__listDPCP.append( _X )
+ self.__listDPCR.append( _HX )
+ self.__listDPCN.append( numpy.linalg.norm(_X) )
+ #
return numpy.ravel( _HX )
# ---------------------------------------------------------
else:
_dX = numpy.where( _dX == 0., moyenne, _dX )
#
- if self.__centeredDF:
- #
- _Jacobienne = []
- for i in range( len(_dX) ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
- _X_moins_dXi = numpy.array( _X.A1, dtype=float )
- _X_moins_dXi[i] = _X[i] - _dXi
+ __alreadyCalculated = False
+ if self.__avoidRC:
+ __bidon, __alreadyCalculatedP = self.__doublon__(_X, self.__listJPCP, self.__listJPPN, None)
+ __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None)
+ if __alreadyCalculatedP == __alreadyCalculatedI > -1:
+ __alreadyCalculated, __i = True, __alreadyCalculatedP
+ #
+ if __alreadyCalculated:
+ logging.debug("FDA Calcul Jacobienne (par récupération)")
+ _Jacobienne = self.__listJPCR[__i]
+ else:
+ logging.debug("FDA Calcul Jacobienne (explicite)")
+ if self.__centeredDF:
#
- _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
- _HX_moins_dXi = self.DirectOperator( _X_moins_dXi )
+ _Jacobienne = []
+ for i in range( _dX.size ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ _X_moins_dXi = numpy.array( _X.A1, dtype=float )
+ _X_moins_dXi[i] = _X[i] - _dXi
+ #
+ _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
+ _HX_moins_dXi = self.DirectOperator( _X_moins_dXi )
+ #
+ _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
#
- _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
- #
- else:
- #
- _Jacobienne = []
- _HX = self.DirectOperator( _X )
- for i in range( len(_dX) ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
+ else:
#
- _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
+ _Jacobienne = []
+ _HX = self.DirectOperator( _X )
+ for i in range( _dX.size ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ #
+ _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
+ #
+ _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
#
- _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
#
+ _Jacobienne = numpy.matrix( numpy.vstack( _Jacobienne ) ).T
+ if self.__avoidRC:
+ if self.__lenghtR < 0: self.__lenghtR = 2 * _X.size
+ if len(self.__listJPCP) > self.__lenghtR:
+ self.__listJPCP.pop(0)
+ self.__listJPCI.pop(0)
+ self.__listJPCR.pop(0)
+ self.__listJPPN.pop(0)
+ self.__listJPIN.pop(0)
+ self.__listJPCP.append( _X )
+ self.__listJPCI.append( _dX )
+ self.__listJPCR.append( _Jacobienne )
+ self.__listJPPN.append( numpy.linalg.norm(_X) )
+ self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) )
#
- _Jacobienne = numpy.matrix( numpy.vstack( _Jacobienne ) ).T
logging.debug("FDA Fin du calcul de la Jacobienne")
#
return _Jacobienne