+ return ret;
+}
+
+////////////////////
+
+void YACSEvalYFXGraphGenCluster::generateGraph()
+{
+ AutoGIL agil;
+ //
+ const char EFXGenFileName[]="EFXGenFileName";
+ const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%m%y\"),n.strftime(\"%H%M%S\")))";
+ const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%m%y\"),n.strftime(\"%H%M%S\"))";
+ //
+ AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
+ AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ if (PyUnicode_Check(val))
+ _locSchemaFile = PyUnicode_AsUTF8(val);
+ else
+ throw YACS::Exception("YACSEvalYFXGraphGenCluster::generateGraph: python call error. ");
+
+ func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
+ val=YACS::ENGINE::evalFuncPyWithNoParams(func);
+ if (PyUnicode_Check(val))
+ _jobName = PyUnicode_AsUTF8(val);
+ else
+ throw YACS::Exception("YACSEvalYFXGraphGenCluster::generateGraph: python call error. ");
+
+ class ClusterPatcher : public YACSEvalYFXGraphGen::CustomPatcher
+ {
+ public:
+ ClusterPatcher(const std::string& jobName):_jobName(jobName) { n2Script << "zeRes=["; }
+ void addOutputVar(const std::string& name) { n2Script<< name << ", "; }
+ void assignOutput(YACS::ENGINE::InlineNode *node) {
+ n2Script << "]\nwith open(\"" << _jobName << "\",\"w\") as f:" << std::endl;
+ n2Script << " f.write(repr(zeRes))" << std::endl;
+ node->setScript(n2Script.str());
+ }
+ private:
+ std::ostringstream n2Script;
+ std::string _jobName;
+ };
+ ClusterPatcher cp(_jobName);
+ //
+ this->generateGraphCommon(cp);
+}
+
+bool YACSEvalYFXGraphGenCluster::go(const YACSEvalExecParams& params, YACSEvalSession *session) const
+{
+ AutoGIL agil;
+ _errors = "";
+ getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile);
+ YACSEvalListOfResources *rss(getBoss()->getResourcesInternal());
+ const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
+ std::vector<std::string> machines(rss->getAllChosenMachines());
+ if(machines.size()!=1)
+ throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !");
+ Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
+ Engines::ResourceParameters rr;
+ rr.name=CORBA::string_dup(machines[0].c_str());
+ rr.hostname=CORBA::string_dup("");
+ rr.can_launch_batch_jobs=true;
+ rr.can_run_containers=true;
+ rr.OS=CORBA::string_dup("Linux");
+ rr.componentList.length(0);
+ rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
+ rr.mem_mb=0; // use default value
+ rr.cpu_clock=0; // use default value
+ rr.nb_node=0;// the number of nodes is not explicitly set
+ rr.nb_proc_per_node=1;// useless only nb_proc used.
+ rr.policy=CORBA::string_dup("cycl");
+ rr.resList.length(0);
+ Engines::JobParameters jp;
+ jp.job_name=CORBA::string_dup(_jobName.c_str());
+ jp.job_type=CORBA::string_dup("yacs_file");
+ jp.job_file=CORBA::string_dup(_locSchemaFile.c_str());
+ jp.env_file=CORBA::string_dup("");
+ jp.in_files.length(cli.getInFiles().size());
+ std::list<std::string>::const_iterator it;
+ int i;
+ for (it = cli.getInFiles().begin(), i=0 ;
+ it != cli.getInFiles().end();
+ it++, i++)
+ {
+ jp.in_files[i] = CORBA::string_dup((*it).c_str());
+ }
+ jp.out_files.length(1);
+ jp.out_files[0]=CORBA::string_dup(_jobName.c_str());
+ jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
+ jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
+ jp.resource_required=rr;
+ jp.queue=CORBA::string_dup("");
+ jp.exclusive=false;
+ jp.mem_per_cpu=rr.mem_mb;
+ jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
+ jp.extra_params=CORBA::string_dup("");
+ jp.specific_parameters.length(0);
+ jp.launcher_file=CORBA::string_dup("");
+ jp.launcher_args=CORBA::string_dup("");
+ _jobid=sl->createJob(jp);
+ try
+ {
+ sl->launchJob(_jobid);
+ }
+ catch (const SALOME::SALOME_Exception & ex)
+ {
+ _errors = ex.details.text.in();
+ return false;
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ _errors = "Receive CORBA SystemException.";
+ return false;
+ }
+
+ bool ret(false);
+ while(true)
+ {
+ PyRun_SimpleString("import time ; time.sleep(10)");
+ char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED"
+ std::string sstate(state);
+ CORBA::string_free(state);
+ if(sstate=="FINISHED" || sstate=="FAILED")
+ {
+ ret=sstate=="FINISHED";
+ break;
+ }
+ }
+ sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str());
+ //
+ try
+ {
+ std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl;
+ oss << "if not os.path.exists(p):\n return None\n";
+ oss << "f=open(p,\"r\")" << std::endl;
+ oss << "return eval(f.read())";
+ std::string zeInput(oss.str());
+ AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput));
+ AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ if(!PyList_Check(val))
+ throw YACS::Exception("Fetched file does not contain a list !");
+ Py_ssize_t sz(PyList_Size(val));
+ _res.resize(sz);
+ for(Py_ssize_t i=0;i<sz;i++)
+ {
+ std::vector<double>& res0(_res[i]);
+ PyObject *elt0(PyList_GetItem(val,i));
+ if(!PyList_Check(elt0))
+ throw YACS::Exception("Fetched file does contain a list of list !");
+ Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0);
+ for(Py_ssize_t j=0;j<sz0;j++)
+ {
+ PyObject *elt1(PyList_GetItem(elt0,j));
+ res0[j]=PyFloat_AsDouble(elt1);
+ }
+ }
+ // cleanup
+ std::ostringstream oss1; oss1 << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\") ; os.remove(p)" << std::endl;
+ std::string s1(oss1.str());
+ PyRun_SimpleString(s1.c_str());
+ if(!params.getFetchRemoteDirForClusterStatus())
+ {
+ std::ostringstream oss2; oss2 << "import os,shutil" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"logs\") ; shutil.rmtree(p)" << std::endl;
+ std::string s2(oss2.str());
+ PyRun_SimpleString(s2.c_str());
+ }
+ }
+ catch(YACS::Exception& e)
+ {
+ _errors=e.what();
+ return false;
+ }
+ //
+ return ret;
+}
+
+std::string YACSEvalYFXGraphGenCluster::getErrors()const
+{
+ return _errors;
+}
+
+std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenCluster::getResults() const
+{
+ std::size_t sz(_res.size());
+ std::vector<YACSEvalSeqAny *> ret(sz);
+ for(std::size_t i=0;i<sz;i++)
+ {
+ YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(_res[i]));
+ ret[i]=elt.dettach();
+ }
+ return ret;