]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
Merge remote-tracking branch 'origin/vuzlov/26459' into 26459_ZeroMQ 26459_ZeroMQ
authorEkaterina Sukhareva <ekaterina.sukhareva@opencascade.com>
Thu, 7 Sep 2023 16:54:13 +0000 (17:54 +0100)
committerEkaterina Sukhareva <ekaterina.sukhareva@opencascade.com>
Thu, 7 Sep 2023 16:54:13 +0000 (17:54 +0100)
14 files changed:
1  2 
CMakeLists.txt
CTestTestfileInstall.cmake.in
SalomeKERNELConfig.cmake.in
bin/ORBConfigFile.py
idl/SALOME_Component.idl
idl/SALOME_PyNode.idl
src/Basics/Basics_Utils.cxx
src/Basics/Basics_Utils.hxx
src/CMakeLists.txt
src/Container/Container_i.cxx
src/Container/SALOME_ContainerPy.py
src/Container/SALOME_Container_i.hxx
src/Container/SALOME_PyNode.py
src/Container/Test/CMakeLists.txt

diff --cc CMakeLists.txt
index b7882201c11d5e48d487a62afc5d1ce104b7e1f5,b823656d67a1ffc41b6aa933f99a318eed95e747..74584d534b1196ed9f1d1b3f417dba1afb3477e4
@@@ -213,7 -209,7 +215,7 @@@ INCLUDE(CMakePackageConfigHelpers
  # They all have to be INSTALL'd with the option "EXPORT ${PROJECT_NAME}TargetGroup"
  
  SET(_${PROJECT_NAME}_exposed_targets
-   SALOMELog SALOMEBasics ArgvKeeper SALOMELocalTrace SalomeHDFPersist OpUtil)
 -  SALOMEBasics SALOMELocalTrace SalomeHDFPersist SalomeZeroMQ OpUtil)
++  SALOMELog SALOMEBasics ArgvKeeper SALOMELocalTrace SalomeHDFPersist SalomeZeroMQ OpUtil)
  
  # CORBA specific targets:
  IF(NOT SALOME_LIGHT_ONLY)
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
index 1bb41877d1db9f41a55fed7dc982365fc391d531,91a58912ef0be3a8a4f2a7c8b6631e814d51ac58..bc36a4b8327cd69bc59ecad1b3ac46fa2462a404
@@@ -29,9 -29,8 +29,10 @@@ import pickl
  import Engines__POA
  import SALOME__POA
  import SALOME
+ from SalomeZeroMQPy import SalomeZeroMQ
  
 +MY_CONTAINER_ENTRY_IN_GLBS = "my_container"
 +
  class Generic(SALOME__POA.GenericObj):
    """A Python implementation of the GenericObj CORBA IDL"""
    def __init__(self,poa):
