Salome HOME
Update obsolete numeric objects
authorJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Fri, 14 Feb 2020 09:48:01 +0000 (10:48 +0100)
committerJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Fri, 14 Feb 2020 09:48:01 +0000 (10:48 +0100)
src/daComposant/daNumerics/ApproximatedDerivatives.py [deleted file]

diff --git a/src/daComposant/daNumerics/ApproximatedDerivatives.py b/src/daComposant/daNumerics/ApproximatedDerivatives.py
deleted file mode 100644 (file)
index 2b73055..0000000
+++ /dev/null
@@ -1,429 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2008-2020 EDF R&D
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
-#
-# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
-#
-# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-
-__doc__ = """
-    Définit les versions approximées des opérateurs tangents et adjoints.
-"""
-__author__ = "Jean-Philippe ARGAUD"
-
-import os, numpy, time, copy, types, sys
-import logging
-from daCore.BasicObjects import Operator
-# logging.getLogger().setLevel(logging.DEBUG)
-
-# ==============================================================================
-def ExecuteFunction( paire ):
-    assert len(paire) == 2, "Incorrect number of arguments"
-    X, funcrepr = paire
-    __X = numpy.asmatrix(numpy.ravel( X )).T
-    __sys_path_tmp = sys.path ; sys.path.insert(0,funcrepr["__userFunction__path"])
-    __module = __import__(funcrepr["__userFunction__modl"], globals(), locals(), [])
-    __fonction = getattr(__module,funcrepr["__userFunction__name"])
-    sys.path = __sys_path_tmp ; del __sys_path_tmp
-    __HX  = __fonction( __X )
-    return numpy.ravel( __HX )
-
-# ==============================================================================
-class FDApproximation(object):
-    """
-    Cette classe sert d'interface pour définir les opérateurs approximés. A la
-    création d'un objet, en fournissant une fonction "Function", on obtient un
-    objet qui dispose de 3 méthodes "DirectOperator", "TangentOperator" et
-    "AdjointOperator". On contrôle l'approximation DF avec l'incrément
-    multiplicatif "increment" valant par défaut 1%, ou avec l'incrément fixe
-    "dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF
-    centrées si le booléen "centeredDF" est vrai.
-    """
-    def __init__(self,
-            Function              = None,
-            centeredDF            = False,
-            increment             = 0.01,
-            dX                    = None,
-            avoidingRedundancy    = True,
-            toleranceInRedundancy = 1.e-18,
-            lenghtOfRedundancy    = -1,
-            mpEnabled             = False,
-            mpWorkers             = None,
-            mfEnabled             = False,
-            ):
-        if mpEnabled:
-            try:
-                import multiprocessing
-                self.__mpEnabled = True
-            except ImportError:
-                self.__mpEnabled = False
-        else:
-            self.__mpEnabled = False
-        self.__mpWorkers = mpWorkers
-        if self.__mpWorkers is not None and self.__mpWorkers < 1:
-            self.__mpWorkers = None
-        logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers))
-        #
-        if mfEnabled:
-            self.__mfEnabled = True
-        else:
-            self.__mfEnabled = False
-        logging.debug("FDA Calculs en multifonctions : %s"%(self.__mfEnabled,))
-        #
-        if avoidingRedundancy:
-            self.__avoidRC = True
-            self.__tolerBP = float(toleranceInRedundancy)
-            self.__lenghtRJ = int(lenghtOfRedundancy)
-            self.__listJPCP = [] # Jacobian Previous Calculated Points
-            self.__listJPCI = [] # Jacobian Previous Calculated Increment
-            self.__listJPCR = [] # Jacobian Previous Calculated Results
-            self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
-            self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
-        else:
-            self.__avoidRC = False
-        #
-        if self.__mpEnabled:
-            if isinstance(Function,types.FunctionType):
-                logging.debug("FDA Calculs en multiprocessing : FunctionType")
-                self.__userFunction__name = Function.__name__
-                try:
-                    mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
-                except:
-                    mod = os.path.abspath(Function.__globals__['__file__'])
-                if not os.path.isfile(mod):
-                    raise ImportError("No user defined function or method found with the name %s"%(mod,))
-                self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
-                self.__userFunction__path = os.path.dirname(mod)
-                del mod
-                self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
-                self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
-            elif isinstance(Function,types.MethodType):
-                logging.debug("FDA Calculs en multiprocessing : MethodType")
-                self.__userFunction__name = Function.__name__
-                try:
-                    mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
-                except:
-                    mod = os.path.abspath(Function.__func__.__globals__['__file__'])
-                if not os.path.isfile(mod):
-                    raise ImportError("No user defined function or method found with the name %s"%(mod,))
-                self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
-                self.__userFunction__path = os.path.dirname(mod)
-                del mod
-                self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
-                self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
-            else:
-                raise TypeError("User defined function or method has to be provided for finite differences approximation.")
-        else:
-            self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled )
-            self.__userFunction = self.__userOperator.appliedTo
-        #
-        self.__centeredDF = bool(centeredDF)
-        if abs(float(increment)) > 1.e-15:
-            self.__increment  = float(increment)
-        else:
-            self.__increment  = 0.01
-        if dX is None:
-            self.__dX     = None
-        else:
-            self.__dX     = numpy.asmatrix(numpy.ravel( dX )).T
-        logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC)
-        if self.__avoidRC:
-            logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP)
-
-    # ---------------------------------------------------------
-    def __doublon__(self, e, l, n, v=None):
-        __ac, __iac = False, -1
-        for i in range(len(l)-1,-1,-1):
-            if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]:
-                __ac, __iac = True, i
-                if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon %i"%(v,__iac))
-                break
-        return __ac, __iac
-
-    # ---------------------------------------------------------
-    def DirectOperator(self, X ):
-        """
-        Calcul du direct à l'aide de la fonction fournie.
-        """
-        logging.debug("FDA Calcul DirectOperator (explicite)")
-        if self.__mfEnabled:
-            _HX = self.__userFunction( X, argsAsSerie = True )
-        else:
-            _X = numpy.asmatrix(numpy.ravel( X )).T
-            _HX = numpy.ravel(self.__userFunction( _X ))
-        #
-        return _HX
-
-    # ---------------------------------------------------------
-    def TangentMatrix(self, X ):
-        """
-        Calcul de l'opérateur tangent comme la Jacobienne par différences finies,
-        c'est-à-dire le gradient de H en X. On utilise des différences finies
-        directionnelles autour du point X. X est un numpy.matrix.
-
-        Différences finies centrées (approximation d'ordre 2):
-        1/ Pour chaque composante i de X, on ajoute et on enlève la perturbation
-           dX[i] à la  composante X[i], pour composer X_plus_dXi et X_moins_dXi, et
-           on calcule les réponses HX_plus_dXi = H( X_plus_dXi ) et HX_moins_dXi =
-           H( X_moins_dXi )
-        2/ On effectue les différences (HX_plus_dXi-HX_moins_dXi) et on divise par
-           le pas 2*dXi
-        3/ Chaque résultat, par composante, devient une colonne de la Jacobienne
-
-        Différences finies non centrées (approximation d'ordre 1):
-        1/ Pour chaque composante i de X, on ajoute la perturbation dX[i] à la
-           composante X[i] pour composer X_plus_dXi, et on calcule la réponse
-           HX_plus_dXi = H( X_plus_dXi )
-        2/ On calcule la valeur centrale HX = H(X)
-        3/ On effectue les différences (HX_plus_dXi-HX) et on divise par
-           le pas dXi
-        4/ Chaque résultat, par composante, devient une colonne de la Jacobienne
-
-        """
-        logging.debug("FDA Début du calcul de la Jacobienne")
-        logging.debug("FDA   Incrément de............: %s*X"%float(self.__increment))
-        logging.debug("FDA   Approximation centrée...: %s"%(self.__centeredDF))
-        #
-        if X is None or len(X)==0:
-            raise ValueError("Nominal point X for approximate derivatives can not be None or void (X=%s)."%(str(X),))
-        #
-        _X = numpy.asmatrix(numpy.ravel( X )).T
-        #
-        if self.__dX is None:
-            _dX  = self.__increment * _X
-        else:
-            _dX = numpy.asmatrix(numpy.ravel( self.__dX )).T
-        #
-        if (_dX == 0.).any():
-            moyenne = _dX.mean()
-            if moyenne == 0.:
-                _dX = numpy.where( _dX == 0., float(self.__increment), _dX )
-            else:
-                _dX = numpy.where( _dX == 0., moyenne, _dX )
-        #
-        __alreadyCalculated  = False
-        if self.__avoidRC:
-            __bidon, __alreadyCalculatedP = self.__doublon__(_X,  self.__listJPCP, self.__listJPPN, None)
-            __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None)
-            if __alreadyCalculatedP == __alreadyCalculatedI > -1:
-                __alreadyCalculated, __i = True, __alreadyCalculatedP
-                logging.debug("FDA Cas J déja calculé, récupération du doublon %i"%__i)
-        #
-        if __alreadyCalculated:
-            logging.debug("FDA   Calcul Jacobienne (par récupération du doublon %i)"%__i)
-            _Jacobienne = self.__listJPCR[__i]
-        else:
-            logging.debug("FDA   Calcul Jacobienne (explicite)")
-            if self.__centeredDF:
-                #
-                if self.__mpEnabled and not self.__mfEnabled:
-                    funcrepr = {
-                        "__userFunction__path" : self.__userFunction__path,
-                        "__userFunction__modl" : self.__userFunction__modl,
-                        "__userFunction__name" : self.__userFunction__name,
-                    }
-                    _jobs = []
-                    for i in range( len(_dX) ):
-                        _dXi            = _dX[i]
-                        _X_plus_dXi     = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i]  = _X[i] + _dXi
-                        _X_moins_dXi    = numpy.array( _X.A1, dtype=float )
-                        _X_moins_dXi[i] = _X[i] - _dXi
-                        #
-                        _jobs.append( (_X_plus_dXi,  funcrepr) )
-                        _jobs.append( (_X_moins_dXi, funcrepr) )
-                    #
-                    import multiprocessing
-                    self.__pool = multiprocessing.Pool(self.__mpWorkers)
-                    _HX_plusmoins_dX = self.__pool.map( ExecuteFunction, _jobs )
-                    self.__pool.close()
-                    self.__pool.join()
-                    #
-                    _Jacobienne  = []
-                    for i in range( len(_dX) ):
-                        _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
-                    #
-                elif self.__mfEnabled:
-                    _xserie = []
-                    for i in range( len(_dX) ):
-                        _dXi            = _dX[i]
-                        _X_plus_dXi     = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i]  = _X[i] + _dXi
-                        _X_moins_dXi    = numpy.array( _X.A1, dtype=float )
-                        _X_moins_dXi[i] = _X[i] - _dXi
-                        #
-                        _xserie.append( _X_plus_dXi )
-                        _xserie.append( _X_moins_dXi )
-                    #
-                    _HX_plusmoins_dX = self.DirectOperator( _xserie )
-                     #
-                    _Jacobienne  = []
-                    for i in range( len(_dX) ):
-                        _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
-                    #
-                else:
-                    _Jacobienne  = []
-                    for i in range( _dX.size ):
-                        _dXi            = _dX[i]
-                        _X_plus_dXi     = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i]  = _X[i] + _dXi
-                        _X_moins_dXi    = numpy.array( _X.A1, dtype=float )
-                        _X_moins_dXi[i] = _X[i] - _dXi
-                        #
-                        _HX_plus_dXi    = self.DirectOperator( _X_plus_dXi )
-                        _HX_moins_dXi   = self.DirectOperator( _X_moins_dXi )
-                        #
-                        _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
-                #
-            else:
-                #
-                if self.__mpEnabled and not self.__mfEnabled:
-                    funcrepr = {
-                        "__userFunction__path" : self.__userFunction__path,
-                        "__userFunction__modl" : self.__userFunction__modl,
-                        "__userFunction__name" : self.__userFunction__name,
-                    }
-                    _jobs = []
-                    _jobs.append( (_X.A1, funcrepr) )
-                    for i in range( len(_dX) ):
-                        _X_plus_dXi    = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i] = _X[i] + _dX[i]
-                        #
-                        _jobs.append( (_X_plus_dXi, funcrepr) )
-                    #
-                    import multiprocessing
-                    self.__pool = multiprocessing.Pool(self.__mpWorkers)
-                    _HX_plus_dX = self.__pool.map( ExecuteFunction, _jobs )
-                    self.__pool.close()
-                    self.__pool.join()
-                    #
-                    _HX = _HX_plus_dX.pop(0)
-                    #
-                    _Jacobienne = []
-                    for i in range( len(_dX) ):
-                        _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
-                    #
-                elif self.__mfEnabled:
-                    _xserie = []
-                    _xserie.append( _X.A1 )
-                    for i in range( len(_dX) ):
-                        _X_plus_dXi    = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i] = _X[i] + _dX[i]
-                        #
-                        _xserie.append( _X_plus_dXi )
-                    #
-                    _HX_plus_dX = self.DirectOperator( _xserie )
-                    #
-                    _HX = _HX_plus_dX.pop(0)
-                    #
-                    _Jacobienne = []
-                    for i in range( len(_dX) ):
-                        _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
-                   #
-                else:
-                    _Jacobienne  = []
-                    _HX = self.DirectOperator( _X )
-                    for i in range( _dX.size ):
-                        _dXi            = _dX[i]
-                        _X_plus_dXi     = numpy.array( _X.A1, dtype=float )
-                        _X_plus_dXi[i]  = _X[i] + _dXi
-                        #
-                        _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
-                        #
-                        _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
-                #
-            #
-            _Jacobienne = numpy.asmatrix( numpy.vstack( _Jacobienne ) ).T
-            if self.__avoidRC:
-                if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size
-                while len(self.__listJPCP) > self.__lenghtRJ:
-                    self.__listJPCP.pop(0)
-                    self.__listJPCI.pop(0)
-                    self.__listJPCR.pop(0)
-                    self.__listJPPN.pop(0)
-                    self.__listJPIN.pop(0)
-                self.__listJPCP.append( copy.copy(_X) )
-                self.__listJPCI.append( copy.copy(_dX) )
-                self.__listJPCR.append( copy.copy(_Jacobienne) )
-                self.__listJPPN.append( numpy.linalg.norm(_X) )
-                self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) )
-        #
-        logging.debug("FDA Fin du calcul de la Jacobienne")
-        #
-        return _Jacobienne
-
-    # ---------------------------------------------------------
-    def TangentOperator(self, paire ):
-        """
-        Calcul du tangent à l'aide de la Jacobienne.
-        """
-        if self.__mfEnabled:
-            assert len(paire) == 1, "Incorrect lenght of arguments"
-            _paire = paire[0]
-            assert len(_paire) == 2, "Incorrect number of arguments"
-        else:
-            assert len(paire) == 2, "Incorrect number of arguments"
-            _paire = paire
-        X, dX = _paire
-        _Jacobienne = self.TangentMatrix( X )
-        if dX is None or len(dX) == 0:
-            #
-            # Calcul de la forme matricielle si le second argument est None
-            # -------------------------------------------------------------
-            if self.__mfEnabled: return [_Jacobienne,]
-            else:                return _Jacobienne
-        else:
-            #
-            # Calcul de la valeur linéarisée de H en X appliqué à dX
-            # ------------------------------------------------------
-            _dX = numpy.asmatrix(numpy.ravel( dX )).T
-            _HtX = numpy.dot(_Jacobienne, _dX)
-            if self.__mfEnabled: return [_HtX.A1,]
-            else:                return _HtX.A1
-
-    # ---------------------------------------------------------
-    def AdjointOperator(self, paire ):
-        """
-        Calcul de l'adjoint à l'aide de la Jacobienne.
-        """
-        if self.__mfEnabled:
-            assert len(paire) == 1, "Incorrect lenght of arguments"
-            _paire = paire[0]
-            assert len(_paire) == 2, "Incorrect number of arguments"
-        else:
-            assert len(paire) == 2, "Incorrect number of arguments"
-            _paire = paire
-        X, Y = _paire
-        _JacobienneT = self.TangentMatrix( X ).T
-        if Y is None or len(Y) == 0:
-            #
-            # Calcul de la forme matricielle si le second argument est None
-            # -------------------------------------------------------------
-            if self.__mfEnabled: return [_JacobienneT,]
-            else:                return _JacobienneT
-        else:
-            #
-            # Calcul de la valeur de l'adjoint en X appliqué à Y
-            # --------------------------------------------------
-            _Y = numpy.asmatrix(numpy.ravel( Y )).T
-            _HaY = numpy.dot(_JacobienneT, _Y)
-            if self.__mfEnabled: return [_HaY.A1,]
-            else:                return _HaY.A1
-
-# ==============================================================================
-if __name__ == "__main__":
-    print('\n AUTODIAGNOSTIC\n')