From 673c82156c3b22d75821e93850342d81f37583be Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Thu, 29 Oct 2015 11:41:33 +0100 Subject: [PATCH] DPL scope implementation. --- src/engine/DynParaLoop.cxx | 28 +++++++ src/engine/DynParaLoop.hxx | 1 + src/engine/Executor.cxx | 10 ++- src/engine/Executor.hxx | 4 + src/engine/Node.cxx | 28 +++++++ src/engine/Node.hxx | 2 + src/runtime/PythonNode.cxx | 83 +++++++++++++++++++-- src/runtime/PythonNode.hxx | 6 ++ src/yacsloader_swig/Test/testSaveLoadRun.py | 73 +++++++++++++++++- 9 files changed, 223 insertions(+), 12 deletions(-) diff --git a/src/engine/DynParaLoop.cxx b/src/engine/DynParaLoop.cxx index 71a5fe368..44a612569 100644 --- a/src/engine/DynParaLoop.cxx +++ b/src/engine/DynParaLoop.cxx @@ -154,6 +154,34 @@ int DynParaLoop::getNumberOfOutputPorts() const return ComposedNode::getNumberOfOutputPorts()+1; } +/*! + * DynParaLoop creates at runtime (exupdateState) clone of nodes. One clone per branch. + * This method returns the id of the branch given the node \a node. + * If \a node is not a dynamically created node in \a this -1 is returned. + */ +int DynParaLoop::getBranchIDOfNode(Node *node) const +{ + if(_node) + { + for(std::vector::const_iterator it=_execNodes.begin();it!=_execNodes.end();it++) + if(node==*it) + return std::distance(_execNodes.begin(),it); + } + if(_finalizeNode) + { + for(std::vector::const_iterator it=_execFinalizeNodes.begin();it!=_execFinalizeNodes.end();it++) + if(node==*it) + return std::distance(_execFinalizeNodes.begin(),it); + } + if(_initNode) + { + for(std::vector::const_iterator it=_execInitNodes.begin();it!=_execInitNodes.end();it++) + if(node==*it) + return std::distance(_execInitNodes.begin(),it); + } + return -1; +} + std::list DynParaLoop::getSetOfOutputPort() const { list ret=ComposedNode::getSetOfOutputPort(); diff --git a/src/engine/DynParaLoop.hxx b/src/engine/DynParaLoop.hxx index eabcec736..86c9e8399 100644 --- a/src/engine/DynParaLoop.hxx +++ b/src/engine/DynParaLoop.hxx @@ -79,6 +79,7 @@ namespace YACS int getNumberOfInputPorts() const; int getNumberOfOutputPorts() const; unsigned getNumberOfEltsConsumed() const { return _nbOfEltConsumed; } + int getBranchIDOfNode(Node *node) const; std::list getSetOfOutputPort() const; std::list getLocalOutputPorts() const; OutputPort *edGetSamplePort() { return &_splittedPort; } diff --git a/src/engine/Executor.cxx b/src/engine/Executor.cxx index 456caf6d3..9127df025 100644 --- a/src/engine/Executor.cxx +++ b/src/engine/Executor.cxx @@ -75,7 +75,7 @@ using YACS::BASES::Semaphore; int Executor::_maxThreads(50); size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB -Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false) +Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(true) { _root=0; _toContinue = true; @@ -1148,6 +1148,14 @@ void *Executor::functionForTaskExecution(void *arg) // Execute task + if(execInst->getDPLScopeSensitive()) + { + Node *node(dynamic_cast(task)); + ComposedNode *gfn(dynamic_cast(sched)); + if(node!=0 && gfn!=0) + node->applyDPLScope(gfn); + } + YACS::Event ev=YACS::FINISH; try { diff --git a/src/engine/Executor.hxx b/src/engine/Executor.hxx index e1bcfd292..87eb1115c 100644 --- a/src/engine/Executor.hxx +++ b/src/engine/Executor.hxx @@ -86,6 +86,8 @@ namespace YACS std::ofstream _trace; std::string _dumpErrorFile; bool _keepGoingOnFail; + //! specifies if scope DynParaLoop is active or not. False by default. + bool _DPLScopeSensitive; #ifdef WIN32 DWORD _start; #else @@ -99,6 +101,8 @@ namespace YACS void RunB(Scheduler *graph,int debug=0, bool fromScratch=true); void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; } bool getKeepGoingProperty() const { return _keepGoingOnFail; } + void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=_DPLScopeSensitive; } + bool getDPLScopeSensitive() const { return _DPLScopeSensitive; } YACS::ExecutionMode getCurrentExecMode(); YACS::ExecutorState getExecutorState(); void setExecMode(YACS::ExecutionMode mode); diff --git a/src/engine/Node.cxx b/src/engine/Node.cxx index 774e670e0..4a61710a7 100644 --- a/src/engine/Node.cxx +++ b/src/engine/Node.cxx @@ -18,6 +18,7 @@ // #include "Node.hxx" +#include "DynParaLoop.hxx" #include "InputPort.hxx" #include "OutputPort.hxx" #include "InPropertyPort.hxx" @@ -621,6 +622,33 @@ void Node::setState(YACS::StatesForNode theState) sendEvent("status"); } +std::vector > Node::getDPLScopeInfo(ComposedNode *gfn) +{ + std::vector< std::pair > ret; + Node *work2(this); + ComposedNode *work(getFather()); + while(work!=gfn && work!=0) + { + DynParaLoop *workc(dynamic_cast(work)); + if(workc) + { + std::pair p(gfn->getChildName(workc),workc->getBranchIDOfNode(work2)); + ret.push_back(p); + } + work2=work; + work=work->getFather(); + } + return ret; +} + +/*! + * Method called by the Executor only if the executor is sensitive of scope of DynParaLoop. + * This method is virtual and empty because by default nothing is done. + */ +void Node::applyDPLScope(ComposedNode *gfn) +{ +} + //! emit notification to all observers registered with the dispatcher /*! * The dispatcher is unique and can be obtained by getDispatcher() diff --git a/src/engine/Node.hxx b/src/engine/Node.hxx index 92a632317..0a85d294b 100644 --- a/src/engine/Node.hxx +++ b/src/engine/Node.hxx @@ -174,6 +174,8 @@ namespace YACS virtual int getMaxLevelOfParallelism() const = 0; std::string getQualifiedName() const; int getNumId(); + std::vector > getDPLScopeInfo(ComposedNode *gfn); + virtual void applyDPLScope(ComposedNode *gfn); virtual void sendEvent(const std::string& event); static std::map idMap; virtual std::string typeName() { return "YACS__ENGINE__Node"; } diff --git a/src/runtime/PythonNode.cxx b/src/runtime/PythonNode.cxx index 3a448d3f6..acadec57e 100644 --- a/src/runtime/PythonNode.cxx +++ b/src/runtime/PythonNode.cxx @@ -48,6 +48,11 @@ typedef int Py_ssize_t; using namespace YACS::ENGINE; using namespace std; +const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import cPickle\n" + "def pickleForVarSimplePyth2009(val):\n" + " return cPickle.dumps(val,-1)\n" + "\n"; + const char PythonNode::IMPL_NAME[]="Python"; const char PythonNode::KIND[]="Python"; @@ -59,6 +64,10 @@ const char PythonNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n" " args=cPickle.loads(st)\n" " return args\n"; +const char PythonNode::REMOTE_NAME[]="remote"; + +const char PythonNode::DPL_INFO_NAME[]="my_dpl_localization"; + const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n" "def pickleForDistPyth2009(*args,**kws):\n" " return cPickle.dumps((args,kws),-1)\n" @@ -67,7 +76,8 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n" " args=cPickle.loads(st)\n" " return args\n"; -PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0) + +PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0) { } @@ -171,7 +181,8 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_ AutoGIL agil; const char *picklizeScript(getSerializationScript()); PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context); - if(res == NULL) + PyObject *res2(PyRun_String(SCRIPT_FOR_SIMPLE_SERIALIZATION,Py_file_input,_context,_context)); + if(res == NULL || res2==NULL) { std::string errorDetails; PyObject* new_stderr = newPyStdOut(errorDetails); @@ -182,9 +193,10 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_ Py_DECREF(new_stderr); throw Exception("Error during load"); } - Py_DECREF(res); + Py_DECREF(res); Py_DECREF(res2); _pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009"); _pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009"); + _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009"); if(_pyfuncSer == NULL) { std::string errorDetails; @@ -207,6 +219,17 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_ Py_DECREF(new_stderr); throw Exception("Error during load"); } + if(_pyfuncSimpleSer == NULL) + { + std::string errorDetails; + PyObject *new_stderr(newPyStdOut(errorDetails)); + reqNode->setErrorDetails(errorDetails); + PySys_SetObject((char*)"stderr", new_stderr); + PyErr_Print(); + PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__")); + Py_DECREF(new_stderr); + throw Exception("Error during load"); + } } if(isInitializeRequested) {//This one is called only once at initialization in the container if an init-script is specified. @@ -337,7 +360,7 @@ void PythonNode::checkBasicConsistency() const throw(YACS::Exception) void PythonNode::load() { DEBTRACE( "---------------PyNode::load function---------------" ); - if(_mode=="remote") + if(_mode==PythonNode::REMOTE_NAME) loadRemote(); else loadLocal(); @@ -356,7 +379,7 @@ void PythonNode::loadRemote() void PythonNode::execute() { - if(_mode=="remote") + if(_mode==PythonNode::REMOTE_NAME) executeRemote(); else executeLocal(); @@ -699,6 +722,52 @@ PythonNode* PythonNode::cloneNode(const std::string& name) return n; } +void PythonNode::applyDPLScope(ComposedNode *gfn) +{ + std::vector< std::pair > ret(getDPLScopeInfo(gfn)); + if(ret.empty()) + return ; + // + PyObject *ob(0); + { + AutoGIL agil; + std::size_t sz(ret.size()); + ob=PyList_New(sz); + for(std::size_t i=0;i& p(ret[i]); + PyObject *elt(PyTuple_New(2)); + PyTuple_SetItem(elt,0,PyString_FromString(p.first.c_str())); + PyTuple_SetItem(elt,1,PyLong_FromLong(p.second)); + PyList_SetItem(ob,i,elt); + } + } + if(_mode==REMOTE_NAME) + { + Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs); + { + AutoGIL agil; + PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSimpleSer,ob,NULL)); + Py_XDECREF(ob); + char *serializationInputC(0); + Py_ssize_t len; + if (PyString_AsStringAndSize(serializationInput, &serializationInputC, &len)) + throw Exception("DistributedPythonNode problem in python pickle"); + serializationInputCorba->length(len); + for(int i=0; i < len ; i++) + serializationInputCorba[i]=serializationInputC[i]; + Py_XDECREF(serializationInput); + } + _pynode->defineNewCustomVar(DPL_INFO_NAME,serializationInputCorba); + } + else + { + AutoGIL agil; + PyDict_SetItemString(_context,DPL_INFO_NAME,ob); + Py_XDECREF(ob); + } +} + PyFuncNode::PyFuncNode(const PyFuncNode& other, ComposedNode *father):InlineFuncNode(other,father),_pyfunc(0) { _implementation = PythonNode::IMPL_NAME; @@ -769,7 +838,7 @@ void PyFuncNode::checkBasicConsistency() const throw(YACS::Exception) void PyFuncNode::load() { DEBTRACE( "---------------PyfuncNode::load function---------------" ); - if(_mode=="remote") + if(_mode==PythonNode::REMOTE_NAME) loadRemote(); else loadLocal(); @@ -854,7 +923,7 @@ void PyFuncNode::loadLocal() void PyFuncNode::execute() { - if(_mode=="remote") + if(_mode==PythonNode::REMOTE_NAME) executeRemote(); else executeLocal(); diff --git a/src/runtime/PythonNode.hxx b/src/runtime/PythonNode.hxx index d300760aa..3f31caebc 100644 --- a/src/runtime/PythonNode.hxx +++ b/src/runtime/PythonNode.hxx @@ -54,6 +54,9 @@ namespace YACS PyObject *_context; PyObject *_pyfuncSer; PyObject *_pyfuncUnser; + PyObject *_pyfuncSimpleSer; + public: + static const char SCRIPT_FOR_SIMPLE_SERIALIZATION[]; }; class YACSRUNTIMESALOME_EXPORT PythonNode : public InlineNode, public PythonEntry @@ -81,10 +84,13 @@ namespace YACS std::string getContainerLog(); PythonNode* cloneNode(const std::string& name); virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; } + void applyDPLScope(ComposedNode *gfn); public: static const char KIND[]; static const char IMPL_NAME[]; static const char SCRIPT_FOR_SERIALIZATION[]; + static const char REMOTE_NAME[]; + static const char DPL_INFO_NAME[]; protected: Engines::PyScriptNode_var _pynode; }; diff --git a/src/yacsloader_swig/Test/testSaveLoadRun.py b/src/yacsloader_swig/Test/testSaveLoadRun.py index 84001a792..460f6c208 100755 --- a/src/yacsloader_swig/Test/testSaveLoadRun.py +++ b/src/yacsloader_swig/Test/testSaveLoadRun.py @@ -1302,10 +1302,8 @@ for i in i8: pass def test18(self): - SALOMERuntime.RuntimeSALOME_setRuntime() - r=pilot.getRuntime() - p=r.createProc("prTest18") - n00=r.createScriptNode("Salome","n00") + p=self.r.createProc("prTest18") + n00=self.r.createScriptNode("Salome","n00") self.assertEqual(n00.getMaxLevelOfParallelism(),1) n00.setExecutionMode("remote") self.assertEqual(n00.getMaxLevelOfParallelism(),1) @@ -1318,6 +1316,73 @@ for i in i8: self.assertEqual(n00.getMaxLevelOfParallelism(),7) # <- here pass + def test19(self): + """This test checks the mechanism of YACS that allow PythonNodes to know their DynParaLoop context.""" + fname="test19.xml" + r=SALOMERuntime.getSALOMERuntime() + l=loader.YACSLoader() + # + p=self.r.createProc("PROC") + ti=p.createType("int","int") + tdi=p.createSequenceTc("seqint","seqint",ti) + # Level0 + fe0=self.r.createForEachLoop("FE0",ti) ; p.edAddChild(fe0) + fe0.edGetNbOfBranchesPort().edInitInt(4) + fe0_end=self.r.createScriptNode("Salome","fe0_end") + fe0.edSetFinalizeNode(fe0_end) + fe0_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"]) +assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4)""") + n0=self.r.createScriptNode("Salome","n0") ; p.edAddChild(n0) + n0.setScript("o1=range(10)") + a=n0.edAddOutputPort("o1",tdi) + p.edAddLink(a,fe0.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n0,fe0) + # Level1 + b0=self.r.createBloc("b0") ; fe0.edAddChild(b0) + n1=self.r.createScriptNode("Salome","n1") ; b0.edAddChild(n1) + n1.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"]) +assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4) +o1=range(10)""") + b=n1.edAddOutputPort("o1",tdi) + fe1=self.r.createForEachLoop("FE1",ti) ; b0.edAddChild(fe1) + fe1.edGetNbOfBranchesPort().edInitInt(3) + fe1_end=self.r.createScriptNode("Salome","fe1_end") + fe1_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"]) +assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4) +assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3) +""") + fe1.edSetFinalizeNode(fe1_end) + p.edAddLink(b,fe1.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n1,fe1) + # Level2 + n2=self.r.createScriptNode("Salome","n2") ; fe1.edAddChild(n2) + n2.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"]) +assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4) +assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3) +""") + + p.saveSchema(fname) + ex=pilot.ExecutorSwig() + + # local run of PythonNodes n1 and n2 + p=l.load(fname) + p.init() + self.assertEqual(p.getState(),pilot.READY) + ex.setDPLScopeSensitive(True) # <- this line is the aim of the test + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.DONE) + + # run remote + p=l.load(fname) + cont=p.createContainer("gg","HPSalome") + cont.setSizeOfPool(2) + n1=p.getChildByName("FE0.b0.n1") ; n1.setExecutionMode("remote") ; n1.setContainer(cont) + n2=p.getChildByName("FE0.b0.FE1.n2") ; n2.setExecutionMode("remote") ; n2.setContainer(cont) + + p.init() + self.assertEqual(p.getState(),pilot.READY) + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.DONE) + pass + pass if __name__ == '__main__': -- 2.30.2