@@@ -104,274 -103,25 +105,292 @@@ class PyNode_i (Engines__POA.PyNode,Gen
        exc_typ,exc_val,exc_fr=sys.exc_info()
        l=traceback.format_exception(exc_typ,exc_val,exc_fr)
        raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
 -
++    
+   def executeZMQ(self, funcName):
+     """Execute the function funcName found in local context using ZeroMQ"""
+     try:
+       input_data = self._zeroMQ.receive_data_zmq()
+       args,kwargs=pickle.loads(input_data)
+       func=self.context[funcName]
+       result_data=func(*args, **kwargs)
+       output_data=pickle.dumps(result_data, -1)
+       self._zeroMQ.send_data_zmq(output_data)
+     except:
+       exc_typ,exc_val,exc_fr=sys.exc_info()
+       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
+       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
+   def setZmqSocketOpt(self, opt):
+     self._workingAddr = opt
+     self._zeroMQ = SalomeZeroMQ(self._workingAddr)
  
 +class SenderByte_i(SALOME__POA.SenderByte,Generic):
 +  def __init__(self,poa,bytesToSend):
 +    Generic.__init__(self,poa)
 +    self.bytesToSend = bytesToSend
 +
 +  def getSize(self):
 +    return len(self.bytesToSend)
 +
 +  def sendPart(self,n1,n2):
 +    return self.bytesToSend[n1:n2]
 +
 +SALOME_FILE_BIG_OBJ_DIR = "SALOME_FILE_BIG_OBJ_DIR"
 +    
 +SALOME_BIG_OBJ_ON_DISK_THRES_VAR = "SALOME_BIG_OBJ_ON_DISK_THRES"
 +
 +# default is 50 MB
 +SALOME_BIG_OBJ_ON_DISK_THRES_DFT = 50000000
 +
 +from ctypes import c_int
 +TypeCounter = c_int
 +
 +def GetSizeOfTCnt():
 +  return len( bytes(TypeCounter(0) ) )
 +
 +def GetObjectFromFile(fname):
 +  with open(fname,"rb") as f:
 +    cntb = f.read( GetSizeOfTCnt() )
 +    cnt = TypeCounter.from_buffer_copy( cntb ).value
 +    obj = pickle.load(f)
 +  return obj,cnt
 +
 +def DumpInFile(obj,fname):
 +  with open(fname,"wb") as f:
 +    f.write( bytes( TypeCounter(1) ) )
 +    f.write( obj )
 +
 +def IncrRefInFile(fname):
 +  with open(fname,"rb") as f:
 +    cntb = f.read( GetSizeOfTCnt() )
 +  cnt = TypeCounter.from_buffer_copy( cntb ).value
 +  with open(fname,"rb+") as f:
 +    #import KernelServices ; KernelServices.EntryForDebuggerBreakPoint()
 +    f.write( bytes( TypeCounter(cnt+1) ) )
 +
 +def DecrRefInFile(fname):
 +  import os
 +  with open(fname,"rb") as f:
 +    cntb = f.read( GetSizeOfTCnt() )
 +  cnt = TypeCounter.from_buffer_copy( cntb ).value
 +  #
 +  #import KernelServices ; KernelServices.EntryForDebuggerBreakPoint()
 +  if cnt == 1:
 +    os.unlink( fname )
 +  else:
 +    with open(fname,"rb+") as f:
 +        f.write( bytes( TypeCounter(cnt-1) ) )
 +
 +def GetBigObjectOnDiskThreshold():
 +  import os
 +  if SALOME_BIG_OBJ_ON_DISK_THRES_VAR in os.environ:
 +    return int( os.environ[SALOME_BIG_OBJ_ON_DISK_THRES_VAR] )
 +  else:
 +    return SALOME_BIG_OBJ_ON_DISK_THRES_DFT
 +
 +def ActivateProxyMecanismOrNot( sizeInByte ):
 +  thres = GetBigObjectOnDiskThreshold()
 +  if thres == -1:
 +    return False
 +  else:
 +    return sizeInByte > thres
 +
 +def GetBigObjectDirectory():
 +  import os
 +  if SALOME_FILE_BIG_OBJ_DIR not in os.environ:
 +    raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
 +  return os.path.expanduser( os.path.expandvars( os.environ[SALOME_FILE_BIG_OBJ_DIR] ) )
 +
 +def GetBigObjectFileName():
 +  """
 +  Return a filename in the most secure manner (see tempfile documentation)
 +  """
 +  import tempfile
 +  with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
 +    ret = f.name
 +  return ret
 +
 +class BigObjectOnDiskBase:
 +  def __init__(self, fileName, objSerialized):
 +    """
 +    :param fileName: the file used to dump into.
 +    :param objSerialized: the object in pickeled form
 +    :type objSerialized: bytes
 +    """
 +    self._filename = fileName
 +    # attribute _destroy is here to tell client side or server side
 +    # only client side can be with _destroy set to True. server side due to risk of concurrency
 +    # so pickled form of self must be done with this attribute set to False.
 +    self._destroy = False
 +    self.__dumpIntoFile(objSerialized)
 +
 +  def getDestroyStatus(self):
 +    return self._destroy
 +
 +  def incrRef(self):
 +    if self._destroy:
 +      IncrRefInFile( self._filename )
 +    else:
 +      # should never happen !
 +      RuntimeError("Invalid call to incrRef !")
 +
 +  def decrRef(self):
 +    if self._destroy:
 +      DecrRefInFile( self._filename )
 +    else:
 +      # should never happen !
 +      RuntimeError("Invalid call to decrRef !")
 +
 +  def unlinkOnDestructor(self):
 +    self._destroy = True
 +
 +  def doNotTouchFile(self):
 +    """
 +    Method called slave side. The life cycle management of file is client side not slave side.
 +    """
 +    self._destroy = False
 +
 +  def __del__(self):
 +    if self._destroy:
 +      DecrRefInFile( self._filename )
 +
 +  def getFileName(self):
 +    return self._filename
 +  
 +  def __dumpIntoFile(self, objSerialized):
 +    DumpInFile( objSerialized, self._filename )
 +
 +  def get(self):
 +    obj, _ = GetObjectFromFile( self._filename )
 +    return obj
 +
 +  def __float__(self):
 +    return float( self.get() )
 +    
 +  def __int__(self):
 +    return int( self.get() )
 +    
 +  def __str__(self):
 +    obj = self.get()
 +    if isinstance(obj,str):
 +        return obj
 +    else:
 +        raise RuntimeError("Not a string")
 +      
 +class BigObjectOnDisk(BigObjectOnDiskBase):
 +  def __init__(self, fileName, objSerialized):
 +    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
 +    
 +class BigObjectOnDiskListElement(BigObjectOnDiskBase):
 +  def __init__(self, pos, length, fileName):
 +    self._filename = fileName
 +    self._destroy = False
 +    self._pos = pos
 +    self._length = length
 +
 +  def get(self):
 +    fullObj = BigObjectOnDiskBase.get(self)
 +    return fullObj[ self._pos ]
 +    
 +  def __getitem__(self, i):
 +    return self.get()[i]
 +
 +  def __len__(self):
 +    return len(self.get())
 +    
 +class BigObjectOnDiskSequence(BigObjectOnDiskBase):
 +  def __init__(self, length, fileName, objSerialized):
 +    BigObjectOnDiskBase.__init__(self, fileName, objSerialized)
 +    self._length = length
 +
 +  def __getitem__(self, i):
 +    return BigObjectOnDiskListElement(i, self._length, self.getFileName())
 +
 +  def __len__(self):
 +    return self._length
 +
 +class BigObjectOnDiskList(BigObjectOnDiskSequence):
 +  def __init__(self, length, fileName, objSerialized):
 +    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
 +    
 +class BigObjectOnDiskTuple(BigObjectOnDiskSequence):
 +  def __init__(self, length, fileName, objSerialized):
 +    BigObjectOnDiskSequence.__init__(self, length, fileName, objSerialized)
 +
 +def SpoolPickleObject( obj ):
 +  import pickle
 +  pickleObjInit = pickle.dumps( obj , pickle.HIGHEST_PROTOCOL )
 +  if not ActivateProxyMecanismOrNot( len(pickleObjInit) ):
 +    return pickleObjInit
 +  else:
 +    if isinstance( obj, list):
 +      proxyObj = BigObjectOnDiskList( len(obj), GetBigObjectFileName() , pickleObjInit )
 +    elif isinstance( obj, tuple):
 +      proxyObj = BigObjectOnDiskTuple( len(obj), GetBigObjectFileName() , pickleObjInit )
 +    else:
 +      proxyObj = BigObjectOnDisk( GetBigObjectFileName() , pickleObjInit )
 +    pickleProxy = pickle.dumps( proxyObj , pickle.HIGHEST_PROTOCOL )
 +    return pickleProxy
 +
 +def UnProxyObjectSimple( obj ):
 +  """
 +  Method to be called in Remote mode. Alterate the obj _status attribute. 
 +  Because the slave process does not participate in the reference counting
 +  """
 +  if isinstance(obj,BigObjectOnDiskBase):
 +    obj.doNotTouchFile()
 +    return obj.get()
 +  elif isinstance( obj, list):
 +    retObj = []
 +    for elt in obj:
 +      retObj.append( UnProxyObjectSimple(elt) )
 +    return retObj
 +  else:
 +    return obj
 +
 +def UnProxyObjectSimpleLocal( obj ):
 +  """
 +  Method to be called in Local mode. Do not alterate the PyObj counter
 +  """
 +  if isinstance(obj,BigObjectOnDiskBase):
 +    return obj.get()
 +  elif isinstance( obj, list):
 +    retObj = []
 +    for elt in obj:
 +      retObj.append( UnProxyObjectSimpleLocal(elt) )
 +    return retObj
 +  else:
 +    return obj
 +    
 +class SeqByteReceiver:
 +  # 2GB limit to trigger split into chunks
 +  CHUNK_SIZE = 2000000000
 +  def __init__(self,sender):
 +    self._obj = sender
 +  def __del__(self):
 +    self._obj.UnRegister()
 +    pass
 +  def data(self):
 +    size = self._obj.getSize()
 +    if size <= SeqByteReceiver.CHUNK_SIZE:
 +      return self.fetchOneShot( size )
 +    else:
 +      return self.fetchByChunks( size )
 +  def fetchOneShot(self,size):
 +    return self._obj.sendPart(0,size)
 +  def fetchByChunks(self,size):
 +      """
 +      To avoid memory peak parts over 2GB are sent using EFF_CHUNK_SIZE size.
 +      """
 +      data_for_split_case = bytes(0)
 +      EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
 +      iStart = 0 ; iEnd = EFF_CHUNK_SIZE
 +      while iStart!=iEnd and iEnd <= size:
 +        part = self._obj.sendPart(iStart,iEnd)
 +        data_for_split_case = bytes(0).join( [data_for_split_case,part] )
 +        iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
 +      return data_for_split_case
 +
  class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
    """The implementation of the PyScriptNode CORBA IDL that executes a script"""
    def __init__(self, nodeName,code,poa,my_container):
index fe3991a88affcfc8a69f2dbf6a958408e579da58,4c01605fac1760e4be19aac93233006a640b9dca..6eb94f8927c42ba8f13bb59eb2bcf805aace2181
@@@ -18,7 -18,7 +18,7 @@@
  #
  
  SET(LOCAL_TEST_DIR ${KERNEL_TEST_DIR}/Container)
- INSTALL(FILES testcontainer.py testProxy.py DESTINATION ${LOCAL_TEST_DIR})
 -INSTALL(FILES testcontainer.py testDataLimitation.py DESTINATION ${LOCAL_TEST_DIR})
++INSTALL(FILES testcontainer.py testProxy.py testDataLimitation.py DESTINATION ${LOCAL_TEST_DIR})
  
  INSTALL(FILES CTestTestfileInstall.cmake
          DESTINATION ${LOCAL_TEST_DIR}