]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
[EDF30875] : In OutOfProcessNoReplay and OutOfProcessNoReplayFT mode use CORBA/Mem...
authorAnthony Geay <anthony.geay@edf.fr>
Mon, 9 Sep 2024 07:04:14 +0000 (09:04 +0200)
committerCédric Aguerre <cedric.aguerre@edf.fr>
Fri, 13 Sep 2024 07:47:12 +0000 (09:47 +0200)
idl/SALOME_PyNode.idl
src/Container/SALOME_PyNode.py
src/Launcher/Test/testCrashProofContainer.py

index dfe699d3fbbcc84059aa61e4ad5d1d21a4de91fc..e0baf0131f10136e0853e0d2982c64b3c685ed81 100644 (file)
@@ -111,6 +111,12 @@ module Engines
     void removeAllVarsInContext() raises (SALOME::SALOME_Exception);
   } ;
 
+  interface ContextExchanger
+  {
+    SALOME::SenderByte getInputContext() raises (SALOME::SALOME_Exception);
+    void pushOutputContext( in SALOME::SenderByte ctx ) raises (SALOME::SALOME_Exception);
+  };
+
 };
 
 #endif
index 062bea530318ef33ea1152698a7e0ad2dee819de..6ec1afc943840e618426d0e329192ad9915c661a 100644 (file)
@@ -36,6 +36,8 @@ import Engines__POA
 import KernelBasis
 import SALOME
 import SALOME__POA
+import Engines
+import Engines__POA
 from SALOME_ContainerHelper import ScriptExecInfo
 
 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
@@ -855,6 +857,7 @@ class SeqByteReceiver:
   
 FinalCode = """import pickle
 from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
+from SALOME_PyNode import ExchangeModeServerSideFactory
 from KernelBasis import VerbosityActivated
 import CORBA
 import Engines
@@ -863,16 +866,17 @@ from datetime import datetime
 # WorkDir may be important to replay : "{}"
 orb = CORBA.ORB_init([''])
 caseDirectory = "{}"
+contextExchangeMode = "{}"
+bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode )
 codeFileName = os.path.join( caseDirectory, "{}" )
-inputFileName = os.path.join( caseDirectory, "{}" )
-outputFileName = os.path.join( caseDirectory, "{}" )
+inputFileName = "{}"
+outputFileName = "{}"
 del os
 outputsKeys = {}
 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
 if VerbosityActivated():
   eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("b4loadctx",datetime.now()) )
-with open(inputFileName,"rb") as f:
-  context = pickle.load( f )
+context = bigObjExchanger.buildContextPointEntry(caseDirectory,inputFileName)
 if VerbosityActivated():
   eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterloadctx",datetime.now()) )
 context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
@@ -889,36 +893,37 @@ context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
 if VerbosityActivated():
   eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("strtdumpout",datetime.now()) )
 #
-with open(outputFileName,"wb") as f:
-  pickle.dump( context, f )
+bigObjExchanger.pushContext(outputFileName,context)
 if VerbosityActivated():
   eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterdump",datetime.now()) )
 """
 
 class PythonFunctionEvaluatorParams:
   def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
+    """
+    :param outContextFileName: instance of ExchangeContextCltAbs
+    """
     self._main_filename = mainFileName
     self._code_filename = codeFileName
     self._in_context_filename = inContextFileName
     self._out_context_filename = outContextFileName
   @property
   def result(self):
-    import pickle
-    with open(self._out_context_filename,"rb") as f:
-      return pickle.load( f )
+    return self._out_context_filename.retrieveRemoteContext()
+
   def destroyOnOK(self):
-    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
+    self._out_context_filename.removeContextSupport()
+    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]:
       if os.path.exists( fileToDestroy ):
         os.unlink( fileToDestroy )
   def destroyOnKO(self, containerRef):
-     """
-     Called in the context of failure with replay mode activated
-     """
-     for fileToDestroy in [self._out_context_filename]:
-      if os.path.exists( fileToDestroy ):
-        os.unlink( fileToDestroy )
-      # register to container files group associated to the
-      containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
+    """
+    Called in the context of failure with replay mode activated
+    """
+    self._out_context_filename.removeContextSupport()
+    # register to container files group associated to the
+    containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
+
   @property
   def replayCmd(self):
     return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
