Salome HOME
Work in progress : workload manager step 2
[modules/yacs.git] / src / runtime / PythonNode.cxx
index e8cbc26248a5a640f154355bd4dfc214cc51af0e..4f7bd51170de67de5fb60448908a9dd2479b0840 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2006-2014  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2019  CEA/DEN, EDF R&D
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
@@ -48,26 +48,36 @@ typedef int Py_ssize_t;
 using namespace YACS::ENGINE;
 using namespace std;
 
+const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import pickle\n"
+    "def pickleForVarSimplePyth2009(val):\n"
+    "  return pickle.dumps(val,-1)\n"
+    "\n";
+
 const char PythonNode::IMPL_NAME[]="Python";
 const char PythonNode::KIND[]="Python";
 
-const char PythonNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
+const char PythonNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n"
     "def pickleForDistPyth2009(kws):\n"
-    "  return cPickle.dumps(((),kws),-1)\n"
+    "  return pickle.dumps(((),kws),-1)\n"
     "\n"
     "def unPickleForDistPyth2009(st):\n"
-    "  args=cPickle.loads(st)\n"
+    "  args=pickle.loads(st)\n"
     "  return args\n";
 
-const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
+const char PythonNode::REMOTE_NAME[]="remote";
+
+const char PythonNode::DPL_INFO_NAME[]="my_dpl_localization";
+
+const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n"
     "def pickleForDistPyth2009(*args,**kws):\n"
-    "  return cPickle.dumps((args,kws),-1)\n"
+    "  return pickle.dumps((args,kws),-1)\n"
     "\n"
     "def unPickleForDistPyth2009(st):\n"
-    "  args=cPickle.loads(st)\n"
+    "  args=pickle.loads(st)\n"
     "  return args\n";
 
-PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0)
+
+PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
 {
 }
 
@@ -92,6 +102,9 @@ void PythonEntry::commonRemoteLoadPart1(InlineNode *reqNode)
         {
           try
           {
+            if(!_imposedResource.empty() && !_imposedContainer.empty())
+              container->start(reqNode, _imposedResource, _imposedContainer);
+            else
               container->start(reqNode);
           }
           catch(Exception& e)
@@ -110,29 +123,40 @@ void PythonEntry::commonRemoteLoadPart1(InlineNode *reqNode)
     }
 }
 
-Engines::Container_var PythonEntry::commonRemoteLoadPart2(InlineNode *reqNode, bool& isInitializeRequested)
+Engines::Container_var GetContainerObj(InlineNode *reqNode, bool& isStandardCont)
 {
+  isStandardCont = false;
   Container *container(reqNode->getContainer());
-  Engines::Container_var objContainer=Engines::Container::_nil();
+  Engines::Container_var objContainer(Engines::Container::_nil());
   if(!container)
-    throw Exception("No container specified !");
+    throw YACS::Exception("No container specified !");
   SalomeContainer *containerCast0(dynamic_cast<SalomeContainer *>(container));
   SalomeHPContainer *containerCast1(dynamic_cast<SalomeHPContainer *>(container));
   if(containerCast0)
-    objContainer=containerCast0->getContainerPtr(reqNode);
+    {
+      isStandardCont = true;
+      objContainer=containerCast0->getContainerPtr(reqNode);
+    }
   else if(containerCast1)
     {
       YACS::BASES::AutoCppPtr<SalomeContainerTmpForHP> tmpCont(SalomeContainerTmpForHP::BuildFrom(containerCast1,reqNode));
       objContainer=tmpCont->getContainerPtr(reqNode);
     }
   else
-    throw Exception("Unrecognized type of container ! Salome one is expected for PythonNode/PyFuncNode !");
+    throw YACS::Exception("Unrecognized type of container ! Salome one is expected for PythonNode/PyFuncNode !");
   if(CORBA::is_nil(objContainer))
-    throw Exception("Container corba pointer is NULL for PythonNode !");
+    throw YACS::Exception("Container corba pointer is NULL for PythonNode !");
+  return objContainer;
+}
+
+Engines::Container_var PythonEntry::commonRemoteLoadPart2(InlineNode *reqNode, bool& isInitializeRequested)
+{
+  bool isStandardCont(true);
+  Engines::Container_var objContainer(GetContainerObj(reqNode,isStandardCont));
   isInitializeRequested=false;
   try
   {
-      if(containerCast0)
+      if(isStandardCont)
         {
           createRemoteAdaptedPyInterpretor(objContainer);
         }
@@ -171,7 +195,8 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
     AutoGIL agil;
     const char *picklizeScript(getSerializationScript());
     PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
-    if(res == NULL)
+    PyObject *res2(PyRun_String(SCRIPT_FOR_SIMPLE_SERIALIZATION,Py_file_input,_context,_context));
+    if(res == NULL || res2==NULL)
       {
         std::string errorDetails;
         PyObject* new_stderr = newPyStdOut(errorDetails);
@@ -182,9 +207,10 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
         Py_DECREF(new_stderr);
         throw Exception("Error during load");
       }
-    Py_DECREF(res);
+    Py_DECREF(res); Py_DECREF(res2);
     _pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009");
     _pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009");
+    _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009");
     if(_pyfuncSer == NULL)
       {
         std::string errorDetails;
@@ -207,6 +233,17 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
         Py_DECREF(new_stderr);
         throw Exception("Error during load");
       }
+    if(_pyfuncSimpleSer == NULL)
+      {
+        std::string errorDetails;
+        PyObject *new_stderr(newPyStdOut(errorDetails));
+        reqNode->setErrorDetails(errorDetails);
+        PySys_SetObject((char*)"stderr", new_stderr);
+        PyErr_Print();
+        PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+        Py_DECREF(new_stderr);
+        throw Exception("Error during load");
+      }
   }
   if(isInitializeRequested)
     {//This one is called only once at initialization in the container if an init-script is specified.
@@ -271,7 +308,7 @@ void PythonEntry::commonRemoteLoad(InlineNode *reqNode)
   commonRemoteLoadPart3(reqNode,objContainer,isInitializeRequested);
 }
 
-PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father)
+PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze)
 {
   _implementation=IMPL_NAME;
   {
@@ -311,7 +348,7 @@ PythonNode::~PythonNode()
     }
 }
 
