Salome HOME
[EDF28562] : Possibility of exclusion output port of squeezeMemory mecanism in Python...
[modules/yacs.git] / src / runtime / PythonNode.cxx
index b72a37167e4cd57fce2e3f85ded3004de78535df..57a13a016448a21fbd9322d87bd9550823187f5d 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2006-2022  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2023  CEA, EDF
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
@@ -27,6 +27,8 @@
 #include "SalomeHPContainer.hxx"
 #include "SalomeContainerTmpForHP.hxx"
 #include "ConversionException.hxx"
+#include "ReceiverFactory.hxx"
+#include "SenderByteImpl.hxx"
 
 #include "PyStdout.hxx"
 #include <iostream>
@@ -54,6 +56,8 @@ const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import pickle\n"
     "  return pickle.dumps(val,-1)\n"
     "\n";
 
+PyObject *PythonEntry::_pyClsBigObject = nullptr;
+
 const char PythonNode::IMPL_NAME[]="Python";
 const char PythonNode::KIND[]="Python";
 
@@ -77,6 +81,9 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n"
     "  args=pickle.loads(st)\n"
     "  return args\n";
 
+static char SCRIPT_FOR_BIGOBJECT[]="import SALOME_PyNode\n"
+    "BigObjectOnDiskBase = SALOME_PyNode.BigObjectOnDiskBase\n";
+
 // pickle.load concurrency issue : see https://bugs.python.org/issue12680
 #if PY_VERSION_HEX < 0x03070000
 #include <mutex>
@@ -210,9 +217,15 @@ void PythonEntry::loadRemoteContext(InlineNode *reqNode, Engines::Container_ptr
         throw Exception("Error during load");
       }
     Py_DECREF(res); Py_DECREF(res2);
+    AutoPyRef res3(PyRun_String(SCRIPT_FOR_BIGOBJECT,Py_file_input,_context,_context));
     _pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009");
     _pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009");
     _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009");
+    if(! _pyClsBigObject )
+    {
+      _pyClsBigObject=PyDict_GetItemString(_context,"BigObjectOnDiskBase");
+      Py_INCREF(_pyClsBigObject);
+    }
     if(_pyfuncSer == NULL)
       {
         std::string errorDetails;
@@ -315,7 +328,95 @@ bool PythonEntry::hasImposedResource()const
   return !_imposedResource.empty() && !_imposedContainer.empty();
 }
 
-PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze)
+bool PythonEntry::IsProxy( PyObject *ob )
+{
+  if(!_pyClsBigObject)
+    return false;
+  if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+  {
+    return true;
+  }
+  else
+  {
+    if( PyList_Check( ob ) )
+    {
+      auto sz = PyList_Size( ob );
+      for( auto i = 0 ; i < sz ; ++i )
+      {
+        PyObject *elt = PyList_GetItem( ob, i );
+        if( PythonEntry::IsProxy(elt) )
+          return true;
+      }
+    }
+  }
+  return false;
+}
+
+bool PythonEntry::GetDestroyStatus( PyObject *ob )
+{
+  if(!_pyClsBigObject)
+    return false;
+  if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+  {
+    AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,"getDestroyStatus");
+    AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr);
+    if( PyBool_Check(tmp.get()) )
+    {
+      return tmp.get() == Py_True;
+    }
+    return false;
+  }
+  else
+  {
+    if( PyList_Check( ob ) )
+    {
+      auto sz = PyList_Size( ob );
+      for( auto i = 0 ; i < sz ; ++i )
+      {
+        PyObject *elt = PyList_GetItem( ob, i );
+        if( PythonEntry::GetDestroyStatus(elt) )
+          return true;
+      }
+    }
+  }
+  return false;
+}
+
+void PythonEntry::IfProxyDoSomething( PyObject *ob, const char *meth )
+{
+  if(!_pyClsBigObject)
+    return ;
+  if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+  {
+    AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,meth);
+    AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr);
+  }
+  else
+  {
+    if( PyList_Check( ob ) )
+    {
+      auto sz = PyList_Size( ob );
+      for( auto i = 0 ; i < sz ; ++i )
+      {
+        PyObject *elt = PyList_GetItem( ob, i );
+        PythonEntry::IfProxyDoSomething( elt, meth );
+      }
+    }
+  }
+}
+
+void PythonEntry::DoNotTouchFileIfProxy( PyObject *ob )
+{
+  IfProxyDoSomething(ob,"doNotTouchFile");
+}
+
+void PythonEntry::UnlinkOnDestructorIfProxy( PyObject *ob )
+{
+  IfProxyDoSomething(ob,"unlinkOnDestructor");
+}
+
+PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):
+InlineNode(other,father),_autoSqueeze(other._autoSqueeze),_nonSqueezableOutputNodes(other._nonSqueezableOutputNodes)
 {
   _pynode = Engines::PyScriptNode::_nil();
   _implementation=IMPL_NAME;
@@ -417,8 +518,8 @@ void PythonNode::executeRemote()
       loadPythonAdapter(this,dummy);
       _pynode->assignNewCompiledCode(getScript().c_str());
     }