@@ -948,6 +953,150 @@ Looks like a hard crash as returnCode {returnCode} != 0
 {banner}
 """
 
+class ExchangeContextBridgeAbs(abc.ABC):
+  def __init__(self):
+    pass
+
+  @abc.abstractmethod
+  def buildContextPointEntry(self, caseDir, contextEntry):
+    raise RuntimeError("Must be overloaded")
+
+  @abc.abstractmethod
+  def pushContext(self, contextPointEntry, contextEntry):
+    raise RuntimeError("Must be overloaded")
+
+class ExchangeContextUsingFile( ExchangeContextBridgeAbs ):
+  def buildContextPointEntry(self, caseDir, contextEntry):
+    self._case_dir = caseDir
+    inputFileName = os.path.join( self._case_dir, contextEntry )
+    with open(inputFileName,"rb") as f:
+      context = pickle.load( f )
+    return context
+  
+  def pushContext(self, contextPointEntry, context):
+    outputFileName = os.path.join( self._case_dir, contextPointEntry )
+    with open(outputFileName,"wb") as f:
+      pickle.dump( context, f )
+
+class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
+  """
+  In TCP mode, servant hosted SALOME_Container side.
+  """
+  def __init__(self, inCtx):
+    import salome
+    self._poa = salome.orb.resolve_initial_references("RootPOA")
+    self._in_ctx = inCtx
+
+  def getPOA(self):
+    return self._poa
+
+  def getInputContext(self):
+    obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o)
+    return refPtr
+
+  def pushOutputContext(self, ctx):
+    self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() )
+
+  def getOutputContext(self):
+    return self._output_context
+
+class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ):
+  def buildContextPointEntry(self, caseDir, contextEntry):
+    import CORBA
+    self._orb = CORBA.ORB_init([''])
+    self._data_exchange_channel = self._orb.string_to_object( contextEntry )
+    return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() )
+  
+  def pushContext(self, contextPointEntry, context):
+    poa = self._orb.resolve_initial_references("RootPOA")
+    pman = poa.the_POAManager
+    pman.activate()
+    obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+    self._data_exchange_channel.pushOutputContext( refPtr )
+
+def ExchangeModeServerSideFactory( exchangeMode ):
+  if exchangeMode == "File":
+    return ExchangeContextUsingFile()
+  elif exchangeMode == "TCP":
+    return ExchangeContextUsingTCP()
+  else:
+    raise RuntimeError( "ExchangeModeServerSideFactory : {} not in [File,TCP]" )
+
+class ExchangeContextCltAbs(abc.ABC):
+  def __init__(self):
+    pass
+
+  @abc.abstractmethod
+  def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+    raise RuntimeError("Must be overloaded")
+  
+  @abc.abstractmethod
+  def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+    raise RuntimeError("Must be overloaded")
+  
+  @abc.abstractmethod
+  def retrieveRemoteContext(self):
+    raise RuntimeError("Must be overloaded")
+  
+  @abc.abstractmethod
+  def removeContextSupport(self):
+    raise RuntimeError("Must be overloaded")
+
+class ExchangeContextUsingFileClt(ExchangeContextCltAbs):
+
+  def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+    contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName)
+    with open(contextFileName,"wb") as contextFd:
+      pickle.dump( context, contextFd)
+    return os.path.basename( contextFileName )
+  
+  def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+    self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint )
+    return os.path.basename( self._out_ctx_entry_point )
+
+  def retrieveRemoteContext(self):
+    with open(self._out_ctx_entry_point,"rb") as f:
+      return pickle.load( f )
+    
+  def removeContextSupport(self):
+    if os.path.exists( self._out_ctx_entry_point ):
+      os.unlink( self._out_ctx_entry_point )
+
+class ExchangeContextUsingTCPClt(ExchangeContextCltAbs):
+
+  def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+    import salome
+    self._servant = ContextExchanger_i(context)
+    poa = self._servant.getPOA()
+    self._id_o = poa.activate_object(self._servant) ; refPtr = poa.id_to_reference(self._id_o)
+    self._portable_serv_entry = salome.orb.object_to_string( refPtr )
+    return self._portable_serv_entry
+  
+  def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+    ret = self._portable_serv_entry
+    del self._portable_serv_entry # destroy to release ref count
+    import gc
+    gc.collect()
+    return ret
+  
+  def retrieveRemoteContext(self):
+    return self._servant.getOutputContext()
+  
+  def removeContextSupport(self):
+    poa = self._servant.getPOA()
+    poa.deactivate_object(self._id_o)
+    del self._servant
+    import gc
+    gc.collect()
+    
+def ExchangeModeCltSideFactory( exchangeMode ):
+  if exchangeMode == "File":
+    return ExchangeContextUsingFileClt()
+  elif exchangeMode == "TCP":
+    return ExchangeContextUsingTCPClt()
+  else:
+    raise RuntimeError( "ExchangeModeCltSideFactory : {} not in [File,TCP]" )
+
 def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
   """
   Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
@@ -995,7 +1144,7 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL
       return False,stderr
 
   #
-  def InternalExecResistant( code, context, outargsname):
+  def InternalExecResistant( exchangeMode, code, context, outargsname):
     import KernelBasis
     orb = CORBA.ORB_init([''])
     iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
@@ -1015,21 +1164,21 @@ import sys
 sys.stderr.write({!r})
 sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
       codeFd.flush()
+      exCtx = ExchangeModeCltSideFactory(exchangeMode)
       codeFileNameFull =  codeFd.name
       codeFileName = os.path.basename( codeFileNameFull )
