From 1ad5432b9f694b2fb12b653b4b289f0374f952ee Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Fri, 10 Feb 2023 17:06:55 +0100 Subject: [PATCH] [EDF26936] : End of the 2GB limit. --- idl/SALOME_Comm.idl | 9 +++++ idl/SALOME_PyNode.idl | 5 +-- src/Communication/CMakeLists.txt | 4 +++ src/Communication/ReceiverFactory.cxx | 44 +++++++++++++++++++++++++ src/Communication/ReceiverFactory.hxx | 18 ++++++++++ src/Communication/SenderByteImpl.cxx | 35 ++++++++++++++++++++ src/Communication/SenderByteImpl.hxx | 42 ++++++++++++++++++++++++ src/Container/SALOME_PyNode.py | 47 +++++++++++++++++++++++++-- src/KERNEL_PY/__init__.py | 5 ++- 9 files changed, 204 insertions(+), 5 deletions(-) create mode 100644 src/Communication/SenderByteImpl.cxx create mode 100644 src/Communication/SenderByteImpl.hxx diff --git a/idl/SALOME_Comm.idl b/idl/SALOME_Comm.idl index bf9f782b8..ee4c95341 100644 --- a/idl/SALOME_Comm.idl +++ b/idl/SALOME_Comm.idl @@ -23,6 +23,7 @@ #ifndef _SALOME_COMM_IDL_ #define _SALOME_COMM_IDL_ +#include "SALOME_GenericObj.idl" #include "SALOME_Exception.idl" /*! @@ -40,6 +41,8 @@ module SALOME { typedef sequence vectorOfLong; + typedef sequence vectorOfByte; + interface MultiCommClass { void setProtocol(in TypeOfCommunication typ); }; @@ -132,6 +135,12 @@ module SALOME { long getSizeOfColumn(); void release(); }; + + interface SenderByte : GenericObj + { + unsigned long long getSize(); + vectorOfByte sendPart(in unsigned long long n1,in unsigned long long n2); + }; }; #endif diff --git a/idl/SALOME_PyNode.idl b/idl/SALOME_PyNode.idl index 23a2b4709..3a9747995 100644 --- a/idl/SALOME_PyNode.idl +++ b/idl/SALOME_PyNode.idl @@ -26,6 +26,7 @@ #include "SALOME_GenericObj.idl" #include "SALOME_Exception.idl" +#include "SALOME_Comm.idl" /*! \file SALOME_PyNode.idl \brief interface for remote python execution */ @@ -91,11 +92,11 @@ module Engines /*! \brief first part of whole execute method. This split is to reduce the memory peak. */ - void executeFirst(in pickledArgs inargs) raises (SALOME::SALOME_Exception); + void executeFirst(in SALOME::SenderByte inargs) raises (SALOME::SALOME_Exception); /*! \brief second and last part of execute method. This split is to reduce the memory peak. */ - pickledArgs executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception); + SALOME::SenderByte executeSecond(in listofstring outargsname) raises (SALOME::SALOME_Exception); pickledArgs getValueOfVarInContext(in string varName) raises (SALOME::SALOME_Exception); diff --git a/src/Communication/CMakeLists.txt b/src/Communication/CMakeLists.txt index 3bb29256b..038c4756d 100755 --- a/src/Communication/CMakeLists.txt +++ b/src/Communication/CMakeLists.txt @@ -26,6 +26,7 @@ INCLUDE_DIRECTORIES( ${CMAKE_CURRENT_SOURCE_DIR}/../Basics ${CMAKE_CURRENT_SOURCE_DIR}/../SALOMELocalTrace ${CMAKE_CURRENT_SOURCE_DIR}/../Utils + ${CMAKE_CURRENT_SOURCE_DIR}/../GenericObj ${PROJECT_BINARY_DIR}/idl ) @@ -33,6 +34,7 @@ SET(COMMON_LIBS OpUtil SALOMELocalTrace SalomeIDLKernel + SalomeGenericObj ${PYTHON_LIBRARIES} ${MPI_CXX_LIBRARIES} ) @@ -45,6 +47,7 @@ SET(SalomeCommunication_SOURCES SALOMEMultiComm.cxx ReceiverFactory.cxx MatrixClient.cxx + SenderByteImpl.cxx ) ADD_DEFINITIONS(${OMNIORB_DEFINITIONS} ${MPI_DEFINITIONS}) @@ -64,6 +67,7 @@ SET(COMMON_HEADERS_HXX SALOME_Communication.hxx SALOME_Matrix_i.hxx SenderFactory.hxx + SenderByteImpl.hxx ) INSTALL(FILES ${COMMON_HEADERS_HXX} DESTINATION ${SALOME_INSTALL_HEADERS}) diff --git a/src/Communication/ReceiverFactory.cxx b/src/Communication/ReceiverFactory.cxx index 5f354df8d..13cf127cd 100644 --- a/src/Communication/ReceiverFactory.cxx +++ b/src/Communication/ReceiverFactory.cxx @@ -186,3 +186,47 @@ int *ReceiverFactory::getValueOneShot(SALOME::SenderInt_ptr sender,long &size) } } +SeqByteReceiver::SeqByteReceiver(SALOME::SenderByte_ptr sender):_obj(SALOME::SenderByte::_duplicate(sender)) +{ +} + +char *SeqByteReceiver::data(unsigned long& size) +{ + size = _obj->getSize(); + if(size <= CHUNK_SIZE) + { + this->fetchOneShot( size ); + return reinterpret_cast(_data_one_shot->get_buffer()); + } + else + { + this->fetchByChunks( size ); + return _data_for_split_case.get(); + } +} + +void SeqByteReceiver::fetchOneShot(unsigned long size) +{ + _data_one_shot.reset( _obj->sendPart(0,size) ); +} + +void SeqByteReceiver::fetchByChunks(unsigned long size) +{ + _data_for_split_case.reset( new char[size] ); + char *destination = _data_for_split_case.get(); + constexpr unsigned long EFF_CHUNK_SIZE = CHUNK_SIZE / 8; + unsigned long iStart = 0; + unsigned long iEnd = EFF_CHUNK_SIZE; + while( iStart!=iEnd && iEnd <= size ) + { + std::unique_ptr part( _obj->sendPart(iStart,iEnd) ); + const unsigned char *partC = part->get_buffer(); + std::copy(partC,partC+(iEnd-iStart),destination+iStart); + iStart = iEnd; iEnd = std::min(iStart + EFF_CHUNK_SIZE,size); + } +} + +SeqByteReceiver::~SeqByteReceiver() +{ + _obj->UnRegister(); +} diff --git a/src/Communication/ReceiverFactory.hxx b/src/Communication/ReceiverFactory.hxx index 03c8b873e..c566ea9d0 100644 --- a/src/Communication/ReceiverFactory.hxx +++ b/src/Communication/ReceiverFactory.hxx @@ -43,5 +43,23 @@ private: static int *getValueOneShot(SALOME::SenderInt_ptr sender,long &size); }; +#include + +class COMMUNICATION_EXPORT SeqByteReceiver +{ +public: + SeqByteReceiver(SALOME::SenderByte_ptr sender); + char *data(unsigned long& size); + ~SeqByteReceiver(); +private: + void fetchOneShot(unsigned long size); + void fetchByChunks(unsigned long size); +private: + static constexpr unsigned long CHUNK_SIZE = 2000000000; + std::unique_ptr _data_for_split_case; + std::unique_ptr _data_one_shot; + SALOME::SenderByte_var _obj; +}; + #endif diff --git a/src/Communication/SenderByteImpl.cxx b/src/Communication/SenderByteImpl.cxx new file mode 100644 index 000000000..d79a6d276 --- /dev/null +++ b/src/Communication/SenderByteImpl.cxx @@ -0,0 +1,35 @@ +// Copyright (C) 2023 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// + +#include "SenderByteImpl.hxx" + +CORBA::ULongLong SenderByteImpl::getSize() +{ + return _size; +} + +SALOME::vectorOfByte *SenderByteImpl::sendPart(CORBA::ULongLong n1, CORBA::ULongLong n2) +{ + SALOME::vectorOfByte *ret = new SALOME::vectorOfByte; + CORBA::ULongLong retSize(n2-n1); + ret->length(retSize); + for(CORBA::ULongLong i = 0 ; i < retSize ; ++i) + (*ret)[i] = _data[n1+i]; + return ret; +} diff --git a/src/Communication/SenderByteImpl.hxx b/src/Communication/SenderByteImpl.hxx new file mode 100644 index 000000000..18b4a41fa --- /dev/null +++ b/src/Communication/SenderByteImpl.hxx @@ -0,0 +1,42 @@ +// Copyright (C) 2023 CEA/DEN, EDF R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// + +#pragma once + +#include "SALOME_Communication.hxx" + +#include +#include CORBA_SERVER_HEADER(SALOME_Comm) +#include "SALOME_GenericObj_i.hh" + +#include + +class COMMUNICATION_EXPORT SenderByteImpl : public virtual POA_SALOME::SenderByte, + public virtual PortableServer::ServantBase, + public virtual SALOME::GenericObj_i +{ +public: + //! SenderByteImpl instance does not have ownership of data + SenderByteImpl(char *data, std::size_t size):_data(data),_size(size) { } + CORBA::ULongLong getSize() override; + SALOME::vectorOfByte *sendPart(CORBA::ULongLong n1, CORBA::ULongLong n2) override; +private: + char *_data = nullptr; + std::size_t _size = 0; +}; diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py index 74bc948ee..4279063d0 100644 --- a/src/Container/SALOME_PyNode.py +++ b/src/Container/SALOME_PyNode.py @@ -103,6 +103,42 @@ class PyNode_i (Engines__POA.PyNode,Generic): l=traceback.format_exception(exc_typ,exc_val,exc_fr) raise SALOME.SALOME_Exception(SALOME.ExceptionStruct(SALOME.BAD_PARAM,"".join(l),"PyNode: %s, function: %s" % (self.nodeName,funcName),0)) +class SenderByte_i(SALOME__POA.SenderByte,Generic): + def __init__(self,poa,bytesToSend): + Generic.__init__(self,poa) + self.bytesToSend = bytesToSend + + def getSize(self): + return len(self.bytesToSend) + + def sendPart(self,n1,n2): + return self.bytesToSend[n1:n2] + +class SeqByteReceiver: + CHUNK_SIZE = 2000000000 + def __init__(self,sender): + self._obj = sender + def __del__(self): + self._obj.UnRegister() + pass + def data(self): + size = self._obj.getSize() + if size <= SeqByteReceiver.CHUNK_SIZE: + return self.fetchOneShot( size ) + else: + return self.fetchByChunks( size ) + def fetchOneShot(self,size): + return self._obj.sendPart(0,size) + def fetchByChunks(self,size): + data_for_split_case = bytes(0) + EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8 + iStart = 0 ; iEnd = EFF_CHUNK_SIZE + while iStart!=iEnd and iEnd <= size: + part = self._obj.sendPart(iStart,iEnd) + data_for_split_case = bytes(0).join( [data_for_split_case,part] ) + iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size) + return data_for_split_case + class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): """The implementation of the PyScriptNode CORBA IDL that executes a script""" def __init__(self, nodeName,code,poa,my_container): @@ -166,7 +202,11 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): """ Same than first part of self.execute to reduce memory peak.""" import time try: - _,kws=pickle.loads(argsin) + data = None + if True: # to force call of SeqByteReceiver's destructor + argsInPy = SeqByteReceiver( argsin ) + data = argsInPy.data() + _,kws=pickle.loads(data) self.context.update(kws) except Exception: exc_typ,exc_val,exc_fr=sys.exc_info() @@ -183,7 +223,10 @@ class PyScriptNode_i (Engines__POA.PyScriptNode,Generic): raise KeyError("There is no variable %s in context" % arg) argsout.append(self.context[arg]) argsout=pickle.dumps(tuple(argsout),-1) - return argsout + ret = SenderByte_i( self.poa,argsout ) + id_o = self.poa.activate_object(ret) + retObj = self.poa.id_to_reference(id_o) + return retObj._narrow( SALOME.SenderByte ) except Exception: exc_typ,exc_val,exc_fr=sys.exc_info() l=traceback.format_exception(exc_typ,exc_val,exc_fr) diff --git a/src/KERNEL_PY/__init__.py b/src/KERNEL_PY/__init__.py index f675cf5ea..bdbf1001d 100644 --- a/src/KERNEL_PY/__init__.py +++ b/src/KERNEL_PY/__init__.py @@ -410,9 +410,12 @@ def salome_shutdown_containers_with_session(): ref_in_ns = "/".join(root+[cont_name]) naming_service.Destroy_Name(ref_in_ns) print("Number of containers in NS after clean : {}".format( len( list(salome_walk_on_containers(ns,[""])) ))) + +def retrieve_containers_in_ns(): + return [elt for elt in naming_service.repr() if "/Containers/" == elt[:12]] def salome_shutdown_containers_without_session(): - containersEntries = [elt for elt in naming_service.repr() if "/Containers/" == elt[:12]] + containersEntries = retrieve_containers_in_ns() for containerEntry in containersEntries: cont = naming_service.Resolve(containerEntry) try: -- 2.39.2