]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
Merge tag 'V8_5_0' into omu/Launcher9 omu/Launcher9
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 14 Jun 2018 09:34:07 +0000 (11:34 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 14 Jun 2018 09:34:07 +0000 (11:34 +0200)
18 files changed:
bin/runSession.py
idl/SALOME_Launcher.idl
src/Launcher/CMakeLists.txt
src/Launcher/Launcher.cxx
src/Launcher/Launcher.hxx
src/Launcher/Launcher_Job_Command.cxx
src/Launcher/Launcher_Job_SALOME.cxx
src/Launcher/Launcher_XML_Persistence.cxx
src/Launcher/Launcher_XML_Persistence.hxx
src/Launcher/SALOME_Launcher.cxx
src/Launcher/SALOME_Launcher.hxx
src/Launcher/SALOME_LauncherServer.cxx
src/Launcher/Test/CMakeLists.txt
src/Launcher/Test/CTestTestfileInstall.cmake
src/Launcher/Test/launcher_use_case.py [new file with mode: 0644]
src/Launcher/Test/test_stress.sh [new file with mode: 0755]
src/Launcher/launcher_proxy.py [new file with mode: 0755]
src/ResourcesManager/ResourcesManager.cxx

index 8045a62e6ad06e7a4e58a0c69118ef9f3e76132a..bf0c5c6b7a518b52c9bf84e80bca12a6fcb832a2 100644 (file)
@@ -193,6 +193,7 @@ def __runLocalSession(command):
     for cmd in command:
       single_cmd = cmd.strip().split(' ')
       any_error = False
+      error_code = 1
       try:
         proc = subprocess.Popen(single_cmd)
         (stdoutdata, stderrdata) = proc.communicate() # Wait for process to terminate
@@ -203,6 +204,7 @@ def __runLocalSession(command):
 
         if proc.returncode != 0:
           any_error = True
+          error_code = proc.returncode
       except:
           any_error = True
           pass
@@ -213,7 +215,7 @@ def __runLocalSession(command):
           sys.stdout.write("".join(outmsg))
         if errmsg:
           sys.stderr.write("".join(errmsg))
-        sys.exit(1)
+        sys.exit(error_code)
 
     return 0
   else:
index 954b94e1ff06534ed10456774cfc3d29773bf213..4da1c6c5bc0dc8ca46f2098caf3db8e0c816b2c1 100644 (file)
@@ -285,6 +285,14 @@ interface SalomeLauncher
   //! Kill the job and remove it from the jobs list
   void   removeJob    (in long job_id)                           raises (SALOME::SALOME_Exception);
 
+  //! Get the job's serialization string
+  string dumpJob(in long job_id) raises (SALOME::SALOME_Exception);
+  //! Create a job from its serialization string
+  /*! \param dumpedJob Serialization string returned by dumpJob.
+      \return          Job id
+  */
+  long restoreJob(in string dumpedJob) raises (SALOME::SALOME_Exception);
+
   // Useful methods
   long    createJobWithFile(in string xmlJobFile, in string clusterName) raises (SALOME::SALOME_Exception);
   boolean testBatch        (in ResourceParameters params)                raises (SALOME::SALOME_Exception);
@@ -307,7 +315,6 @@ interface SalomeLauncher
   void loadJobs(in string jobs_file) raises (SALOME::SALOME_Exception);
   //! Save the current list of jobs in an xml file.
   void saveJobs(in string jobs_file) raises (SALOME::SALOME_Exception);
-
 };
 
 };
index def84e2ae5287bd88f5e321972ca66c6ffdece2f..2f661c8e6b101e72f1614f3b48135605f34a0844 100755 (executable)
@@ -130,5 +130,11 @@ SET(COMMON_HEADERS_HXX
   SALOME_Launcher_Parser.hxx
   SALOME_Launcher_defs.hxx
 )
+
+SET(LAUNCHER_PYTHON_SCRIPTS
+  launcher_proxy.py
+)
+
 INSTALL(FILES ${COMMON_HEADERS_HXX} DESTINATION ${SALOME_INSTALL_HEADERS})
 INSTALL(FILES testLauncher.xml DESTINATION ${SALOME_KERNEL_INSTALL_RES_DATA})
+SALOME_INSTALL_SCRIPTS("${LAUNCHER_PYTHON_SCRIPTS}" ${SALOME_INSTALL_PYTHON})
index 5db6b8df3006ff539e4fca657c5b40bd0f537ae6..96a1fafd0a87ce5abcfd6a59f466fb543b182944 100644 (file)
@@ -53,8 +53,6 @@ Launcher_cpp::Launcher_cpp()
 {
   LAUNCHER_MESSAGE("Launcher_cpp constructor");
   _job_cpt = 0;
-  _job_cpt_mutex = new pthread_mutex_t();
-  pthread_mutex_init(_job_cpt_mutex, NULL);
 }
 
 //=============================================================================
