const char YACSEvalYFXGraphGen::GATHER_NODE_NAME[]="__gather__";
+const char YACSEvalYFXGraphGen::HIDDEN_INDEX_VAR[]="___idx___";
+
class MyAutoThreadSaver
{
public:
_generatedGraph=0; _FEInGeneratedGraph=0;
}
-bool YACSEvalYFXGraphGen::isLocked() const
-{
- return _generatedGraph!=0;
-}
-
-int YACSEvalYFXGraphGen::assignNbOfBranches()
-{
- if(!_generatedGraph)
- throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !");
- std::list<YACS::ENGINE::Node *> nodes(_generatedGraph->getChildren());
- YACS::ENGINE::ForEachLoop *zeMainNode(0);
- for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end();it++)
- {
- YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it));
- if(isZeMainNode)
- {
- if(!zeMainNode)
- zeMainNode=isZeMainNode;
- else
- throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !");
- }
- }
- if(!zeMainNode)
- throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !");
- unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared());
- nbProcsDeclared=std::max(nbProcsDeclared,4u);
- int nbOfBranch=1;
- if(getBoss()->getParallelizeStatus())
- {
- nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism());
- nbOfBranch=std::max(nbOfBranch,1);
- }
- YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort());
- YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast<YACS::ENGINE::AnyInputPort *>(zeInputToSet));
- if(!zeInputToSetC)
- throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !");
- YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch));
- zeInputToSetC->put(a);
- zeInputToSetC->exSaveInit();
- a->decrRef();
- return nbOfBranch;
-}
-
-void YACSEvalYFXGraphGenInteractive::generateGraph()
+void YACSEvalYFXGraphGen::generateGraphCommon(CustomPatcher& patcher)
{
if(_generatedGraph)
{ delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
_generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
- _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
+ YACS::ENGINE::TypeCode *tcInt(_generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int"));
//
YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
_generatedGraph->edAddChild(n0);
(*it).setUndergroundPortToBeSet(inpc);
}
}
- std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
+ std::ostringstream n0Script; n0Script << "sender=[tuple([__p9Sq]+list(__p9Sw)) for __p9Sq,__p9Sw in enumerate(zip(" << var0.str() << "))]\n";
n0->setScript(n0Script.str());
//
YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
n10->edAddCFLink(n100,n101);
n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
std::ostringstream var1;
+ YACS::ENGINE::OutputPort *n100_output(n100->edAddOutputPort(HIDDEN_INDEX_VAR,tcInt));
+ var1 << HIDDEN_INDEX_VAR << ",";
for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
{
if((*it).isRandomVar())
std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
n100->setScript(n100Script.str());
const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
+ {
+ std::list<YACS::ENGINE::ElementaryNode *> n101_constit(n101->getRecursiveConstituents());
+ for(std::list<YACS::ENGINE::ElementaryNode *>::const_iterator it=n101_constit.begin();it!=n101_constit.end();it++)
+ {
+ YACS::ENGINE::InlineNode *eltc(dynamic_cast<YACS::ENGINE::InlineNode *>(*it));
+ if(eltc)
+ {
+ YACS::ENGINE::InputPort *n101_input(eltc->edAddInputPort(HIDDEN_INDEX_VAR,tcInt));
+ _generatedGraph->edAddDFLink(n100_output,n101_input);
+ }
+ }
+ }
for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
{
+ patcher.addOutputVar((*it)->getName());
YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
_generatedGraph->edAddDFLink(myOut,myIn);
}
+ patcher.assignOutput(n2);
_generatedGraph->updateContainersAndComponents();
}
+bool YACSEvalYFXGraphGen::isLocked() const
+{
+ return _generatedGraph!=0;
+}
+
+int YACSEvalYFXGraphGen::assignNbOfBranches()
+{
+ if(!_generatedGraph)
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !");
+ std::list<YACS::ENGINE::Node *> nodes(_generatedGraph->getChildren());
+ YACS::ENGINE::ForEachLoop *zeMainNode(0);
+ for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end();it++)
+ {
+ YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it));
+ if(isZeMainNode)
+ {
+ if(!zeMainNode)
+ zeMainNode=isZeMainNode;
+ else
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !");
+ }
+ }
+ if(!zeMainNode)
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !");
+ unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared());
+ nbProcsDeclared=std::max(nbProcsDeclared,4u);
+ int nbOfBranch=1;
+ if(getBoss()->getParallelizeStatus())
+ {
+ nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism());
+ nbOfBranch=std::max(nbOfBranch,1);
+ }
+ YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort());
+ YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast<YACS::ENGINE::AnyInputPort *>(zeInputToSet));
+ if(!zeInputToSetC)
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !");
+ YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch));
+ zeInputToSetC->put(a);
+ zeInputToSetC->exSaveInit();
+ a->decrRef();
+ return nbOfBranch;
+}
+
+void YACSEvalYFXGraphGenInteractive::generateGraph()
+{
+ class LocalPatcher : public YACSEvalYFXGraphGen::CustomPatcher
+ {
+ public:
+ void addOutputVar(const std::string& name) { }
+ void assignOutput(YACS::ENGINE::InlineNode *node) { }
+ };
+ LocalPatcher lp;
+ this->generateGraphCommon(lp);
+}
+
bool YACSEvalYFXGraphGenInteractive::go(const YACSEvalExecParams& params, YACSEvalSession *session) const
{
YACS::ENGINE::Executor exe;
void YACSEvalYFXGraphGenCluster::generateGraph()
{
YACS::ENGINE::AutoGIL agil;
- if(_generatedGraph)
- { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
//
const char EFXGenFileName[]="EFXGenFileName";
const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%m%y\"),n.strftime(\"%H%M%S\")))";
val=YACS::ENGINE::evalFuncPyWithNoParams(func);
_jobName=PyString_AsString(val);
//
- static const char LISTPYOBJ_STR[]="list[pyobj]";
- if(getBoss()->getOutputsOfInterest().empty())
- return ;
- YACS::ENGINE::RuntimeSALOME::setRuntime();
- YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
- _generatedGraph=r->createProc(DFT_PROC_NAME);
- YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
- std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
- _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
- _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
- //
- YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
- _generatedGraph->edAddChild(n0);
- YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
- YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
- std::ostringstream var0;
- const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
- for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
- {
- if((*it).isRandomVar())
- {
- var0 << (*it).getName() << ",";
- YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
- YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
- YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
- if(!inpc)
- throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
- (*it).setUndergroundPortToBeSet(inpc);
- }
+ class ClusterPatcher : public YACSEvalYFXGraphGen::CustomPatcher
+ {
+ public:
+ ClusterPatcher(const std::string& jobName):_jobName(jobName) { n2Script << "zeRes=["; }
+ void addOutputVar(const std::string& name) { n2Script<< name << ", "; }
+ void assignOutput(YACS::ENGINE::InlineNode *node) {
+ n2Script << "]\nwith open(\"" << _jobName << "\",\"w\") as f:" << std::endl;
+ n2Script << " f.write(str(zeRes))" << std::endl;
+ node->setScript(n2Script.str());
}
- std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
- n0->setScript(n0Script.str());
+ private:
+ std::ostringstream n2Script;
+ std::string _jobName;
+ };
+ ClusterPatcher cp(_jobName);
//
- YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
- _FEInGeneratedGraph=n1;
- _generatedGraph->edAddChild(n1);
- _generatedGraph->edAddCFLink(n0,n1);
- _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
- YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
- _generatedGraph->edAddChild(n2);
- _generatedGraph->edAddCFLink(n1,n2);
- //
- YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
- n1->edAddChild(n10);
- YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
- YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
- YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
- n10->edAddChild(n100);
- n10->edAddChild(n101);
- YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
- n10->edAddCFLink(n100,n101);
- n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
- std::ostringstream var1;
- for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
- {
- if((*it).isRandomVar())
- {
- var1 << (*it).getName() << ",";
- YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
- std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
- YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
- n10->edAddDFLink(myOut,myIn);
- }
- }
- std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
- n100->setScript(n100Script.str());
- const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
- std::ostringstream n2Script; n2Script << "zeRes=[";
- for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
- {
- YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
- YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
- n2Script << (*it)->getName() << ", ";
- std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
- YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
- _generatedGraph->edAddDFLink(myOut,myIn);
- }
- n2Script << "]\nf=file(\"" << _jobName << "\",\"w\") ; f.write(str(zeRes)) ; del f";
- n2->setScript(n2Script.str());
- _generatedGraph->updateContainersAndComponents();
+ this->generateGraphCommon(cp);
}
bool YACSEvalYFXGraphGenCluster::go(const YACSEvalExecParams& params, YACSEvalSession *session) const