# -*- coding: utf-8 -*-
#
-# Copyright (C) 2008-2018 EDF R&D
+# Copyright (C) 2008-2022 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
"""
- Normalized interface for ADAO scripting (generic API)
+ Normalized interface for ADAO scripting (generic API).
"""
__author__ = "Jean-Philippe ARGAUD"
__all__ = ["Aidsm"]
import os
import sys
+import inspect
#
from daCore.BasicObjects import State, Covariance, FullOperator, Operator
from daCore.BasicObjects import AlgorithmAndParameters, DataObserver
-from daCore.BasicObjects import CaseLogger
+from daCore.BasicObjects import RegulationAndParameters, CaseLogger
+from daCore.BasicObjects import UserScript, ExternalParameters
from daCore import PlatformInfo
#
from daCore import ExtendedLogging ; ExtendedLogging.ExtendedLogging() # A importer en premier
class Aidsm(object):
""" ADAO Internal Data Structure Model """
def __init__(self, name = "", addViewers=None):
- self.__name = str(name)
+ self.__name = str(name)
+ self.__objname = None
self.__directory = None
self.__case = CaseLogger(self.__name, "case", addViewers)
#
self.__StoredInputs = {}
self.__PostAnalysis = []
#
- self.__Concepts = [
+ self.__Concepts = [ # Liste exhaustive
"AlgorithmParameters",
"Background",
+ "BackgroundError",
"CheckingPoint",
"ControlInput",
- "Observation",
- "BackgroundError",
- "ObservationError",
+ "ControlModel",
+ "Debug",
+ "Directory",
"EvolutionError",
- "ObservationOperator",
"EvolutionModel",
- "ControlModel",
"Name",
- "Directory",
- "Debug",
"NoDebug",
+ "Observation",
+ "ObservationError",
+ "ObservationOperator",
"Observer",
+ "RegulationParameters",
+ "SupplementaryParameters",
+ "UserPostAnalysis",
]
#
for ename in self.__Concepts:
self.__adaoObject[ename] = None
for ename in ("ObservationOperator", "EvolutionModel", "ControlModel"):
self.__adaoObject[ename] = {}
- for ename in ("Observer",):
+ for ename in ("BackgroundError", "ObservationError"):
+ self.__adaoObject[ename] = Covariance(ename, asEyeByScalar = 1.)
+ for ename in ("EvolutionError",):
+ self.__adaoObject[ename] = Covariance(ename, asEyeByScalar = 1.e-16)
+ for ename in ("Observer", "UserPostAnalysis"):
self.__adaoObject[ename] = []
- self.__StoredInputs[ename] = []
+ self.__StoredInputs[ename] = [] # Vide par defaut
+ self.__StoredInputs["Name"] = self.__name
+ self.__StoredInputs["Directory"] = self.__directory
#
# Récupère le chemin du répertoire parent et l'ajoute au path
# (Cela complète l'action de la classe PathManagement dans PlatformInfo,
Concept = None, # Premier argument
Algorithm = None,
AppliedInXb = None,
- AvoidRC = True,
Checked = False,
+ ColMajor = False,
+ ColNames = None,
+ DataFile = None,
DiagonalSparseMatrix = None,
+ ExtraArguments = None,
Info = None,
+ InputFunctionAsMulti = False,
Matrix = None,
ObjectFunction = None,
ObjectMatrix = None,
OneFunction = None,
Parameters = None,
+ PerformanceProfile = None,
ScalarSparseMatrix = None,
Scheduler = None,
Script = None,
try:
if Concept in ("Background", "CheckingPoint", "ControlInput", "Observation"):
commande = getattr(self,"set"+Concept)
- commande(Vector, VectorSerie, Script, Stored, Scheduler, Checked )
+ commande(Vector, VectorSerie, Script, DataFile, ColNames, ColMajor, Stored, Scheduler, Checked )
elif Concept in ("BackgroundError", "ObservationError", "EvolutionError"):
commande = getattr(self,"set"+Concept)
commande(Matrix, ScalarSparseMatrix, DiagonalSparseMatrix,
Script, Stored, ObjectMatrix, Checked )
elif Concept == "AlgorithmParameters":
self.setAlgorithmParameters( Algorithm, Parameters, Script )
+ elif Concept == "RegulationParameters":
+ self.setRegulationParameters( Algorithm, Parameters, Script )
elif Concept == "Name":
self.setName(String)
elif Concept == "Directory":
self.setNoDebug()
elif Concept == "Observer":
self.setObserver( Variable, Template, String, Script, Info, ObjectFunction, Scheduler )
+ elif Concept == "UserPostAnalysis":
+ self.setUserPostAnalysis( Template, String, Script )
+ elif Concept == "SupplementaryParameters":
+ self.setSupplementaryParameters( Parameters, Script )
elif Concept == "ObservationOperator":
self.setObservationOperator(
Matrix, OneFunction, ThreeFunctions, AppliedInXb,
- Parameters, Script, Stored, AvoidRC, Checked )
+ Parameters, Script, ExtraArguments,
+ Stored, PerformanceProfile, InputFunctionAsMulti, Checked )
elif Concept in ("EvolutionModel", "ControlModel"):
commande = getattr(self,"set"+Concept)
commande(
Matrix, OneFunction, ThreeFunctions,
- Parameters, Script, Scheduler, Stored, AvoidRC, Checked )
-
+ Parameters, Script, Scheduler, ExtraArguments,
+ Stored, PerformanceProfile, InputFunctionAsMulti, Checked )
else:
raise ValueError("the variable named '%s' is not allowed."%str(Concept))
except Exception as e:
- if isinstance(e, SyntaxError): msg = "at %s: %s"%(e.offset, e.text)
+ if isinstance(e, SyntaxError): msg = " at %s: %s"%(e.offset, e.text)
else: msg = ""
raise ValueError(("during settings, the following error occurs:\n"+\
- "\n%s %s\n\nSee also the potential messages, "+\
+ "\n%s%s\n\nSee also the potential messages, "+\
"which can show the origin of the above error, "+\
"in the launching terminal.")%(str(e),msg))
Vector = None,
VectorSerie = None,
Script = None,
+ DataFile = None,
+ ColNames = None,
+ ColMajor = False,
Stored = False,
Scheduler = None,
Checked = False):
name = Concept,
asVector = Vector,
asPersistentVector = VectorSerie,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
+ asDataFile = DataFile,
+ colNames = ColNames,
+ colMajor = ColMajor,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
Vector = None,
VectorSerie = None,
Script = None,
+ DataFile = None,
+ ColNames = None,
+ ColMajor = False,
Stored = False,
Scheduler = None,
Checked = False):
name = Concept,
asVector = Vector,
asPersistentVector = VectorSerie,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
+ asDataFile = DataFile,
+ colNames = ColNames,
+ colMajor = ColMajor,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
Vector = None,
VectorSerie = None,
Script = None,
+ DataFile = None,
+ ColNames = None,
+ ColMajor = False,
Stored = False,
Scheduler = None,
Checked = False):
name = Concept,
asVector = Vector,
asPersistentVector = VectorSerie,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
+ asDataFile = DataFile,
+ colNames = ColNames,
+ colMajor = ColMajor,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
Vector = None,
VectorSerie = None,
Script = None,
+ DataFile = None,
+ ColNames = None,
+ ColMajor = False,
Stored = False,
Scheduler = None,
Checked = False):
name = Concept,
asVector = Vector,
asPersistentVector = VectorSerie,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
+ asDataFile = DataFile,
+ colNames = ColNames,
+ colMajor = ColMajor,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
asEyeByScalar = ScalarSparseMatrix,
asEyeByVector = DiagonalSparseMatrix,
asCovObject = ObjectMatrix,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
toBeChecked = Checked,
)
if Stored:
asEyeByScalar = ScalarSparseMatrix,
asEyeByVector = DiagonalSparseMatrix,
asCovObject = ObjectMatrix,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
toBeChecked = Checked,
)
if Stored:
asEyeByScalar = ScalarSparseMatrix,
asEyeByVector = DiagonalSparseMatrix,
asCovObject = ObjectMatrix,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
toBeChecked = Checked,
)
if Stored:
return 0
def setObservationOperator(self,
- Matrix = None,
- OneFunction = None,
- ThreeFunctions = None,
- AppliedInXb = None,
- Parameters = None,
- Script = None,
- Stored = False,
- AvoidRC = True,
- Checked = False):
+ Matrix = None,
+ OneFunction = None,
+ ThreeFunctions = None,
+ AppliedInXb = None,
+ Parameters = None,
+ Script = None,
+ ExtraArguments = None,
+ Stored = False,
+ PerformanceProfile = None,
+ InputFunctionAsMulti = False,
+ Checked = False):
"Definition d'un concept de calcul"
Concept = "ObservationOperator"
self.__case.register("set"+Concept, dir(), locals())
asMatrix = Matrix,
asOneFunction = OneFunction,
asThreeFunctions = ThreeFunctions,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
asDict = Parameters,
appliedInX = AppliedInXb,
- avoidRC = AvoidRC,
+ extraArguments = ExtraArguments,
+ performancePrf = PerformanceProfile,
+ inputAsMF = InputFunctionAsMulti,
scheduledBy = None,
toBeChecked = Checked,
)
return 0
def setEvolutionModel(self,
- Matrix = None,
- OneFunction = None,
- ThreeFunctions = None,
- Parameters = None,
- Script = None,
- Stored = False,
- Scheduler = None,
- AvoidRC = True,
- Checked = False):
+ Matrix = None,
+ OneFunction = None,
+ ThreeFunctions = None,
+ Parameters = None,
+ Script = None,
+ Scheduler = None,
+ ExtraArguments = None,
+ Stored = False,
+ PerformanceProfile = None,
+ InputFunctionAsMulti = False,
+ Checked = False):
"Definition d'un concept de calcul"
Concept = "EvolutionModel"
self.__case.register("set"+Concept, dir(), locals())
asMatrix = Matrix,
asOneFunction = OneFunction,
asThreeFunctions = ThreeFunctions,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
asDict = Parameters,
appliedInX = None,
- avoidRC = AvoidRC,
+ extraArguments = ExtraArguments,
+ performancePrf = PerformanceProfile,
+ inputAsMF = InputFunctionAsMulti,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
return 0
def setControlModel(self,
- Matrix = None,
- OneFunction = None,
- ThreeFunctions = None,
- Parameters = None,
- Script = None,
- Stored = False,
- Scheduler = None,
- AvoidRC = True,
- Checked = False):
+ Matrix = None,
+ OneFunction = None,
+ ThreeFunctions = None,
+ Parameters = None,
+ Script = None,
+ Scheduler = None,
+ ExtraArguments = None,
+ Stored = False,
+ PerformanceProfile = None,
+ InputFunctionAsMulti = False,
+ Checked = False):
"Definition d'un concept de calcul"
Concept = "ControlModel"
self.__case.register("set"+Concept, dir(), locals())
asMatrix = Matrix,
asOneFunction = OneFunction,
asThreeFunctions = ThreeFunctions,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
asDict = Parameters,
appliedInX = None,
- avoidRC = AvoidRC,
+ extraArguments = ExtraArguments,
+ performancePrf = PerformanceProfile,
+ inputAsMF = InputFunctionAsMulti,
scheduledBy = Scheduler,
toBeChecked = Checked,
)
name = Concept,
asAlgorithm = Algorithm,
asDict = Parameters,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
)
return 0
Parameters = None,
Script = None):
"Mise a jour d'un concept de calcul"
- if "AlgorithmParameters" not in self.__adaoObject:
- raise ValueError("No algorithm registred, ask for one before updating parameters")
- self.__adaoObject["AlgorithmParameters"].updateParameters(
+ Concept = "AlgorithmParameters"
+ if Concept not in self.__adaoObject or self.__adaoObject[Concept] is None:
+ raise ValueError("\n\nNo algorithm registred, set one before updating parameters or executing\n")
+ self.__adaoObject[Concept].updateParameters(
asDict = Parameters,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
+ )
+ # RaJ du register
+ return 0
+
+ def setRegulationParameters(self,
+ Algorithm = None,
+ Parameters = None,
+ Script = None):
+ "Definition d'un concept de calcul"
+ Concept = "RegulationParameters"
+ self.__case.register("set"+Concept, dir(), locals())
+ self.__adaoObject[Concept] = RegulationAndParameters(
+ name = Concept,
+ asAlgorithm = Algorithm,
+ asDict = Parameters,
+ asScript = self.__with_directory(Script),
+ )
+ return 0
+
+ def setSupplementaryParameters(self,
+ Parameters = None,
+ Script = None):
+ "Definition d'un concept de calcul"
+ Concept = "SupplementaryParameters"
+ self.__case.register("set"+Concept, dir(), locals())
+ self.__adaoObject[Concept] = ExternalParameters(
+ name = Concept,
+ asDict = Parameters,
+ asScript = self.__with_directory(Script),
+ )
+ return 0
+
+ def updateSupplementaryParameters(self,
+ Parameters = None,
+ Script = None):
+ "Mise a jour d'un concept de calcul"
+ Concept = "SupplementaryParameters"
+ if Concept not in self.__adaoObject or self.__adaoObject[Concept] is None:
+ self.__adaoObject[Concept] = ExternalParameters(name = Concept)
+ self.__adaoObject[Concept].updateParameters(
+ asDict = Parameters,
+ asScript = self.__with_directory(Script),
)
return 0
onVariable = Variable,
asTemplate = Template,
asString = String,
- asScript = self.with_directory(Script),
+ asScript = self.__with_directory(Script),
asObsObject = ObjectFunction,
withInfo = Info,
scheduledBy = Scheduler,
else:
return self.__adaoObject["AlgorithmParameters"].removeObserver( ename, ObjectFunction )
+ def setUserPostAnalysis(self,
+ Template = None,
+ String = None,
+ Script = None):
+ "Definition d'un concept de calcul"
+ Concept = "UserPostAnalysis"
+ self.__case.register("set"+Concept, dir(), locals())
+ self.__adaoObject[Concept].append( repr(UserScript(
+ name = Concept,
+ asTemplate = Template,
+ asString = String,
+ asScript = self.__with_directory(Script),
+ )))
+ return 0
+
# -----------------------------------------------------------
def get(self, Concept=None, noDetails=True ):
elif Concept == "AlgorithmRequiredParameters" and self.__adaoObject["AlgorithmParameters"] is not None:
return self.__adaoObject["AlgorithmParameters"].getAlgorithmRequiredParameters(noDetails)
#
+ elif Concept == "AlgorithmRequiredInputs" and self.__adaoObject["AlgorithmParameters"] is not None:
+ return self.__adaoObject["AlgorithmParameters"].getAlgorithmInputArguments()
+ #
+ elif Concept == "AlgorithmAttributes" and self.__adaoObject["AlgorithmParameters"] is not None:
+ return self.__adaoObject["AlgorithmParameters"].getAlgorithmAttributes()
+ #
+ elif self.__adaoObject["SupplementaryParameters"] is not None and Concept == "SupplementaryParameters":
+ return self.__adaoObject["SupplementaryParameters"].get()
+ #
+ elif self.__adaoObject["SupplementaryParameters"] is not None and Concept in self.__adaoObject["SupplementaryParameters"]:
+ return self.__adaoObject["SupplementaryParameters"].get( Concept )
+ #
else:
raise ValueError("The requested key \"%s\" does not exists as an input or a stored variable."%Concept)
else:
allvariables = {}
allvariables.update( {"AlgorithmParameters":self.__adaoObject["AlgorithmParameters"].get()} )
+ if self.__adaoObject["SupplementaryParameters"] is not None:
+ allvariables.update( {"SupplementaryParameters":self.__adaoObject["SupplementaryParameters"].get()} )
# allvariables.update( self.__adaoObject["AlgorithmParameters"].get() )
allvariables.update( self.__StoredInputs )
allvariables.pop('Observer', None)
+ allvariables.pop('UserPostAnalysis', None)
return allvariables
# -----------------------------------------------------------
variables = []
if len(list(self.__adaoObject["AlgorithmParameters"].keys())) > 0:
variables.extend(list(self.__adaoObject["AlgorithmParameters"].keys()))
+ if self.__adaoObject["SupplementaryParameters"] is not None and \
+ len(list(self.__adaoObject["SupplementaryParameters"].keys())) > 0:
+ variables.extend(list(self.__adaoObject["SupplementaryParameters"].keys()))
if len(list(self.__StoredInputs.keys())) > 0:
variables.extend( list(self.__StoredInputs.keys()) )
variables.remove('Observer')
+ variables.remove('UserPostAnalysis')
variables.sort()
return variables
if os.path.isdir(trypath):
for fname in os.listdir(trypath):
if os.path.isfile(os.path.join(trypath,fname)):
- fc = open(os.path.join(trypath,fname)).read()
- iselal = bool("class ElementaryAlgorithm" in fc)
root, ext = os.path.splitext(fname)
- if iselal and ext == '.py' and root != '__init__':
- files.append(root)
+ if ext != ".py": continue
+ with open(os.path.join(trypath,fname)) as fc:
+ iselal = bool("class ElementaryAlgorithm" in fc.read())
+ if iselal and ext == '.py' and root != '__init__':
+ files.append(root)
files.sort()
return files
# -----------------------------------------------------------
- def execute(self, Executor=None, SaveCaseInFile=None):
+ def execute(self, Executor=None, SaveCaseInFile=None, nextStep=False):
"Lancement du calcul"
self.__case.register("execute",dir(),locals(),None,True)
- Operator.CM.clearCache()
+ self.updateAlgorithmParameters(Parameters={"nextStep":bool(nextStep)})
+ if not nextStep: Operator.CM.clearCache()
try:
if Executor == "YACS": self.__executeYACSScheme( SaveCaseInFile )
else: self.__executePythonScheme( SaveCaseInFile )
if FileName is not None:
self.dump( FileName, "TUI")
self.__adaoObject["AlgorithmParameters"].executePythonScheme( self.__adaoObject )
+ if "UserPostAnalysis" in self.__adaoObject and len(self.__adaoObject["UserPostAnalysis"])>0:
+ self.__objname = self.__retrieve_objname()
+ for __UpaOne in self.__adaoObject["UserPostAnalysis"]:
+ __UpaOne = eval(str(__UpaOne))
+ exec(__UpaOne, {}, {'self':self, 'ADD':self, 'case':self, 'adaopy':self, self.__objname:self})
return 0
def __executeYACSScheme(self, FileName=None):
__commands = self.__case.load(FileName, Content, Object, Formater)
from numpy import array, matrix
for __command in __commands:
- if __command.find("set")>-1 and __command.find("set_")<0:
- # logging.debug('Command loaded: %s'%(__command,))
- exec("self."+__command)
+ if (__command.find("set")>-1 and __command.find("set_")<0) or 'UserPostAnalysis' in __command:
+ exec("self."+__command, {}, locals())
else:
- # logging.debug('Command not loaded: %s'%(__command,))
self.__PostAnalysis.append(__command)
return self
+ def convert(self,
+ FileNameFrom=None, ContentFrom=None, ObjectFrom=None, FormaterFrom="TUI",
+ FileNameTo=None, FormaterTo="TUI",
+ ):
+ "Conversion normalisée des commandes"
+ return self.load(
+ FileName=FileNameFrom, Content=ContentFrom, Object=ObjectFrom, Formater=FormaterFrom
+ ).dump(
+ FileName=FileNameTo, Formater=FormaterTo
+ )
+
def clear(self):
"Effacement du contenu du cas en cours"
self.__init__(self.__name)
- def with_directory(self, __filename=None):
+ # -----------------------------------------------------------
+
+ def __with_directory(self, __filename=None):
if os.path.exists(str(__filename)):
__fullpath = __filename
elif os.path.exists(os.path.join(str(self.__directory), str(__filename))):
__fullpath = __filename
return __fullpath
- # -----------------------------------------------------------
+ def __retrieve_objname(self):
+ "Ne pas utiliser dans le __init__, la variable appelante n'existe pas encore"
+ __names = []
+ for level in reversed(inspect.stack()):
+ __names += [name for name, value in level.frame.f_locals.items() if value is self]
+ __names += [name for name, value in globals().items() if value is self]
+ while 'self' in __names: __names.remove('self') # Devrait toujours être trouvé, donc pas d'erreur
+ if len(__names) > 0:
+ self.__objname = __names[0]
+ else:
+ self.__objname = "ADD"
+ return self.__objname
def __dir__(self):
"Clarifie la visibilité des méthodes"
- return ['set', 'get', 'execute', '__doc__', '__init__', '__module__']
+ return ['set', 'get', 'execute', 'dump', 'load', '__doc__', '__init__', '__module__']
+
+ def __str__(self):
+ "Représentation pour impression (mais pas repr)"
+ msg = self.dump(None, "SimpleReportInPlainTxt")
+ return msg
def prepare_to_pickle(self):
"Retire les variables non pickelisables, avec recopie efficace"
for k in self.__adaoObject['AlgorithmParameters'].keys():
if k == "Algorithm": continue
if k in self.__StoredInputs:
- raise ValueError("the key \"%s\s to be transfered for pickling will overwrite an existing one.")
+ raise ValueError("The key \"%s\" to be transfered for pickling will overwrite an existing one."%(k,))
if self.__adaoObject['AlgorithmParameters'].hasObserver( k ):
self.__adaoObject['AlgorithmParameters'].removeObserver( k, "", True )
self.__StoredInputs[k] = self.__adaoObject['AlgorithmParameters'].pop(k, None)
if sys.version_info[0] == 2:
del self.__adaoObject # Because it breaks pickle in Python 2. Not required for Python 3
del self.__case # Because it breaks pickle in Python 2. Not required for Python 3
- return 0
+ if sys.version_info.major < 3:
+ return 0
+ else:
+ return self.__StoredInputs
# ==============================================================================
if __name__ == "__main__":
- print('\n AUTODIAGNOSTIC \n')
+ print('\n AUTODIAGNOSTIC\n')