#include "YACSEvalYFXPattern.hxx"
#include "YACSEvalResource.hxx"
#include "YACSEvalSeqAny.hxx"
+#include "YACSEvalSession.hxx"
#include "YACSEvalObserver.hxx"
+#include "YACSEvalSessionInternal.hxx"
#include "YACSEvalAutoPtr.hxx"
#include "ElementaryNode.hxx"
#include "PythonNode.hxx"
#include "InlineNode.hxx"
#include "ServiceNode.hxx"
+#include "PyStdout.hxx"
+#include "AutoGIL.hxx"
#include "ResourcesManager.hxx"
#include <sstream>
#include <iterator>
+////
+#include <stdlib.h>
+
const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX";
const char YACSEvalYFXPattern::ST_OK[]="ALL_OK";
const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO_CONCLUDE_ON_THEM";
+const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000;
+
const char YACSEvalYFXRunOnlyPattern::FIRST_FE_SUBNODE_NAME[]="Bloc";
const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__";
+class MyAutoThreadSaver
+{
+public:
+ MyAutoThreadSaver():_save(PyEval_SaveThread()) { }
+ ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); }
+private:
+ PyThreadState *_save;
+};
+
std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const
{
std::size_t sz(_inputs.size());
return _generatedGraph;
}
+std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const
+{
+ std::string st(getStatusOfRunStr());//test if a run has occurred.
+ if(st==ST_OK)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !");
+ // All the problem can only comes from foreach -> scan it
+ YACS::ENGINE::ForEachLoop *fe(findTopForEach());
+ YACS::ENGINE::NodeStateNameMap nsm;
+ unsigned nbB(fe->getNumberOfBranchesCreatedDyn());
+ std::ostringstream oss;
+ for(unsigned j=0;j<nbB;j++)
+ {
+ YACS::ENGINE::Node *nn(fe->getChildByNameExec(FIRST_FE_SUBNODE_NAME,j));
+ YACS::ENGINE::Bloc *nnc(dynamic_cast<YACS::ENGINE::Bloc *>(nn));
+ if(!nnc)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !");
+ if(nnc->getState()==YACS::DONE)
+ continue;
+ std::list< YACS::ENGINE::ElementaryNode *> fec(nnc->getRecursiveConstituents());
+ for(std::list< YACS::ENGINE::ElementaryNode *>::reverse_iterator it=fec.rbegin();it!=fec.rend();it++)
+ {
+ YACS::StatesForNode st0((*it)->getState());
+ if(st0!=YACS::DONE)
+ {
+ oss << "NODE = " << nnc->getChildName(*it) << std::endl;
+ oss << "STATUS = " << nsm[st0] << std::endl;
+ oss << "BRANCH ID = " << j << std::endl;
+ std::list<YACS::ENGINE::InputPort *> inps((*it)->getSetOfInputPort());
+ for(std::list<YACS::ENGINE::InputPort *>::const_iterator it=inps.begin();it!=inps.end();it++)
+ {
+ std::string d((*it)->getHumanRepr());
+ if(d.size()>10000)
+ d=d.substr(0,MAX_LGTH_OF_INP_DUMP);
+ oss << "INPUT \"" << (*it)->getName() << "\" = " << d << std::endl;
+ }
+ oss << "DETAILS = " << std::endl;
+ oss << (*it)->getErrorDetails();
+ }
+ }
+ }
+ return oss.str();
+}
+
std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const
{
YACS::StatesForNode st(_generatedGraph->getState());
}
getStatusOfRunStr();// To check that the status is recognized.
std::list<YACS::ENGINE::Node *> lns(_generatedGraph->edGetDirectDescendants());
- YACS::ENGINE::ForEachLoop *fe(0);
- for(std::list<YACS::ENGINE::Node *>::const_iterator it=lns.begin();it!=lns.end();it++)
- {
- fe=dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it);
- if(fe)
- break;
- }
- if(!fe)
- throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error 2 ! ForEach is not accessible !");
+ YACS::ENGINE::ForEachLoop *fe(findTopForEach());
//
YACS::ENGINE::Executor exe;
std::vector<YACS::ENGINE::SequenceAny *> outputs;
obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed());
}
+bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP, YACSEvalSession *session) const
+{
+ emitStart();
+ if(getResourcesInternal()->isInteractive())
+ {
+ YACS::ENGINE::Executor exe;
+ exe.setKeepGoingProperty(!stopASAP);
+ {
+ MyAutoThreadSaver locker;
+ exe.RunW(getUndergroundGeneratedGraph());
+ }
+ return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
+ }
+ else
+ {
+ const char EFXGenFileName[]="EFXGenFileName";
+ const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
+ const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))";
+ //
+ YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
+ YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ std::string locSchemaFile(PyString_AsString(val));
+ getUndergroundGeneratedGraph()->saveSchema(locSchemaFile);
+ func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
+ val=YACS::ENGINE::evalFuncPyWithNoParams(func);
+ std::string jobName(PyString_AsString(val));
+ YACSEvalListOfResources *rss(getResourcesInternal());
+ const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
+ std::vector<std::string> machines(rss->getAllChosenMachines());
+ if(machines.size()!=1)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::go : internal error ! In batch mode and not exactly one machine !");
+ Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
+ Engines::ResourceParameters rr;
+ rr.name=CORBA::string_dup(machines[0].c_str());
+ rr.hostname=CORBA::string_dup("");
+ rr.can_launch_batch_jobs=true;
+ rr.can_run_containers=true;
+ rr.OS=CORBA::string_dup("Linux");
+ rr.componentList.length(0);
+ rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
+ rr.mem_mb=1024;
+ rr.cpu_clock=1000;
+ rr.nb_node=1;// useless only nb_proc used.
+ rr.nb_proc_per_node=1;// useless only nb_proc used.
+ rr.policy=CORBA::string_dup("cycl");
+ rr.resList.length(0);
+ Engines::JobParameters jp;
+ jp.job_name=CORBA::string_dup(jobName.c_str());
+ jp.job_type=CORBA::string_dup("yacs_file");
+ jp.job_file=CORBA::string_dup(locSchemaFile.c_str());
+ jp.env_file=CORBA::string_dup("");
+ jp.in_files.length();
+ jp.out_files.length();
+ jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
+ jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+ jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
+ jp.resource_required=rr;
+ jp.queue=CORBA::string_dup("");
+ jp.exclusive=false;
+ jp.mem_per_cpu=rr.mem_mb;
+ jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
+ jp.extra_params=CORBA::string_dup("");
+ jp.specific_parameters.length(0);
+ jp.launcher_file=CORBA::string_dup("");
+ jp.launcher_args=CORBA::string_dup("");
+ CORBA::Long jobid(sl->createJob(jp));
+ sl->launchJob(jobid);
+ std::cerr << "*** " << jobName << " -> " << jobid << std::endl;
+ return false;
+ }
+}
+
bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode)
{
std::list<YACS::ENGINE::Node *> nodes(scheme->getChildren());
}
}
+YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::findTopForEach() const
+{
+ std::list<YACS::ENGINE::Node *> lns(_generatedGraph->edGetDirectDescendants());
+ YACS::ENGINE::ForEachLoop *fe(0);
+ for(std::list<YACS::ENGINE::Node *>::const_iterator it=lns.begin();it!=lns.end();it++)
+ {
+ fe=dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it);
+ if(fe)
+ break;
+ }
+ if(!fe)
+ throw YACS::Exception("YACSEvalYFXRunOnlyPattern::findTopForEach : internal error 2 ! ForEach is not accessible !");
+ return fe;
+}