+ if(!zeMainNode)
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !");
+ unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared());
+ nbProcsDeclared=std::max(nbProcsDeclared,4u);
+ int nbOfBranch=1;
+ if(getBoss()->getParallelizeStatus())
+ {
+ nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism());
+ nbOfBranch=std::max(nbOfBranch,1);
+ }
+ YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort());
+ YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast<YACS::ENGINE::AnyInputPort *>(zeInputToSet));
+ if(!zeInputToSetC)
+ throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !");
+ YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch));
+ zeInputToSetC->put(a);
+ zeInputToSetC->exSaveInit();
+ a->decrRef();
+ return nbOfBranch;
+}
+
+void YACSEvalYFXGraphGenInteractive::generateGraph()
+{
+ if(_generatedGraph)
+ { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
+ static const char LISTPYOBJ_STR[]="list[pyobj]";
+ if(getBoss()->getOutputsOfInterest().empty())
+ return ;
+ YACS::ENGINE::RuntimeSALOME::setRuntime();
+ YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
+ _generatedGraph=r->createProc(DFT_PROC_NAME);
+ YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
+ std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
+ _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
+ _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
+ //
+ YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
+ _generatedGraph->edAddChild(n0);
+ YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
+ YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
+ std::ostringstream var0;
+ const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
+ for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
+ {
+ if((*it).isRandomVar())
+ {
+ var0 << (*it).getName() << ",";
+ YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
+ YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
+ YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
+ if(!inpc)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
+ (*it).setUndergroundPortToBeSet(inpc);
+ }
+ }
+ std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
+ n0->setScript(n0Script.str());
+ //
+ YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
+ _FEInGeneratedGraph=n1;
+ _generatedGraph->edAddChild(n1);
+ _generatedGraph->edAddCFLink(n0,n1);
+ _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
+ YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
+ _generatedGraph->edAddChild(n2);
+ _generatedGraph->edAddCFLink(n1,n2);
+ //
+ YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
+ n1->edAddChild(n10);
+ YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
+ YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
+ YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
+ n10->edAddChild(n100);
+ n10->edAddChild(n101);
+ YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
+ n10->edAddCFLink(n100,n101);
+ n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
+ std::ostringstream var1;
+ for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
+ {
+ if((*it).isRandomVar())
+ {
+ var1 << (*it).getName() << ",";
+ YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
+ std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
+ YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
+ n10->edAddDFLink(myOut,myIn);
+ }
+ }
+ std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
+ n100->setScript(n100Script.str());
+ const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
+ for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
+ {
+ YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
+ YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
+ std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
+ YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
+ _generatedGraph->edAddDFLink(myOut,myIn);
+ }
+ _generatedGraph->updateContainersAndComponents();
+}
+
+bool YACSEvalYFXGraphGenInteractive::go(bool stopASAP, YACSEvalSession *session) const
+{
+ YACS::ENGINE::Executor exe;
+ exe.setKeepGoingProperty(!stopASAP);
+ {
+ MyAutoThreadSaver locker;
+ exe.RunW(getUndergroundGeneratedGraph());
+ }
+ return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
+}
+
+std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenInteractive::getResults() const
+{
+ if(getUndergroundGeneratedGraph()->getState()!=YACS::DONE)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !");
+ const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
+ std::vector<YACSEvalSeqAny *> ret(outputsOfInt.size());
+ YACS::ENGINE::Node *node(getUndergroundGeneratedGraph()->getChildByName(YACSEvalYFXGraphGen::GATHER_NODE_NAME));
+ YACS::ENGINE::PythonNode *nodeC(dynamic_cast<YACS::ENGINE::PythonNode *>(node));
+ if(!nodeC)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !");
+ std::size_t ii(0);
+ for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++,ii++)
+ {
+ YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName()));
+ YACS::ENGINE::InputPyPort *inputC(dynamic_cast<YACS::ENGINE::InputPyPort *>(input));
+ if(!inputC)
+ {
+ std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\"";
+ throw YACS::Exception(oss.str());
+ }
+ ret[ii]=YACSEvalYFXPattern::BuildValueInPort(inputC);
+ }
+ return ret;
+}
+
+////////////////////
+
+void YACSEvalYFXGraphGenCluster::generateGraph()
+{
+ if(_generatedGraph)
+ { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
+ //
+ const char EFXGenFileName[]="EFXGenFileName";
+ const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
+ const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))";
+ //
+ YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
+ YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ _locSchemaFile=PyString_AsString(val);
+ func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
+ val=YACS::ENGINE::evalFuncPyWithNoParams(func);
+ _jobName=PyString_AsString(val);
+ //
+ static const char LISTPYOBJ_STR[]="list[pyobj]";
+ if(getBoss()->getOutputsOfInterest().empty())
+ return ;
+ YACS::ENGINE::RuntimeSALOME::setRuntime();
+ YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
+ _generatedGraph=r->createProc(DFT_PROC_NAME);
+ YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
+ std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
+ _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
+ _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
+ //
+ YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
+ _generatedGraph->edAddChild(n0);
+ YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
+ YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
+ std::ostringstream var0;
+ const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
+ for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
+ {
+ if((*it).isRandomVar())
+ {
+ var0 << (*it).getName() << ",";
+ YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
+ YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
+ YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
+ if(!inpc)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
+ (*it).setUndergroundPortToBeSet(inpc);
+ }
+ }
+ std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
+ n0->setScript(n0Script.str());
+ //
+ YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
+ _FEInGeneratedGraph=n1;
+ _generatedGraph->edAddChild(n1);
+ _generatedGraph->edAddCFLink(n0,n1);
+ _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
+ YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
+ _generatedGraph->edAddChild(n2);
+ _generatedGraph->edAddCFLink(n1,n2);
+ //
+ YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
+ n1->edAddChild(n10);
+ YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
+ YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
+ YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
+ n10->edAddChild(n100);
+ n10->edAddChild(n101);
+ YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
+ n10->edAddCFLink(n100,n101);
+ n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
+ std::ostringstream var1;
+ for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
+ {
+ if((*it).isRandomVar())
+ {
+ var1 << (*it).getName() << ",";
+ YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
+ std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
+ YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
+ n10->edAddDFLink(myOut,myIn);
+ }
+ }
+ std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
+ n100->setScript(n100Script.str());
+ const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
+ std::ostringstream n2Script; n2Script << "zeRes=[";
+ for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
+ {
+ YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
+ YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
+ n2Script << (*it)->getName() << ", ";
+ std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
+ YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
+ _generatedGraph->edAddDFLink(myOut,myIn);
+ }
+ n2Script << "]\nf=file(\"" << _jobName << "\",\"w\") ; f.write(str(zeRes)) ; del f";
+ n2->setScript(n2Script.str());
+ _generatedGraph->updateContainersAndComponents();
+}
+
+bool YACSEvalYFXGraphGenCluster::go(bool stopASAP, YACSEvalSession *session) const
+{
+ getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile);
+ YACSEvalListOfResources *rss(getBoss()->getResourcesInternal());
+ const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
+ std::vector<std::string> machines(rss->getAllChosenMachines());
+ if(machines.size()!=1)
+ throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !");
+ Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
+ Engines::ResourceParameters rr;
+ rr.name=CORBA::string_dup(machines[0].c_str());
+ rr.hostname=CORBA::string_dup("");
+ rr.can_launch_batch_jobs=true;
+ rr.can_run_containers=true;
+ rr.OS=CORBA::string_dup("Linux");
+ rr.componentList.length(0);
+ rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
+ rr.mem_mb=1024;
+ rr.cpu_clock=1000;
+ rr.nb_node=1;// useless only nb_proc used.
+ rr.nb_proc_per_node=1;// useless only nb_proc used.
+ rr.policy=CORBA::string_dup("cycl");
+ rr.resList.length(0);
+ Engines::JobParameters jp;
+ jp.job_name=CORBA::string_dup(_jobName.c_str());
+ jp.job_type=CORBA::string_dup("yacs_file");
+ jp.job_file=CORBA::string_dup(_locSchemaFile.c_str());
+ jp.env_file=CORBA::string_dup("");
+ jp.in_files.length(0);
+ jp.out_files.length(1);
+ jp.out_files[0]=CORBA::string_dup(_jobName.c_str());
+ jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
+ jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
+ jp.resource_required=rr;
+ jp.queue=CORBA::string_dup("");
+ jp.exclusive=false;
+ jp.mem_per_cpu=rr.mem_mb;
+ jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
+ jp.extra_params=CORBA::string_dup("");
+ jp.specific_parameters.length(0);
+ jp.launcher_file=CORBA::string_dup("");
+ jp.launcher_args=CORBA::string_dup("");
+ _jobid=sl->createJob(jp);
+ sl->launchJob(_jobid);
+ bool ret(false);
+ while(true)
+ {
+ PyRun_SimpleString("import time ; time.sleep(10)");
+ char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED"
+ std::string sstate(state);
+ CORBA::string_free(state);
+ if(sstate=="FINISHED" || sstate=="FAILED")
+ {
+ ret=sstate=="FINISHED";
+ break;
+ }
+ }
+ sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str());
+ //
+ try
+ {
+ std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl;
+ oss << "if not os.path.exists(p):\n return None\n";
+ oss << "f=file(p,\"r\")" << std::endl;
+ oss << "return eval(f.read())";
+ std::string zeInput(oss.str());
+ YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput));
+ YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ if(!PyList_Check(val))
+ throw YACS::Exception("Fetched file does not contain a list !");
+ Py_ssize_t sz(PyList_Size(val));
+ _res.resize(sz);
+ for(Py_ssize_t i=0;i<sz;i++)
+ {
+ std::vector<double>& res0(_res[i]);
+ PyObject *elt0(PyList_GetItem(val,i));
+ if(!PyList_Check(elt0))
+ throw YACS::Exception("Fetched file does contain a list of list !");
+ Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0);
+ for(Py_ssize_t j=0;j<sz0;j++)
+ {
+ PyObject *elt1(PyList_GetItem(elt0,j));
+ res0[j]=PyFloat_AsDouble(elt1);
+ }
+ }
+ // cleanup
+ std::ostringstream oss1; oss1 << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\") ; os.remove(p)" << std::endl;
+ std::string s1(oss1.str());
+ PyRun_SimpleString(s1.c_str());
+ std::ostringstream oss2; oss2 << "import os,shutil" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"logs\") ; shutil.rmtree(p)" << std::endl;
+ std::string s2(oss2.str());
+ PyRun_SimpleString(s2.c_str());
+ }
+ catch(YACS::Exception& e)
+ {
+ _errors=e.what();
+ return false;
+ }
+ //
+ return ret;
+}
+
+std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenCluster::getResults() const
+{
+ std::size_t sz(_res.size());
+ std::vector<YACSEvalSeqAny *> ret(sz);
+ for(std::size_t i=0;i<sz;i++)
+ {
+ YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(_res[i]));
+ ret[i]=elt.dettach();
+ }
+ return ret;