"""
__author__ = "Jean-Philippe ARGAUD"
#
-import os, numpy, time
-#
# ==============================================================================
#
+import os, numpy, time
+#
def DirectOperator( XX ):
- """ Direct non-linear simulation operator """
- #
- # ------------------------------------------> EXAMPLE TO BE REMOVED
- if isinstance(XX, type(numpy.matrix([]))): # EXAMPLE TO BE REMOVED
- HX = XX.A1.tolist() # EXAMPLE TO BE REMOVED
- elif isinstance(XX, type(numpy.array([]))): # EXAMPLE TO BE REMOVED
- HX = numpy.matrix(XX).A1.tolist() # EXAMPLE TO BE REMOVED
- else: # EXAMPLE TO BE REMOVED
- HX = numpy.ravel(XX) # EXAMPLE TO BE REMOVED
- # ------------------------------------------> EXAMPLE TO BE REMOVED
- #
- return numpy.array( HX )
-
-# ==============================================================================
-from adao.daCore.NumericObjects import FDApproximation
-FDA = FDApproximation( DirectOperator )
-TangentOperator = FDA.TangentOperator
-AdjointOperator = FDA.AdjointOperator
-
+ # Opérateur identité
+ return numpy.ravel(XX)
+#
+def TangentOperator(paire ):
+ # Opérateur identité
+ (XX, dX) = paire
+ return numpy.ravel(dX)
+#
+def AdjointOperator( paire ):
+ # Opérateur identité
+ (XX, YY) = paire
+ return numpy.ravel(YY)
+#
# ==============================================================================
if __name__ == "__main__":
- print("")
+ print()
print("AUTODIAGNOSTIC")
print("==============")
FX = DirectOperator( X0 )
print("FX =", FX)
- print("")
+ print()
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
# Correction pour pallier a un bug de TNC sur le retour du Minimum
if "Minimizer" in self._parameters and self._parameters["Minimizer"] == "TNC":
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
# Correction pour pallier a un bug de TNC sur le retour du Minimum
if "Minimizer" in self._parameters and self._parameters["Minimizer"] == "TNC":
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
Ht = HO["Tangent"].appliedInXTo
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Tangent"].asMatrix(Xb)
Hm = Hm.reshape(Y.size,Xb.size) # ADAO & check shape
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if not PlatformInfo.has_nlopt and not self._parameters["Minimizer"] in ["COBYLA", "POWELL", "SIMPLEX"]:
logging.warning("%s Minimization by SIMPLEX is forced because %s is unavailable (COBYLA, POWELL are also available)"%(self._name,self._parameters["Minimizer"]))
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
len_X = numpy.asarray(Xb).size
popsize = round(self._parameters["PopulationSize"]/len_X)
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
# Précalcul des inversions de B et R
# ----------------------------------
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["EstimationOf"] == "Parameters":
self._parameters["StoreInternalVariables"] = True
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Tangent"].asMatrix(Xb)
Hm = Hm.reshape(Y.size,Xb.size) # ADAO & check shape
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["EstimationOf"] == "Parameters":
self._parameters["StoreInternalVariables"] = True
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
#
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
if self._parameters["ResiduFormula"] in ["Taylor", "TaylorOnNorm"]:
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
_p = self._parameters["NumberOfPrintedDigits"]
numpy.set_printoptions(precision=_p)
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["EstimationOf"] == "Parameters":
self._parameters["StoreInternalVariables"] = True
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
- Hm = HO["Tangent"].asMatrix(None)
+ Hm = HO["Tangent"].asMatrix(Xb)
Hm = Hm.reshape(Y.size,-1) # ADAO & check shape
- Ha = HO["Adjoint"].asMatrix(None)
+ Ha = HO["Adjoint"].asMatrix(Xb)
Ha = Ha.reshape(-1,Y.size) # ADAO & check shape
#
RI = R.getI()
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
def RMS(V1, V2):
import math
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["SetDebug"]:
CUR_LEVEL = logging.getLogger().getEffectiveLevel()
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
# Correction pour pallier a un bug de TNC sur le retour du Minimum
if "Minimizer" in self._parameters and self._parameters["Minimizer"] == "TNC":
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
print("Results of observer check on all potential variables or commands,")
print(" only activated on selected ones by explicit association.")
print("")
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
#
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if ("BoxBounds" in self._parameters) and isinstance(self._parameters["BoxBounds"], (list, tuple)) and (len(self._parameters["BoxBounds"]) > 0):
BoxBounds = self._parameters["BoxBounds"]
- logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuee"%(self._name,))
+ logging.debug("%s Prise en compte des bornes d'incréments de paramètres effectuée"%(self._name,))
else:
- raise ValueError("Particle Swarm Optimization requires bounds on all variables to be given.")
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds).")
BoxBounds = numpy.array(BoxBounds)
if numpy.isnan(BoxBounds).any():
- raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given, \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
+ raise ValueError("Particle Swarm Optimization requires bounds on all variables increments to be truly given (BoxBounds), \"None\" is not allowed. The actual increments bounds are:\n%s"%BoxBounds)
#
Phig = float( self._parameters["GroupRecallRate"] )
Phip = 1. - Phig
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
#
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
#
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["NoiseDistribution"] == "Uniform":
nrange = numpy.ravel(self._parameters["NoiseHalfRange"]) # Vecteur
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
Hm = HO["Direct"].appliedTo
Ht = HO["Tangent"].appliedInXTo
))
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
- self._pre_run(Parameters, Xb, Y, R, B, Q)
+ self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
if self._parameters["EstimationOf"] == "Parameters":
self._parameters["StoreInternalVariables"] = True
self.__canonical_parameter_name["algorithm"] = "Algorithm"
self.__canonical_parameter_name["storesupplementarycalculations"] = "StoreSupplementaryCalculations"
- def _pre_run(self, Parameters, Xb=None, Y=None, R=None, B=None, Q=None ):
+ def _pre_run(self, Parameters, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None ):
"Pré-calcul"
logging.debug("%s Lancement", self._name)
logging.debug("%s Taille mémoire utilisée de %.0f Mio"%(self._name, self._m.getUsedMemory("Mio")))
for k, v in self.__variable_names_not_public.items():
if k not in self._parameters: self.__setParameters( {k:v} )
#
- # Corrections et compléments
- def __test_vvalue(argument, variable, argname):
+ # Corrections et compléments des vecteurs
+ def __test_vvalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
if argument is None:
if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
- raise ValueError("%s %s vector %s has to be properly defined!"%(self._name,argname,variable))
+ raise ValueError("%s %s vector %s is not set and has to be properly defined!"%(self._name,argname,symbol))
elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
- logging.debug("%s %s vector %s is not set, but is optional."%(self._name,argname,variable))
+ logging.debug("%s %s vector %s is not set, but is optional."%(self._name,argname,symbol))
else:
- logging.debug("%s %s vector %s is not set, but is not required."%(self._name,argname,variable))
+ logging.debug("%s %s vector %s is not set, but is not required."%(self._name,argname,symbol))
else:
- logging.debug("%s %s vector %s is set, and its size is %i."%(self._name,argname,variable,numpy.array(argument).size))
+ logging.debug("%s %s vector %s is set, and its size is %i."%(self._name,argname,symbol,numpy.array(argument).size))
return 0
__test_vvalue( Xb, "Xb", "Background or initial state" )
__test_vvalue( Y, "Y", "Observation" )
+ __test_vvalue( U, "U", "Control" )
#
- def __test_cvalue(argument, variable, argname):
+ # Corrections et compléments des covariances
+ def __test_cvalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
if argument is None:
if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
- raise ValueError("%s %s error covariance matrix %s has to be properly defined!"%(self._name,argname,variable))
+ raise ValueError("%s %s error covariance matrix %s is not set and has to be properly defined!"%(self._name,argname,symbol))
elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
- logging.debug("%s %s error covariance matrix %s is not set, but is optional."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is not set, but is optional."%(self._name,argname,symbol))
else:
- logging.debug("%s %s error covariance matrix %s is not set, but is not required."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is not set, but is not required."%(self._name,argname,symbol))
else:
- logging.debug("%s %s error covariance matrix %s is set."%(self._name,argname,variable))
+ logging.debug("%s %s error covariance matrix %s is set."%(self._name,argname,symbol))
return 0
- __test_cvalue( R, "R", "Observation" )
__test_cvalue( B, "B", "Background" )
+ __test_cvalue( R, "R", "Observation" )
__test_cvalue( Q, "Q", "Evolution" )
#
+ # Corrections et compléments des opérateurs
+ def __test_ovalue(argument, variable, argname, symbol=None):
+ if symbol is None: symbol = variable
+ if argument is None or (isinstance(argument,dict) and len(argument)==0):
+ if variable in self.__required_inputs["RequiredInputValues"]["mandatory"]:
+ raise ValueError("%s %s operator %s is not set and has to be properly defined!"%(self._name,argname,symbol))
+ elif variable in self.__required_inputs["RequiredInputValues"]["optional"]:
+ logging.debug("%s %s operator %s is not set, but is optional."%(self._name,argname,symbol))
+ else:
+ logging.debug("%s %s operator %s is not set, but is not required."%(self._name,argname,symbol))
+ else:
+ logging.debug("%s %s operator %s is set."%(self._name,argname,symbol))
+ return 0
+ __test_ovalue( HO, "HO", "Observation", "H" )
+ __test_ovalue( EM, "EM", "Evolution", "M" )
+ __test_ovalue( CM, "CM", "Control Model", "C" )
+ #
if ("Bounds" in self._parameters) and isinstance(self._parameters["Bounds"], (list, tuple)) and (len(self._parameters["Bounds"]) > 0):
logging.debug("%s Prise en compte des bornes effectuee"%(self._name,))
else:
#
variables = variables + step
if bounds is not None:
+ # Attention : boucle infinie à éviter si un intervalle est trop petit
while( (variables < numpy.ravel(numpy.asmatrix(bounds)[:,0])).any() or (variables > numpy.ravel(numpy.asmatrix(bounds)[:,1])).any() ):
step = step/2.
variables = variables - step