-  //
-  std::unique_ptr<Engines::pickledArgs> serializationInputCorba(new Engines::pickledArgs);
+  // not managed by unique_ptr here because destructed by the order of client.
+  SenderByteImpl *serializationInputCorba = nullptr;
   AutoPyRef serializationInput;
   {
 #if PY_VERSION_HEX < 0x03070000
@@ -446,12 +547,12 @@ void PythonNode::executeRemote()
       serializationInput.set(PyObject_CallFunctionObjArgs(_pyfuncSer,args,nullptr));
       Py_DECREF(args);
       //The pickled string may contain NULL characters so use PyString_AsStringAndSize
-      char *serializationInputC(0);
+      char *serializationInputC(nullptr);
       Py_ssize_t len;
       if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
         throw Exception("DistributedPythonNode problem in python pickle");
       // no copy here. The C byte array of Python is taken  as this into CORBA sequence to avoid copy
-      serializationInputCorba.reset(new Engines::pickledArgs(len,len,reinterpret_cast<CORBA::Octet *>(serializationInputC),0));
+      serializationInputCorba = new SenderByteImpl(serializationInputC,len);
   }
 
   //get the list of output argument names
@@ -472,14 +573,15 @@ void PythonNode::executeRemote()
   // Execute in remote Python node
   //===========================================================================
   DEBTRACE( "-----------------starting remote python invocation-----------------" );
-  std::unique_ptr<Engines::pickledArgs> resultCorba;
+  std::unique_ptr<SALOME::SenderByteSeq> resultCorba;
   try
     {
       //pass outargsname and dict serialized
-      _pynode->executeFirst(*(serializationInputCorba.get()));
+      SALOME::SenderByte_var serializationInputRef = serializationInputCorba->_this();
+      _pynode->executeFirst(serializationInputRef);
       //serializationInput and serializationInputCorba are no more needed for server. Release it.
-      serializationInputCorba.reset(nullptr); serializationInput.set(nullptr);
-      resultCorba.reset(_pynode->executeSecond(myseq));
+      serializationInput.set(nullptr);
+      resultCorba.reset( _pynode->executeSecond(myseq) );
     }
   catch( const SALOME::SALOME_Exception& ex )
     {
@@ -497,58 +599,78 @@ void PythonNode::executeRemote()
       _errorDetails=msg.str();
       throw Exception(msg.str());
     }
