__author__ = "Jean-Philippe ARGAUD"
__all__ = []
-import logging, copy
+import logging, copy, os
import numpy
from daCore import Persistence
from daCore import PlatformInfo
name = "GenericVector",
asVector = None,
asPersistentVector = None,
+ fromScript = None,
Scheduler = None,
toBeChecked = False,
):
"""
Permet de définir un vecteur :
- - asVector : entrée des données, comme un vecteur compatible avec
- le constructeur de numpy.matrix
- - asPersistentVector : entrée des données, comme une série de
- vecteurs compatible avec le constructeur de numpy.matrix, ou
- comme un objet de type Persistence
+ - asVector : entrée des données, comme un vecteur compatible avec le
+ constructeur de numpy.matrix, ou "True" si entrée par script.
+ - asPersistentVector : entrée des données, comme une série de vecteurs
+ compatible avec le constructeur de numpy.matrix, ou comme un objet de
+ type Persistence, ou "True" si entrée par script.
+ - fromScript : si un script valide est donné contenant une variable
+ nommée "name", la variable est de type "asVector" (par défaut) ou
+ "asPersistentVector" selon que l'une de ces variables est placée à
+ "True".
"""
self.__name = str(name)
self.__check = bool(toBeChecked)
self.__T = None
self.__is_vector = False
self.__is_series = False
- if asVector is not None:
+ #
+ if fromScript is not None:
+ __Vector, __Series = None, None
+ if VectorSerie:
+ __Series = ImportFromScript(fromScript).getvalue( self.__name )
+ else:
+ __Vector = ImportFromScript(fromScript).getvalue( self.__name )
+ else:
+ __Vector, __Series = asVector, asPersistentVector
+ #
+ if __Vector is not None:
self.__is_vector = True
self.__V = numpy.matrix( numpy.ravel(numpy.matrix(asVector)), numpy.float ).T
self.shape = self.__V.shape
self.size = self.__V.size
- elif asPersistentVector is not None:
+ elif __Series is not None:
self.__is_series = True
- if type(asPersistentVector) in [tuple, list, numpy.ndarray, numpy.matrix]:
+ if type(__Series) in [tuple, list, numpy.ndarray, numpy.matrix]:
self.__V = Persistence.OneVector(self.__name, basetype=numpy.matrix)
- for member in asPersistentVector:
+ for member in __Series:
self.__V.store( numpy.matrix( numpy.asmatrix(member).A1, numpy.float ).T )
import sys ; sys.stdout.flush()
else:
- self.__V = asPersistentVector
+ self.__V = __Series
if type(self.__V.shape) in (tuple, list):
self.shape = self.__V.shape
else:
asEyeByScalar = None,
asEyeByVector = None,
asCovObject = None,
+ fromScript = None,
toBeChecked = False,
):
"""
self.__is_vector = False
self.__is_matrix = False
self.__is_object = False
- if asEyeByScalar is not None:
- if numpy.matrix(asEyeByScalar).size != 1:
- raise ValueError(' The diagonal multiplier given to define a sparse matrix is not a unique scalar value.\n Its actual measured size is %i. Please check your scalar input.'%numpy.matrix(asEyeByScalar).size)
+ #
+ if fromScript is not None:
+ __Matrix, __Scalar, __Vector, __Object = None, None, None, None
+ if asEyeByScalar:
+ __Scalar = _ImportFromScript(Script).getvalue( "BackgroundError" )
+ elif asEyeByVector:
+ __Vector = _ImportFromScript(Script).getvalue( "BackgroundError" )
+ elif asCovObject:
+ __Object = _ImportFromScript(Script).getvalue( "BackgroundError" )
+ else:
+ __Matrix = _ImportFromScript(Script).getvalue( "BackgroundError" )
+ else:
+ __Matrix, __Scalar, __Vector, __Object = asCovariance, asEyeByScalar, asEyeByVector, asCovObject
+ #
+ if __Scalar is not None:
+ if numpy.matrix(__Scalar).size != 1:
+ raise ValueError(' The diagonal multiplier given to define a sparse matrix is not a unique scalar value.\n Its actual measured size is %i. Please check your scalar input.'%numpy.matrix(__Scalar).size)
self.__is_scalar = True
- self.__C = numpy.abs( float(asEyeByScalar) )
+ self.__C = numpy.abs( float(__Scalar) )
self.shape = (0,0)
self.size = 0
- elif asEyeByVector is not None:
+ elif __Vector is not None:
self.__is_vector = True
- self.__C = numpy.abs( numpy.array( numpy.ravel( numpy.matrix(asEyeByVector, float ) ) ) )
+ self.__C = numpy.abs( numpy.array( numpy.ravel( numpy.matrix(__Vector, float ) ) ) )
self.shape = (self.__C.size,self.__C.size)
self.size = self.__C.size**2
- elif asCovariance is not None:
+ elif __Matrix is not None:
self.__is_matrix = True
- self.__C = numpy.matrix( asCovariance, float )
+ self.__C = numpy.matrix( __Matrix, float )
self.shape = self.__C.shape
self.size = self.__C.size
- elif asCovObject is not None:
+ elif __Object is not None:
self.__is_object = True
- self.__C = asCovObject
+ self.__C = __Object
for at in ("getT","getI","diag","trace","__add__","__sub__","__neg__","__mul__","__rmul__"):
if not hasattr(self.__C,at):
raise ValueError("The matrix given for %s as an object has no attribute \"%s\". Please check your object input."%(self.__name,at))
"x.__len__() <==> len(x)"
return self.shape[0]
+# ==============================================================================
+class ObserverF(object):
+ """
+ Creation d'une fonction d'observateur a partir de son texte
+ """
+ def __init__(self, corps=""):
+ self.__corps = corps
+ def func(self,var,info):
+ "Fonction d'observation"
+ exec(self.__corps)
+ def getfunc(self):
+ "Restitution du pointeur de fonction dans l'objet"
+ return self.func
+
+# ==============================================================================
+class ImportFromScript(object):
+ """
+ Obtention d'une variable nommee depuis un fichier script importe
+ """
+ def __init__(self, __filename=None):
+ "Verifie l'existence et importe le script"
+ __filename = __filename.rstrip(".py")
+ if __filename is None:
+ raise ValueError("The name of the file containing the variable to be imported has to be specified.")
+ if not os.path.isfile(str(__filename)+".py"):
+ raise ValueError("The file containing the variable to be imported doesn't seem to exist. The given file name is:\n \"%s\""%__filename)
+ self.__scriptfile = __import__(__filename, globals(), locals(), [])
+ self.__scriptstring = open(__filename+".py",'r').read()
+ def getvalue(self, __varname=None, __synonym=None ):
+ "Renvoie la variable demandee"
+ if __varname is None:
+ raise ValueError("The name of the variable to be imported has to be specified.")
+ if not hasattr(self.__scriptfile, __varname):
+ if __synonym is None:
+ raise ValueError("The imported script file doesn't contain the specified variable \"%s\"."%__varname)
+ elif not hasattr(self.__scriptfile, __synonym):
+ raise ValueError("The imported script file doesn't contain the specified variable \"%s\"."%__synonym)
+ else:
+ return getattr(self.__scriptfile, __synonym)
+ else:
+ return getattr(self.__scriptfile, __varname)
+ def getstring(self):
+ "Renvoie le script complet"
+ return self.__scriptstring
+
# ==============================================================================
def CostFunction3D(_x,
_Hm = None, # Pour simuler Hm(x) : HO["Direct"].appliedTo
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
"""
- Interface de scripting pour une étude ADAO
+ Normalized interface for ADAO scripting (full version API)
"""
__author__ = "Jean-Philippe ARGAUD"
__all__ = ["New"]
import os
-from daCore import AssimilationStudy, Templates
+from daCore.AssimilationStudy import AssimilationStudy as _AssimilationStudy
+from daCore.Templates import ObserverTemplates as _ObserverTemplates
+from daCore.BasicObjects import ImportFromScript as _ImportFromScript
+from daCore.BasicObjects import ObserverF as _ObserverF
# ==============================================================================
class New(object):
Creation TUI d'un cas ADAO
"""
def __init__(self, name = ""):
- self.__adaoStudy = AssimilationStudy.AssimilationStudy(name)
- self.__dumper = _DumpLogger(name)
+ self.__adaoStudy = _AssimilationStudy(name)
+ self.__case = _CaseLogger(name)
# -----------------------------------------------------------
Variable = None,
Vector = None,
VectorSerie = None):
- "Interface unique de définition de variables d'entrées par argument"
- self.__dumper.register("set",dir(),locals(),None,True)
+ "Interface unique de definition de variables d'entrees par argument"
+ self.__case.register("set",dir(),locals(),None,True)
try:
if Concept == "Background":
self.setBackground(Vector,VectorSerie,Script,Stored)
VectorSerie = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setBackground", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setBackground", dir(), locals())
if Script is not None:
__Vector, __PersistentVector = None, None
if VectorSerie:
DiagonalSparseMatrix = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setBackgroundError", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setBackgroundError", dir(), locals())
if Script is not None:
__Covariance, __Scalar, __Vector = None, None, None
if ScalarSparseMatrix:
VectorSerie = None,
Script = None,
Stored = False):
- "Définition d'une entrée de vérification"
- self.__dumper.register("setCheckingPoint", dir(), locals())
+ "Definition d'une entree de verification"
+ self.__case.register("setCheckingPoint", dir(), locals())
if Script is not None:
__Vector, __PersistentVector = None, None
if VectorSerie:
Parameters = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setControlModel", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setControlModel", dir(), locals())
__Parameters = {}
if (Parameters is not None) and isinstance(Parameters, dict):
if "DifferentialIncrement" in Parameters:
VectorSerie = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setControlInput", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setControlInput", dir(), locals())
if Script is not None:
__Vector, __PersistentVector = None, None
if VectorSerie:
DiagonalSparseMatrix = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setEvolutionError", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setEvolutionError", dir(), locals())
if Script is not None:
__Covariance, __Scalar, __Vector = None, None, None
if ScalarSparseMatrix:
Parameters = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setEvolutionModel", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setEvolutionModel", dir(), locals())
__Parameters = {}
if (Parameters is not None) and isinstance(Parameters, dict):
if "DifferentialIncrement" in Parameters:
VectorSerie = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setObservation", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setObservation", dir(), locals())
if Script is not None:
__Vector, __PersistentVector = None, None
if VectorSerie:
DiagonalSparseMatrix = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setObservationError", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setObservationError", dir(), locals())
if Script is not None:
__Covariance, __Scalar, __Vector = None, None, None
if ScalarSparseMatrix:
Parameters = None,
Script = None,
Stored = False):
- "Définition d'une entrée de calcul"
- self.__dumper.register("setObservationOperator", dir(), locals())
+ "Definition d'une entree de calcul"
+ self.__case.register("setObservationOperator", dir(), locals())
__Parameters = {}
if (Parameters is not None) and isinstance(Parameters, dict):
if "DifferentialIncrement" in Parameters:
Algorithm = None,
Parameters = None,
Script = None):
- "Définition d'un paramétrage du calcul"
- self.__dumper.register("setAlgorithmParameters", dir(), locals())
+ "Definition d'un parametrage du calcul"
+ self.__case.register("setAlgorithmParameters", dir(), locals())
if Script is not None:
__Algorithm = _ImportFromScript(Script).getvalue( "Algorithm" )
__Parameters = _ImportFromScript(Script).getvalue( "AlgorithmParameters", "Parameters" )
self.__adaoStudy.setAlgorithmParameters( asDico = __Parameters )
def setDebug(self):
- "Définition d'un paramétrage du calcul"
- self.__dumper.register("setDebug",dir(),locals())
+ "Definition d'un parametrage du calcul"
+ self.__case.register("setDebug",dir(),locals())
return self.__adaoStudy.setDebug()
def setNoDebug(self):
- "Définition d'un paramétrage du calcul"
- self.__dumper.register("setNoDebug",dir(),locals())
+ "Definition d'un parametrage du calcul"
+ self.__case.register("setNoDebug",dir(),locals())
return self.__adaoStudy.unsetDebug()
def setObserver(
String = None,
Script = None,
Info = None):
- "Définition d'un paramétrage du calcul"
- self.__dumper.register("setObserver", dir(), locals())
+ "Definition d'un parametrage du calcul"
+ self.__case.register("setObserver", dir(), locals())
if Variable is None:
raise ValueError("setting an observer has to be done over a variable name, not over None.")
else:
#
if String is not None:
__FunctionText = String
- elif (Template is not None) and (Template in Templates.ObserverTemplates):
- __FunctionText = Templates.ObserverTemplates[Template]
+ elif (Template is not None) and (Template in _ObserverTemplates):
+ __FunctionText = _ObserverTemplates[Template]
elif Script is not None:
__FunctionText = _ImportFromScript(Script).getstring()
else:
def executePythonScheme(self):
"Lancement du calcul"
- self.__dumper.register("executePythonScheme", dir(), locals())
+ self.__case.register("executePythonScheme", dir(), locals())
try:
self.__adaoStudy.analyze()
except Exception as e:
def executeYACSScheme(self, File=None):
"Lancement du calcul"
- self.__dumper.register("executeYACSScheme", dir(), locals())
+ self.__case.register("executeYACSScheme", dir(), locals())
raise NotImplementedError()
# -----------------------------------------------------------
def get(self, Concept=None):
- "Récupération d'une sortie du calcul"
- self.__dumper.register("get",dir(),locals(),Concept)
+ "Recuperation d'une sortie du calcul"
+ self.__case.register("get",dir(),locals(),Concept)
return self.__adaoStudy.get(Concept)
def dumpNormalizedCommands(self, filename=None):
- "Récupération de la liste des commandes du cas"
- return self.__dumper.dump(filename)
+ "Recuperation de la liste des commandes du cas TUI"
+ return self.__case.dump(filename, "TUI")
def __dir__(self):
return ['set', 'get', 'execute', '__doc__', '__init__', '__module__']
-class _DumpLogger(object):
+# ==============================================================================
+class _CaseLogger(object):
"""
- Conservation des commandes de création d'un cas
+ Conservation des commandes de creation d'un cas
"""
def __init__(self, __name="", __objname="case"):
self.__name = str(__name)
self.__objname = str(__objname)
self.__logSerie = []
self.__switchoff = False
- self.__logSerie.append("#\n# Python script for ADAO TUI\n#")
- self.__logSerie.append("from numpy import array, matrix")
- self.__logSerie.append("import adaoBuilder")
- self.__logSerie.append("%s = adaoBuilder.New('%s')"%(self.__objname, self.__name))
def register(self, __command=None, __keys=None, __local=None, __pre=None, __switchoff=False):
"Enregistrement d'une commande individuelle"
if __command is not None and __keys is not None and __local is not None and not self.__switchoff:
+ if "self" in __keys: __keys.remove("self")
+ self.__logSerie.append( (str(__command), __keys, __local, __pre, __switchoff) )
+ if __switchoff:
+ self.__switchoff = True
+ if not __switchoff:
+ self.__switchoff = False
+ def dump(self, __filename=None, __format="TUI"):
+ if __format == "TUI":
+ self.__dumper = _TUIViewer(self.__name, self.__objname, self.__logSerie)
+ __text = self.__dumper.dump(__filename)
+ else:
+ raise ValueError("Dumping as \"%s\" is not available"%__format)
+ return __text
+
+# ==============================================================================
+class _GenericViewer(object):
+ """
+ Etablissement des commandes de creation d'une vue
+ """
+ def __init__(self, __name="", __objname="case", __content=None):
+ self._name = str(__name)
+ self._objname = str(__objname)
+ self._lineSerie = []
+ self._switchoff = False
+ self._numobservers = 1
+ def _addLine(self, line=""):
+ self._lineSerie.append(line)
+ def _append(self):
+ "Enregistrement d'une commande individuelle"
+ raise NotImplementedError()
+ def dump(self, __filename=None):
+ "Restitution de la liste des commandes de creation d'un cas"
+ raise NotImplementedError()
+
+class _TUIViewer(_GenericViewer):
+ """
+ Etablissement des commandes de creation d'un cas TUI
+ """
+ def __init__(self, __name="", __objname="case", __content=None):
+ _GenericViewer.__init__(self, __name, __objname, __content)
+ self._addLine("#\n# Python script for ADAO TUI\n#")
+ self._addLine("from numpy import array, matrix")
+ self._addLine("import adaoBuilder")
+ self._addLine("%s = adaoBuilder.New('%s')"%(self._objname, self._name))
+ if __content is not None:
+ for command in __content:
+ self._append(*command)
+ def _append(self, __command=None, __keys=None, __local=None, __pre=None, __switchoff=False):
+ if __command is not None and __keys is not None and __local is not None and not self._switchoff:
__text = ""
if __pre is not None:
__text += "%s = "%__pre
- __text += "%s.%s( "%(self.__objname,str(__command))
- __keys.remove("self")
+ __text += "%s.%s( "%(self._objname,str(__command))
+ if "self" in __keys: __keys.remove("self")
for k in __keys:
__v = __local[k]
if __v is None: continue
__text += "%s=%s, "%(k,repr(__v))
__text += ")"
- self.__logSerie.append(__text)
+ self._addLine(__text)
if __switchoff:
- self.__switchoff = True
+ self._switchoff = True
if not __switchoff:
- self.__switchoff = False
+ self._switchoff = False
def dump(self, __filename=None):
- "Restitution de la liste des commandes de création d'un cas"
- __text = "\n".join(self.__logSerie)
+ __text = "\n".join(self._lineSerie)
if __filename is not None:
fid = open(__filename,"w")
fid.write(__text)
fid.close()
return __text
-class _ObserverF(object):
- """
- Création d'une fonction d'observateur à partir de son texte
- """
- def __init__(self, corps=""):
- self.__corps = corps
- def func(self,var,info):
- "Fonction d'observation"
- exec(self.__corps)
- def getfunc(self):
- "Restitution du pointeur de fonction dans l'objet"
- return self.func
-
-class _ImportFromScript(object):
- """
- Obtention d'une variable nommée depuis un fichier script importé
- """
- def __init__(self, __filename=None):
- "Verifie l'existence et importe le script"
- __filename = __filename.rstrip(".py")
- if __filename is None:
- raise ValueError("The name of the file containing the variable to be imported has to be specified.")
- if not os.path.isfile(str(__filename)+".py"):
- raise ValueError("The file containing the variable to be imported doesn't seem to exist. The given file name is:\n \"%s\""%__filename)
- self.__scriptfile = __import__(__filename, globals(), locals(), [])
- self.__scriptstring = open(__filename+".py",'r').read()
- def getvalue(self, __varname=None, __synonym=None ):
- "Renvoie la variable demandee"
- if __varname is None:
- raise ValueError("The name of the variable to be imported has to be specified.")
- if not hasattr(self.__scriptfile, __varname):
- if __synonym is None:
- raise ValueError("The imported script file doesn't contain the specified variable \"%s\"."%__varname)
- elif not hasattr(self.__scriptfile, __synonym):
- raise ValueError("The imported script file doesn't contain the specified variable \"%s\"."%__synonym)
- else:
- return getattr(self.__scriptfile, __synonym)
- else:
- return getattr(self.__scriptfile, __varname)
- def getstring(self):
- "Renvoie le script complet"
- return self.__scriptstring
-
# ==============================================================================
if __name__ == "__main__":
print('\n AUTODIAGNOSTIC \n')