#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
import logging
from daCore import BasicObjects, PlatformInfo
typecast = float,
message = "Maximum des composantes du gradient lors de l'arrêt",
)
- self.defineRequiredParameter(
- name = "CalculateAPosterioriCovariance",
- default = False,
- typecast = bool,
- message = "Calcul de la covariance a posteriori",
- )
self.defineRequiredParameter(
name = "StoreInternalVariables",
default = False,
typecast = bool,
message = "Stockage des variables internes ou intermédiaires du calcul",
)
+ self.defineRequiredParameter(
+ name = "StoreSupplementaryCalculations",
+ default = [],
+ typecast = tuple,
+ message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
+ listval = ["APosterioriCovariance", "BMA", "OMA", "OMB", "Innovation", "SigmaObs2"]
+ )
def run(self, Xb=None, Y=None, H=None, M=None, R=None, B=None, Q=None, Parameters=None):
"""
Calcul de l'estimateur 3D-VAR
"""
logging.debug("%s Lancement"%self._name)
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
#
# Paramètres de pilotage
# ----------------------
logging.debug("%s Analyse Xa = %s"%(self._name, Xa))
#
self.StoredVariables["Analysis"].store( Xa.A1 )
- self.StoredVariables["Innovation"].store( d.A1 )
#
# Calcul de la covariance d'analyse
# ---------------------------------
- if self._parameters["CalculateAPosterioriCovariance"]:
+ if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
HessienneI = []
nb = len(Xini)
for i in range(nb):
raise ValueError("The 3DVAR a posteriori covariance matrix A is not symmetric positive-definite. Check your B and R a priori covariances.")
self.StoredVariables["APosterioriCovariance"].store( A )
#
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("MB")))
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["Innovation"].store( numpy.asmatrix(d).flatten().A1 )
+ if "BMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["BMA"].store( numpy.asmatrix(Xb - Xa).flatten().A1 )
+ if "OMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMA"].store( numpy.asmatrix(Y - Hm(Xa)).flatten().A1 )
+ if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMB"].store( numpy.asmatrix(d).flatten().A1 )
+ if "SigmaObs2" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["SigmaObs2"].store( float( (d.T * (Y-Hm(Xa))) / R.trace() ) )
+ #
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
#
return 0
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
import logging
from daCore import BasicObjects, PlatformInfo
def __init__(self):
BasicObjects.Algorithm.__init__(self, "BLUE")
self.defineRequiredParameter(
- name = "CalculateAPosterioriCovariance",
- default = False,
- typecast = bool,
- message = "Calcul de la covariance a posteriori",
+ name = "StoreSupplementaryCalculations",
+ default = [],
+ typecast = tuple,
+ message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
+ listval = ["APosterioriCovariance", "BMA", "OMA", "OMB", "Innovation", "SigmaBck2", "SigmaObs2"]
)
def run(self, Xb=None, Y=None, H=None, M=None, R=None, B=None, Q=None, Parameters=None):
Calcul de l'estimateur BLUE (ou Kalman simple, ou Interpolation Optimale)
"""
logging.debug("%s Lancement"%self._name)
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
#
# Paramètres de pilotage
# ----------------------
if max(Y.shape) != max(HXb.shape):
raise ValueError("The shapes %s of observations Y and %s of observed calculation H(X) are different, they have to be identical."%(Y.shape,HXb.shape))
d = Y - HXb
- logging.debug("%s Innovation d = %s"%(self._name, d))
+ Jb = 0.
+ Jo = 0.5 * d.T * RI * d
+ J = float( Jb ) + float( Jo )
+ logging.debug("%s Innovation d = %s"%(self._name, d))
+ logging.debug("%s CostFunction Jb = %s"%(self._name, Jb))
+ logging.debug("%s CostFunction Jo = %s"%(self._name, Jo))
+ logging.debug("%s CostFunction J = %s"%(self._name, J))
+ #
+ self.StoredVariables["CostFunctionJb"].store( Jb )
+ self.StoredVariables["CostFunctionJo"].store( Jo )
+ self.StoredVariables["CostFunctionJ" ].store( J )
#
# Calcul de la matrice de gain et de l'analyse
# --------------------------------------------
#
# Calcul de la fonction coût
# --------------------------
+ oma = Y - Hm * Xa
Jb = 0.5 * (Xa - Xb).T * BI * (Xa - Xb)
- Jo = 0.5 * d.T * RI * d
+ Jo = 0.5 * oma.T * RI * oma
J = float( Jb ) + float( Jo )
+ logging.debug("%s OMA = %s"%(self._name, oma))
logging.debug("%s CostFunction Jb = %s"%(self._name, Jb))
logging.debug("%s CostFunction Jo = %s"%(self._name, Jo))
logging.debug("%s CostFunction J = %s"%(self._name, J))
#
self.StoredVariables["Analysis"].store( Xa.A1 )
- self.StoredVariables["Innovation"].store( d.A1 )
self.StoredVariables["CostFunctionJb"].store( Jb )
self.StoredVariables["CostFunctionJo"].store( Jo )
self.StoredVariables["CostFunctionJ" ].store( J )
#
# Calcul de la covariance d'analyse
# ---------------------------------
- if self._parameters["CalculateAPosterioriCovariance"]:
+ if "APosterioriCovariance" in self._parameters["StoreSupplementaryCalculations"]:
A = B - K * Hm * B
if logging.getLogger().level < logging.WARNING: # La verification n'a lieu qu'en debug
try:
raise ValueError("The BLUE a posteriori covariance matrix A is not symmetric positive-definite. Check your B and R a priori covariances.")
self.StoredVariables["APosterioriCovariance"].store( A )
#
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["Innovation"].store( numpy.asmatrix(d).flatten().A1 )
+ if "BMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["BMA"].store( numpy.asmatrix(Xb - Xa).flatten().A1 )
+ if "OMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMA"].store( numpy.asmatrix(oma).flatten().A1 )
+ if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMB"].store( numpy.asmatrix(d).flatten().A1 )
+ if "SigmaObs2" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["SigmaObs2"].store( float( (d.T * (Y-Hm*Xa)) / R.trace() ) )
+ if "SigmaBck2" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["SigmaBck2"].store( float( (d.T * Hm * (Xa - Xb))/(Hm * B * Hm.T).trace() ) )
+ #
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
#
return 0
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
import logging
from daCore import BasicObjects, PlatformInfo
m = PlatformInfo.SystemUsage()
+import numpy
+
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
def __init__(self):
BasicObjects.Algorithm.__init__(self, "LINEARLEASTSQUARES")
+ self.defineRequiredParameter(
+ name = "StoreSupplementaryCalculations",
+ default = [],
+ typecast = tuple,
+ message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
+ listval = ["OMA"]
+ )
def run(self, Xb=None, Y=None, H=None, M=None, R=None, B=None, Q=None, Parameters=None):
"""
(assimilation variationnelle sans ébauche)
"""
logging.debug("%s Lancement"%self._name)
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
#
# Paramètres de pilotage
# ----------------------
#
# Calcul de la fonction coût
# --------------------------
- d = Y - Hm * Xa
+ oma = Y - Hm * Xa
Jb = 0.
- Jo = 0.5 * d.T * RI * d
+ Jo = 0.5 * oma.T * RI * oma
J = float( Jb ) + float( Jo )
logging.debug("%s CostFunction Jb = %s"%(self._name, Jb))
logging.debug("%s CostFunction Jo = %s"%(self._name, Jo))
logging.debug("%s CostFunction J = %s"%(self._name, J))
#
self.StoredVariables["Analysis"].store( Xa.A1 )
- self.StoredVariables["Innovation"].store( d.A1 )
self.StoredVariables["CostFunctionJb"].store( Jb )
self.StoredVariables["CostFunctionJo"].store( Jo )
self.StoredVariables["CostFunctionJ" ].store( J )
#
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if "OMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMA"].store( numpy.asmatrix(oma).flatten().A1 )
+ #
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
#
return 0
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
import logging
from daCore import BasicObjects, PlatformInfo
typecast = bool,
message = "Stockage des variables internes ou intermédiaires du calcul",
)
+ self.defineRequiredParameter(
+ name = "StoreSupplementaryCalculations",
+ default = [],
+ typecast = tuple,
+ message = "Liste de calculs supplémentaires à stocker et/ou effectuer",
+ listval = ["BMA", "OMA", "OMB", "Innovation"]
+ )
def run(self, Xb=None, Y=None, H=None, M=None, R=None, B=None, Q=None, Parameters=None):
"""
(assimilation variationnelle sans ébauche)
"""
logging.debug("%s Lancement"%self._name)
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("Mo")))
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
#
# Paramètres de pilotage
# ----------------------
logging.debug("%s Analyse Xa = %s"%(self._name, Xa))
#
self.StoredVariables["Analysis"].store( Xa.A1 )
- self.StoredVariables["Innovation"].store( d.A1 )
#
- logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("MB")))
+ # Calculs et/ou stockages supplémentaires
+ # ---------------------------------------
+ if "Innovation" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["Innovation"].store( numpy.asmatrix(d).flatten().A1 )
+ if "BMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["BMA"].store( numpy.asmatrix(Xb - Xa).flatten().A1 )
+ if "OMA" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMA"].store( numpy.asmatrix(Y - Hm(Xa)).flatten().A1 )
+ if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
+ self.StoredVariables["OMB"].store( numpy.asmatrix(d).flatten().A1 )
+ #
+ logging.debug("%s Taille mémoire utilisée de %.1f Mo"%(self._name, m.getUsedMemory("M")))
logging.debug("%s Terminé"%self._name)
#
return 0
print "Demi-somme :", list((numpy.array([0, 1, 2])+numpy.array([0.5, 1.5, 2.5]))/2)
print " qui doit être identique à :"
print "Analyse résultante :", ADD.get("Analysis").valueserie(0)
- print "Innovation :", ADD.get("Innovation").valueserie(0)
print
print "Algorithmes disponibles.......................:", ADD.get_available_algorithms()
else: __val = typecast( value )
#
if minval is not None and __val < minval:
- raise ValueError("The parameter named \"%s\" of value %s can not be less than %s."%(name, __val, minval))
+ raise ValueError("The parameter named \"%s\" of value \"%s\" can not be less than %s."%(name, __val, minval))
if maxval is not None and __val > maxval:
- raise ValueError("The parameter named \"%s\" of value %s can not be greater than %s."%(name, __val, maxval))
- if listval is not None and __val not in listval:
- raise ValueError("The parameter named \"%s\" of value %s has to be in the list %s."%(name, __val, listval))
+ raise ValueError("The parameter named \"%s\" of value \"%s\" can not be greater than %s."%(name, __val, maxval))
+ if listval is not None:
+ if typecast is list or typecast is tuple or type(__val) is list or type(__val) is tuple:
+ for v in __val:
+ if v not in listval:
+ raise ValueError("The value \"%s\" of the parameter named \"%s\" is not allowed, it has to be in the list %s."%(v, name, listval))
+ elif __val not in listval:
+ raise ValueError("The value \"%s\" of the parameter named \"%s\" is not allowed, it has to be in the list %s."%( __val, name,listval))
return __val
def setParameters(self, fromDico={}):