# -*- coding: utf-8 -*-
#
-# Copyright (C) 2008-2020 EDF R&D
+# Copyright (C) 2008-2021 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
import sys
import logging
import copy
+import time
import numpy
from functools import partial
from daCore import Persistence, PlatformInfo, Interfaces
"Renvoie le type"
return self.__Type
- def appliedTo(self, xValue, HValue = None, argsAsSerie = False):
+ def appliedTo(self, xValue, HValue = None, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à une
série d'arguments xValue. Cette méthode se contente d'appliquer, chaque
#
if _HValue is not None:
assert len(_xValue) == len(_HValue), "Incompatible number of elements in xValue and HValue"
- HxValue = []
+ _HxValue = []
for i in range(len(_HValue)):
- HxValue.append( numpy.asmatrix( numpy.ravel( _HValue[i] ) ).T )
+ _HxValue.append( numpy.asmatrix( numpy.ravel( _HValue[i] ) ).T )
if self.__AvoidRC:
- Operator.CM.storeValueInX(_xValue[i],HxValue[-1],self.__name)
+ Operator.CM.storeValueInX(_xValue[i],_HxValue[-1],self.__name)
else:
- HxValue = []
+ _HxValue = []
_xserie = []
_hindex = []
for i, xv in enumerate(_xValue):
else:
if self.__Matrix is not None:
self.__addOneMatrixCall()
- _hv = self.__Matrix * xv
+ _xv = numpy.matrix(numpy.ravel(xv)).T
+ _hv = self.__Matrix * _xv
else:
self.__addOneMethodCall()
_xserie.append( xv )
_hindex.append( i )
_hv = None
- HxValue.append( _hv )
+ _HxValue.append( _hv )
#
if len(_xserie)>0 and self.__Matrix is None:
if self.__extraArgs is None:
for i in _hindex:
_xv = _xserie.pop(0)
_hv = _hserie.pop(0)
- HxValue[i] = _hv
+ _HxValue[i] = _hv
if self.__AvoidRC:
Operator.CM.storeValueInX(_xv,_hv,self.__name)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
+ #
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
- def appliedControledFormTo(self, paires, argsAsSerie = False):
+ def appliedControledFormTo(self, paires, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à des
paires (xValue, uValue). Cette méthode se contente d'appliquer, son
PlatformInfo.isIterable( _xuValue, True, " in Operator.appliedControledFormTo" )
#
if self.__Matrix is not None:
- HxValue = []
+ _HxValue = []
for paire in _xuValue:
_xValue, _uValue = paire
+ _xValue = numpy.matrix(numpy.ravel(_xValue)).T
self.__addOneMatrixCall()
- HxValue.append( self.__Matrix * _xValue )
+ _HxValue.append( self.__Matrix * _xValue )
else:
- HxValue = []
+ _xuArgs = []
for paire in _xuValue:
- _xuValue = []
_xValue, _uValue = paire
if _uValue is not None:
- _xuValue.append( paire )
+ _xuArgs.append( paire )
else:
- _xuValue.append( _xValue )
- self.__addOneMethodCall( len(_xuValue) )
+ _xuArgs.append( _xValue )
+ self.__addOneMethodCall( len(_xuArgs) )
if self.__extraArgs is None:
- HxValue = self.__Method( _xuValue ) # Calcul MF
+ _HxValue = self.__Method( _xuArgs ) # Calcul MF
else:
- HxValue = self.__Method( _xuValue, self.__extraArgs ) # Calcul MF
+ _HxValue = self.__Method( _xuArgs, self.__extraArgs ) # Calcul MF
+ #
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
- def appliedInXTo(self, paires, argsAsSerie = False):
+ def appliedInXTo(self, paires, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à une
série d'arguments xValue, sachant que l'opérateur est valable en
PlatformInfo.isIterable( _nxValue, True, " in Operator.appliedInXTo" )
#
if self.__Matrix is not None:
- HxValue = []
+ _HxValue = []
for paire in _nxValue:
_xNominal, _xValue = paire
+ _xValue = numpy.matrix(numpy.ravel(_xValue)).T
self.__addOneMatrixCall()
- HxValue.append( self.__Matrix * _xValue )
+ _HxValue.append( self.__Matrix * _xValue )
else:
self.__addOneMethodCall( len(_nxValue) )
if self.__extraArgs is None:
- HxValue = self.__Method( _nxValue ) # Calcul MF
+ _HxValue = self.__Method( _nxValue ) # Calcul MF
else:
- HxValue = self.__Method( _nxValue, self.__extraArgs ) # Calcul MF
+ _HxValue = self.__Method( _nxValue, self.__extraArgs ) # Calcul MF
+ #
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
def asMatrix(self, ValueForMethodForm = "UnknownVoidValue", argsAsSerie = False):
"""
- CostFunctionJbAtCurrentOptimum : partie ébauche à l'état optimal courant lors d'itérations
- CostFunctionJo : partie observations de la fonction-coût : Jo
- CostFunctionJoAtCurrentOptimum : partie observations à l'état optimal courant lors d'itérations
+ - CurrentIterationNumber : numéro courant d'itération dans les algorithmes itératifs, à partir de 0
- CurrentOptimum : état optimal courant lors d'itérations
- CurrentState : état courant lors d'itérations
- GradientOfCostFunctionJ : gradient de la fonction-coût globale
self.StoredVariables["CostFunctionJbAtCurrentOptimum"] = Persistence.OneScalar(name = "CostFunctionJbAtCurrentOptimum")
self.StoredVariables["CostFunctionJo"] = Persistence.OneScalar(name = "CostFunctionJo")
self.StoredVariables["CostFunctionJoAtCurrentOptimum"] = Persistence.OneScalar(name = "CostFunctionJoAtCurrentOptimum")
+ self.StoredVariables["CurrentIterationNumber"] = Persistence.OneIndex(name = "CurrentIterationNumber")
self.StoredVariables["CurrentOptimum"] = Persistence.OneVector(name = "CurrentOptimum")
self.StoredVariables["CurrentState"] = Persistence.OneVector(name = "CurrentState")
self.StoredVariables["ForecastState"] = Persistence.OneVector(name = "ForecastState")
self.__canonical_parameter_name["algorithm"] = "Algorithm"
self.__canonical_parameter_name["storesupplementarycalculations"] = "StoreSupplementaryCalculations"
- def _pre_run(self, Parameters, Xb=None, Y=None, R=None, B=None, Q=None ):
+ def _pre_run(self, Parameters, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None ):
"Pré-calcul"
logging.debug("%s Lancement", self._name)
logging.debug("%s Taille mémoire utilisée de %.0f Mio"%(self._name, self._m.getUsedMemory("Mio")))
+ self._getTimeState(reset=True)
#
# Mise a jour des paramètres internes avec le contenu de Parameters, en
# reprenant les valeurs par défauts pour toutes celles non définies
for k, v in self.__variable_names_not_public.items():
if k not in self._parameters: self.__setParameters( {k:v} )
#
- # Corrections et compléments
- def __test_vvalue(argument, variable, argname):
+ # Corrections et compléments des vecteurs
+ def __test_vvalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
if argument is None:
if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
- raise ValueError("%s %s vector %s has to be properly defined!"%(self._name,argname,variable))
+ raise ValueError("%s %s vector %s is not set and has to be properly defined!"%(self._name,argname,symbol))
elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
- logging.debug("%s %s vector %s is not set, but is optional."%(self._name,argname,variable))
+ logging.debug("%s %s vector %s is not set, but is optional."%(self._name,argname,symbol))
else:
- logging.debug("%s %s vector %s is not set, but is not required."%(self._name,argname,variable))
+ logging.debug("%s %s vector %s is not set, but is not required."%(self._name,argname,symbol))
else:
- logging.debug("%s %s vector %s is set, and its size is %i."%(self._name,argname,variable,numpy.array(argument).size))
+ logging.debug("%s %s vector %s is set, and its size is %i."%(self._name,argname,symbol,numpy.array(argument).size))
return 0
__test_vvalue( Xb, "Xb", "Background or initial state" )
__test_vvalue( Y, "Y", "Observation" )
+ __test_vvalue( U, "U", "Control" )
#
- def __test_cvalue(argument, variable, argname):
+ # Corrections et compléments des covariances
+ def __test_cvalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
if argument is None:
if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
- raise ValueError("%s %s error covariance matrix %s has to be properly defined!"%(self._name,argname,variable))
+ raise ValueError("%s %s error covariance matrix %s is not set and has to be properly defined!"%(self._name,argname,symbol))
elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
- logging.debug("%s %s error covariance matrix %s is not set, but is optional."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is not set, but is optional."%(self._name,argname,symbol))
else:
- logging.debug("%s %s error covariance matrix %s is not set, but is not required."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is not set, but is not required."%(self._name,argname,symbol))
else:
- logging.debug("%s %s error covariance matrix %s is set."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is set."%(self._name,argname,symbol))
return 0
- __test_cvalue( R, "R", "Observation" )
__test_cvalue( B, "B", "Background" )
+ __test_cvalue( R, "R", "Observation" )
__test_cvalue( Q, "Q", "Evolution" )
#
+ # Corrections et compléments des opérateurs
+ def __test_ovalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
+ if argument is None or (isinstance(argument,dict) and len(argument)==0):
+ if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
+ raise ValueError("%s %s operator %s is not set and has to be properly defined!"%(self._name,argname,symbol))
+ elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
+ logging.debug("%s %s operator %s is not set, but is optional."%(self._name,argname,symbol))
+ else:
+ logging.debug("%s %s operator %s is not set, but is not required."%(self._name,argname,symbol))
+ else:
+ logging.debug("%s %s operator %s is set."%(self._name,argname,symbol))
+ return 0
+ __test_ovalue( HO, "HO", "Observation", "H" )
+ __test_ovalue( EM, "EM", "Evolution", "M" )
+ __test_ovalue( CM, "CM", "Control Model", "C" )
+ #
if ("Bounds" in self._parameters) and isinstance(self._parameters["Bounds"], (list, tuple)) and (len(self._parameters["Bounds"]) > 0):
logging.debug("%s Prise en compte des bornes effectuee"%(self._name,))
else:
logging.debug("%s Nombre d'évaluation(s) de l'opérateur d'observation direct/tangent/adjoint.: %i/%i/%i", self._name, _oH["Direct"].nbcalls(0),_oH["Tangent"].nbcalls(0),_oH["Adjoint"].nbcalls(0))
logging.debug("%s Nombre d'appels au cache d'opérateur d'observation direct/tangent/adjoint..: %i/%i/%i", self._name, _oH["Direct"].nbcalls(3),_oH["Tangent"].nbcalls(3),_oH["Adjoint"].nbcalls(3))
logging.debug("%s Taille mémoire utilisée de %.0f Mio", self._name, self._m.getUsedMemory("Mio"))
+ logging.debug("%s Durées d'utilisation CPU de %.1fs et elapsed de %.1fs", self._name, self._getTimeState()[0], self._getTimeState()[1])
logging.debug("%s Terminé", self._name)
return 0
"""
raise NotImplementedError("Mathematical assimilation calculation has not been implemented!")
- def defineRequiredParameter(self, name = None, default = None, typecast = None, message = None, minval = None, maxval = None, listval = None):
+ def defineRequiredParameter(self, name = None, default = None, typecast = None, message = None, minval = None, maxval = None, listval = None, listadv = None):
"""
Permet de définir dans l'algorithme des paramètres requis et leurs
caractéristiques par défaut.
"minval" : minval,
"maxval" : maxval,
"listval" : listval,
+ "listadv" : listadv,
"message" : message,
}
self.__canonical_parameter_name[name.lower()] = name
minval = self.__required_parameters[__k]["minval"]
maxval = self.__required_parameters[__k]["maxval"]
listval = self.__required_parameters[__k]["listval"]
+ listadv = self.__required_parameters[__k]["listadv"]
#
if value is None and default is None:
__val = None
raise ValueError("The parameter named '%s' of value '%s' can not be less than %s."%(__k, __val, minval))
if maxval is not None and (numpy.array(__val, float) > maxval).any():
raise ValueError("The parameter named '%s' of value '%s' can not be greater than %s."%(__k, __val, maxval))
- if listval is not None:
+ if listval is not None or listadv is not None:
if typecast is list or typecast is tuple or isinstance(__val,list) or isinstance(__val,tuple):
for v in __val:
- if v not in listval:
+ if listval is not None and v in listval: continue
+ elif listadv is not None and v in listadv: continue
+ else:
raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%(v, __k, listval))
- elif __val not in listval:
+ elif not (listval is not None and __val in listval) and not (listadv is not None and __val in listadv):
raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%( __val, __k,listval))
#
return __val
pass
logging.debug("%s %s : %s", self._name, self.__required_parameters[k]["message"], self._parameters[k])
+ def _getTimeState(self, reset=False):
+ """
+ Initialise ou restitue le temps de calcul (cpu/elapsed) à la seconde
+ """
+ if reset:
+ self.__initial_cpu_time = time.process_time()
+ self.__initial_elapsed_time = time.perf_counter()
+ return 0., 0.
+ else:
+ self.__cpu_time = time.process_time() - self.__initial_cpu_time
+ self.__elapsed_time = time.perf_counter() - self.__initial_elapsed_time
+ return self.__cpu_time, self.__elapsed_time
+
+ def _StopOnTimeLimit(self, X=None, withReason=False):
+ "Stop criteria on time limit: True/False [+ Reason]"
+ c, e = self._getTimeState()
+ if "MaximumCpuTime" in self._parameters and c > self._parameters["MaximumCpuTime"]:
+ __SC, __SR = True, "Reached maximum CPU time (%.1fs > %.1fs)"%(c, self._parameters["MaximumCpuTime"])
+ elif "MaximumElapsedTime" in self._parameters and e > self._parameters["MaximumElapsedTime"]:
+ __SC, __SR = True, "Reached maximum elapsed time (%.1fs > %.1fs)"%(e, self._parameters["MaximumElapsedTime"])
+ else:
+ __SC, __SR = False, ""
+ if withReason:
+ return __SC, __SR
+ else:
+ return __SC
+
# ==============================================================================
class AlgorithmAndParameters(object):
"""
"Renvoie la liste des paramètres requis selon l'algorithme"
return self.__algorithm.getRequiredParameters(noDetails)
+ def getAlgorithmInputArguments(self):
+ "Renvoie la liste des entrées requises selon l'algorithme"
+ return self.__algorithm.getInputArguments()
+
+ def getAlgorithmAttributes(self):
+ "Renvoie la liste des attributs selon l'algorithme"
+ return self.__algorithm.setAttributes()
+
def setObserver(self, __V, __O, __I, __S):
if self.__algorithm is None \
or isinstance(self.__algorithm, dict) \
self.__P.update( dict(__Dict) )
#
if __Algo is not None:
- self.__P.update( {"Algorithm":__Algo} )
+ self.__P.update( {"Algorithm":str(__Algo)} )
def get(self, key = None):
"Vérifie l'existence d'une clé de variable ou de paramètres"
else:
raise ValueError("setting an observer has to be done over a variable name or a list of variable names.")
#
- if asString is not None:
- __FunctionText = asString
- elif (asTemplate is not None) and (asTemplate in Templates.ObserverTemplates):
- __FunctionText = Templates.ObserverTemplates[asTemplate]
- elif asScript is not None:
- __FunctionText = Interfaces.ImportFromScript(asScript).getstring()
- else:
- __FunctionText = ""
- __Function = ObserverF(__FunctionText)
- #
if asObsObject is not None:
self.__O = asObsObject
else:
+ __FunctionText = str(UserScript('Observer', asTemplate, asString, asScript))
+ __Function = Observer2Func(__FunctionText)
self.__O = __Function.getfunc()
#
for k in range(len(self.__V)):
"x.__str__() <==> str(x)"
return str(self.__V)+"\n"+str(self.__O)
+# ==============================================================================
+class UserScript(object):
+ """
+ Classe générale d'interface de type texte de script utilisateur
+ """
+ def __init__(self,
+ name = "GenericUserScript",
+ asTemplate = None,
+ asString = None,
+ asScript = None,
+ ):
+ """
+ """
+ self.__name = str(name)
+ #
+ if asString is not None:
+ self.__F = asString
+ elif self.__name == "UserPostAnalysis" and (asTemplate is not None) and (asTemplate in Templates.UserPostAnalysisTemplates):
+ self.__F = Templates.UserPostAnalysisTemplates[asTemplate]
+ elif self.__name == "Observer" and (asTemplate is not None) and (asTemplate in Templates.ObserverTemplates):
+ self.__F = Templates.ObserverTemplates[asTemplate]
+ elif asScript is not None:
+ self.__F = Interfaces.ImportFromScript(asScript).getstring()
+ else:
+ self.__F = ""
+
+ def __repr__(self):
+ "x.__repr__() <==> repr(x)"
+ return repr(self.__F)
+
+ def __str__(self):
+ "x.__str__() <==> str(x)"
+ return str(self.__F)
+
+# ==============================================================================
+class ExternalParameters(object):
+ """
+ Classe générale d'interface de type texte de script utilisateur
+ """
+ def __init__(self,
+ name = "GenericExternalParameters",
+ asDict = None,
+ asScript = None,
+ ):
+ """
+ """
+ self.__name = str(name)
+ self.__P = {}
+ #
+ self.updateParameters( asDict, asScript )
+
+ def updateParameters(self,
+ asDict = None,
+ asScript = None,
+ ):
+ "Mise a jour des parametres"
+ if asDict is None and asScript is not None:
+ __Dict = Interfaces.ImportFromScript(asScript).getvalue( self.__name, "ExternalParameters" )
+ else:
+ __Dict = asDict
+ #
+ if __Dict is not None:
+ self.__P.update( dict(__Dict) )
+
+ def get(self, key = None):
+ if key in self.__P:
+ return self.__P[key]
+ else:
+ return list(self.__P.keys())
+
+ def keys(self):
+ return list(self.__P.keys())
+
+ def pop(self, k, d):
+ return self.__P.pop(k, d)
+
+ def items(self):
+ return self.__P.items()
+
+ def __contains__(self, key=None):
+ "D.__contains__(k) -> True if D has a key k, else False"
+ return key in self.__P
+
# ==============================================================================
class State(object):
"""
elif self.isobject() and hasattr(self.__C,"choleskyI"):
return Covariance(self.__name+"H", asCovObject = self.__C.choleskyI() )
+ def sqrtm(self):
+ "Racine carrée matricielle"
+ if self.ismatrix():
+ import scipy
+ return Covariance(self.__name+"C", asCovariance = scipy.linalg.sqrtm(self.__C) )
+ elif self.isvector():
+ return Covariance(self.__name+"C", asEyeByVector = numpy.sqrt( self.__C ) )
+ elif self.isscalar():
+ return Covariance(self.__name+"C", asEyeByScalar = numpy.sqrt( self.__C ) )
+ elif self.isobject() and hasattr(self.__C,"sqrt"):
+ return Covariance(self.__name+"C", asCovObject = self.__C.sqrt() )
+
+ def sqrtmI(self):
+ "Inversion de la racine carrée matricielle"
+ if self.ismatrix():
+ import scipy
+ return Covariance(self.__name+"H", asCovariance = scipy.linalg.sqrtm(self.__C).I )
+ elif self.isvector():
+ return Covariance(self.__name+"H", asEyeByVector = 1.0 / numpy.sqrt( self.__C ) )
+ elif self.isscalar():
+ return Covariance(self.__name+"H", asEyeByScalar = 1.0 / numpy.sqrt( self.__C ) )
+ elif self.isobject() and hasattr(self.__C,"sqrtI"):
+ return Covariance(self.__name+"H", asCovObject = self.__C.sqrtI() )
+
def diag(self, msize=None):
"Diagonale de la matrice"
if self.ismatrix():
return self.shape[0]
# ==============================================================================
-class ObserverF(object):
+class Observer2Func(object):
"""
Creation d'une fonction d'observateur a partir de son texte
"""