]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
[EDF30875] : Robustify mem/mem exchange
authorAnthony Geay <anthony.geay@edf.fr>
Mon, 9 Sep 2024 16:22:00 +0000 (18:22 +0200)
committerAnthony Geay <anthony.geay@edf.fr>
Fri, 13 Sep 2024 07:19:58 +0000 (09:19 +0200)
idl/SALOME_PyNode.idl
src/Basics/KernelBasis.i
src/Container/SALOME_ContainerHelper.py
src/Container/SALOME_PyNode.py
src/Launcher/SALOME_LogManager.py
src/Launcher/Test/testCrashProofContainer.py

index e0baf0131f10136e0853e0d2982c64b3c685ed81..2953a51427e10afbaa723236e5fb20489a95d60c 100644 (file)
@@ -114,7 +114,8 @@ module Engines
   interface ContextExchanger
   {
     SALOME::SenderByte getInputContext() raises (SALOME::SALOME_Exception);
-    void pushOutputContext( in SALOME::SenderByte ctx ) raises (SALOME::SALOME_Exception);
+    void pushOutputContext( in pickledArgs ctx ) raises (SALOME::SALOME_Exception);
+    void finishPushContext() raises (SALOME::SALOME_Exception);
   };
 
 };
index 06e0e0e785dc91ab96039bfb08831e34520c7b0d..c94b1e92b62dd8b87d51fa2f01c5556cf26b4b68 100644 (file)
@@ -106,6 +106,11 @@ void WriteInStderr(const std::string& msg);
 
 %inline
 {
+void IncrementRefCounter( PyObject * obj )
+{
+  Py_XINCREF( obj );
+}
+
 PyObject *HeatMarcelSwig(double timeAjustment, unsigned int nbThreads = 0)
 {
   double timeInS = 0.0;
index da46cd6ed7ec2321244b00384c56d8ec2df4ae2c..eddce54eabe292209b8055b14b7494212e921358 100644 (file)
@@ -81,6 +81,9 @@ class ScriptExecInfo:
     def preappendFreestyle(self, value):
        self._freestyle_log = value + self._freestyle_log
 
+    def appendFreestyle(self, value):
+       self._freestyle_log += value
+
     @property
     def measureTimeResolution(self):
       return self._measure_time_resolution_ms
index 4f4129efe5b2e63e2fc74b0e811e69a942daea37..8cbbca92237206599d85e1bb4a064abcbd5fa136 100644 (file)
@@ -860,22 +860,23 @@ from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_
 from SALOME_PyNode import ExchangeModeServerSideFactory
 from KernelBasis import VerbosityActivated,SetVerbosityLevel,SetVerbosityActivated
 from salome_utils import positionVerbosityOfLoggerRegardingState
-import CORBA
 import Engines
+import salome
 import os
+import sys
+salome.salome_init()
 from datetime import datetime
 SetVerbosityActivated( {} )
 SetVerbosityLevel( "{}" )
 positionVerbosityOfLoggerRegardingState()
 # WorkDir may be important to replay : "{}"
-orb = CORBA.ORB_init([''])
+orb = salome.orb
 caseDirectory = "{}"
 contextExchangeMode = "{}"
 bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode )
 codeFileName = os.path.join( caseDirectory, "{}" )
 inputFileName = "{}"
 outputFileName = "{}"
-del os
 outputsKeys = {}
 exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
 if VerbosityActivated():
@@ -915,16 +916,16 @@ class PythonFunctionEvaluatorParams:
   def result(self):
     return self._out_context_filename.retrieveRemoteContext()
 
-  def destroyOnOK(self):
-    self._out_context_filename.removeContextSupport()
+  def destroyOnOKAndReplay(self):
+    self._out_context_filename.removeContextSupport( True )
     for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]:
       if os.path.exists( fileToDestroy ):
         os.unlink( fileToDestroy )
-  def destroyOnKO(self, containerRef):
+  def destroyOnKOAndReplay(self, containerRef):
     """
     Called in the context of failure with replay mode activated
     """
-    self._out_context_filename.removeContextSupport()
+    self._out_context_filename.removeContextSupport( False )
     # register to container files group associated to the
     containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
 
