+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2008-2020 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-
-__doc__ = """
- Définit les versions approximées des opérateurs tangents et adjoints.
-"""
-__author__ = "Jean-Philippe ARGAUD"
-
-import os, numpy, time, copy, types, sys
-import logging
-from daCore.BasicObjects import Operator
-# logging.getLogger().setLevel(logging.DEBUG)
-
-# ==============================================================================
-def ExecuteFunction( paire ):
- assert len(paire) == 2, "Incorrect number of arguments"
- X, funcrepr = paire
- __X = numpy.asmatrix(numpy.ravel( X )).T
- __sys_path_tmp = sys.path ; sys.path.insert(0,funcrepr["__userFunction__path"])
- __module = __import__(funcrepr["__userFunction__modl"], globals(), locals(), [])
- __fonction = getattr(__module,funcrepr["__userFunction__name"])
- sys.path = __sys_path_tmp ; del __sys_path_tmp
- __HX = __fonction( __X )
- return numpy.ravel( __HX )
-
-# ==============================================================================
-class FDApproximation(object):
- """
- Cette classe sert d'interface pour définir les opérateurs approximés. A la
- création d'un objet, en fournissant une fonction "Function", on obtient un
- objet qui dispose de 3 méthodes "DirectOperator", "TangentOperator" et
- "AdjointOperator". On contrôle l'approximation DF avec l'incrément
- multiplicatif "increment" valant par défaut 1%, ou avec l'incrément fixe
- "dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF
- centrées si le booléen "centeredDF" est vrai.
- """
- def __init__(self,
- Function = None,
- centeredDF = False,
- increment = 0.01,
- dX = None,
- avoidingRedundancy = True,
- toleranceInRedundancy = 1.e-18,
- lenghtOfRedundancy = -1,
- mpEnabled = False,
- mpWorkers = None,
- mfEnabled = False,
- ):
- if mpEnabled:
- try:
- import multiprocessing
- self.__mpEnabled = True
- except ImportError:
- self.__mpEnabled = False
- else:
- self.__mpEnabled = False
- self.__mpWorkers = mpWorkers
- if self.__mpWorkers is not None and self.__mpWorkers < 1:
- self.__mpWorkers = None
- logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers))
- #
- if mfEnabled:
- self.__mfEnabled = True
- else:
- self.__mfEnabled = False
- logging.debug("FDA Calculs en multifonctions : %s"%(self.__mfEnabled,))
- #
- if avoidingRedundancy:
- self.__avoidRC = True
- self.__tolerBP = float(toleranceInRedundancy)
- self.__lenghtRJ = int(lenghtOfRedundancy)
- self.__listJPCP = [] # Jacobian Previous Calculated Points
- self.__listJPCI = [] # Jacobian Previous Calculated Increment
- self.__listJPCR = [] # Jacobian Previous Calculated Results
- self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
- self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
- else:
- self.__avoidRC = False
- #
- if self.__mpEnabled:
- if isinstance(Function,types.FunctionType):
- logging.debug("FDA Calculs en multiprocessing : FunctionType")
- self.__userFunction__name = Function.__name__
- try:
- mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
- except:
- mod = os.path.abspath(Function.__globals__['__file__'])
- if not os.path.isfile(mod):
- raise ImportError("No user defined function or method found with the name %s"%(mod,))
- self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
- self.__userFunction__path = os.path.dirname(mod)
- del mod
- self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
- self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
- elif isinstance(Function,types.MethodType):
- logging.debug("FDA Calculs en multiprocessing : MethodType")
- self.__userFunction__name = Function.__name__
- try:
- mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
- except:
- mod = os.path.abspath(Function.__func__.__globals__['__file__'])
- if not os.path.isfile(mod):
- raise ImportError("No user defined function or method found with the name %s"%(mod,))
- self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
- self.__userFunction__path = os.path.dirname(mod)
- del mod
- self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
- self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
- else:
- raise TypeError("User defined function or method has to be provided for finite differences approximation.")
- else:
- self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
- self.__userFunction = self.__userOperator.appliedTo
- #
- self.__centeredDF = bool(centeredDF)
- if abs(float(increment)) > 1.e-15:
- self.__increment = float(increment)
- else:
- self.__increment = 0.01
- if dX is None:
- self.__dX = None
- else:
- self.__dX = numpy.asmatrix(numpy.ravel( dX )).T
- logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC)
- if self.__avoidRC:
- logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP)
-
- # ---------------------------------------------------------
- def __doublon__(self, e, l, n, v=None):
- __ac, __iac = False, -1
- for i in range(len(l)-1,-1,-1):
- if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]:
- __ac, __iac = True, i
- if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon %i"%(v,__iac))
- break
- return __ac, __iac
-
- # ---------------------------------------------------------
- def DirectOperator(self, X ):
- """
- Calcul du direct à l'aide de la fonction fournie.
- """
- logging.debug("FDA Calcul DirectOperator (explicite)")
- if self.__mfEnabled:
- _HX = self.__userFunction( X, argsAsSerie = True )
- else:
- _X = numpy.asmatrix(numpy.ravel( X )).T
- _HX = numpy.ravel(self.__userFunction( _X ))
- #
- return _HX
-
- # ---------------------------------------------------------
- def TangentMatrix(self, X ):
- """
- Calcul de l'opérateur tangent comme la Jacobienne par différences finies,
- c'est-à-dire le gradient de H en X. On utilise des différences finies
- directionnelles autour du point X. X est un numpy.matrix.
-
- Différences finies centrées (approximation d'ordre 2):
- 1/ Pour chaque composante i de X, on ajoute et on enlève la perturbation
- dX[i] à la composante X[i], pour composer X_plus_dXi et X_moins_dXi, et
- on calcule les réponses HX_plus_dXi = H( X_plus_dXi ) et HX_moins_dXi =
- H( X_moins_dXi )
- 2/ On effectue les différences (HX_plus_dXi-HX_moins_dXi) et on divise par
- le pas 2*dXi
- 3/ Chaque résultat, par composante, devient une colonne de la Jacobienne
-
- Différences finies non centrées (approximation d'ordre 1):
- 1/ Pour chaque composante i de X, on ajoute la perturbation dX[i] à la
- composante X[i] pour composer X_plus_dXi, et on calcule la réponse
- HX_plus_dXi = H( X_plus_dXi )
- 2/ On calcule la valeur centrale HX = H(X)
- 3/ On effectue les différences (HX_plus_dXi-HX) et on divise par
- le pas dXi
- 4/ Chaque résultat, par composante, devient une colonne de la Jacobienne
-
- """
- logging.debug("FDA Début du calcul de la Jacobienne")
- logging.debug("FDA Incrément de............: %s*X"%float(self.__increment))
- logging.debug("FDA Approximation centrée...: %s"%(self.__centeredDF))
- #
- if X is None or len(X)==0:
- raise ValueError("Nominal point X for approximate derivatives can not be None or void (X=%s)."%(str(X),))
- #
- _X = numpy.asmatrix(numpy.ravel( X )).T
- #
- if self.__dX is None:
- _dX = self.__increment * _X
- else:
- _dX = numpy.asmatrix(numpy.ravel( self.__dX )).T
- #
- if (_dX == 0.).any():
- moyenne = _dX.mean()
- if moyenne == 0.:
- _dX = numpy.where( _dX == 0., float(self.__increment), _dX )
- else:
- _dX = numpy.where( _dX == 0., moyenne, _dX )
- #
- __alreadyCalculated = False
- if self.__avoidRC:
- __bidon, __alreadyCalculatedP = self.__doublon__(_X, self.__listJPCP, self.__listJPPN, None)
- __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None)
- if __alreadyCalculatedP == __alreadyCalculatedI > -1:
- __alreadyCalculated, __i = True, __alreadyCalculatedP
- logging.debug("FDA Cas J déja calculé, récupération du doublon %i"%__i)
- #
- if __alreadyCalculated:
- logging.debug("FDA Calcul Jacobienne (par récupération du doublon %i)"%__i)
- _Jacobienne = self.__listJPCR[__i]
- else:
- logging.debug("FDA Calcul Jacobienne (explicite)")
- if self.__centeredDF:
- #
- if self.__mpEnabled and not self.__mfEnabled:
- funcrepr = {
- "__userFunction__path" : self.__userFunction__path,
- "__userFunction__modl" : self.__userFunction__modl,
- "__userFunction__name" : self.__userFunction__name,
- }
- _jobs = []
- for i in range( len(_dX) ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
- _X_moins_dXi = numpy.array( _X.A1, dtype=float )
- _X_moins_dXi[i] = _X[i] - _dXi
- #
- _jobs.append( (_X_plus_dXi, funcrepr) )
- _jobs.append( (_X_moins_dXi, funcrepr) )
- #
- import multiprocessing
- self.__pool = multiprocessing.Pool(self.__mpWorkers)
- _HX_plusmoins_dX = self.__pool.map( ExecuteFunction, _jobs )
- self.__pool.close()
- self.__pool.join()
- #
- _Jacobienne = []
- for i in range( len(_dX) ):
- _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
- #
- elif self.__mfEnabled:
- _xserie = []
- for i in range( len(_dX) ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
- _X_moins_dXi = numpy.array( _X.A1, dtype=float )
- _X_moins_dXi[i] = _X[i] - _dXi
- #
- _xserie.append( _X_plus_dXi )
- _xserie.append( _X_moins_dXi )
- #
- _HX_plusmoins_dX = self.DirectOperator( _xserie )
- #
- _Jacobienne = []
- for i in range( len(_dX) ):
- _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
- #
- else:
- _Jacobienne = []
- for i in range( _dX.size ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
- _X_moins_dXi = numpy.array( _X.A1, dtype=float )
- _X_moins_dXi[i] = _X[i] - _dXi
- #
- _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
- _HX_moins_dXi = self.DirectOperator( _X_moins_dXi )
- #
- _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
- #
- else:
- #
- if self.__mpEnabled and not self.__mfEnabled:
- funcrepr = {
- "__userFunction__path" : self.__userFunction__path,
- "__userFunction__modl" : self.__userFunction__modl,
- "__userFunction__name" : self.__userFunction__name,
- }
- _jobs = []
- _jobs.append( (_X.A1, funcrepr) )
- for i in range( len(_dX) ):
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dX[i]
- #
- _jobs.append( (_X_plus_dXi, funcrepr) )
- #
- import multiprocessing
- self.__pool = multiprocessing.Pool(self.__mpWorkers)
- _HX_plus_dX = self.__pool.map( ExecuteFunction, _jobs )
- self.__pool.close()
- self.__pool.join()
- #
- _HX = _HX_plus_dX.pop(0)
- #
- _Jacobienne = []
- for i in range( len(_dX) ):
- _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
- #
- elif self.__mfEnabled:
- _xserie = []
- _xserie.append( _X.A1 )
- for i in range( len(_dX) ):
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dX[i]
- #
- _xserie.append( _X_plus_dXi )
- #
- _HX_plus_dX = self.DirectOperator( _xserie )
- #
- _HX = _HX_plus_dX.pop(0)
- #
- _Jacobienne = []
- for i in range( len(_dX) ):
- _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
- #
- else:
- _Jacobienne = []
- _HX = self.DirectOperator( _X )
- for i in range( _dX.size ):
- _dXi = _dX[i]
- _X_plus_dXi = numpy.array( _X.A1, dtype=float )
- _X_plus_dXi[i] = _X[i] + _dXi
- #
- _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
- #
- _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
- #
- #
- _Jacobienne = numpy.asmatrix( numpy.vstack( _Jacobienne ) ).T
- if self.__avoidRC:
- if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size
- while len(self.__listJPCP) > self.__lenghtRJ:
- self.__listJPCP.pop(0)
- self.__listJPCI.pop(0)
- self.__listJPCR.pop(0)
- self.__listJPPN.pop(0)
- self.__listJPIN.pop(0)
- self.__listJPCP.append( copy.copy(_X) )
- self.__listJPCI.append( copy.copy(_dX) )
- self.__listJPCR.append( copy.copy(_Jacobienne) )
- self.__listJPPN.append( numpy.linalg.norm(_X) )
- self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) )
- #
- logging.debug("FDA Fin du calcul de la Jacobienne")
- #
- return _Jacobienne
-
- # ---------------------------------------------------------
- def TangentOperator(self, paire ):
- """
- Calcul du tangent à l'aide de la Jacobienne.
- """
- if self.__mfEnabled:
- assert len(paire) == 1, "Incorrect lenght of arguments"
- _paire = paire[0]
- assert len(_paire) == 2, "Incorrect number of arguments"
- else:
- assert len(paire) == 2, "Incorrect number of arguments"
- _paire = paire
- X, dX = _paire
- _Jacobienne = self.TangentMatrix( X )
- if dX is None or len(dX) == 0:
- #
- # Calcul de la forme matricielle si le second argument est None
- # -------------------------------------------------------------
- if self.__mfEnabled: return [_Jacobienne,]
- else: return _Jacobienne
- else:
- #
- # Calcul de la valeur linéarisée de H en X appliqué à dX
- # ------------------------------------------------------
- _dX = numpy.asmatrix(numpy.ravel( dX )).T
- _HtX = numpy.dot(_Jacobienne, _dX)
- if self.__mfEnabled: return [_HtX.A1,]
- else: return _HtX.A1
-
- # ---------------------------------------------------------
- def AdjointOperator(self, paire ):
- """
- Calcul de l'adjoint à l'aide de la Jacobienne.
- """
- if self.__mfEnabled:
- assert len(paire) == 1, "Incorrect lenght of arguments"
- _paire = paire[0]
- assert len(_paire) == 2, "Incorrect number of arguments"
- else:
- assert len(paire) == 2, "Incorrect number of arguments"
- _paire = paire
- X, Y = _paire
- _JacobienneT = self.TangentMatrix( X ).T
- if Y is None or len(Y) == 0:
- #
- # Calcul de la forme matricielle si le second argument est None
- # -------------------------------------------------------------
- if self.__mfEnabled: return [_JacobienneT,]
- else: return _JacobienneT
- else:
- #
- # Calcul de la valeur de l'adjoint en X appliqué à Y
- # --------------------------------------------------
- _Y = numpy.asmatrix(numpy.ravel( Y )).T
- _HaY = numpy.dot(_JacobienneT, _Y)
- if self.__mfEnabled: return [_HaY.A1,]
- else: return _HaY.A1
-
-# ==============================================================================
-if __name__ == "__main__":
- print('\n AUTODIAGNOSTIC\n')