@@ -73,9 +71,6 @@ Launcher_cpp::~Launcher_cpp()
   for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
     delete it1->second;
 #endif
-
-  pthread_mutex_destroy(_job_cpt_mutex);
-  delete _job_cpt_mutex;
 }
 
 #ifdef WITH_LIBBATCH
@@ -90,18 +85,18 @@ Launcher_cpp::createJob(Launcher::Job * new_job)
 {
   LAUNCHER_MESSAGE("Creating a new job");
   // Add job to the jobs map
-  pthread_mutex_lock(_job_cpt_mutex);
   new_job->setNumber(_job_cpt);
   _job_cpt++;
-  pthread_mutex_unlock(_job_cpt_mutex);
   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
   if (it_job == _launcher_job_map.end())
+  {
     _launcher_job_map[new_job->getNumber()] = new_job;
+  }
   else
   {
-    LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
+    LAUNCHER_INFOS("A job has already the same id: " << new_job->getNumber());
     delete new_job;
-    throw LauncherException("A job as already the same id - job is not created !");
+    throw LauncherException("A job has already the same id - job is not created !");
   }
   LAUNCHER_MESSAGE("New Job created");
 }
@@ -116,15 +111,8 @@ Launcher_cpp::launchJob(int job_id)
 {
   LAUNCHER_MESSAGE("Launch a job");
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  // Check if job exists
+  Launcher::Job * job = findJob(job_id);
 
   // Check job state (cannot launch a job already launched...)
   if (job->getState() != "CREATED")
@@ -133,17 +121,10 @@ Launcher_cpp::launchJob(int job_id)
     throw LauncherException("Bad state of the job: " + job->getState());
   }
 
-  // Third step search batch manager for the job into the map -> instantiate one if does not exist
-#ifdef WITH_LIBBATCH
-  std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
-  if(it == _batchmap.end())
-  {
-    createBatchManagerForJob(job);
-  }
-#endif
+  Batch::BatchManager * bm = getBatchManager(job);
 
   try {
-    Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
+    Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob()));
     job->setBatchManagerJobId(batch_manager_job_id);
     job->setState("QUEUED");
     job->setReference(batch_manager_job_id.getReference());
@@ -167,14 +148,7 @@ Launcher_cpp::getJobState(int job_id)
   LAUNCHER_MESSAGE("Get job state");
 
   // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
 
   std::string state;
   try
@@ -201,14 +175,7 @@ Launcher_cpp::getAssignedHostnames(int job_id)
   LAUNCHER_MESSAGE("Get job assigned hostnames");
 
   // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
   std::string assigned_hostnames = job->getAssignedHostnames();
 
   return assigned_hostnames.c_str();
