from SALOME_PyNode import ExchangeModeServerSideFactory
from KernelBasis import VerbosityActivated,SetVerbosityLevel,SetVerbosityActivated
from salome_utils import positionVerbosityOfLoggerRegardingState
-import CORBA
import Engines
+import salome
import os
+import sys
+salome.salome_init()
from datetime import datetime
SetVerbosityActivated( {} )
SetVerbosityLevel( "{}" )
positionVerbosityOfLoggerRegardingState()
# WorkDir may be important to replay : "{}"
-orb = CORBA.ORB_init([''])
+orb = salome.orb
caseDirectory = "{}"
contextExchangeMode = "{}"
bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode )
codeFileName = os.path.join( caseDirectory, "{}" )
inputFileName = "{}"
outputFileName = "{}"
-del os
outputsKeys = {}
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
if VerbosityActivated():
def result(self):
return self._out_context_filename.retrieveRemoteContext()
- def destroyOnOK(self):
- self._out_context_filename.removeContextSupport()
+ def destroyOnOKAndReplay(self):
+ self._out_context_filename.removeContextSupport( True )
for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]:
if os.path.exists( fileToDestroy ):
os.unlink( fileToDestroy )
- def destroyOnKO(self, containerRef):
+ def destroyOnKOAndReplay(self, containerRef):
"""
Called in the context of failure with replay mode activated
"""
- self._out_context_filename.removeContextSupport()
+ self._out_context_filename.removeContextSupport( False )
# register to container files group associated to the
containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
with open(outputFileName,"wb") as f:
pickle.dump( context, f )
-class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
+class ContextExchanger_i(Engines__POA.ContextExchanger):
"""
In TCP mode, servant hosted SALOME_Container side.
"""
import salome
self._poa = salome.orb.resolve_initial_references("RootPOA")
self._in_ctx = inCtx
+ self._out_ctx = bytes(0)
def getPOA(self):
return self._poa
def getInputContext(self):
obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o)
return refPtr
-
+
def pushOutputContext(self, ctx):
- self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() )
+ try:
+ self._out_ctx += ctx
+ except Exception as e:
+ raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"pushOutputContext",0) )
+
+ def finishPushContext(self):
+ try:
+ self._output_context = pickle.loads( self._out_ctx )
+ except Exception as e:
+ raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"finishPushContext",0) )
def getOutputContext(self):
return self._output_context
class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ):
def buildContextPointEntry(self, caseDir, contextEntry):
- import CORBA
- self._orb = CORBA.ORB_init([''])
+ import salome
+ salome.salome_init()
+ self._orb = salome.orb
self._data_exchange_channel = self._orb.string_to_object( contextEntry )
return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() )
def pushContext(self, contextPointEntry, context):
- poa = self._orb.resolve_initial_references("RootPOA")
- pman = poa.the_POAManager
- pman.activate()
- obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
- self._data_exchange_channel.pushOutputContext( refPtr )
+ ctxBytes = pickle.dumps( context )
+ size = len( ctxBytes )
+ if size <= SeqByteReceiver.CHUNK_SIZE:
+ self._data_exchange_channel.pushOutputContext( ctxBytes )
+ else:
+ EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
+ iStart = 0 ; iEnd = EFF_CHUNK_SIZE
+ while iStart!=iEnd and iEnd <= size:
+ self._data_exchange_channel.pushOutputContext( ctxBytes[iStart:iEnd] )
+ iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
+ self._data_exchange_channel.finishPushContext()
def ExchangeModeServerSideFactory( exchangeMode ):
if exchangeMode == "File":
raise RuntimeError("Must be overloaded")
@abc.abstractmethod
- def removeContextSupport(self):
+ def removeContextSupport(self, isOK):
raise RuntimeError("Must be overloaded")
class ExchangeContextUsingFileClt(ExchangeContextCltAbs):
+ def __init__(self, keepFilesToReplay):
+ self._keep_in_files = keepFilesToReplay
+
def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
- contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName)
- with open(contextFileName,"wb") as contextFd:
+ self._in_ctx_entry_point = os.path.join( dirForReplayFiles, contextFileBaseName)
+ with open(self._in_ctx_entry_point,"wb") as contextFd:
pickle.dump( context, contextFd)
- return os.path.basename( contextFileName )
+ return os.path.basename( self._in_ctx_entry_point )
def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint )
with open(self._out_ctx_entry_point,"rb") as f:
return pickle.load( f )
- def removeContextSupport(self):
- if os.path.exists( self._out_ctx_entry_point ):
- os.unlink( self._out_ctx_entry_point )
+ def removeContextSupport(self, isOK):
+ fileNamesToRm = [self._out_ctx_entry_point]
+ if isOK or not self._keep_in_files:
+ fileNamesToRm += [self._in_ctx_entry_point]
+ for fileName in fileNamesToRm:
+ if os.path.exists( fileName ):
+ os.unlink( fileName )
class ExchangeContextUsingTCPClt(ExchangeContextCltAbs):
return self._portable_serv_entry
def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
- ret = self._portable_serv_entry
- del self._portable_serv_entry # destroy to release ref count
- import gc
- gc.collect()
- return ret
+ return self._portable_serv_entry
def retrieveRemoteContext(self):
return self._servant.getOutputContext()
- def removeContextSupport(self):
+ def removeContextSupport(self, isOK):# isOK ignored. Because in memory mode nothing to save
poa = self._servant.getPOA()
poa.deactivate_object(self._id_o)
del self._servant
import gc
gc.collect()
-def ExchangeModeCltSideFactory( exchangeMode ):
+def ExchangeModeCltSideFactory( exchangeMode, keepFilesToReplay ):
if exchangeMode == "File":
- return ExchangeContextUsingFileClt()
+ return ExchangeContextUsingFileClt(keepFilesToReplay)
elif exchangeMode == "TCP":
return ExchangeContextUsingTCPClt()
else:
import tempfile
import pickle
import subprocess as sp
- import CORBA
import logging
#
def IsConsideredAsOKRun( returnCode, closeEyesOnErrorAtExit , stderr ):
return False,stderr
#
- def InternalExecResistant( exchangeMode, code, context, outargsname):
+ def InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname):
import KernelBasis
- orb = CORBA.ORB_init([''])
+ import salome
+ salome.salome_init()
+ orb = salome.orb
iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
####
EXEC_CODE_FNAME_PXF = "execsafe_"
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
codeFd.flush()
- exCtx = ExchangeModeCltSideFactory(exchangeMode)
+ exCtx = ExchangeModeCltSideFactory(exchangeMode, keepFilesToReplay)
codeFileNameFull = codeFd.name
codeFileName = os.path.basename( codeFileNameFull )
contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ), context)
exchangeMode = "File"
if not keepFilesToReplay:
exchangeMode = "TCP"
- returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname )
+ returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, keepFilesToReplay, code, context, outargsname )
stdout = stdout.decode()
stderr = stderr.decode()
sys.stdout.write( stdout ) ; sys.stdout.flush()
if len(pcklData) > 0:
ret = pickle.loads( pcklData )
context.update( evParams.result )
- evParams.destroyOnOK()
+ evParams.destroyOnOKAndReplay()
if returnCode != 0:
print( "WARNING : Following code has generated non zero return code ( {} ) but considered as OK\n{}".format( returnCode, code ) )
return ret
else:
if keepFilesToReplay:
- evParams.destroyOnKO( containerRef )
+ evParams.destroyOnKOAndReplay( containerRef )
else:
- evParams.destroyOnOK()
+ evParams.destroyOnOKAndReplay()
raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
self.addTimeInfoOnLevel2("startExecTime")
##
self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
+ self._current_execution_session.finalizeAndPushToMaster() # flush it to the server (scheduler hosting ContainerManager) in order to retrieve it in Python server
cpumeminfo, self._current_execution_session._current_instance = executeSecondInternal( self.my_container_py.monitoringtimeresms() )
##
self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
j = np.zeros(shape=(2*nb,),dtype=np.float64)
"""
+FunnyCase_test7 = """
+import numpy as np
+nb = i.shape[0]
+j = np.zeros(shape=(2*nb,),dtype=np.float64)
+"""
+
class testPerfLogManager1(unittest.TestCase):
def test0(self):
"""
logInfoForCont = [elt for elt in a if "container_crash_test_5" in elt.ns_entry]
self.assertEqual( len(logInfoForCont), 1 )
logInfoForCont = logInfoForCont[0]
+ self.assertTrue( logInfoForCont[1][0].startExecTime is not None )
self.assertEqual( [elt[0] for elt in logInfoForCont[1][0].get().freestyle] , ['b4loadctx', 'afterloadctx', 'bforeexec', 'b4loadctx', 'afterloadctx', 'bforeexec', 'afterexec', 'strtdumpout', 'afterdump'] ) # <- aim of test is here. First 3 entries ('b4loadctx', 'afterloadctx', 'bforeexec') prove that first attempt fails to return within 10 sececonds as requested by KernelBasis.SetExecutionTimeOut(10)
pass
pass
gc.collect()
#time.sleep(10)
+ def test7(self):
+ """
+ [EDF30875] : Garanty that DirectoryForReplay is clean after execution.
+ """
+ import numpy as np
+ import gc
+ szOfData = 12000
+ KernelBasis.SetPyExecutionMode("OutOfProcessWithReplayFT")
+ salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ os.chdir( tmpdirname )
+ hostname = "localhost"
+ cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_7")
+ salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) )
+ KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) )
+ with salome.ContainerLauncherCM(cp,True) as cont:
+ poa = salome.orb.resolve_initial_references("RootPOA")
+ arr = np.zeros(shape=(szOfData,),dtype=np.float64)
+ obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+ gc.collect()
+ pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7)
+ pyscript.executeFirst(refPtr)
+ ret = pyscript.executeSecond(["j"])
+ pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB
+ ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it
+ DecrRefInFile( pxy.getFileName() )
+ self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean
+
+ def test8(self):
+ """
+ [EDF30875] : same than test7 but with OutOfProcessWithReplay.
+ """
+ import numpy as np
+ import gc
+ szOfData = 12000
+ KernelBasis.SetPyExecutionMode("OutOfProcessWithReplay")
+ salome.cm.SetBigObjOnDiskThreshold( 1 ) # enable proxy
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ os.chdir( tmpdirname )
+ hostname = "localhost"
+ cp = pylauncher.GetRequestForGiveContainer(hostname,"container_crash_test_8")
+ salome.cm.SetDirectoryForReplayFiles( str( tmpdirname ) )
+ KernelBasis.SetBigObjOnDiskDirectory( str( tmpdirname ) )
+ with salome.ContainerLauncherCM(cp,True) as cont:
+ poa = salome.orb.resolve_initial_references("RootPOA")
+ arr = np.zeros(shape=(szOfData,),dtype=np.float64)
+ obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["i"],{"i": arr} ) )) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+ gc.collect()
+ pyscript = cont.createPyScriptNode("testScript",FunnyCase_test7)
+ pyscript.executeFirst(refPtr)
+ ret = pyscript.executeSecond(["j"])
+ pxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret[0]).data() ) # receiving twice size of input -> 2 GB
+ ret0 = UnProxyObjectSimple( pxy ) # it's a proxy -> un proxyfy it
+ DecrRefInFile( pxy.getFileName() )
+ self.assertEqual( len( os.listdir( str( tmpdirname ) ) ) , 0 ) # very important it must be clean
+
if __name__ == '__main__':
from salome_utils import positionVerbosityOfLoggerRegardingState,setVerboseLevel,setVerbose