]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
[EDF26936] : End of the 2GB limit.
authorAnthony Geay <anthony.geay@edf.fr>
Fri, 10 Feb 2023 16:06:55 +0000 (17:06 +0100)
committerAnthony Geay <anthony.geay@edf.fr>
Mon, 13 Feb 2023 14:06:30 +0000 (15:06 +0100)
idl/SALOME_Comm.idl
idl/SALOME_PyNode.idl
src/Communication/CMakeLists.txt
src/Communication/ReceiverFactory.cxx
src/Communication/ReceiverFactory.hxx
src/Communication/SenderByteImpl.cxx [new file with mode: 0644]
src/Communication/SenderByteImpl.hxx [new file with mode: 0644]
src/Container/SALOME_PyNode.py
src/KERNEL_PY/__init__.py

index bf9f782b8c26d238aa25c273dcf10c29f9aa05e0..ee4c953413d5d255c87b88dca15a6f67cce06062 100644 (file)
@@ -23,6 +23,7 @@
 #ifndef _SALOME_COMM_IDL_
 #define _SALOME_COMM_IDL_
 
+#include "SALOME_GenericObj.idl"
 #include "SALOME_Exception.idl"
 
 /*!
@@ -40,6 +41,8 @@ module SALOME {
   
   typedef sequence<long> vectorOfLong;
   
+  typedef sequence<octet> vectorOfByte;
+  
   interface MultiCommClass {
     void setProtocol(in TypeOfCommunication typ);
   };
@@ -132,6 +135,12 @@ module SALOME {
     long getSizeOfColumn();
     void release();
   };
+
+  interface SenderByte : GenericObj
+  {
+    unsigned long long getSize();
+    vectorOfByte sendPart(in unsigned long long n1,in unsigned long long n2);
+  };
 };
 
 #endif
index 23a2b47094c378a722be28572cf31cf4048af1e7..3a974799548229ef4c0a7a2a857c3bfdbf9efc01 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "SALOME_GenericObj.idl"
 #include "SALOME_Exception.idl"
+#include "SALOME_Comm.idl"
 
 /*! \file SALOME_PyNode.idl \brief interface for remote python execution
 */
@@ -91,11 +92,11 @@ module Engines
 
     /*! \brief first part of whole execute method. This split is to reduce the memory peak.
     */
-    void executeFirst(in pickledArgs inargs) raises (SALOME::SALOME_Exception);
+    void executeFirst(in SALOME::SenderByte inargs) raises (SALOME::SALOME_Exception);
 
     /*! \brief second and last part of execute method. This split is to reduce the memory peak.
     */
-    pickledArgs executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception);
+    SALOME::SenderByte executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception);
 
     pickledArgs getValueOfVarInContext(in string varName) raises (SALOME::SALOME_Exception);
 
index 3bb29256bb05231998292689076bf1e39e2dd434..038c4756d9a5875dbae973d0ad44b46603c10c18 100755 (executable)
@@ -26,6 +26,7 @@ INCLUDE_DIRECTORIES(
   ${CMAKE_CURRENT_SOURCE_DIR}/../Basics
   ${CMAKE_CURRENT_SOURCE_DIR}/../SALOMELocalTrace
   ${CMAKE_CURRENT_SOURCE_DIR}/../Utils
+  ${CMAKE_CURRENT_SOURCE_DIR}/../GenericObj
   ${PROJECT_BINARY_DIR}/idl
 )
 
@@ -33,6 +34,7 @@ SET(COMMON_LIBS
   OpUtil
   SALOMELocalTrace
   SalomeIDLKernel
+  SalomeGenericObj
   ${PYTHON_LIBRARIES}
   ${MPI_CXX_LIBRARIES}
 )
@@ -45,6 +47,7 @@ SET(SalomeCommunication_SOURCES
   SALOMEMultiComm.cxx
   ReceiverFactory.cxx
   MatrixClient.cxx
+  SenderByteImpl.cxx
 )
 
 ADD_DEFINITIONS(${OMNIORB_DEFINITIONS} ${MPI_DEFINITIONS})
@@ -64,6 +67,7 @@ SET(COMMON_HEADERS_HXX
   SALOME_Communication.hxx
   SALOME_Matrix_i.hxx
   SenderFactory.hxx
+  SenderByteImpl.hxx
 )
 
 INSTALL(FILES ${COMMON_HEADERS_HXX} DESTINATION ${SALOME_INSTALL_HEADERS})
