From: Ekaterina Sukhareva Date: Thu, 7 Sep 2023 16:54:13 +0000 (+0100) Subject: Merge remote-tracking branch 'origin/vuzlov/26459' into 26459_ZeroMQ X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2F26459_ZeroMQ;p=modules%2Fkernel.git Merge remote-tracking branch 'origin/vuzlov/26459' into 26459_ZeroMQ --- 727ade3a988d2d3473a5a8496a473da395b9f65b diff --cc CMakeLists.txt index b7882201c,b823656d6..74584d534 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@@ -213,7 -209,7 +215,7 @@@ INCLUDE(CMakePackageConfigHelpers # They all have to be INSTALL'd with the option "EXPORT ${PROJECT_NAME}TargetGroup" SET(_${PROJECT_NAME}_exposed_targets - SALOMELog SALOMEBasics ArgvKeeper SALOMELocalTrace SalomeHDFPersist OpUtil) - SALOMEBasics SALOMELocalTrace SalomeHDFPersist SalomeZeroMQ OpUtil) ++ SALOMELog SALOMEBasics ArgvKeeper SALOMELocalTrace SalomeHDFPersist SalomeZeroMQ OpUtil) # CORBA specific targets: IF(NOT SALOME_LIGHT_ONLY) diff --cc src/Container/SALOME_PyNode.py index 1bb41877d,91a58912e..bc36a4b83 --- a/src/Container/SALOME_PyNode.py +++ b/src/Container/SALOME_PyNode.py @@@ -29,9 -29,8 +29,10 @@@ import pickl import Engines__POA import SALOME__POA import SALOME + from SalomeZeroMQPy import SalomeZeroMQ +MY_CONTAINER_ENTRY_IN_GLBS = "my_container" + class Generic(SALOME__POA.GenericObj): """A Python implementation of the GenericObj CORBA IDL""" def __init__(self,poa): @@@ -104,274 -103,25 +105,292 @@@ class PyNode_i (Engines__POA.PyNode,Gen exc_typ,exc_val,exc_fr=sys.exc_info() l=traceback.format_exception(exc_typ,exc_val,exc_fr) raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0)) - ++ + def executeZMQ(self, funcName): + """Execute the function funcName found in local context using ZeroMQ""" + try: + input_data = self._zeroMQ.receive_data_zmq() + args,kwargs=pickle.loads(input_data) + func=self.context[funcName] + result_data=func(*args, **kwargs) + output_data=pickle.dumps(result_data, -1) + self._zeroMQ.send_data_zmq(output_data) + except: + exc_typ,exc_val,exc_fr=sys.exc_info() + l=traceback.format_exception(exc_typ,exc_val,exc_fr) + raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0)) + + def setZmqSocketOpt(self, opt): + self._workingAddr = opt + self._zeroMQ = SalomeZeroMQ(self._workingAddr) +class SenderByte_i(SALOME__POA.SenderByte,Generic): + def __init__(self,poa,bytesToSend): + Generic.__init__(self,poa) + self.bytesToSend = bytesToSend + + def getSize(self): + return len(self.bytesToSend) + + def sendPart(self,n1,n2): + return self.bytesToSend[n1:n2] + +SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR" + +SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES" + +# default is 50 MB +SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000 + +from ctypes import c_int +TypeCounter = c_int + +def GetSizeOfTCnt(): + return len( bytes(TypeCounter(0) ) ) + +def GetObjectFromFile(fname): + with open(fname,"rb") as f: + cntb = f.read( GetSizeOfTCnt() ) + cnt = TypeCounter.from_buffer_copy( cntb ).value + obj = pickle.load(f) + return obj,cnt + +def DumpInFile(obj,fname): + with open(fname,"wb") as f: + f.write( bytes( TypeCounter(1) ) ) + f.write( obj ) + +def IncrRefInFile(fname): + with open(fname,"rb") as f: + cntb = f.read( GetSizeOfTCnt() ) + cnt = TypeCounter.from_buffer_copy( cntb ).value + with open(fname,"rb+") as f: + #import KernelServices ; KernelServices.EntryForDebuggerBreakPoint() + f.write( bytes( TypeCounter(cnt+1) ) ) + +def DecrRefInFile(fname): + import os + with open(fname,"rb") as f: + cntb = f.read( GetSizeOfTCnt() ) + cnt = TypeCounter.from_buffer_copy( cntb ).value + # + #import KernelServices ; KernelServices.EntryForDebuggerBreakPoint() + if cnt == 1: + os.unlink( fname ) + else: + with open(fname,"rb+") as f: + f.write( bytes( TypeCounter(cnt-1) ) ) + +def GetBigObjectOnDiskThreshold(): + import os + if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ: + return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] ) + else: + return SALOME_BIG_OBJ_ON_DISK_THRES_DFT + +def ActivateProxyMecanismOrNot( sizeInByte ): + thres = GetBigObjectOnDiskThreshold() + if thres == -1: + return False + else: + return sizeInByte > thres + +def GetBigObjectDirectory(): + import os + if SALOME_FILE_BIG_OBJ_DIR not in os.environ: + raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !") + return os.path.expanduser( os.path.expandvars( os.environ[SALOME_FILE_BIG_OBJ_DIR] ) ) + +def GetBigObjectFileName(): + """ + Return a filename in the most secure manner (see tempfile documentation) + """ + import tempfile + with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f: + ret = f.name + return ret + +class BigObjectOnDiskBase: + def __init__(self, fileName, objSerialized): + """ + :param fileName: the file used to dump into. + :param objSerialized: the object in pickeled form + :type objSerialized: bytes + """ + self._filename = fileName + # attribute _destroy is here to tell client side or server side + # only client side can be with _destroy set to True. server side due to risk of concurrency + # so pickled form of self must be done with this attribute set to False. + self._destroy = False + self.__dumpIntoFile(objSerialized) + + def getDestroyStatus(self): + return self._destroy + + def incrRef(self): + if self._destroy: + IncrRefInFile( self._filename ) + else: + # should never happen ! + RuntimeError("Invalid call to incrRef !") + + def decrRef(self): + if self._destroy: + DecrRefInFile( self._filename ) + else: + # should never happen ! + RuntimeError("Invalid call to decrRef !") + + def unlinkOnDestructor(self): + self._destroy = True + + def doNotTouchFile(self): + """ + Method called slave side. The life cycle management of file is client side not slave side. + """ + self._destroy = False + + def __del__(self): + if self._destroy: + DecrRefInFile( self._filename ) + + def getFileName(self): + return self._filename + + def __dumpIntoFile(self, objSerialized): + DumpInFile( objSerialized, self._filename ) + + def get(self): + obj, _ = GetObjectFromFile( self._filename ) + return obj + + def __float__(self): + return float( self.get() ) + + def __int__(self): + return int( self.get() ) + + def __str__(self): + obj = self.get() + if isinstance(obj,str): + return obj + else: + raise RuntimeError("Not a string") + +class BigObjectOnDisk(BigObjectOnDiskBase): + def __init__(self, fileName, objSerialized): + BigObjectOnDiskBase.__init__(self, fileName, objSerialized) + +class BigObjectOnDiskListElement(BigObjectOnDiskBase): + def __init__(self, pos, length, fileName): + self._filename = fileName + self._destroy = False + self._pos = pos + self._length = length + + def get(self): + fullObj = BigObjectOnDiskBase.get(self) + return fullObj[ self._pos ] + + def __getitem__(self, i): + return self.get()[i] + + def __len__(self): + return len(self.get()) + +class BigObjectOnDiskSequence(BigObjectOnDiskBase): + def __init__(self, length, fileName, objSerialized): + BigObjectOnDiskBase.__init__(self, fileName, objSerialized) + self._length = length + + def __getitem__(self, i): + return BigObjectOnDiskListElement(i, self._length, self.getFileName()) + + def __len__(self): + return self._length + +class BigObjectOnDiskList(BigObjectOnDiskSequence): + def __init__(self, length, fileName, objSerialized): + BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized) + +class BigObjectOnDiskTuple(BigObjectOnDiskSequence): + def __init__(self, length, fileName, objSerialized): + BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized) + +def SpoolPickleObject( obj ): + import pickle + pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL ) + if not ActivateProxyMecanismOrNot( len(pickleObjInit) ): + return pickleObjInit + else: + if isinstance( obj, list): + proxyObj = BigObjectOnDiskList( len(obj), GetBigObjectFileName() , pickleObjInit ) + elif isinstance( obj, tuple): + proxyObj = BigObjectOnDiskTuple( len(obj), GetBigObjectFileName() , pickleObjInit ) + else: + proxyObj = BigObjectOnDisk( GetBigObjectFileName() , pickleObjInit ) + pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL ) + return pickleProxy + +def UnProxyObjectSimple( obj ): + """ + Method to be called in Remote mode. Alterate the obj _status attribute. + Because the slave process does not participate in the reference counting + """ + if isinstance(obj,BigObjectOnDiskBase): + obj.doNotTouchFile() + return obj.get() + elif isinstance( obj, list): + retObj = [] + for elt in obj: + retObj.append( UnProxyObjectSimple(elt) ) + return retObj + else: + return obj + +def UnProxyObjectSimpleLocal( obj ): + """ + Method to be called in Local mode. Do not alterate the PyObj counter + """ + if isinstance(obj,BigObjectOnDiskBase): + return obj.get() + elif isinstance( obj, list): + retObj = [] + for elt in obj: + retObj.append( UnProxyObjectSimpleLocal(elt) ) + return retObj + else: + return obj + +class SeqByteReceiver: + # 2GB limit to trigger split into chunks + CHUNK_SIZE = 2000000000 + def __init__(self,sender): + self._obj = sender + def __del__(self): + self._obj.UnRegister() + pass + def data(self): + size = self._obj.getSize() + if size <= SeqByteReceiver.CHUNK_SIZE: + return self.fetchOneShot( size ) + else: + return self.fetchByChunks( size ) + def fetchOneShot(self,size): + return self._obj.sendPart(0,size) + def fetchByChunks(self,size): + """ + To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size. + """ + data_for_split_case = bytes(0) + EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8 + iStart = 0 ; iEnd = EFF_CHUNK_SIZE + while iStart!=iEnd and iEnd <= size: + part = self._obj.sendPart(iStart,iEnd) + data_for_split_case = bytes(0).join( [data_for_split_case,part] ) + iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size) + return data_for_split_case + class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): """The implementation of the PyScriptNode CORBA IDL that executes a script""" def __init__(self, nodeName,code,poa,my_container): diff --cc src/Container/Test/CMakeLists.txt index fe3991a88,4c01605fa..6eb94f892 --- a/src/Container/Test/CMakeLists.txt +++ b/src/Container/Test/CMakeLists.txt @@@ -18,7 -18,7 +18,7 @@@ # SET(LOCAL_TEST_DIR ${KERNEL_TEST_DIR}/Container) - INSTALL(FILES testcontainer.py testProxy.py DESTINATION ${LOCAL_TEST_DIR}) -INSTALL(FILES testcontainer.py testDataLimitation.py DESTINATION ${LOCAL_TEST_DIR}) ++INSTALL(FILES testcontainer.py testProxy.py testDataLimitation.py DESTINATION ${LOCAL_TEST_DIR}) INSTALL(FILES CTestTestfileInstall.cmake DESTINATION ${LOCAL_TEST_DIR}