-//   if(!CORBA::is_nil(_pynode))
-//     {
-//       _pynode->UnRegister();
-//     }
-//   _pynode = Engines::PyScriptNode::_nil();
-//   //
-//   bool dummy;
-//   Engines::Container_var cont(GetContainerObj(this,dummy));
-//   cont->removePyScriptNode(getName().c_str());
+  catch(CORBA::COMM_FAILURE& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught system exception COMM_FAILURE -- unable to contact the "
+          << "object." << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(CORBA::SystemException& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught a CORBA::SystemException." ;
+      CORBA::Any tmp;
+      tmp <<= ex;
+      CORBA::TypeCode_var tc = tmp.type();
+      const char *p = tc->name();
+      if ( *p != '\0' )
+        msg <<p;
+      else
+        msg  << tc->id();
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(CORBA::Exception& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught CORBA::Exception. " ;
+      CORBA::Any tmp;
+      tmp <<= ex;
+      CORBA::TypeCode_var tc = tmp.type();
+      const char *p = tc->name();
+      if ( *p != '\0' )
+        msg <<p;
+      else
+        msg  << tc->id();
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(omniORB::fatalException& fe)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught omniORB::fatalException:" << std::endl;
+      msg << "  file: " << fe.file() << std::endl;
+      msg << "  line: " << fe.line() << std::endl;
+      msg << "  mesg: " << fe.errmsg() << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
   DEBTRACE( "-----------------end of remote python invocation-----------------" );
   //===========================================================================
   // Get results, unpickle and put them in output ports
   //===========================================================================
-  auto length(resultCorba->length());
-  char *resultCorbaC(reinterpret_cast<char *>(resultCorba->get_buffer()));
   {
 #if PY_VERSION_HEX < 0x03070000
       std::unique_lock<std::mutex> lock(data_mutex);
 #endif
       AutoGIL agil;
-      PyObject *args(0),*ob(0);
-      PyObject* resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
-      args = PyTuple_New(1);
-      PyTuple_SetItem(args,0,resultPython);
-      PyObject *finalResult=PyObject_CallObject(_pyfuncUnser,args);
-      resultCorba.reset(nullptr);
-      Py_DECREF(args);
-
-      if (finalResult == NULL)
-        {
-          std::stringstream msg;
-          msg << "Conversion with pickle of output ports failed !";
-          msg << " : " << __FILE__ << ":" << __LINE__;
-          _errorDetails=msg.str();
-          throw YACS::ENGINE::ConversionException(msg.str());
-        }
-
       DEBTRACE( "-----------------PythonNode::outputs-----------------" );
-      int nres=1;
-      if(finalResult == Py_None)
-        nres=0;
-      else if(PyTuple_Check(finalResult))
-        nres=PyTuple_Size(finalResult);
+      int nres( resultCorba->length() );
 
       if(getNumberOfOutputPorts() != nres)
         {
           std::string msg="Number of output arguments : Mismatch between definition and execution";
-          Py_DECREF(finalResult);
           _errorDetails=msg;
           throw Exception(msg);
         }
-
       pos=0;
       try
       {
@@ -558,19 +680,31 @@ void PythonNode::executeRemote()
               DEBTRACE( "port name: " << p->getName() );
               DEBTRACE( "port kind: " << p->edGetType()->kind() );
               DEBTRACE( "port pos : " << pos );
-              if(PyTuple_Check(finalResult))
-                ob=PyTuple_GetItem(finalResult,pos) ;
-              else
-                ob=finalResult;
-              DEBTRACE( "ob refcnt: " << ob->ob_refcnt );
-              p->put(ob);
+              SALOME::SenderByte_var elt = (*resultCorba)[pos];
+              SeqByteReceiver recv(elt);
+              unsigned long length = 0;
+              char *resultCorbaC = recv.data(length);
+              {
+                AutoPyRef resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
+                AutoPyRef args = PyTuple_New(1);
+                PyTuple_SetItem(args,0,resultPython.retn());
+                AutoPyRef ob = PyObject_CallObject(_pyfuncUnser,args);
+                if (!ob)
+                {
+                  std::stringstream msg;
+                  msg << "Conversion with pickle of output ports failed !";
+                  msg << " : " << __FILE__ << ":" << __LINE__;
+                  _errorDetails=msg.str();
+                  throw YACS::ENGINE::ConversionException(msg.str());
+                }
+                UnlinkOnDestructorIfProxy(ob);
+                p->put( ob );
+              }
               pos++;
             }
-          Py_DECREF(finalResult);
       }
       catch(ConversionException& ex)
       {
-          Py_DECREF(finalResult);
           _errorDetails=ex.what();
           throw;
       }
@@ -588,12 +722,57 @@ void PythonNode::executeRemote()
   DEBTRACE( "++++++++++++++ ENDOF PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" );
 }
 
+void PythonNode::ExecuteLocalInternal(const std::string& codeStr, PyObject *context, std::string& errorDetails)
+{
+  DEBTRACE(  code );
+  DEBTRACE( "context refcnt: " << context->ob_refcnt );
+  std::ostringstream stream;
+  stream << "/tmp/PythonNode_";
+  stream << getpid();
+  AutoPyRef code=Py_CompileString(codeStr.c_str(), stream.str().c_str(), Py_file_input);
+  if(code == NULL)
+  {
+    errorDetails=""; 
+    AutoPyRef new_stderr = newPyStdOut(errorDetails);
+    PySys_SetObject((char*)"stderr", new_stderr);
+    PyErr_Print();
+    PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+    throw YACS::Exception("Error during execution");
+  }
+  {
+    AutoPyRef res = PyEval_EvalCode(  code, context, context);
+  }
+  DEBTRACE( "context refcnt: " << context->ob_refcnt );
+  fflush(stdout);
+  fflush(stderr);
+  if(PyErr_Occurred ())
+  {
+    errorDetails="";
+    AutoPyRef new_stderr = newPyStdOut(errorDetails);
+    PySys_SetObject((char*)"stderr", new_stderr);
+    ofstream errorfile(stream.str().c_str());
+    if (errorfile.is_open())
+      {
+        errorfile << codeStr;
+        errorfile.close();
+      }
+    PyErr_Print();
+    PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+    throw YACS::Exception("Error during execution");
+  }
+}
+
+void PythonNode::executeLocalInternal(const std::string& codeStr)
+{
+  ExecuteLocalInternal(codeStr,_context,_errorDetails);
+}
+
 void PythonNode::executeLocal()
 {
   DEBTRACE( "++++++++++++++ PyNode::executeLocal: " << getName() << " ++++++++++++++++++++" );
   {
     AutoGIL agil;
-
+    std::ostringstream unpxy; unpxy << "from SALOME_PyNode import UnProxyObjectSimpleLocal" << std::endl;
     DEBTRACE( "---------------PyNode::inputs---------------" );
     list<InputPort *>::iterator iter2;
     for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++)
@@ -602,7 +781,8 @@ void PythonNode::executeLocal()
         DEBTRACE( "port name: " << p->getName() );
         DEBTRACE( "port kind: " << p->edGetType()->kind() );
         PyObject* ob=p->getPyObj();
-        DEBTRACE( "ob refcnt: " << ob->ob_refcnt );
+        DEBTRACE( "ob refcnt: " << ob->ob_refcnt ); 
+        unpxy << p->getName() << " = UnProxyObjectSimpleLocal( " << p->getName() << " )" << std::endl;
 #ifdef _DEVDEBUG_
         PyObject_Print(ob,stderr,Py_PRINT_RAW);
         cerr << endl;
@@ -615,47 +795,11 @@ void PythonNode::executeLocal()
 
     //calculation
     DEBTRACE( "----------------PyNode::calculation---------------" );
-    DEBTRACE(  _script );
-    DEBTRACE( "_context refcnt: " << _context->ob_refcnt );
-
-    std::ostringstream stream;
-    stream << "/tmp/PythonNode_";
-    stream << getpid();
+  
+    if( ! getSqueezeStatus() )
+      executeLocalInternal( unpxy.str() );
 
-    PyObject* code=Py_CompileString(_script.c_str(), stream.str().c_str(), Py_file_input);
-    if(code == NULL)
-      {
-        _errorDetails=""; 
-        PyObject* new_stderr = newPyStdOut(_errorDetails);
-        PySys_SetObject((char*)"stderr", new_stderr);
-        PyErr_Print();
-        PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
-        Py_DECREF(new_stderr);
-        throw Exception("Error during execution");
-      }
-    PyObject *res = PyEval_EvalCode(  code, _context, _context);
-
-    Py_DECREF(code);
-    Py_XDECREF(res);
-    DEBTRACE( "_context refcnt: " << _context->ob_refcnt );
-    fflush(stdout);
-    fflush(stderr);
-    if(PyErr_Occurred ())
-      {
-        _errorDetails="";
-        PyObject* new_stderr = newPyStdOut(_errorDetails);
-        PySys_SetObject((char*)"stderr", new_stderr);
-        ofstream errorfile(stream.str().c_str());
-        if (errorfile.is_open())
-          {
-            errorfile << _script;
-            errorfile.close();
-          }
-        PyErr_Print();
-        PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
-        Py_DECREF(new_stderr);
-        throw Exception("Error during execution");
-      }
+    executeLocalInternal( _script );
 
     DEBTRACE( "-----------------PyNode::outputs-----------------" );
     list<OutputPort *>::iterator iter;
@@ -680,6 +824,8 @@ void PythonNode::executeLocal()
             cerr << endl;
 #endif
             p->put(ob);
+            if(!isUsingPythonCache())
+              PyDict_DelItemString(_context,p->getName().c_str());
           }
     }
     catch(ConversionException& ex)
@@ -690,10 +836,29 @@ void PythonNode::executeLocal()
     if(_autoSqueeze)
       squeezeMemory();
     DEBTRACE( "-----------------End PyNode::outputs-----------------" );
+    if(!isUsingPythonCache())
+    {
+      for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++)
+      {
+        AutoPyRef pStr = PyUnicode_FromString( (*iter2)->getName().c_str() );
+        if( PyDict_Contains(_context,pStr) == 1 )
+          { PyDict_DelItem(_context,pStr); }
+      }
+    }
   }
   DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" );
 }
 
