From 09d35dbe117ef8eb51016ab537ca1192539aead0 Mon Sep 17 00:00:00 2001 From: Jean-Philippe ARGAUD Date: Sat, 25 May 2019 20:47:50 +0200 Subject: [PATCH] Adding ParallelFunctionTest algorithm and improve parallel modes --- .../daAlgorithms/ParallelFunctionTest.py | 187 ++++++++++++++++++ src/daComposant/daCore/BasicObjects.py | 119 +++++++---- .../daYacsSchemaCreator/infos_daComposant.py | 5 + 3 files changed, 274 insertions(+), 37 deletions(-) create mode 100644 src/daComposant/daAlgorithms/ParallelFunctionTest.py diff --git a/src/daComposant/daAlgorithms/ParallelFunctionTest.py b/src/daComposant/daAlgorithms/ParallelFunctionTest.py new file mode 100644 index 0000000..30f0047 --- /dev/null +++ b/src/daComposant/daAlgorithms/ParallelFunctionTest.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2008-2019 EDF R&D +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D + +import sys, logging +from daCore import BasicObjects, PlatformInfo +import numpy, copy +mpr = PlatformInfo.PlatformInfo().MachinePrecision() +mfp = PlatformInfo.PlatformInfo().MaximumPrecision() +if sys.version_info.major > 2: + unicode = str + +# ============================================================================== +class ElementaryAlgorithm(BasicObjects.Algorithm): + def __init__(self): + BasicObjects.Algorithm.__init__(self, "PARALLELFUNCTIONTEST") + self.defineRequiredParameter( + name = "NumberOfPrintedDigits", + default = 5, + typecast = int, + message = "Nombre de chiffres affichés pour les impressions de réels", + minval = 0, + ) + self.defineRequiredParameter( + name = "NumberOfRepetition", + default = 1, + typecast = int, + message = "Nombre de fois où l'exécution de la fonction est répétée", + minval = 1, + ) + self.defineRequiredParameter( + name = "ResultTitle", + default = "", + typecast = str, + message = "Titre du tableau et de la figure", + ) + self.defineRequiredParameter( + name = "SetDebug", + default = False, + typecast = bool, + message = "Activation du mode debug lors de l'exécution", + ) + self.defineRequiredParameter( + name = "StoreSupplementaryCalculations", + default = [], + typecast = tuple, + message = "Liste de calculs supplémentaires à stocker et/ou effectuer", + listval = [ + "CurrentState", + "SimulatedObservationAtCurrentState", + ] + ) + self.requireInputArguments( + mandatory= ("Xb", "HO"), + ) + + def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None): + self._pre_run(Parameters, Xb, Y, R, B, Q) + # + Hm = HO["Direct"].appliedTo + # + Xn = copy.copy( Xb ) + # + # ---------- + __marge = 5*u" " + _p = self._parameters["NumberOfPrintedDigits"] + if len(self._parameters["ResultTitle"]) > 0: + __rt = unicode(self._parameters["ResultTitle"]) + msgs = u"\n" + msgs += __marge + "====" + "="*len(__rt) + "====\n" + msgs += __marge + " " + __rt + "\n" + msgs += __marge + "====" + "="*len(__rt) + "====\n" + print("%s"%msgs) + # + msgs = ("===> Information before launching:\n") + msgs += (" -----------------------------\n") + msgs += (" Characteristics of input vector X, internally converted:\n") + msgs += (" Type...............: %s\n")%type( Xn ) + msgs += (" Lenght of vector...: %i\n")%max(numpy.matrix( Xn ).shape) + msgs += (" Minimum value......: %."+str(_p)+"e\n")%numpy.min( Xn ) + msgs += (" Maximum value......: %."+str(_p)+"e\n")%numpy.max( Xn ) + msgs += (" Mean of vector.....: %."+str(_p)+"e\n")%numpy.mean( Xn, dtype=mfp ) + msgs += (" Standard error.....: %."+str(_p)+"e\n")%numpy.std( Xn, dtype=mfp ) + msgs += (" L2 norm of vector..: %."+str(_p)+"e\n")%numpy.linalg.norm( Xn ) + print(msgs) + # + print(" %s\n"%("-"*75,)) + if self._parameters["SetDebug"]: + CUR_LEVEL = logging.getLogger().getEffectiveLevel() + logging.getLogger().setLevel(logging.DEBUG) + print("===> Beginning of evaluation, activating debug\n") + else: + print("===> Beginning of evaluation, without activating debug\n") + # + Xs = [] + Ys = [] + for i in range(self._parameters["NumberOfRepetition"]): + if self._toStore("CurrentState"): + self.StoredVariables["CurrentState"].store( numpy.ravel(Xn) ) + Xs.append( Xn ) + # + # ---------- + HO["Direct"].disableAvoidingRedundancy() + # ---------- + Ys = Hm( Xs, argsAsSerie = True ) + # ---------- + HO["Direct"].enableAvoidingRedundancy() + # ---------- + # + print() + if self._parameters["SetDebug"]: + print("===> End of evaluation, deactivating debug\n") + logging.getLogger().setLevel(CUR_LEVEL) + else: + print("===> End of evaluation, without deactivating debug\n") + # + for i in range(self._parameters["NumberOfRepetition"]): + print(" %s\n"%("-"*75,)) + if self._parameters["NumberOfRepetition"] > 1: + print("===> Repetition step number %i on a total of %i\n"%(i+1,self._parameters["NumberOfRepetition"])) + # + Yn = Ys[i] + msgs = ("===> Information after evaluation:\n") + msgs += ("\n Characteristics of simulated output vector Y=H(X), to compare to others:\n") + msgs += (" Type...............: %s\n")%type( Yn ) + msgs += (" Lenght of vector...: %i\n")%max(numpy.matrix( Yn ).shape) + msgs += (" Minimum value......: %."+str(_p)+"e\n")%numpy.min( Yn ) + msgs += (" Maximum value......: %."+str(_p)+"e\n")%numpy.max( Yn ) + msgs += (" Mean of vector.....: %."+str(_p)+"e\n")%numpy.mean( Yn, dtype=mfp ) + msgs += (" Standard error.....: %."+str(_p)+"e\n")%numpy.std( Yn, dtype=mfp ) + msgs += (" L2 norm of vector..: %."+str(_p)+"e\n")%numpy.linalg.norm( Yn ) + print(msgs) + if self._toStore("SimulatedObservationAtCurrentState"): + self.StoredVariables["SimulatedObservationAtCurrentState"].store( numpy.ravel(Yn) ) + # + if self._parameters["NumberOfRepetition"] > 1: + msgs = (" %s\n"%("-"*75,)) + msgs += ("\n===> Statistical analysis of the outputs obtained through parallel repeated evaluations\n") + msgs += ("\n (Remark: numbers that are (about) under %.0e represent 0 to machine precision)\n"%mpr) + Yy = numpy.array( Ys ) + msgs += ("\n Characteristics of the whole set of outputs Y:\n") + msgs += (" Number of evaluations.........................: %i\n")%len( Ys ) + msgs += (" Minimum value of the whole set of outputs.....: %."+str(_p)+"e\n")%numpy.min( Yy ) + msgs += (" Maximum value of the whole set of outputs.....: %."+str(_p)+"e\n")%numpy.max( Yy ) + msgs += (" Mean of vector of the whole set of outputs....: %."+str(_p)+"e\n")%numpy.mean( Yy, dtype=mfp ) + msgs += (" Standard error of the whole set of outputs....: %."+str(_p)+"e\n")%numpy.std( Yy, dtype=mfp ) + Ym = numpy.mean( numpy.array( Ys ), axis=0, dtype=mfp ) + msgs += ("\n Characteristics of the vector Ym, mean of the outputs Y:\n") + msgs += (" Size of the mean of the outputs...............: %i\n")%Ym.size + msgs += (" Minimum value of the mean of the outputs......: %."+str(_p)+"e\n")%numpy.min( Ym ) + msgs += (" Maximum value of the mean of the outputs......: %."+str(_p)+"e\n")%numpy.max( Ym ) + msgs += (" Mean of the mean of the outputs...............: %."+str(_p)+"e\n")%numpy.mean( Ym, dtype=mfp ) + msgs += (" Standard error of the mean of the outputs.....: %."+str(_p)+"e\n")%numpy.std( Ym, dtype=mfp ) + Ye = numpy.mean( numpy.array( Ys ) - Ym, axis=0, dtype=mfp ) + msgs += "\n Characteristics of the mean of the differences between the outputs Y and their mean Ym:\n" + msgs += (" Size of the mean of the differences...........: %i\n")%Ym.size + msgs += (" Minimum value of the mean of the differences..: %."+str(_p)+"e\n")%numpy.min( Ye ) + msgs += (" Maximum value of the mean of the differences..: %."+str(_p)+"e\n")%numpy.max( Ye ) + msgs += (" Mean of the mean of the differences...........: %."+str(_p)+"e\n")%numpy.mean( Ye, dtype=mfp ) + msgs += (" Standard error of the mean of the differences.: %."+str(_p)+"e\n")%numpy.std( Ye, dtype=mfp ) + msgs += ("\n %s\n"%("-"*75,)) + print(msgs) + # + self._post_run(HO) + return 0 + +# ============================================================================== +if __name__ == "__main__": + print('\n AUTODIAGNOSTIC\n') diff --git a/src/daComposant/daCore/BasicObjects.py b/src/daComposant/daCore/BasicObjects.py index c9f7202..592c56f 100644 --- a/src/daComposant/daCore/BasicObjects.py +++ b/src/daComposant/daCore/BasicObjects.py @@ -113,6 +113,7 @@ class Operator(object): fromMatrix = None, avoidingRedundancy = True, inputAsMultiFunction = False, + enableMultiProcess = False, extraArguments = None, ): """ @@ -131,13 +132,14 @@ class Operator(object): self.__NbCallsAsMatrix, self.__NbCallsAsMethod, self.__NbCallsOfCached = 0, 0, 0 self.__AvoidRC = bool( avoidingRedundancy ) self.__inputAsMF = bool( inputAsMultiFunction ) + self.__mpEnabled = bool( enableMultiProcess ) self.__extraArgs = extraArguments if fromMethod is not None and self.__inputAsMF: self.__Method = fromMethod # logtimer(fromMethod) self.__Matrix = None self.__Type = "Method" elif fromMethod is not None and not self.__inputAsMF: - self.__Method = partial( MultiFonction, _sFunction=fromMethod) + self.__Method = partial( MultiFonction, _sFunction=fromMethod, _mpEnabled=self.__mpEnabled) self.__Matrix = None self.__Type = "Method" elif fromMatrix is not None: @@ -404,14 +406,18 @@ class FullOperator(object): __Parameters = {} if (asDict is not None) and isinstance(asDict, dict): __Parameters.update( asDict ) - if "DifferentialIncrement" in asDict: - __Parameters["withIncrement"] = asDict["DifferentialIncrement"] - if "CenteredFiniteDifference" in asDict: - __Parameters["withCenteredDF"] = asDict["CenteredFiniteDifference"] - if "EnableMultiProcessing" in asDict: - __Parameters["withmpEnabled"] = asDict["EnableMultiProcessing"] - if "NumberOfProcesses" in asDict: - __Parameters["withmpWorkers"] = asDict["NumberOfProcesses"] + # Priorité à EnableMultiProcessingInDerivatives=True + if "EnableMultiProcessing" in __Parameters and __Parameters["EnableMultiProcessing"]: + __Parameters["EnableMultiProcessingInDerivatives"] = True + __Parameters["EnableMultiProcessingInEvaluation"] = False + if "EnableMultiProcessingInDerivatives" not in __Parameters: + __Parameters["EnableMultiProcessingInDerivatives"] = False + if __Parameters["EnableMultiProcessingInDerivatives"]: + __Parameters["EnableMultiProcessingInEvaluation"] = False + if "EnableMultiProcessingInEvaluation" not in __Parameters: + __Parameters["EnableMultiProcessingInEvaluation"] = False + if "withIncrement" in __Parameters: # Temporaire + __Parameters["DifferentialIncrement"] = __Parameters["withIncrement"] # if asScript is not None: __Matrix, __Function = None, None @@ -477,40 +483,39 @@ class FullOperator(object): if isinstance(__Function, dict) and \ ("useApproximatedDerivatives" in __Function) and bool(__Function["useApproximatedDerivatives"]) and \ ("Direct" in __Function) and (__Function["Direct"] is not None): - if "withCenteredDF" not in __Function: __Function["withCenteredDF"] = False - if "withIncrement" not in __Function: __Function["withIncrement"] = 0.01 - if "withdX" not in __Function: __Function["withdX"] = None - if "withAvoidingRedundancy" not in __Function: __Function["withAvoidingRedundancy"] = avoidRC - if "withToleranceInRedundancy" not in __Function: __Function["withToleranceInRedundancy"] = 1.e-18 - if "withLenghtOfRedundancy" not in __Function: __Function["withLenghtOfRedundancy"] = -1 - if "withmpEnabled" not in __Function: __Function["withmpEnabled"] = False - if "withmpWorkers" not in __Function: __Function["withmpWorkers"] = None - if "withmfEnabled" not in __Function: __Function["withmfEnabled"] = inputAsMF + if "CenteredFiniteDifference" not in __Function: __Function["CenteredFiniteDifference"] = False + if "DifferentialIncrement" not in __Function: __Function["DifferentialIncrement"] = 0.01 + if "withdX" not in __Function: __Function["withdX"] = None + if "withAvoidingRedundancy" not in __Function: __Function["withAvoidingRedundancy"] = avoidRC + if "withToleranceInRedundancy" not in __Function: __Function["withToleranceInRedundancy"] = 1.e-18 + if "withLenghtOfRedundancy" not in __Function: __Function["withLenghtOfRedundancy"] = -1 + if "NumberOfProcesses" not in __Function: __Function["NumberOfProcesses"] = None + if "withmfEnabled" not in __Function: __Function["withmfEnabled"] = inputAsMF from daNumerics.ApproximatedDerivatives import FDApproximation FDA = FDApproximation( Function = __Function["Direct"], - centeredDF = __Function["withCenteredDF"], - increment = __Function["withIncrement"], + centeredDF = __Function["CenteredFiniteDifference"], + increment = __Function["DifferentialIncrement"], dX = __Function["withdX"], avoidingRedundancy = __Function["withAvoidingRedundancy"], toleranceInRedundancy = __Function["withToleranceInRedundancy"], lenghtOfRedundancy = __Function["withLenghtOfRedundancy"], - mpEnabled = __Function["withmpEnabled"], - mpWorkers = __Function["withmpWorkers"], + mpEnabled = __Function["EnableMultiProcessingInDerivatives"], + mpWorkers = __Function["NumberOfProcesses"], mfEnabled = __Function["withmfEnabled"], ) - self.__FO["Direct"] = Operator( fromMethod = FDA.DirectOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) + self.__FO["Direct"] = Operator( fromMethod = FDA.DirectOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] ) self.__FO["Tangent"] = Operator( fromMethod = FDA.TangentOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) self.__FO["Adjoint"] = Operator( fromMethod = FDA.AdjointOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) elif isinstance(__Function, dict) and \ ("Direct" in __Function) and ("Tangent" in __Function) and ("Adjoint" in __Function) and \ (__Function["Direct"] is not None) and (__Function["Tangent"] is not None) and (__Function["Adjoint"] is not None): - self.__FO["Direct"] = Operator( fromMethod = __Function["Direct"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) + self.__FO["Direct"] = Operator( fromMethod = __Function["Direct"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] ) self.__FO["Tangent"] = Operator( fromMethod = __Function["Tangent"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) self.__FO["Adjoint"] = Operator( fromMethod = __Function["Adjoint"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs ) elif asMatrix is not None: __matrice = numpy.matrix( __Matrix, numpy.float ) - self.__FO["Direct"] = Operator( fromMatrix = __matrice, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF ) + self.__FO["Direct"] = Operator( fromMatrix = __matrice, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] ) self.__FO["Tangent"] = Operator( fromMatrix = __matrice, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF ) self.__FO["Adjoint"] = Operator( fromMatrix = __matrice.T, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF ) del __matrice @@ -1848,29 +1853,69 @@ class CaseLogger(object): return __formater.load(__filename, __content, __object) # ============================================================================== -def MultiFonction( __xserie, _extraArguments = None, _sFunction = lambda x: x ): +def MultiFonction( + __xserie, + _extraArguments = None, + _sFunction = lambda x: x, + _mpEnabled = False, + _mpWorkers = None, + ): """ Pour une liste ordonnée de vecteurs en entrée, renvoie en sortie la liste correspondante de valeurs de la fonction en argument """ # Vérifications et définitions initiales + # logging.debug("MULTF Internal multifonction calculations begin with function %s"%(_sFunction.__name__,)) if not PlatformInfo.isIterable( __xserie ): raise TypeError("MultiFonction not iterable unkown input type: %s"%(type(__xserie),)) + if _mpEnabled: + if (_mpWorkers is None) or (_mpWorkers is not None and _mpWorkers < 1): + __mpWorkers = None + else: + __mpWorkers = int(_mpWorkers) + try: + import multiprocessing + __mpEnabled = True + except ImportError: + __mpEnabled = False + else: + __mpEnabled = False + __mpWorkers = None # # Calculs effectifs - __multiHX = [] - if _extraArguments is None: - for __xvalue in __xserie: - __multiHX.append( _sFunction( __xvalue ) ) - elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)): - for __xvalue in __xserie: - __multiHX.append( _sFunction( __xvalue, *_extraArguments ) ) - elif _extraArguments is not None and isinstance(_extraArguments, dict): - for __xvalue in __xserie: - __multiHX.append( _sFunction( __xvalue, **_extraArguments ) ) + if __mpEnabled: + _jobs = [] + if _extraArguments is None: + _jobs = __xserie + elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)): + for __xvalue in __xserie: + _jobs.append( [__xvalue, ] + list(_extraArguments) ) + else: + raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),)) + # logging.debug("MULTF Internal multiprocessing calculations begin : evaluation of %i point(s)"%(len(_jobs),)) + import multiprocessing + with multiprocessing.Pool(__mpWorkers) as pool: + __multiHX = pool.map( _sFunction, _jobs ) + pool.close() + pool.join() + # logging.debug("MULTF Internal multiprocessing calculation end") else: - raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),)) + # logging.debug("MULTF Internal monoprocessing calculation begin") + __multiHX = [] + if _extraArguments is None: + for __xvalue in __xserie: + __multiHX.append( _sFunction( __xvalue ) ) + elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)): + for __xvalue in __xserie: + __multiHX.append( _sFunction( __xvalue, *_extraArguments ) ) + elif _extraArguments is not None and isinstance(_extraArguments, dict): + for __xvalue in __xserie: + __multiHX.append( _sFunction( __xvalue, **_extraArguments ) ) + else: + raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),)) + # logging.debug("MULTF Internal monoprocessing calculation end") # + # logging.debug("MULTF Internal multifonction calculations end") return __multiHX # ============================================================================== diff --git a/src/daSalome/daYacsSchemaCreator/infos_daComposant.py b/src/daSalome/daYacsSchemaCreator/infos_daComposant.py index 539c1bf..5bc0979 100644 --- a/src/daSalome/daYacsSchemaCreator/infos_daComposant.py +++ b/src/daSalome/daYacsSchemaCreator/infos_daComposant.py @@ -86,6 +86,7 @@ CheckAlgos = [ "TangentTest", "LocalSensitivityTest", "SamplingTest", + "ParallelFunctionTest", "ObserverTest", ] @@ -200,6 +201,10 @@ AlgoDataRequirements["SamplingTest"] = [ "Observation", "ObservationError", "ObservationOperator", ] +AlgoDataRequirements["ParallelFunctionTest"] = [ + "CheckingPoint", + "ObservationOperator", + ] AlgoDataRequirements["ObserverTest"] = [ "Observers", ] -- 2.39.2