index 5f354df8d5ac750d1c6e2b33172e2ba5319c5a9f..13cf127cd3c90a06aee8ca432ed4db104be0aa6e 100644 (file)
@@ -186,3 +186,47 @@ int *ReceiverFactory::getValueOneShot(SALOME::SenderInt_ptr sender,long &size)
     }
 }
 
+SeqByteReceiver::SeqByteReceiver(SALOME::SenderByte_ptr sender):_obj(SALOME::SenderByte::_duplicate(sender))
+{
+}
+
+char *SeqByteReceiver::data(unsigned long& size)
+{
+  size = _obj->getSize();
+  if(size <= CHUNK_SIZE)
+  {
+    this->fetchOneShot( size );
+    return reinterpret_cast<char *>(_data_one_shot->get_buffer());
+  }
+  else
+  {
+    this->fetchByChunks( size );
+    return _data_for_split_case.get();
+  }
+}
+
+void SeqByteReceiver::fetchOneShot(unsigned long size)
+{
+  _data_one_shot.reset( _obj->sendPart(0,size) );
+}
+
+void SeqByteReceiver::fetchByChunks(unsigned long size)
+{
+  _data_for_split_case.reset( new char[size] );
+  char *destination = _data_for_split_case.get();
+  constexpr unsigned long EFF_CHUNK_SIZE = CHUNK_SIZE / 8;
+  unsigned long iStart = 0;
+  unsigned long iEnd = EFF_CHUNK_SIZE;
+  while( iStart!=iEnd && iEnd <= size )
+  {
+    std::unique_ptr<SALOME::vectorOfByte> part( _obj->sendPart(iStart,iEnd) );
+    const unsigned char *partC = part->get_buffer();
+    std::copy(partC,partC+(iEnd-iStart),destination+iStart);
+    iStart = iEnd; iEnd = std::min(iStart + EFF_CHUNK_SIZE,size);
+  }
+}
+
+SeqByteReceiver::~SeqByteReceiver()
+{
+  _obj->UnRegister();
+}
index 03c8b873e441bb6a26d67975313f868deca4901a..c566ea9d057dc7fd3fda5522778a599435a7eee8 100644 (file)
@@ -43,5 +43,23 @@ private:
   static int *getValueOneShot(SALOME::SenderInt_ptr sender,long &size);
 };
 
+#include <memory>
+
+class COMMUNICATION_EXPORT SeqByteReceiver
+{
+public:
+  SeqByteReceiver(SALOME::SenderByte_ptr sender);
+  char *data(unsigned long& size);
+  ~SeqByteReceiver();
+private:
+  void fetchOneShot(unsigned long size);
+  void fetchByChunks(unsigned long size);
+private:
+  static constexpr unsigned long CHUNK_SIZE = 2000000000;
+  std::unique_ptr<char[]> _data_for_split_case;
+  std::unique_ptr<SALOME::vectorOfByte> _data_one_shot;
+  SALOME::SenderByte_var _obj;
+};
+
 #endif
 
diff --git a/src/Communication/SenderByteImpl.cxx b/src/Communication/SenderByteImpl.cxx
new file mode 100644 (file)
index 0000000..d79a6d2
--- /dev/null
@@ -0,0 +1,35 @@
+// Copyright (C) 2023  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+
+#include "SenderByteImpl.hxx"
+
+CORBA::ULongLong SenderByteImpl::getSize()
+{
+  return _size;
+}
+
+SALOME::vectorOfByte *SenderByteImpl::sendPart(CORBA::ULongLong n1, CORBA::ULongLong n2)
+{
+  SALOME::vectorOfByte *ret = new SALOME::vectorOfByte;
+  CORBA::ULongLong retSize(n2-n1);
+  ret->length(retSize);
+  for(CORBA::ULongLong i = 0 ; i < retSize ; ++i)
+    (*ret)[i] = _data[n1+i];
+  return ret;
+}
diff --git a/src/Communication/SenderByteImpl.hxx b/src/Communication/SenderByteImpl.hxx
new file mode 100644 (file)
index 0000000..18b4a41
--- /dev/null
@@ -0,0 +1,42 @@
+// Copyright (C) 2023  CEA/DEN, EDF R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+
+#pragma once
+
+#include "SALOME_Communication.hxx"
+
+#include <SALOMEconfig.h>
+#include CORBA_SERVER_HEADER(SALOME_Comm)
+#include "SALOME_GenericObj_i.hh"
+
+#include <cstddef>
+
+class COMMUNICATION_EXPORT SenderByteImpl : public virtual POA_SALOME::SenderByte,
+                                            public virtual PortableServer::ServantBase,
+                                            public virtual SALOME::GenericObj_i
+{
+public:
+  //! SenderByteImpl instance does not have ownership of data
+  SenderByteImpl(char *data, std::size_t size):_data(data),_size(size) { }
+  CORBA::ULongLong getSize() override;
+  SALOME::vectorOfByte *sendPart(CORBA::ULongLong n1, CORBA::ULongLong n2) override;
+private:
+  char *_data = nullptr;
+  std::size_t _size = 0;
+};
index 74bc948ee046553a2f03bc63f553e4441d787553..4279063d080f12b85465da4af398b256ea1c4f19 100644 (file)
@@ -103,6 +103,42 @@ class PyNode_i (Engines__POA.PyNode,Generic):
       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
       raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0))
 
