Salome HOME
pickle.load concurrency issue
[modules/yacs.git] / src / runtime / PythonNode.cxx
index 02e8e302c98f6c22a7b3d1ba24c8b5300898bc0b..66c7aae24dfb4ceaad737d3e8a0aebc1756049fe 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2006-2016  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2021  CEA/DEN, EDF R&D
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
@@ -21,7 +21,7 @@
 #include "PythonNode.hxx"
 #include "PythonPorts.hxx"
 #include "TypeCode.hxx"
-#include "AutoGIL.hxx"
+#include "PythonCppUtils.hxx"
 #include "Container.hxx"
 #include "SalomeContainer.hxx"
 #include "SalomeHPContainer.hxx"
@@ -30,6 +30,7 @@
 
 #include "PyStdout.hxx"
 #include <iostream>
+#include <memory>
 #include <sstream>
 #include <fstream>
 
@@ -76,6 +77,11 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n"
     "  args=pickle.loads(st)\n"
     "  return args\n";
 
+// pickle.load concurrency issue : see https://bugs.python.org/issue12680
+#if PY_VERSION_HEX < 0x03070000
+#include <mutex>
+static std::mutex data_mutex;
+#endif
 
 PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
 {
@@ -90,26 +96,29 @@ PythonEntry::~PythonEntry()
   Py_XDECREF(_context);
 }
 
-void PythonEntry::commonRemoteLoadPart1(InlineNode *reqNode)
+void PythonEntry::loadRemoteContainer(InlineNode *reqNode)
 {
   DEBTRACE( "---------------PythonEntry::CommonRemoteLoad function---------------" );
   Container *container(reqNode->getContainer());
   bool isContAlreadyStarted(false);
   if(container)
     {
-      isContAlreadyStarted=container->isAlreadyStarted(reqNode);
-      if(!isContAlreadyStarted)
+      try
+      {
+        if(hasImposedResource())
+          container->start(reqNode, _imposedResource, _imposedContainer);
+        else
         {
-          try
-          {
-              container->start(reqNode);
-          }
-          catch(Exception& e)
-          {
-              reqNode->setErrorDetails(e.what());
-              throw e;
-          }
+          isContAlreadyStarted=container->isAlreadyStarted(reqNode);
+          if(!isContAlreadyStarted)
+            container->start(reqNode);
         }
+      }
+      catch(Exception& e)
+      {
+          reqNode->setErrorDetails(e.what());
+          throw e;
+      }
     }
   else
     {
@@ -120,43 +129,47 @@ void PythonEntry::commonRemoteLoadPart1(InlineNode *reqNode)
     }
 }
 
