Salome HOME
[EDF29138] : measure CPU/Mem even in OutOfProcess mode
[modules/kernel.git] / src / Container / SALOME_PyNode.py
index 83211834562880e2834a206e826748d9effa3228..e7f3cb62587864d177b96e5cb75e0e0f607d1c4b 100644 (file)
@@ -30,6 +30,7 @@ import Engines__POA
 import SALOME__POA
 import SALOME
 import logging
+import abc
 import os
 import sys
 from SALOME_ContainerHelper import ScriptExecInfo
@@ -527,12 +528,21 @@ def CPUMemoryMonitoring( intervalInMs, outFileName = None ):
       f.write("""import psutil
 pid = {}
 process = psutil.Process( pid )
+def getChargeOf( p ):
+  a,b = p.cpu_percent(), p.memory_info().rss
+  try:
+    for c in p.children():
+      a += c.cpu_percent(interval=0.01) ; b += c.memory_info().rss
+  except:
+    pass
+  return a,b
 import time
 with open("{}","a") as f:
   f.write( "{{}}\\n".format( "{}" ) )
   while True:
-    f.write( "{{}}\\n".format( str( process.cpu_percent() ) ) )
-    f.write( "{{}}\\n".format( str( process.memory_info().rss  ) ) )
+    cpu,mem_rss = getChargeOf( process )
+    f.write( "{{}}\\n".format( str( cpu ) ) )
+    f.write( "{{}}\\n".format( str( mem_rss  ) ) )
     f.flush()
     time.sleep( {} / 1000.0 )
 """.format(pid, tempOutFile, intervalInMs, intervalInMs))
@@ -714,25 +724,200 @@ class SeqByteReceiver:
         data_for_split_case = bytes(0).join( [data_for_split_case,part] )
         iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
       return data_for_split_case
