]> SALOME platform Git repositories - modules/yacs.git/commitdiff
Salome HOME
Add the preprocess feature to SALOME_Launcher.
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 7 Dec 2017 16:24:02 +0000 (17:24 +0100)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 7 Dec 2017 16:24:02 +0000 (17:24 +0100)
It is now possible to run a command on the frontal of a cluster
before the submission of the batch.

idl/SALOME_Launcher.idl
src/Launcher/Launcher_Job.cxx
src/Launcher/Launcher_Job.hxx
src/Launcher/Launcher_XML_Persistence.cxx
src/Launcher/SALOME_Launcher.cxx
src/Launcher/Test/test_launcher.py
src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py

index bebe60d4387ee3509ec4be41b8b5be6accec2ad4..b3619363c77b9291cea34453519208f4ddd20c77 100644 (file)
@@ -66,6 +66,12 @@ struct JobParameters
   */
   string job_file;
 
+  //! Pre processing command.
+  /*! This command is called on the remote resource, from #work_directory, after
+      the copy of #in_files and before submiting the job.
+  */
+  string pre_command;
+
   //! Local path to a script to be sourced in the environment of the job.
   /*! It may contain modifications of environment variables.
   */
@@ -198,6 +204,7 @@ interface SalomeLauncher
   /*! Launching the job consists of:
       - create the working directory on the remote file system
       - copy the input files into the working directory
+      - launch the pre processing command if one is defined
       - submit the job to the batch manager
   */
   void   launchJob    (in long job_id)                           raises (SALOME::SALOME_Exception);
index e046fd8cfc14d88b8f5ed7a3fed573a1873bdb47..e3a2fc615c40a67d6370c03adbd64ae4b7bd9eee 100644 (file)
@@ -40,6 +40,7 @@ Launcher::Job::Job()
   _job_file = "";
   _job_file_name = "";
   _job_file_name_complete = "";
+  _pre_command = "";
   _work_directory = "";
   _local_directory = "";
   _result_directory = "";
@@ -442,6 +443,18 @@ Launcher::Job::getReference() const
   return _reference;
 }
 
+void
+Launcher::Job::setPreCommand(const std::string & preCommand)
+{
+  _pre_command = preCommand;
+}
+
+std::string
+Launcher::Job::getPreCommand() const
+{
+  return _pre_command;
+}
+
 void
 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
 {
@@ -604,6 +617,7 @@ Launcher::Job::common_job_params()
     }
   }
   params[Batch::WORKDIR] = _work_directory;
+  params[Batch::PREPROCESS] = _pre_command;
 
   // Parameters for COORM
   params[Batch::LAUNCHER_FILE] = _launcher_file;
index 2f2322bf828eef095b4ad37182e6668c7ffd311d..8531cd1c03b8d4ba8f516b811f4f6551e31b1121 100644 (file)
@@ -67,6 +67,7 @@ namespace Launcher
       // Common parameters
       void setJobName(const std::string & job_name);
       virtual void setJobFile(const std::string & job_file);
+      void setPreCommand(const std::string & preCommand);
       void setWorkDirectory(const std::string & work_directory);
       void setLocalDirectory(const std::string & local_directory);
       void setResultDirectory(const std::string & result_directory);
@@ -82,12 +83,13 @@ namespace Launcher
       void setWCKey(const std::string & wckey);
       void setExtraParams(const std::string & extra_params);
       void setReference(const std::string & reference);
-         // For COORM
-         void setLauncherFile(const std::string & launcher_file);
-         void setLauncherArgs(const std::string & launcher_args);
+      // For COORM
+      void setLauncherFile(const std::string & launcher_file);
+      void setLauncherArgs(const std::string & launcher_args);
 
       std::string getJobName() const;
       std::string getJobFile() const;
+      std::string getPreCommand() const;
       std::string getWorkDirectory() const;
       std::string getLocalDirectory() const;
       std::string getResultDirectory() const;
@@ -105,9 +107,9 @@ namespace Launcher
       std::string getExtraParams() const;
       std::string getReference() const;
 
-         // For COORM
-         std::string getLauncherFile() const;
-         std::string getLauncherArgs() const;
+      // For COORM
+      std::string getLauncherFile() const;
+      std::string getLauncherArgs() const;
 
       std::string updateJobState();
 
@@ -146,6 +148,7 @@ namespace Launcher
       std::string _job_file;
       std::string _job_file_name;
       std::string _job_file_name_complete;
+      std::string _pre_command;
 
       std::string _work_directory;
       std::string _local_directory;
@@ -163,9 +166,9 @@ namespace Launcher
       std::string _extra_params;
       std::string _reference; //! Reference of the job for the batch manager
 