-void PythonNode::checkBasicConsistency() const throw(YACS::Exception)
+void PythonNode::checkBasicConsistency() const 
 {
   DEBTRACE("checkBasicConsistency");
   InlineNode::checkBasicConsistency();
@@ -337,7 +374,7 @@ void PythonNode::checkBasicConsistency() const throw(YACS::Exception)
 void PythonNode::load()
 {
   DEBTRACE( "---------------PyNode::load function---------------" );
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     loadRemote();
   else
     loadLocal();
@@ -356,7 +393,7 @@ void PythonNode::loadRemote()
 
 void PythonNode::execute()
 {
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     executeRemote();
   else
     executeLocal();
@@ -401,7 +438,7 @@ void PythonNode::executeRemote()
       //The pickled string may contain NULL characters so use PyString_AsStringAndSize
       char *serializationInputC(0);
       Py_ssize_t len;
-      if (PyString_AsStringAndSize(serializationInput, &serializationInputC, &len))
+      if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
         throw Exception("DistributedPythonNode problem in python pickle");
       serializationInputCorba->length(len);
       for(int i=0; i < len ; i++)
@@ -453,7 +490,7 @@ void PythonNode::executeRemote()
   {
       AutoGIL agil;
       PyObject *args(0),*ob(0);
-      PyObject* resultPython=PyString_FromStringAndSize(resultCorbaC,resultCorba->length());
+      PyObject* resultPython=PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length());
       delete [] resultCorbaC;
       args = PyTuple_New(1);
       PyTuple_SetItem(args,0,resultPython);
@@ -509,7 +546,18 @@ void PythonNode::executeRemote()
           _errorDetails=ex.what();
           throw;
       }
+      if(_autoSqueeze)
+        squeezeMemoryRemote();
   }
+  //
+  if(!CORBA::is_nil(_pynode))
+    {
+      _pynode->UnRegister();
+    }
+  _pynode = Engines::PyScriptNode::_nil();
+  bool dummy;
+  Engines::Container_var cont(GetContainerObj(this,dummy));
+  cont->removePyScriptNode(getName().c_str());
   DEBTRACE( "++++++++++++++ ENDOF PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" );
 }
 
@@ -550,7 +598,7 @@ void PythonNode::executeLocal()
     PyObject* code=Py_CompileString(_script.c_str(), stream.str().c_str(), Py_file_input);
     if(code == NULL)
       {
-        _errorDetails="";
+        _errorDetails=""; 
         PyObject* new_stderr = newPyStdOut(_errorDetails);
         PySys_SetObject((char*)"stderr", new_stderr);
         PyErr_Print();
@@ -558,7 +606,7 @@ void PythonNode::executeLocal()
         Py_DECREF(new_stderr);
         throw Exception("Error during execution");
       }
