From aeed5c8c0e9052fb0d855c98046f18ab190aeb15 Mon Sep 17 00:00:00 2001
From: Anthony Geay
Date: Mon, 22 May 2023 13:39:40 +0200
Subject: [PATCH] Implementation of proxy mechanism into KERNEL/YACS

---
 idl/SALOME_Comm.idl            |   2 +
 idl/SALOME_PyNode.idl          |   2 +-
 src/Container/SALOME_PyNode.py | 102 +++++++++++++++++++++++++++++++--
 3 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/idl/SALOME_Comm.idl b/idl/SALOME_Comm.idl
index ee4c95341..87bfda9c0 100644
--- a/idl/SALOME_Comm.idl
+++ b/idl/SALOME_Comm.idl
@@ -141,6 +141,8 @@ module SALOME {
     unsigned long long getSize();
     vectorOfByte sendPart(in unsigned long long n1,in unsigned long long n2);
   };
+
+  typedef sequence<SenderByte> SenderByteSeq;
 };
 
 #endif
diff --git a/idl/SALOME_PyNode.idl b/idl/SALOME_PyNode.idl
index 3a9747995..d4054d40a 100644
--- a/idl/SALOME_PyNode.idl
+++ b/idl/SALOME_PyNode.idl
@@ -96,7 +96,7 @@ module Engines
 
     /*! \brief second and last part of execute method.
      This split is to reduce the memory peak.
      */
-    SALOME::SenderByte executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception);
+    SALOME::SenderByteSeq executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception);
 
     pickledArgs getValueOfVarInContext(in string varName) raises (SALOME::SALOME_Exception);
diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py
index 4279063d0..5e43428de 100644
--- a/src/Container/SALOME_PyNode.py
+++ b/src/Container/SALOME_PyNode.py
@@ -113,8 +113,92 @@ class SenderByte_i(SALOME__POA.SenderByte,Generic):
 
   def sendPart(self,n1,n2):
     return self.bytesToSend[n1:n2]
+
+SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
+
+SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
+
+# default threshold is 50 MB
+SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
+
+def GetBigObjectOnDiskThreshold():
+  import os
+  if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
+    return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
+  else:
+    return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
+
+def GetBigObjectDirectory():
+  import os
+  if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
+    raise RuntimeError("An object bigger than the size limit was detected, but no directory is specified to dump it into a file!")
+  return os.environ[SALOME_FILE_BIG_OBJ_DIR]
+
+def GetBigObjectFileName():
+  """
+  Return a filename generated in the most secure manner (see the tempfile documentation).
+  """
+  import tempfile
+  with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
+    ret = f.name
+  return ret
+
+class BigObjectOnDisk:
+  def __init__(self, fileName, objSerialized):
+    """
+    :param fileName: the file used to dump into.
+    :param objSerialized: the object in pickled form
+    :type objSerialized: bytes
+    """
+    self._filename = fileName
+    self._destroy = False
+    self.__dumpIntoFile(objSerialized)
+
+  def unlinkOnDestructor(self):
+    self._destroy = True
+
+  def doNotTouchFile(self):
+    """
+    Method called on the slave side. The life cycle of the file is managed on the client side, not on the slave side.
+    """
+    self._destroy = False
+
+  def __del__(self):
+    if self._destroy:
+      import os
+      os.unlink( self._filename )
+
+  def getFileName(self):
+    return self._filename
+
+  def __dumpIntoFile(self, objSerialized):
+    with open(self._filename,"wb") as f:
+      f.write(objSerialized)
+
+  def get(self):
+    import pickle
+    with open(self._filename,"rb") as f:
+      return pickle.load(f)
+
+def SpoolPickleObject( obj ):
+  import pickle
+  pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
+  if len(pickleObjInit) < GetBigObjectOnDiskThreshold():
+    return pickleObjInit
+  else:
+    proxyObj = BigObjectOnDisk( GetBigObjectFileName() , pickleObjInit )
+    pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
+    return pickleProxy
+
+def UnProxyObject( obj ):
+  if isinstance(obj,BigObjectOnDisk):
+    obj.doNotTouchFile()
+    return obj.get()
+  else:
+    return obj
 
 class SeqByteReceiver:
+  # 2 GB limit above which the transfer is split into chunks
   CHUNK_SIZE = 2000000000
   def __init__(self,sender):
     self._obj = sender
@@ -130,6 +214,9 @@ class SeqByteReceiver:
   def fetchOneShot(self,size):
     return self._obj.sendPart(0,size)
   def fetchByChunks(self,size):
+    """
+    To avoid a memory peak, parts bigger than CHUNK_SIZE (2 GB) are fetched in chunks of EFF_CHUNK_SIZE bytes.
+    """
     data_for_split_case = bytes(0)
     EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
     iStart = 0 ; iEnd = EFF_CHUNK_SIZE
@@ -207,6 +294,8 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
         argsInPy = SeqByteReceiver( argsin )
         data = argsInPy.data()
         _,kws=pickle.loads(data)
+        for elt in kws:
+          kws[elt] = UnProxyObject( kws[elt] )
         self.context.update(kws)
     except Exception:
       exc_typ,exc_val,exc_fr=sys.exc_info()
@@ -222,11 +311,14 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
         if arg not in self.context:
          raise KeyError("There is no variable %s in context" % arg)
        argsout.append(self.context[arg])
-      argsout=pickle.dumps(tuple(argsout),-1)
-      ret = SenderByte_i( self.poa,argsout )
-      id_o = self.poa.activate_object(ret)
-      retObj = self.poa.id_to_reference(id_o)
-      return retObj._narrow( SALOME.SenderByte )
+      ret = [ ]
+      for arg in argsout:
+        argPickle = SpoolPickleObject( arg )
+        retArg = SenderByte_i( self.poa,argPickle )
+        id_o = self.poa.activate_object(retArg)
+        retObj = self.poa.id_to_reference(id_o)
+        ret.append( retObj._narrow( SALOME.SenderByte ) )
+      return ret
     except Exception:
       exc_typ,exc_val,exc_fr=sys.exc_info()
      l=traceback.format_exception(exc_typ,exc_val,exc_fr)
-- 
2.39.2