@@ -224,15 +191,7 @@ Launcher_cpp::getJobResults(int job_id, std::string directory)
 {
   LAUNCHER_MESSAGE("Get Job results");
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
   std::string resource_name = job->getResourceDefinition().Name;
   try 
   {
@@ -259,15 +218,7 @@ Launcher_cpp::clearJobWorkingDir(int job_id)
 {
   LAUNCHER_MESSAGE("Clear the remote working directory");
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
   try
   {
     _batchmap[job_id]->clearWorkingDir(*(job->getBatchJob()));
@@ -291,15 +242,7 @@ Launcher_cpp::getJobDumpState(int job_id, std::string directory)
   bool rtn;
   LAUNCHER_MESSAGE("Get Job dump state");
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
   std::string resource_name = job->getResourceDefinition().Name;
   try 
   {
@@ -330,15 +273,7 @@ Launcher_cpp::getJobWorkFile(int job_id,
   bool rtn;
   LAUNCHER_MESSAGE("Get working file " << work_file);
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
-  {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
-  }
-
-  Launcher::Job * job = it_job->second;
+  Launcher::Job * job = findJob(job_id);
   std::string resource_name = job->getResourceDefinition().Name;
   try
   {
@@ -389,15 +324,44 @@ Launcher_cpp::stopJob(int job_id)
 {
   LAUNCHER_MESSAGE("Stop Job");
 
-  // Check if job exist
-  std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
-  if (it_job == _launcher_job_map.end())
+  Launcher::Job * job = findJob(job_id);
+  job->stopJob();
+}
+
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+  LAUNCHER_MESSAGE("dump Job");
+
+  Launcher::Job * job = findJob(job_id);
+  return Launcher::XML_Persistence::dumpJob(*job);
+}
+
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+  LAUNCHER_MESSAGE("restore Job");
+  Launcher::Job * new_job=NULL;
+  int jobId = -1;
+  try
   {
-    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
-    throw LauncherException("Cannot find the job, is it created ?");
+    {
+      new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+    }
+    if(new_job)
+    {
+      jobId = addJob(new_job);
+      if(jobId < 0)
+        delete new_job;
+    }
   }
-
-  it_job->second->stopJob();
+  catch(const LauncherException &ex)
+  {
+    LAUNCHER_INFOS("Cannot load the job. Exception: " << ex.msg.c_str());
+    if(new_job)
+      delete new_job;
+  }
+  return jobId;
 }
 
 //=============================================================================
@@ -649,6 +613,24 @@ Launcher_cpp::stopJob(int job_id)
                           "(libBatch was not present at compilation time)");
 }
 
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+  LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot dump job!!!");
+  throw LauncherException("Method Launcher_cpp::dumpJob is not available "
+                          "(libBatch was not present at compilation time)");
+  return "";
+}
+
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+  LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot restore job!!!");
+  throw LauncherException("Method Launcher_cpp::restoreJob is not available "
+                          "(libBatch was not present at compilation time)");
+  return 0;
+}
+
 long 
 Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
 {
@@ -702,9 +684,11 @@ Launcher_cpp::getJobs()
   return _launcher_job_map;
 }
 
-void 
-Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
+#ifdef WITH_LIBBATCH
+Batch::BatchManager*
+Launcher_cpp::getBatchManager(Launcher::Job * job)
 {
+  Batch::BatchManager* result = nullptr;
   int job_id = job->getNumber();
 
   // Select a resource for the job
@@ -744,7 +728,6 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
   }
 
   // Step 2: We can now add a Factory if the resource is correctly define
-#ifdef WITH_LIBBATCH
   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
   if(it == _batchmap.end())
   {
@@ -753,8 +736,8 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
       // Warning cannot write on one line like this, because map object is constructed before
       // the method is called...
       //_batchmap[job_id] = FactoryBatchManager(resource_definition);
-      Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
-      _batchmap[job_id] = batch_client;
+      result = FactoryBatchManager(resource_definition);
+      _batchmap[job_id] = result;
     }
     catch(const LauncherException &ex)
     {
@@ -767,28 +750,28 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
       throw LauncherException(ex.message);
     }
   }
-#endif
+  else
+    result = it->second;
+  return result;
 }
+#endif
 
 void 
 Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
 {
   // Step 0: Calculated job_id
-  pthread_mutex_lock(_job_cpt_mutex);
-  int job_id = _job_cpt;
+  new_job->setNumber(_job_cpt);
   _job_cpt++;
-  new_job->setNumber(job_id);
-  pthread_mutex_unlock(_job_cpt_mutex);
 
+#ifdef WITH_LIBBATCH
   // Step 1: check if resource is already in the map
-  createBatchManagerForJob(new_job);
+  Batch::BatchManager * bm = getBatchManager(new_job);
 
   // Step 2: add the job to the batch manager
-#ifdef WITH_LIBBATCH
   try
   {
-    Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
-                                                                  new_job->getReference());
+    Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()),
+                                                   new_job->getReference());
     new_job->setBatchManagerJobId(batch_manager_job_id);
   }
   catch(const Batch::GenericException &ex)
@@ -800,7 +783,9 @@ Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
   // Step 3: add job to launcher map
   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
   if (it_job == _launcher_job_map.end())
+  {
     _launcher_job_map[new_job->getNumber()] = new_job;
+  }
   else
   {
     LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
@@ -811,6 +796,50 @@ Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
 #endif
 }
 
