-// Copyright (C) 2006-2021 CEA/DEN, EDF R&D
+// Copyright (C) 2006-2023 CEA, EDF
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
#include "PythonNode.hxx"
#include "PythonPorts.hxx"
#include "TypeCode.hxx"
-#include "AutoGIL.hxx"
+#include "PythonCppUtils.hxx"
#include "Container.hxx"
#include "SalomeContainer.hxx"
#include "SalomeHPContainer.hxx"
#include "SalomeContainerTmpForHP.hxx"
#include "ConversionException.hxx"
+#include "ReceiverFactory.hxx"
+#include "SenderByteImpl.hxx"
#include "PyStdout.hxx"
#include <iostream>
" return pickle.dumps(val,-1)\n"
"\n";
+PyObject *PythonEntry::_pyClsBigObject = nullptr;
+
const char PythonNode::IMPL_NAME[]="Python";
const char PythonNode::KIND[]="Python";
" args=pickle.loads(st)\n"
" return args\n";
+static char SCRIPT_FOR_BIGOBJECT[]="import SALOME_PyNode\n"
+ "BigObjectOnDiskBase = SALOME_PyNode.BigObjectOnDiskBase\n";
+
+// pickle.load concurrency issue : see https://bugs.python.org/issue12680
+#if PY_VERSION_HEX < 0x03070000
+#include <mutex>
+static std::mutex data_mutex;
+#endif
+
PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
{
}
Engines::PyNodeBase_var pynode(getRemoteInterpreterHandle());
///
{
+#if PY_VERSION_HEX < 0x03070000
+ std::unique_lock<std::mutex> lock(data_mutex);
+#endif
AutoGIL agil;
const char *picklizeScript(getSerializationScript());
PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
throw Exception("Error during load");
}
Py_DECREF(res); Py_DECREF(res2);
+ AutoPyRef res3(PyRun_String(SCRIPT_FOR_BIGOBJECT,Py_file_input,_context,_context));
_pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009");
_pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009");
_pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009");
+ if(! _pyClsBigObject )
+ {
+ _pyClsBigObject=PyDict_GetItemString(_context,"BigObjectOnDiskBase");
+ Py_INCREF(_pyClsBigObject);
+ }
if(_pyfuncSer == NULL)
{
std::string errorDetails;
return !_imposedResource.empty() && !_imposedContainer.empty();
}
-PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze)
+bool PythonEntry::IsProxy( PyObject *ob )
+{
+ if(!_pyClsBigObject)
+ return false;
+ if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+ {
+ return true;
+ }
+ else
+ {
+ if( PyList_Check( ob ) )
+ {
+ auto sz = PyList_Size( ob );
+ for( auto i = 0 ; i < sz ; ++i )
+ {
+ PyObject *elt = PyList_GetItem( ob, i );
+ if( PythonEntry::IsProxy(elt) )
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool PythonEntry::GetDestroyStatus( PyObject *ob )
+{
+ if(!_pyClsBigObject)
+ return false;
+ if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+ {
+ AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,"getDestroyStatus");
+ AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr);
+ if( PyBool_Check(tmp.get()) )
+ {
+ return tmp.get() == Py_True;
+ }
+ return false;
+ }
+ else
+ {
+ if( PyList_Check( ob ) )
+ {
+ auto sz = PyList_Size( ob );
+ for( auto i = 0 ; i < sz ; ++i )
+ {
+ PyObject *elt = PyList_GetItem( ob, i );
+ if( PythonEntry::GetDestroyStatus(elt) )
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+void PythonEntry::IfProxyDoSomething( PyObject *ob, const char *meth )
+{
+ if(!_pyClsBigObject)
+ return ;
+ if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 )
+ {
+ AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,meth);
+ AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr);
+ }
+ else
+ {
+ if( PyList_Check( ob ) )
+ {
+ auto sz = PyList_Size( ob );
+ for( auto i = 0 ; i < sz ; ++i )
+ {
+ PyObject *elt = PyList_GetItem( ob, i );
+ PythonEntry::IfProxyDoSomething( elt, meth );
+ }
+ }
+ }
+}
+
+void PythonEntry::DoNotTouchFileIfProxy( PyObject *ob )
+{
+ IfProxyDoSomething(ob,"doNotTouchFile");
+}
+
+void PythonEntry::UnlinkOnDestructorIfProxy( PyObject *ob )
+{
+ IfProxyDoSomething(ob,"unlinkOnDestructor");
+}
+
+PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):
+InlineNode(other,father),_autoSqueeze(other._autoSqueeze),_nonSqueezableOutputNodes(other._nonSqueezableOutputNodes)
{
_pynode = Engines::PyScriptNode::_nil();
_implementation=IMPL_NAME;
loadPythonAdapter(this,dummy);
_pynode->assignNewCompiledCode(getScript().c_str());
}
- //
- std::unique_ptr<Engines::pickledArgs> serializationInputCorba(new Engines::pickledArgs);
+ // not managed by unique_ptr here because destructed by the order of client.
+ SenderByteImpl *serializationInputCorba = nullptr;
AutoPyRef serializationInput;
{
+#if PY_VERSION_HEX < 0x03070000
+ std::unique_lock<std::mutex> lock(data_mutex);
+#endif
AutoGIL agil;
PyObject *args(0),*ob(0);
//===========================================================================
serializationInput.set(PyObject_CallFunctionObjArgs(_pyfuncSer,args,nullptr));
Py_DECREF(args);
//The pickled string may contain NULL characters so use PyString_AsStringAndSize
- char *serializationInputC(0);
+ char *serializationInputC(nullptr);
Py_ssize_t len;
if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len))
throw Exception("DistributedPythonNode problem in python pickle");
// no copy here. The C byte array of Python is taken as this into CORBA sequence to avoid copy
- serializationInputCorba.reset(new Engines::pickledArgs(len,len,reinterpret_cast<CORBA::Octet *>(serializationInputC),0));
+ serializationInputCorba = new SenderByteImpl(serializationInputC,len);
}
//get the list of output argument names
// Execute in remote Python node
//===========================================================================
DEBTRACE( "-----------------starting remote python invocation-----------------" );
- std::unique_ptr<Engines::pickledArgs> resultCorba;
+ std::unique_ptr<SALOME::SenderByteSeq> resultCorba;
try
{
//pass outargsname and dict serialized
- _pynode->executeFirst(*(serializationInputCorba.get()));
+ SALOME::SenderByte_var serializationInputRef = serializationInputCorba->_this();
+ _pynode->executeFirst(serializationInputRef);
//serializationInput and serializationInputCorba are no more needed for server. Release it.
- serializationInputCorba.reset(nullptr); serializationInput.set(nullptr);
- resultCorba.reset(_pynode->executeSecond(myseq));
+ serializationInput.set(nullptr);
+ resultCorba.reset( _pynode->executeSecond(myseq) );
}
catch( const SALOME::SALOME_Exception& ex )
{
_errorDetails=msg.str();
throw Exception(msg.str());
}
-// if(!CORBA::is_nil(_pynode))
-// {
-// _pynode->UnRegister();
-// }
-// _pynode = Engines::PyScriptNode::_nil();
-// //
-// bool dummy;
-// Engines::Container_var cont(GetContainerObj(this,dummy));
-// cont->removePyScriptNode(getName().c_str());
+ catch(CORBA::COMM_FAILURE& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught system exception COMM_FAILURE -- unable to contact the "
+ << "object." << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(CORBA::SystemException& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught a CORBA::SystemException." ;
+ CORBA::Any tmp;
+ tmp <<= ex;
+ CORBA::TypeCode_var tc = tmp.type();
+ const char *p = tc->name();
+ if ( *p != '\0' )
+ msg <<p;
+ else
+ msg << tc->id();
+ msg << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(CORBA::Exception& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught CORBA::Exception. " ;
+ CORBA::Any tmp;
+ tmp <<= ex;
+ CORBA::TypeCode_var tc = tmp.type();
+ const char *p = tc->name();
+ if ( *p != '\0' )
+ msg <<p;
+ else
+ msg << tc->id();
+ msg << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(omniORB::fatalException& fe)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught omniORB::fatalException:" << std::endl;
+ msg << " file: " << fe.file() << std::endl;
+ msg << " line: " << fe.line() << std::endl;
+ msg << " mesg: " << fe.errmsg() << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
DEBTRACE( "-----------------end of remote python invocation-----------------" );
//===========================================================================
// Get results, unpickle and put them in output ports
//===========================================================================
- auto length(resultCorba->length());
- char *resultCorbaC(reinterpret_cast<char *>(resultCorba->get_buffer()));
{
+#if PY_VERSION_HEX < 0x03070000
+ std::unique_lock<std::mutex> lock(data_mutex);
+#endif
AutoGIL agil;
- PyObject *args(0),*ob(0);
- PyObject* resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
- args = PyTuple_New(1);
- PyTuple_SetItem(args,0,resultPython);
- PyObject *finalResult=PyObject_CallObject(_pyfuncUnser,args);
- resultCorba.reset(nullptr);
- Py_DECREF(args);
-
- if (finalResult == NULL)
- {
- std::stringstream msg;
- msg << "Conversion with pickle of output ports failed !";
- msg << " : " << __FILE__ << ":" << __LINE__;
- _errorDetails=msg.str();
- throw YACS::ENGINE::ConversionException(msg.str());
- }
-
DEBTRACE( "-----------------PythonNode::outputs-----------------" );
- int nres=1;
- if(finalResult == Py_None)
- nres=0;
- else if(PyTuple_Check(finalResult))
- nres=PyTuple_Size(finalResult);
+ int nres( resultCorba->length() );
if(getNumberOfOutputPorts() != nres)
{
std::string msg="Number of output arguments : Mismatch between definition and execution";
- Py_DECREF(finalResult);
_errorDetails=msg;
throw Exception(msg);
}
-
pos=0;
try
{
DEBTRACE( "port name: " << p->getName() );
DEBTRACE( "port kind: " << p->edGetType()->kind() );
DEBTRACE( "port pos : " << pos );
- if(PyTuple_Check(finalResult))
- ob=PyTuple_GetItem(finalResult,pos) ;
- else
- ob=finalResult;
- DEBTRACE( "ob refcnt: " << ob->ob_refcnt );
- p->put(ob);
+ SALOME::SenderByte_var elt = (*resultCorba)[pos];
+ SeqByteReceiver recv(elt);
+ unsigned long length = 0;
+ char *resultCorbaC = recv.data(length);
+ {
+ AutoPyRef resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ);
+ AutoPyRef args = PyTuple_New(1);
+ PyTuple_SetItem(args,0,resultPython.retn());
+ AutoPyRef ob = PyObject_CallObject(_pyfuncUnser,args);
+ if (!ob)
+ {
+ std::stringstream msg;
+ msg << "Conversion with pickle of output ports failed !";
+ msg << " : " << __FILE__ << ":" << __LINE__;
+ _errorDetails=msg.str();
+ throw YACS::ENGINE::ConversionException(msg.str());
+ }
+ UnlinkOnDestructorIfProxy(ob);
+ p->put( ob );
+ }
pos++;
}
- Py_DECREF(finalResult);
}
catch(ConversionException& ex)
{
- Py_DECREF(finalResult);
_errorDetails=ex.what();
throw;
}
DEBTRACE( "++++++++++++++ ENDOF PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" );
}
+void PythonNode::ExecuteLocalInternal(const std::string& codeStr, PyObject *context, std::string& errorDetails)
+{
+ DEBTRACE( code );
+ DEBTRACE( "context refcnt: " << context->ob_refcnt );
+ std::ostringstream stream;
+ stream << "/tmp/PythonNode_";
+ stream << getpid();
+ AutoPyRef code=Py_CompileString(codeStr.c_str(), stream.str().c_str(), Py_file_input);
+ if(code == NULL)
+ {
+ errorDetails="";
+ AutoPyRef new_stderr = newPyStdOut(errorDetails);
+ PySys_SetObject((char*)"stderr", new_stderr);
+ PyErr_Print();
+ PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+ throw YACS::Exception("Error during execution");
+ }
+ {
+ AutoPyRef res = PyEval_EvalCode( code, context, context);
+ }
+ DEBTRACE( "context refcnt: " << context->ob_refcnt );
+ fflush(stdout);
+ fflush(stderr);
+ if(PyErr_Occurred ())
+ {
+ errorDetails="";
+ AutoPyRef new_stderr = newPyStdOut(errorDetails);
+ PySys_SetObject((char*)"stderr", new_stderr);
+ ofstream errorfile(stream.str().c_str());
+ if (errorfile.is_open())
+ {
+ errorfile << codeStr;
+ errorfile.close();
+ }
+ PyErr_Print();
+ PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+ throw YACS::Exception("Error during execution");
+ }
+}
+
+void PythonNode::executeLocalInternal(const std::string& codeStr)
+{
+ ExecuteLocalInternal(codeStr,_context,_errorDetails);
+}
+
void PythonNode::executeLocal()
{
DEBTRACE( "++++++++++++++ PyNode::executeLocal: " << getName() << " ++++++++++++++++++++" );
{
AutoGIL agil;
-
+ std::ostringstream unpxy; unpxy << "from SALOME_PyNode import UnProxyObjectSimpleLocal" << std::endl;
DEBTRACE( "---------------PyNode::inputs---------------" );
list<InputPort *>::iterator iter2;
for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++)
DEBTRACE( "port name: " << p->getName() );
DEBTRACE( "port kind: " << p->edGetType()->kind() );
PyObject* ob=p->getPyObj();
- DEBTRACE( "ob refcnt: " << ob->ob_refcnt );
+ DEBTRACE( "ob refcnt: " << ob->ob_refcnt );
+ unpxy << p->getName() << " = UnProxyObjectSimpleLocal( " << p->getName() << " )" << std::endl;
#ifdef _DEVDEBUG_
PyObject_Print(ob,stderr,Py_PRINT_RAW);
cerr << endl;
//calculation
DEBTRACE( "----------------PyNode::calculation---------------" );
- DEBTRACE( _script );
- DEBTRACE( "_context refcnt: " << _context->ob_refcnt );
+
+ if( ! getSqueezeStatus() )
+ executeLocalInternal( unpxy.str() );
- std::ostringstream stream;
- stream << "/tmp/PythonNode_";
- stream << getpid();
-
- PyObject* code=Py_CompileString(_script.c_str(), stream.str().c_str(), Py_file_input);
- if(code == NULL)
- {
- _errorDetails="";
- PyObject* new_stderr = newPyStdOut(_errorDetails);
- PySys_SetObject((char*)"stderr", new_stderr);
- PyErr_Print();
- PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
- Py_DECREF(new_stderr);
- throw Exception("Error during execution");
- }
- PyObject *res = PyEval_EvalCode( code, _context, _context);
-
- Py_DECREF(code);
- Py_XDECREF(res);
- DEBTRACE( "_context refcnt: " << _context->ob_refcnt );
- fflush(stdout);
- fflush(stderr);
- if(PyErr_Occurred ())
- {
- _errorDetails="";
- PyObject* new_stderr = newPyStdOut(_errorDetails);
- PySys_SetObject((char*)"stderr", new_stderr);
- ofstream errorfile(stream.str().c_str());
- if (errorfile.is_open())
- {
- errorfile << _script;
- errorfile.close();
- }
- PyErr_Print();
- PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
- Py_DECREF(new_stderr);
- throw Exception("Error during execution");
- }
+ executeLocalInternal( _script );
DEBTRACE( "-----------------PyNode::outputs-----------------" );
list<OutputPort *>::iterator iter;
cerr << endl;
#endif
p->put(ob);
+ if(!isUsingPythonCache())
+ PyDict_DelItemString(_context,p->getName().c_str());
}
}
catch(ConversionException& ex)
if(_autoSqueeze)
squeezeMemory();
DEBTRACE( "-----------------End PyNode::outputs-----------------" );
+ if(!isUsingPythonCache())
+ {
+ for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++)
+ {
+ AutoPyRef pStr = PyUnicode_FromString( (*iter2)->getName().c_str() );
+ if( PyDict_Contains(_context,pStr) == 1 )
+ { PyDict_DelItem(_context,pStr); }
+ }
+ }
}
DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" );
}
+/*!
+ * [EDF28562]
+ * \param in squeezeExceptions list on output port name excluded from the squeeze mecanism
+ */
+void PythonNode::setSqueezeStatusWithExceptions(bool sqStatus, const std::vector<std::string>& squeezeExceptions)
+{
+ this->setSqueezeStatus(sqStatus);
+ this->_nonSqueezableOutputNodes = std::set<std::string>(squeezeExceptions.begin(), squeezeExceptions.end());
+}
+
void PythonNode::squeezeMemorySafe()
{
AutoGIL agil;
}
for(auto p : _setOfOutputPort)
{
+ if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end())
+ continue;
PyDict_DelItemString(_context,p->getName().c_str());
OutputPyPort *p2(static_cast<OutputPyPort *>(p));
p2->putWithoutForward(Py_None);
}
for(auto p : _setOfOutputPort)
{
+ if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end())
+ continue;
OutputPyPort *p2(static_cast<OutputPyPort *>(p));
p2->putWithoutForward(Py_None);
}
//
Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);;
{
+#if PY_VERSION_HEX < 0x03070000
+ std::unique_lock<std::mutex> lock(data_mutex);
+#endif
AutoGIL agil;
PyObject *ob(0);
//===========================================================================
_errorDetails=msg;
throw Exception(msg);
}
+ catch(CORBA::COMM_FAILURE& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught system exception COMM_FAILURE -- unable to contact the "
+ << "object." << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(CORBA::SystemException& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught a CORBA::SystemException." ;
+ CORBA::Any tmp;
+ tmp <<= ex;
+ CORBA::TypeCode_var tc = tmp.type();
+ const char *p = tc->name();
+ if ( *p != '\0' )
+ msg <<p;
+ else
+ msg << tc->id();
+ msg << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(CORBA::Exception& ex)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught CORBA::Exception. " ;
+ CORBA::Any tmp;
+ tmp <<= ex;
+ CORBA::TypeCode_var tc = tmp.type();
+ const char *p = tc->name();
+ if ( *p != '\0' )
+ msg <<p;
+ else
+ msg << tc->id();
+ msg << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
+ catch(omniORB::fatalException& fe)
+ {
+ std::ostringstream msg;
+ msg << "Exception on remote python invocation." << std::endl ;
+ msg << "Caught omniORB::fatalException:" << std::endl;
+ msg << " file: " << fe.file() << std::endl;
+ msg << " line: " << fe.line() << std::endl;
+ msg << " mesg: " << fe.errmsg() << std::endl;
+ _errorDetails=msg.str();
+ throw Exception(msg.str());
+ }
DEBTRACE( "-----------------end of remote python invocation-----------------" );
//===========================================================================
// Get results, unpickle and put them in output ports
resultCorbaC[i]=resultCorba[i];
{
+#if PY_VERSION_HEX < 0x03070000
+ std::unique_lock<std::mutex> lock(data_mutex);
+#endif
AutoGIL agil;
PyObject *resultPython(PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length()));