@@ -982,7 +983,7 @@ class ExchangeContextUsingFile( ExchangeContextBridgeAbs ):
     with open(outputFileName,"wb") as f:
       pickle.dump( context, f )
 
-class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
+class ContextExchanger_i(Engines__POA.ContextExchanger):
   """
   In TCP mode, servant hosted SALOME_Container side.
   """
@@ -990,6 +991,7 @@ class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
     import salome
     self._poa = salome.orb.resolve_initial_references("RootPOA")
     self._in_ctx = inCtx
+    self._out_ctx = bytes(0)
 
   def getPOA(self):
     return self._poa
@@ -997,26 +999,42 @@ class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
   def getInputContext(self):
     obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o)
     return refPtr
-
+    
   def pushOutputContext(self, ctx):
-    self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() )
+    try:
+      self._out_ctx += ctx
+    except Exception as e:
+      raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"pushOutputContext",0) )
+
+  def finishPushContext(self):
+    try:
+      self._output_context = pickle.loads( self._out_ctx )
+    except Exception as e:
+      raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"finishPushContext",0) )
 
   def getOutputContext(self):
     return self._output_context
 
 class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ):
   def buildContextPointEntry(self, caseDir, contextEntry):
-    import CORBA
-    self._orb = CORBA.ORB_init([''])
+    import salome
+    salome.salome_init()
+    self._orb = salome.orb
     self._data_exchange_channel = self._orb.string_to_object( contextEntry )
     return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() )
   
   def pushContext(self, contextPointEntry, context):
-    poa = self._orb.resolve_initial_references("RootPOA")
-    pman = poa.the_POAManager
-    pman.activate()
-    obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
-    self._data_exchange_channel.pushOutputContext( refPtr )
+    ctxBytes = pickle.dumps( context )
+    size = len( ctxBytes )
+    if size <= SeqByteReceiver.CHUNK_SIZE:
+      self._data_exchange_channel.pushOutputContext( ctxBytes )
+    else:
+      EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
+      iStart = 0 ; iEnd = EFF_CHUNK_SIZE
+      while iStart!=iEnd and iEnd <= size:
+        self._data_exchange_channel.pushOutputContext( ctxBytes[iStart:iEnd] )
+        iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
+    self._data_exchange_channel.finishPushContext()
 
 def ExchangeModeServerSideFactory( exchangeMode ):
   if exchangeMode == "File":
@@ -1043,16 +1061,19 @@ class ExchangeContextCltAbs(abc.ABC):
     raise RuntimeError("Must be overloaded")
   
   @abc.abstractmethod
-  def removeContextSupport(self):
+  def removeContextSupport(self, isOK):
     raise RuntimeError("Must be overloaded")
 
 class ExchangeContextUsingFileClt(ExchangeContextCltAbs):
 
+  def __init__(self, keepFilesToReplay):
+    self._keep_in_files = keepFilesToReplay
+
   def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
-    contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName)
-    with open(contextFileName,"wb") as contextFd:
+    self._in_ctx_entry_point = os.path.join( dirForReplayFiles, contextFileBaseName)
+    with open(self._in_ctx_entry_point,"wb") as contextFd:
       pickle.dump( context, contextFd)
-    return os.path.basename( contextFileName )
+    return os.path.basename( self._in_ctx_entry_point )
   
   def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
     self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint )
@@ -1062,9 +1083,13 @@ class ExchangeContextUsingFileClt(ExchangeContextCltAbs):
     with open(self._out_ctx_entry_point,"rb") as f:
       return pickle.load( f )
     
-  def removeContextSupport(self):
-    if os.path.exists( self._out_ctx_entry_point ):
-      os.unlink( self._out_ctx_entry_point )
+  def removeContextSupport(self, isOK):
+    fileNamesToRm = [self._out_ctx_entry_point]
+    if isOK or not self._keep_in_files:
+      fileNamesToRm += [self._in_ctx_entry_point]
+    for fileName in fileNamesToRm:
+      if os.path.exists( fileName ):
+        os.unlink( fileName )
 
 class ExchangeContextUsingTCPClt(ExchangeContextCltAbs):
 
@@ -1077,25 +1102,21 @@ class ExchangeContextUsingTCPClt(ExchangeContextCltAbs):
     return self._portable_serv_entry
   
   def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
