1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2008-2017 EDF R&D
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
24 Définit les versions approximées des opérateurs tangents et adjoints.
26 __author__ = "Jean-Philippe ARGAUD"
28 import os, numpy, time, copy, types, sys
30 from daCore.BasicObjects import Operator
31 # logging.getLogger().setLevel(logging.DEBUG)
33 # ==============================================================================
34 def ExecuteFunction( paire ):
35 assert len(paire) == 2, "Incorrect number of arguments"
37 __X = numpy.asmatrix(numpy.ravel( X )).T
38 __sys_path_tmp = sys.path ; sys.path.insert(0,funcrepr["__userFunction__path"])
39 __module = __import__(funcrepr["__userFunction__modl"], globals(), locals(), [])
40 __fonction = getattr(__module,funcrepr["__userFunction__name"])
41 sys.path = __sys_path_tmp ; del __sys_path_tmp
42 __HX = __fonction( __X )
43 return numpy.ravel( __HX )
45 # ==============================================================================
46 class FDApproximation(object):
48 Cette classe sert d'interface pour définir les opérateurs approximés. A la
49 création d'un objet, en fournissant une fonction "Function", on obtient un
50 objet qui dispose de 3 méthodes "DirectOperator", "TangentOperator" et
51 "AdjointOperator". On contrôle l'approximation DF avec l'incrément
52 multiplicatif "increment" valant par défaut 1%, ou avec l'incrément fixe
53 "dX" qui sera multiplié par "increment" (donc en %), et on effectue de DF
54 centrées si le booléen "centeredDF" est vrai.
61 avoidingRedundancy = True,
62 toleranceInRedundancy = 1.e-18,
63 lenghtOfRedundancy = -1,
69 import multiprocessing
70 self.__mpEnabled = True
72 self.__mpEnabled = False
74 self.__mpEnabled = False
75 self.__mpWorkers = mpWorkers
76 if self.__mpWorkers is not None and self.__mpWorkers < 1:
77 self.__mpWorkers = None
78 logging.debug("FDA Calculs en multiprocessing : %s (nombre de processus : %s)"%(self.__mpEnabled,self.__mpWorkers))
81 if isinstance(Function,types.FunctionType):
82 logging.debug("FDA Calculs en multiprocessing : FunctionType")
83 self.__userFunction__name = Function.__name__
85 mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
87 mod = os.path.abspath(Function.__globals__['__file__'])
88 if not os.path.isfile(mod):
89 raise ImportError("No user defined function or method found with the name %s"%(mod,))
90 self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
91 self.__userFunction__path = os.path.dirname(mod)
93 self.__userOperator = Operator( fromMethod = Function )
94 self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
95 elif isinstance(Function,types.MethodType):
96 logging.debug("FDA Calculs en multiprocessing : MethodType")
97 self.__userFunction__name = Function.__name__
99 mod = os.path.join(Function.__globals__['filepath'],Function.__globals__['filename'])
101 mod = os.path.abspath(Function.__func__.__globals__['__file__'])
102 if not os.path.isfile(mod):
103 raise ImportError("No user defined function or method found with the name %s"%(mod,))
104 self.__userFunction__modl = os.path.basename(mod).replace('.pyc','').replace('.pyo','').replace('.py','')
105 self.__userFunction__path = os.path.dirname(mod)
107 self.__userOperator = Operator( fromMethod = Function )
108 self.__userFunction = self.__userOperator.appliedTo # Pour le calcul Direct
110 raise TypeError("User defined function or method has to be provided for finite differences approximation.")
112 self.__userOperator = Operator( fromMethod = Function )
113 self.__userFunction = self.__userOperator.appliedTo
115 self.__centeredDF = bool(centeredDF)
116 if avoidingRedundancy:
117 self.__avoidRC = True
118 self.__tolerBP = float(toleranceInRedundancy)
119 self.__lenghtRJ = int(lenghtOfRedundancy)
120 self.__listJPCP = [] # Jacobian Previous Calculated Points
121 self.__listJPCI = [] # Jacobian Previous Calculated Increment
122 self.__listJPCR = [] # Jacobian Previous Calculated Results
123 self.__listJPPN = [] # Jacobian Previous Calculated Point Norms
124 self.__listJPIN = [] # Jacobian Previous Calculated Increment Norms
126 self.__avoidRC = False
127 if abs(float(increment)) > 1.e-15:
128 self.__increment = float(increment)
130 self.__increment = 0.01
134 self.__dX = numpy.asmatrix(numpy.ravel( dX )).T
135 logging.debug("FDA Reduction des doublons de calcul : %s"%self.__avoidRC)
137 logging.debug("FDA Tolerance de determination des doublons : %.2e"%self.__tolerBP)
139 # ---------------------------------------------------------
140 def __doublon__(self, e, l, n, v=None):
141 __ac, __iac = False, -1
142 for i in range(len(l)-1,-1,-1):
143 if numpy.linalg.norm(e - l[i]) < self.__tolerBP * n[i]:
144 __ac, __iac = True, i
145 if v is not None: logging.debug("FDA Cas%s déja calculé, récupération du doublon %i"%(v,__iac))
149 # ---------------------------------------------------------
150 def DirectOperator(self, X ):
152 Calcul du direct à l'aide de la fonction fournie.
154 logging.debug("FDA Calcul DirectOperator (explicite)")
155 _X = numpy.asmatrix(numpy.ravel( X )).T
156 _HX = numpy.ravel(self.__userFunction( _X ))
160 # ---------------------------------------------------------
161 def TangentMatrix(self, X ):
163 Calcul de l'opérateur tangent comme la Jacobienne par différences finies,
164 c'est-à-dire le gradient de H en X. On utilise des différences finies
165 directionnelles autour du point X. X est un numpy.matrix.
167 Différences finies centrées (approximation d'ordre 2):
168 1/ Pour chaque composante i de X, on ajoute et on enlève la perturbation
169 dX[i] à la composante X[i], pour composer X_plus_dXi et X_moins_dXi, et
170 on calcule les réponses HX_plus_dXi = H( X_plus_dXi ) et HX_moins_dXi =
172 2/ On effectue les différences (HX_plus_dXi-HX_moins_dXi) et on divise par
174 3/ Chaque résultat, par composante, devient une colonne de la Jacobienne
176 Différences finies non centrées (approximation d'ordre 1):
177 1/ Pour chaque composante i de X, on ajoute la perturbation dX[i] à la
178 composante X[i] pour composer X_plus_dXi, et on calcule la réponse
179 HX_plus_dXi = H( X_plus_dXi )
180 2/ On calcule la valeur centrale HX = H(X)
181 3/ On effectue les différences (HX_plus_dXi-HX) et on divise par
183 4/ Chaque résultat, par composante, devient une colonne de la Jacobienne
186 logging.debug("FDA Calcul de la Jacobienne")
187 logging.debug("FDA Incrément de............: %s*X"%float(self.__increment))
188 logging.debug("FDA Approximation centrée...: %s"%(self.__centeredDF))
190 if X is None or len(X)==0:
191 raise ValueError("Nominal point X for approximate derivatives can not be None or void (X=%s)."%(str(X),))
193 _X = numpy.asmatrix(numpy.ravel( X )).T
195 if self.__dX is None:
196 _dX = self.__increment * _X
198 _dX = numpy.asmatrix(numpy.ravel( self.__dX )).T
200 if (_dX == 0.).any():
203 _dX = numpy.where( _dX == 0., float(self.__increment), _dX )
205 _dX = numpy.where( _dX == 0., moyenne, _dX )
207 __alreadyCalculated = False
209 __bidon, __alreadyCalculatedP = self.__doublon__(_X, self.__listJPCP, self.__listJPPN, None)
210 __bidon, __alreadyCalculatedI = self.__doublon__(_dX, self.__listJPCI, self.__listJPIN, None)
211 if __alreadyCalculatedP == __alreadyCalculatedI > -1:
212 __alreadyCalculated, __i = True, __alreadyCalculatedP
213 logging.debug("FDA Cas J déja calculé, récupération du doublon %i"%__i)
215 if __alreadyCalculated:
216 logging.debug("FDA Calcul Jacobienne (par récupération du doublon %i)"%__i)
217 _Jacobienne = self.__listJPCR[__i]
219 logging.debug("FDA Calcul Jacobienne (explicite)")
220 if self.__centeredDF:
224 "__userFunction__path" : self.__userFunction__path,
225 "__userFunction__modl" : self.__userFunction__modl,
226 "__userFunction__name" : self.__userFunction__name,
229 for i in range( len(_dX) ):
231 _X_plus_dXi = numpy.array( _X.A1, dtype=float )
232 _X_plus_dXi[i] = _X[i] + _dXi
233 _X_moins_dXi = numpy.array( _X.A1, dtype=float )
234 _X_moins_dXi[i] = _X[i] - _dXi
236 _jobs.append( (_X_plus_dXi, funcrepr) )
237 _jobs.append( (_X_moins_dXi, funcrepr) )
239 import multiprocessing
240 self.__pool = multiprocessing.Pool(self.__mpWorkers)
241 _HX_plusmoins_dX = self.__pool.map( ExecuteFunction, _jobs )
246 for i in range( len(_dX) ):
247 _Jacobienne.append( numpy.ravel( _HX_plusmoins_dX[2*i] - _HX_plusmoins_dX[2*i+1] ) / (2.*_dX[i]) )
250 for i in range( _dX.size ):
252 _X_plus_dXi = numpy.array( _X.A1, dtype=float )
253 _X_plus_dXi[i] = _X[i] + _dXi
254 _X_moins_dXi = numpy.array( _X.A1, dtype=float )
255 _X_moins_dXi[i] = _X[i] - _dXi
257 _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
258 _HX_moins_dXi = self.DirectOperator( _X_moins_dXi )
260 _Jacobienne.append( numpy.ravel( _HX_plus_dXi - _HX_moins_dXi ) / (2.*_dXi) )
267 "__userFunction__path" : self.__userFunction__path,
268 "__userFunction__modl" : self.__userFunction__modl,
269 "__userFunction__name" : self.__userFunction__name,
272 _jobs.append( (_X.A1, funcrepr) )
273 for i in range( len(_dX) ):
274 _X_plus_dXi = numpy.array( _X.A1, dtype=float )
275 _X_plus_dXi[i] = _X[i] + _dX[i]
277 _jobs.append( (_X_plus_dXi, funcrepr) )
279 import multiprocessing
280 self.__pool = multiprocessing.Pool(self.__mpWorkers)
281 _HX_plus_dX = self.__pool.map( ExecuteFunction, _jobs )
285 _HX = _HX_plus_dX.pop(0)
288 for i in range( len(_dX) ):
289 _Jacobienne.append( numpy.ravel(( _HX_plus_dX[i] - _HX ) / _dX[i]) )
292 _HX = self.DirectOperator( _X )
293 for i in range( _dX.size ):
295 _X_plus_dXi = numpy.array( _X.A1, dtype=float )
296 _X_plus_dXi[i] = _X[i] + _dXi
298 _HX_plus_dXi = self.DirectOperator( _X_plus_dXi )
300 _Jacobienne.append( numpy.ravel(( _HX_plus_dXi - _HX ) / _dXi) )
303 _Jacobienne = numpy.matrix( numpy.vstack( _Jacobienne ) ).T
305 if self.__lenghtRJ < 0: self.__lenghtRJ = 2 * _X.size
306 while len(self.__listJPCP) > self.__lenghtRJ:
307 self.__listJPCP.pop(0)
308 self.__listJPCI.pop(0)
309 self.__listJPCR.pop(0)
310 self.__listJPPN.pop(0)
311 self.__listJPIN.pop(0)
312 self.__listJPCP.append( copy.copy(_X) )
313 self.__listJPCI.append( copy.copy(_dX) )
314 self.__listJPCR.append( copy.copy(_Jacobienne) )
315 self.__listJPPN.append( numpy.linalg.norm(_X) )
316 self.__listJPIN.append( numpy.linalg.norm(_Jacobienne) )
318 logging.debug("FDA Fin du calcul de la Jacobienne")
322 # ---------------------------------------------------------
323 def TangentOperator(self, paire ):
325 Calcul du tangent à l'aide de la Jacobienne.
327 assert len(paire) == 2, "Incorrect number of arguments"
329 _Jacobienne = self.TangentMatrix( X )
330 if dX is None or len(dX) == 0:
332 # Calcul de la forme matricielle si le second argument est None
333 # -------------------------------------------------------------
337 # Calcul de la valeur linéarisée de H en X appliqué à dX
338 # ------------------------------------------------------
339 _dX = numpy.asmatrix(numpy.ravel( dX )).T
340 _HtX = numpy.dot(_Jacobienne, _dX)
343 # ---------------------------------------------------------
344 def AdjointOperator(self, paire ):
346 Calcul de l'adjoint à l'aide de la Jacobienne.
348 assert len(paire) == 2, "Incorrect number of arguments"
350 _JacobienneT = self.TangentMatrix( X ).T
351 if Y is None or len(Y) == 0:
353 # Calcul de la forme matricielle si le second argument est None
354 # -------------------------------------------------------------
358 # Calcul de la valeur de l'adjoint en X appliqué à Y
359 # --------------------------------------------------
360 _Y = numpy.asmatrix(numpy.ravel( Y )).T
361 _HaY = numpy.dot(_JacobienneT, _Y)
364 # ==============================================================================
365 if __name__ == "__main__":
366 print('\n AUTODIAGNOSTIC \n')