-Engines::Container_var PythonEntry::commonRemoteLoadPart2(InlineNode *reqNode, bool& isInitializeRequested)
+Engines::Container_var GetContainerObj(InlineNode *reqNode, bool& isStandardCont)
 {
+  isStandardCont = false;
   Container *container(reqNode->getContainer());
-  Engines::Container_var objContainer=Engines::Container::_nil();
+  Engines::Container_var objContainer(Engines::Container::_nil());
   if(!container)
-    throw Exception("No container specified !");
+    throw YACS::Exception("No container specified !");
   SalomeContainer *containerCast0(dynamic_cast<SalomeContainer *>(container));
   SalomeHPContainer *containerCast1(dynamic_cast<SalomeHPContainer *>(container));
   if(containerCast0)
-    objContainer=containerCast0->getContainerPtr(reqNode);
+    {
+      isStandardCont = true;
+      objContainer=containerCast0->getContainerPtr(reqNode);
+    }
   else if(containerCast1)
     {
       YACS::BASES::AutoCppPtr<SalomeContainerTmpForHP> tmpCont(SalomeContainerTmpForHP::BuildFrom(containerCast1,reqNode));
       objContainer=tmpCont->getContainerPtr(reqNode);
     }
   else
-    throw Exception("Unrecognized type of container ! Salome one is expected for PythonNode/PyFuncNode !");
+    throw YACS::Exception("Unrecognized type of container ! Salome one is expected for PythonNode/PyFuncNode !");
   if(CORBA::is_nil(objContainer))
-    throw Exception("Container corba pointer is NULL for PythonNode !");
+    throw YACS::Exception("Container corba pointer is NULL for PythonNode !");
+  return objContainer;
+}
+
+Engines::Container_var PythonEntry::loadPythonAdapter(InlineNode *reqNode, bool& isInitializeRequested)
+{
+  bool isStandardCont(true);
+  Engines::Container_var objContainer(GetContainerObj(reqNode,isStandardCont));
   isInitializeRequested=false;
   try
   {
-      if(containerCast0)
-        {
-          createRemoteAdaptedPyInterpretor(objContainer);
-        }
-      else
-        {
-          Engines::PyNodeBase_var dftPyScript(retrieveDftRemotePyInterpretorIfAny(objContainer));
-          if(CORBA::is_nil(dftPyScript))
-            {
-              isInitializeRequested=true;
-              createRemoteAdaptedPyInterpretor(objContainer);
-            }
-          else
-            assignRemotePyInterpretor(dftPyScript);
-        }
+    Engines::PyNodeBase_var dftPyScript(retrieveDftRemotePyInterpretorIfAny(objContainer));
+    if(CORBA::is_nil(dftPyScript))
+    {
+      isInitializeRequested=!isStandardCont;
+      createRemoteAdaptedPyInterpretor(objContainer);
+    }
+    else
+      assignRemotePyInterpretor(dftPyScript);
   }
   catch( const SALOME::SALOME_Exception& ex )
   {
@@ -172,12 +185,15 @@ Engines::Container_var PythonEntry::commonRemoteLoadPart2(InlineNode *reqNode, b
   return objContainer;
 }
 
-void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_ptr objContainer, bool isInitializeRequested)
+void PythonEntry::loadRemoteContext(InlineNode *reqNode, Engines::Container_ptr objContainer, bool isInitializeRequested)
 {
   Container *container(reqNode->getContainer());
   Engines::PyNodeBase_var pynode(getRemoteInterpreterHandle());
   ///
   {
+#if PY_VERSION_HEX < 0x03070000
+    std::unique_lock<std::mutex> lock(data_mutex);
+#endif
     AutoGIL agil;
     const char *picklizeScript(getSerializationScript());
     PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
@@ -288,14 +304,20 @@ std::string PythonEntry::GetContainerLog(const std::string& mode, Container *con
 
 void PythonEntry::commonRemoteLoad(InlineNode *reqNode)
 {
-  commonRemoteLoadPart1(reqNode);
+  loadRemoteContainer(reqNode);
   bool isInitializeRequested;
-  Engines::Container_var objContainer(commonRemoteLoadPart2(reqNode,isInitializeRequested));
-  commonRemoteLoadPart3(reqNode,objContainer,isInitializeRequested);
+  Engines::Container_var objContainer(loadPythonAdapter(reqNode,isInitializeRequested));
+  loadRemoteContext(reqNode,objContainer,isInitializeRequested);
+}
+
+bool PythonEntry::hasImposedResource()const
+{
+  return !_imposedResource.empty() && !_imposedContainer.empty();
 }
 
-PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father)
+PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze)
 {
+  _pynode = Engines::PyScriptNode::_nil();
   _implementation=IMPL_NAME;
   {
     AutoGIL agil;
@@ -312,6 +334,7 @@ PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode
 
 PythonNode::PythonNode(const std::string& name):InlineNode(name)
 {
+  _pynode = Engines::PyScriptNode::_nil();
   _implementation=IMPL_NAME;
   {
     AutoGIL agil;
@@ -328,13 +351,10 @@ PythonNode::PythonNode(const std::string& name):InlineNode(name)
 
 PythonNode::~PythonNode()
 {
-  if(!CORBA::is_nil(_pynode))
-    {
-      _pynode->UnRegister();
-    }
+  freeKernelPynode();
 }
 
-void PythonNode::checkBasicConsistency() const throw(YACS::Exception)
+void PythonNode::checkBasicConsistency() const
 {
   DEBTRACE("checkBasicConsistency");
   InlineNode::checkBasicConsistency();
@@ -389,17 +409,21 @@ void PythonNode::executeRemote()
 {
   DEBTRACE( "++++++++++++++ PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" );
   if(!_pyfuncSer)
-    throw Exception("DistributedPythonNode badly loaded");
+    throw Exception("PythonNode badly loaded");
   //
   if(dynamic_cast<HomogeneousPoolContainer *>(getContainer()))
     {
       bool dummy;
-      commonRemoteLoadPart2(this,dummy);
+      loadPythonAdapter(this,dummy);
       _pynode->assignNewCompiledCode(getScript().c_str());
     }
   //
-  Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);
+  std::unique_ptr<Engines::pickledArgs> serializationInputCorba(new Engines::pickledArgs);
+  AutoPyRef serializationInput;
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *args(0),*ob(0);
       //===========================================================================
@@ -419,17 +443,15 @@ void PythonNode::executeRemote()
       PyObject_Print(args,stderr,Py_PRINT_RAW);
       std::cerr << endl;
 #endif
-      PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSer,args,NULL));
+      serializationInput.set(PyObject_CallFunctionObjArgs(_pyfuncSer,args,nullptr));
       Py_DECREF(args);
       //The pickled string may contain NULL characters so use PyString_AsStringAndSize
       char *serializationInputC(0);
       Py_ssize_t len;
       if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
         throw Exception("DistributedPythonNode problem in python pickle");
-      serializationInputCorba->length(len);
-      for(int i=0; i < len ; i++)
-        serializationInputCorba[i]=serializationInputC[i];
-      Py_DECREF(serializationInput);
+      // no copy here. The C byte array of Python is taken  as this into CORBA sequence to avoid copy
+      serializationInputCorba.reset(new Engines::pickledArgs(len,len,reinterpret_cast<CORBA::Octet *>(serializationInputC),0));
   }
 
   //get the list of output argument names
@@ -450,37 +472,57 @@ void PythonNode::executeRemote()
   // Execute in remote Python node
   //===========================================================================
   DEBTRACE( "-----------------starting remote python invocation-----------------" );
-  Engines::pickledArgs_var resultCorba;
+  std::unique_ptr<Engines::pickledArgs> resultCorba;
   try
     {
       //pass outargsname and dict serialized
-      resultCorba=_pynode->execute(myseq,serializationInputCorba);
+      _pynode->executeFirst(*(serializationInputCorba.get()));
+      //serializationInput and serializationInputCorba are no more needed for server. Release it.
+      serializationInputCorba.reset(nullptr); serializationInput.set(nullptr);
+      resultCorba.reset(_pynode->executeSecond(myseq));
     }
   catch( const SALOME::SALOME_Exception& ex )
     {
-      std::string msg="Exception on remote python invocation";
-      msg += '\n';
-      msg += ex.details.text.in();
-      _errorDetails=msg;
-      throw Exception(msg);
+      std::ostringstream msg; msg << "Exception on remote python invocation" << std::endl << ex.details.text.in() << std::endl;
+      msg << "PyScriptNode CORBA ref : ";
+      {
+        CORBA::ORB_ptr orb(getSALOMERuntime()->getOrb());
+        if(!CORBA::is_nil(orb))
+        {
+          CORBA::String_var IOR(orb->object_to_string(_pynode));
+          msg << IOR;
+        }
+      }
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
     }
+//   if(!CORBA::is_nil(_pynode))
+//     {
+//       _pynode->UnRegister();
+//     }
+//   _pynode = Engines::PyScriptNode::_nil();
+//   //
+//   bool dummy;
+//   Engines::Container_var cont(GetContainerObj(this,dummy));
+//   cont->removePyScriptNode(getName().c_str());
   DEBTRACE( "-----------------end of remote python invocation-----------------" );
   //===========================================================================
   // Get results, unpickle and put them in output ports
   //===========================================================================
-  char *resultCorbaC=new char[resultCorba->length()+1];
-  resultCorbaC[resultCorba->length()]='\0';
-  for(int i=0;i<resultCorba->length();i++)
-    resultCorbaC[i]=resultCorba[i];
-
+  auto length(resultCorba->length());
+  char *resultCorbaC(reinterpret_cast<char *>(resultCorba->get_buffer()));
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *args(0),*ob(0);
-      PyObject* resultPython=PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length());
-      delete [] resultCorbaC;
+      PyObject* resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
       args = PyTuple_New(1);
       PyTuple_SetItem(args,0,resultPython);
       PyObject *finalResult=PyObject_CallObject(_pyfuncUnser,args);
+      resultCorba.reset(nullptr);
       Py_DECREF(args);
 
       if (finalResult == NULL)
@@ -532,6 +574,16 @@ void PythonNode::executeRemote()
           _errorDetails=ex.what();
           throw;
       }
+      if(_autoSqueeze)
+        squeezeMemoryRemote();
+  }
+  //
+  if(!isUsingPythonCache())
+  {
+    freeKernelPynode();
+    bool dummy;
+    Engines::Container_var cont(GetContainerObj(this,dummy));
+    cont->removePyScriptNode(getName().c_str());
   }
   DEBTRACE( "++++++++++++++ ENDOF PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" );
 }