-      contextFileName = os.path.join( dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
-      with open(contextFileName,"wb") as contextFd:
-        pickle.dump( context, contextFd)
-      resFileName = os.path.join( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
+      contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ), context)
+      resFileName = exCtx.setOutputContextEntryPoint( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
       mainExecFileName = os.path.join( dirForReplayFiles, "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
       with open(mainExecFileName,"w") as f:
-        f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, codeFileName, os.path.basename( contextFileName ), os.path.basename( resFileName ), outargsname, iorScriptLog ) )
+        f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, exchangeMode, codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
       timeOut = KernelBasis.GetExecutionTimeOut()
       nbRetry = KernelBasis.GetNumberOfRetry()
       logging.debug( "Nb retry = {}   Timout in seconds = {}".format( nbRetry, timeOut ) )
       for iTry in range( nbRetry ):
         if iTry > 0:
           print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
+        logging.debug( "try # {} / {} --- ".format( iTry, nbRetry ) )
         p = sp.Popen(["python3", mainExecFileName],cwd = os.getcwd(),stdout = sp.PIPE, stderr = sp.PIPE)
         try:
           args = {}
@@ -1045,9 +1194,12 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
           if iTry >= 1:
             logging.warning( "At Retry #{} it's successful :)".format(iTry) )
           break
-    return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,resFileName)
+    return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,exCtx)
   ret = instanceOfLogOfCurrentSession._current_instance
-  returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
+  exchangeMode = "File"
+  if not keepFilesToReplay:
+    exchangeMode = "TCP"
+  returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname )
   stdout = stdout.decode()
   stderr = stderr.decode()
   sys.stdout.write( stdout ) ; sys.stdout.flush()
@@ -1070,16 +1222,16 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
     raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
 
 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
-  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, False)
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = False)
 
 def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
-  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, False)
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = False)
 
 def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
-  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, True)
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = True)
 
 def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
-  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, True)
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = True)
 
 def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
   exec( code, context )
index ea53f6dda424acdf8b236bdc688e0737230ee8f4..eb30ec1cc442e0dc528393b4a05dd76385f877a3 100644 (file)
@@ -26,6 +26,7 @@ import pylauncher
 import SALOME_PyNode
 import KernelBasis
 import SALOME
+from SALOME_PyNode import UnProxyObjectSimple,DecrRefInFile
 
 import tempfile
 import glob
@@ -93,6 +94,15 @@ else:
 j = 44
 """
 
+FunnyCase_test6 = """
+import numpy as np
+import time
+nb = i.shape[0]
+del i
+#time.sleep(10)
+j = np.zeros(shape=(2*nb,),dtype=np.float64) 
+"""
+
 class testPerfLogManager1(unittest.TestCase):
     def test0(self):
         """
@@ -277,6 +287,7 @@ class testPerfLogManager1(unittest.TestCase):
         KernelBasis.SetNumberOfRetry(2)
         KernelBasis.SetExecutionTimeOut(10) # <= Key Point is here
         KernelBasis.SetPyExecutionMode("OutOfProcessNoReplayFT") # Fail tolerant
+        salome.cm.SetBigObjOnDiskThreshold(1000)
         with tempfile.TemporaryDirectory() as tmpdirname:
             os.chdir( tmpdirname )
             fnameFull = Path(tmpdirname) / fname # this file name will be used to voluntary prevent the first execution to return within 10 seconds. But for 2nd evaluation the execution will return within 10 secs.
@@ -301,6 +312,42 @@ class testPerfLogManager1(unittest.TestCase):
                 self.assertEqual( [elt[0] for elt in logInfoForCont[1][0].get().freestyle] , ['b4loadctx', 'afterloadctx', 'bforeexec', 'b4loadctx', 'afterloadctx', 'bforeexec', 'afterexec', 'strtdumpout', 'afterdump'] ) # <- aim of test is here. First 3 entries ('b4loadctx', 'afterloadctx', 'bforeexec') prove that first attempt fails to return within 10 sececonds as requested by KernelBasis.SetExecutionTimeOut(10)
                 pass
             pass
+        KernelBasis.SetExecutionTimeOut(-1)
+
+    def test6(self):
+        """
+        EDF30875 : test focusing on memory management in the context of OutOfProcessNoReplay using TCP/mem
+        """
+        import time
+        import numpy as np
+        import gc
+        szOfData = 125000000# 125 ktuples ~ 1GB
+        KernelBasis.SetPyExecutionMode("OutOfProcessNoReplay") # No replay -> corba channel
+        salome.cm.SetBigObjOnDiskThreshold(1)
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            os.chdir( tmpdirname )
+            hostname = "localhost"
+            cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_6")
+            salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) )
+            KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) )
+            with salome.ContainerLauncherCM(cp,True) as cont:
+                poa = salome.orb.resolve_initial_references("RootPOA")
+                arr = np.zeros(shape=(szOfData,),dtype=np.float64) # 125 ktuples ~ 1GB
+                obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+                del obj
+                gc.collect()
+                pyscript = cont.createPyScriptNode("testScript6",FunnyCase_test6)
+                pyscript.executeFirst(refPtr)
+                ret = pyscript.executeSecond(["j"])
+                pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB
+                ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it
+                DecrRefInFile( pxy.getFileName() )
+                logging.debug(f"start to sleep...{os.getpid()}")
+                self.assertEqual( ret0.shape[0], 2*szOfData )
+                del ret0
+                gc.collect()
+                #time.sleep(10)
+
 
 if __name__ == '__main__':
     from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose