]> SALOME platform Git repositories - modules/adao.git/commitdiff
Salome HOME
Adding ParallelFunctionTest algorithm and improve parallel modes
authorJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Sat, 25 May 2019 18:47:50 +0000 (20:47 +0200)
committerJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Sun, 26 May 2019 08:19:03 +0000 (10:19 +0200)
src/daComposant/daAlgorithms/ParallelFunctionTest.py [new file with mode: 0644]
src/daComposant/daCore/BasicObjects.py
src/daSalome/daYacsSchemaCreator/infos_daComposant.py

diff --git a/src/daComposant/daAlgorithms/ParallelFunctionTest.py b/src/daComposant/daAlgorithms/ParallelFunctionTest.py
new file mode 100644 (file)
index 0000000..30f0047
--- /dev/null
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2019 EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
+
+import sys, logging
+from daCore import BasicObjects, PlatformInfo
+import numpy, copy
+mpr = PlatformInfo.PlatformInfo().MachinePrecision()
+mfp = PlatformInfo.PlatformInfo().MaximumPrecision()
+if sys.version_info.major > 2:
+    unicode = str
+
+# ==============================================================================
+class ElementaryAlgorithm(BasicObjects.Algorithm):
+    def __init__(self):
+        BasicObjects.Algorithm.__init__(self, "PARALLELFUNCTIONTEST")
+        self.defineRequiredParameter(
+            name     = "NumberOfPrintedDigits",
+            default  = 5,
+            typecast = int,
+            message  = "Nombre de chiffres affichés pour les impressions de réels",
+            minval   = 0,
+            )
+        self.defineRequiredParameter(
+            name     = "NumberOfRepetition",
+            default  = 1,
+            typecast = int,
+            message  = "Nombre de fois où l'exécution de la fonction est répétée",
+            minval   = 1,
+            )
+        self.defineRequiredParameter(
+            name     = "ResultTitle",
+            default  = "",
+            typecast = str,
+            message  = "Titre du tableau et de la figure",
+            )
+        self.defineRequiredParameter(
+            name     = "SetDebug",
+            default  = False,
+            typecast = bool,
+            message  = "Activation du mode debug lors de l'exécution",
+            )
+        self.defineRequiredParameter(
+            name     = "StoreSupplementaryCalculations",
+            default  = [],
+            typecast = tuple,
+            message  = "Liste de calculs supplémentaires à stocker et/ou effectuer",
+            listval  = [
+                "CurrentState",
+                "SimulatedObservationAtCurrentState",
+                ]
+            )
+        self.requireInputArguments(
+            mandatory= ("Xb", "HO"),
+            )
+
+    def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
+        self._pre_run(Parameters, Xb, Y, R, B, Q)
+        #
+        Hm = HO["Direct"].appliedTo
+        #
+        Xn = copy.copy( Xb )
+        #
+        # ----------
+        __marge =  5*u" "
+        _p = self._parameters["NumberOfPrintedDigits"]
+        if len(self._parameters["ResultTitle"]) > 0:
+            __rt = unicode(self._parameters["ResultTitle"])
+            msgs  = u"\n"
+            msgs +=  __marge + "====" + "="*len(__rt) + "====\n"
+            msgs +=  __marge + "    " + __rt + "\n"
+            msgs +=  __marge + "====" + "="*len(__rt) + "====\n"
+            print("%s"%msgs)
+        #
+        msgs  = ("===> Information before launching:\n")
+        msgs += ("     -----------------------------\n")
+        msgs += ("     Characteristics of input vector X, internally converted:\n")
+        msgs += ("       Type...............: %s\n")%type( Xn )
+        msgs += ("       Lenght of vector...: %i\n")%max(numpy.matrix( Xn ).shape)
+        msgs += ("       Minimum value......: %."+str(_p)+"e\n")%numpy.min( Xn )
+        msgs += ("       Maximum value......: %."+str(_p)+"e\n")%numpy.max( Xn )
+        msgs += ("       Mean of vector.....: %."+str(_p)+"e\n")%numpy.mean( Xn, dtype=mfp )
+        msgs += ("       Standard error.....: %."+str(_p)+"e\n")%numpy.std( Xn, dtype=mfp )
+        msgs += ("       L2 norm of vector..: %."+str(_p)+"e\n")%numpy.linalg.norm( Xn )
+        print(msgs)
+        #
+        print("     %s\n"%("-"*75,))
+        if self._parameters["SetDebug"]:
+            CUR_LEVEL = logging.getLogger().getEffectiveLevel()
+            logging.getLogger().setLevel(logging.DEBUG)
+            print("===> Beginning of evaluation, activating debug\n")
+        else:
+            print("===> Beginning of evaluation, without activating debug\n")
+        #
+        Xs = []
+        Ys = []
+        for i in range(self._parameters["NumberOfRepetition"]):
+            if self._toStore("CurrentState"):
+                self.StoredVariables["CurrentState"].store( numpy.ravel(Xn) )
+            Xs.append( Xn )
+        #
+        # ----------
+        HO["Direct"].disableAvoidingRedundancy()
+        # ----------
+        Ys = Hm( Xs, argsAsSerie = True )
+        # ----------
+        HO["Direct"].enableAvoidingRedundancy()
+        # ----------
+        #
+        print()
+        if self._parameters["SetDebug"]:
+            print("===> End of evaluation, deactivating debug\n")
+            logging.getLogger().setLevel(CUR_LEVEL)
+        else:
+            print("===> End of evaluation, without deactivating debug\n")
+        #
+        for i in range(self._parameters["NumberOfRepetition"]):
+            print("     %s\n"%("-"*75,))
+            if self._parameters["NumberOfRepetition"] > 1:
+                print("===> Repetition step number %i on a total of %i\n"%(i+1,self._parameters["NumberOfRepetition"]))
+            #
+            Yn = Ys[i]
+            msgs  = ("===> Information after evaluation:\n")
+            msgs += ("\n     Characteristics of simulated output vector Y=H(X), to compare to others:\n")
+            msgs += ("       Type...............: %s\n")%type( Yn )
+            msgs += ("       Lenght of vector...: %i\n")%max(numpy.matrix( Yn ).shape)
+            msgs += ("       Minimum value......: %."+str(_p)+"e\n")%numpy.min( Yn )
+            msgs += ("       Maximum value......: %."+str(_p)+"e\n")%numpy.max( Yn )
+            msgs += ("       Mean of vector.....: %."+str(_p)+"e\n")%numpy.mean( Yn, dtype=mfp )
+            msgs += ("       Standard error.....: %."+str(_p)+"e\n")%numpy.std( Yn, dtype=mfp )
+            msgs += ("       L2 norm of vector..: %."+str(_p)+"e\n")%numpy.linalg.norm( Yn )
+            print(msgs)
+            if self._toStore("SimulatedObservationAtCurrentState"):
+                self.StoredVariables["SimulatedObservationAtCurrentState"].store( numpy.ravel(Yn) )
+        #
+        if self._parameters["NumberOfRepetition"] > 1:
+            msgs  = ("     %s\n"%("-"*75,))
+            msgs += ("\n===> Statistical analysis of the outputs obtained through parallel repeated evaluations\n")
+            msgs += ("\n     (Remark: numbers that are (about) under %.0e represent 0 to machine precision)\n"%mpr)
+            Yy = numpy.array( Ys )
+            msgs += ("\n     Characteristics of the whole set of outputs Y:\n")
+            msgs += ("       Number of evaluations.........................: %i\n")%len( Ys )
+            msgs += ("       Minimum value of the whole set of outputs.....: %."+str(_p)+"e\n")%numpy.min( Yy )
+            msgs += ("       Maximum value of the whole set of outputs.....: %."+str(_p)+"e\n")%numpy.max( Yy )
+            msgs += ("       Mean of vector of the whole set of outputs....: %."+str(_p)+"e\n")%numpy.mean( Yy, dtype=mfp )
+            msgs += ("       Standard error of the whole set of outputs....: %."+str(_p)+"e\n")%numpy.std( Yy, dtype=mfp )
+            Ym = numpy.mean( numpy.array( Ys ), axis=0, dtype=mfp )
+            msgs += ("\n     Characteristics of the vector Ym, mean of the outputs Y:\n")
+            msgs += ("       Size of the mean of the outputs...............: %i\n")%Ym.size
+            msgs += ("       Minimum value of the mean of the outputs......: %."+str(_p)+"e\n")%numpy.min( Ym )
+            msgs += ("       Maximum value of the mean of the outputs......: %."+str(_p)+"e\n")%numpy.max( Ym )
+            msgs += ("       Mean of the mean of the outputs...............: %."+str(_p)+"e\n")%numpy.mean( Ym, dtype=mfp )
+            msgs += ("       Standard error of the mean of the outputs.....: %."+str(_p)+"e\n")%numpy.std( Ym, dtype=mfp )
+            Ye = numpy.mean( numpy.array( Ys ) - Ym, axis=0, dtype=mfp )
+            msgs += "\n     Characteristics of the mean of the differences between the outputs Y and their mean Ym:\n"
+            msgs += ("       Size of the mean of the differences...........: %i\n")%Ym.size
+            msgs += ("       Minimum value of the mean of the differences..: %."+str(_p)+"e\n")%numpy.min( Ye )
+            msgs += ("       Maximum value of the mean of the differences..: %."+str(_p)+"e\n")%numpy.max( Ye )
+            msgs += ("       Mean of the mean of the differences...........: %."+str(_p)+"e\n")%numpy.mean( Ye, dtype=mfp )
+            msgs += ("       Standard error of the mean of the differences.: %."+str(_p)+"e\n")%numpy.std( Ye, dtype=mfp )
+            msgs += ("\n     %s\n"%("-"*75,))
+            print(msgs)
+        #
+        self._post_run(HO)
+        return 0
+
+# ==============================================================================
+if __name__ == "__main__":
+    print('\n AUTODIAGNOSTIC\n')
index c9f72028f075387bf3886ed3f9b2af76dd9d3d19..592c56fdda34ffb8ca5462f9aa9449a749b77128 100644 (file)
@@ -113,6 +113,7 @@ class Operator(object):
         fromMatrix           = None,
         avoidingRedundancy   = True,
         inputAsMultiFunction = False,