+class SenderByte_i(SALOME__POA.SenderByte,Generic):
+  def __init__(self,poa,bytesToSend):
+    Generic.__init__(self,poa)
+    self.bytesToSend = bytesToSend
+
+  def getSize(self):
+    return len(self.bytesToSend)
+
+  def sendPart(self,n1,n2):
+    return self.bytesToSend[n1:n2]
+    
+class SeqByteReceiver:
+  CHUNK_SIZE = 2000000000
+  def __init__(self,sender):
+    self._obj = sender
+  def __del__(self):
+    self._obj.UnRegister()
+    pass
+  def data(self):
+    size = self._obj.getSize()
+    if size <= SeqByteReceiver.CHUNK_SIZE:
+      return self.fetchOneShot( size )
+    else:
+      return self.fetchByChunks( size )
+  def fetchOneShot(self,size):
+    return self._obj.sendPart(0,size)
+  def fetchByChunks(self,size):
+      data_for_split_case = bytes(0)
+      EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8
+      iStart = 0 ; iEnd = EFF_CHUNK_SIZE
+      while iStart!=iEnd and iEnd <= size:
+        part = self._obj.sendPart(iStart,iEnd)
+        data_for_split_case = bytes(0).join( [data_for_split_case,part] )
+        iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size)
+      return data_for_split_case
+
 class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
   """The implementation of the PyScriptNode CORBA IDL that executes a script"""
   def __init__(self, nodeName,code,poa,my_container):
@@ -166,7 +202,11 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
     """ Same than first part of self.execute to reduce memory peak."""
     import time
     try:
-      _,kws=pickle.loads(argsin)
+      data = None
+      if True: # to force call of SeqByteReceiver's destructor
+        argsInPy = SeqByteReceiver( argsin )
+        data = argsInPy.data()
+      _,kws=pickle.loads(data)
       self.context.update(kws)
     except Exception:
       exc_typ,exc_val,exc_fr=sys.exc_info()
@@ -183,7 +223,10 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic):
           raise KeyError("There is no variable %s in context" % arg)
         argsout.append(self.context[arg])
       argsout=pickle.dumps(tuple(argsout),-1)
-      return argsout
+      ret = SenderByte_i( self.poa,argsout )
+      id_o = self.poa.activate_object(ret)
+      retObj = self.poa.id_to_reference(id_o)
+      return retObj._narrow( SALOME.SenderByte )
     except Exception:
       exc_typ,exc_val,exc_fr=sys.exc_info()
       l=traceback.format_exception(exc_typ,exc_val,exc_fr)
index 8c05dbac6e1065e16d3dd9ddbceab1ebcf281c09..c3bd7fbec5ecb860d76ae1a1284d2092dc55e2e2 100644 (file)
@@ -411,9 +411,12 @@ def salome_shutdown_containers_with_session():
         ref_in_ns = "/".join(root+[cont_name])
         naming_service.Destroy_Name(ref_in_ns)
     print("Number of containers in NS after clean : {}".format( len( list(salome_walk_on_containers(ns,[""])) )))
+
+def retrieve_containers_in_ns():
+    return [elt for elt in naming_service.repr() if "/Containers/" == elt[:12]]
     
 def salome_shutdown_containers_without_session():
-    containersEntries = [elt for elt in naming_service.repr() if "/Containers/" == elt[:12]]
+    containersEntries = retrieve_containers_in_ns()
     for containerEntry in containersEntries:
         cont = naming_service.Resolve(containerEntry)
         try: