Salome HOME
Successful first launch of scheme on cluster.
[modules/yacs.git] / src / evalyfx / YACSEvalYFXPattern.cxx
index 7afb739d05cb70ed440fefd3d742e2bef8f200c5..8f75f4ebd1fbf91d3ec3a28212046a01930dfc67 100644 (file)
@@ -21,7 +21,9 @@
 #include "YACSEvalYFXPattern.hxx"
 #include "YACSEvalResource.hxx"
 #include "YACSEvalSeqAny.hxx"
+#include "YACSEvalSession.hxx"
 #include "YACSEvalObserver.hxx"
+#include "YACSEvalSessionInternal.hxx"
 #include "YACSEvalAutoPtr.hxx"
 
 #include "ElementaryNode.hxx"
@@ -38,6 +40,8 @@
 #include "PythonNode.hxx"
 #include "InlineNode.hxx"
 #include "ServiceNode.hxx"
+#include "PyStdout.hxx"
+#include "AutoGIL.hxx"
 
 #include "ResourcesManager.hxx"
 
@@ -47,6 +51,9 @@
 #include <sstream>
 #include <iterator>
 
+////
+#include <stdlib.h>
+
 const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX";
 
 const char YACSEvalYFXPattern::ST_OK[]="ALL_OK";
@@ -55,10 +62,21 @@ const char YACSEvalYFXPattern::ST_FAILED[]="SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_
 
 const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO_CONCLUDE_ON_THEM";
 
+const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000;
+
 const char YACSEvalYFXRunOnlyPattern::FIRST_FE_SUBNODE_NAME[]="Bloc";
 
 const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__";
 
+class MyAutoThreadSaver
+{
+public:
+  MyAutoThreadSaver():_save(PyEval_SaveThread()) { }
+  ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); }
+private:
+  PyThreadState *_save;
+};
+
 std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const
 {
   std::size_t sz(_inputs.size());
@@ -483,6 +501,49 @@ YACS::ENGINE::Proc *YACSEvalYFXRunOnlyPattern::getUndergroundGeneratedGraph() co
   return _generatedGraph;
 }
 
+std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const
+{
+  std::string st(getStatusOfRunStr());//test if a run has occurred.
+  if(st==ST_OK)
+    throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !");
+  // All the problem can only comes from foreach -> scan it
+  YACS::ENGINE::ForEachLoop *fe(findTopForEach());
+  YACS::ENGINE::NodeStateNameMap nsm;
+  unsigned nbB(fe->getNumberOfBranchesCreatedDyn());
+  std::ostringstream oss;
+  for(unsigned j=0;j<nbB;j++)
+    {
+      YACS::ENGINE::Node *nn(fe->getChildByNameExec(FIRST_FE_SUBNODE_NAME,j));
+      YACS::ENGINE::Bloc *nnc(dynamic_cast<YACS::ENGINE::Bloc *>(nn));
+      if(!nnc)
+        throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !");
+      if(nnc->getState()==YACS::DONE)
+        continue;
+      std::list< YACS::ENGINE::ElementaryNode *> fec(nnc->getRecursiveConstituents());
+      for(std::list< YACS::ENGINE::ElementaryNode *>::reverse_iterator it=fec.rbegin();it!=fec.rend();it++)
+        {
+          YACS::StatesForNode st0((*it)->getState());
+          if(st0!=YACS::DONE)
+            {
+              oss << "NODE = " << nnc->getChildName(*it) << std::endl;
+              oss << "STATUS = " << nsm[st0] << std::endl;
+              oss << "BRANCH ID = " << j << std::endl;
+              std::list<YACS::ENGINE::InputPort *> inps((*it)->getSetOfInputPort());
+              for(std::list<YACS::ENGINE::InputPort *>::const_iterator it=inps.begin();it!=inps.end();it++)
+                {
+                  std::string d((*it)->getHumanRepr());
+                  if(d.size()>10000)
+                    d=d.substr(0,MAX_LGTH_OF_INP_DUMP);
+                  oss << "INPUT \"" << (*it)->getName() << "\" = " << d << std::endl;
+                }
+              oss << "DETAILS = " << std::endl;
+              oss << (*it)->getErrorDetails();
+            }
+        }
+    }
+  return oss.str();
+}
+
 std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const
 {
   YACS::StatesForNode st(_generatedGraph->getState());
@@ -566,15 +627,7 @@ std::vector<YACSEvalSeqAny *> YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailu
     }
   getStatusOfRunStr();// To check that the status is recognized.
   std::list<YACS::ENGINE::Node *> lns(_generatedGraph->edGetDirectDescendants());
-  YACS::ENGINE::ForEachLoop *fe(0);
-  for(std::list<YACS::ENGINE::Node *>::const_iterator it=lns.begin();it!=lns.end();it++)
-    {
-      fe=dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it);
-      if(fe)
-        break;
-    }
-  if(!fe)
-    throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error 2 ! ForEach is not accessible !");
+  YACS::ENGINE::ForEachLoop *fe(findTopForEach());
   //
   YACS::ENGINE::Executor exe;
   std::vector<YACS::ENGINE::SequenceAny *> outputs;
@@ -611,6 +664,79 @@ void YACSEvalYFXRunOnlyPattern::emitStart() const
   obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed());
 }
 
+bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP, YACSEvalSession *session) const
+{
+  emitStart();
+  if(getResourcesInternal()->isInteractive())
+    {
+      YACS::ENGINE::Executor exe;
+      exe.setKeepGoingProperty(!stopASAP);
+      {
+        MyAutoThreadSaver locker;
+        exe.RunW(getUndergroundGeneratedGraph());
+      }
+      return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
+    }
+  else
+    {
+      const char EFXGenFileName[]="EFXGenFileName";
+      const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
+      const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))";
+      //
+      YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
+      YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+      std::string locSchemaFile(PyString_AsString(val));
+      getUndergroundGeneratedGraph()->saveSchema(locSchemaFile);
+      func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
+      val=YACS::ENGINE::evalFuncPyWithNoParams(func);
+      std::string jobName(PyString_AsString(val));
+      YACSEvalListOfResources *rss(getResourcesInternal());
+      const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
+      std::vector<std::string> machines(rss->getAllChosenMachines());
+      if(machines.size()!=1)
+        throw YACS::Exception("YACSEvalYFXRunOnlyPattern::go : internal error ! In batch mode and not exactly one machine !");
+      Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
+      Engines::ResourceParameters rr;
+      rr.name=CORBA::string_dup(machines[0].c_str());
+      rr.hostname=CORBA::string_dup("");
+      rr.can_launch_batch_jobs=true;
+      rr.can_run_containers=true;
+      rr.OS=CORBA::string_dup("Linux");
+      rr.componentList.length(0);
+      rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
+      rr.mem_mb=1024;
+      rr.cpu_clock=1000;
+      rr.nb_node=1;// useless only nb_proc used.
+      rr.nb_proc_per_node=1;// useless only nb_proc used.
+      rr.policy=CORBA::string_dup("cycl");
+      rr.resList.length(0);
+      Engines::JobParameters jp;
+      jp.job_name=CORBA::string_dup(jobName.c_str());
+      jp.job_type=CORBA::string_dup("yacs_file");
+      jp.job_file=CORBA::string_dup(locSchemaFile.c_str());
+      jp.env_file=CORBA::string_dup("");
+      jp.in_files.length();
+      jp.out_files.length();
+      jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
+      jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+      jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
+      jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
+      jp.resource_required=rr;
+      jp.queue=CORBA::string_dup("");
+      jp.exclusive=false;
+      jp.mem_per_cpu=rr.mem_mb;
+      jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
+      jp.extra_params=CORBA::string_dup("");
+      jp.specific_parameters.length(0);
+      jp.launcher_file=CORBA::string_dup("");
+      jp.launcher_args=CORBA::string_dup("");
+      CORBA::Long jobid(sl->createJob(jp));
+      sl->launchJob(jobid);
+      std::cerr << "*** " << jobName << " -> " << jobid << std::endl;
+      return false;
+    }
+}
+
 bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode)
 {
   std::list<YACS::ENGINE::Node *> nodes(scheme->getChildren());
@@ -691,3 +817,17 @@ void YACSEvalYFXRunOnlyPattern::buildOutputPorts()
     }
 }
 
+YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::findTopForEach() const
+{
+  std::list<YACS::ENGINE::Node *> lns(_generatedGraph->edGetDirectDescendants());
+  YACS::ENGINE::ForEachLoop *fe(0);
+  for(std::list<YACS::ENGINE::Node *>::const_iterator it=lns.begin();it!=lns.end();it++)
+    {
+      fe=dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it);
+      if(fe)
+        break;
+    }
+  if(!fe)
+    throw YACS::Exception("YACSEvalYFXRunOnlyPattern::findTopForEach : internal error 2 ! ForEach is not accessible !");
+  return fe;
+}