From: Jean-Philippe ARGAUD Date: Fri, 14 Feb 2020 09:48:01 +0000 (+0100) Subject: Update obsolete numeric objects X-Git-Tag: V9_5_0a1~1 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=09ffd0bebb63a46c583328b99293b61ed1144379;p=modules%2Fadao.git Update obsolete numeric objects --- diff --git a/src/daComposant/daNumerics/ApproximatedDerivatives.py b/src/daComposant/daNumerics/ApproximatedDerivatives.py deleted file mode 100644 index 2b73055..0000000 --- a/src/daComposant/daNumerics/ApproximatedDerivatives.py +++ /dev/null @@ -1,429 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2008-2020 EDF R&D -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com -# -# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D - -__doc__ = """ - Définit les versions approximées des opérateurs tangents et adjoints. -""" -__author__ = "Jean-Philippe ARGAUD" - -import os, numpy, time, copy, types, sys -import logging -from daCore.BasicObjects import Operator -# logging.getLogger().setLevel(logging.DEBUG) - -# ============================================================================== -def ExecuteFunction( paire ): - assert len(paire) == 2, "Incorrect number of arguments" - X, funcrepr = paire - __X = numpy.asmatrix(numpy.ravel( X )).T - __sys_path_tmp = sys.path ; sys.path.insert(0,funcrepr["__userFunction__path"]) - __module = __import__(funcrepr["__userFunction__modl"], globals(), locals(), []) - __fonction = getattr(__module,funcrepr["__userFunction__name"]) - sys.path = __sys_path_tmp ; del __sys_path_tmp - __HX = __fonction( __X ) - return numpy.ravel( __HX ) - -# ============================================================================== -class FDApproximation(object): - """ - Cette classe sert d'interface pour définir les opérateurs approximés. A la - création d'un objet, en fournissant une fonction "Function", on obtient un - objet qui dispose de 3 méthodes "DirectOperator", "TangentOperator" et - "AdjointOperator". On contrôle l'approximation DF avec l'incrément - multiplicatif "increment" valant par défaut 1%, ou avec l'incrément fixe - "dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF - centrées si le booléen "centeredDF" est vrai. - """ - def __init__(self, - Function = None, - centeredDF = False, - increment = 0.01, - dX = None, - avoidingRedundancy = True, - toleranceInRedundancy = 1.e-18, - lenghtOfRedundancy = -1, - mpEnabled = False, - mpWorkers = None, - mfEnabled = False, - ): - if mpEnabled: - try: - import multiprocessing - self.__mpEnabled = True - except ImportError: - self.__mpEnabled = False - else: - self.__mpEnabled = False - self.__mpWorkers = mpWorkers - if self.__mpWorkers is not None and self.__mpWorkers < 1: - self.__mpWorkers = None - logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers)) - # - if mfEnabled: - self.__mfEnabled = True - else: - self.__mfEnabled = False - logging.debug("FDA Calculs en multifonctions : %s"%(self.__mfEnabled,)) - # - if avoidingRedundancy: - self.__avoidRC = True - self.__tolerBP = float(toleranceInRedundancy) - self.__lenghtRJ = int(lenghtOfRedundancy) - self.__listJPCP = [] # Jacobian Previous Calculated Points - self.__listJPCI = [] # Jacobian Previous Calculated Increment - self.__listJPCR = [] # Jacobian Previous Calculated Results - self.__listJPPN = [] # Jacobian Previous Calculated Point Norms - self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms - else: - self.__avoidRC = False - # - if self.__mpEnabled: - if isinstance(Function,types.FunctionType): - logging.debug("FDA Calculs en multiprocessing : FunctionType") - self.__userFunction__name = Function.__name__ - try: - mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename']) - except: - mod = os.path.abspath(Function.__globals__['__file__']) - if not os.path.isfile(mod): - raise ImportError("No user defined function or method found with the name %s"%(mod,)) - self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','') - self.__userFunction__path = os.path.dirname(mod) - del mod - self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled ) - self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct - elif isinstance(Function,types.MethodType): - logging.debug("FDA Calculs en multiprocessing : MethodType") - self.__userFunction__name = Function.__name__ - try: - mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename']) - except: - mod = os.path.abspath(Function.__func__.__globals__['__file__']) - if not os.path.isfile(mod): - raise ImportError("No user defined function or method found with the name %s"%(mod,)) - self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','') - self.__userFunction__path = os.path.dirname(mod) - del mod - self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled ) - self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct - else: - raise TypeError("User defined function or method has to be provided for finite differences approximation.") - else: - self.__userOperator = Operator( fromMethod = Function, avoidingRedundancy = self.__avoidRC, inputAsMultiFunction = self.__mfEnabled ) - self.__userFunction = self.__userOperator.appliedTo - # - self.__centeredDF = bool(centeredDF) - if abs(float(increment)) > 1.e-15: - self.__increment = float(increment) - else: - self.__increment = 0.01 - if dX is None: - self.__dX = None - else: - self.__dX = numpy.asmatrix(numpy.ravel( dX )).T - logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC) - if self.__avoidRC: - logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP) - - # --------------------------------------------------------- - def __doublon__(self, e, l, n, v=None): - __ac, __iac = False, -1 - for i in range(len(l)-1,-1,-1): - if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]: - __ac, __iac = True, i - if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon %i"%(v,__iac)) - break - return __ac, __iac - - # --------------------------------------------------------- - def DirectOperator(self, X ): - """ - Calcul du direct à l'aide de la fonction fournie. - """ - logging.debug("FDA Calcul DirectOperator (explicite)") - if self.__mfEnabled: - _HX = self.__userFunction( X, argsAsSerie = True ) - else: - _X = numpy.asmatrix(numpy.ravel( X )).T - _HX = numpy.ravel(self.__userFunction( _X )) - # - return _HX - - # --------------------------------------------------------- - def TangentMatrix(self, X ): - """ - Calcul de l'opérateur tangent comme la Jacobienne par différences finies, - c'est-à-dire le gradient de H en X. On utilise des différences finies - directionnelles autour du point X. X est un numpy.matrix. - - Différences finies centrées (approximation d'ordre 2): - 1/ Pour chaque composante i de X, on ajoute et on enlève la perturbation - dX[i] à la composante X[i], pour composer X_plus_dXi et X_moins_dXi, et - on calcule les réponses HX_plus_dXi = H( X_plus_dXi ) et HX_moins_dXi = - H( X_moins_dXi ) - 2/ On effectue les différences (HX_plus_dXi-HX_moins_dXi) et on divise par - le pas 2*dXi - 3/ Chaque résultat, par composante, devient une colonne de la Jacobienne - - Différences finies non centrées (approximation d'ordre 1): - 1/ Pour chaque composante i de X, on ajoute la perturbation dX[i] à la - composante X[i] pour composer X_plus_dXi, et on calcule la réponse - HX_plus_dXi = H( X_plus_dXi ) - 2/ On calcule la valeur centrale HX = H(X) - 3/ On effectue les différences (HX_plus_dXi-HX) et on divise par - le pas dXi - 4/ Chaque résultat, par composante, devient une colonne de la Jacobienne - - """ - logging.debug("FDA Début du calcul de la Jacobienne") - logging.debug("FDA Incrément de............: %s*X"%float(self.__increment)) - logging.debug("FDA Approximation centrée...: %s"%(self.__centeredDF)) - # - if X is None or len(X)==0: - raise ValueError("Nominal point X for approximate derivatives can not be None or void (X=%s)."%(str(X),)) - # - _X = numpy.asmatrix(numpy.ravel( X )).T - # - if self.__dX is None: - _dX = self.__increment * _X - else: - _dX = numpy.asmatrix(numpy.ravel( self.__dX )).T - # - if (_dX == 0.).any(): - moyenne = _dX.mean() - if moyenne == 0.: - _dX = numpy.where( _dX == 0., float(self.__increment), _dX ) - else: - _dX = numpy.where( _dX == 0., moyenne, _dX ) - # - __alreadyCalculated = False - if self.__avoidRC: - __bidon, __alreadyCalculatedP = self.__doublon__(_X, self.__listJPCP, self.__listJPPN, None) - __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None) - if __alreadyCalculatedP == __alreadyCalculatedI > -1: - __alreadyCalculated, __i = True, __alreadyCalculatedP - logging.debug("FDA Cas J déja calculé, récupération du doublon %i"%__i) - # - if __alreadyCalculated: - logging.debug("FDA Calcul Jacobienne (par récupération du doublon %i)"%__i) - _Jacobienne = self.__listJPCR[__i] - else: - logging.debug("FDA Calcul Jacobienne (explicite)") - if self.__centeredDF: - # - if self.__mpEnabled and not self.__mfEnabled: - funcrepr = { - "__userFunction__path" : self.__userFunction__path, - "__userFunction__modl" : self.__userFunction__modl, - "__userFunction__name" : self.__userFunction__name, - } - _jobs = [] - for i in range( len(_dX) ): - _dXi = _dX[i] - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dXi - _X_moins_dXi = numpy.array( _X.A1, dtype=float ) - _X_moins_dXi[i] = _X[i] - _dXi - # - _jobs.append( (_X_plus_dXi, funcrepr) ) - _jobs.append( (_X_moins_dXi, funcrepr) ) - # - import multiprocessing - self.__pool = multiprocessing.Pool(self.__mpWorkers) - _HX_plusmoins_dX = self.__pool.map( ExecuteFunction, _jobs ) - self.__pool.close() - self.__pool.join() - # - _Jacobienne = [] - for i in range( len(_dX) ): - _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) ) - # - elif self.__mfEnabled: - _xserie = [] - for i in range( len(_dX) ): - _dXi = _dX[i] - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dXi - _X_moins_dXi = numpy.array( _X.A1, dtype=float ) - _X_moins_dXi[i] = _X[i] - _dXi - # - _xserie.append( _X_plus_dXi ) - _xserie.append( _X_moins_dXi ) - # - _HX_plusmoins_dX = self.DirectOperator( _xserie ) - # - _Jacobienne = [] - for i in range( len(_dX) ): - _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) ) - # - else: - _Jacobienne = [] - for i in range( _dX.size ): - _dXi = _dX[i] - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dXi - _X_moins_dXi = numpy.array( _X.A1, dtype=float ) - _X_moins_dXi[i] = _X[i] - _dXi - # - _HX_plus_dXi = self.DirectOperator( _X_plus_dXi ) - _HX_moins_dXi = self.DirectOperator( _X_moins_dXi ) - # - _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) ) - # - else: - # - if self.__mpEnabled and not self.__mfEnabled: - funcrepr = { - "__userFunction__path" : self.__userFunction__path, - "__userFunction__modl" : self.__userFunction__modl, - "__userFunction__name" : self.__userFunction__name, - } - _jobs = [] - _jobs.append( (_X.A1, funcrepr) ) - for i in range( len(_dX) ): - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dX[i] - # - _jobs.append( (_X_plus_dXi, funcrepr) ) - # - import multiprocessing - self.__pool = multiprocessing.Pool(self.__mpWorkers) - _HX_plus_dX = self.__pool.map( ExecuteFunction, _jobs ) - self.__pool.close() - self.__pool.join() - # - _HX = _HX_plus_dX.pop(0) - # - _Jacobienne = [] - for i in range( len(_dX) ): - _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) ) - # - elif self.__mfEnabled: - _xserie = [] - _xserie.append( _X.A1 ) - for i in range( len(_dX) ): - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dX[i] - # - _xserie.append( _X_plus_dXi ) - # - _HX_plus_dX = self.DirectOperator( _xserie ) - # - _HX = _HX_plus_dX.pop(0) - # - _Jacobienne = [] - for i in range( len(_dX) ): - _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) ) - # - else: - _Jacobienne = [] - _HX = self.DirectOperator( _X ) - for i in range( _dX.size ): - _dXi = _dX[i] - _X_plus_dXi = numpy.array( _X.A1, dtype=float ) - _X_plus_dXi[i] = _X[i] + _dXi - # - _HX_plus_dXi = self.DirectOperator( _X_plus_dXi ) - # - _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) ) - # - # - _Jacobienne = numpy.asmatrix( numpy.vstack( _Jacobienne ) ).T - if self.__avoidRC: - if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size - while len(self.__listJPCP) > self.__lenghtRJ: - self.__listJPCP.pop(0) - self.__listJPCI.pop(0) - self.__listJPCR.pop(0) - self.__listJPPN.pop(0) - self.__listJPIN.pop(0) - self.__listJPCP.append( copy.copy(_X) ) - self.__listJPCI.append( copy.copy(_dX) ) - self.__listJPCR.append( copy.copy(_Jacobienne) ) - self.__listJPPN.append( numpy.linalg.norm(_X) ) - self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) ) - # - logging.debug("FDA Fin du calcul de la Jacobienne") - # - return _Jacobienne - - # --------------------------------------------------------- - def TangentOperator(self, paire ): - """ - Calcul du tangent à l'aide de la Jacobienne. - """ - if self.__mfEnabled: - assert len(paire) == 1, "Incorrect lenght of arguments" - _paire = paire[0] - assert len(_paire) == 2, "Incorrect number of arguments" - else: - assert len(paire) == 2, "Incorrect number of arguments" - _paire = paire - X, dX = _paire - _Jacobienne = self.TangentMatrix( X ) - if dX is None or len(dX) == 0: - # - # Calcul de la forme matricielle si le second argument est None - # ------------------------------------------------------------- - if self.__mfEnabled: return [_Jacobienne,] - else: return _Jacobienne - else: - # - # Calcul de la valeur linéarisée de H en X appliqué à dX - # ------------------------------------------------------ - _dX = numpy.asmatrix(numpy.ravel( dX )).T - _HtX = numpy.dot(_Jacobienne, _dX) - if self.__mfEnabled: return [_HtX.A1,] - else: return _HtX.A1 - - # --------------------------------------------------------- - def AdjointOperator(self, paire ): - """ - Calcul de l'adjoint à l'aide de la Jacobienne. - """ - if self.__mfEnabled: - assert len(paire) == 1, "Incorrect lenght of arguments" - _paire = paire[0] - assert len(_paire) == 2, "Incorrect number of arguments" - else: - assert len(paire) == 2, "Incorrect number of arguments" - _paire = paire - X, Y = _paire - _JacobienneT = self.TangentMatrix( X ).T - if Y is None or len(Y) == 0: - # - # Calcul de la forme matricielle si le second argument est None - # ------------------------------------------------------------- - if self.__mfEnabled: return [_JacobienneT,] - else: return _JacobienneT - else: - # - # Calcul de la valeur de l'adjoint en X appliqué à Y - # -------------------------------------------------- - _Y = numpy.asmatrix(numpy.ravel( Y )).T - _HaY = numpy.dot(_JacobienneT, _Y) - if self.__mfEnabled: return [_HaY.A1,] - else: return _HaY.A1 - -# ============================================================================== -if __name__ == "__main__": - print('\n AUTODIAGNOSTIC\n')