Salome HOME
DPL scope implementation.
authorAnthony Geay <anthony.geay@edf.fr>
Thu, 29 Oct 2015 10:41:33 +0000 (11:41 +0100)
committerAnthony Geay <anthony.geay@edf.fr>
Thu, 29 Oct 2015 10:41:33 +0000 (11:41 +0100)
src/engine/DynParaLoop.cxx
src/engine/DynParaLoop.hxx
src/engine/Executor.cxx
src/engine/Executor.hxx
src/engine/Node.cxx
src/engine/Node.hxx
src/runtime/PythonNode.cxx
src/runtime/PythonNode.hxx
src/yacsloader_swig/Test/testSaveLoadRun.py

index 71a5fe368fd348bb45f22c7cdbfd28e833d4f616..44a612569e36eb84cdb0977c06190b4319a910ce 100644 (file)
@@ -154,6 +154,34 @@ int DynParaLoop::getNumberOfOutputPorts() const
   return ComposedNode::getNumberOfOutputPorts()+1;
 }
 
+/*!
+ * DynParaLoop creates at runtime (exupdateState) clone of nodes. One clone per branch.
+ * This method returns the id of the branch given the node \a node.
+ * If \a node is not a dynamically created node in \a this -1 is returned.
+ */
+int DynParaLoop::getBranchIDOfNode(Node *node) const
+{
+  if(_node)
+    {
+      for(std::vector<Node *>::const_iterator it=_execNodes.begin();it!=_execNodes.end();it++)
+        if(node==*it)
+          return std::distance(_execNodes.begin(),it);
+    }
+  if(_finalizeNode)
+    {
+      for(std::vector<Node *>::const_iterator it=_execFinalizeNodes.begin();it!=_execFinalizeNodes.end();it++)
+        if(node==*it)
+          return std::distance(_execFinalizeNodes.begin(),it);
+    }
+  if(_initNode)
+    {
+      for(std::vector<Node *>::const_iterator it=_execInitNodes.begin();it!=_execInitNodes.end();it++)
+        if(node==*it)
+          return std::distance(_execInitNodes.begin(),it);
+    }
+  return -1;
+}
+
 std::list<OutputPort *> DynParaLoop::getSetOfOutputPort() const
 {
   list<OutputPort *> ret=ComposedNode::getSetOfOutputPort();
index eabcec73648023bde95f44b09a77d9e0c36cf4e4..86c9e839909064888e4136b7fbe6a5aabee9231b 100644 (file)
@@ -79,6 +79,7 @@ namespace YACS
       int getNumberOfInputPorts() const;
       int getNumberOfOutputPorts() const;
       unsigned getNumberOfEltsConsumed() const { return _nbOfEltConsumed; }
+      int getBranchIDOfNode(Node *node) const;
       std::list<OutputPort *> getSetOfOutputPort() const;
       std::list<OutputPort *> getLocalOutputPorts() const;
       OutputPort *edGetSamplePort() { return &_splittedPort; }
index 456caf6d3eb52e59d65a93c42c77c63a1f124446..9127df025b50ac007ba9ecdc4c34e90cf8e12a3d 100644 (file)
@@ -75,7 +75,7 @@ using YACS::BASES::Semaphore;
 int Executor::_maxThreads(50);
 size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
 
-Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false)
+Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(true)
 {
   _root=0;
   _toContinue = true;
@@ -1148,6 +1148,14 @@ void *Executor::functionForTaskExecution(void *arg)
 
   // Execute task
 
+  if(execInst->getDPLScopeSensitive())
+    {
+      Node *node(dynamic_cast<Node *>(task));
+      ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
+      if(node!=0 && gfn!=0)
+        node->applyDPLScope(gfn);
+    }
+
   YACS::Event ev=YACS::FINISH;
   try
     {
index e1bcfd2925af68f77e46d8cefeb592107d0b56f6..87eb1115cd6d246635cd4ba290e28a9424cb0f71 100644 (file)
@@ -86,6 +86,8 @@ namespace YACS
       std::ofstream _trace;
       std::string _dumpErrorFile;
       bool _keepGoingOnFail;
+      //! specifies if scope DynParaLoop is active or not. False by default.
+      bool _DPLScopeSensitive;
 #ifdef WIN32
          DWORD _start;
 #else
@@ -99,6 +101,8 @@ namespace YACS
       void RunB(Scheduler *graph,int debug=0, bool fromScratch=true);
       void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; }
       bool getKeepGoingProperty() const { return _keepGoingOnFail; }
+      void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=_DPLScopeSensitive; }
+      bool getDPLScopeSensitive() const { return _DPLScopeSensitive; }
       YACS::ExecutionMode getCurrentExecMode();
       YACS::ExecutorState getExecutorState();
       void setExecMode(YACS::ExecutionMode mode);