-    ret = self._portable_serv_entry
-    del self._portable_serv_entry # destroy to release ref count
-    import gc
-    gc.collect()
-    return ret
+    return self._portable_serv_entry
   
   def retrieveRemoteContext(self):
     return self._servant.getOutputContext()
   
-  def removeContextSupport(self):
+  def removeContextSupport(self, isOK):# isOK is ignored because in memory mode there is nothing to save
     poa = self._servant.getPOA()
     poa.deactivate_object(self._id_o)
     del self._servant
     import gc
     gc.collect()
     
-def ExchangeModeCltSideFactory( exchangeMode ):
+def ExchangeModeCltSideFactory( exchangeMode, keepFilesToReplay ):
   if exchangeMode == "File":
-    return ExchangeContextUsingFileClt()
+    return ExchangeContextUsingFileClt(keepFilesToReplay)
   elif exchangeMode == "TCP":
     return ExchangeContextUsingTCPClt()
   else:
@@ -1129,7 +1150,6 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL
   import tempfile
   import pickle
   import subprocess as sp
-  import CORBA
   import logging
   #
   def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
@@ -1148,9 +1168,11 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL
       return False,stderr
 
   #
-  def InternalExecResistant( exchangeMode, code, context, outargsname):
+  def InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname):
     import KernelBasis
-    orb = CORBA.ORB_init([''])
+    import salome
+    salome.salome_init()
+    orb = salome.orb
     iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
     ####
     EXEC_CODE_FNAME_PXF = "execsafe_"
@@ -1168,7 +1190,7 @@ import sys
 sys.stderr.write({!r})
 sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
       codeFd.flush()
-      exCtx = ExchangeModeCltSideFactory(exchangeMode)
+      exCtx = ExchangeModeCltSideFactory(exchangeMode, keepFilesToReplay)
       codeFileNameFull =  codeFd.name
       codeFileName = os.path.basename( codeFileNameFull )
       contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) ), context)
@@ -1204,7 +1226,7 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
   exchangeMode = "File"
   if not keepFilesToReplay:
     exchangeMode = "TCP"
-  returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname )
+  returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname )
   stdout = stdout.decode()
   stderr = stderr.decode()
   sys.stdout.write( stdout ) ; sys.stdout.flush()
@@ -1215,15 +1237,15 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
     if len(pcklData) > 0:
       ret = pickle.loads( pcklData )
     context.update( evParams.result )
-    evParams.destroyOnOK()
+    evParams.destroyOnOKAndReplay()
     if returnCode != 0:
       print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
     return ret
   else:
     if keepFilesToReplay:
-      evParams.destroyOnKO( containerRef )
+      evParams.destroyOnKOAndReplay( containerRef )
     else:
-      evParams.destroyOnOK()
+      evParams.destroyOnOKAndReplay()
     raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
 
 def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
@@ -1414,6 +1436,7 @@ class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
       self.addTimeInfoOnLevel2("startExecTime")
       ##
       self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
+      self._current_execution_session.finalizeAndPushToMaster() # flush it to the server (scheduler hosting ContainerManager) in order to retrieve it in Python server
       cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
       ##
       self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
index 58114fc413be3453069c667f74d276da0f534c37..7404a35cfbed23199cfa2d95a3606517b55cc89d 100644 (file)
@@ -45,16 +45,20 @@ class SALOME_ContainerScriptExecPerfLog:
     :param alreadyOnSiteBytes: pickle of instance of ScriptExecInfo of previous value (if any) [bytes]
     :param instanceRemoteBytes: pickle of current instance of ScriptExecInfo (if any) [bytes]
     """
+    instanceRemote = pickle.loads(instanceRemoteBytes)
     alreadyOnSite = None
     if len( alreadyOnSiteBytes ) > 0:
       alreadyOnSite = pickle.loads(alreadyOnSiteBytes)
-    instanceRemote = pickle.loads(instanceRemoteBytes)
     self._stop_pos = os.path.getsize( self.father.father.logfilename )
-    setattr(instanceRemote,"tracePosStop",self._stop_pos)
-    setattr(instanceRemote,"tracePosStart",self._start_pos)
     if alreadyOnSite:
-      instanceRemote.preappendFreestyle( alreadyOnSite._freestyle_log )
-    return pickle.dumps(instanceRemote)
+      setattr(alreadyOnSite,"tracePosStop",self._stop_pos)
+      setattr(alreadyOnSite,"tracePosStart",self._start_pos)
+      alreadyOnSite.appendFreestyle( instanceRemote._freestyle_log )
+      return pickle.dumps(alreadyOnSite)
+    else:
+      setattr(instanceRemote,"tracePosStop",self._stop_pos)
+      setattr(instanceRemote,"tracePosStart",self._start_pos)
+      return pickle.dumps(instanceRemote)
 
   def start(self):
     self._start_pos = os.path.getsize( self.father.father.logfilename )
index eb30ec1cc442e0dc528393b4a05dd76385f877a3..9fe5aa7853318f74fa404aa0d9a373fe77d23d2b 100644 (file)
@@ -103,6 +103,12 @@ del i
 j = np.zeros(shape=(2*nb,),dtype=np.float64) 
 """
 