@@ -635,12 +687,54 @@ void PythonNode::executeLocal()
         _errorDetails=ex.what();
         throw;
     }
-
+    if(_autoSqueeze)
+      squeezeMemory();
     DEBTRACE( "-----------------End PyNode::outputs-----------------" );
   }
   DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" );
 }
 
+void PythonNode::squeezeMemorySafe()
+{
+  AutoGIL agil;
+  if(_mode==PythonNode::REMOTE_NAME)
+    this->squeezeMemoryRemote();
+  else
+    this->squeezeMemory();
+}
+  
+void PythonNode::squeezeMemory()
+{
+  for(auto p : _setOfInputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
+void PythonNode::squeezeMemoryRemote()
+{
+  for(auto p : _setOfInputPort)
+    {
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
 std::string PythonNode::getContainerLog()
 {
   return PythonEntry::GetContainerLog(_mode,_container,this);
@@ -652,12 +746,63 @@ void PythonNode::shutdown(int level)
   if(_mode=="local")return;
   if(_container)
     {
-      if(!CORBA::is_nil(_pynode)) _pynode->UnRegister();
-      _pynode=Engines::PyScriptNode::_nil();
+      freeKernelPynode();
       _container->shutdown(level);
     }
 }
 
+void PythonNode::imposeResource(const std::string& resource_name,
+                                const std::string& container_name)
+{
+  if(!resource_name.empty() && !container_name.empty())
+  {
+    _imposedResource = resource_name;
+    _imposedContainer = container_name;
+  }
+}
+
+bool PythonNode::canAcceptImposedResource()
+{
+  return _container != nullptr && _container->canAcceptImposedResource();
+}
+
+bool PythonNode::hasImposedResource()const
+{
+  return PythonEntry::hasImposedResource();
+}
+
+std::string PythonNode::pythonEntryName()const
+{
+  if(isUsingPythonCache())
+    return "DEFAULT_NAME_FOR_UNIQUE_PYTHON_NODE_ENTRY";
+  else
+    return getName();
+}
+
+bool PythonNode::isUsingPythonCache()const
+{
+  bool found = false;
+  if(_container)
+    found = _container->isUsingPythonCache();
+  return found;
+}
+
+void PythonNode::freeKernelPynode()
+{
+  if(!CORBA::is_nil(_pynode))
+  {
+    try
+    {
+      _pynode->UnRegister();
+    }
+    catch(...)
+    {
+      DEBTRACE("Trouble when pynode->UnRegister!")
+    }
+    _pynode = Engines::PyScriptNode::_nil();
+  }
+}
+
 Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const
 {
   return new PythonNode(*this,father);
@@ -665,14 +810,14 @@ Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const
 
 void PythonNode::createRemoteAdaptedPyInterpretor(Engines::Container_ptr objContainer)
 {
-  if(!CORBA::is_nil(_pynode))
-    _pynode->UnRegister();
-  _pynode=objContainer->createPyScriptNode(getName().c_str(),getScript().c_str());
+  freeKernelPynode();
+  _pynode=objContainer->createPyScriptNode(pythonEntryName().c_str(),getScript().c_str());
+  _pynode->Register();
 }
 
 Engines::PyNodeBase_var PythonNode::retrieveDftRemotePyInterpretorIfAny(Engines::Container_ptr objContainer) const
 {
-  Engines::PyScriptNode_var ret(objContainer->getDefaultPyScriptNode(getName().c_str()));
+  Engines::PyScriptNode_var ret(objContainer->getDefaultPyScriptNode(pythonEntryName().c_str()));
   if(!CORBA::is_nil(ret))
     {
       ret->Register();
@@ -682,15 +827,18 @@ Engines::PyNodeBase_var PythonNode::retrieveDftRemotePyInterpretorIfAny(Engines:
 
 void PythonNode::assignRemotePyInterpretor(Engines::PyNodeBase_var remoteInterp)
 {
-  if(!CORBA::is_nil(_pynode))
+  if(CORBA::is_nil(_pynode))
+    _pynode=Engines::PyScriptNode::_narrow(remoteInterp);
+  else
+  {
+    Engines::PyScriptNode_var tmpp(Engines::PyScriptNode::_narrow(remoteInterp));
+    if(!_pynode->_is_equivalent(tmpp))
     {
-      Engines::PyScriptNode_var tmpp(Engines::PyScriptNode::_narrow(remoteInterp));
-      if(_pynode->_is_equivalent(tmpp))
-        return ;
+      freeKernelPynode();
+      _pynode=Engines::PyScriptNode::_narrow(remoteInterp);
     }
-  if(!CORBA::is_nil(_pynode))
-    _pynode->UnRegister();
-  _pynode=Engines::PyScriptNode::_narrow(remoteInterp);
+  }
+  _pynode->assignNewCompiledCode(getScript().c_str());
 }
 
 Engines::PyNodeBase_var PythonNode::getRemoteInterpreterHandle()
@@ -737,7 +885,7 @@ void PythonNode::applyDPLScope(ComposedNode *gfn)
       {
         const std::pair<std::string,int>& p(ret[i]);
         PyObject *elt(PyTuple_New(2));
-        PyTuple_SetItem(elt,0,PyBytes_FromString(p.first.c_str()));
+        PyTuple_SetItem(elt,0,PyUnicode_FromString(p.first.c_str()));
         PyTuple_SetItem(elt,1,PyLong_FromLong(p.second));
         PyList_SetItem(ob,i,elt);
       }
@@ -826,7 +974,7 @@ void PyFuncNode::init(bool start)
     setState(YACS::TORECONNECT);
 }
 
-void PyFuncNode::checkBasicConsistency() const throw(YACS::Exception)
+void PyFuncNode::checkBasicConsistency() const
 {
   DEBTRACE("checkBasicConsistency");
   InlineFuncNode::checkBasicConsistency();
@@ -952,12 +1100,15 @@ void PyFuncNode::executeRemote()
   if(dynamic_cast<HomogeneousPoolContainer *>(getContainer()))
     {
       bool dummy;
-      commonRemoteLoadPart2(this,dummy);
+      loadPythonAdapter(this,dummy);
       _pynode->executeAnotherPieceOfCode(getScript().c_str());
     }
   //
   Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);;
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *ob(0);
       //===========================================================================
@@ -1017,6 +1168,9 @@ void PyFuncNode::executeRemote()
     resultCorbaC[i]=resultCorba[i];
 
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
 
       PyObject *resultPython(PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length()));
@@ -1264,3 +1418,23 @@ void PyFuncNode::shutdown(int level)
     }
 }
 
+void PyFuncNode::imposeResource(const std::string& resource_name,
+                                const std::string& container_name)
+{
+  if(!resource_name.empty() && !container_name.empty())
+  {
+    _imposedResource = resource_name;
+    _imposedContainer = container_name;
+  }
+}
+
+bool PyFuncNode::canAcceptImposedResource()
+{
+  return _container != nullptr && _container->canAcceptImposedResource();
+}
+
+bool PyFuncNode::hasImposedResource()const
+{
+  return PythonEntry::hasImposedResource();
+}
+