+/*!
+ * [EDF28562]
+ * \param in squeezeExceptions list on output port name excluded from the squeeze mecanism
+ */
+void PythonNode::setSqueezeStatusWithExceptions(bool sqStatus, const std::vector<std::string>& squeezeExceptions)
+{
+  this->setSqueezeStatus(sqStatus);
+  this->_nonSqueezableOutputNodes = std::set<std::string>(squeezeExceptions.begin(), squeezeExceptions.end());
+}
+
 void PythonNode::squeezeMemorySafe()
 {
   AutoGIL agil;
@@ -714,6 +879,8 @@ void PythonNode::squeezeMemory()
     }
   for(auto p : _setOfOutputPort)
     {
+      if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end())
+        continue;
       PyDict_DelItemString(_context,p->getName().c_str());
       OutputPyPort *p2(static_cast<OutputPyPort *>(p));
       p2->putWithoutForward(Py_None);
@@ -730,6 +897,8 @@ void PythonNode::squeezeMemoryRemote()
     }
   for(auto p : _setOfOutputPort)
     {
+      if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end())
+        continue;
       OutputPyPort *p2(static_cast<OutputPyPort *>(p));
       p2->putWithoutForward(Py_None);
     }
@@ -1158,6 +1327,60 @@ void PyFuncNode::executeRemote()
       _errorDetails=msg;
       throw Exception(msg);
     }