-    PyObject *res = PyEval_EvalCode((PyCodeObject *)code, _context, _context);
+    PyObject *res = PyEval_EvalCode(  code, _context, _context);
 
     Py_DECREF(code);
     Py_XDECREF(res);
@@ -612,12 +660,54 @@ void PythonNode::executeLocal()
         _errorDetails=ex.what();
         throw;
     }
-
+    if(_autoSqueeze)
+      squeezeMemory();
     DEBTRACE( "-----------------End PyNode::outputs-----------------" );
   }
   DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" );
 }
 
+void PythonNode::squeezeMemorySafe()
+{
+  AutoGIL agil;
+  if(_mode==PythonNode::REMOTE_NAME)
+    this->squeezeMemoryRemote();
+  else
+    this->squeezeMemory();
+}
+  
+void PythonNode::squeezeMemory()
+{
+  for(auto p : _setOfInputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
+void PythonNode::squeezeMemoryRemote()
+{
+  for(auto p : _setOfInputPort)
+    {
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
 std::string PythonNode::getContainerLog()
 {
   return PythonEntry::GetContainerLog(_mode,_container,this);
@@ -635,6 +725,21 @@ void PythonNode::shutdown(int level)
     }
 }
 
+void PythonNode::imposeResource(const std::string& resource_name,
+                                const std::string& container_name)
+{
+  if(!resource_name.empty() && !container_name.empty())
+  {
+    _imposedResource = resource_name;
+    _imposedContainer = container_name;
+  }
+}
+
+bool PythonNode::canAcceptImposedResource()
+{
+  return _container != nullptr && _container->canAcceptImposedResource();
+}
+
 Node *PythonNode::simpleClone(ComposedNode *father, bool editionOnly) const
 {
   return new PythonNode(*this,father);
@@ -645,11 +750,12 @@ void PythonNode::createRemoteAdaptedPyInterpretor(Engines::Container_ptr objCont
   if(!CORBA::is_nil(_pynode))
     _pynode->UnRegister();
   _pynode=objContainer->createPyScriptNode(getName().c_str(),getScript().c_str());
+  _pynode->Register();
 }
 
 Engines::PyNodeBase_var PythonNode::retrieveDftRemotePyInterpretorIfAny(Engines::Container_ptr objContainer) const
 {
-  Engines::PyScriptNode_var ret(objContainer->getDefaultPyScriptNode());
+  Engines::PyScriptNode_var ret(objContainer->getDefaultPyScriptNode(getName().c_str()));
   if(!CORBA::is_nil(ret))
     {
       ret->Register();
@@ -663,7 +769,10 @@ void PythonNode::assignRemotePyInterpretor(Engines::PyNodeBase_var remoteInterp)
     {
       Engines::PyScriptNode_var tmpp(Engines::PyScriptNode::_narrow(remoteInterp));
       if(_pynode->_is_equivalent(tmpp))
-        return ;
+        {
+          _pynode->UnRegister();
+          return ;
+        }
     }
   if(!CORBA::is_nil(_pynode))
     _pynode->UnRegister();
@@ -699,6 +808,52 @@ PythonNode* PythonNode::cloneNode(const std::string& name)
   return n;
 }
 
+void PythonNode::applyDPLScope(ComposedNode *gfn)
+{
+  std::vector< std::pair<std::string,int> > ret(getDPLScopeInfo(gfn));
+  if(ret.empty())
+    return ;
+  //
+  PyObject *ob(0);
+  {
+    AutoGIL agil;
+    std::size_t sz(ret.size());
+    ob=PyList_New(sz);
+    for(std::size_t i=0;i<sz;i++)
+      {
+        const std::pair<std::string,int>& p(ret[i]);
+        PyObject *elt(PyTuple_New(2));
+        PyTuple_SetItem(elt,0,PyUnicode_FromString(p.first.c_str()));
+        PyTuple_SetItem(elt,1,PyLong_FromLong(p.second));
+        PyList_SetItem(ob,i,elt);
+      }
+  }
+  if(_mode==REMOTE_NAME)
+    {
+      Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);
+      {
+        AutoGIL agil;
+        PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSimpleSer,ob,NULL));
+        Py_XDECREF(ob);
+        char *serializationInputC(0);
+        Py_ssize_t len;
+        if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
+          throw Exception("DistributedPythonNode problem in python pickle");
+        serializationInputCorba->length(len);
+        for(int i=0; i < len ; i++)
+          serializationInputCorba[i]=serializationInputC[i];
+        Py_XDECREF(serializationInput);
+      }
+      _pynode->defineNewCustomVar(DPL_INFO_NAME,serializationInputCorba);
+    }
+  else
+    {
+      AutoGIL agil;
+      PyDict_SetItemString(_context,DPL_INFO_NAME,ob);
+      Py_XDECREF(ob);
+    }
+}
+
 PyFuncNode::PyFuncNode(const PyFuncNode& other, ComposedNode *father):InlineFuncNode(other,father),_pyfunc(0)
 {
   _implementation = PythonNode::IMPL_NAME;
@@ -743,7 +898,21 @@ PyFuncNode::~PyFuncNode()
     }
 }
 
-void PyFuncNode::checkBasicConsistency() const throw(YACS::Exception)
+void PyFuncNode::init(bool start)
+{
+  initCommonPartWithoutStateManagement(start);
+  if(_state == YACS::DISABLED)
+    {
+      exDisabledState(); // to refresh propagation of DISABLED state
+      return ;
+    }
+  if(start) //complete initialization
+    setState(YACS::READY);
+  else if(_state > YACS::LOADED)// WARNING FuncNode has internal vars (CEA usecase) ! Partial initialization (inside a loop). Exclusivity of funcNode.
+    setState(YACS::TORECONNECT);
+}
+
+void PyFuncNode::checkBasicConsistency() const 
 {
   DEBTRACE("checkBasicConsistency");
   InlineFuncNode::checkBasicConsistency();
@@ -769,7 +938,7 @@ void PyFuncNode::checkBasicConsistency() const throw(YACS::Exception)
 void PyFuncNode::load()
 {
   DEBTRACE( "---------------PyfuncNode::load function---------------" );
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     loadRemote();
   else
     loadLocal();
@@ -814,7 +983,7 @@ void PyFuncNode::loadLocal()
         Py_DECREF(new_stderr);
         throw Exception("Error during execution");
       }
-    PyObject *res = PyEval_EvalCode((PyCodeObject *)code, _context, _context);
+    PyObject *res = PyEval_EvalCode( code, _context, _context);
     Py_DECREF(code);
     Py_XDECREF(res);
 
@@ -854,7 +1023,7 @@ void PyFuncNode::loadLocal()
 
 void PyFuncNode::execute()
 {
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     executeRemote();
   else
     executeLocal();
@@ -898,7 +1067,7 @@ void PyFuncNode::executeRemote()
       //The pickled string may contain NULL characters so use PyString_AsStringAndSize
       char *serializationInputC(0);
       Py_ssize_t len;
-      if (PyString_AsStringAndSize(serializationInput, &serializationInputC, &len))
+      if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
         throw Exception("DistributedPythonNode problem in python pickle");
 
       serializationInputCorba->length(len);
@@ -936,7 +1105,7 @@ void PyFuncNode::executeRemote()
   {
       AutoGIL agil;
 
-      PyObject *resultPython(PyString_FromStringAndSize(resultCorbaC,resultCorba->length()));
+      PyObject *resultPython(PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length()));
       delete [] resultCorbaC;
       PyObject *args(PyTuple_New(1)),*ob(0);
       PyTuple_SetItem(args,0,resultPython);
@@ -1117,7 +1286,7 @@ void PyFuncNode::createRemoteAdaptedPyInterpretor(Engines::Container_ptr objCont
 
 Engines::PyNodeBase_var PyFuncNode::retrieveDftRemotePyInterpretorIfAny(Engines::Container_ptr objContainer) const
 {
-  Engines::PyNode_var ret(objContainer->getDefaultPyNode());
+  Engines::PyNode_var ret(objContainer->getDefaultPyNode(getName().c_str()));
   if(!CORBA::is_nil(ret))
     {
       ret->Register();
@@ -1181,3 +1350,18 @@ void PyFuncNode::shutdown(int level)
     }
 }
 
+void PyFuncNode::imposeResource(const std::string& resource_name,
+                                const std::string& container_name)
+{
+  if(!resource_name.empty() && !container_name.empty())
+  {
+    _imposedResource = resource_name;
+    _imposedContainer = container_name;
+  }
+}
+
+bool PyFuncNode::canAcceptImposedResource()
+{
+  return _container != nullptr && _container->canAcceptImposedResource();
+}
+