return ComposedNode::getNumberOfOutputPorts()+1;
}
+/*!
+ * DynParaLoop creates at runtime (exupdateState) clone of nodes. One clone per branch.
+ * This method returns the id of the branch given the node \a node.
+ * If \a node is not a dynamically created node in \a this -1 is returned.
+ */
+int DynParaLoop::getBranchIDOfNode(Node *node) const
+{
+ if(_node)
+ {
+ for(std::vector<Node *>::const_iterator it=_execNodes.begin();it!=_execNodes.end();it++)
+ if(node==*it)
+ return std::distance(_execNodes.begin(),it);
+ }
+ if(_finalizeNode)
+ {
+ for(std::vector<Node *>::const_iterator it=_execFinalizeNodes.begin();it!=_execFinalizeNodes.end();it++)
+ if(node==*it)
+ return std::distance(_execFinalizeNodes.begin(),it);
+ }
+ if(_initNode)
+ {
+ for(std::vector<Node *>::const_iterator it=_execInitNodes.begin();it!=_execInitNodes.end();it++)
+ if(node==*it)
+ return std::distance(_execInitNodes.begin(),it);
+ }
+ return -1;
+}
+
std::list<OutputPort *> DynParaLoop::getSetOfOutputPort() const
{
list<OutputPort *> ret=ComposedNode::getSetOfOutputPort();
int getNumberOfInputPorts() const;
int getNumberOfOutputPorts() const;
unsigned getNumberOfEltsConsumed() const { return _nbOfEltConsumed; }
+ int getBranchIDOfNode(Node *node) const;
std::list<OutputPort *> getSetOfOutputPort() const;
std::list<OutputPort *> getLocalOutputPorts() const;
OutputPort *edGetSamplePort() { return &_splittedPort; }
int Executor::_maxThreads(50);
size_t Executor::_threadStackSize(1048576); // Default thread stack size is 1MB
-Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false)
+Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(true)
{
_root=0;
_toContinue = true;
// Execute task
+ if(execInst->getDPLScopeSensitive())
+ {
+ Node *node(dynamic_cast<Node *>(task));
+ ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
+ if(node!=0 && gfn!=0)
+ node->applyDPLScope(gfn);
+ }
+
YACS::Event ev=YACS::FINISH;
try
{
std::ofstream _trace;
std::string _dumpErrorFile;
bool _keepGoingOnFail;
+ //! specifies if scope DynParaLoop is active or not. False by default.
+ bool _DPLScopeSensitive;
#ifdef WIN32
DWORD _start;
#else
void RunB(Scheduler *graph,int debug=0, bool fromScratch=true);
void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; }
bool getKeepGoingProperty() const { return _keepGoingOnFail; }
+ void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=_DPLScopeSensitive; }
+ bool getDPLScopeSensitive() const { return _DPLScopeSensitive; }
YACS::ExecutionMode getCurrentExecMode();
YACS::ExecutorState getExecutorState();
void setExecMode(YACS::ExecutionMode mode);
//
#include "Node.hxx"
+#include "DynParaLoop.hxx"
#include "InputPort.hxx"
#include "OutputPort.hxx"
#include "InPropertyPort.hxx"
sendEvent("status");
}
+std::vector<std::pair<std::string,int> > Node::getDPLScopeInfo(ComposedNode *gfn)
+{
+ std::vector< std::pair<std::string,int> > ret;
+ Node *work2(this);
+ ComposedNode *work(getFather());
+ while(work!=gfn && work!=0)
+ {
+ DynParaLoop *workc(dynamic_cast<DynParaLoop *>(work));
+ if(workc)
+ {
+ std::pair<std::string,int> p(gfn->getChildName(workc),workc->getBranchIDOfNode(work2));
+ ret.push_back(p);
+ }
+ work2=work;
+ work=work->getFather();
+ }
+ return ret;
+}
+
+/*!
+ * Method called by the Executor only if the executor is sensitive of scope of DynParaLoop.
+ * This method is virtual and empty because by default nothing is done.
+ */
+void Node::applyDPLScope(ComposedNode *gfn)
+{
+}
+
//! emit notification to all observers registered with the dispatcher
/*!
* The dispatcher is unique and can be obtained by getDispatcher()
virtual int getMaxLevelOfParallelism() const = 0;
std::string getQualifiedName() const;
int getNumId();
+ std::vector<std::pair<std::string,int> > getDPLScopeInfo(ComposedNode *gfn);
+ virtual void applyDPLScope(ComposedNode *gfn);
virtual void sendEvent(const std::string& event);
static std::map<int,Node *> idMap;
virtual std::string typeName() { return "YACS__ENGINE__Node"; }
using namespace YACS::ENGINE;
using namespace std;
+const char PythonEntry::SCRIPT_FOR_SIMPLE_SERIALIZATION[]="import cPickle\n"
+ "def pickleForVarSimplePyth2009(val):\n"
+ " return cPickle.dumps(val,-1)\n"
+ "\n";
+
const char PythonNode::IMPL_NAME[]="Python";
const char PythonNode::KIND[]="Python";
" args=cPickle.loads(st)\n"
" return args\n";
+const char PythonNode::REMOTE_NAME[]="remote";
+
+const char PythonNode::DPL_INFO_NAME[]="my_dpl_localization";
+
const char PyFuncNode::SCRIPT_FOR_SERIALIZATION[]="import cPickle\n"
"def pickleForDistPyth2009(*args,**kws):\n"
" return cPickle.dumps((args,kws),-1)\n"
" args=cPickle.loads(st)\n"
" return args\n";
-PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0)
+
+PythonEntry::PythonEntry():_context(0),_pyfuncSer(0),_pyfuncUnser(0),_pyfuncSimpleSer(0)
{
}
AutoGIL agil;
const char *picklizeScript(getSerializationScript());
PyObject *res=PyRun_String(picklizeScript,Py_file_input,_context,_context);
- if(res == NULL)
+ PyObject *res2(PyRun_String(SCRIPT_FOR_SIMPLE_SERIALIZATION,Py_file_input,_context,_context));
+ if(res == NULL || res2==NULL)
{
std::string errorDetails;
PyObject* new_stderr = newPyStdOut(errorDetails);
Py_DECREF(new_stderr);
throw Exception("Error during load");
}
- Py_DECREF(res);
+ Py_DECREF(res); Py_DECREF(res2);
_pyfuncSer=PyDict_GetItemString(_context,"pickleForDistPyth2009");
_pyfuncUnser=PyDict_GetItemString(_context,"unPickleForDistPyth2009");
+ _pyfuncSimpleSer=PyDict_GetItemString(_context,"pickleForVarSimplePyth2009");
if(_pyfuncSer == NULL)
{
std::string errorDetails;
Py_DECREF(new_stderr);
throw Exception("Error during load");
}
+ if(_pyfuncSimpleSer == NULL)
+ {
+ std::string errorDetails;
+ PyObject *new_stderr(newPyStdOut(errorDetails));
+ reqNode->setErrorDetails(errorDetails);
+ PySys_SetObject((char*)"stderr", new_stderr);
+ PyErr_Print();
+ PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+ Py_DECREF(new_stderr);
+ throw Exception("Error during load");
+ }
}
if(isInitializeRequested)
{//This one is called only once at initialization in the container if an init-script is specified.
void PythonNode::load()
{
DEBTRACE( "---------------PyNode::load function---------------" );
- if(_mode=="remote")
+ if(_mode==PythonNode::REMOTE_NAME)
loadRemote();
else
loadLocal();
void PythonNode::execute()
{
- if(_mode=="remote")
+ if(_mode==PythonNode::REMOTE_NAME)
executeRemote();
else
executeLocal();
return n;
}
+void PythonNode::applyDPLScope(ComposedNode *gfn)
+{
+ std::vector< std::pair<std::string,int> > ret(getDPLScopeInfo(gfn));
+ if(ret.empty())
+ return ;
+ //
+ PyObject *ob(0);
+ {
+ AutoGIL agil;
+ std::size_t sz(ret.size());
+ ob=PyList_New(sz);
+ for(std::size_t i=0;i<sz;i++)
+ {
+ const std::pair<std::string,int>& p(ret[i]);
+ PyObject *elt(PyTuple_New(2));
+ PyTuple_SetItem(elt,0,PyString_FromString(p.first.c_str()));
+ PyTuple_SetItem(elt,1,PyLong_FromLong(p.second));
+ PyList_SetItem(ob,i,elt);
+ }
+ }
+ if(_mode==REMOTE_NAME)
+ {
+ Engines::pickledArgs_var serializationInputCorba(new Engines::pickledArgs);
+ {
+ AutoGIL agil;
+ PyObject *serializationInput(PyObject_CallFunctionObjArgs(_pyfuncSimpleSer,ob,NULL));
+ Py_XDECREF(ob);
+ char *serializationInputC(0);
+ Py_ssize_t len;
+ if (PyString_AsStringAndSize(serializationInput, &serializationInputC, &len))
+ throw Exception("DistributedPythonNode problem in python pickle");
+ serializationInputCorba->length(len);
+ for(int i=0; i < len ; i++)
+ serializationInputCorba[i]=serializationInputC[i];
+ Py_XDECREF(serializationInput);
+ }
+ _pynode->defineNewCustomVar(DPL_INFO_NAME,serializationInputCorba);
+ }
+ else
+ {
+ AutoGIL agil;
+ PyDict_SetItemString(_context,DPL_INFO_NAME,ob);
+ Py_XDECREF(ob);
+ }
+}
+
PyFuncNode::PyFuncNode(const PyFuncNode& other, ComposedNode *father):InlineFuncNode(other,father),_pyfunc(0)
{
_implementation = PythonNode::IMPL_NAME;
void PyFuncNode::load()
{
DEBTRACE( "---------------PyfuncNode::load function---------------" );
- if(_mode=="remote")
+ if(_mode==PythonNode::REMOTE_NAME)
loadRemote();
else
loadLocal();
void PyFuncNode::execute()
{
- if(_mode=="remote")
+ if(_mode==PythonNode::REMOTE_NAME)
executeRemote();
else
executeLocal();
PyObject *_context;
PyObject *_pyfuncSer;
PyObject *_pyfuncUnser;
+ PyObject *_pyfuncSimpleSer;
+ public:
+ static const char SCRIPT_FOR_SIMPLE_SERIALIZATION[];
};
class YACSRUNTIMESALOME_EXPORT PythonNode : public InlineNode, public PythonEntry
std::string getContainerLog();
PythonNode* cloneNode(const std::string& name);
virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; }
+ void applyDPLScope(ComposedNode *gfn);
public:
static const char KIND[];
static const char IMPL_NAME[];
static const char SCRIPT_FOR_SERIALIZATION[];
+ static const char REMOTE_NAME[];
+ static const char DPL_INFO_NAME[];
protected:
Engines::PyScriptNode_var _pynode;
};
pass
def test18(self):
- SALOMERuntime.RuntimeSALOME_setRuntime()
- r=pilot.getRuntime()
- p=r.createProc("prTest18")
- n00=r.createScriptNode("Salome","n00")
+ p=self.r.createProc("prTest18")
+ n00=self.r.createScriptNode("Salome","n00")
self.assertEqual(n00.getMaxLevelOfParallelism(),1)
n00.setExecutionMode("remote")
self.assertEqual(n00.getMaxLevelOfParallelism(),1)
self.assertEqual(n00.getMaxLevelOfParallelism(),7) # <- here
pass
+ def test19(self):
+ """This test checks the mechanism of YACS that allow PythonNodes to know their DynParaLoop context."""
+ fname="test19.xml"
+ r=SALOMERuntime.getSALOMERuntime()
+ l=loader.YACSLoader()
+ #
+ p=self.r.createProc("PROC")
+ ti=p.createType("int","int")
+ tdi=p.createSequenceTc("seqint","seqint",ti)
+ # Level0
+ fe0=self.r.createForEachLoop("FE0",ti) ; p.edAddChild(fe0)
+ fe0.edGetNbOfBranchesPort().edInitInt(4)
+ fe0_end=self.r.createScriptNode("Salome","fe0_end")
+ fe0.edSetFinalizeNode(fe0_end)
+ fe0_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"])
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4)""")
+ n0=self.r.createScriptNode("Salome","n0") ; p.edAddChild(n0)
+ n0.setScript("o1=range(10)")
+ a=n0.edAddOutputPort("o1",tdi)
+ p.edAddLink(a,fe0.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n0,fe0)
+ # Level1
+ b0=self.r.createBloc("b0") ; fe0.edAddChild(b0)
+ n1=self.r.createScriptNode("Salome","n1") ; b0.edAddChild(n1)
+ n1.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0"])
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<4)
+o1=range(10)""")
+ b=n1.edAddOutputPort("o1",tdi)
+ fe1=self.r.createForEachLoop("FE1",ti) ; b0.edAddChild(fe1)
+ fe1.edGetNbOfBranchesPort().edInitInt(3)
+ fe1_end=self.r.createScriptNode("Salome","fe1_end")
+ fe1_end.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"])
+assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4)
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3)
+""")
+ fe1.edSetFinalizeNode(fe1_end)
+ p.edAddLink(b,fe1.edGetSeqOfSamplesPort()) ; p.edAddCFLink(n1,fe1)
+ # Level2
+ n2=self.r.createScriptNode("Salome","n2") ; fe1.edAddChild(n2)
+ n2.setScript("""assert([elt[0] for elt in my_dpl_localization]==["FE0.b0.FE1","FE0"])
+assert(my_dpl_localization[1][1]>=0 and my_dpl_localization[1][1]<4)
+assert(my_dpl_localization[0][1]>=0 and my_dpl_localization[0][1]<3)
+""")
+
+ p.saveSchema(fname)
+ ex=pilot.ExecutorSwig()
+
+ # local run of PythonNodes n1 and n2
+ p=l.load(fname)
+ p.init()
+ self.assertEqual(p.getState(),pilot.READY)
+ ex.setDPLScopeSensitive(True) # <- this line is the aim of the test
+ ex.RunW(p,0)
+ self.assertEqual(p.getState(),pilot.DONE)
+
+ # run remote
+ p=l.load(fname)
+ cont=p.createContainer("gg","HPSalome")
+ cont.setSizeOfPool(2)
+ n1=p.getChildByName("FE0.b0.n1") ; n1.setExecutionMode("remote") ; n1.setContainer(cont)
+ n2=p.getChildByName("FE0.b0.FE1.n2") ; n2.setExecutionMode("remote") ; n2.setContainer(cont)
+
+ p.init()
+ self.assertEqual(p.getState(),pilot.READY)
+ ex.RunW(p,0)
+ self.assertEqual(p.getState(),pilot.DONE)
+ pass
+
pass
if __name__ == '__main__':