-         // Parameters for COORM
-         std::string _launcher_file;
-         std::string _launcher_args;
+      // Parameters for COORM
+      std::string _launcher_file;
+      std::string _launcher_args;
 
 #ifdef WITH_LIBBATCH
     // Connection with LIBBATCH
index ab0c929b8a5a866ecb49a00ec5f5c36174fa19ff..eb7775e5a99251cd5cf5c4eee4976427c651fef2 100644 (file)
@@ -152,6 +152,8 @@ XML_Persistence::addJobToXmlDocument(xmlNodePtr root_node, const Job & job)
     addNode(node, "local_directory", job.getLocalDirectory());
   if (!job.getResultDirectory().empty())
     addNode(node, "result_directory", job.getResultDirectory());
+  if (!job.getPreCommand().empty())
+    addNode(node, "pre_command", job.getPreCommand());
 
   // Parameters for COORM
   if (!job.getLauncherFile().empty())
@@ -300,6 +302,8 @@ XML_Persistence::parseUserNode(Job * new_job, xmlNodePtr user_node)
     }
     else if (node_name == "env_file")
       new_job->setEnvFile(getNodeContent(current_node));
+    else if (node_name == "pre_command")
+      new_job->setPreCommand(getNodeContent(current_node));
     else if (node_name == "work_directory")
       new_job->setWorkDirectory(getNodeContent(current_node));
     else if (node_name == "local_directory")
index 33a961d04fc279db497f8a021818ee3e6e750667..39447a7fdb4a203d041947f79285cf43c34e93fc 100644 (file)
@@ -139,6 +139,7 @@ SALOME_Launcher::createJob(const Engines::JobParameters & job_parameters)
     INFOS(ex.msg.c_str());
     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
   }
+  new_job->setPreCommand(job_parameters.pre_command.in());
 
   // Files
   std::string env_file = job_parameters.env_file.in();
@@ -518,6 +519,7 @@ SALOME_Launcher::getJobParameters(CORBA::Long job_id)
   job_parameters->work_directory   = CORBA::string_dup(job->getWorkDirectory().c_str());
   job_parameters->local_directory  = CORBA::string_dup(job->getLocalDirectory().c_str());
   job_parameters->result_directory = CORBA::string_dup(job->getResultDirectory().c_str());
+  job_parameters->pre_command      = CORBA::string_dup(job->getPreCommand().c_str());
 
   // Parameters for COORM
   job_parameters->launcher_file = CORBA::string_dup(job->getLauncherFile().c_str());
index 546b0057fda6241aad64b9d312a5edfc8c947078..b0bee509de0582e547500dc9756dda8d021c1d8d 100755 (executable)
@@ -61,6 +61,13 @@ class TestCompo(unittest.TestCase):
     except IOError,ex:
       self.fail("IO exception:" + str(ex));
 
+  def create_JobParameters(self):
+    job_params = salome.JobParameters()
+    job_params.wckey="P11U50:CARBONES" #needed by edf clusters
+    job_params.resource_required = salome.ResourceParameters()
+    job_params.resource_required.nb_proc = 1
+    return job_params
+
   ##############################
   # test of python_salome job
   ##############################
@@ -96,13 +103,11 @@ f.close()
     f.close()
 
     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
-    job_params = salome.JobParameters()
+    job_params = self.create_JobParameters()
     job_params.job_type = "python_salome"
     job_params.job_file = job_script_file
     job_params.in_files = []
     job_params.out_files = ["result.txt", "subdir"]
-    job_params.resource_required = salome.ResourceParameters()
-    job_params.resource_required.nb_proc = 1
 
     launcher = salome.naming_service.Resolve('/SalomeLauncher')
 
@@ -194,15 +199,13 @@ f.close()
 
     # job params
     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
-    job_params = salome.JobParameters()
+    job_params = self.create_JobParameters()
     job_params.job_type = "command"
     job_params.job_file = script_file
     job_params.env_file = env_file
     job_params.in_files = [data_file]
     job_params.out_files = ["result.txt", "copie"]
     job_params.local_directory = case_test_dir
-    job_params.resource_required = salome.ResourceParameters()
-    job_params.resource_required.nb_proc = 1
 
     # create and launch the job
     launcher = salome.naming_service.Resolve('/SalomeLauncher')
@@ -299,7 +302,7 @@ f.close()
     f.close()
 
     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
-    job_params = salome.JobParameters()
+    job_params = self.create_JobParameters()
     job_params.job_type = "yacs_file"
     job_params.job_file = job_script_file
     job_params.env_file = os.path.join(case_test_dir,env_file)
@@ -308,8 +311,6 @@ f.close()
     # define the interval between two YACS schema dumps (3 seconds)
     import Engines
     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