+  
+FinalCode = """import pickle
+from SALOME_PyNode import LogOfCurrentExecutionSession,MY_PERFORMANCE_LOG_ENTRY_IN_GLBS
+import CORBA
+import Engines
+orb = CORBA.ORB_init([''])
+codeFileName = "{}"
+inputFileName = "{}"
+outputFileName = "{}"
+outputsKeys = {}
+exec( "{{}} = LogOfCurrentExecutionSession( orb.string_to_object( \\"{}\\" ) )".format(MY_PERFORMANCE_LOG_ENTRY_IN_GLBS) )
+with open(inputFileName,"rb") as f:
+  context = pickle.load( f )
+context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = eval( MY_PERFORMANCE_LOG_ENTRY_IN_GLBS )
+with open(codeFileName,"r") as f:
+  code = f.read()
+# go for execution
+exec( code , context )
+# filter part of context to be exported to father process
+context = dict( [(k,v) for k,v in context.items() if k in outputsKeys] )
+#
+with open(outputFileName,"wb") as f:
+  pickle.dump( context, f )
+"""
+
+class PythonFunctionEvaluatorParams:
+  def __init__(self, mainFileName, codeFileName, inContextFileName, outContextFileName):
+    self._main_filename = mainFileName
+    self._code_filename = codeFileName
+    self._in_context_filename = inContextFileName
+    self._out_context_filename = outContextFileName
+  @property
+  def result(self):
+    import pickle
+    with open(self._out_context_filename,"rb") as f:
+      return pickle.load( f )
+  def destroyOnOK(self):
+    for fileToDestroy in [self._main_filename,self._code_filename,self._in_context_filename,self._out_context_filename]:
+      if os.path.exists( fileToDestroy ):
+        os.unlink( fileToDestroy )
+  def destroyOnKO(self, containerRef):
+     """
+     Called in the context of failure with replay mode activated
+     """
+     for fileToDestroy in [self._out_context_filename]:
+      if os.path.exists( fileToDestroy ):
+        os.unlink( fileToDestroy )
+      # register to container files group associated to the
+      containerRef.addLogFileNameGroup([self._main_filename,self._code_filename,self._in_context_filename])
+  @property
+  def replayCmd(self):
+    return "To replay : ( cd {} && python3 {} )".format(os.path.dirname(self._main_filename),os.path.basename(self._main_filename))
+  
+  @property
+  def cleanOperations(self):
+    import os
+    return "To clean files : ( cd {} && rm {} )".format( os.path.dirname(self._main_filename)," ".join( [os.path.basename(self._main_filename),self._code_filename,self._in_context_filename] ) )
+
+  def strDependingOnReturnCode(self, keepFilesToReplay, returnCode):
+    if returnCode == -1:
+      return f"return with non zero code ({returnCode})"
+    else:
+      banner = 200*"*"
+      if keepFilesToReplay:
+        return f"""return with non zero code ({returnCode})
+{banner}
+Looks like a hard crash as returnCode {returnCode} != 0
+{self.replayCmd}
+{self.cleanOperations}
+{banner}
+"""
+      else:
+        return f"""return with non zero code ({returnCode})
+{banner}
+Looks like a hard crash as returnCode {returnCode} != 0
+{banner}
+"""
+
+def ExecCrashProofGeneric( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, keepFilesToReplay ):
+  """
+  Equivalent of exec(code,context) but executed in a separate subprocess to avoid to make the current process crash.
+  
+  Args:
+  -----
+
+  code (str) : python code to be executed using context
+  context (dict) : context to be used for execution. This context will be updated in accordance with the execution of code.
+  outargsname (list<str>) : list of arguments to be exported 
+  containerRef (Engines.Container) : Container ref (retrieving the Files to created when keepFilesToReplay is set to False)
+  instanceOfLogOfCurrentSession (LogOfCurrentExecutionSession) : instance of LogOfCurrentExecutionSession to build remotely the reference in order to log information
+  keepFilesToReplay (bool) : if True when something goes wrong during execution all the files to replay post mortem case are kept. If False only error is reported but files to replay are destoyed.
+
+  Return:
+  -------
+
+  ScriptExecInfo : instance serverside
+
+  In/Out:
+  -------
+
+  context will be modified by this method. elts in outargsname will be added and their corresponding value coming from evaluation.
+  """
+  import tempfile
+  import pickle
+  import subprocess as sp
+  import CORBA
+  #
+  def InternalExecResistant( code, context, outargsname):
+    orb = CORBA.ORB_init([''])
+    iorScriptLog = orb.object_to_string( instanceOfLogOfCurrentSession._remote_handle )#ref ContainerScriptPerfLog_ptr
+    ####
+    EXEC_CODE_FNAME_PXF = "execsafe_"
+    def RetrieveUniquePartFromPfx( fname ):
+      return os.path.splitext( os.path.basename(fname)[len(EXEC_CODE_FNAME_PXF):] )[0]
+    with tempfile.NamedTemporaryFile(dir=os.getcwd(),prefix=EXEC_CODE_FNAME_PXF,suffix=".py", mode="w", delete = False) as codeFd:
+      codeFd.write( code )
+      codeFd.flush()
+      codeFileName = os.path.basename( codeFd.name )
+      contextFileName = "contextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
+      with open(contextFileName,"wb") as contextFd:
+        pickle.dump( context, contextFd)
+      resFileName = "outcontextsafe_{}.pckl".format( RetrieveUniquePartFromPfx( codeFileName  ) )
+      mainExecFileName = os.path.abspath( "mainexecsafe_{}.py".format( RetrieveUniquePartFromPfx( codeFileName  ) ) )
+      with open(mainExecFileName,"w") as f:
+        f.write( FinalCode.format( codeFileName, contextFileName, resFileName, outargsname, iorScriptLog ) )
+      p = sp.Popen(["python3", mainExecFileName],stdout = sp.PIPE, stderr = sp.PIPE)
+      stdout, stderr = p.communicate()
+      returnCode = p.returncode
+    return returnCode, stdout, stderr, PythonFunctionEvaluatorParams(mainExecFileName,codeFileName,contextFileName,resFileName)
+  ret = instanceOfLogOfCurrentSession._current_instance
+  returnCode, stdout, stderr, evParams = InternalExecResistant( code, context, outargsname )
+  stdout = stdout.decode()
+  stderr = stderr.decode()
+  sys.stdout.write( stdout ) ; sys.stdout.flush()
+  sys.stderr.write( stderr ) ; sys.stderr.flush()
+  if returnCode == 0:
+    pcklData = instanceOfLogOfCurrentSession._remote_handle.getObj()
+    if len(pcklData) > 0:
+      ret = pickle.loads( pcklData )
+    context.update( evParams.result )
+    evParams.destroyOnOK()
+    return ret
+  if returnCode != 0:
+    if keepFilesToReplay:
+      evParams.destroyOnKO( containerRef )
+    else:
+      evParams.destroyOnOK()
+    raise RuntimeError(f"Subprocess launched {evParams.strDependingOnReturnCode(keepFilesToReplay,returnCode)}stdout :\n{stdout}\nstderr :\n{stderr}")
+
+def ExecCrashProofWithReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, True)
 