+int
+Launcher_cpp::addJob(Launcher::Job * new_job)
+{
+  string job_state = new_job->getState();
+  int jobId = -1;
+  if (job_state == "CREATED")
+  {
+    // In this case, we ignore run_part information
+    createJob(new_job);
+    jobId = new_job->getNumber();
+  }
+  else if (job_state == "QUEUED"     ||
+            job_state == "RUNNING"    ||
+            job_state == "IN_PROCESS" ||
+            job_state == "PAUSED")
+  {
+    addJobDirectlyToMap(new_job);
+    jobId = new_job->getNumber();
+
+    // We check that the BatchManager could resume the job
+#ifdef WITH_LIBBATCH
+    if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
+    {
+      LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
+      new_job->setState("ERROR");
+    }
+#endif
+  }
+  else if (job_state == "FINISHED" ||
+            job_state == "FAILED"   ||
+            job_state == "ERROR")
+  {
+    // We add run_part information
+    addJobDirectlyToMap(new_job);
+    jobId = new_job->getNumber();
+  }
+  else
+  {
+    LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
+    jobId = -1;
+  }
+  return jobId;
+}
+
 list<int>
 Launcher_cpp::loadJobs(const char* jobs_file)
 {
@@ -824,47 +853,14 @@ Launcher_cpp::loadJobs(const char* jobs_file)
   for (it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++)
   {
     Launcher::Job * new_job = *it_job;
-    string job_state = new_job->getState();
-
+    int jobId = -1;
     try
     {
-      if (job_state == "CREATED")
-      {
-        // In this case, we ignore run_part information
-        createJob(new_job);
-        new_jobs_id_list.push_back(new_job->getNumber());
-      }
-      else if (job_state == "QUEUED"     ||
-               job_state == "RUNNING"    ||
-               job_state == "IN_PROCESS" ||
-               job_state == "PAUSED")
-      {
-        addJobDirectlyToMap(new_job);
-        new_jobs_id_list.push_back(new_job->getNumber());
-
-        // Step 4: We check that the BatchManager could resume
-        // the job
-#ifdef WITH_LIBBATCH
-        if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
-        {
-          LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
-          new_job->setState("ERROR");
-        }
-#endif
-      }
-      else if (job_state == "FINISHED" ||
-               job_state == "FAILED"   ||
-               job_state == "ERROR")
-      {
-        // Step 2: We add run_part information
-        addJobDirectlyToMap(new_job);
-        new_jobs_id_list.push_back(new_job->getNumber());
-      }
+      jobId = addJob(new_job);
+      if(jobId >= 0)
+        new_jobs_id_list.push_back(jobId);
       else
-      {
-        LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
         delete new_job;
-      }
     }
     catch(const LauncherException &ex)
     {
@@ -881,13 +877,27 @@ Launcher_cpp::saveJobs(const char* jobs_file)
 {
   // Create a sorted list from the internal job map
   list<const Launcher::Job *> jobs_list;
-  for (int i=0; i<_job_cpt; i++)
   {
-    map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
-    if (it_job != _launcher_job_map.end())
-      jobs_list.push_back(it_job->second);
+    for (int i=0; i<_job_cpt; i++)
+    {
+      map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
+      if (it_job != _launcher_job_map.end())
+        jobs_list.push_back(it_job->second);
+    }
   }
-
   // Save the jobs in XML file
   Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list);
 }
+
+Launcher::Job *
+Launcher_cpp::findJob(int job_id)
+{
+  std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
+  if (it_job == _launcher_job_map.end())
+  {
+    LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
+    throw LauncherException("Cannot find the job, is it created ?");
+  }
+  Launcher::Job * job = it_job->second;
+  return job;
+}
\ No newline at end of file
index 728694eddfe5a9047d568e43baf2d18c836cf7c1..a8fbef7a8b37d4723bf0e19370dc36898b24a6ca 100644 (file)
@@ -35,8 +35,6 @@
 #include <vector>
 #include <list>
 
-#include <pthread.h>
-
 class MpiImpl;
 
 namespace Batch{
@@ -73,6 +71,8 @@ public:
   bool         getJobWorkFile(int job_id, std::string work_file, std::string directory);
   void         stopJob(int job_id);
   void         removeJob(int job_id);
+  std::string  dumpJob(int job_id);
+  int restoreJob(const std::string& dumpedJob);
 
   /*! Load the jobs from the file "jobs_file" and add them to the Launcher.
    *  Return a list with the IDs of the jobs that were successfully loaded.
@@ -85,7 +85,6 @@ public:
   // Useful methods
   long createJobWithFile(std::string xmlExecuteFile, std::string clusterName);
   std::map<int, Launcher::Job *> getJobs();
-  void createBatchManagerForJob(Launcher::Job * job);
   void addJobDirectlyToMap(Launcher::Job * new_job);
 
   // Lib methods
@@ -96,17 +95,20 @@ public:
 protected:
 
   virtual void notifyObservers(const std::string & event_name, const std::string & event_data) {}
+  int addJob(Launcher::Job * new_job);
+
+  Launcher::Job * findJob(int job_id);
 
   // Methods used by user interface methods
 #ifdef WITH_LIBBATCH
   Batch::BatchManager *FactoryBatchManager(ParserResourcesType& params);
   std::map <int, Batch::BatchManager*> _batchmap;
+  Batch::BatchManager* getBatchManager(Launcher::Job * job);
 #endif
   ParserLauncherType ParseXmlFile(std::string xmlExecuteFile);
 
   std::map <int, Launcher::Job *> _launcher_job_map;  
   int _job_cpt; // job number counter
-  pthread_mutex_t * _job_cpt_mutex; // mutex for job counter
 };
 
 #endif
index c725dc24fd3caae950a08d5034556442d057a103..b71d7373534db4fc3b575fbfd68c411c3c06476b 100644 (file)
@@ -78,6 +78,8 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l
 #ifndef WIN32
   launch_script_stream << "#!/bin/sh -f" << std::endl;
   launch_script_stream << "cd " << work_directory << std::endl;
+  // remove the exit code from any previous execution
+  launch_script_stream << "rm -f logs/exit_code.log" << std::endl;
   launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl;
   launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl;
   if (_env_file != "")
@@ -97,6 +99,10 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l
   }
 #endif
   launch_script_stream << runCommandString() << std::endl;
+#ifndef WIN32
+  // log the exit code
+  launch_script_stream << "echo $? > logs/exit_code.log" << std::endl;
+#endif
 
   // Return
   launch_script_stream.flush();
index ee2812c1b6f5cae12d61d3d316d198bb928ee3a9..17e7e1766056bc5a68cc16a6a7d42fb2cdf4e5f2 100644 (file)
@@ -79,6 +79,9 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params)
   // Begin of script
   launch_script_stream << "#!/bin/sh -f" << std::endl;
   launch_script_stream << "cd " << work_directory << std::endl;
+  // remove the exit code from any previous execution
+  launch_script_stream << "rm -f logs/exit_code.log" << std::endl;
+
   launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl;
   launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl;
   if (_env_file != "")
@@ -138,6 +141,8 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params)
 
   // Call real job type
   addJobTypeSpecificScript(launch_script_stream);
+  // log the exit code
+  launch_script_stream << "echo $? > logs/exit_code.log" << std::endl;
 
   // End
   launch_script_stream << _resource_definition.AppliPath << "/salome kill \"$appli_port\"" << std::endl;
index f7db94e5825f074a8eb21051cd4d97332ef99573..3e4c50df48113b04248191b6a18bcd507c035e75 100644 (file)
@@ -515,4 +515,72 @@ XML_Persistence::addAttr(xmlNodePtr node, const string & name, const string & va
   xmlFree(xmlStrValue);
 }
 
+Job*
+XML_Persistence::createJobFromString(const std::string& jobDump)
+{
+  xmlDocPtr doc;
+  doc = xmlReadMemory(jobDump.c_str(), jobDump.length(), "noname.xml", NULL, 0);
+  if (doc == NULL)
+  {
+    std::string error = "Error in xmlReadMemory in XML_Persistence::createJobFromString, could not parse string: " + jobDump;
+    LAUNCHER_INFOS(error);
+    throw LauncherException(error);
+  }
+
+  // Step 3: Find jobs
+  Job * result = NULL;
+  xmlNodePtr root_node = xmlDocGetRootElement(doc);
+  if (xmlStrToString(root_node->name) == "jobs")
+  {
+    xmlNodePtr xmlCurrentNode = root_node->xmlChildrenNode;
+    while(xmlCurrentNode != NULL && result == NULL)
+    {
+      if (xmlStrToString(xmlCurrentNode->name) == "job")
+      {
+        LAUNCHER_INFOS("A job is found");
+        result = createJobFromXmlNode(xmlCurrentNode);
+      }
+      xmlCurrentNode = xmlCurrentNode->next;
+    }
+  }
+  else
+  {
+    xmlFreeDoc(doc);
+    std::string error = "Error while parsing job dump: " + jobDump;
+    LAUNCHER_INFOS(error);
+    throw LauncherException(error);
+  }
+
+  // Clean
+  xmlFreeDoc(doc);
+  return result;
+}
+
+std::string
+XML_Persistence::dumpJob(const Job& job)
+{
+  // Initialization
+  xmlKeepBlanksDefault(0);
+  xmlDocPtr doc = xmlNewDoc(xmlCharStrdup("1.0"));
+  xmlNodePtr root_node = xmlNewNode(NULL, xmlCharStrdup("jobs"));
+  xmlDocSetRootElement(doc, root_node);
+  xmlNodePtr doc_comment = xmlNewDocComment(doc, xmlCharStrdup("SALOME Launcher job"));
+  xmlAddPrevSibling(root_node, doc_comment);
+
+  addJobToXmlDocument(root_node, job);
+
+  // Final step: write to result
+  xmlChar *xmlbuff;
+  int buffersize;
+  xmlDocDumpFormatMemory(doc, &xmlbuff, &buffersize, 1);
+  std::string result;
+  if(buffersize > 0)
+    result = (const char*) xmlbuff;
+
+  // Clean
+  xmlFree(xmlbuff);
+  xmlFreeDoc(doc);
+  return result;
+}
+
 }
index 42756a2d24c58aec7b11bbdce896670a9e09b6f9..d80d3d80dbe8a8bb65f135713a245e73f6666845 100644 (file)
@@ -48,6 +48,9 @@ namespace Launcher
     //! Save the jobs in the list "jobs_list" to the XML file "jobs_file".
     static void saveJobs(const char* jobs_file, const std::list<const Job *> & jobs_list);
 
+    static Job* createJobFromString(const std::string& jobDump);
+    static std::string dumpJob(const Job& job);
+
   private:
     // This class is static only, not instanciable
     XML_Persistence() {}
index cc7129bf0878b7e0a88d12eb4348cc887667856a..b35e1c563caaf3fb7763bd2b7dc319e965acc79e 100644 (file)
@@ -373,6 +373,43 @@ SALOME_Launcher::stopJob(CORBA::Long job_id)
   }
 }
 
+char *
+SALOME_Launcher::dumpJob(CORBA::Long job_id)
+{
+  std::string result;
+  try
+  {
+    result = _l.dumpJob(job_id);
+  }
+  catch(const LauncherException &ex)
+  {
+    INFOS(ex.msg.c_str());
+    THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
+  }
+  return CORBA::string_dup(result.c_str());
+}
+
+CORBA::Long
+SALOME_Launcher::restoreJob(const char * dumpedJob)
+{
+  CORBA::Long jobId;
+  try{
+    jobId = _l.restoreJob(dumpedJob);
+    if(jobId >= 0)
+    {
+      std::ostringstream job_str;
+      job_str << jobId;
+      notifyObservers("NEW_JOB", job_str.str());
+    }
+  }
+  catch(const LauncherException &ex){
+    INFOS(ex.msg.c_str());
+    THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
+  }
+
+  return jobId;
+}
+
 //=============================================================================
 /*! CORBA Method:
  *  Create a job in the launcher with a file
index a04a5d48ee989c43e96a51096712eb1bf327e1e0..4c46a6399c3dd868043dd94f1c5b7655addb7612 100644 (file)
@@ -56,6 +56,8 @@ public:
   CORBA::Boolean getJobWorkFile(CORBA::Long job_id, const char * work_file, const char * directory);
   void        stopJob      (CORBA::Long job_id);
   void        removeJob    (CORBA::Long job_id);
+  char * dumpJob(CORBA::Long job_id);
+  CORBA::Long restoreJob(const char * dumpedJob);
 
   // Useful methods
   CORBA::Long    createJobWithFile(const char * xmlExecuteFile, const char * clusterName);
index e7da460f6bbecc86e6a3eaf99eb254fac26569a7..a69958cab827957e945861b7bc453aa084e4ee36 100644 (file)
@@ -107,7 +107,14 @@ int main(int argc, char* argv[])
   }
   try
     {
-      SALOME_Launcher *lServ(new SALOME_Launcher(orb,root_poa));
+      CORBA::PolicyList policies;
+      policies.length(1);
+      PortableServer::ThreadPolicy_var threadPol(root_poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL));
+      policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
+      PortableServer::POA_var safePOA = root_poa->create_POA("SingleThreadPOA",pman,policies);
+      threadPol->destroy();
+
+      SALOME_Launcher *lServ(new SALOME_Launcher(orb,safePOA));
       lServ->_remove_ref();
       //
       SALOMESDS::DataServerManager *dsm(new SALOMESDS::DataServerManager(argc,argv,orb,root_poa));
index 48378f35b4177eba7da8bfadd5215f29cc09e5f6..68037d26b0b17173ed5961f64714d77b1ee22ecf 100644 (file)
@@ -33,4 +33,10 @@ IF(NOT WIN32)
   INSTALL(FILES CTestTestfileInstall.cmake
           DESTINATION ${KERNEL_TEST_DIR}/Launcher
           RENAME CTestTestfile.cmake)
+  SET(STRESS_TEST_FILES
+      test_stress.sh
+      launcher_use_case.py
+      )
+  INSTALL(PROGRAMS ${STRESS_TEST_FILES}
+          DESTINATION ${KERNEL_TEST_DIR}/Launcher)
 ENDIF()
index 3d4df3a2d0251415da7372351d6552d0fe1342de..9c7e2763954d04c6ef7965f46c232ac0d5a387e2 100644 (file)
 IF(NOT WIN32)
   SET(TEST_NAME ${COMPONENT_NAME}_Launcher)
   ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 test_launcher.py)
+  SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
+    #                                                 TIMEOUT 500
+    )
+
+  SET(TEST_NAME ${COMPONENT_NAME}_StressLauncher)
+  ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 ./test_stress.sh)
   SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
     #                                                 TIMEOUT 500
     )
diff --git a/src/Launcher/Test/launcher_use_case.py b/src/Launcher/Test/launcher_use_case.py
new file mode 100644 (file)
index 0000000..eac14d7
--- /dev/null
@@ -0,0 +1,44 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# classic use case of a job
+import os
+import salome
+import tempfile
+import time
+import sys
+
+if __name__ == '__main__':
+  salome.salome_init()
+  launcher = salome.naming_service.Resolve('/SalomeLauncher')
+  job_params = salome.JobParameters()
+  job_params.resource_required = salome.ResourceParameters()
+  job_params.resource_required.name = "localhost"
+  job_params.resource_required.nb_proc = 1 # slurm: --ntasks
+
+  job_params.job_type = "command"
+  #cwd = os.getcwd()
+  cwd = sys.argv[1]
+  case_dir = tempfile.mkdtemp(prefix="test")
+  job_params.local_directory = cwd
+  job_params.job_file = "command.sh"
+  job_params.work_directory = os.path.join(case_dir, "run")
+  job_params.result_directory = os.path.join(case_dir, "result")
+  job_params.out_files = ["result.txt"]
+  job_params.wckey="P11U5:CARBONES"
+  job_params.job_name = "MyJob"
+  job_id = launcher.createJob(job_params)
+  launcher.launchJob(job_id)
+  job_string = launcher.dumpJob(job_id)
+  jobState = launcher.getJobState(job_id)
+  while jobState != 'FINISHED' and jobState != 'FAILED':
+      time.sleep(1)
+      jobState = launcher.getJobState(job_id)
+  launcher.getJobResults(job_id, '')
+  launcher.getJobWorkFile(job_id, "result.txt", '')
+  launcher.clearJobWorkingDir(job_id)
+  launcher.removeJob(job_id)
+  new_id = launcher.restoreJob(job_string)
+  job_params_bis = launcher.getJobParameters(new_id)
+  jobState = launcher.getJobState(new_id)
+  launcher.removeJob(new_id)
+  launcher.saveJobs(os.path.join(case_dir,"savejobs.xml"))
diff --git a/src/Launcher/Test/test_stress.sh b/src/Launcher/Test/test_stress.sh
new file mode 100755 (executable)
index 0000000..4ed5a05
--- /dev/null
@@ -0,0 +1,24 @@
+#! /bin/bash
+# This test launches in parallel a greate number of instancies of a usual use case
+WORKDIR=`mktemp -d`
+echo WORKDIR: $WORKDIR
+cat > $WORKDIR/command.sh <<< 'echo "OK" > result.txt'
+chmod 755 $WORKDIR/command.sh
+for i in {1..500}
+do
+  python launcher_use_case.py $WORKDIR 2> $WORKDIR/log$i.err &
+done
+exit_code=0
+for i in {1..500}
+do
+  wait -n
+  ret=$?
+  if [ $ret -ne "0" ]
+  then
+     echo "Error detected!"
+     exit_code=1
+  fi
+done
+# list of error files not empty
+ls -l $WORKDIR/*.err | awk '{if ($5 != "0") print $0}'
+exit $exit_code
\ No newline at end of file
diff --git a/src/Launcher/launcher_proxy.py b/src/Launcher/launcher_proxy.py
new file mode 100755 (executable)
index 0000000..77c1d77
--- /dev/null
@@ -0,0 +1,122 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2017-2018  CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+""" Easier access to SalomeLauncher"""
+
+import os
+import salome
+import time
+
+JOB_FILE_NAME = "jobDump.xml"
+
+class Job(object):
+  """
+  This class makes an easier access to SalomeLauncher.
+  It adds an automatic save of the job's parameters after the launch. The save
+  is done into the result directory of the job. It is then possible to reload
+  the job from that file. It also provides a waiting loop for the job to finish.
+  This proxy also hides the calls to the naming service in order to get the
+  instance of SalomeLauncher.
+  """
+
+  @staticmethod
+  def launch(job_params):
+    """ Create, submit and dump a new job to result_dir."""
+    myjob = Job()
+    launcher = salome.naming_service.Resolve('/SalomeLauncher')
+    myjob.launcher = launcher
+    result_dir = job_params.result_directory # local path where to copy out_files
+
+    myjob.job_id = launcher.createJob(job_params) #SALOME id of the job
+    launcher.launchJob(myjob.job_id) # copy files, run pre_command, submit job
+    myjob.saveJob(result_dir)
+    return myjob
+
+  @staticmethod
+  def reloadJob(result_dir):
+    """ Create a new job from a job dumped in result_dir."""
+    myjob = Job()
+    launcher = salome.naming_service.Resolve('/SalomeLauncher')
+    myjob.launcher = launcher
+    myjob.job_id = -1
+    try:
+      job_file_path = os.path.join(result_dir, JOB_FILE_NAME)
+      job_string = ""
+      with open(job_file_path, "r") as f:
+        job_string = f.read()
+      myjob.job_id = launcher.restoreJob(job_string)
+    except:
+      myjob = None
+    return myjob
+
+  def saveJob(self, result_dir):
+    job_string = self.launcher.dumpJob(self.job_id)
+    job_file_path = os.path.join(result_dir, JOB_FILE_NAME)
+    if not os.path.exists(result_dir):
+      os.makedirs(result_dir)
+    with open(job_file_path, "w") as f:
+      f.write(job_string)
+
+  def wait(self, sleep_delay=10):
+    """ Wait for the end of the job """
+    launcher = self.launcher
+    job_id = self.job_id
+    jobState = launcher.getJobState(job_id)
+    while jobState != "FINISHED" and jobState != "FAILED" :
+      time.sleep(sleep_delay)
+      jobState = launcher.getJobState(job_id)
+      print ("Job %d state: %s" % (job_id,jobState))
+
+  def verify(self):
+    """ Get the return code of the solver if the job is finished.
+        If the job is not finished (submission error or another state),
+        return an empty string.
+    """
+    launcher = self.launcher
+    job_id = self.job_id
+    jobState = launcher.getJobState(job_id)
+    exit_code = ""
+    if jobState != "FINISHED" :
+      print ("Job has not finished correctly.")
+      print ("Job %d state: %s" % (job_id,jobState))
+    else :
+      job_params = launcher.getJobParameters(job_id)
+      temp_dir = job_params.result_directory
+      temp_log_dir = os.path.join(temp_dir, "logs")
+      if(launcher.getJobWorkFile(job_id, "logs/exit_code.log", temp_log_dir)):
+        exit_code_file = os.path.join(temp_log_dir, "exit_code.log")
+        if os.path.isfile(exit_code_file):
+          with open(exit_code_file) as myfile:
+            exit_code = myfile.read()
+    return exit_code.strip()
+
+  def getResults(self):
+    """ Copy the result files from remote working_directory
+        to the local result_directory."""
+    self.launcher.getJobResults(self.job_id, "")
+
+  def relaunch(self, script=""):
+    job_params = self.launcher.getJobParameters(self.job_id)
+    if script:
+      job_params.job_file = script
+    job_params.pre_command = ""
+    self.job_id = self.launcher.createJob(job_params)
+    self.launcher.launchJob(self.job_id)
+    self.saveJob(job_params.result_directory)
index 6ce6d140e76feb03a0e4ba3d1b02da4b7962343b..b2ed58fa572a64ffb0ae935932a33572a36649df 100644 (file)
@@ -53,7 +53,7 @@ resourceParams::resourceParams()
 : can_launch_batch_jobs(false),
   can_run_containers(false),
   nb_proc(-1),
-  nb_node(-1),
+  nb_node(0),
   nb_proc_per_node(-1),
   cpu_clock(-1),
   mem_mb(-1)
@@ -327,8 +327,21 @@ void
 ResourcesManager_cpp::AddResourceInCatalog(const ParserResourcesType & new_resource)
 {
   if (new_resource.Name == DEFAULT_RESOURCE_NAME){
-    std::string error("Cannot modify default local resource \"" + DEFAULT_RESOURCE_NAME + "\"");
-    throw ResourcesException(error);
+    ParserResourcesType default_resource = _resourcesList[DEFAULT_RESOURCE_NAME];
+    // some of the properties of the default resource shouldn't be modified
+    std::string check;
+    if( default_resource.HostName != new_resource.HostName)
+      check += "The Hostname property of the default resource can not be modified.\n";
+    if( default_resource.AppliPath != new_resource.AppliPath)
+      check += "The Applipath property of the default resource can not be modified.\n";
+    if( !new_resource.can_run_containers)
+      check += "The default resource should be able to run containers.\n";
+    if( !new_resource.can_launch_batch_jobs)
+      check += "The default resource should be able to launch batch jobs.\n";
+    if( default_resource.Protocol != new_resource.Protocol)
+      check += "The Protocol property of the default resource can not be modified.\n";
+    if(!check.empty())
+      throw ResourcesException(check);
   }
   // TODO - Add minimal check
   _resourcesList[new_resource.Name] = new_resource;
@@ -364,12 +377,9 @@ void ResourcesManager_cpp::WriteInXmlFile(std::string xml_file)
   RES_MESSAGE("WriteInXmlFile : start");
 
   MapOfParserResourcesType resourceListToSave(_resourcesList);
-  // We do not save default local resource because it is automatically created at startup
-  resourceListToSave.erase(DEFAULT_RESOURCE_NAME);
   if (resourceListToSave.empty())
   {
-    RES_MESSAGE("WriteInXmlFile: nothing to do, no resource except default \"" <<
-                DEFAULT_RESOURCE_NAME << "\"");
+    RES_MESSAGE("WriteInXmlFile: nothing to do, no resource to save!");
     return;
   }