From 5c9439aea6714a474239583e8e6eb80542d3811f Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Mon, 9 Sep 2024 18:22:00 +0200 Subject: [PATCH] [EDF30875] : Robustify mem/mem exchange --- idl/SALOME_PyNode.idl | 3 +- src/Basics/KernelBasis.i | 5 + src/Container/SALOME_ContainerHelper.py | 3 + src/Container/SALOME_PyNode.py | 103 ++++++++++++------- src/Launcher/SALOME_LogManager.py | 14 ++- src/Launcher/Test/testCrashProofContainer.py | 63 ++++++++++++ 6 files changed, 145 insertions(+), 46 deletions(-) diff --git a/idl/SALOME_PyNode.idl b/idl/SALOME_PyNode.idl index e0baf0131..2953a5142 100644 --- a/idl/SALOME_PyNode.idl +++ b/idl/SALOME_PyNode.idl @@ -114,7 +114,8 @@ module Engines interface ContextExchanger { SALOME::SenderByte getInputContext() raises (SALOME::SALOME_Exception); - void pushOutputContext( in SALOME::SenderByte ctx ) raises (SALOME::SALOME_Exception); + void pushOutputContext( in pickledArgs ctx ) raises (SALOME::SALOME_Exception); + void finishPushContext() raises (SALOME::SALOME_Exception); }; }; diff --git a/src/Basics/KernelBasis.i b/src/Basics/KernelBasis.i index 06e0e0e78..c94b1e92b 100644 --- a/src/Basics/KernelBasis.i +++ b/src/Basics/KernelBasis.i @@ -106,6 +106,11 @@ void WriteInStderr(const std::string& msg); %inline { +void IncrementRefCounter( PyObject * obj ) +{ + Py_XINCREF( obj ); +} + PyObject *HeatMarcelSwig(double timeAjustment, unsigned int nbThreads = 0) { double timeInS = 0.0; diff --git a/src/Container/SALOME_ContainerHelper.py b/src/Container/SALOME_ContainerHelper.py index da46cd6ed..eddce54ea 100644 --- a/src/Container/SALOME_ContainerHelper.py +++ b/src/Container/SALOME_ContainerHelper.py @@ -81,6 +81,9 @@ class ScriptExecInfo: def preappendFreestyle(self, value): self._freestyle_log = value + self._freestyle_log + def appendFreestyle(self, value): + self._freestyle_log += value + @property def measureTimeResolution(self): return self._measure_time_resolution_ms diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py index 4f4129efe..8cbbca922 100644 --- a/src/Container/SALOME_PyNode.py +++ b/src/Container/SALOME_PyNode.py @@ -860,22 +860,23 @@ from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_ from SALOME_PyNode import ExchangeModeServerSideFactory from KernelBasis import VerbosityActivated,SetVerbosityLevel,SetVerbosityActivated from salome_utils import positionVerbosityOfLoggerRegardingState -import CORBA import Engines +import salome import os +import sys +salome.salome_init() from datetime import datetime SetVerbosityActivated( {} ) SetVerbosityLevel( "{}" ) positionVerbosityOfLoggerRegardingState() # WorkDir may be important to replay : "{}" -orb = CORBA.ORB_init(['']) +orb = salome.orb caseDirectory = "{}" contextExchangeMode = "{}" bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode ) codeFileName = os.path.join( caseDirectory, "{}" ) inputFileName = "{}" outputFileName = "{}" -del os outputsKeys = {} exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ) if VerbosityActivated(): @@ -915,16 +916,16 @@ class PythonFunctionEvaluatorParams: def result(self): return self._out_context_filename.retrieveRemoteContext() - def destroyOnOK(self): - self._out_context_filename.removeContextSupport() + def destroyOnOKAndReplay(self): + self._out_context_filename.removeContextSupport( True ) for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]: if os.path.exists( fileToDestroy ): os.unlink( fileToDestroy ) - def destroyOnKO(self, containerRef): + def destroyOnKOAndReplay(self, containerRef): """ Called in the context of failure with replay mode activated """ - self._out_context_filename.removeContextSupport() + self._out_context_filename.removeContextSupport( False ) # register to container files group associated to the containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename]) @@ -982,7 +983,7 @@ class ExchangeContextUsingFile( ExchangeContextBridgeAbs ): with open(outputFileName,"wb") as f: pickle.dump( context, f ) -class ContextExchanger_i(Engines__POA.ContextExchanger,Generic): +class ContextExchanger_i(Engines__POA.ContextExchanger): """ In TCP mode, servant hosted SALOME_Container side. """ @@ -990,6 +991,7 @@ class ContextExchanger_i(Engines__POA.ContextExchanger,Generic): import salome self._poa = salome.orb.resolve_initial_references("RootPOA") self._in_ctx = inCtx + self._out_ctx = bytes(0) def getPOA(self): return self._poa @@ -997,26 +999,42 @@ class ContextExchanger_i(Engines__POA.ContextExchanger,Generic): def getInputContext(self): obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o) return refPtr - + def pushOutputContext(self, ctx): - self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() ) + try: + self._out_ctx += ctx + except Exception as e: + raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"pushOutputContext",0) ) + + def finishPushContext(self): + try: + self._output_context = pickle.loads( self._out_ctx ) + except Exception as e: + raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"finishPushContext",0) ) def getOutputContext(self): return self._output_context class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ): def buildContextPointEntry(self, caseDir, contextEntry): - import CORBA - self._orb = CORBA.ORB_init(['']) + import salome + salome.salome_init() + self._orb = salome.orb self._data_exchange_channel = self._orb.string_to_object( contextEntry ) return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() ) def pushContext(self, contextPointEntry, context): - poa = self._orb.resolve_initial_references("RootPOA") - pman = poa.the_POAManager - pman.activate() - obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) - self._data_exchange_channel.pushOutputContext( refPtr ) + ctxBytes = pickle.dumps( context ) + size = len( ctxBytes ) + if size <= SeqByteReceiver.CHUNK_SIZE: + self._data_exchange_channel.pushOutputContext( ctxBytes ) + else: + EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8 + iStart = 0 ; iEnd = EFF_CHUNK_SIZE + while iStart!=iEnd and iEnd <= size: + self._data_exchange_channel.pushOutputContext( ctxBytes[iStart:iEnd] ) + iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size) + self._data_exchange_channel.finishPushContext() def ExchangeModeServerSideFactory( exchangeMode ): if exchangeMode == "File": @@ -1043,16 +1061,19 @@ class ExchangeContextCltAbs(abc.ABC): raise RuntimeError("Must be overloaded") @abc.abstractmethod - def removeContextSupport(self): + def removeContextSupport(self, isOK): raise RuntimeError("Must be overloaded") class ExchangeContextUsingFileClt(ExchangeContextCltAbs): + def __init__(self, keepFilesToReplay): + self._keep_in_files = keepFilesToReplay + def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context): - contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName) - with open(contextFileName,"wb") as contextFd: + self._in_ctx_entry_point = os.path.join( dirForReplayFiles, contextFileBaseName) + with open(self._in_ctx_entry_point,"wb") as contextFd: pickle.dump( context, contextFd) - return os.path.basename( contextFileName ) + return os.path.basename( self._in_ctx_entry_point ) def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint): self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint ) @@ -1062,9 +1083,13 @@ class ExchangeContextUsingFileClt(ExchangeContextCltAbs): with open(self._out_ctx_entry_point,"rb") as f: return pickle.load( f ) - def removeContextSupport(self): - if os.path.exists( self._out_ctx_entry_point ): - os.unlink( self._out_ctx_entry_point ) + def removeContextSupport(self, isOK): + fileNamesToRm = [self._out_ctx_entry_point] + if isOK or not self._keep_in_files: + fileNamesToRm += [self._in_ctx_entry_point] + for fileName in fileNamesToRm: + if os.path.exists( fileName ): + os.unlink( fileName ) class ExchangeContextUsingTCPClt(ExchangeContextCltAbs): @@ -1077,25 +1102,21 @@ class ExchangeContextUsingTCPClt(ExchangeContextCltAbs): return self._portable_serv_entry def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint): - ret = self._portable_serv_entry - del self._portable_serv_entry # destroy to release ref count - import gc - gc.collect() - return ret + return self._portable_serv_entry def retrieveRemoteContext(self): return self._servant.getOutputContext() - def removeContextSupport(self): + def removeContextSupport(self, isOK):# isOK ignored. Because in memory mode nothing to save poa = self._servant.getPOA() poa.deactivate_object(self._id_o) del self._servant import gc gc.collect() -def ExchangeModeCltSideFactory( exchangeMode ): +def ExchangeModeCltSideFactory( exchangeMode, keepFilesToReplay ): if exchangeMode == "File": - return ExchangeContextUsingFileClt() + return ExchangeContextUsingFileClt(keepFilesToReplay) elif exchangeMode == "TCP": return ExchangeContextUsingTCPClt() else: @@ -1129,7 +1150,6 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL import tempfile import pickle import subprocess as sp - import CORBA import logging # def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ): @@ -1148,9 +1168,11 @@ def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfL return False,stderr # - def InternalExecResistant( exchangeMode, code, context, outargsname): + def InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname): import KernelBasis - orb = CORBA.ORB_init(['']) + import salome + salome.salome_init() + orb = salome.orb iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr #### EXEC_CODE_FNAME_PXF = "execsafe_" @@ -1168,7 +1190,7 @@ import sys sys.stderr.write({!r}) sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) codeFd.flush() - exCtx = ExchangeModeCltSideFactory(exchangeMode) + exCtx = ExchangeModeCltSideFactory(exchangeMode, keepFilesToReplay) codeFileNameFull = codeFd.name codeFileName = os.path.basename( codeFileNameFull ) contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ), context) @@ -1204,7 +1226,7 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) exchangeMode = "File" if not keepFilesToReplay: exchangeMode = "TCP" - returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname ) + returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname ) stdout = stdout.decode() stderr = stderr.decode() sys.stdout.write( stdout ) ; sys.stdout.flush() @@ -1215,15 +1237,15 @@ sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) ) if len(pcklData) > 0: ret = pickle.loads( pcklData ) context.update( evParams.result ) - evParams.destroyOnOK() + evParams.destroyOnOKAndReplay() if returnCode != 0: print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) ) return ret else: if keepFilesToReplay: - evParams.destroyOnKO( containerRef ) + evParams.destroyOnKOAndReplay( containerRef ) else: - evParams.destroyOnOK() + evParams.destroyOnOKAndReplay() raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}") def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ): @@ -1414,6 +1436,7 @@ class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC): self.addTimeInfoOnLevel2("startExecTime") ## self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms()) + self._current_execution_session.finalizeAndPushToMaster() # flush it to the server (scheduler hosting ContainerManager) in order to retrieve it in Python server cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() ) ## self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo) diff --git a/src/Launcher/SALOME_LogManager.py b/src/Launcher/SALOME_LogManager.py index 58114fc41..7404a35cf 100644 --- a/src/Launcher/SALOME_LogManager.py +++ b/src/Launcher/SALOME_LogManager.py @@ -45,16 +45,20 @@ class SALOME_ContainerScriptExecPerfLog: :param alreadyOnSiteBytes: pickle of instance of ScriptExecInfo of previous value (if any) [bytes] :param instanceRemoteBytes: pickle of current instance of ScriptExecInfo (if any) [bytes] """ + instanceRemote = pickle.loads(instanceRemoteBytes) alreadyOnSite = None if len( alreadyOnSiteBytes ) > 0: alreadyOnSite = pickle.loads(alreadyOnSiteBytes) - instanceRemote = pickle.loads(instanceRemoteBytes) self._stop_pos = os.path.getsize( self.father.father.logfilename ) - setattr(instanceRemote,"tracePosStop",self._stop_pos) - setattr(instanceRemote,"tracePosStart",self._start_pos) if alreadyOnSite: - instanceRemote.preappendFreestyle( alreadyOnSite._freestyle_log ) - return pickle.dumps(instanceRemote) + setattr(alreadyOnSite,"tracePosStop",self._stop_pos) + setattr(alreadyOnSite,"tracePosStart",self._start_pos) + alreadyOnSite.appendFreestyle( instanceRemote._freestyle_log ) + return pickle.dumps(alreadyOnSite) + else: + setattr(instanceRemote,"tracePosStop",self._stop_pos) + setattr(instanceRemote,"tracePosStart",self._start_pos) + return pickle.dumps(instanceRemote) def start(self): self._start_pos = os.path.getsize( self.father.father.logfilename ) diff --git a/src/Launcher/Test/testCrashProofContainer.py b/src/Launcher/Test/testCrashProofContainer.py index eb30ec1cc..9fe5aa785 100644 --- a/src/Launcher/Test/testCrashProofContainer.py +++ b/src/Launcher/Test/testCrashProofContainer.py @@ -103,6 +103,12 @@ del i j = np.zeros(shape=(2*nb,),dtype=np.float64) """ +FunnyCase_test7 = """ +import numpy as np +nb = i.shape[0] +j = np.zeros(shape=(2*nb,),dtype=np.float64) +""" + class testPerfLogManager1(unittest.TestCase): def test0(self): """ @@ -309,6 +315,7 @@ class testPerfLogManager1(unittest.TestCase): logInfoForCont = [elt for elt in a if "container_crash_test_5" in elt.ns_entry] self.assertEqual( len(logInfoForCont), 1 ) logInfoForCont = logInfoForCont[0] + self.assertTrue( logInfoForCont[1][0].startExecTime is not None ) self.assertEqual( [elt[0] for elt in logInfoForCont[1][0].get().freestyle] , ['b4loadctx', 'afterloadctx', 'bforeexec', 'b4loadctx', 'afterloadctx', 'bforeexec', 'afterexec', 'strtdumpout', 'afterdump'] ) # <- aim of test is here. First 3 entries ('b4loadctx', 'afterloadctx', 'bforeexec') prove that first attempt fails to return within 10 sececonds as requested by KernelBasis.SetExecutionTimeOut(10) pass pass @@ -348,6 +355,62 @@ class testPerfLogManager1(unittest.TestCase): gc.collect() #time.sleep(10) + def test7(self): + """ + [EDF30875] : Garanty that DirectoryForReplay is clean after execution. + """ + import numpy as np + import gc + szOfData = 12000 + KernelBasis.SetPyExecutionMode("OutOfProcessWithReplayFT") + salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy + with tempfile.TemporaryDirectory() as tmpdirname: + os.chdir( tmpdirname ) + hostname = "localhost" + cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_7") + salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) ) + KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) ) + with salome.ContainerLauncherCM(cp,True) as cont: + poa = salome.orb.resolve_initial_references("RootPOA") + arr = np.zeros(shape=(szOfData,),dtype=np.float64) + obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) + gc.collect() + pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7) + pyscript.executeFirst(refPtr) + ret = pyscript.executeSecond(["j"]) + pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB + ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it + DecrRefInFile( pxy.getFileName() ) + self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean + + def test8(self): + """ + [EDF30875] : same than test7 but with OutOfProcessWithReplay. + """ + import numpy as np + import gc + szOfData = 12000 + KernelBasis.SetPyExecutionMode("OutOfProcessWithReplay") + salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy + with tempfile.TemporaryDirectory() as tmpdirname: + os.chdir( tmpdirname ) + hostname = "localhost" + cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_8") + salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) ) + KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) ) + with salome.ContainerLauncherCM(cp,True) as cont: + poa = salome.orb.resolve_initial_references("RootPOA") + arr = np.zeros(shape=(szOfData,),dtype=np.float64) + obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) + gc.collect() + pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7) + pyscript.executeFirst(refPtr) + ret = pyscript.executeSecond(["j"]) + pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB + ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it + DecrRefInFile( pxy.getFileName() ) + self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean + if __name__ == '__main__': from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose -- 2.39.2