Salome HOME
pickle.load concurrency issue
[modules/yacs.git] / src / runtime / PythonNode.cxx
index a7b850dbbd15eaf3468ed83bf665a95d17566234..66c7aae24dfb4ceaad737d3e8a0aebc1756049fe 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2006-2019  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2021  CEA/DEN, EDF R&D
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
@@ -21,7 +21,7 @@
 #include "PythonNode.hxx"
 #include "PythonPorts.hxx"
 #include "TypeCode.hxx"
-#include "AutoGIL.hxx"
+#include "PythonCppUtils.hxx"
 #include "Container.hxx"
 #include "SalomeContainer.hxx"
 #include "SalomeHPContainer.hxx"
@@ -30,6 +30,7 @@
 
 #include "PyStdout.hxx"
 #include <iostream>
+#include <memory>
 #include <sstream>
 #include <fstream>
 
@@ -76,6 +77,12 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n"
     "  args=pickle.loads(st)\n"
     "  return args\n";
 
+// pickle.load concurrency issue : see https://bugs.python.org/issue12680
+#if PY_VERSION_HEX < 0x03070000
+#include <mutex>
+static std::mutex data_mutex;
+#endif
+
 PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
 {
 }
@@ -96,22 +103,22 @@ void PythonEntry::loadRemoteContainer(InlineNode *reqNode)
   bool isContAlreadyStarted(false);
   if(container)
     {
-      isContAlreadyStarted=container->isAlreadyStarted(reqNode);
-      if(!isContAlreadyStarted)
+      try
+      {
+        if(hasImposedResource())
+          container->start(reqNode, _imposedResource, _imposedContainer);
+        else
         {
-          try
-          {
-            if(hasImposedResource())
-              container->start(reqNode, _imposedResource, _imposedContainer);
-            else
-              container->start(reqNode);
-          }
-          catch(Exception& e)
-          {
-              reqNode->setErrorDetails(e.what());
-              throw e;
-          }
+          isContAlreadyStarted=container->isAlreadyStarted(reqNode);
+          if(!isContAlreadyStarted)
+            container->start(reqNode);
         }
+      }
+      catch(Exception& e)
+      {
+          reqNode->setErrorDetails(e.what());
+          throw e;
+      }
     }
   else
     {
@@ -184,6 +191,9 @@ void PythonEntry::loadRemoteContext(InlineNode *reqNode, Engines::Container_ptr
   Engines::PyNodeBase_var pynode(getRemoteInterpreterHandle());
   ///
   {
+#if PY_VERSION_HEX < 0x03070000
+    std::unique_lock<std::mutex> lock(data_mutex);
+#endif
     AutoGIL agil;
     const char *picklizeScript(getSerializationScript());
     PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
@@ -341,13 +351,10 @@ PythonNode::PythonNode(const std::string& name):InlineNode(name)
 
 PythonNode::~PythonNode()
 {
-  if(!CORBA::is_nil(_pynode))
-    {
-      _pynode->UnRegister();
-    }
+  freeKernelPynode();
 }
 
-void PythonNode::checkBasicConsistency() const 
+void PythonNode::checkBasicConsistency() const
 {
   DEBTRACE("checkBasicConsistency");
   InlineNode::checkBasicConsistency();
@@ -411,8 +418,12 @@ void PythonNode::executeRemote()
       _pynode->assignNewCompiledCode(getScript().c_str());
     }
   //
-  Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);
+  std::unique_ptr<Engines::pickledArgs> serializationInputCorba(new Engines::pickledArgs);
+  AutoPyRef serializationInput;
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *args(0),*ob(0);
       //===========================================================================
@@ -432,17 +443,15 @@ void PythonNode::executeRemote()
       PyObject_Print(args,stderr,Py_PRINT_RAW);
       std::cerr << endl;
 #endif
-      PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSer,args,NULL));
+      serializationInput.set(PyObject_CallFunctionObjArgs(_pyfuncSer,args,nullptr));
       Py_DECREF(args);
       //The pickled string may contain NULL characters so use PyString_AsStringAndSize
       char *serializationInputC(0);
       Py_ssize_t len;
       if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
         throw Exception("DistributedPythonNode problem in python pickle");
-      serializationInputCorba->length(len);
-      for(int i=0; i < len ; i++)
-        serializationInputCorba[i]=serializationInputC[i];
-      Py_DECREF(serializationInput);
+      // no copy here. The C byte array of Python is taken  as this into CORBA sequence to avoid copy
+      serializationInputCorba.reset(new Engines::pickledArgs(len,len,reinterpret_cast<CORBA::Octet *>(serializationInputC),0));
   }
 
   //get the list of output argument names
@@ -463,37 +472,57 @@ void PythonNode::executeRemote()
   // Execute in remote Python node
   //===========================================================================
   DEBTRACE( "-----------------starting remote python invocation-----------------" );
-  Engines::pickledArgs_var resultCorba;
+  std::unique_ptr<Engines::pickledArgs> resultCorba;
   try
     {
       //pass outargsname and dict serialized
-      resultCorba=_pynode->execute(myseq,serializationInputCorba);
+      _pynode->executeFirst(*(serializationInputCorba.get()));
+      //serializationInput and serializationInputCorba are no more needed for server. Release it.
+      serializationInputCorba.reset(nullptr); serializationInput.set(nullptr);
+      resultCorba.reset(_pynode->executeSecond(myseq));
     }
   catch( const SALOME::SALOME_Exception& ex )
     {
-      std::string msg="Exception on remote python invocation";
-      msg += '\n';
-      msg += ex.details.text.in();
-      _errorDetails=msg;
-      throw Exception(msg);
+      std::ostringstream msg; msg << "Exception on remote python invocation" << std::endl << ex.details.text.in() << std::endl;
+      msg << "PyScriptNode CORBA ref : ";
+      {
+        CORBA::ORB_ptr orb(getSALOMERuntime()->getOrb());
+        if(!CORBA::is_nil(orb))
+        {
+          CORBA::String_var IOR(orb->object_to_string(_pynode));
+          msg << IOR;
+        }
+      }
+      msg << std::endl;
+      _errorDetails=msg.str();
+      throw Exception(msg.str());
     }