index 774e670e07f5ae76146b0f3763c355adfe3852d5..4a61710a743e03960477d5fc4538d8eba10b320c 100644 (file)
@@ -18,6 +18,7 @@
 //
 
 #include "Node.hxx"
+#include "DynParaLoop.hxx"
 #include "InputPort.hxx"
 #include "OutputPort.hxx"
 #include "InPropertyPort.hxx"
@@ -621,6 +622,33 @@ void Node::setState(YACS::StatesForNode theState)
   sendEvent("status");
 }
 
+std::vector<std::pair<std::string,int> > Node::getDPLScopeInfo(ComposedNode *gfn)
+{ 
+  std::vector< std::pair<std::string,int> > ret;
+  Node *work2(this);
+  ComposedNode *work(getFather());
+  while(work!=gfn && work!=0)
+    {
+      DynParaLoop *workc(dynamic_cast<DynParaLoop *>(work));
+      if(workc)
+        {
+          std::pair<std::string,int> p(gfn->getChildName(workc),workc->getBranchIDOfNode(work2));
+          ret.push_back(p);
+        }
+      work2=work;
+      work=work->getFather();
+    }
+  return ret;
+}
+
+/*!
+ * Method called by the Executor only if the executor is sensitive of scope of DynParaLoop.
+ * This method is virtual and empty because by default nothing is done.
+ */
+void Node::applyDPLScope(ComposedNode *gfn)
+{
+}
+
 //! emit notification to all observers registered with  the dispatcher 
 /*!
  * The dispatcher is unique and can be obtained by getDispatcher()
index 92a6323170616e1e59f3a7c80ba03dac0cf61da9..0a85d294b4eaa0a49690e2c2d0e7b219462fed2b 100644 (file)
@@ -174,6 +174,8 @@ namespace YACS
       virtual int getMaxLevelOfParallelism() const = 0;
       std::string getQualifiedName() const;
       int getNumId();
+      std::vector<std::pair<std::string,int> > getDPLScopeInfo(ComposedNode *gfn);
+      virtual void applyDPLScope(ComposedNode *gfn);
       virtual void sendEvent(const std::string& event);
       static std::map<int,Node *> idMap;
       virtual std::string typeName() { return "YACS__ENGINE__Node"; }
index 3a448d3f6f74e27f4b1f5fd11b4a5abe20c608bc..acadec57ee9efccb5dd2744ad336c99951948328 100644 (file)
@@ -48,6 +48,11 @@ typedef int Py_ssize_t;
 using namespace YACS::ENGINE;
 using namespace std;
 
+const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import cPickle\n"
+    "def pickleForVarSimplePyth2009(val):\n"
+    "  return cPickle.dumps(val,-1)\n"
+    "\n";
+
 const char PythonNode::IMPL_NAME[]="Python";
 const char PythonNode::KIND[]="Python";
 
@@ -59,6 +64,10 @@ const char PythonNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
     "  args=cPickle.loads(st)\n"
     "  return args\n";
 
+const char PythonNode::REMOTE_NAME[]="remote";
+
+const char PythonNode::DPL_INFO_NAME[]="my_dpl_localization";
+
 const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
     "def pickleForDistPyth2009(*args,**kws):\n"
     "  return cPickle.dumps((args,kws),-1)\n"
@@ -67,7 +76,8 @@ const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
     "  args=cPickle.loads(st)\n"
     "  return args\n";
 
-PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0)
+
+PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
 {
 }
 
@@ -171,7 +181,8 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
     AutoGIL agil;
     const char *picklizeScript(getSerializationScript());
     PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
-    if(res == NULL)
+    PyObject *res2(PyRun_String(SCRIPT_FOR_SIMPLE_SERIALIZATION,Py_file_input,_context,_context));
+    if(res == NULL || res2==NULL)
       {
         std::string errorDetails;
         PyObject* new_stderr = newPyStdOut(errorDetails);
@@ -182,9 +193,10 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
         Py_DECREF(new_stderr);
         throw Exception("Error during load");
       }
-    Py_DECREF(res);
+    Py_DECREF(res); Py_DECREF(res2);
     _pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009");
     _pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009");
+    _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009");
     if(_pyfuncSer == NULL)
       {
         std::string errorDetails;
@@ -207,6 +219,17 @@ void PythonEntry::commonRemoteLoadPart3(InlineNode *reqNode, Engines::Container_
         Py_DECREF(new_stderr);
         throw Exception("Error during load");
       }
+    if(_pyfuncSimpleSer == NULL)
+      {
+        std::string errorDetails;
+        PyObject *new_stderr(newPyStdOut(errorDetails));
+        reqNode->setErrorDetails(errorDetails);
+        PySys_SetObject((char*)"stderr", new_stderr);
+        PyErr_Print();
+        PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+        Py_DECREF(new_stderr);
+        throw Exception("Error during load");
+      }
   }
   if(isInitializeRequested)
     {//This one is called only once at initialization in the container if an init-script is specified.
@@ -337,7 +360,7 @@ void PythonNode::checkBasicConsistency() const throw(YACS::Exception)
 void PythonNode::load()
 {
   DEBTRACE( "---------------PyNode::load function---------------" );
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     loadRemote();
   else
     loadLocal();
@@ -356,7 +379,7 @@ void PythonNode::loadRemote()
 
 void PythonNode::execute()
 {
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     executeRemote();
   else
     executeLocal();
@@ -699,6 +722,52 @@ PythonNode* PythonNode::cloneNode(const std::string& name)
   return n;
 }
 
+void PythonNode::applyDPLScope(ComposedNode *gfn)
+{
+  std::vector< std::pair<std::string,int> > ret(getDPLScopeInfo(gfn));
+  if(ret.empty())
+    return ;
+  //
+  PyObject *ob(0);
+  {
+    AutoGIL agil;
+    std::size_t sz(ret.size());
+    ob=PyList_New(sz);
+    for(std::size_t i=0;i<sz;i++)
+      {
+        const std::pair<std::string,int>& p(ret[i]);
+        PyObject *elt(PyTuple_New(2));
+        PyTuple_SetItem(elt,0,PyString_FromString(p.first.c_str()));
+        PyTuple_SetItem(elt,1,PyLong_FromLong(p.second));
+        PyList_SetItem(ob,i,elt);
+      }
+  }
+  if(_mode==REMOTE_NAME)
+    {
+      Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);
+      {
+        AutoGIL agil;
+        PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSimpleSer,ob,NULL));
+        Py_XDECREF(ob);
+        char *serializationInputC(0);
+        Py_ssize_t len;
+        if (PyString_AsStringAndSize(serializationInput, &serializationInputC, &len))
+          throw Exception("DistributedPythonNode problem in python pickle");
+        serializationInputCorba->length(len);
+        for(int i=0; i < len ; i++)
+          serializationInputCorba[i]=serializationInputC[i];
+        Py_XDECREF(serializationInput);
+      }
+      _pynode->defineNewCustomVar(DPL_INFO_NAME,serializationInputCorba);
+    }
+  else
+    {
+      AutoGIL agil;
+      PyDict_SetItemString(_context,DPL_INFO_NAME,ob);
+      Py_XDECREF(ob);
+    }
+}
+
 PyFuncNode::PyFuncNode(const PyFuncNode& other, ComposedNode *father):InlineFuncNode(other,father),_pyfunc(0)
 {
   _implementation = PythonNode::IMPL_NAME;
@@ -769,7 +838,7 @@ void PyFuncNode::checkBasicConsistency() const throw(YACS::Exception)
 void PyFuncNode::load()
 {
   DEBTRACE( "---------------PyfuncNode::load function---------------" );
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     loadRemote();
   else
     loadLocal();
@@ -854,7 +923,7 @@ void PyFuncNode::loadLocal()
 
 void PyFuncNode::execute()
 {
-  if(_mode=="remote")
+  if(_mode==PythonNode::REMOTE_NAME)
     executeRemote();
   else
     executeLocal();
index d300760aad8de85b2706e5b7ad548b3d4aa148c8..3f31caebc481dbb08ab3621b9a5d21d78b1f80a3 100644 (file)
@@ -54,6 +54,9 @@ namespace YACS
       PyObject *_context;
       PyObject *_pyfuncSer;
       PyObject *_pyfuncUnser;
+      PyObject *_pyfuncSimpleSer;
+    public:
+      static const char SCRIPT_FOR_SIMPLE_SERIALIZATION[];
     };
 
     class YACSRUNTIMESALOME_EXPORT PythonNode : public InlineNode, public PythonEntry
@@ -81,10 +84,13 @@ namespace YACS
       std::string getContainerLog();
       PythonNode* cloneNode(const std::string& name);
       virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; }
+      void applyDPLScope(ComposedNode *gfn);
     public:
       static const char KIND[];
       static const char IMPL_NAME[];
       static const char SCRIPT_FOR_SERIALIZATION[];
+      static const char REMOTE_NAME[];
+      static const char DPL_INFO_NAME[];
     protected:
       Engines::PyScriptNode_var _pynode;
     };
index 84001a792d064430ec3a61e7864e2c36bd5f6f64..460f6c208a3884e85710d1e9f1b16a8bd7c0d46e 100755 (executable)
@@ -1302,10 +1302,8 @@ for i in i8:
     pass
 
   def test18(self):
-    SALOMERuntime.RuntimeSALOME_setRuntime()
-    r=pilot.getRuntime()
-    p=r.createProc("prTest18")
-    n00=r.createScriptNode("Salome","n00")
+    p=self.r.createProc("prTest18")
+    n00=self.r.createScriptNode("Salome","n00")
     self.assertEqual(n00.getMaxLevelOfParallelism(),1)
     n00.setExecutionMode("remote")
     self.assertEqual(n00.getMaxLevelOfParallelism(),1)
@@ -1318,6 +1316,73 @@ for i in i8:
     self.assertEqual(n00.getMaxLevelOfParallelism(),7) # <- here
     pass
     
+  def test19(self):
+    """This test checks the mechanism of YACS that allow PythonNodes to know their DynParaLoop context."""
+    fname="test19.xml"
+    r=SALOMERuntime.getSALOMERuntime()
+    l=loader.YACSLoader()
+    #
+    p=self.r.createProc("PROC")
+    ti=p.createType("int","int")
+    tdi=p.createSequenceTc("seqint","seqint",ti)
+    # Level0
+    fe0=self.r.createForEachLoop("FE0",ti) ; p.edAddChild(fe0)
+    fe0.edGetNbOfBranchesPort().edInitInt(4)
+    fe0_end=self.r.createScriptNode("Salome","fe0_end")
+    fe0.edSetFinalizeNode(fe0_end)
+    fe0_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"])
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4)""")
+    n0=self.r.createScriptNode("Salome","n0") ; p.edAddChild(n0)
+    n0.setScript("o1=range(10)")
+    a=n0.edAddOutputPort("o1",tdi)
+    p.edAddLink(a,fe0.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n0,fe0)
+    # Level1
+    b0=self.r.createBloc("b0") ; fe0.edAddChild(b0)
+    n1=self.r.createScriptNode("Salome","n1") ; b0.edAddChild(n1)
+    n1.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"])
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4)
+o1=range(10)""")
+    b=n1.edAddOutputPort("o1",tdi)
+    fe1=self.r.createForEachLoop("FE1",ti) ; b0.edAddChild(fe1)
+    fe1.edGetNbOfBranchesPort().edInitInt(3)
+    fe1_end=self.r.createScriptNode("Salome","fe1_end")
+    fe1_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"])
+assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4)
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3)
+""")
+    fe1.edSetFinalizeNode(fe1_end)
+    p.edAddLink(b,fe1.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n1,fe1)
+    # Level2
+    n2=self.r.createScriptNode("Salome","n2") ; fe1.edAddChild(n2)
+    n2.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"])
+assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4)
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3)
+""")
+    
+    p.saveSchema(fname)
+    ex=pilot.ExecutorSwig()
+    
+    # local run of PythonNodes n1 and n2
+    p=l.load(fname)
+    p.init()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.setDPLScopeSensitive(True) # <- this line is the aim of the test
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+
+    # run remote
+    p=l.load(fname)
+    cont=p.createContainer("gg","HPSalome")
+    cont.setSizeOfPool(2)
+    n1=p.getChildByName("FE0.b0.n1") ; n1.setExecutionMode("remote") ; n1.setContainer(cont)
+    n2=p.getChildByName("FE0.b0.FE1.n2") ; n2.setExecutionMode("remote") ; n2.setContainer(cont)
+    
+    p.init()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+    pass
+
   pass
 
 if __name__ == '__main__':