+        enableMultiProcess   = False,
         extraArguments       = None,
         ):
         """
@@ -131,13 +132,14 @@ class Operator(object):
         self.__NbCallsAsMatrix, self.__NbCallsAsMethod, self.__NbCallsOfCached = 0, 0, 0
         self.__AvoidRC   = bool( avoidingRedundancy )
         self.__inputAsMF = bool( inputAsMultiFunction )
+        self.__mpEnabled = bool( enableMultiProcess )
         self.__extraArgs = extraArguments
         if   fromMethod is not None and self.__inputAsMF:
             self.__Method = fromMethod # logtimer(fromMethod)
             self.__Matrix = None
             self.__Type   = "Method"
         elif fromMethod is not None and not self.__inputAsMF:
-            self.__Method = partial( MultiFonction, _sFunction=fromMethod)
+            self.__Method = partial( MultiFonction, _sFunction=fromMethod, _mpEnabled=self.__mpEnabled)
             self.__Matrix = None
             self.__Type   = "Method"
         elif fromMatrix is not None:
@@ -404,14 +406,18 @@ class FullOperator(object):
         __Parameters = {}
         if (asDict is not None) and isinstance(asDict, dict):
             __Parameters.update( asDict )
-            if "DifferentialIncrement" in asDict:
-                __Parameters["withIncrement"]  = asDict["DifferentialIncrement"]
-            if "CenteredFiniteDifference" in asDict:
-                __Parameters["withCenteredDF"] = asDict["CenteredFiniteDifference"]
-            if "EnableMultiProcessing" in asDict:
-                __Parameters["withmpEnabled"]  = asDict["EnableMultiProcessing"]
-            if "NumberOfProcesses" in asDict:
-                __Parameters["withmpWorkers"]  = asDict["NumberOfProcesses"]
+        # Priorité à EnableMultiProcessingInDerivatives=True
+        if "EnableMultiProcessing" in __Parameters and __Parameters["EnableMultiProcessing"]:
+            __Parameters["EnableMultiProcessingInDerivatives"] = True
+            __Parameters["EnableMultiProcessingInEvaluation"]  = False
+        if "EnableMultiProcessingInDerivatives"  not in __Parameters:
+            __Parameters["EnableMultiProcessingInDerivatives"]  = False
+        if __Parameters["EnableMultiProcessingInDerivatives"]:
+            __Parameters["EnableMultiProcessingInEvaluation"]  = False
+        if "EnableMultiProcessingInEvaluation"  not in __Parameters:
+            __Parameters["EnableMultiProcessingInEvaluation"]  = False
+        if "withIncrement" in __Parameters: # Temporaire
+            __Parameters["DifferentialIncrement"] = __Parameters["withIncrement"]
         #
         if asScript is not None:
             __Matrix, __Function = None, None
@@ -477,40 +483,39 @@ class FullOperator(object):
         if isinstance(__Function, dict) and \
                 ("useApproximatedDerivatives" in __Function) and bool(__Function["useApproximatedDerivatives"]) and \
                 ("Direct" in __Function) and (__Function["Direct"] is not None):
-            if "withCenteredDF"            not in __Function: __Function["withCenteredDF"]            = False
-            if "withIncrement"             not in __Function: __Function["withIncrement"]             = 0.01
-            if "withdX"                    not in __Function: __Function["withdX"]                    = None
-            if "withAvoidingRedundancy"    not in __Function: __Function["withAvoidingRedundancy"]    = avoidRC
-            if "withToleranceInRedundancy" not in __Function: __Function["withToleranceInRedundancy"] = 1.e-18
-            if "withLenghtOfRedundancy"    not in __Function: __Function["withLenghtOfRedundancy"]    = -1
-            if "withmpEnabled"             not in __Function: __Function["withmpEnabled"]             = False
-            if "withmpWorkers"             not in __Function: __Function["withmpWorkers"]             = None
-            if "withmfEnabled"             not in __Function: __Function["withmfEnabled"]             = inputAsMF
+            if "CenteredFiniteDifference"           not in __Function: __Function["CenteredFiniteDifference"]           = False
+            if "DifferentialIncrement"              not in __Function: __Function["DifferentialIncrement"]              = 0.01
+            if "withdX"                             not in __Function: __Function["withdX"]                             = None
+            if "withAvoidingRedundancy"             not in __Function: __Function["withAvoidingRedundancy"]             = avoidRC
+            if "withToleranceInRedundancy"          not in __Function: __Function["withToleranceInRedundancy"]          = 1.e-18
+            if "withLenghtOfRedundancy"             not in __Function: __Function["withLenghtOfRedundancy"]             = -1
+            if "NumberOfProcesses"                  not in __Function: __Function["NumberOfProcesses"]                  = None
+            if "withmfEnabled"                      not in __Function: __Function["withmfEnabled"]                      = inputAsMF
             from daNumerics.ApproximatedDerivatives import FDApproximation
             FDA = FDApproximation(
                 Function              = __Function["Direct"],
-                centeredDF            = __Function["withCenteredDF"],
-                increment             = __Function["withIncrement"],
+                centeredDF            = __Function["CenteredFiniteDifference"],
+                increment             = __Function["DifferentialIncrement"],
                 dX                    = __Function["withdX"],
                 avoidingRedundancy    = __Function["withAvoidingRedundancy"],
                 toleranceInRedundancy = __Function["withToleranceInRedundancy"],
                 lenghtOfRedundancy    = __Function["withLenghtOfRedundancy"],
-                mpEnabled             = __Function["withmpEnabled"],
-                mpWorkers             = __Function["withmpWorkers"],
+                mpEnabled             = __Function["EnableMultiProcessingInDerivatives"],
+                mpWorkers             = __Function["NumberOfProcesses"],
                 mfEnabled             = __Function["withmfEnabled"],
                 )
-            self.__FO["Direct"]  = Operator( fromMethod = FDA.DirectOperator,  avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
+            self.__FO["Direct"]  = Operator( fromMethod = FDA.DirectOperator,  avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] )
             self.__FO["Tangent"] = Operator( fromMethod = FDA.TangentOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
             self.__FO["Adjoint"] = Operator( fromMethod = FDA.AdjointOperator, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
         elif isinstance(__Function, dict) and \
                 ("Direct" in __Function) and ("Tangent" in __Function) and ("Adjoint" in __Function) and \
                 (__Function["Direct"] is not None) and (__Function["Tangent"] is not None) and (__Function["Adjoint"] is not None):
-            self.__FO["Direct"]  = Operator( fromMethod = __Function["Direct"],  avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
+            self.__FO["Direct"]  = Operator( fromMethod = __Function["Direct"],  avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] )
             self.__FO["Tangent"] = Operator( fromMethod = __Function["Tangent"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
             self.__FO["Adjoint"] = Operator( fromMethod = __Function["Adjoint"], avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, extraArguments = self.__extraArgs )
         elif asMatrix is not None:
             __matrice = numpy.matrix( __Matrix, numpy.float )
-            self.__FO["Direct"]  = Operator( fromMatrix = __matrice,   avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF )
+            self.__FO["Direct"]  = Operator( fromMatrix = __matrice,   avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF, enableMultiProcess = __Parameters["EnableMultiProcessingInEvaluation"] )
             self.__FO["Tangent"] = Operator( fromMatrix = __matrice,   avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF )
             self.__FO["Adjoint"] = Operator( fromMatrix = __matrice.T, avoidingRedundancy = avoidRC, inputAsMultiFunction = inputAsMF )
             del __matrice
@@ -1848,29 +1853,69 @@ class CaseLogger(object):
         return __formater.load(__filename, __content, __object)
 
 # ==============================================================================
-def MultiFonction( __xserie, _extraArguments = None, _sFunction = lambda x: x ):
+def MultiFonction(
+        __xserie,
+        _extraArguments = None,
+        _sFunction      = lambda x: x,
+        _mpEnabled      = False,
+        _mpWorkers      = None,
+        ):
     """
     Pour une liste ordonnée de vecteurs en entrée, renvoie en sortie la liste
     correspondante de valeurs de la fonction en argument
     """
     # Vérifications et définitions initiales
+    # logging.debug("MULTF Internal multifonction calculations begin with function %s"%(_sFunction.__name__,))
     if not PlatformInfo.isIterable( __xserie ):
         raise TypeError("MultiFonction not iterable unkown input type: %s"%(type(__xserie),))
+    if _mpEnabled:
+        if (_mpWorkers is None) or (_mpWorkers is not None and _mpWorkers < 1):
+            __mpWorkers = None
+        else:
+            __mpWorkers = int(_mpWorkers)
+        try:
+            import multiprocessing
+            __mpEnabled = True
+        except ImportError:
+            __mpEnabled = False
+    else:
+        __mpEnabled = False
+        __mpWorkers = None
     #
     # Calculs effectifs
-    __multiHX = []
-    if _extraArguments is None:
-        for __xvalue in __xserie:
-            __multiHX.append( _sFunction( __xvalue ) )
-    elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)):
-        for __xvalue in __xserie:
-            __multiHX.append( _sFunction( __xvalue, *_extraArguments ) )
-    elif _extraArguments is not None and isinstance(_extraArguments, dict):
-        for __xvalue in __xserie:
-            __multiHX.append( _sFunction( __xvalue, **_extraArguments ) )
+    if __mpEnabled:
+        _jobs = []
+        if _extraArguments is None:
+            _jobs = __xserie
+        elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)):
+            for __xvalue in __xserie:
+                _jobs.append( [__xvalue, ] + list(_extraArguments) )
+        else:
+            raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),))
+        # logging.debug("MULTF Internal multiprocessing calculations begin : evaluation of %i point(s)"%(len(_jobs),))
+        import multiprocessing
+        with multiprocessing.Pool(__mpWorkers) as pool:
+            __multiHX = pool.map( _sFunction, _jobs )
+            pool.close()
+            pool.join()
+        # logging.debug("MULTF Internal multiprocessing calculation end")
     else:
-        raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),))
+        # logging.debug("MULTF Internal monoprocessing calculation begin")
+        __multiHX = []
+        if _extraArguments is None:
+            for __xvalue in __xserie:
+                __multiHX.append( _sFunction( __xvalue ) )
+        elif _extraArguments is not None and isinstance(_extraArguments, (list, tuple, map)):
+            for __xvalue in __xserie:
+                __multiHX.append( _sFunction( __xvalue, *_extraArguments ) )
+        elif _extraArguments is not None and isinstance(_extraArguments, dict):
+            for __xvalue in __xserie:
+                __multiHX.append( _sFunction( __xvalue, **_extraArguments ) )
+        else:
+            raise TypeError("MultiFonction extra arguments unkown input type: %s"%(type(_extraArguments),))
+        # logging.debug("MULTF Internal monoprocessing calculation end")
     #
+    # logging.debug("MULTF Internal multifonction calculations end")
     return __multiHX
 
 # ==============================================================================
index 539c1bf0d572659b7429dbfe385458b8297f7a8c..5bc0979e65072b26c80a55d01a1aecfd8d1752f3 100644 (file)
@@ -86,6 +86,7 @@ CheckAlgos = [
     "TangentTest",
     "LocalSensitivityTest",
     "SamplingTest",
+    "ParallelFunctionTest",
     "ObserverTest",
     ]
 
@@ -200,6 +201,10 @@ AlgoDataRequirements["SamplingTest"] = [
     "Observation", "ObservationError",
     "ObservationOperator",
     ]
+AlgoDataRequirements["ParallelFunctionTest"] = [
+    "CheckingPoint",
+    "ObservationOperator",
+    ]
 AlgoDataRequirements["ObserverTest"] = [
     "Observers",
     ]