+//   if(!CORBA::is_nil(_pynode))
+//     {
+//       _pynode->UnRegister();
+//     }
+//   _pynode = Engines::PyScriptNode::_nil();
+//   //
+//   bool dummy;
+//   Engines::Container_var cont(GetContainerObj(this,dummy));
+//   cont->removePyScriptNode(getName().c_str());
   DEBTRACE( "-----------------end of remote python invocation-----------------" );
   //===========================================================================
   // Get results, unpickle and put them in output ports
   //===========================================================================
-  char *resultCorbaC=new char[resultCorba->length()+1];
-  resultCorbaC[resultCorba->length()]='\0';
-  for(int i=0;i<resultCorba->length();i++)
-    resultCorbaC[i]=resultCorba[i];
-
+  auto length(resultCorba->length());
+  char *resultCorbaC(reinterpret_cast<char *>(resultCorba->get_buffer()));
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *args(0),*ob(0);
-      PyObject* resultPython=PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length());
-      delete [] resultCorbaC;
+      PyObject* resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
       args = PyTuple_New(1);
       PyTuple_SetItem(args,0,resultPython);
       PyObject *finalResult=PyObject_CallObject(_pyfuncUnser,args);
+      resultCorba.reset(nullptr);
       Py_DECREF(args);
 
       if (finalResult == NULL)
@@ -551,11 +580,7 @@ void PythonNode::executeRemote()
   //
   if(!isUsingPythonCache())
   {
-    if(!CORBA::is_nil(_pynode))
-      {
-        _pynode->UnRegister();
-      }
-    _pynode = Engines::PyScriptNode::_nil();
+    freeKernelPynode();
     bool dummy;
     Engines::Container_var cont(GetContainerObj(this,dummy));
     cont->removePyScriptNode(getName().c_str());
@@ -721,8 +746,7 @@ void PythonNode::shutdown(int level)
   if(_mode=="local")return;
   if(_container)
     {
-      if(!CORBA::is_nil(_pynode)) _pynode->UnRegister();
-      _pynode=Engines::PyScriptNode::_nil();
+      freeKernelPynode();
       _container->shutdown(level);
     }
 }
@@ -742,6 +766,11 @@ bool PythonNode::canAcceptImposedResource()
   return _container != nullptr && _container->canAcceptImposedResource();
 }
 
+bool PythonNode::hasImposedResource()const
+{
+  return PythonEntry::hasImposedResource();
+}
+
 std::string PythonNode::pythonEntryName()const
 {
   if(isUsingPythonCache())
@@ -758,6 +787,22 @@ bool PythonNode::isUsingPythonCache()const
   return found;
 }
 
+void PythonNode::freeKernelPynode()
+{
+  if(!CORBA::is_nil(_pynode))
+  {
+    try
+    {
+      _pynode->UnRegister();
+    }
+    catch(...)
+    {
+      DEBTRACE("Trouble when pynode->UnRegister!")
+    }
+    _pynode = Engines::PyScriptNode::_nil();
+  }
+}
+
 Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const
 {
   return new PythonNode(*this,father);
@@ -765,8 +810,7 @@ Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const
 
 void PythonNode::createRemoteAdaptedPyInterpretor(Engines::Container_ptr objContainer)
 {
-  if(!CORBA::is_nil(_pynode))
-    _pynode->UnRegister();
+  freeKernelPynode();
   _pynode=objContainer->createPyScriptNode(pythonEntryName().c_str(),getScript().c_str());
   _pynode->Register();
 }
@@ -790,7 +834,7 @@ void PythonNode::assignRemotePyInterpretor(Engines::PyNodeBase_var remoteInterp)
     Engines::PyScriptNode_var tmpp(Engines::PyScriptNode::_narrow(remoteInterp));
     if(!_pynode->_is_equivalent(tmpp))
     {
-      _pynode->UnRegister();
+      freeKernelPynode();
       _pynode=Engines::PyScriptNode::_narrow(remoteInterp);
     }
   }
@@ -930,7 +974,7 @@ void PyFuncNode::init(bool start)
     setState(YACS::TORECONNECT);
 }
 
-void PyFuncNode::checkBasicConsistency() const 
+void PyFuncNode::checkBasicConsistency() const
 {
   DEBTRACE("checkBasicConsistency");
   InlineFuncNode::checkBasicConsistency();
@@ -1062,6 +1106,9 @@ void PyFuncNode::executeRemote()
   //
   Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);;
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
       PyObject *ob(0);
       //===========================================================================
@@ -1121,6 +1168,9 @@ void PyFuncNode::executeRemote()
     resultCorbaC[i]=resultCorba[i];
 
   {
+#if PY_VERSION_HEX < 0x03070000
+      std::unique_lock<std::mutex> lock(data_mutex);
+#endif
       AutoGIL agil;
 
       PyObject *resultPython(PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length()));
@@ -1382,3 +1432,9 @@ bool PyFuncNode::canAcceptImposedResource()
 {
   return _container != nullptr && _container->canAcceptImposedResource();
 }
+
+bool PyFuncNode::hasImposedResource()const
+{
+  return PythonEntry::hasImposedResource();
+}
+