+# ==============================================================================
+class Observer2Func(object):
+ """
+ Creation d'une fonction d'observateur a partir de son texte
+ """
+ def __init__(self, corps=""):
+ self.__corps = corps
+ def func(self,var,info):
+ "Fonction d'observation"
+ exec(self.__corps)
+ def getfunc(self):
+ "Restitution du pointeur de fonction dans l'objet"
+ return self.func
+
+# ==============================================================================
+class CaseLogger(object):
+ """
+ Conservation des commandes de creation d'un cas
+ """
+ def __init__(self, __name="", __objname="case", __addViewers=None, __addLoaders=None):
+ self.__name = str(__name)
+ self.__objname = str(__objname)
+ self.__logSerie = []
+ self.__switchoff = False
+ self.__viewers = {
+ "TUI" :Interfaces._TUIViewer,
+ "SCD" :Interfaces._SCDViewer,
+ "YACS":Interfaces._YACSViewer,
+ }
+ self.__loaders = {
+ "TUI" :Interfaces._TUIViewer,
+ "COM" :Interfaces._COMViewer,
+ }
+ if __addViewers is not None:
+ self.__viewers.update(dict(__addViewers))
+ if __addLoaders is not None:
+ self.__loaders.update(dict(__addLoaders))
+
+ def register(self, __command=None, __keys=None, __local=None, __pre=None, __switchoff=False):
+ "Enregistrement d'une commande individuelle"
+ if __command is not None and __keys is not None and __local is not None and not self.__switchoff:
+ if "self" in __keys: __keys.remove("self")
+ self.__logSerie.append( (str(__command), __keys, __local, __pre, __switchoff) )
+ if __switchoff:
+ self.__switchoff = True
+ if not __switchoff:
+ self.__switchoff = False
+
+ def dump(self, __filename=None, __format="TUI", __upa=""):
+ "Restitution normalisée des commandes"
+ if __format in self.__viewers:
+ __formater = self.__viewers[__format](self.__name, self.__objname, self.__logSerie)
+ else:
+ raise ValueError("Dumping as \"%s\" is not available"%__format)
+ return __formater.dump(__filename, __upa)
+
+ def load(self, __filename=None, __content=None, __object=None, __format="TUI"):
+ "Chargement normalisé des commandes"
+ if __format in self.__loaders:
+ __formater = self.__loaders[__format]()
+ else:
+ raise ValueError("Loading as \"%s\" is not available"%__format)
+ return __formater.load(__filename, __content, __object)
+
+# ==============================================================================
+def MultiFonction(
+ __xserie,
+ _extraArguments = None,
+ _sFunction = lambda x: x,
+ _mpEnabled = False,
+ _mpWorkers = None,
+ ):
+ """
+ Pour une liste ordonnée de vecteurs en entrée, renvoie en sortie la liste
+ correspondante de valeurs de la fonction en argument
+ """
+ # Vérifications et définitions initiales
+ # logging.debug("MULTF Internal multifonction calculations begin with function %s"%(_sFunction.__name__,))
+ if not PlatformInfo.isIterable( __xserie ):
+ raise TypeError("MultiFonction not iterable unkown input type: %s"%(type(__xserie),))
+ if _mpEnabled:
+ if (_mpWorkers is None) or (_mpWorkers is not None and _mpWorkers < 1):
+ __mpWorkers = None
+ else:
+ __mpWorkers = int(_mpWorkers)
+ try:
+ import multiprocessing
+ __mpEnabled = True
+ except ImportError:
+ __mpEnabled = False
+ else:
+ __mpEnabled = False
+ __mpWorkers = None
+ #
+ # Calculs effectifs
+ if __mpEnabled:
+ _jobs = []
+ if _extraArguments is None:
+ _jobs = __xserie
+ elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)):
+ for __xvalue in __xserie:
+ _jobs.append( [__xvalue, ] + list(_extraArguments) )
+ else:
+ raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),))
+ # logging.debug("MULTF Internal multiprocessing calculations begin : evaluation of %i point(s)"%(len(_jobs),))
+ import multiprocessing
+ with multiprocessing.Pool(__mpWorkers) as pool:
+ __multiHX = pool.map( _sFunction, _jobs )
+ pool.close()
+ pool.join()
+ # logging.debug("MULTF Internal multiprocessing calculation end")
+ else:
+ # logging.debug("MULTF Internal monoprocessing calculation begin")
+ __multiHX = []
+ if _extraArguments is None:
+ for __xvalue in __xserie:
+ __multiHX.append( _sFunction( __xvalue ) )
+ elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)):
+ for __xvalue in __xserie:
+ __multiHX.append( _sFunction( __xvalue, *_extraArguments ) )
+ elif _extraArguments is not None and isinstance(_extraArguments, dict):
+ for __xvalue in __xserie:
+ __multiHX.append( _sFunction( __xvalue, **_extraArguments ) )
+ else:
+ raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),))
+ # logging.debug("MULTF Internal monoprocessing calculation end")
+ #
+ # logging.debug("MULTF Internal multifonction calculations end")
+ return __multiHX
+
+# ==============================================================================
+def CostFunction3D(_x,
+ _Hm = None, # Pour simuler Hm(x) : HO["Direct"].appliedTo
+ _HmX = None, # Simulation déjà faite de Hm(x)
+ _arg = None, # Arguments supplementaires pour Hm, sous la forme d'un tuple
+ _BI = None,
+ _RI = None,
+ _Xb = None,
+ _Y = None,
+ _SIV = False, # A résorber pour la 8.0
+ _SSC = [], # self._parameters["StoreSupplementaryCalculations"]
+ _nPS = 0, # nbPreviousSteps
+ _QM = "DA", # QualityMeasure
+ _SSV = {}, # Entrée et/ou sortie : self.StoredVariables
+ _fRt = False, # Restitue ou pas la sortie étendue
+ _sSc = True, # Stocke ou pas les SSC
+ ):
+ """
+ Fonction-coût générale utile pour les algorithmes statiques/3D : 3DVAR, BLUE
+ et dérivés, Kalman et dérivés, LeastSquares, SamplingTest, PSO, SA, Tabu,
+ DFO, QuantileRegression
+ """
+ if not _sSc:
+ _SIV = False
+ _SSC = {}
+ else:
+ for k in ["CostFunctionJ",
+ "CostFunctionJb",
+ "CostFunctionJo",
+ "CurrentOptimum",
+ "CurrentState",
+ "IndexOfOptimum",
+ "SimulatedObservationAtCurrentOptimum",
+ "SimulatedObservationAtCurrentState",
+ ]:
+ if k not in _SSV:
+ _SSV[k] = []
+ if hasattr(_SSV[k],"store"):
+ _SSV[k].append = _SSV[k].store # Pour utiliser "append" au lieu de "store"
+ #
+ _X = numpy.asmatrix(numpy.ravel( _x )).T
+ if _SIV or "CurrentState" in _SSC or "CurrentOptimum" in _SSC:
+ _SSV["CurrentState"].append( _X )
+ #
+ if _HmX is not None:
+ _HX = _HmX
+ else:
+ if _Hm is None:
+ raise ValueError("COSTFUNCTION3D Operator has to be defined.")
+ if _arg is None:
+ _HX = _Hm( _X )
+ else:
+ _HX = _Hm( _X, *_arg )
+ _HX = numpy.asmatrix(numpy.ravel( _HX )).T
+ #
+ if "SimulatedObservationAtCurrentState" in _SSC or \
+ "SimulatedObservationAtCurrentOptimum" in _SSC:
+ _SSV["SimulatedObservationAtCurrentState"].append( _HX )
+ #
+ if numpy.any(numpy.isnan(_HX)):
+ Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
+ else:
+ _Y = numpy.asmatrix(numpy.ravel( _Y )).T
+ if _QM in ["AugmentedWeightedLeastSquares", "AWLS", "AugmentedPonderatedLeastSquares", "APLS", "DA"]:
+ if _BI is None or _RI is None:
+ raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
+ _Xb = numpy.asmatrix(numpy.ravel( _Xb )).T
+ Jb = 0.5 * (_X - _Xb).T * _BI * (_X - _Xb)
+ Jo = 0.5 * (_Y - _HX).T * _RI * (_Y - _HX)
+ elif _QM in ["WeightedLeastSquares", "WLS", "PonderatedLeastSquares", "PLS"]:
+ if _RI is None:
+ raise ValueError("Observation error covariance matrix has to be properly defined!")
+ Jb = 0.
+ Jo = 0.5 * (_Y - _HX).T * _RI * (_Y - _HX)
+ elif _QM in ["LeastSquares", "LS", "L2"]:
+ Jb = 0.
+ Jo = 0.5 * (_Y - _HX).T * (_Y - _HX)
+ elif _QM in ["AbsoluteValue", "L1"]:
+ Jb = 0.
+ Jo = numpy.sum( numpy.abs(_Y - _HX) )
+ elif _QM in ["MaximumError", "ME"]:
+ Jb = 0.
+ Jo = numpy.max( numpy.abs(_Y - _HX) )
+ elif _QM in ["QR", "Null"]:
+ Jb = 0.
+ Jo = 0.
+ else:
+ raise ValueError("Unknown asked quality measure!")
+ #
+ J = float( Jb ) + float( Jo )
+ #
+ if _sSc:
+ _SSV["CostFunctionJb"].append( Jb )
+ _SSV["CostFunctionJo"].append( Jo )
+ _SSV["CostFunctionJ" ].append( J )
+ #
+ if "IndexOfOptimum" in _SSC or \
+ "CurrentOptimum" in _SSC or \
+ "SimulatedObservationAtCurrentOptimum" in _SSC:
+ IndexMin = numpy.argmin( _SSV["CostFunctionJ"][_nPS:] ) + _nPS
+ if "IndexOfOptimum" in _SSC:
+ _SSV["IndexOfOptimum"].append( IndexMin )
+ if "CurrentOptimum" in _SSC:
+ _SSV["CurrentOptimum"].append( _SSV["CurrentState"][IndexMin] )
+ if "SimulatedObservationAtCurrentOptimum" in _SSC:
+ _SSV["SimulatedObservationAtCurrentOptimum"].append( _SSV["SimulatedObservationAtCurrentState"][IndexMin] )
+ #
+ if _fRt:
+ return _SSV
+ else:
+ if _QM in ["QR"]: # Pour le QuantileRegression
+ return _HX
+ else:
+ return J
+