lenghtOfRedundancy = -1,
mpEnabled = False,
mpWorkers = None,
+ mfEnabled = False,
):
if mpEnabled:
try:
self.__mpWorkers = None
logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers))
#
+ if mfEnabled:
+ self.__mfEnabled = True
+ else:
+ self.__mfEnabled = False
+ logging.debug("FDA Calculs en multifonctions : %s"%(self.__mfEnabled,))
+ #
+ if avoidingRedundancy:
+ self.__avoidRC = True
+ self.__tolerBP = float(toleranceInRedundancy)
+ self.__lenghtRJ = int(lenghtOfRedundancy)
+ self.__listJPCP = [] # Jacobian Previous Calculated Points
+ self.__listJPCI = [] # Jacobian Previous Calculated Increment
+ self.__listJPCR = [] # Jacobian Previous Calculated Results
+ self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
+ self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
+ else:
+ self.__avoidRC = False
+ #
if self.__mpEnabled:
if isinstance(Function,types.FunctionType):
logging.debug("FDA Calculs en multiprocessing : FunctionType")
self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
self.__userFunction__path = os.path.dirname(mod)
del mod
- self.__userOperator = Operator( fromMethod = Function )
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
elif isinstance(Function,types.MethodType):
logging.debug("FDA Calculs en multiprocessing : MethodType")
self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
self.__userFunction__path = os.path.dirname(mod)
del mod
- self.__userOperator = Operator( fromMethod = Function )
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
else:
raise TypeError("User defined function or method has to be provided for finite differences approximation.")
else:
- self.__userOperator = Operator( fromMethod = Function )
+ self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
self.__userFunction = self.__userOperator.appliedTo
#
self.__centeredDF = bool(centeredDF)
- if avoidingRedundancy:
- self.__avoidRC = True
- self.__tolerBP = float(toleranceInRedundancy)
- self.__lenghtRJ = int(lenghtOfRedundancy)
- self.__listJPCP = [] # Jacobian Previous Calculated Points
- self.__listJPCI = [] # Jacobian Previous Calculated Increment
- self.__listJPCR = [] # Jacobian Previous Calculated Results
- self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
- self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
- else:
- self.__avoidRC = False
if abs(float(increment)) > 1.e-15:
self.__increment = float(increment)
else:
Calcul du direct à l'aide de la fonction fournie.
"""
logging.debug("FDA Calcul DirectOperator (explicite)")
- _X = numpy.asmatrix(numpy.ravel( X )).T
- _HX = numpy.ravel(self.__userFunction( _X ))
+ if self.__mfEnabled:
+ _HX = self.__userFunction( X, argsAsSerie = True )
+ else:
+ _X = numpy.asmatrix(numpy.ravel( X )).T
+ _HX = numpy.ravel(self.__userFunction( _X ))
#
return _HX
logging.debug("FDA Calcul Jacobienne (explicite)")
if self.__centeredDF:
#
- if self.__mpEnabled:
+ if self.__mpEnabled and not self.__mfEnabled:
funcrepr = {
"__userFunction__path" : self.__userFunction__path,
"__userFunction__modl" : self.__userFunction__modl,
_Jacobienne = []
for i in range( len(_dX) ):
_Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
+ #
+ elif self.__mfEnabled:
+ _xserie = []
+ for i in range( len(_dX) ):
+ _dXi = _dX[i]
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dXi
+ _X_moins_dXi = numpy.array( _X.A1, dtype=float )
+ _X_moins_dXi[i] = _X[i] - _dXi
+ #
+ _xserie.append( _X_plus_dXi )
+ _xserie.append( _X_moins_dXi )
+ #
+ _HX_plusmoins_dX = self.DirectOperator( _xserie )
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
+ #
else:
_Jacobienne = []
for i in range( _dX.size ):
#
else:
#
- if self.__mpEnabled:
- _HX_plus_dX = []
+ if self.__mpEnabled and not self.__mfEnabled:
funcrepr = {
"__userFunction__path" : self.__userFunction__path,
"__userFunction__modl" : self.__userFunction__modl,
_Jacobienne = []
for i in range( len(_dX) ):
_Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
+ #
+ elif self.__mfEnabled:
+ _xserie = []
+ _xserie.append( _X.A1 )
+ for i in range( len(_dX) ):
+ _X_plus_dXi = numpy.array( _X.A1, dtype=float )
+ _X_plus_dXi[i] = _X[i] + _dX[i]
+ #
+ _xserie.append( _X_plus_dXi )
+ #
+ _HX_plus_dX = self.DirectOperator( _xserie )
+ #
+ _HX = _HX_plus_dX.pop(0)
+ #
+ _Jacobienne = []
+ for i in range( len(_dX) ):
+ _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
+ #
else:
_Jacobienne = []
_HX = self.DirectOperator( _X )
_Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
#
#
- _Jacobienne = numpy.matrix( numpy.vstack( _Jacobienne ) ).T
+ _Jacobienne = numpy.asmatrix( numpy.vstack( _Jacobienne ) ).T
if self.__avoidRC:
if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size
while len(self.__listJPCP) > self.__lenghtRJ:
"""
Calcul du tangent à l'aide de la Jacobienne.
"""
- assert len(paire) == 2, "Incorrect number of arguments"
- X, dX = paire
+ if self.__mfEnabled:
+ assert len(paire) == 1, "Incorrect lenght of arguments"
+ _paire = paire[0]
+ assert len(_paire) == 2, "Incorrect number of arguments"
+ else:
+ assert len(paire) == 2, "Incorrect number of arguments"
+ _paire = paire
+ X, dX = _paire
_Jacobienne = self.TangentMatrix( X )
if dX is None or len(dX) == 0:
#
# Calcul de la forme matricielle si le second argument est None
# -------------------------------------------------------------
- return _Jacobienne
+ if self.__mfEnabled: return [_Jacobienne,]
+ else: return _Jacobienne
else:
#
# Calcul de la valeur linéarisée de H en X appliqué à dX
# ------------------------------------------------------
_dX = numpy.asmatrix(numpy.ravel( dX )).T
_HtX = numpy.dot(_Jacobienne, _dX)
- return _HtX.A1
+ if self.__mfEnabled: return [_HtX.A1,]
+ else: return _HtX.A1
# ---------------------------------------------------------
def AdjointOperator(self, paire ):
"""
Calcul de l'adjoint à l'aide de la Jacobienne.
"""
- assert len(paire) == 2, "Incorrect number of arguments"
- X, Y = paire
+ if self.__mfEnabled:
+ assert len(paire) == 1, "Incorrect lenght of arguments"
+ _paire = paire[0]
+ assert len(_paire) == 2, "Incorrect number of arguments"
+ else:
+ assert len(paire) == 2, "Incorrect number of arguments"
+ _paire = paire
+ X, Y = _paire
_JacobienneT = self.TangentMatrix( X ).T
if Y is None or len(Y) == 0:
#
# Calcul de la forme matricielle si le second argument est None
# -------------------------------------------------------------
- return _JacobienneT
+ if self.__mfEnabled: return [_JacobienneT,]
+ else: return _JacobienneT
else:
#
# Calcul de la valeur de l'adjoint en X appliqué à Y
# --------------------------------------------------
_Y = numpy.asmatrix(numpy.ravel( Y )).T
_HaY = numpy.dot(_JacobienneT, _Y)
- return _HaY.A1
+ if self.__mfEnabled: return [_HaY.A1,]
+ else: return _HaY.A1
# ==============================================================================
if __name__ == "__main__":