import KernelBasis
import SALOME
import SALOME__POA
+import Engines
+import Engines__POA
from SALOME_ContainerHelper import ScriptExecInfo
MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
FinalCode = """import pickle
from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
+from SALOME_PyNode import ExchangeModeServerSideFactory
from KernelBasis import VerbosityActivated
import CORBA
import Engines
# WorkDir may be important to replay : "{}"
orb = CORBA.ORB_init([''])
caseDirectory = "{}"
+contextExchangeMode = "{}"
+bigObjExchanger = ExchangeModeServerSideFactory( contextExchangeMode )
codeFileName = os.path.join( caseDirectory, "{}" )
-inputFileName = os.path.join( caseDirectory, "{}" )
-outputFileName = os.path.join( caseDirectory, "{}" )
+inputFileName = "{}"
+outputFileName = "{}"
del os
outputsKeys = {}
exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
if VerbosityActivated():
eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("b4loadctx",datetime.now()) )
-with open(inputFileName,"rb") as f:
- context = pickle.load( f )
+context = bigObjExchanger.buildContextPointEntry(caseDirectory,inputFileName)
if VerbosityActivated():
eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterloadctx",datetime.now()) )
context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
if VerbosityActivated():
eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("strtdumpout",datetime.now()) )
#
-with open(outputFileName,"wb") as f:
- pickle.dump( context, f )
+bigObjExchanger.pushContext(outputFileName,context)
if VerbosityActivated():
eval( "{{}}".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) ).addFreestyleAndFlush( ("afterdump",datetime.now()) )
"""
class PythonFunctionEvaluatorParams:
def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
+ """
+ :param outContextFileName: instance of ExchangeContextCltAbs
+ """
self._main_filename = mainFileName
self._code_filename = codeFileName
self._in_context_filename = inContextFileName
self._out_context_filename = outContextFileName
@property
def result(self):
- import pickle
- with open(self._out_context_filename,"rb") as f:
- return pickle.load( f )
+ return self._out_context_filename.retrieveRemoteContext()
+
def destroyOnOK(self):
- for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
+ self._out_context_filename.removeContextSupport()
+ for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename]:
if os.path.exists( fileToDestroy ):
os.unlink( fileToDestroy )
def destroyOnKO(self, containerRef):
- """
- Called in the context of failure with replay mode activated
- """
- for fileToDestroy in [self._out_context_filename]:
- if os.path.exists( fileToDestroy ):
- os.unlink( fileToDestroy )
- # register to container files group associated to the
- containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
+ """
+ Called in the context of failure with replay mode activated
+ """
+ self._out_context_filename.removeContextSupport()
+ # register to container files group associated to the
+ containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
+
@property
def replayCmd(self):
return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
{banner}
"""
+class ExchangeContextBridgeAbs(abc.ABC):
+ def __init__(self):
+ pass
+
+ @abc.abstractmethod
+ def buildContextPointEntry(self, caseDir, contextEntry):
+ raise RuntimeError("Must be overloaded")
+
+ @abc.abstractmethod
+ def pushContext(self, contextPointEntry, contextEntry):
+ raise RuntimeError("Must be overloaded")
+
+class ExchangeContextUsingFile( ExchangeContextBridgeAbs ):
+ def buildContextPointEntry(self, caseDir, contextEntry):
+ self._case_dir = caseDir
+ inputFileName = os.path.join( self._case_dir, contextEntry )
+ with open(inputFileName,"rb") as f:
+ context = pickle.load( f )
+ return context
+
+ def pushContext(self, contextPointEntry, context):
+ outputFileName = os.path.join( self._case_dir, contextPointEntry )
+ with open(outputFileName,"wb") as f:
+ pickle.dump( context, f )
+
+class ContextExchanger_i(Engines__POA.ContextExchanger,Generic):
+ """
+ In TCP mode, servant hosted SALOME_Container side.
+ """
+ def __init__(self, inCtx):
+ import salome
+ self._poa = salome.orb.resolve_initial_references("RootPOA")
+ self._in_ctx = inCtx
+
+ def getPOA(self):
+ return self._poa
+
+ def getInputContext(self):
+ obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o)
+ return refPtr
+
+ def pushOutputContext(self, ctx):
+ self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() )
+
+ def getOutputContext(self):
+ return self._output_context
+
+class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ):
+ def buildContextPointEntry(self, caseDir, contextEntry):
+ import CORBA
+ self._orb = CORBA.ORB_init([''])
+ self._data_exchange_channel = self._orb.string_to_object( contextEntry )
+ return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() )
+
+ def pushContext(self, contextPointEntry, context):
+ poa = self._orb.resolve_initial_references("RootPOA")
+ pman = poa.the_POAManager
+ pman.activate()
+ obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o)
+ self._data_exchange_channel.pushOutputContext( refPtr )
+
+def ExchangeModeServerSideFactory( exchangeMode ):
+ if exchangeMode == "File":
+ return ExchangeContextUsingFile()
+ elif exchangeMode == "TCP":
+ return ExchangeContextUsingTCP()
+ else:
+ raise RuntimeError( "ExchangeModeServerSideFactory : {} not in [File,TCP]" )
+
+class ExchangeContextCltAbs(abc.ABC):
+ def __init__(self):
+ pass
+
+ @abc.abstractmethod
+ def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+ raise RuntimeError("Must be overloaded")
+
+ @abc.abstractmethod
+ def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+ raise RuntimeError("Must be overloaded")
+
+ @abc.abstractmethod
+ def retrieveRemoteContext(self):
+ raise RuntimeError("Must be overloaded")
+
+ @abc.abstractmethod
+ def removeContextSupport(self):
+ raise RuntimeError("Must be overloaded")
+
+class ExchangeContextUsingFileClt(ExchangeContextCltAbs):
+
+ def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+ contextFileName = os.path.join( dirForReplayFiles, contextFileBaseName)
+ with open(contextFileName,"wb") as contextFd:
+ pickle.dump( context, contextFd)
+ return os.path.basename( contextFileName )
+
+ def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+ self._out_ctx_entry_point = os.path.join( dirForReplayFiles, outCtxtEntryPoint )
+ return os.path.basename( self._out_ctx_entry_point )
+
+ def retrieveRemoteContext(self):
+ with open(self._out_ctx_entry_point,"rb") as f:
+ return pickle.load( f )
+
+ def removeContextSupport(self):
+ if os.path.exists( self._out_ctx_entry_point ):
+ os.unlink( self._out_ctx_entry_point )
+
+class ExchangeContextUsingTCPClt(ExchangeContextCltAbs):
+
+ def hostInputContext(self, dirForReplayFiles, contextFileBaseName, context):
+ import salome
+ self._servant = ContextExchanger_i(context)
+ poa = self._servant.getPOA()
+ self._id_o = poa.activate_object(self._servant) ; refPtr = poa.id_to_reference(self._id_o)
+ self._portable_serv_entry = salome.orb.object_to_string( refPtr )
+ return self._portable_serv_entry
+
+ def setOutputContextEntryPoint(self, dirForReplayFiles, outCtxtEntryPoint):
+ ret = self._portable_serv_entry
+ del self._portable_serv_entry # destroy to release ref count
+ import gc
+ gc.collect()
+ return ret
+
+ def retrieveRemoteContext(self):
+ return self._servant.getOutputContext()
+
+ def removeContextSupport(self):
+ poa = self._servant.getPOA()
+ poa.deactivate_object(self._id_o)
+ del self._servant
+ import gc
+ gc.collect()
+
+def ExchangeModeCltSideFactory( exchangeMode ):
+ if exchangeMode == "File":
+ return ExchangeContextUsingFileClt()
+ elif exchangeMode == "TCP":
+ return ExchangeContextUsingTCPClt()
+ else:
+ raise RuntimeError( "ExchangeModeCltSideFactory : {} not in [File,TCP]" )
+
def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay, closeEyesOnErrorAtExit):
"""
Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
return False,stderr
#
- def InternalExecResistant( code, context, outargsname):
+ def InternalExecResistant( exchangeMode, code, context, outargsname):
import KernelBasis
orb = CORBA.ORB_init([''])
iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
sys.stderr.write({!r})
sys.stderr.flush()""".format( MY_KEY_TO_DETECT_FINISH ) )
codeFd.flush()
+ exCtx = ExchangeModeCltSideFactory(exchangeMode)
codeFileNameFull = codeFd.name
codeFileName = os.path.basename( codeFileNameFull )
- contextFileName = os.path.join( dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
- with open(contextFileName,"wb") as contextFd:
- pickle.dump( context, contextFd)
- resFileName = os.path.join( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
+ contextFileName = exCtx.hostInputContext(dirForReplayFiles, "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ), context)
+ resFileName = exCtx.setOutputContextEntryPoint( dirForReplayFiles, "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
mainExecFileName = os.path.join( dirForReplayFiles, "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName ) ) )
with open(mainExecFileName,"w") as f:
- f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, codeFileName, os.path.basename( contextFileName ), os.path.basename( resFileName ), outargsname, iorScriptLog ) )
+ f.write( FinalCode.format( os.getcwd(), dirForReplayFiles, exchangeMode, codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
timeOut = KernelBasis.GetExecutionTimeOut()
nbRetry = KernelBasis.GetNumberOfRetry()
logging.debug( "Nb retry = {} Timout in seconds = {}".format( nbRetry, timeOut ) )
for iTry in range( nbRetry ):
if iTry > 0:
print( "WARNING : Retry # {}. Following code has generated non zero return code ( {} ). Trying again ... \n{}".format( iTry, returnCode, code ) )
+ logging.debug( "try # {} / {} --- ".format( iTry, nbRetry ) )
p = sp.Popen(["python3", mainExecFileName],cwd = os.getcwd(),stdout = sp.PIPE, stderr = sp.PIPE)
try:
args = {}
if iTry >= 1:
logging.warning( "At Retry #{} it's successful :)".format(iTry) )
break
- return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,resFileName)
+ return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileNameFull,contextFileName,exCtx)
ret = instanceOfLogOfCurrentSession._current_instance
- returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
+ exchangeMode = "File"
+ if not keepFilesToReplay:
+ exchangeMode = "TCP"
+ returnCode, stdout, stderr, evParams = InternalExecResistant( exchangeMode, code, context, outargsname )
stdout = stdout.decode()
stderr = stderr.decode()
sys.stdout.write( stdout ) ; sys.stdout.flush()
raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
- return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, False)
+ return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = False)
def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
- return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, False)
+ return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = False)
def ExecCrashProofWithReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
- return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True, True)
+ return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = True, closeEyesOnErrorAtExit = True)
def ExecCrashProofWithoutReplayFT( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
- return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False, True)
+ return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay = False, closeEyesOnErrorAtExit = True)
def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
exec( code, context )