# -*- coding: utf-8 -*-
#
-# Copyright (C) 2008-2020 EDF R&D
+# Copyright (C) 2008-2021 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
import sys
import logging
import copy
+import time
import numpy
from functools import partial
from daCore import Persistence, PlatformInfo, Interfaces
"Renvoie le type"
return self.__Type
- def appliedTo(self, xValue, HValue = None, argsAsSerie = False):
+ def appliedTo(self, xValue, HValue = None, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à une
série d'arguments xValue. Cette méthode se contente d'appliquer, chaque
#
if _HValue is not None:
assert len(_xValue) == len(_HValue), "Incompatible number of elements in xValue and HValue"
- HxValue = []
+ _HxValue = []
for i in range(len(_HValue)):
- HxValue.append( numpy.asmatrix( numpy.ravel( _HValue[i] ) ).T )
+ _HxValue.append( numpy.asmatrix( numpy.ravel( _HValue[i] ) ).T )
if self.__AvoidRC:
- Operator.CM.storeValueInX(_xValue[i],HxValue[-1],self.__name)
+ Operator.CM.storeValueInX(_xValue[i],_HxValue[-1],self.__name)
else:
- HxValue = []
+ _HxValue = []
_xserie = []
_hindex = []
for i, xv in enumerate(_xValue):
_xserie.append( xv )
_hindex.append( i )
_hv = None
- HxValue.append( _hv )
+ _HxValue.append( _hv )
#
if len(_xserie)>0 and self.__Matrix is None:
if self.__extraArgs is None:
for i in _hindex:
_xv = _xserie.pop(0)
_hv = _hserie.pop(0)
- HxValue[i] = _hv
+ _HxValue[i] = _hv
if self.__AvoidRC:
Operator.CM.storeValueInX(_xv,_hv,self.__name)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
+ #
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
- def appliedControledFormTo(self, paires, argsAsSerie = False):
+ def appliedControledFormTo(self, paires, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à des
paires (xValue, uValue). Cette méthode se contente d'appliquer, son
PlatformInfo.isIterable( _xuValue, True, " in Operator.appliedControledFormTo" )
#
if self.__Matrix is not None:
- HxValue = []
+ _HxValue = []
for paire in _xuValue:
_xValue, _uValue = paire
self.__addOneMatrixCall()
- HxValue.append( self.__Matrix * _xValue )
+ _HxValue.append( self.__Matrix * _xValue )
else:
- HxValue = []
+ _xuArgs = []
for paire in _xuValue:
- _xuValue = []
_xValue, _uValue = paire
if _uValue is not None:
- _xuValue.append( paire )
+ _xuArgs.append( paire )
else:
- _xuValue.append( _xValue )
- self.__addOneMethodCall( len(_xuValue) )
+ _xuArgs.append( _xValue )
+ self.__addOneMethodCall( len(_xuArgs) )
if self.__extraArgs is None:
- HxValue = self.__Method( _xuValue ) # Calcul MF
+ _HxValue = self.__Method( _xuArgs ) # Calcul MF
else:
- HxValue = self.__Method( _xuValue, self.__extraArgs ) # Calcul MF
+ _HxValue = self.__Method( _xuArgs, self.__extraArgs ) # Calcul MF
+ #
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
- def appliedInXTo(self, paires, argsAsSerie = False):
+ def appliedInXTo(self, paires, argsAsSerie = False, returnSerieAsArrayMatrix = False):
"""
Permet de restituer le résultat de l'application de l'opérateur à une
série d'arguments xValue, sachant que l'opérateur est valable en
PlatformInfo.isIterable( _nxValue, True, " in Operator.appliedInXTo" )
#
if self.__Matrix is not None:
- HxValue = []
+ _HxValue = []
for paire in _nxValue:
_xNominal, _xValue = paire
self.__addOneMatrixCall()
- HxValue.append( self.__Matrix * _xValue )
+ _HxValue.append( self.__Matrix * _xValue )
else:
self.__addOneMethodCall( len(_nxValue) )
if self.__extraArgs is None:
- HxValue = self.__Method( _nxValue ) # Calcul MF
+ _HxValue = self.__Method( _nxValue ) # Calcul MF
else:
- HxValue = self.__Method( _nxValue, self.__extraArgs ) # Calcul MF
+ _HxValue = self.__Method( _nxValue, self.__extraArgs ) # Calcul MF
+ #
+ if returnSerieAsArrayMatrix:
+ _HxValue = numpy.stack([numpy.ravel(_hv) for _hv in _HxValue], axis=1)
#
- if argsAsSerie: return HxValue
- else: return HxValue[-1]
+ if argsAsSerie: return _HxValue
+ else: return _HxValue[-1]
def asMatrix(self, ValueForMethodForm = "UnknownVoidValue", argsAsSerie = False):
"""
"Pré-calcul"
logging.debug("%s Lancement", self._name)
logging.debug("%s Taille mémoire utilisée de %.0f Mio"%(self._name, self._m.getUsedMemory("Mio")))
+ self._getTimeState(reset=True)
#
# Mise a jour des paramètres internes avec le contenu de Parameters, en
# reprenant les valeurs par défauts pour toutes celles non définies
logging.debug("%s Nombre d'évaluation(s) de l'opérateur d'observation direct/tangent/adjoint.: %i/%i/%i", self._name, _oH["Direct"].nbcalls(0),_oH["Tangent"].nbcalls(0),_oH["Adjoint"].nbcalls(0))
logging.debug("%s Nombre d'appels au cache d'opérateur d'observation direct/tangent/adjoint..: %i/%i/%i", self._name, _oH["Direct"].nbcalls(3),_oH["Tangent"].nbcalls(3),_oH["Adjoint"].nbcalls(3))
logging.debug("%s Taille mémoire utilisée de %.0f Mio", self._name, self._m.getUsedMemory("Mio"))
+ logging.debug("%s Durées d'utilisation CPU de %.1fs et elapsed de %.1fs", self._name, self._getTimeState()[0], self._getTimeState()[1])
logging.debug("%s Terminé", self._name)
return 0
"""
raise NotImplementedError("Mathematical assimilation calculation has not been implemented!")
- def defineRequiredParameter(self, name = None, default = None, typecast = None, message = None, minval = None, maxval = None, listval = None):
+ def defineRequiredParameter(self, name = None, default = None, typecast = None, message = None, minval = None, maxval = None, listval = None, listadv = None):
"""
Permet de définir dans l'algorithme des paramètres requis et leurs
caractéristiques par défaut.
"minval" : minval,
"maxval" : maxval,
"listval" : listval,
+ "listadv" : listadv,
"message" : message,
}
self.__canonical_parameter_name[name.lower()] = name
minval = self.__required_parameters[__k]["minval"]
maxval = self.__required_parameters[__k]["maxval"]
listval = self.__required_parameters[__k]["listval"]
+ listadv = self.__required_parameters[__k]["listadv"]
#
if value is None and default is None:
__val = None
raise ValueError("The parameter named '%s' of value '%s' can not be less than %s."%(__k, __val, minval))
if maxval is not None and (numpy.array(__val, float) > maxval).any():
raise ValueError("The parameter named '%s' of value '%s' can not be greater than %s."%(__k, __val, maxval))
- if listval is not None:
+ if listval is not None or listadv is not None:
if typecast is list or typecast is tuple or isinstance(__val,list) or isinstance(__val,tuple):
for v in __val:
- if v not in listval:
+ if listval is not None and v in listval: continue
+ elif listadv is not None and v in listadv: continue
+ else:
raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%(v, __k, listval))
- elif __val not in listval:
+ elif not (listval is not None and __val in listval) and not (listadv is not None and __val in listadv):
raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%( __val, __k,listval))
#
return __val
pass
logging.debug("%s %s : %s", self._name, self.__required_parameters[k]["message"], self._parameters[k])
+ def _getTimeState(self, reset=False):
+ """
+ Initialise ou restitue le temps de calcul (cpu/elapsed) à la seconde
+ """
+ if reset:
+ self.__initial_cpu_time = time.process_time()
+ self.__initial_elapsed_time = time.perf_counter()
+ return 0., 0.
+ else:
+ self.__cpu_time = time.process_time() - self.__initial_cpu_time
+ self.__elapsed_time = time.perf_counter() - self.__initial_elapsed_time
+ return self.__cpu_time, self.__elapsed_time
+
+ def _StopOnTimeLimit(self, X=None, withReason=False):
+ "Stop criteria on time limit: True/False [+ Reason]"
+ c, e = self._getTimeState()
+ if "MaximumCpuTime" in self._parameters and c > self._parameters["MaximumCpuTime"]:
+ __SC, __SR = True, "Reached maximum CPU time (%.1fs > %.1fs)"%(c, self._parameters["MaximumCpuTime"])
+ elif "MaximumElapsedTime" in self._parameters and e > self._parameters["MaximumElapsedTime"]:
+ __SC, __SR = True, "Reached maximum elapsed time (%.1fs > %.1fs)"%(e, self._parameters["MaximumElapsedTime"])
+ else:
+ __SC, __SR = False, ""
+ if withReason:
+ return __SC, __SR
+ else:
+ return __SC
+
# ==============================================================================
class AlgorithmAndParameters(object):
"""
elif self.isobject() and hasattr(self.__C,"choleskyI"):
return Covariance(self.__name+"H", asCovObject = self.__C.choleskyI() )
+ def sqrtm(self):
+ "Racine carrée matricielle"
+ if self.ismatrix():
+ import scipy
+ return Covariance(self.__name+"C", asCovariance = scipy.linalg.sqrtm(self.__C) )
+ elif self.isvector():
+ return Covariance(self.__name+"C", asEyeByVector = numpy.sqrt( self.__C ) )
+ elif self.isscalar():
+ return Covariance(self.__name+"C", asEyeByScalar = numpy.sqrt( self.__C ) )
+ elif self.isobject() and hasattr(self.__C,"sqrt"):
+ return Covariance(self.__name+"C", asCovObject = self.__C.sqrt() )
+
+ def sqrtmI(self):
+ "Inversion de la racine carrée matricielle"
+ if self.ismatrix():
+ import scipy
+ return Covariance(self.__name+"H", asCovariance = scipy.linalg.sqrtm(self.__C).I )
+ elif self.isvector():
+ return Covariance(self.__name+"H", asEyeByVector = 1.0 / numpy.sqrt( self.__C ) )
+ elif self.isscalar():
+ return Covariance(self.__name+"H", asEyeByScalar = 1.0 / numpy.sqrt( self.__C ) )
+ elif self.isobject() and hasattr(self.__C,"sqrtI"):
+ return Covariance(self.__name+"H", asCovObject = self.__C.sqrtI() )
+
def diag(self, msize=None):
"Diagonale de la matrice"
if self.ismatrix():