-class LogOfCurrentExecutionSession:
+def ExecCrashProofWithoutReplay( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
+  return ExecCrashProofGeneric(code, context, outargsname, containerRef, instanceOfLogOfCurrentSession, False)
+
+def ExecLocal( code, context, outargsname, containerRef, instanceOfLogOfCurrentSession ):
+  exec( code, context )
+  return instanceOfLogOfCurrentSession._current_instance
+
+class LogOfCurrentExecutionSessionAbs(abc.ABC):
+  def __init__(self):
+    self._current_instance = ScriptExecInfo()
+
+  def addInfoOnLevel2(self, key, value):
+    setattr(self._current_instance,key,value)
+
+  @abc.abstractmethod
+  def addFreestyleAndFlush(self, value):
+    raise RuntimeError("Must be overloaded")
+
+class LogOfCurrentExecutionSession(LogOfCurrentExecutionSessionAbs):
   def __init__(self, handleToCentralizedInst):
+    super().__init__()
     self._remote_handle = handleToCentralizedInst
-    self._current_instance = ScriptExecInfo()
 
   def addFreestyleAndFlush(self, value):
     self._current_instance.freestyle = value
     self.finalizeAndPushToMaster()
 
-  def addInfoOnLevel2(self, key, value):
-    setattr(self._current_instance,key,value)
-
   def finalizeAndPushToMaster(self):
     self._remote_handle.assign( pickle.dumps( self._current_instance ) )
 
-class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
+class LogOfCurrentExecutionSessionStub(LogOfCurrentExecutionSessionAbs):
+  """
+  This class is to stub LogOfCurrentExecutionSession in context of replay where the server (handleToCentralizedInst) has vanished
+  """
+  def __init__(self, handleToCentralizedInst = None):
+    super().__init__()
+  def addFreestyleAndFlush(self, value):
+    pass
+
+class PyScriptNode_Abstract_i(Engines__POA.PyScriptNode,Generic,abc.ABC):
   """The implementation of the PyScriptNode CORBA IDL that executes a script"""
-  def __init__(self, nodeName,code,poa,my_container,logscript):
+  def __init__(self, nodeName, code, poa, my_container, logscript):
     """Initialize the node : compilation in the local context"""
     Generic.__init__(self,poa)
     self.nodeName=nodeName
@@ -746,6 +931,10 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
     self._log_script = logscript
     self._current_execution_session = None
     sys.stdout.flush() ; sys.stderr.flush() # flush to correctly capture log per execution session
+
+  @abc.abstractmethod
+  def executeNow(self, outargsname):
+    raise RuntimeError("Must be overloaded")
       
   def __del__(self):
     # force removal of self.context. Don t know why it s not done by default
@@ -846,7 +1035,7 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
       ##
       self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
       with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
-        exec(self.ccode, self.context)
+        self._current_execution_session._current_instance = self.executeNow( outargsname )
         cpumeminfo = ReadCPUMemInfo( monitoringParams )
       ##
       self.addInfoOnLevel2("CPUMemDuringExec",cpumeminfo)
@@ -930,3 +1119,24 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
   def addTimeInfoOnLevel2(self, key):
     from datetime import datetime
     self._current_execution_session.addInfoOnLevel2(key,datetime.now())
+
+class PyScriptNode_i(PyScriptNode_Abstract_i):
+  def __init__(self, nodeName, code, poa, my_container, logscript):
+    super().__init__(nodeName, code, poa, my_container, logscript)
+
+  def executeNow(self, outargsname):
+    return ExecLocal(self.ccode,self.context,outargsname,self.my_container,self._current_execution_session)
+    
+class PyScriptNode_OutOfProcess_i(PyScriptNode_Abstract_i):
+  def __init__(self, nodeName, code, poa, my_container, logscript):
+    super().__init__(nodeName, code, poa, my_container, logscript)
+
+  def executeNow(self, outargsname):
+    return ExecCrashProofWithoutReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)
+
+class PyScriptNode_OutOfProcess_Replay_i(PyScriptNode_Abstract_i):
+  def __init__(self, nodeName, code, poa, my_container, logscript):
+    super().__init__(nodeName, code, poa, my_container, logscript)
+
+  def executeNow(self, outargsname):
+    return ExecCrashProofWithReplay(self.code,self.context,outargsname,self.my_container,self._current_execution_session)