+  catch(CORBA::COMM_FAILURE& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught system exception COMM_FAILURE -- unable to contact the "
+          << "object." << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(CORBA::SystemException& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught a CORBA::SystemException." ;
+      CORBA::Any tmp;
+      tmp <<= ex;
+      CORBA::TypeCode_var tc = tmp.type();
+      const char *p = tc->name();
+      if ( *p != '\0' )
+        msg <<p;
+      else
+        msg  << tc->id();
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(CORBA::Exception& ex)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught CORBA::Exception. " ;
+      CORBA::Any tmp;
+      tmp <<= ex;
+      CORBA::TypeCode_var tc = tmp.type();
+      const char *p = tc->name();
+      if ( *p != '\0' )
+        msg <<p;
+      else
+        msg  << tc->id();
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
+  catch(omniORB::fatalException& fe)
+    {
+      std::ostringstream msg;
+      msg << "Exception on remote python invocation." << std::endl ;
+      msg << "Caught omniORB::fatalException:" << std::endl;
+      msg << "  file: " << fe.file() << std::endl;
+      msg << "  line: " << fe.line() << std::endl;
+      msg << "  mesg: " << fe.errmsg() << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
+    }
   DEBTRACE( "-----------------end of remote python invocation-----------------" );
   //===========================================================================
   // Get results, unpickle and put them in output ports