From 3f380a18e53c8b2c8da97ec2adba820ff130134c Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Mon, 9 Sep 2024 09:04:14 +0200 Subject: [PATCH] [EDF30875] : In OutOfProcessNoReplay and OutOfProcessNoReplayFT mode use CORBA/Mem data channel to transfert --- idl/SALOME_PyNode.idl | 6 + src/Container/SALOME_PyNode.py | 212 ++++++++++++++++--- src/Launcher/Test/testCrashProofContainer.py | 47 ++++ 3 files changed, 235 insertions(+), 30 deletions(-) diff --git a/idl/SALOME_PyNode.idl b/idl/SALOME_PyNode.idl index dfe699d3f..e0baf0131 100644 --- a/idl/SALOME_PyNode.idl +++ b/idl/SALOME_PyNode.idl @@ -111,6 +111,12 @@ module Engines void removeAllVarsInContext() raises (SALOME::SALOME_Exception); } ; + interface ContextExchanger + { + SALOME::SenderByte getInputContext() raises (SALOME::SALOME_Exception); + void pushOutputContext( in SALOME::SenderByte ctx ) raises (SALOME::SALOME_Exception); + }; + }; #endif diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py index 062bea530..6ec1afc94 100644 --- a/src/Container/SALOME_PyNode.py +++ b/src/Container/SALOME_PyNode.py @@ -36,6 +36,8 @@ import Engines__POA import KernelBasis import SALOME import SALOME__POA +import Engines +import Engines__POA from SALOME_ContainerHelper import ScriptExecInfo MY_CONTAINER_ENTRY_IN_GLBS = "my_container" @@ -855,6 +857,7 @@ class SeqByteReceiver: FinalCode = """import pickle from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS +from SALOME_PyNode import ExchangeModeServerSideFactory from KernelBasis import VerbosityActivated import CORBA import Engines @@ -863,16 +866,17 @@ from datetime import datetime # WorkDir may be important to replay : "{}" orb = CORBA.ORB_init(['']) caseDirectory = "{}" +contextExchangeMode = "{}" +bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode ) codeFileName = os.path.join( caseDirectory, "{}" ) -inputFileName = os.path.join( caseDirectory, "{}" ) -outputFileName = os.path.join( caseDirectory, "{}" ) +inputFileName = "{}" +outputFileName = "{}" del os outputsKeys = {} exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ) if VerbosityActivated(): eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("b4loadctx",datetime.now()) ) -with open(inputFileName,"rb") as f: - context = pickle.load( f ) +context = bigObjExchanger.buildContextPointEntry(caseDirectory,inputFileName) if VerbosityActivated(): eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterloadctx",datetime.now()) ) context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS ) @@ -889,36 +893,37 @@ context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] ) if VerbosityActivated(): eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("strtdumpout",datetime.now()) ) # -with open(outputFileName,"wb") as f: - pickle.dump( context, f ) +bigObjExchanger.pushContext(outputFileName,context) if VerbosityActivated(): eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterdump",datetime.now()) ) """ class PythonFunctionEvaluatorParams: def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName): + """ + :param outContextFileName: instance of ExchangeContextCltAbs + """ self._main_filename = mainFileName self._code_filename = codeFileName self._in_context_filename = inContextFileName self._out_context_filename = outContextFileName @property def result(self): - import pickle - with open(self._out_context_filename,"rb") as f: - return pickle.load( f ) + return self._out_context_filename.retrieveRemoteContext() + def destroyOnOK(self): - for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]: + self._out_context_filename.removeContextSupport() + for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]: if os.path.exists( fileToDestroy ): os.unlink( fileToDestroy ) def destroyOnKO(self, containerRef): - """ - Called in the context of failure with replay mode activated - """ - for fileToDestroy in [self._out_context_filename]: - if os.path.exists( fileToDestroy ): - os.unlink( fileToDestroy ) - # register to container files group associated to the - containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename]) + """ + Called in the context of failure with replay mode activated + """ + self._out_context_filename.removeContextSupport() + # register to container files group associated to the + containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename]) + @property def replayCmd(self): return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename)) @@ -948,6 +953,150 @@ Looks like a hard crash as returnCode {returnCode} != 0 {banner} """ +class ExchangeContextBridgeAbs(abc.ABC): + def __init__(self): + pass + + @abc.abstractmethod + def buildContextPointEntry(self, caseDir, contextEntry): + raise RuntimeError("Must be overloaded") + + @abc.abstractmethod + def pushContext(self, contextPointEntry, contextEntry): + raise RuntimeError("Must be overloaded") + +class ExchangeContextUsingFile( ExchangeContextBridgeAbs ): + def buildContextPointEntry(self, caseDir, contextEntry): + self._case_dir = caseDir + inputFileName = os.path.join( self._case_dir, contextEntry ) + with open(inputFileName,"rb") as f: + context = pickle.load( f ) + return context + + def pushContext(self, contextPointEntry, context): + outputFileName = os.path.join( self._case_dir, contextPointEntry ) + with open(outputFileName,"wb") as f: + pickle.dump( context, f ) + +class ContextExchanger_i(Engines__POA.ContextExchanger,Generic): + """ + In TCP mode, servant hosted SALOME_Container side. + """ + def __init__(self, inCtx): + import salome + self._poa = salome.orb.resolve_initial_references("RootPOA") + self._in_ctx = inCtx + + def getPOA(self): + return self._poa + + def getInputContext(self): + obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o) + return refPtr + + def pushOutputContext(self, ctx): + self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() ) + + def getOutputContext(self): + return self._output_context + +class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ): + def buildContextPointEntry(self, caseDir, contextEntry): + import CORBA + self._orb = CORBA.ORB_init(['']) + self._data_exchange_channel = self._orb.string_to_object( contextEntry ) + return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() ) + + def pushContext(self, contextPointEntry, context): + poa = self._orb.resolve_initial_references("RootPOA") + pman = poa.the_POAManager + pman.activate() + obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) + self._data_exchange_channel.pushOutputContext( refPtr ) + +def ExchangeModeServerSideFactory( exchangeMode ): + if exchangeMode == "File": + return ExchangeContextUsingFile() + elif exchangeMode == "TCP": + return ExchangeContextUsingTCP() + else: + raise RuntimeError( "ExchangeModeServerSideFactory : {} not in [File,TCP]" ) + +class ExchangeContextCltAbs(abc.ABC): + def __init__(self): + pass + + @abc.abstractmethod + def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context): + raise RuntimeError("Must be overloaded") + + @abc.abstractmethod + def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint): + raise RuntimeError("Must be overloaded") + + @abc.abstractmethod + def retrieveRemoteContext(self): + raise RuntimeError("Must be overloaded") + + @abc.abstractmethod + def removeContextSupport(self): + raise RuntimeError("Must be overloaded") + +class ExchangeContextUsingFileClt(ExchangeContextCltAbs): + + def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context): + contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName) + with open(contextFileName,"wb") as contextFd: + pickle.dump( context, contextFd) + return os.path.basename( contextFileName ) + + def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint): + self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint ) + return os.path.basename( self._out_ctx_entry_point ) + + def retrieveRemoteContext(self): + with open(self._out_ctx_entry_point,"rb") as f: + return pickle.load( f ) + + def removeContextSupport(self): + if os.path.exists( self._out_ctx_entry_point ): + os.unlink( self._out_ctx_entry_point ) + +class ExchangeContextUsingTCPClt(ExchangeContextCltAbs): + + def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context): + import salome + self._servant = ContextExchanger_i(context) + poa = self._servant.getPOA() + self._id_o = poa.activate_object(self._servant) ; refPtr = poa.id_to_reference(self._id_o) + self._portable_serv_entry = salome.orb.object_to_string( refPtr ) + return self._portable_serv_entry + + def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint): + ret = self._portable_serv_entry + del self._portable_serv_entry # destroy to release ref count + import gc + gc.collect() + return ret + + def retrieveRemoteContext(self): + return self._servant.getOutputContext() + + def removeContextSupport(self): + poa = self._servant.getPOA() + poa.deactivate_object(self._id_o) + del self._servant + import gc + gc.collect() + +def ExchangeModeCltSideFactory( exchangeMode ): + if exchangeMode == "File": + return ExchangeContextUsingFileClt() + elif exchangeMode == "TCP": + return ExchangeContextUsingTCPClt() + else: + raise RuntimeError( "ExchangeModeCltSideFactory : {} not in [File,TCP]" ) + def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit): """ Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash. @@ -995,7 +1144,7 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL return False,stderr # - def InternalExecResistant( code, context, outargsname): + def InternalExecResistant( exchangeMode, code, context, outargsname): import KernelBasis orb = CORBA.ORB_init(['']) iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr @@ -1015,21 +1164,21 @@ import sys sys.stderr.write({!r}) sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) codeFd.flush() + exCtx = ExchangeModeCltSideFactory(exchangeMode) codeFileNameFull = codeFd.name codeFileName = os.path.basename( codeFileNameFull ) - contextFileName = os.path.join( dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) ) - with open(contextFileName,"wb") as contextFd: - pickle.dump( context, contextFd) - resFileName = os.path.join( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) ) + contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ), context) + resFileName = exCtx.setOutputContextEntryPoint( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) ) mainExecFileName = os.path.join( dirForReplayFiles, "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) ) with open(mainExecFileName,"w") as f: - f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, codeFileName, os.path.basename( contextFileName ), os.path.basename( resFileName ), outargsname, iorScriptLog ) ) + f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, exchangeMode, codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) ) timeOut = KernelBasis.GetExecutionTimeOut() nbRetry = KernelBasis.GetNumberOfRetry() logging.debug( "Nb retry = {} Timout in seconds = {}".format( nbRetry, timeOut ) ) for iTry in range( nbRetry ): if iTry > 0: print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) ) + logging.debug( "try # {} / {} --- ".format( iTry, nbRetry ) ) p = sp.Popen(["python3", mainExecFileName],cwd = os.getcwd(),stdout = sp.PIPE, stderr = sp.PIPE) try: args = {} @@ -1045,9 +1194,12 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) if iTry >= 1: logging.warning( "At Retry #{} it's successful :)".format(iTry) ) break - return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,resFileName) + return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,exCtx) ret = instanceOfLogOfCurrentSession._current_instance - returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname ) + exchangeMode = "File" + if not keepFilesToReplay: + exchangeMode = "TCP" + returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname ) stdout = stdout.decode() stderr = stderr.decode() sys.stdout.write( stdout ) ; sys.stdout.flush() @@ -1070,16 +1222,16 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}") def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): - return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, False) + return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = False) def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): - return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, False) + return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = False) def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): - return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, True) + return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = True) def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): - return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, True) + return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = True) def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): exec( code, context ) diff --git a/src/Launcher/Test/testCrashProofContainer.py b/src/Launcher/Test/testCrashProofContainer.py index ea53f6dda..eb30ec1cc 100644 --- a/src/Launcher/Test/testCrashProofContainer.py +++ b/src/Launcher/Test/testCrashProofContainer.py @@ -26,6 +26,7 @@ import pylauncher import SALOME_PyNode import KernelBasis import SALOME +from SALOME_PyNode import UnProxyObjectSimple,DecrRefInFile import tempfile import glob @@ -93,6 +94,15 @@ else: j = 44 """ +FunnyCase_test6 = """ +import numpy as np +import time +nb = i.shape[0] +del i +#time.sleep(10) +j = np.zeros(shape=(2*nb,),dtype=np.float64) +""" + class testPerfLogManager1(unittest.TestCase): def test0(self): """ @@ -277,6 +287,7 @@ class testPerfLogManager1(unittest.TestCase): KernelBasis.SetNumberOfRetry(2) KernelBasis.SetExecutionTimeOut(10) # <= Key Point is here KernelBasis.SetPyExecutionMode("OutOfProcessNoReplayFT") # Fail tolerant + salome.cm.SetBigObjOnDiskThreshold(1000) with tempfile.TemporaryDirectory() as tmpdirname: os.chdir( tmpdirname ) fnameFull = Path(tmpdirname) / fname # this file name will be used to voluntary prevent the first execution to return within 10 seconds. But for 2nd evaluation the execution will return within 10 secs. @@ -301,6 +312,42 @@ class testPerfLogManager1(unittest.TestCase): self.assertEqual( [elt[0] for elt in logInfoForCont[1][0].get().freestyle] , ['b4loadctx', 'afterloadctx', 'bforeexec', 'b4loadctx', 'afterloadctx', 'bforeexec', 'afterexec', 'strtdumpout', 'afterdump'] ) # <- aim of test is here. First 3 entries ('b4loadctx', 'afterloadctx', 'bforeexec') prove that first attempt fails to return within 10 sececonds as requested by KernelBasis.SetExecutionTimeOut(10) pass pass + KernelBasis.SetExecutionTimeOut(-1) + + def test6(self): + """ + EDF30875 : test focusing on memory management in the context of OutOfProcessNoReplay using TCP/mem + """ + import time + import numpy as np + import gc + szOfData = 125000000# 125 ktuples ~ 1GB + KernelBasis.SetPyExecutionMode("OutOfProcessNoReplay") # No replay -> corba channel + salome.cm.SetBigObjOnDiskThreshold(1) + with tempfile.TemporaryDirectory() as tmpdirname: + os.chdir( tmpdirname ) + hostname = "localhost" + cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_6") + salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) ) + KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) ) + with salome.ContainerLauncherCM(cp,True) as cont: + poa = salome.orb.resolve_initial_references("RootPOA") + arr = np.zeros(shape=(szOfData,),dtype=np.float64) # 125 ktuples ~ 1GB + obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) + del obj + gc.collect() + pyscript = cont.createPyScriptNode("testScript6",FunnyCase_test6) + pyscript.executeFirst(refPtr) + ret = pyscript.executeSecond(["j"]) + pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB + ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it + DecrRefInFile( pxy.getFileName() ) + logging.debug(f"start to sleep...{os.getpid()}") + self.assertEqual( ret0.shape[0], 2*szOfData ) + del ret0 + gc.collect() + #time.sleep(10) + if __name__ == '__main__': from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose -- 2.39.2