From 5904906cb9a59e52304363334c5c7c93615ee52f Mon Sep 17 00:00:00 2001
From: Anthony Geay
Date: Fri, 26 Jan 2024 16:40:41 +0100
Subject: [PATCH] [EDF29150] : keep track of File System usage

---
 src/Container/SALOME_ContainerHelper.py  |   9 ++
 src/Container/SALOME_PyNode.py           | 132 +++++++++++----
 src/Launcher/Test/testPerfLogManager1.py | 197 ++++++++++++-----------
 3 files changed, 217 insertions(+), 121 deletions(-)

diff --git a/src/Container/SALOME_ContainerHelper.py b/src/Container/SALOME_ContainerHelper.py
index 8395a013f..24bbba907 100644
--- a/src/Container/SALOME_ContainerHelper.py
+++ b/src/Container/SALOME_ContainerHelper.py
@@ -64,6 +64,15 @@ class ScriptExecInfo:
     self._output_hdd_mem = None
     self._start_pos_log = None
     self._stop_pos_log = None
+    self._freestyle_log = []
+
+  @property
+  def freestyle(self):
+    return self._freestyle_log
+
+  @freestyle.setter
+  def freestyle(self, value):
+    self._freestyle_log.append( value )
 
   @property
   def measureTimeResolution(self):
diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py
index 84fd6b746..d36e36908 100644
--- a/src/Container/SALOME_PyNode.py
+++ b/src/Container/SALOME_PyNode.py
@@ -36,6 +36,8 @@ from SALOME_ContainerHelper import ScriptExecInfo
 
 MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
 
+MY_PERFORMANCE_LOG_ENTRY_IN_GLBS = "my_log_4_this_session"
+
 class Generic(SALOME__POA.GenericObj):
   """A Python implementation of the GenericObj CORBA IDL"""
   def __init__(self,poa):
@@ -426,36 +428,63 @@ class MonitoringInfo:
   @property
   def outFileName(self):
     return self._out_file_name
+
+def FileSystemMonitoring(intervalInMs, dirNameToInspect):
+  """
+  This function launches a monitoring subprocess that scans, every intervalInMs milliseconds,
+  the number of inodes and the size of the content recursively contained in the input directory.
-class GenericPythonMonitoringLauncherCtxMgr:
-  def __init__(self, monitoringParams):
-    """
-    Args:
-    ----
-    monitoringParams (MonitoringInfo)
-    """
-    self._monitoring_params = monitoringParams
-  def __enter__(self):
-    import KernelBasis
-    pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
-    self._monitoring_params.pid = pid
-    return self._monitoring_params
-
-  def __exit__(self,exctype, exc, tb):
-    StopMonitoring( self._monitoring_params )
-
-def LaunchTimeCPUMonitoring( intervalInMs ):
+  See also CPUMemoryMonitoring
+  """
+  global orb
+  import os
+  dirNameToInspect2 = os.path.abspath( os.path.expanduser(dirNameToInspect) )
+  import tempfile
+  import logging
+  import KernelBasis
+  # outFileNameSave stores the content of outFileName during the dump phase
+  with tempfile.NamedTemporaryFile(prefix=os.path.basename(dirNameToInspect2),dir=os.path.dirname(dirNameToInspect2)) as f:
+    outFileNameSave = f.name
+  with tempfile.NamedTemporaryFile(prefix="fs_monitor_",suffix=".py") as f:
+    tempPyFile = f.name
+  tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
+  with open(tempPyFile,"w") as f:
+    f.write("""
+import subprocess as sp
+import re
+import os
+import time
+import datetime
+with open("{tempOutFile}","a") as f:
+  while(True):
+    nbinodes = sp.check_output("{{}} | wc -l".format( " ".join(["find",os.path.dirname( "{dirNameToInspect2}" )]), ), shell = True).decode().strip()
+    szOfDirStr = re.split("[\s]+",sp.check_output(["du","-sh",os.path.dirname( "{dirNameToInspect2}" )]).decode())[0]
+    f.write( "{{}}\\n".format( str( datetime.datetime.now().timestamp() ) ) )
+    f.write( "{{}}\\n".format( str( nbinodes ) ) )
+    f.write( "{{}}\\n".format( str( szOfDirStr ) ) )
+    f.flush()
+    time.sleep( {intervalInMs} / 1000.0 )
+""".format( **locals()))
+  logging.debug( "Python file for FS monitoring : {}".format(tempPyFile) )
+  pyFileName = FileDeleter( tempPyFile )
+  outFileName = FileDeleter( tempOutFile )
+  pid = KernelBasis.LaunchMonitoring( tempPyFile )
+  return MonitoringInfo(pyFileName,outFileName,pid)
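+# NB: the generated script above appends one triplet of lines to its output file
+# at each wake-up: a POSIX timestamp, then the inode count from `find | wc -l`,
+# then the human-readable size from `du -sh`. This 3-line stride is what
+# ReadInodeSizeInfo() decodes.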
+
+def CPUMemoryMonitoring( intervalInMs ):
   """
   Launch a subprocess monitoring self process.
   This monitoring subprocess is a Python process launching, every intervalInMs ms, an evaluation of
-  CPU usage and RSS memory.
+  CPU usage and RSS memory of the calling process.
   Communication between the subprocess and the caller is done via a file.
+
+  See also FileSystemMonitoring
   """
   import KernelBasis
   def BuildPythonFileForCPUPercent( intervalInMs ):
     import os
     import tempfile
-    with tempfile.NamedTemporaryFile(prefix="htop_",suffix=".py") as f:
+    with tempfile.NamedTemporaryFile(prefix="cpu_mem_monitor_",suffix=".py") as f:
       tempPyFile = f.name
     tempOutFile = "{}.txt".format( os.path.splitext( tempPyFile )[0] )
     pid = os.getpid()
@@ -473,9 +502,25 @@ with open("{}","a") as f:
 """.format(pid, tempOutFile, intervalInMs))
     return FileDeleter(tempPyFile), FileDeleter(tempOutFile)
   pyFileName, outFileName = BuildPythonFileForCPUPercent( intervalInMs )
   return MonitoringInfo(pyFileName, outFileName, None)
+
+class GenericPythonMonitoringLauncherCtxMgr:
+  def __init__(self, monitoringParams):
+    """
+    Args:
+    ----
+    monitoringParams (MonitoringInfo)
+    """
+    self._monitoring_params = monitoringParams
+  def __enter__(self):
+    import KernelBasis
+    pid = KernelBasis.LaunchMonitoring(self._monitoring_params.pyFileName.filename)
+    self._monitoring_params.pid = pid
+    return self._monitoring_params
+
+  def __exit__(self,exctype, exc, tb):
+    StopMonitoring( self._monitoring_params )
+
 def StopMonitoring( monitoringInfo ):
   """
   Kill monitoring subprocess.
@@ -489,7 +534,7 @@ def StopMonitoring( monitoringInfo ):
 
 def ReadCPUMemInfo( monitoringInfo ):
   """
-  Retrieve data of monitoring.
+  Retrieve CPU/Mem data of monitoring.
 
   Args:
   ----
@@ -505,6 +550,28 @@ def ReadCPUMemInfo( monitoringInfo ):
   mem_rss = [ int(elt) for elt in ret[1::2]]
   return [(a,b) for a,b in zip(cpu,mem_rss)]
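+# NB: ReadCPUMemInfo() above decodes its dump as alternating lines, CPU load then
+# RSS, giving e.g. [(12.5, 45056), (3.0, 45312)] (illustrative values; the units
+# are those written by the generated monitor script, elided from this hunk).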
+ """ + import datetime + with open(monitoringInfo.outFileName.filename, "r") as f: + coarseData = [ elt.strip() for elt in f.readlines() ] + tss = [ datetime.datetime.fromtimestamp( float(elt) ) for elt in coarseData[::3] ] + nbInodes = [int(elt) for elt in coarseData[1::3]] + volumeOfDir = coarseData[2::3] + return [(t,a,b) for t,a,b in zip(tss,nbInodes,volumeOfDir)] + class SeqByteReceiver: # 2GB limit to trigger split into chunks CHUNK_SIZE = 2000000000 @@ -539,6 +606,10 @@ class LogOfCurrentExecutionSession: self._remote_handle = handleToCentralizedInst self._current_instance = ScriptExecInfo() + def addFreestyleAndFlush(self, value): + self._current_instance.freestyle = value + self.finalizeAndPushToMaster() + def addInfoOnLevel2(self, key, value): setattr(self._current_instance,key,value) @@ -629,15 +700,17 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): def executeFirst(self,argsin): """ Same than first part of self.execute to reduce memory peak.""" + def ArgInMananger(self,argsin): + argsInPy = SeqByteReceiver( argsin ) + data = argsInPy.data() + self.addInfoOnLevel2("inputMem",len(data)) + _,kws=pickle.loads(data) + return kws try: self.beginOfCurrentExecutionSession() - data = None self.addTimeInfoOnLevel2("startInputTime") - if True: # to force call of SeqByteReceiver's destructor - argsInPy = SeqByteReceiver( argsin ) - data = argsInPy.data() - self.addInfoOnLevel2("inputMem",len(data)) - _,kws=pickle.loads(data) + # to force call of SeqByteReceiver's destructor + kws = ArgInMananger(self,argsin) vis = InOutputObjVisitor() for elt in kws: # fetch real data if necessary @@ -658,7 +731,7 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): self.addTimeInfoOnLevel2("startExecTime") ## self.addInfoOnLevel2("measureTimeResolution",self.my_container_py.monitoringtimeresms()) - with GenericPythonMonitoringLauncherCtxMgr( LaunchTimeCPUMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams: + with GenericPythonMonitoringLauncherCtxMgr( CPUMemoryMonitoring( self.my_container_py.monitoringtimeresms() ) ) as monitoringParams: exec(self.ccode, self.context) cpumeminfo = ReadCPUMemInfo( monitoringParams ) ## @@ -731,6 +804,7 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): def beginOfCurrentExecutionSession(self): self._current_execution_session = LogOfCurrentExecutionSession( self._log_script.addExecutionSession() ) + self.context[MY_PERFORMANCE_LOG_ENTRY_IN_GLBS] = self._current_execution_session def endOfCurrentExecutionSession(self): self._current_execution_session.finalizeAndPushToMaster() diff --git a/src/Launcher/Test/testPerfLogManager1.py b/src/Launcher/Test/testPerfLogManager1.py index d7f64fe03..c2f27fdec 100644 --- a/src/Launcher/Test/testPerfLogManager1.py +++ b/src/Launcher/Test/testPerfLogManager1.py @@ -29,6 +29,7 @@ import glob import pickle import tempfile import logging +from datetime import datetime def flush(): import sys @@ -67,31 +68,35 @@ class testPerfLogManager1(unittest.TestCase): salome.logm.clear() #PROXY_THRES = "-1" PROXY_THRES = "1" - with tempfile.TemporaryDirectory() as tmpdirnameMonitoring: - monitoringFile = os.path.join( str( tmpdirnameMonitoring ), "zeHtop.pckl" ) - monitoringFileTwo = os.path.join( str( tmpdirnameMonitoring ), "zeHtopTwo.pckl" ) - iorLogm = os.path.join( str( tmpdirnameMonitoring ), "logm.ior" ) - with open(iorLogm,"w") as f: - f.write( salome.orb.object_to_string(salome.logm) ) - logging.debug("Monitoring file : {}".format(monitoringFile)) - with tempfile.TemporaryDirectory() as 
diff --git a/src/Launcher/Test/testPerfLogManager1.py b/src/Launcher/Test/testPerfLogManager1.py
index d7f64fe03..c2f27fdec 100644
--- a/src/Launcher/Test/testPerfLogManager1.py
+++ b/src/Launcher/Test/testPerfLogManager1.py
@@ -29,6 +29,7 @@ import glob
 import pickle
 import tempfile
 import logging
+from datetime import datetime
 
 def flush():
     import sys
@@ -67,31 +68,35 @@ class testPerfLogManager1(unittest.TestCase):
         salome.logm.clear()
         #PROXY_THRES = "-1"
         PROXY_THRES = "1"
-        with tempfile.TemporaryDirectory() as tmpdirnameMonitoring:
-            monitoringFile = os.path.join( str( tmpdirnameMonitoring ), "zeHtop.pckl" )
-            monitoringFileTwo = os.path.join( str( tmpdirnameMonitoring ), "zeHtopTwo.pckl" )
-            iorLogm = os.path.join( str( tmpdirnameMonitoring ), "logm.ior" )
-            with open(iorLogm,"w") as f:
-                f.write( salome.orb.object_to_string(salome.logm) )
-            logging.debug("Monitoring file : {}".format(monitoringFile))
-            with tempfile.TemporaryDirectory() as tmpdirname:
-                with salome.LogManagerLaunchMonitoringFileCtxMgr(250,monitoringFile) as monitoringParams:
-                    pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
-                    logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
-                    val_for_big_obj = str( tmpdirname )
-                    os.environ["SALOME_FILE_BIG_OBJ_DIR"] = val_for_big_obj
-                    # Override environement for all containers launched
-                    salome.cm.SetOverrideEnvForContainersSimple(env = [("SALOME_FILE_BIG_OBJ_DIR",val_for_big_obj),("SALOME_BIG_OBJ_ON_DISK_THRES",PROXY_THRES)])
-                    salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 250 )
-                    cont = salome.cm.GiveContainer(cp)
-                    logging.debug("{} {}".format(40*"*",cont.getPID()))
-                    script_st = """
+        with SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.FileSystemMonitoring(1000,salome.__file__) ) as monitoringParamsForFileMaster:
+            with SALOME_PyNode.GenericPythonMonitoringLauncherCtxMgr( SALOME_PyNode.CPUMemoryMonitoring(1000) ) as monitoringParamsMaster:
+                with tempfile.TemporaryDirectory() as tmpdirnameMonitoring:
+                    monitoringFile = os.path.join( str( tmpdirnameMonitoring ), "zeHtop.pckl" )
+                    monitoringFileTwo = os.path.join( str( tmpdirnameMonitoring ), "zeHtopTwo.pckl" )
+                    iorLogm = os.path.join( str( tmpdirnameMonitoring ), "logm.ior" )
+                    with open(iorLogm,"w") as f:
+                        f.write( salome.orb.object_to_string(salome.logm) )
+                    logging.debug("Monitoring file : {}".format(monitoringFile))
+                    with tempfile.TemporaryDirectory() as tmpdirname:
+                        with salome.LogManagerLaunchMonitoringFileCtxMgr(250,monitoringFile) as monitoringParams:
+                            pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
+                            logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
+                            val_for_big_obj = str( tmpdirname )
+                            os.environ["SALOME_FILE_BIG_OBJ_DIR"] = val_for_big_obj
+                            # Override environment for all containers launched
+                            salome.cm.SetOverrideEnvForContainersSimple(env = [("SALOME_FILE_BIG_OBJ_DIR",val_for_big_obj),("SALOME_BIG_OBJ_ON_DISK_THRES",PROXY_THRES)])
+                            salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 250 )
+                            cont = salome.cm.GiveContainer(cp)
+                            logging.debug("{} {}".format(40*"*",cont.getPID()))
+                            script_st = """
 import logging
 import sys
 import KernelBasis
+from datetime import datetime
 cst = KernelBasis.GetTimeAdjustmentCst()
 logging.debug("constant = {}".format(cst))
 nbcore = 3
+my_log_4_this_session.addFreestyleAndFlush( ("a",datetime.now()) ) # example of custom tracking
 print("coucou {} {}".format(len(zeinput0),len(zeinput1)))
 logging.debug("debug or not debug")
 ob = [ [ bytes(3000000) ] ]
@@ -102,86 +107,94 @@ pihm, ts = KernelBasis.HeatMarcel(1 * nbcore * cst,nbcore)
 print("Time ellapse spent : {} s".format(ts))
 sys.stderr.write("fake error message\\n")
 """
-                    poa = salome.orb.resolve_initial_references("RootPOA")
-                    zeinput0 = [ bytes(100000000) ]
-                    if SALOME_PyNode.GetBigObjectOnDiskThreshold() != -1:
-                        zeinput0 = SALOME_PyNode.ProxyfyPickeled( zeinput0 )
-                        zeinput0.unlinkOnDestructor()
-                    obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["zeinput0"],{"zeinput0": [zeinput0], "zeinput1": [ [zeinput0], [zeinput0] ] }) ))
-                    id_o = poa.activate_object(obj)
-                    refPtr = poa.id_to_reference(id_o)
-                    pyscript2 = cont.createPyScriptNode("testScript2",script_st)
-                    pyscript2.executeFirst(refPtr)
-                    ret2 = pyscript2.executeSecond(["ob","ob2"])# generate a DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats on debian11 ?
-                    ret3, fileNamesProxyOut = unProxyfy( ret2 )
-                    logging.getLogger().debug("test logging 1")
-                    logging.debug("test logging 2")
-                    logging.debug( salome.orb.object_to_string( salome.logm ) )
-                    a = salome.logm.NaiveFetch()
-                    logging.debug(a)
-                    logging.debug(a[0][1][0])
-                    logging.debug( a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._hdd_mem ) # important
-                    logging.debug( a[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
-                    fileNameProxyIn = a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._file_name
-                    logging.debug( fileNameProxyIn )
-                    del zeinput0
-                    del ret3
-                    import gc ; gc.collect()
-                    if fileNameProxyIn is not None:
-                        if os.path.exists(fileNameProxyIn):
-                            raise RuntimeError("Oooops 2")
-                    for fileNameProxyOut in fileNamesProxyOut:
-                        if fileNameProxyOut is not None:
-                            if os.path.exists(fileNameProxyOut):
-                                raise RuntimeError("Oooops 3")
-                    # execution #2 inside last
-                    script_st2 = """
+                            poa = salome.orb.resolve_initial_references("RootPOA")
+                            zeinput0 = [ bytes(100000000) ]
+                            if SALOME_PyNode.GetBigObjectOnDiskThreshold() != -1:
+                                zeinput0 = SALOME_PyNode.ProxyfyPickeled( zeinput0 )
+                                zeinput0.unlinkOnDestructor()
+                            obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["zeinput0"],{"zeinput0": [zeinput0], "zeinput1": [ [zeinput0], [zeinput0] ] }) ))
+                            id_o = poa.activate_object(obj)
+                            refPtr = poa.id_to_reference(id_o)
+                            pyscript2 = cont.createPyScriptNode("testScript2",script_st)
+                            pyscript2.executeFirst(refPtr)
+                            ret2 = pyscript2.executeSecond(["ob","ob2"])# generates a DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats on debian11 ?
+                            ret3, fileNamesProxyOut = unProxyfy( ret2 )
+                            logging.getLogger().debug("test logging 1")
+                            logging.debug("test logging 2")
+                            logging.debug( salome.orb.object_to_string( salome.logm ) )
+                            a = salome.logm.NaiveFetch()
+                            logging.debug(a)
+                            fs = a[0][1][0].get().freestyle
+                            self.assertEqual(len(fs),1)
+                            self.assertEqual(fs[0][0],"a")
+                            self.assertTrue( isinstance( fs[0][1], datetime ) )
+                            logging.debug(a[0][1][0])
+                            logging.debug( a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._hdd_mem ) # important
+                            logging.debug( a[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+                            fileNameProxyIn = a[0][1][0].get()._input_hdd_mem._data[0]._data[0]._file_name
+                            logging.debug( fileNameProxyIn )
+                            del zeinput0
+                            del ret3
+                            import gc ; gc.collect()
+                            if fileNameProxyIn is not None:
+                                if os.path.exists(fileNameProxyIn):
+                                    raise RuntimeError("Oooops 2")
+                            for fileNameProxyOut in fileNamesProxyOut:
+                                if fileNameProxyOut is not None:
+                                    if os.path.exists(fileNameProxyOut):
+                                        raise RuntimeError("Oooops 3")
+                            # execution #2 inside last
+                            script_st2 = """
 import logging
 b = 7+a
 logging.debug("Execution 2")
 import time
 time.sleep(1)
 """
-                    obj2 = SALOME_PyNode.SenderByte_i(poa,pickle.dumps((["a"],{"a":3})))
-                    id2_o = poa.activate_object(obj2)
-                    refPtr2 = poa.id_to_reference(id2_o)
-                    pyscript2.assignNewCompiledCode(script_st2)
-                    pyscript2.executeFirst(refPtr2)
-                    ret2_0 = pyscript2.executeSecond(["b"])
-                    ret2_1, fileNamesProxyOut2 = unProxyfy( ret2_0 )
-                    logging.debug( fileNamesProxyOut2 )
-                    a = salome.logm.NaiveFetch()
-                    del ret2_1
+                            obj2 = SALOME_PyNode.SenderByte_i(poa,pickle.dumps((["a"],{"a":3})))
+                            id2_o = poa.activate_object(obj2)
+                            refPtr2 = poa.id_to_reference(id2_o)
+                            pyscript2.assignNewCompiledCode(script_st2)
+                            pyscript2.executeFirst(refPtr2)
+                            ret2_0 = pyscript2.executeSecond(["b"])
+                            ret2_1, fileNamesProxyOut2 = unProxyfy( ret2_0 )
+                            logging.debug( fileNamesProxyOut2 )
+                            a = salome.logm.NaiveFetch()
+                            del ret2_1
+                            import gc ; gc.collect()
+                            for fileNameProxyOut in fileNamesProxyOut2:
+                                if fileNameProxyOut is not None:
+                                    if os.path.exists(fileNameProxyOut):
+                                        raise RuntimeError("Oooops 3")
+                            #
+                            fname = os.path.join(str( tmpdirname ),"perf.log")
+                            salome.logm.DumpInFile( fname )
+                            logManagerInst0 = salome.LogManagerLoadFromFile( fname )
+                            logging.debug( logManagerInst0[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+                            logManagerInst = salome.logm.Fetch(True)
+                            logManagerInst2 = salome.logm.Fetch(True)
+                            logging.debug( salome.LogManagerLoadFromIORFile( iorLogm )[0][1][0].get() )
+                            salome.logm.putStructInFileAtomic(False,monitoringFileTwo)
+                            logging.debug( salome.LogManagerLoadFromFile(monitoringFileTwo)[0][1][0].get() )
+                            logging.debug( logManagerInst[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
+                            self.assertTrue( logManagerInst2[0][1][0].get() is None )
+                            self.assertTrue( logManagerInst[0][1][1].get()._output_hdd_mem._data[0]._file_name == fileNamesProxyOut2[0] )
+                            logging.debug( logManagerInst[0][1][1].log() )
+                            # 2 files, because a backup file is stored in case of an unexpected kill during the dump
+                            self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 2 )
+                            # leaving the monitoring-file context manager -> the backup file is removed
+                            pass
+                        #self.assertEqual(monitoringFileSafe, monitoringFile)
+                        self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 1 )
+                        logging.debug( salome.LogManagerLoadFromFile(monitoringFile)[0][1][0].get() )
+                        del monitoringParams
+                        import gc ; gc.collect()
-                    self.assertFalse( os.path.exists(pyFileContainingCodeOfMonitoring) )
-                    cont.Shutdown()
+                        self.assertFalse( os.path.exists(pyFileContainingCodeOfMonitoring) )
+                        cont.Shutdown()
+                cpumeminfo = SALOME_PyNode.ReadCPUMemInfo( monitoringParamsMaster )
+                dirInfo = SALOME_PyNode.ReadInodeSizeInfo( monitoringParamsForFileMaster )
+                logging.debug( cpumeminfo )
+                logging.debug( dirInfo )
 
     def testEasyNamingService(self):
""" -- 2.39.2