MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
+MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
+
class Generic(SALOME__POA.GenericObj):
"""A Python implementation of the GenericObj CORBA IDL"""
def __init__(self,poa):
@property
def outFileName(self):
return self._out_file_name
+
+def FileSystemMonitoring(intervalInMs, dirNameToInspect):
+ """
+ This method loops indefinitely every intervalInMs milliseconds to scan
+ number of inodes and size of content recursively included into the in input directory.
-class GenericPythonMonitoringLauncherCtxMgr:
- def __init__(self, monitoringParams):
- """
- Args:
- ----
- monitoringParams (MonitoringInfo)
- """
- self._monitoring_params = monitoringParams
- def __enter__(self):
- import KernelBasis
- pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
- self._monitoring_params.pid = pid
- return self._monitoring_params
-
- def __exit__(self,exctype, exc, tb):
- StopMonitoring( self._monitoring_params )
-
-def LaunchTimeCPUMonitoring( intervalInMs ):
+ See also CPUMemoryMonitoring
+ """
+ global orb
+ import os
+ dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
+ import tempfile
+ import logging
+ import KernelBasis
+ # outFileNameSave stores the content of outFileName during phase of dumping
+ with tempfile.NamedTemporaryFile(prefix=os.path.basename(dirNameToInspect2),dir=os.path.dirname(dirNameToInspect2)) as f:
+ outFileNameSave = f.name
+ with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
+ tempPyFile = f.name
+ tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
+ with open(tempPyFile,"w") as f:
+ f.write("""
+import subprocess as sp
+import re
+import os
+import time
+import datetime
+with open("{tempOutFile}","a") as f:
+ while(True):
+ nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find",os.path.dirname( "{dirNameToInspect2}" )]), ), shell = True).decode().strip()
+ szOfDirStr = re.split("[\s]+",sp.check_output(["du","-sh",os.path.dirname( "{dirNameToInspect2}" )]).decode())[0]
+ f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
+ f.write( "{{}}\\n".format( str( nbinodes ) ) )
+ f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
+ f.flush()
+ time.sleep( {intervalInMs} / 1000.0 )
+""".format( **locals()))
+ logging.debug( "File for FS monitoring dump file : {}".format(tempPyFile) )
+ pyFileName = FileDeleter( tempPyFile )
+ outFileName = FileDeleter( tempOutFile )
+ pid = KernelBasis.LaunchMonitoring( tempPyFile )
+ return MonitoringInfo(pyFileName,outFileName,pid)
+
+def CPUMemoryMonitoring( intervalInMs ):
"""
Launch a subprocess monitoring self process.
This monitoring subprocess is a python process lauching every intervalInMs ms evaluation of
- CPU usage and RSS memory.
+ CPU usage and RSS memory of the calling process.
Communication between subprocess and self is done by file.
+
+ See also FileSystemMonitoring
"""
import KernelBasis
def BuildPythonFileForCPUPercent( intervalInMs ):
import os
import tempfile
- with tempfile.NamedTemporaryFile(prefix="htop_",suffix=".py") as f:
+ with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
tempPyFile = f.name
tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
pid = os.getpid()
""".format(pid, tempOutFile, intervalInMs))
return FileDeleter(tempPyFile), FileDeleter(tempOutFile)
pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs )
-
return MonitoringInfo(pyFileName, outFileName, None)
+class GenericPythonMonitoringLauncherCtxMgr:
+ def __init__(self, monitoringParams):
+ """
+ Args:
+ ----
+ monitoringParams (MonitoringInfo)
+ """
+ self._monitoring_params = monitoringParams
+ def __enter__(self):
+ import KernelBasis
+ pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
+ self._monitoring_params.pid = pid
+ return self._monitoring_params
+
+ def __exit__(self,exctype, exc, tb):
+ StopMonitoring( self._monitoring_params )
+
def StopMonitoring( monitoringInfo ):
"""
Kill monitoring subprocess.
def ReadCPUMemInfo( monitoringInfo ):
"""
- Retrieve data of monitoring.
+ Retrieve CPU/Mem data of monitoring.
Args:
----
mem_rss = [ int(elt) for elt in ret[1::2]]
return [(a,b) for a,b in zip(cpu,mem_rss)]
+def ReadInodeSizeInfo( monitoringInfo ):
+ """
+ Retrieve nb of inodes and size of monitoring
+
+ Args:
+ ----
+ monitoringInfo (MonitoringInfo): info returned by LaunchMonitoring
+
+ Returns
+ -------
+ list<datetime,int,str> : list of triplets. First param of triplet is datetimestruct
+ Second param of triplet is #inodes.
+ Thirst param of triplet is size.
+ """
+ import datetime
+ with open(monitoringInfo.outFileName.filename, "r") as f:
+ coarseData = [ elt.strip() for elt in f.readlines() ]
+ tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ]
+ nbInodes = [int(elt) for elt in coarseData[1::3]]
+ volumeOfDir = coarseData[2::3]
+ return [(t,a,b) for t,a,b in zip(tss,nbInodes,volumeOfDir)]
+
class SeqByteReceiver:
# 2GB limit to trigger split into chunks
CHUNK_SIZE = 2000000000
self._remote_handle = handleToCentralizedInst
self._current_instance = ScriptExecInfo()
+ def addFreestyleAndFlush(self, value):
+ self._current_instance.freestyle = value
+ self.finalizeAndPushToMaster()
+
def addInfoOnLevel2(self, key, value):
setattr(self._current_instance,key,value)
def executeFirst(self,argsin):
""" Same than first part of self.execute to reduce memory peak."""
+ def ArgInMananger(self,argsin):
+ argsInPy = SeqByteReceiver( argsin )
+ data = argsInPy.data()
+ self.addInfoOnLevel2("inputMem",len(data))
+ _,kws=pickle.loads(data)
+ return kws
try:
self.beginOfCurrentExecutionSession()
- data = None
self.addTimeInfoOnLevel2("startInputTime")
- if True: # to force call of SeqByteReceiver's destructor
- argsInPy = SeqByteReceiver( argsin )
- data = argsInPy.data()
- self.addInfoOnLevel2("inputMem",len(data))
- _,kws=pickle.loads(data)
+ # to force call of SeqByteReceiver's destructor
+ kws = ArgInMananger(self,argsin)
vis = InOutputObjVisitor()
for elt in kws:
# fetch real data if necessary
self.addTimeInfoOnLevel2("startExecTime")
##
self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms())
- with GenericPythonMonitoringLauncherCtxMgr( LaunchTimeCPUMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
+ with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams:
exec(self.ccode, self.context)
cpumeminfo = ReadCPUMemInfo( monitoringParams )
##
def beginOfCurrentExecutionSession(self):
self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() )
+ self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session
def endOfCurrentExecutionSession(self):
self._current_execution_session.finalizeAndPushToMaster()
import pickle
import tempfile
import logging
+from datetime import datetime
def flush():
import sys
salome.logm.clear()
#PROXY_THRES = "-1"
PROXY_THRES = "1"
- with tempfile.TemporaryDirectory() as tmpdirnameMonitoring:
- monitoringFile = os.path.join( str( tmpdirnameMonitoring ), "zeHtop.pckl" )
- monitoringFileTwo = os.path.join( str( tmpdirnameMonitoring ), "zeHtopTwo.pckl" )
- iorLogm = os.path.join( str( tmpdirnameMonitoring ), "logm.ior" )
- with open(iorLogm,"w") as f:
- f.write( salome.orb.object_to_string(salome.logm) )
- logging.debug("Monitoring file : {}".format(monitoringFile))
- with tempfile.TemporaryDirectory() as tmpdirname:
- with salome.LogManagerLaunchMonitoringFileCtxMgr(250,monitoringFile) as monitoringParams:
- pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
- logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
- val_for_big_obj = str( tmpdirname )
- os.environ["SALOME_FILE_BIG_OBJ_DIR"] = val_for_big_obj
- # Override environement for all containers launched
- salome.cm.SetOverrideEnvForContainersSimple(env = [("SALOME_FILE_BIG_OBJ_DIR",val_for_big_obj),("SALOME_BIG_OBJ_ON_DISK_THRES",PROXY_THRES)])
- salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 250 )
- cont = salome.cm.GiveContainer(cp)
- logging.debug("{} {}".format(40*"*",cont.getPID()))
- script_st = """
+ with SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.FileSystemMonitoring(1000,salome.__file__) ) as monitoringParamsForFileMaster:
+ with SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.CPUMemoryMonitoring(1000) ) as monitoringParamsMaster:
+ with tempfile.TemporaryDirectory() as tmpdirnameMonitoring:
+ monitoringFile = os.path.join( str( tmpdirnameMonitoring ), "zeHtop.pckl" )
+ monitoringFileTwo = os.path.join( str( tmpdirnameMonitoring ), "zeHtopTwo.pckl" )
+ iorLogm = os.path.join( str( tmpdirnameMonitoring ), "logm.ior" )
+ with open(iorLogm,"w") as f:
+ f.write( salome.orb.object_to_string(salome.logm) )
+ logging.debug("Monitoring file : {}".format(monitoringFile))
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ with salome.LogManagerLaunchMonitoringFileCtxMgr(250,monitoringFile) as monitoringParams:
+ pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
+ logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
+ val_for_big_obj = str( tmpdirname )
+ os.environ["SALOME_FILE_BIG_OBJ_DIR"] = val_for_big_obj
+ # Override environement for all containers launched
+ salome.cm.SetOverrideEnvForContainersSimple(env = [("SALOME_FILE_BIG_OBJ_DIR",val_for_big_obj),("SALOME_BIG_OBJ_ON_DISK_THRES",PROXY_THRES)])
+ salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 250 )
+ cont = salome.cm.GiveContainer(cp)
+ logging.debug("{} {}".format(40*"*",cont.getPID()))
+ script_st = """
import logging
import sys
import KernelBasis
+from datetime import datetime
cst = KernelBasis.GetTimeAdjustmentCst()
logging.debug("constant = {}".format(cst))
nbcore = 3
+my_log_4_this_session.addFreestyleAndFlush( ("a",datetime.now()) ) # exemple of custom tracking
print("coucou {} {}".format(len(zeinput0),len(zeinput1)))
logging.debug("debug or not debug")
ob = [ [ bytes(3000000) ] ]
print("Time ellapse spent : {} s".format(ts))
sys.stderr.write("fake error message\\n")
"""
- poa = salome.orb.resolve_initial_references("RootPOA")
- zeinput0 = [ bytes(100000000) ]
- if SALOME_PyNode.GetBigObjectOnDiskThreshold() != -1:
- zeinput0 = SALOME_PyNode.ProxyfyPickeled( zeinput0 )
- zeinput0.unlinkOnDestructor()
- obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["zeinput0"],{"zeinput0": [zeinput0], "zeinput1": [ [zeinput0], [zeinput0] ] }) ))
- id_o = poa.activate_object(obj)
- refPtr = poa.id_to_reference(id_o)
- pyscript2 = cont.createPyScriptNode("testScript2",script_st)
- pyscript2.executeFirst(refPtr)
- ret2 = pyscript2.executeSecond(["ob","ob2"])# generate a DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats on debian11 ?
- ret3, fileNamesProxyOut = unProxyfy( ret2 )
- logging.getLogger().debug("test logging 1")
- logging.debug("test logging 2")
- logging.debug( salome.orb.object_to_string( salome.logm ) )
- a = salome.logm.NaiveFetch()
- logging.debug(a)
- logging.debug(a[0][1][0])
- logging.debug( a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._hdd_mem ) # important
- logging.debug( a[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
- fileNameProxyIn = a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._file_name
- logging.debug( fileNameProxyIn )
- del zeinput0
- del ret3
- import gc ; gc.collect()
- if fileNameProxyIn is not None:
- if os.path.exists(fileNameProxyIn):
- raise RuntimeError("Oooops 2")
- for fileNameProxyOut in fileNamesProxyOut:
- if fileNameProxyOut is not None:
- if os.path.exists(fileNameProxyOut):
- raise RuntimeError("Oooops 3")
- # execution #2 inside last
- script_st2 = """
+ poa = salome.orb.resolve_initial_references("RootPOA")
+ zeinput0 = [ bytes(100000000) ]
+ if SALOME_PyNode.GetBigObjectOnDiskThreshold() != -1:
+ zeinput0 = SALOME_PyNode.ProxyfyPickeled( zeinput0 )
+ zeinput0.unlinkOnDestructor()
+ obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["zeinput0"],{"zeinput0": [zeinput0], "zeinput1": [ [zeinput0], [zeinput0] ] }) ))
+ id_o = poa.activate_object(obj)
+ refPtr = poa.id_to_reference(id_o)
+ pyscript2 = cont.createPyScriptNode("testScript2",script_st)
+ pyscript2.executeFirst(refPtr)
+ ret2 = pyscript2.executeSecond(["ob","ob2"])# generate a DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats on debian11 ?
+ ret3, fileNamesProxyOut = unProxyfy( ret2 )
+ logging.getLogger().debug("test logging 1")
+ logging.debug("test logging 2")
+ logging.debug( salome.orb.object_to_string( salome.logm ) )
+ a = salome.logm.NaiveFetch()
+ logging.debug(a)
+ fs = a[0][1][0].get().freestyle
+ self.assertEqual(len(fs),1)
+ self.assertEqual(fs[0][0],"a")
+ self.assertTrue( isinstance( fs[0][1], datetime ) )
+ logging.debug(a[0][1][0])
+ logging.debug( a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._hdd_mem ) # important
+ logging.debug( a[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+ fileNameProxyIn = a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._file_name
+ logging.debug( fileNameProxyIn )
+ del zeinput0
+ del ret3
+ import gc ; gc.collect()
+ if fileNameProxyIn is not None:
+ if os.path.exists(fileNameProxyIn):
+ raise RuntimeError("Oooops 2")
+ for fileNameProxyOut in fileNamesProxyOut:
+ if fileNameProxyOut is not None:
+ if os.path.exists(fileNameProxyOut):
+ raise RuntimeError("Oooops 3")
+ # execution #2 inside last
+ script_st2 = """
import logging
b = 7+a
logging.debug("Execution 2")
import time
time.sleep(1)
"""
- obj2 = SALOME_PyNode.SenderByte_i(poa,pickle.dumps((["a"],{"a":3})))
- id2_o = poa.activate_object(obj2)
- refPtr2 = poa.id_to_reference(id2_o)
- pyscript2.assignNewCompiledCode(script_st2)
- pyscript2.executeFirst(refPtr2)
- ret2_0 = pyscript2.executeSecond(["b"])
- ret2_1, fileNamesProxyOut2 = unProxyfy( ret2_0 )
- logging.debug( fileNamesProxyOut2 )
- a = salome.logm.NaiveFetch()
- del ret2_1
+ obj2 = SALOME_PyNode.SenderByte_i(poa,pickle.dumps((["a"],{"a":3})))
+ id2_o = poa.activate_object(obj2)
+ refPtr2 = poa.id_to_reference(id2_o)
+ pyscript2.assignNewCompiledCode(script_st2)
+ pyscript2.executeFirst(refPtr2)
+ ret2_0 = pyscript2.executeSecond(["b"])
+ ret2_1, fileNamesProxyOut2 = unProxyfy( ret2_0 )
+ logging.debug( fileNamesProxyOut2 )
+ a = salome.logm.NaiveFetch()
+ del ret2_1
+ import gc ; gc.collect()
+ for fileNameProxyOut in fileNamesProxyOut2:
+ if fileNameProxyOut is not None:
+ if os.path.exists(fileNameProxyOut):
+ raise RuntimeError("Oooops 3")
+ #
+ fname = os.path.join(str( tmpdirname ),"perf.log")
+ salome.logm.DumpInFile( fname )
+ logManagerInst0 = salome.LogManagerLoadFromFile( fname )
+ logging.debug( logManagerInst0[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+ logManagerInst = salome.logm.Fetch(True)
+ logManagerInst2 = salome.logm.Fetch(True)
+ logging.debug( salome.LogManagerLoadFromIORFile( iorLogm )[0][1][0].get() )
+ salome.logm.putStructInFileAtomic(False,monitoringFileTwo)
+ logging.debug( salome.LogManagerLoadFromFile(monitoringFileTwo)[0][1][0].get() )
+ logging.debug( logManagerInst[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+ self.assertTrue( logManagerInst2[0][1][0].get() is None )
+ self.assertTrue( logManagerInst[0][1][1].get()._output_hdd_mem._data[0]._file_name == fileNamesProxyOut2[0] )
+ logging.debug( logManagerInst[0][1][1].log() )
+ # 2 files because a backup file is stored in case of unexpected kill during
+ self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 2 )
+ # leaving MonitoringFile Manager -> backup file is killed
+ pass
+ #self.assertEqual(monitoringFileSafe, monitoringFile)
+ self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 1 )
+ logging.debug( salome.LogManagerLoadFromFile(monitoringFile)[0][1][0].get() )
+ del monitoringParams
import gc ; gc.collect()
- for fileNameProxyOut in fileNamesProxyOut2:
- if fileNameProxyOut is not None:
- if os.path.exists(fileNameProxyOut):
- raise RuntimeError("Oooops 3")
- #
- fname = os.path.join(str( tmpdirname ),"perf.log")
- salome.logm.DumpInFile( fname )
- logManagerInst0 = salome.LogManagerLoadFromFile( fname )
- logging.debug( logManagerInst0[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
- logManagerInst = salome.logm.Fetch(True)
- logManagerInst2 = salome.logm.Fetch(True)
- logging.debug( salome.LogManagerLoadFromIORFile( iorLogm )[0][1][0].get() )
- salome.logm.putStructInFileAtomic(False,monitoringFileTwo)
- logging.debug( salome.LogManagerLoadFromFile(monitoringFileTwo)[0][1][0].get() )
- logging.debug( logManagerInst[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
- self.assertTrue( logManagerInst2[0][1][0].get() is None )
- self.assertTrue( logManagerInst[0][1][1].get()._output_hdd_mem._data[0]._file_name == fileNamesProxyOut2[0] )
- logging.debug( logManagerInst[0][1][1].log() )
- # 2 files because a backup file is stored in case of unexpected kill during
- self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 2 )
- # leaving MonitoringFile Manager -> backup file is killed
- pass
- #self.assertEqual(monitoringFileSafe, monitoringFile)
- self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 1 )
- logging.debug( salome.LogManagerLoadFromFile(monitoringFile)[0][1][0].get() )
- del monitoringParams
- import gc ; gc.collect()
- self.assertFalse( os.path.exists(pyFileContainingCodeOfMonitoring) )
- cont.Shutdown()
+ self.assertFalse( os.path.exists(pyFileContainingCodeOfMonitoring) )
+ cont.Shutdown()
+ cpumeminfo = SALOME_PyNode.ReadCPUMemInfo( monitoringParamsMaster )
+ dirInfo = SALOME_PyNode.ReadInodeSizeInfo( monitoringParamsForFileMaster )
+ logging.debug( cpumeminfo )
+ logging.debug( dirInfo )
def testEasyNamingService(self):
"""