From c3fd7aa926122dd7b8524631f1e4d8966905a607 Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Wed, 11 Sep 2024 22:04:38 +0200 Subject: [PATCH] [EDF30875] : Fix problem of COMM_FAILURE_WaitingForReply after 200 execs --- idl/SALOME_PyNode.idl | 3 ++- src/Basics/KernelBasis.i | 5 +++++ src/Container/SALOME_PyNode.py | 39 +++++++++++++++++++++++++--------- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/idl/SALOME_PyNode.idl b/idl/SALOME_PyNode.idl index e0baf0131..2953a5142 100644 --- a/idl/SALOME_PyNode.idl +++ b/idl/SALOME_PyNode.idl @@ -114,7 +114,8 @@ module Engines interface ContextExchanger { SALOME::SenderByte getInputContext() raises (SALOME::SALOME_Exception); - void pushOutputContext( in SALOME::SenderByte ctx ) raises (SALOME::SALOME_Exception); + void pushOutputContext( in pickledArgs ctx ) raises (SALOME::SALOME_Exception); + void finishPushContext() raises (SALOME::SALOME_Exception); }; }; diff --git a/src/Basics/KernelBasis.i b/src/Basics/KernelBasis.i index 06e0e0e78..c94b1e92b 100644 --- a/src/Basics/KernelBasis.i +++ b/src/Basics/KernelBasis.i @@ -106,6 +106,11 @@ void WriteInStderr(const std::string& msg); %inline { +void IncrementRefCounter( PyObject * obj ) +{ + Py_XINCREF( obj ); +} + PyObject *HeatMarcelSwig(double timeAjustment, unsigned int nbThreads = 0) { double timeInS = 0.0; diff --git a/src/Container/SALOME_PyNode.py b/src/Container/SALOME_PyNode.py index 65eaec843..d38a351fd 100644 --- a/src/Container/SALOME_PyNode.py +++ b/src/Container/SALOME_PyNode.py @@ -863,6 +863,7 @@ from salome_utils import positionVerbosityOfLoggerRegardingState import Engines import salome import os +import sys salome.salome_init() from datetime import datetime SetVerbosityActivated( {} ) @@ -990,6 +991,7 @@ class ContextExchanger_i(Engines__POA.ContextExchanger): import salome self._poa = salome.orb.resolve_initial_references("RootPOA") self._in_ctx = inCtx + self._out_ctx = bytes(0) def getPOA(self): return self._poa @@ -997,27 +999,44 @@ class ContextExchanger_i(Engines__POA.ContextExchanger): def getInputContext(self): obj = SenderByte_i(self._poa, pickle.dumps( self._in_ctx ) ) ; id_o = self._poa.activate_object(obj) ; refPtr = self._poa.id_to_reference(id_o) return refPtr - + def pushOutputContext(self, ctx): - self._output_context = pickle.loads( SeqByteReceiver( ctx ).data() ) + try: + self._out_ctx += ctx + except Exception as e: + raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"pushOutputContext",0) ) + + def finishPushContext(self): + try: + #raise RuntimeError(f"Anthony {dir(self)}") + self._output_context = pickle.loads( self._out_ctx ) + #del self._out_ctx + except Exception as e: + raise SALOME.SALOME_Exception( SALOME.ExceptionStruct(SALOME.INTERNAL_ERROR,str(e),"finishPushContext",0) ) def getOutputContext(self): return self._output_context class ExchangeContextUsingTCP( ExchangeContextBridgeAbs ): def buildContextPointEntry(self, caseDir, contextEntry): - import CORBA - self._orb = CORBA.ORB_init(['']) + import salome + salome.salome_init() + self._orb = salome.orb self._data_exchange_channel = self._orb.string_to_object( contextEntry ) return pickle.loads( SeqByteReceiver( self._data_exchange_channel.getInputContext() ).data() ) def pushContext(self, contextPointEntry, context): - poa = self._orb.resolve_initial_references("RootPOA") - pman = poa.the_POAManager - pman.activate() - obj = SenderByte_i(poa, pickle.dumps( context) ) ; id_o = poa.activate_object(obj) ; refPtr = poa.id_to_reference(id_o) - self._data_exchange_channel.pushOutputContext( refPtr ) - pman.deactivate(True,True) + ctxBytes = pickle.dumps( context ) + size = len( ctxBytes ) + if size <= SeqByteReceiver.CHUNK_SIZE: + self._data_exchange_channel.pushOutputContext( ctxBytes ) + else: + EFF_CHUNK_SIZE = SeqByteReceiver.CHUNK_SIZE // 8 + iStart = 0 ; iEnd = EFF_CHUNK_SIZE + while iStart!=iEnd and iEnd <= size: + self._data_exchange_channel.pushOutputContext( ctxBytes[iStart:iEnd] ) + iStart = iEnd; iEnd = min(iStart + EFF_CHUNK_SIZE,size) + self._data_exchange_channel.finishPushContext() def ExchangeModeServerSideFactory( exchangeMode ): if exchangeMode == "File": -- 2.39.2