+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2008-2019 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-
-__doc__ = """
- Implémentation informatique de l'algorithme MMQR, basée sur la publication :
- David R. Hunter, Kenneth Lange, "Quantile Regression via an MM Algorithm",
- Journal of Computational and Graphical Statistics, 9, 1, pp.60-77, 2000.
-"""
-__author__ = "Jean-Philippe ARGAUD"
-
-import sys, math
-from numpy import array, matrix, asarray, asmatrix
-from numpy import sum, dot, linalg, ravel, max, min, hstack, argmin, argmax
-
-# ==============================================================================
-def mmqr(
- func = None,
- x0 = None,
- fprime = None,
- bounds = None,
- quantile = 0.5,
- maxfun = 15000,
- toler = 1.e-06,
- y = None,
- ):
- #
- # Recuperation des donnees et informations initiales
- # --------------------------------------------------
- variables = asmatrix(ravel( x0 ))
- mesures = asmatrix(ravel( y )).T
- increment = sys.float_info[0]
- p = len(variables.flat)
- n = len(mesures.flat)
- quantile = float(quantile)
- #
- # Calcul des parametres du MM
- # ---------------------------
- tn = float(toler) / n
- e0 = -tn / math.log(tn)
- epsilon = (e0-tn)/(1+math.log(e0))
- #
- # Calculs d'initialisation
- # ------------------------
- residus = ravel( mesures - func( variables ) )
- poids = asarray( 1./(epsilon+abs(residus)) )
- veps = 1. - 2. * quantile - residus * poids
- lastsurrogate = -sum(residus*veps) - (1.-2.*quantile)*sum(residus)
- iteration = 0
- #
- # Recherche iterative
- # -------------------
- while (increment > toler) and (iteration < maxfun) :
- iteration += 1
- #
- Derivees = array(fprime(variables))
- Derivees = Derivees.reshape(n,p) # Necessaire pour remettre en place la matrice si elle passe par des tuyaux YACS
- DeriveesT = array(matrix(Derivees).T)
- M = dot( DeriveesT , (array(matrix(p*[poids,]).T)*Derivees) )
- SM = dot( DeriveesT , veps ).T
- step = - linalg.lstsq( M, SM, rcond=-1 )[0]
- #
- variables = variables + step
- if bounds is not None:
- while( (variables < ravel(asmatrix(bounds)[:,0])).any() or (variables > ravel(asmatrix(bounds)[:,1])).any() ):
- step = step/2.
- variables = variables - step
- residus = ravel( mesures - func(variables) )
- surrogate = sum(residus**2 * poids) + (4.*quantile-2.) * sum(residus)
- #
- while ( (surrogate > lastsurrogate) and ( max(list(abs(step))) > 1.e-16 ) ) :
- step = step/2.
- variables = variables - step
- residus = ravel( mesures-func(variables) )
- surrogate = sum(residus**2 * poids) + (4.*quantile-2.) * sum(residus)
- #
- increment = lastsurrogate-surrogate
- poids = 1./(epsilon+abs(residus))
- veps = 1. - 2. * quantile - residus * poids
- lastsurrogate = -sum(residus * veps) - (1.-2.*quantile)*sum(residus)
- #
- # Mesure d'écart : q*Sum(residus)-sum(residus negatifs)
- # ----------------
- Ecart = quantile * sum(residus) - sum( residus[residus<0] )
- #
- return variables, Ecart, [n,p,iteration,increment,0]
-
-# ==============================================================================
-if __name__ == "__main__":
- print('\n AUTODIAGNOSTIC\n')
import copy
import numpy
from functools import partial
-from daCore import Persistence
-from daCore import PlatformInfo
-from daCore import Interfaces
+from daCore import Persistence, PlatformInfo, Interfaces
from daCore import Templates
-from daCore.Interfaces import ImportFromScript, ImportFromFile
# ==============================================================================
class CacheManager(object):
if asScript is not None:
__Matrix, __Function = None, None
if asMatrix:
- __Matrix = ImportFromScript(asScript).getvalue( self.__name )
+ __Matrix = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
elif asOneFunction:
- __Function = { "Direct":ImportFromScript(asScript).getvalue( "DirectOperator" ) }
+ __Function = { "Direct":Interfaces.ImportFromScript(asScript).getvalue( "DirectOperator" ) }
__Function.update({"useApproximatedDerivatives":True})
__Function.update(__Parameters)
elif asThreeFunctions:
__Function = {
- "Direct" :ImportFromScript(asScript).getvalue( "DirectOperator" ),
- "Tangent":ImportFromScript(asScript).getvalue( "TangentOperator" ),
- "Adjoint":ImportFromScript(asScript).getvalue( "AdjointOperator" ),
+ "Direct" :Interfaces.ImportFromScript(asScript).getvalue( "DirectOperator" ),
+ "Tangent":Interfaces.ImportFromScript(asScript).getvalue( "TangentOperator" ),
+ "Adjoint":Interfaces.ImportFromScript(asScript).getvalue( "AdjointOperator" ),
}
__Function.update(__Parameters)
else:
if "withLenghtOfRedundancy" not in __Function: __Function["withLenghtOfRedundancy"] = -1
if "NumberOfProcesses" not in __Function: __Function["NumberOfProcesses"] = None
if "withmfEnabled" not in __Function: __Function["withmfEnabled"] = inputAsMF
- from daNumerics.ApproximatedDerivatives import FDApproximation
- FDA = FDApproximation(
+ from daCore import NumericObjects
+ FDA = NumericObjects.FDApproximation(
Function = __Function["Direct"],
centeredDF = __Function["CenteredFiniteDifference"],
increment = __Function["DifferentialIncrement"],
self.__FO["Adjoint"] = Operator( fromMatrix = __matrice.T, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF )
del __matrice
else:
- raise ValueError("Improperly defined operator, it requires at minima either a matrix, a Direct for approximate derivatives or a Tangent/Adjoint pair.")
+ raise ValueError("The %s object is improperly defined or undefined, it requires at minima either a matrix, a Direct operator for approximate derivatives or a Tangent/Adjoint operators pair. Please check your operator input."%self.__name)
#
if __appliedInX is not None:
self.__FO["AppliedInX"] = {}
def __repr__(self):
"x.__repr__() <==> repr(x)"
- return repr(self.__V)
+ return repr(self.__FO)
def __str__(self):
"x.__str__() <==> str(x)"
- return str(self.__V)
+ return str(self.__FO)
# ==============================================================================
class Algorithm(object):
self.__required_parameters = {}
self.__required_inputs = {"RequiredInputValues":{"mandatory":(), "optional":()}}
self.__variable_names_not_public = {"nextStep":False} # Duplication dans AlgorithmAndParameters
+ self.__canonical_parameter_name = {} # Correspondance "lower"->"correct"
+ self.__canonical_stored_name = {} # Correspondance "lower"->"correct"
#
self.StoredVariables = {}
self.StoredVariables["APosterioriCorrelations"] = Persistence.OneMatrix(name = "APosterioriCorrelations")
self.StoredVariables["SimulatedObservationAtCurrentState"] = Persistence.OneVector(name = "SimulatedObservationAtCurrentState")
self.StoredVariables["SimulatedObservationAtOptimum"] = Persistence.OneVector(name = "SimulatedObservationAtOptimum")
self.StoredVariables["SimulationQuantiles"] = Persistence.OneMatrix(name = "SimulationQuantiles")
+ #
+ for k in self.StoredVariables:
+ self.__canonical_stored_name[k.lower()] = k
def _pre_run(self, Parameters, Xb=None, Y=None, R=None, B=None, Q=None ):
"Pré-calcul"
des classes de persistance.
"""
if key is not None:
- return self.StoredVariables[key]
+ return self.StoredVariables[self.__canonical_stored_name[key.lower()]]
else:
return self.StoredVariables
def __contains__(self, key=None):
"D.__contains__(k) -> True if D has a key k, else False"
- return key in self.StoredVariables
+ if key is None or key.lower() not in self.__canonical_stored_name:
+ return False
+ else:
+ return self.__canonical_stored_name[key.lower()] in self.StoredVariables
def keys(self):
"D.keys() -> list of D's keys"
def pop(self, k, d):
"D.pop(k[,d]) -> v, remove specified key and return the corresponding value"
- if hasattr(self, "StoredVariables"):
- return self.StoredVariables.pop(k, d)
+ if hasattr(self, "StoredVariables") and k.lower() in self.__canonical_stored_name:
+ return self.StoredVariables.pop(self.__canonical_stored_name[k.lower()], d)
else:
try:
msg = "'%s'"%k
"listval" : listval,
"message" : message,
}
+ self.__canonical_parameter_name[name.lower()] = name
logging.debug("%s %s (valeur par défaut = %s)", self._name, message, self.setParameterValue(name))
def getRequiredParameters(self, noDetails=True):
"""
Renvoie la valeur d'un paramètre requis de manière contrôlée
"""
- default = self.__required_parameters[name]["default"]
- typecast = self.__required_parameters[name]["typecast"]
- minval = self.__required_parameters[name]["minval"]
- maxval = self.__required_parameters[name]["maxval"]
- listval = self.__required_parameters[name]["listval"]
+ __k = self.__canonical_parameter_name[name.lower()]
+ default = self.__required_parameters[__k]["default"]
+ typecast = self.__required_parameters[__k]["typecast"]
+ minval = self.__required_parameters[__k]["minval"]
+ maxval = self.__required_parameters[__k]["maxval"]
+ listval = self.__required_parameters[__k]["listval"]
#
if value is None and default is None:
__val = None
else: __val = typecast( default )
else:
if typecast is None: __val = value
- else: __val = typecast( value )
+ else:
+ try:
+ __val = typecast( value )
+ except:
+ raise ValueError("The value '%s' for the parameter named '%s' can not be correctly evaluated with type '%s'."%(value, __k, typecast))
#
if minval is not None and (numpy.array(__val, float) < minval).any():
- raise ValueError("The parameter named \"%s\" of value \"%s\" can not be less than %s."%(name, __val, minval))
+ raise ValueError("The parameter named '%s' of value '%s' can not be less than %s."%(__k, __val, minval))
if maxval is not None and (numpy.array(__val, float) > maxval).any():
- raise ValueError("The parameter named \"%s\" of value \"%s\" can not be greater than %s."%(name, __val, maxval))
+ raise ValueError("The parameter named '%s' of value '%s' can not be greater than %s."%(__k, __val, maxval))
if listval is not None:
if typecast is list or typecast is tuple or isinstance(__val,list) or isinstance(__val,tuple):
for v in __val:
if v not in listval:
- raise ValueError("The value \"%s\" of the parameter named \"%s\" is not allowed, it has to be in the list %s."%(v, name, listval))
+ raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%(v, __k, listval))
elif __val not in listval:
- raise ValueError("The value \"%s\" of the parameter named \"%s\" is not allowed, it has to be in the list %s."%( __val, name,listval))
+ raise ValueError("The value '%s' is not allowed for the parameter named '%s', it has to be in the list %s."%( __val, __k,listval))
return __val
def requireInputArguments(self, mandatory=(), optional=()):
self.updateParameters( asDict, asScript )
#
if asAlgorithm is None and asScript is not None:
- __Algo = ImportFromScript(asScript).getvalue( "Algorithm" )
+ __Algo = Interfaces.ImportFromScript(asScript).getvalue( "Algorithm" )
else:
__Algo = asAlgorithm
#
):
"Mise a jour des parametres"
if asDict is None and asScript is not None:
- __Dict = ImportFromScript(asScript).getvalue( self.__name, "Parameters" )
+ __Dict = Interfaces.ImportFromScript(asScript).getvalue( self.__name, "Parameters" )
else:
__Dict = asDict
#
self.__P = {}
#
if asAlgorithm is None and asScript is not None:
- __Algo = ImportFromScript(asScript).getvalue( "Algorithm" )
+ __Algo = Interfaces.ImportFromScript(asScript).getvalue( "Algorithm" )
else:
__Algo = asAlgorithm
#
if asDict is None and asScript is not None:
- __Dict = ImportFromScript(asScript).getvalue( self.__name, "Parameters" )
+ __Dict = Interfaces.ImportFromScript(asScript).getvalue( self.__name, "Parameters" )
else:
__Dict = asDict
#
elif (asTemplate is not None) and (asTemplate in Templates.ObserverTemplates):
__FunctionText = Templates.ObserverTemplates[asTemplate]
elif asScript is not None:
- __FunctionText = ImportFromScript(asScript).getstring()
+ __FunctionText = Interfaces.ImportFromScript(asScript).getstring()
else:
__FunctionText = ""
__Function = ObserverF(__FunctionText)
contenant des valeurs en colonnes, elles-mêmes nommées "colNames"
(s'il n'y a pas de nom de colonne indiquée, on cherche une colonne
nommée "name"), on récupère les colonnes et on les range ligne après
- ligne (colMajor=False) ou colonne après colonne (colMajor=True). La
- variable résultante est de type "asVector" (par défaut) ou
- "asPersistentVector" selon que l'une de ces variables est placée à
- "True".
+ ligne (colMajor=False, par défaut) ou colonne après colonne
+ (colMajor=True). La variable résultante est de type "asVector" (par
+ défaut) ou "asPersistentVector" selon que l'une de ces variables est
+ placée à "True".
"""
self.__name = str(name)
self.__check = bool(toBeChecked)
if asScript is not None:
__Vector, __Series = None, None
if asPersistentVector:
- __Series = ImportFromScript(asScript).getvalue( self.__name )
+ __Series = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
else:
- __Vector = ImportFromScript(asScript).getvalue( self.__name )
+ __Vector = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
elif asDataFile is not None:
__Vector, __Series = None, None
if asPersistentVector:
if colNames is not None:
- __Series = ImportFromFile(asDataFile).getvalue( colNames )[1]
+ __Series = Interfaces.ImportFromFile(asDataFile).getvalue( colNames )[1]
else:
- __Series = ImportFromFile(asDataFile).getvalue( [self.__name,] )[1]
- if bool(colMajor) and not ImportFromFile(asDataFile).getformat() == "application/numpy.npz":
+ __Series = Interfaces.ImportFromFile(asDataFile).getvalue( [self.__name,] )[1]
+ if bool(colMajor) and not Interfaces.ImportFromFile(asDataFile).getformat() == "application/numpy.npz":
__Series = numpy.transpose(__Series)
- elif not bool(colMajor) and ImportFromFile(asDataFile).getformat() == "application/numpy.npz":
+ elif not bool(colMajor) and Interfaces.ImportFromFile(asDataFile).getformat() == "application/numpy.npz":
__Series = numpy.transpose(__Series)
else:
if colNames is not None:
- __Vector = ImportFromFile(asDataFile).getvalue( colNames )[1]
+ __Vector = Interfaces.ImportFromFile(asDataFile).getvalue( colNames )[1]
else:
- __Vector = ImportFromFile(asDataFile).getvalue( [self.__name,] )[1]
+ __Vector = Interfaces.ImportFromFile(asDataFile).getvalue( [self.__name,] )[1]
if bool(colMajor):
__Vector = numpy.ravel(__Vector, order = "F")
else:
self.shape = (self.shape[0],1)
self.size = self.shape[0] * self.shape[1]
else:
- raise ValueError("The %s object is improperly defined, it requires at minima either a vector, a list/tuple of vectors or a persistent object. Please check your vector input."%self.__name)
+ raise ValueError("The %s object is improperly defined or undefined, it requires at minima either a vector, a list/tuple of vectors or a persistent object. Please check your vector input."%self.__name)
#
if scheduledBy is not None:
self.__T = scheduledBy
if asScript is not None:
__Matrix, __Scalar, __Vector, __Object = None, None, None, None
if asEyeByScalar:
- __Scalar = ImportFromScript(asScript).getvalue( self.__name )
+ __Scalar = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
elif asEyeByVector:
- __Vector = ImportFromScript(asScript).getvalue( self.__name )
+ __Vector = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
elif asCovObject:
- __Object = ImportFromScript(asScript).getvalue( self.__name )
+ __Object = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
else:
- __Matrix = ImportFromScript(asScript).getvalue( self.__name )
+ __Matrix = Interfaces.ImportFromScript(asScript).getvalue( self.__name )
else:
__Matrix, __Scalar, __Vector, __Object = asCovariance, asEyeByScalar, asEyeByVector, asCovObject
#
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
+
+__doc__ = """
+ Définit les versions approximées des opérateurs tangents et adjoints.
+"""
+__author__ = "Jean-Philippe ARGAUD"
+
+import os, numpy, time, copy, types, sys, math, logging
+from daCore.BasicObjects import Operator
+from daCore.PlatformInfo import PlatformInfo
+mpr = PlatformInfo().MachinePrecision()
+mfp = PlatformInfo().MaximumPrecision()
+# logging.getLogger().setLevel(logging.DEBUG)
+
+# ==============================================================================
+def ExecuteFunction( paire ):
+ assert len(paire) == 2, "Incorrect number of arguments"
+ X, funcrepr = paire
+ __X = numpy.asmatrix(numpy.ravel( X )).T
+ __sys_path_tmp = sys.path ; sys.path.insert(0,funcrepr["__userFunction__path"])
+ __module = __import__(funcrepr["__userFunction__modl"], globals(), locals(), [])
+ __fonction = getattr(__module,funcrepr["__userFunction__name"])
+ sys.path = __sys_path_tmp ; del __sys_path_tmp
+ __HX = __fonction( __X )
+ return numpy.ravel( __HX )
+
+# ==============================================================================
+class FDApproximation(object):
+ """
+ Cette classe sert d'interface pour définir les opérateurs approximés. A la
+ création d'un objet, en fournissant une fonction "Function", on obtient un
+ objet qui dispose de 3 méthodes "DirectOperator", "TangentOperator" et
+ "AdjointOperator". On contrôle l'approximation DF avec l'incrément
+ multiplicatif "increment" valant par défaut 1%, ou avec l'incrément fixe
+ "dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF
+ centrées si le booléen "centeredDF" est vrai.
+ """
+ def __init__(self,
+ Function = None,
+ centeredDF = False,
+ increment = 0.01,
+ dX = None,
+ avoidingRedundancy = True,
+ toleranceInRedundancy = 1.e-18,
+ lenghtOfRedundancy = -1,
+ mpEnabled = False,
+ mpWorkers = None,
+ mfEnabled = False,
+ ):
+ if mpEnabled:
+ try:
+ import multiprocessing
+ self.__mpEnabled = True
+ except ImportError:
+ self.__mpEnabled = False
+ else:
+ self.__mpEnabled = False
+ self.__mpWorkers = mpWorkers
+ if self.__mpWorkers is not None and self.__mpWorkers < 1:
+ self.__mpWorkers = None
+ logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers))
+ #
+ if mfEnabled:
+ self.__mfEnabled = True
+ else:
+ self.__mfEnabled = False
+ logging.debug("FDA Calculs en multifonctions : %s"%(self.__mfEnabled,))
+ #
+ if avoidingRedundancy:
+ self.__avoidRC = True
+ self.__tolerBP = float(toleranceInRedundancy)
+ self.__lenghtRJ = int(lenghtOfRedundancy)
+ self.__listJPCP = [] # Jacobian Previous Calculated Points
+ self.__listJPCI = [] # Jacobian Previous Calculated Increment
+ self.__listJPCR = [] # Jacobian Previous Calculated Results
+ self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
+ self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
+ else:
+ self.__avoidRC = False
+ #
+ if self.__mpEnabled:
+ if isinstance(Function,types.FunctionType):
+ logging.debug("FDA Calculs en multiprocessing : FunctionType")
+ self.__userFunction__name = Function.__name__
+ try:
+ mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
+ except:
+ mod = os.path.abspath(Function.__globals__['__file__'])
+ if not os.path.isfile(mod):
+ raise ImportError("No user defined function or method found with the name %s"%(mod,))
+ self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
+ self.__userFunction__path = os.path.dirname(mod)
+ del mod
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
+ self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
+ elif isinstance(Function,types.MethodType):
+ logging.debug("FDA Calculs en multiprocessing : MethodType")
+ self.__userFunction__name = Function.__name__
+ try:
+ mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
+ except:
+ mod = os.path.abspath(Function.__func__.__globals__['__file__'])
+ if not os.path.isfile(mod):
+ raise ImportError("No user defined function or method found with the name %s"%(mod,))
+ self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
+ self.__userFunction__path = os.path.dirname(mod)
+ del mod
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
+ self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
+ else:
+ raise TypeError("User defined function or method has to be provided for finite differences approximation.")
+ else:
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
+ self.__userFunction = self.__userOperator.appliedTo
+ #
+ self.__centeredDF = bool(centeredDF)
+ if abs(float(increment)) > 1.e-15:
+ self.__increment = float(increment)
+ else:
+ self.__increment = 0.01
+ if dX is None:
+ self.__dX = None
+ else:
+ self.__dX = numpy.asmatrix(numpy.ravel( dX )).T
+ logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC)
+ if self.__avoidRC:
+ logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP)
+
+ # ---------------------------------------------------------
+ def __doublon__(self, e, l, n, v=None):
+ __ac, __iac = False, -1
+ for i in range(len(l)-1,-1,-1):
+ if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]:
+ __ac, __iac = True, i
+ if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon %i"%(v,__iac))
+ break
+ return __ac, __iac
+
+ # ---------------------------------------------------------
+ def DirectOperator(self, X ):
+ """
+ Calcul du direct à l'aide de la fonction fournie.
+ """
+ logging.debug("FDA Calcul DirectOperator (explicite)")
+ if self.__mfEnabled:
+ _HX = self.__userFunction( X, argsAsSerie = True )
+ else:
+ _X = numpy.asmatrix(numpy.ravel( X )).T
+ _HX = numpy.ravel(self.__userFunction( _X ))
+ #
+ return _HX
+
+ # ---------------------------------------------------------
+ def TangentMatrix(self, X ):
+ """
+ Calcul de l'opérateur tangent comme la Jacobienne par différences finies,
+ c'est-à-dire le gradient de H en X. On utilise des différences finies
+ directionnelles autour du point X. X est un numpy.matrix.
+
+ Différences finies centrées (approximation d'ordre 2):
+ 1/ Pour chaque composante i de X, on ajoute et on enlève la perturbation
+ dX[i] à la composante X[i], pour composer X_plus_dXi et X_moins_dXi, et
+ on calcule les réponses HX_plus_dXi = H( X_plus_dXi ) et HX_moins_dXi =
+ H( X_moins_dXi )
+ 2/ On effectue les différences (HX_plus_dXi-HX_moins_dXi) et on divise par
+ le pas 2*dXi
+ 3/ Chaque résultat, par composante, devient une colonne de la Jacobienne
+
+ Différences finies non centrées (approximation d'ordre 1):
+ 1/ Pour chaque composante i de X, on ajoute la perturbation dX[i] à la
+ composante X[i] pour composer X_plus_dXi, et on calcule la réponse
+ HX_plus_dXi = H( X_plus_dXi )
+ 2/ On calcule la valeur centrale HX = H(X)
+ 3/ On effectue les différences (HX_plus_dXi-HX) et on divise par
+ le pas dXi
+ 4/ Chaque résultat, par composante, devient une colonne de la Jacobienne
+
+ """
+ logging.debug("FDA Début du calcul de la Jacobienne")
+ logging.debug("FDA Incrément de............: %s*X"%float(self.__increment))
+ logging.debug("FDA Approximation centrée...: %s"%(self.__centeredDF))
+ #
+ if X is None or len(X)==0:
+ raise ValueError("Nominal point X for approximate derivatives can not be None or void (X=%s)."%(str(X),))
+ #
+ _X = numpy.asmatrix(numpy.ravel( X )).T
+ #
+ if self.__dX is None:
+ _dX = self.__increment * _X
+ else:
+ _dX = numpy.asmatrix(numpy.ravel( self.__dX )).T
+ #
+ if (_dX == 0.).any():
+ moyenne = _dX.mean()
+ if moyenne == 0.:
+ _dX = numpy.where( _dX == 0., float(self.__increment), _dX )
+ else:
+ _dX = numpy.where( _dX == 0., moyenne, _dX )
+ #
+ __alreadyCalculated = False
+ if self.__avoidRC:
+ __bidon, __alreadyCalculatedP = self.__doublon__(_X, self.__listJPCP, self.__listJPPN, None)
+ __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None)
+ if __alreadyCalculatedP == __alreadyCalculatedI > -1:
+ __alreadyCalculated, __i = True, __alreadyCalculatedP
+ logging.debug("FDA Cas J déja calculé, récupération du doublon %i"%__i)
+ #
+ if __alreadyCalculated:
+ logging.debug("FDA Calcul Jacobienne (par récupération du doublon %i)"%__i)
+ _Jacobienne = self.__listJPCR[__i]
+ else:
+ logging.debug("FDA Calcul Jacobienne (explicite)")
+ if self.__centeredDF:
+ #
+ if self.__mpEnabled and not self.__mfEnabled:
+ funcrepr = {
+ "__userFunction__path" : self.__userFunction__path,
+ "__userFunction__modl" : self.__userFunction__modl,
+ "__userFunction__name" : self.__userFunction__name,
+ }
+ _jobs = []
+ for i in range( len(_dX) ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ _X_moins_dXi = numpy.array( _X.A1, dtype=float )
+ _X_moins_dXi[i] = _X[i] - _dXi
+ #
+ _jobs.append( (_X_plus_dXi, funcrepr) )
+ _jobs.append( (_X_moins_dXi, funcrepr) )
+ #
+ import multiprocessing
+ self.__pool = multiprocessing.Pool(self.__mpWorkers)
+ _HX_plusmoins_dX = self.__pool.map( ExecuteFunction, _jobs )
+ self.__pool.close()
+ self.__pool.join()
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
+ #
+ elif self.__mfEnabled:
+ _xserie = []
+ for i in range( len(_dX) ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ _X_moins_dXi = numpy.array( _X.A1, dtype=float )
+ _X_moins_dXi[i] = _X[i] - _dXi
+ #
+ _xserie.append( _X_plus_dXi )
+ _xserie.append( _X_moins_dXi )
+ #
+ _HX_plusmoins_dX = self.DirectOperator( _xserie )
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
+ #
+ else:
+ _Jacobienne = []
+ for i in range( _dX.size ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ _X_moins_dXi = numpy.array( _X.A1, dtype=float )
+ _X_moins_dXi[i] = _X[i] - _dXi
+ #
+ _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
+ _HX_moins_dXi = self.DirectOperator( _X_moins_dXi )
+ #
+ _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
+ #
+ else:
+ #
+ if self.__mpEnabled and not self.__mfEnabled:
+ funcrepr = {
+ "__userFunction__path" : self.__userFunction__path,
+ "__userFunction__modl" : self.__userFunction__modl,
+ "__userFunction__name" : self.__userFunction__name,
+ }
+ _jobs = []
+ _jobs.append( (_X.A1, funcrepr) )
+ for i in range( len(_dX) ):
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dX[i]
+ #
+ _jobs.append( (_X_plus_dXi, funcrepr) )
+ #
+ import multiprocessing
+ self.__pool = multiprocessing.Pool(self.__mpWorkers)
+ _HX_plus_dX = self.__pool.map( ExecuteFunction, _jobs )
+ self.__pool.close()
+ self.__pool.join()
+ #
+ _HX = _HX_plus_dX.pop(0)
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
+ #
+ elif self.__mfEnabled:
+ _xserie = []
+ _xserie.append( _X.A1 )
+ for i in range( len(_dX) ):
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dX[i]
+ #
+ _xserie.append( _X_plus_dXi )
+ #
+ _HX_plus_dX = self.DirectOperator( _xserie )
+ #
+ _HX = _HX_plus_dX.pop(0)
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
+ #
+ else:
+ _Jacobienne = []
+ _HX = self.DirectOperator( _X )
+ for i in range( _dX.size ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ #
+ _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
+ #
+ _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
+ #
+ #
+ _Jacobienne = numpy.asmatrix( numpy.vstack( _Jacobienne ) ).T
+ if self.__avoidRC:
+ if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size
+ while len(self.__listJPCP) > self.__lenghtRJ:
+ self.__listJPCP.pop(0)
+ self.__listJPCI.pop(0)
+ self.__listJPCR.pop(0)
+ self.__listJPPN.pop(0)
+ self.__listJPIN.pop(0)
+ self.__listJPCP.append( copy.copy(_X) )
+ self.__listJPCI.append( copy.copy(_dX) )
+ self.__listJPCR.append( copy.copy(_Jacobienne) )
+ self.__listJPPN.append( numpy.linalg.norm(_X) )
+ self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) )
+ #
+ logging.debug("FDA Fin du calcul de la Jacobienne")
+ #
+ return _Jacobienne
+
+ # ---------------------------------------------------------
+ def TangentOperator(self, paire ):
+ """
+ Calcul du tangent à l'aide de la Jacobienne.
+ """
+ if self.__mfEnabled:
+ assert len(paire) == 1, "Incorrect lenght of arguments"
+ _paire = paire[0]
+ assert len(_paire) == 2, "Incorrect number of arguments"
+ else:
+ assert len(paire) == 2, "Incorrect number of arguments"
+ _paire = paire
+ X, dX = _paire
+ _Jacobienne = self.TangentMatrix( X )
+ if dX is None or len(dX) == 0:
+ #
+ # Calcul de la forme matricielle si le second argument est None
+ # -------------------------------------------------------------
+ if self.__mfEnabled: return [_Jacobienne,]
+ else: return _Jacobienne
+ else:
+ #
+ # Calcul de la valeur linéarisée de H en X appliqué à dX
+ # ------------------------------------------------------
+ _dX = numpy.asmatrix(numpy.ravel( dX )).T
+ _HtX = numpy.dot(_Jacobienne, _dX)
+ if self.__mfEnabled: return [_HtX.A1,]
+ else: return _HtX.A1
+
+ # ---------------------------------------------------------
+ def AdjointOperator(self, paire ):
+ """
+ Calcul de l'adjoint à l'aide de la Jacobienne.
+ """
+ if self.__mfEnabled:
+ assert len(paire) == 1, "Incorrect lenght of arguments"
+ _paire = paire[0]
+ assert len(_paire) == 2, "Incorrect number of arguments"
+ else:
+ assert len(paire) == 2, "Incorrect number of arguments"
+ _paire = paire
+ X, Y = _paire
+ _JacobienneT = self.TangentMatrix( X ).T
+ if Y is None or len(Y) == 0:
+ #
+ # Calcul de la forme matricielle si le second argument est None
+ # -------------------------------------------------------------
+ if self.__mfEnabled: return [_JacobienneT,]
+ else: return _JacobienneT
+ else:
+ #
+ # Calcul de la valeur de l'adjoint en X appliqué à Y
+ # --------------------------------------------------
+ _Y = numpy.asmatrix(numpy.ravel( Y )).T
+ _HaY = numpy.dot(_JacobienneT, _Y)
+ if self.__mfEnabled: return [_HaY.A1,]
+ else: return _HaY.A1
+
+# ==============================================================================
+def mmqr(
+ func = None,
+ x0 = None,
+ fprime = None,
+ bounds = None,
+ quantile = 0.5,
+ maxfun = 15000,
+ toler = 1.e-06,
+ y = None,
+ ):
+ """
+ Implémentation informatique de l'algorithme MMQR, basée sur la publication :
+ David R. Hunter, Kenneth Lange, "Quantile Regression via an MM Algorithm",
+ Journal of Computational and Graphical Statistics, 9, 1, pp.60-77, 2000.
+ """
+ #
+ # Recuperation des donnees et informations initiales
+ # --------------------------------------------------
+ variables = numpy.ravel( x0 )
+ mesures = numpy.ravel( y )
+ increment = sys.float_info[0]
+ p = variables.size
+ n = mesures.size
+ quantile = float(quantile)
+ #
+ # Calcul des parametres du MM
+ # ---------------------------
+ tn = float(toler) / n
+ e0 = -tn / math.log(tn)
+ epsilon = (e0-tn)/(1+math.log(e0))
+ #
+ # Calculs d'initialisation
+ # ------------------------
+ residus = mesures - numpy.ravel( func( variables ) )
+ poids = 1./(epsilon+numpy.abs(residus))
+ veps = 1. - 2. * quantile - residus * poids
+ lastsurrogate = -numpy.sum(residus*veps) - (1.-2.*quantile)*numpy.sum(residus)
+ iteration = 0
+ #
+ # Recherche iterative
+ # -------------------
+ while (increment > toler) and (iteration < maxfun) :
+ iteration += 1
+ #
+ Derivees = numpy.array(fprime(variables))
+ Derivees = Derivees.reshape(n,p) # Necessaire pour remettre en place la matrice si elle passe par des tuyaux YACS
+ DeriveesT = Derivees.transpose()
+ M = numpy.dot( DeriveesT , (numpy.array(numpy.matrix(p*[poids,]).T)*Derivees) )
+ SM = numpy.transpose(numpy.dot( DeriveesT , veps ))
+ step = - numpy.linalg.lstsq( M, SM, rcond=-1 )[0]
+ #
+ variables = variables + step
+ if bounds is not None:
+ while( (variables < numpy.ravel(numpy.asmatrix(bounds)[:,0])).any() or (variables > numpy.ravel(numpy.asmatrix(bounds)[:,1])).any() ):
+ step = step/2.
+ variables = variables - step
+ residus = mesures - numpy.ravel( func(variables) )
+ surrogate = numpy.sum(residus**2 * poids) + (4.*quantile-2.) * numpy.sum(residus)
+ #
+ while ( (surrogate > lastsurrogate) and ( max(list(numpy.abs(step))) > 1.e-16 ) ) :
+ step = step/2.
+ variables = variables - step
+ residus = mesures - numpy.ravel( func(variables) )
+ surrogate = numpy.sum(residus**2 * poids) + (4.*quantile-2.) * numpy.sum(residus)
+ #
+ increment = lastsurrogate-surrogate
+ poids = 1./(epsilon+numpy.abs(residus))
+ veps = 1. - 2. * quantile - residus * poids
+ lastsurrogate = -numpy.sum(residus * veps) - (1.-2.*quantile)*numpy.sum(residus)
+ #
+ # Mesure d'écart
+ # --------------
+ Ecart = quantile * numpy.sum(residus) - numpy.sum( residus[residus<0] )
+ #
+ return variables, Ecart, [n,p,iteration,increment,0]
+
+# ==============================================================================
+if __name__ == "__main__":
+ print('\n AUTODIAGNOSTIC\n')