+FunnyCase_test7 = """
+import numpy as np
+nb = i.shape[0]
+j = np.zeros(shape=(2*nb,),dtype=np.float64) 
+"""
+
 class testPerfLogManager1(unittest.TestCase):
     def test0(self):
         """
@@ -309,6 +315,7 @@ class testPerfLogManager1(unittest.TestCase):
                 logInfoForCont = [elt for elt in a if "container_crash_test_5" in elt.ns_entry]
                 self.assertEqual( len(logInfoForCont), 1 )
                 logInfoForCont = logInfoForCont[0]
+                self.assertTrue( logInfoForCont[1][0].startExecTime is not None )
                 self.assertEqual( [elt[0] for elt in logInfoForCont[1][0].get().freestyle] , ['b4loadctx', 'afterloadctx', 'bforeexec', 'b4loadctx', 'afterloadctx', 'bforeexec', 'afterexec', 'strtdumpout', 'afterdump'] ) # <- aim of test is here. First 3 entries ('b4loadctx', 'afterloadctx', 'bforeexec') prove that first attempt fails to return within 10 sececonds as requested by KernelBasis.SetExecutionTimeOut(10)
                 pass
             pass
@@ -348,6 +355,62 @@ class testPerfLogManager1(unittest.TestCase):
                 gc.collect()
                 #time.sleep(10)
 
+    def test7(self):
+        """
+        [EDF30875] : Guarantee that DirectoryForReplay is clean after execution.
+        """
+        import numpy as np
+        import gc
+        szOfData = 12000
+        KernelBasis.SetPyExecutionMode("OutOfProcessWithReplayFT")
+        salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            os.chdir( tmpdirname )
+            hostname = "localhost"
+            cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_7")
+            salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) )
+            KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) )
+            with salome.ContainerLauncherCM(cp,True) as cont:
+                poa = salome.orb.resolve_initial_references("RootPOA")
+                arr = np.zeros(shape=(szOfData,),dtype=np.float64)
+                obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+                gc.collect()
+                pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7)
+                pyscript.executeFirst(refPtr)
+                ret = pyscript.executeSecond(["j"])
+                pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice the size of the input ( 2*szOfData float64 values )
+                ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it
+                DecrRefInFile( pxy.getFileName() )
+                self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean
+
+    def test8(self):
+        """
+        [EDF30875] : same as test7 but with OutOfProcessWithReplay.
+        """
+        import numpy as np
+        import gc
+        szOfData = 12000
+        KernelBasis.SetPyExecutionMode("OutOfProcessWithReplay")
+        salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            os.chdir( tmpdirname )
+            hostname = "localhost"
+            cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_8")
+            salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) )
+            KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) )
+            with salome.ContainerLauncherCM(cp,True) as cont:
+                poa = salome.orb.resolve_initial_references("RootPOA")
+                arr = np.zeros(shape=(szOfData,),dtype=np.float64)
+                obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+                gc.collect()
+                pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7)
+                pyscript.executeFirst(refPtr)
+                ret = pyscript.executeSecond(["j"])
+                pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice the size of the input ( 2*szOfData float64 values )
+                ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it
+                DecrRefInFile( pxy.getFileName() )
+                self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean
+
 
 if __name__ == '__main__':
     from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose