X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fruntime%2FPythonNode.cxx;h=57a13a016448a21fbd9322d87bd9550823187f5d;hb=d4af5aa95ba95d6e4b301323050ccd0d843d67a8;hp=42eae744604a18cae10c4709ccda5409cbc5f265;hpb=fc23b8dcb9236e2a86bacc06af8b360ba9c30c69;p=modules%2Fyacs.git diff --git a/src/runtime/PythonNode.cxx b/src/runtime/PythonNode.cxx index 42eae7446..57a13a016 100644 --- a/src/runtime/PythonNode.cxx +++ b/src/runtime/PythonNode.cxx @@ -1,4 +1,4 @@ -// Copyright (C) 2006-2021 CEA/DEN, EDF R&D +// Copyright (C) 2006-2023 CEA, EDF // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public @@ -21,12 +21,14 @@ #include "PythonNode.hxx" #include "PythonPorts.hxx" #include "TypeCode.hxx" -#include "AutoGIL.hxx" +#include "PythonCppUtils.hxx" #include "Container.hxx" #include "SalomeContainer.hxx" #include "SalomeHPContainer.hxx" #include "SalomeContainerTmpForHP.hxx" #include "ConversionException.hxx" +#include "ReceiverFactory.hxx" +#include "SenderByteImpl.hxx" #include "PyStdout.hxx" #include @@ -54,6 +56,8 @@ const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import pickle\n" " return pickle.dumps(val,-1)\n" "\n"; +PyObject *PythonEntry::_pyClsBigObject = nullptr; + const char PythonNode::IMPL_NAME[]="Python"; const char PythonNode::KIND[]="Python"; @@ -77,6 +81,15 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import pickle\n" " args=pickle.loads(st)\n" " return args\n"; +static char SCRIPT_FOR_BIGOBJECT[]="import SALOME_PyNode\n" + "BigObjectOnDiskBase = SALOME_PyNode.BigObjectOnDiskBase\n"; + +// pickle.load concurrency issue : see https://bugs.python.org/issue12680 +#if PY_VERSION_HEX < 0x03070000 +#include +static std::mutex data_mutex; +#endif + PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0) { } @@ -185,6 +198,9 @@ void PythonEntry::loadRemoteContext(InlineNode *reqNode, Engines::Container_ptr Engines::PyNodeBase_var pynode(getRemoteInterpreterHandle()); /// { +#if PY_VERSION_HEX < 0x03070000 + std::unique_lock lock(data_mutex); +#endif AutoGIL agil; const char *picklizeScript(getSerializationScript()); PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context); @@ -201,9 +217,15 @@ void PythonEntry::loadRemoteContext(InlineNode *reqNode, Engines::Container_ptr throw Exception("Error during load"); } Py_DECREF(res); Py_DECREF(res2); + AutoPyRef res3(PyRun_String(SCRIPT_FOR_BIGOBJECT,Py_file_input,_context,_context)); _pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009"); _pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009"); _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009"); + if(! _pyClsBigObject ) + { + _pyClsBigObject=PyDict_GetItemString(_context,"BigObjectOnDiskBase"); + Py_INCREF(_pyClsBigObject); + } if(_pyfuncSer == NULL) { std::string errorDetails; @@ -306,7 +328,95 @@ bool PythonEntry::hasImposedResource()const return !_imposedResource.empty() && !_imposedContainer.empty(); } -PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze) +bool PythonEntry::IsProxy( PyObject *ob ) +{ + if(!_pyClsBigObject) + return false; + if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 ) + { + return true; + } + else + { + if( PyList_Check( ob ) ) + { + auto sz = PyList_Size( ob ); + for( auto i = 0 ; i < sz ; ++i ) + { + PyObject *elt = PyList_GetItem( ob, i ); + if( PythonEntry::IsProxy(elt) ) + return true; + } + } + } + return false; +} + +bool PythonEntry::GetDestroyStatus( PyObject *ob ) +{ + if(!_pyClsBigObject) + return false; + if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 ) + { + AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,"getDestroyStatus"); + AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr); + if( PyBool_Check(tmp.get()) ) + { + return tmp.get() == Py_True; + } + return false; + } + else + { + if( PyList_Check( ob ) ) + { + auto sz = PyList_Size( ob ); + for( auto i = 0 ; i < sz ; ++i ) + { + PyObject *elt = PyList_GetItem( ob, i ); + if( PythonEntry::GetDestroyStatus(elt) ) + return true; + } + } + } + return false; +} + +void PythonEntry::IfProxyDoSomething( PyObject *ob, const char *meth ) +{ + if(!_pyClsBigObject) + return ; + if( PyObject_IsInstance( ob, _pyClsBigObject) == 1 ) + { + AutoPyRef unlinkOnDestructor = PyObject_GetAttrString(ob,meth); + AutoPyRef tmp = PyObject_CallFunctionObjArgs(unlinkOnDestructor,nullptr); + } + else + { + if( PyList_Check( ob ) ) + { + auto sz = PyList_Size( ob ); + for( auto i = 0 ; i < sz ; ++i ) + { + PyObject *elt = PyList_GetItem( ob, i ); + PythonEntry::IfProxyDoSomething( elt, meth ); + } + } + } +} + +void PythonEntry::DoNotTouchFileIfProxy( PyObject *ob ) +{ + IfProxyDoSomething(ob,"doNotTouchFile"); +} + +void PythonEntry::UnlinkOnDestructorIfProxy( PyObject *ob ) +{ + IfProxyDoSomething(ob,"unlinkOnDestructor"); +} + +PythonNode::PythonNode(const PythonNode& other, ComposedNode *father): +InlineNode(other,father),_autoSqueeze(other._autoSqueeze),_nonSqueezableOutputNodes(other._nonSqueezableOutputNodes) { _pynode = Engines::PyScriptNode::_nil(); _implementation=IMPL_NAME; @@ -408,10 +518,13 @@ void PythonNode::executeRemote() loadPythonAdapter(this,dummy); _pynode->assignNewCompiledCode(getScript().c_str()); } - // - std::unique_ptr serializationInputCorba(new Engines::pickledArgs); + // not managed by unique_ptr here because destructed by the order of client. + SenderByteImpl *serializationInputCorba = nullptr; AutoPyRef serializationInput; { +#if PY_VERSION_HEX < 0x03070000 + std::unique_lock lock(data_mutex); +#endif AutoGIL agil; PyObject *args(0),*ob(0); //=========================================================================== @@ -434,12 +547,12 @@ void PythonNode::executeRemote() serializationInput.set(PyObject_CallFunctionObjArgs(_pyfuncSer,args,nullptr)); Py_DECREF(args); //The pickled string may contain NULL characters so use PyString_AsStringAndSize - char *serializationInputC(0); + char *serializationInputC(nullptr); Py_ssize_t len; if (PyBytes_AsStringAndSize(serializationInput, &serializationInputC, &len)) throw Exception("DistributedPythonNode problem in python pickle"); // no copy here. The C byte array of Python is taken as this into CORBA sequence to avoid copy - serializationInputCorba.reset(new Engines::pickledArgs(len,len,reinterpret_cast(serializationInputC),0)); + serializationInputCorba = new SenderByteImpl(serializationInputC,len); } //get the list of output argument names @@ -460,14 +573,15 @@ void PythonNode::executeRemote() // Execute in remote Python node //=========================================================================== DEBTRACE( "-----------------starting remote python invocation-----------------" ); - std::unique_ptr resultCorba; + std::unique_ptr resultCorba; try { //pass outargsname and dict serialized - _pynode->executeFirst(*(serializationInputCorba.get())); + SALOME::SenderByte_var serializationInputRef = serializationInputCorba->_this(); + _pynode->executeFirst(serializationInputRef); //serializationInput and serializationInputCorba are no more needed for server. Release it. - serializationInputCorba.reset(nullptr); serializationInput.set(nullptr); - resultCorba.reset(_pynode->executeSecond(myseq)); + serializationInput.set(nullptr); + resultCorba.reset( _pynode->executeSecond(myseq) ); } catch( const SALOME::SALOME_Exception& ex ) { @@ -485,55 +599,78 @@ void PythonNode::executeRemote() _errorDetails=msg.str(); throw Exception(msg.str()); } -// if(!CORBA::is_nil(_pynode)) -// { -// _pynode->UnRegister(); -// } -// _pynode = Engines::PyScriptNode::_nil(); -// // -// bool dummy; -// Engines::Container_var cont(GetContainerObj(this,dummy)); -// cont->removePyScriptNode(getName().c_str()); + catch(CORBA::COMM_FAILURE& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught system exception COMM_FAILURE -- unable to contact the " + << "object." << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(CORBA::SystemException& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught a CORBA::SystemException." ; + CORBA::Any tmp; + tmp <<= ex; + CORBA::TypeCode_var tc = tmp.type(); + const char *p = tc->name(); + if ( *p != '\0' ) + msg <id(); + msg << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(CORBA::Exception& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught CORBA::Exception. " ; + CORBA::Any tmp; + tmp <<= ex; + CORBA::TypeCode_var tc = tmp.type(); + const char *p = tc->name(); + if ( *p != '\0' ) + msg <id(); + msg << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(omniORB::fatalException& fe) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught omniORB::fatalException:" << std::endl; + msg << " file: " << fe.file() << std::endl; + msg << " line: " << fe.line() << std::endl; + msg << " mesg: " << fe.errmsg() << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } DEBTRACE( "-----------------end of remote python invocation-----------------" ); //=========================================================================== // Get results, unpickle and put them in output ports //=========================================================================== - auto length(resultCorba->length()); - char *resultCorbaC(reinterpret_cast(resultCorba->get_buffer())); { +#if PY_VERSION_HEX < 0x03070000 + std::unique_lock lock(data_mutex); +#endif AutoGIL agil; - PyObject *args(0),*ob(0); - PyObject* resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ); - args = PyTuple_New(1); - PyTuple_SetItem(args,0,resultPython); - PyObject *finalResult=PyObject_CallObject(_pyfuncUnser,args); - resultCorba.reset(nullptr); - Py_DECREF(args); - - if (finalResult == NULL) - { - std::stringstream msg; - msg << "Conversion with pickle of output ports failed !"; - msg << " : " << __FILE__ << ":" << __LINE__; - _errorDetails=msg.str(); - throw YACS::ENGINE::ConversionException(msg.str()); - } - DEBTRACE( "-----------------PythonNode::outputs-----------------" ); - int nres=1; - if(finalResult == Py_None) - nres=0; - else if(PyTuple_Check(finalResult)) - nres=PyTuple_Size(finalResult); + int nres( resultCorba->length() ); if(getNumberOfOutputPorts() != nres) { std::string msg="Number of output arguments : Mismatch between definition and execution"; - Py_DECREF(finalResult); _errorDetails=msg; throw Exception(msg); } - pos=0; try { @@ -543,19 +680,31 @@ void PythonNode::executeRemote() DEBTRACE( "port name: " << p->getName() ); DEBTRACE( "port kind: " << p->edGetType()->kind() ); DEBTRACE( "port pos : " << pos ); - if(PyTuple_Check(finalResult)) - ob=PyTuple_GetItem(finalResult,pos) ; - else - ob=finalResult; - DEBTRACE( "ob refcnt: " << ob->ob_refcnt ); - p->put(ob); + SALOME::SenderByte_var elt = (*resultCorba)[pos]; + SeqByteReceiver recv(elt); + unsigned long length = 0; + char *resultCorbaC = recv.data(length); + { + AutoPyRef resultPython=PyMemoryView_FromMemory(resultCorbaC,length,PyBUF_READ); + AutoPyRef args = PyTuple_New(1); + PyTuple_SetItem(args,0,resultPython.retn()); + AutoPyRef ob = PyObject_CallObject(_pyfuncUnser,args); + if (!ob) + { + std::stringstream msg; + msg << "Conversion with pickle of output ports failed !"; + msg << " : " << __FILE__ << ":" << __LINE__; + _errorDetails=msg.str(); + throw YACS::ENGINE::ConversionException(msg.str()); + } + UnlinkOnDestructorIfProxy(ob); + p->put( ob ); + } pos++; } - Py_DECREF(finalResult); } catch(ConversionException& ex) { - Py_DECREF(finalResult); _errorDetails=ex.what(); throw; } @@ -573,12 +722,57 @@ void PythonNode::executeRemote() DEBTRACE( "++++++++++++++ ENDOF PyNode::executeRemote: " << getName() << " ++++++++++++++++++++" ); } +void PythonNode::ExecuteLocalInternal(const std::string& codeStr, PyObject *context, std::string& errorDetails) +{ + DEBTRACE( code ); + DEBTRACE( "context refcnt: " << context->ob_refcnt ); + std::ostringstream stream; + stream << "/tmp/PythonNode_"; + stream << getpid(); + AutoPyRef code=Py_CompileString(codeStr.c_str(), stream.str().c_str(), Py_file_input); + if(code == NULL) + { + errorDetails=""; + AutoPyRef new_stderr = newPyStdOut(errorDetails); + PySys_SetObject((char*)"stderr", new_stderr); + PyErr_Print(); + PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__")); + throw YACS::Exception("Error during execution"); + } + { + AutoPyRef res = PyEval_EvalCode( code, context, context); + } + DEBTRACE( "context refcnt: " << context->ob_refcnt ); + fflush(stdout); + fflush(stderr); + if(PyErr_Occurred ()) + { + errorDetails=""; + AutoPyRef new_stderr = newPyStdOut(errorDetails); + PySys_SetObject((char*)"stderr", new_stderr); + ofstream errorfile(stream.str().c_str()); + if (errorfile.is_open()) + { + errorfile << codeStr; + errorfile.close(); + } + PyErr_Print(); + PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__")); + throw YACS::Exception("Error during execution"); + } +} + +void PythonNode::executeLocalInternal(const std::string& codeStr) +{ + ExecuteLocalInternal(codeStr,_context,_errorDetails); +} + void PythonNode::executeLocal() { DEBTRACE( "++++++++++++++ PyNode::executeLocal: " << getName() << " ++++++++++++++++++++" ); { AutoGIL agil; - + std::ostringstream unpxy; unpxy << "from SALOME_PyNode import UnProxyObjectSimpleLocal" << std::endl; DEBTRACE( "---------------PyNode::inputs---------------" ); list::iterator iter2; for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++) @@ -587,7 +781,8 @@ void PythonNode::executeLocal() DEBTRACE( "port name: " << p->getName() ); DEBTRACE( "port kind: " << p->edGetType()->kind() ); PyObject* ob=p->getPyObj(); - DEBTRACE( "ob refcnt: " << ob->ob_refcnt ); + DEBTRACE( "ob refcnt: " << ob->ob_refcnt ); + unpxy << p->getName() << " = UnProxyObjectSimpleLocal( " << p->getName() << " )" << std::endl; #ifdef _DEVDEBUG_ PyObject_Print(ob,stderr,Py_PRINT_RAW); cerr << endl; @@ -600,47 +795,11 @@ void PythonNode::executeLocal() //calculation DEBTRACE( "----------------PyNode::calculation---------------" ); - DEBTRACE( _script ); - DEBTRACE( "_context refcnt: " << _context->ob_refcnt ); + + if( ! getSqueezeStatus() ) + executeLocalInternal( unpxy.str() ); - std::ostringstream stream; - stream << "/tmp/PythonNode_"; - stream << getpid(); - - PyObject* code=Py_CompileString(_script.c_str(), stream.str().c_str(), Py_file_input); - if(code == NULL) - { - _errorDetails=""; - PyObject* new_stderr = newPyStdOut(_errorDetails); - PySys_SetObject((char*)"stderr", new_stderr); - PyErr_Print(); - PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__")); - Py_DECREF(new_stderr); - throw Exception("Error during execution"); - } - PyObject *res = PyEval_EvalCode( code, _context, _context); - - Py_DECREF(code); - Py_XDECREF(res); - DEBTRACE( "_context refcnt: " << _context->ob_refcnt ); - fflush(stdout); - fflush(stderr); - if(PyErr_Occurred ()) - { - _errorDetails=""; - PyObject* new_stderr = newPyStdOut(_errorDetails); - PySys_SetObject((char*)"stderr", new_stderr); - ofstream errorfile(stream.str().c_str()); - if (errorfile.is_open()) - { - errorfile << _script; - errorfile.close(); - } - PyErr_Print(); - PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__")); - Py_DECREF(new_stderr); - throw Exception("Error during execution"); - } + executeLocalInternal( _script ); DEBTRACE( "-----------------PyNode::outputs-----------------" ); list::iterator iter; @@ -665,6 +824,8 @@ void PythonNode::executeLocal() cerr << endl; #endif p->put(ob); + if(!isUsingPythonCache()) + PyDict_DelItemString(_context,p->getName().c_str()); } } catch(ConversionException& ex) @@ -675,10 +836,29 @@ void PythonNode::executeLocal() if(_autoSqueeze) squeezeMemory(); DEBTRACE( "-----------------End PyNode::outputs-----------------" ); + if(!isUsingPythonCache()) + { + for(iter2 = _setOfInputPort.begin(); iter2 != _setOfInputPort.end(); iter2++) + { + AutoPyRef pStr = PyUnicode_FromString( (*iter2)->getName().c_str() ); + if( PyDict_Contains(_context,pStr) == 1 ) + { PyDict_DelItem(_context,pStr); } + } + } } DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" ); } +/*! + * [EDF28562] + * \param in squeezeExceptions list on output port name excluded from the squeeze mecanism + */ +void PythonNode::setSqueezeStatusWithExceptions(bool sqStatus, const std::vector& squeezeExceptions) +{ + this->setSqueezeStatus(sqStatus); + this->_nonSqueezableOutputNodes = std::set(squeezeExceptions.begin(), squeezeExceptions.end()); +} + void PythonNode::squeezeMemorySafe() { AutoGIL agil; @@ -699,6 +879,8 @@ void PythonNode::squeezeMemory() } for(auto p : _setOfOutputPort) { + if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end()) + continue; PyDict_DelItemString(_context,p->getName().c_str()); OutputPyPort *p2(static_cast(p)); p2->putWithoutForward(Py_None); @@ -715,6 +897,8 @@ void PythonNode::squeezeMemoryRemote() } for(auto p : _setOfOutputPort) { + if (!this->_nonSqueezableOutputNodes.empty() && this->_nonSqueezableOutputNodes.find(p->getName()) != this->_nonSqueezableOutputNodes.end()) + continue; OutputPyPort *p2(static_cast(p)); p2->putWithoutForward(Py_None); } @@ -1091,6 +1275,9 @@ void PyFuncNode::executeRemote() // Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);; { +#if PY_VERSION_HEX < 0x03070000 + std::unique_lock lock(data_mutex); +#endif AutoGIL agil; PyObject *ob(0); //=========================================================================== @@ -1140,6 +1327,60 @@ void PyFuncNode::executeRemote() _errorDetails=msg; throw Exception(msg); } + catch(CORBA::COMM_FAILURE& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught system exception COMM_FAILURE -- unable to contact the " + << "object." << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(CORBA::SystemException& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught a CORBA::SystemException." ; + CORBA::Any tmp; + tmp <<= ex; + CORBA::TypeCode_var tc = tmp.type(); + const char *p = tc->name(); + if ( *p != '\0' ) + msg <id(); + msg << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(CORBA::Exception& ex) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught CORBA::Exception. " ; + CORBA::Any tmp; + tmp <<= ex; + CORBA::TypeCode_var tc = tmp.type(); + const char *p = tc->name(); + if ( *p != '\0' ) + msg <id(); + msg << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } + catch(omniORB::fatalException& fe) + { + std::ostringstream msg; + msg << "Exception on remote python invocation." << std::endl ; + msg << "Caught omniORB::fatalException:" << std::endl; + msg << " file: " << fe.file() << std::endl; + msg << " line: " << fe.line() << std::endl; + msg << " mesg: " << fe.errmsg() << std::endl; + _errorDetails=msg.str(); + throw Exception(msg.str()); + } DEBTRACE( "-----------------end of remote python invocation-----------------" ); //=========================================================================== // Get results, unpickle and put them in output ports @@ -1150,6 +1391,9 @@ void PyFuncNode::executeRemote() resultCorbaC[i]=resultCorba[i]; { +#if PY_VERSION_HEX < 0x03070000 + std::unique_lock lock(data_mutex); +#endif AutoGIL agil; PyObject *resultPython(PyBytes_FromStringAndSize(resultCorbaC,resultCorba->length()));