-    job_params.resource_required = salome.ResourceParameters()
-    job_params.resource_required.nb_proc = 1
 
     launcher = salome.naming_service.Resolve('/SalomeLauncher')
     resManager= salome.lcc.getResourcesManager()
@@ -412,10 +413,9 @@ f.close()
     f.close()
 
     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
-    job_params = salome.JobParameters()
+    job_params = self.create_JobParameters()
     job_params.job_type = "yacs_file"
     job_params.job_file = job_script_file
-    #job_params.env_file = os.path.join(case_test_dir,env_file)
     job_params.out_files = ["result.txt"]
 
     # define the interval between two YACS schema dumps (3 seconds)
@@ -423,8 +423,6 @@ f.close()
     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
     expected_result="i=5,d=3.7,b=False,s=lili"
-    job_params.resource_required = salome.ResourceParameters()
-    job_params.resource_required.nb_proc = 1
 
     launcher = salome.naming_service.Resolve('/SalomeLauncher')
     resManager= salome.lcc.getResourcesManager()
@@ -460,6 +458,78 @@ f.close()
       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
                       expected_result)
 
+  ############################################
+  # test of command job type with pre_command
+  ############################################
+  def test_command_pre(self):
+    case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
+    mkdir_p(case_test_dir)
+
+    # command to be run before the job
+    pre_command = "echo 'it works!' > in.txt"
+    
+    # job script
+    script_file = "myTestScript.py"
+    script_text = """#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+in_f = open("in.txt", "r")
+in_text = in_f.read()
+in_f.close()
+
+f = open('result.txt', 'w')
+f.write(in_text)
+f.close()
+"""
+    abs_script_file = os.path.join(case_test_dir, script_file)
+    f = open(abs_script_file, "w")
+    f.write(script_text)
+    f.close()
+    os.chmod(abs_script_file, 0o755)
+
+    # job params
+    local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
+    job_params = self.create_JobParameters()
+    job_params.job_type = "command"
+    job_params.job_file = script_file
+    job_params.pre_command = pre_command
+    job_params.in_files = []
+    job_params.out_files = ["result.txt"]
+    job_params.local_directory = case_test_dir
+
+    # create and launch the job
+    launcher = salome.naming_service.Resolve('/SalomeLauncher')
+    resManager= salome.lcc.getResourcesManager()
+
+    for resource in self.ressources:
+      print "Testing command job on ", resource
+      job_params.result_directory = local_result_dir + resource
+      job_params.job_name = "CommandPreJob_" + resource
+      job_params.resource_required.name = resource
+
+      # use the working directory of the resource
+      resParams = resManager.GetResourceDefinition(resource)
+      wd = os.path.join(resParams.working_directory,
+                        "CommandPreJob" + self.suffix)
+      job_params.work_directory = wd
+
+      job_id = launcher.createJob(job_params)
+      launcher.launchJob(job_id)
+      # wait for the end of the job
+      jobState = launcher.getJobState(job_id)
+      print "Job %d state: %s" % (job_id,jobState)
+      while jobState != "FINISHED" and jobState != "FAILED" :
+        time.sleep(3)
+        jobState = launcher.getJobState(job_id)
+        print "Job %d state: %s" % (job_id,jobState)
+        pass
+
+      # verify the results
+      self.assertEqual(jobState, "FINISHED")
+      launcher.getJobResults(job_id, "")
+      self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
+                      "it works!\n")
+
 if __name__ == '__main__':
     # creat study
     import salome
index 2087b1bcf8d291f11f233dc0250a02d97c3a04e6..bfb357a79af5544789fca145da9bcfe05e5ae6fc 100644 (file)
@@ -58,7 +58,7 @@ class ResourceParameters (Engines.ResourceParameters):
                                         nb_proc_per_node, policy, resList)
 
 class JobParameters (Engines.JobParameters):
-  def __init__(self, job_name="", job_type="", job_file="", env_file="", in_files=None, out_files=None,
+  def __init__(self, job_name="", job_type="", job_file="", pre_command="", env_file="", in_files=None, out_files=None,
                      work_directory="", local_directory="", result_directory="", maximum_duration="",
                      resource_required=None, queue="", exclusive = False, mem_per_cpu = 0,
                      wckey = "", extra_params = "",
@@ -69,7 +69,7 @@ class JobParameters (Engines.JobParameters):
       out_files = []
     if specific_parameters is None:
       specific_parameters = []
-    Engines.JobParameters.__init__(self, job_name, job_type, job_file, env_file, in_files, out_files,
+    Engines.JobParameters.__init__(self, job_name, job_type, job_file, pre_command, env_file, in_files, out_files,
                                          work_directory, local_directory, result_directory, maximum_duration,
                                          resource_required, queue, exclusive, mem_per_cpu,
                                          wckey, extra_params,