From: Ovidiu Mircescu Date: Thu, 7 Dec 2017 16:24:02 +0000 (+0100) Subject: Add the preprocess feature to SALOME_Launcher. X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=ba491a628e00e7030e2f453c22c41711f2c77530;p=modules%2Fyacs.git Add the preprocess feature to SALOME_Launcher. It is now possible to run a command on the frontal of a cluster before the submission of the batch. --- diff --git a/idl/SALOME_Launcher.idl b/idl/SALOME_Launcher.idl index bebe60d43..b3619363c 100644 --- a/idl/SALOME_Launcher.idl +++ b/idl/SALOME_Launcher.idl @@ -66,6 +66,12 @@ struct JobParameters */ string job_file; + //! Pre processing command. + /*! This command is called on the remote resource, from #work_directory, after + the copy of #in_files and before submiting the job. + */ + string pre_command; + //! Local path to a script to be sourced in the environment of the job. /*! It may contain modifications of environment variables. */ @@ -198,6 +204,7 @@ interface SalomeLauncher /*! Launching the job consists of: - create the working directory on the remote file system - copy the input files into the working directory + - launch the pre processing command if one is defined - submit the job to the batch manager */ void launchJob (in long job_id) raises (SALOME::SALOME_Exception); diff --git a/src/Launcher/Launcher_Job.cxx b/src/Launcher/Launcher_Job.cxx index e046fd8cf..e3a2fc615 100644 --- a/src/Launcher/Launcher_Job.cxx +++ b/src/Launcher/Launcher_Job.cxx @@ -40,6 +40,7 @@ Launcher::Job::Job() _job_file = ""; _job_file_name = ""; _job_file_name_complete = ""; + _pre_command = ""; _work_directory = ""; _local_directory = ""; _result_directory = ""; @@ -442,6 +443,18 @@ Launcher::Job::getReference() const return _reference; } +void +Launcher::Job::setPreCommand(const std::string & preCommand) +{ + _pre_command = preCommand; +} + +std::string +Launcher::Job::getPreCommand() const +{ + return _pre_command; +} + void Launcher::Job::checkMaximumDuration(const std::string & maximum_duration) { @@ -604,6 +617,7 @@ Launcher::Job::common_job_params() } } params[Batch::WORKDIR] = _work_directory; + params[Batch::PREPROCESS] = _pre_command; // Parameters for COORM params[Batch::LAUNCHER_FILE] = _launcher_file; diff --git a/src/Launcher/Launcher_Job.hxx b/src/Launcher/Launcher_Job.hxx index 2f2322bf8..8531cd1c0 100644 --- a/src/Launcher/Launcher_Job.hxx +++ b/src/Launcher/Launcher_Job.hxx @@ -67,6 +67,7 @@ namespace Launcher // Common parameters void setJobName(const std::string & job_name); virtual void setJobFile(const std::string & job_file); + void setPreCommand(const std::string & preCommand); void setWorkDirectory(const std::string & work_directory); void setLocalDirectory(const std::string & local_directory); void setResultDirectory(const std::string & result_directory); @@ -82,12 +83,13 @@ namespace Launcher void setWCKey(const std::string & wckey); void setExtraParams(const std::string & extra_params); void setReference(const std::string & reference); - // For COORM - void setLauncherFile(const std::string & launcher_file); - void setLauncherArgs(const std::string & launcher_args); + // For COORM + void setLauncherFile(const std::string & launcher_file); + void setLauncherArgs(const std::string & launcher_args); std::string getJobName() const; std::string getJobFile() const; + std::string getPreCommand() const; std::string getWorkDirectory() const; std::string getLocalDirectory() const; std::string getResultDirectory() const; @@ -105,9 +107,9 @@ namespace Launcher std::string getExtraParams() const; std::string getReference() const; - // For COORM - std::string getLauncherFile() const; - std::string getLauncherArgs() const; + // For COORM + std::string getLauncherFile() const; + std::string getLauncherArgs() const; std::string updateJobState(); @@ -146,6 +148,7 @@ namespace Launcher std::string _job_file; std::string _job_file_name; std::string _job_file_name_complete; + std::string _pre_command; std::string _work_directory; std::string _local_directory; @@ -163,9 +166,9 @@ namespace Launcher std::string _extra_params; std::string _reference; //! Reference of the job for the batch manager - // Parameters for COORM - std::string _launcher_file; - std::string _launcher_args; + // Parameters for COORM + std::string _launcher_file; + std::string _launcher_args; #ifdef WITH_LIBBATCH // Connection with LIBBATCH diff --git a/src/Launcher/Launcher_XML_Persistence.cxx b/src/Launcher/Launcher_XML_Persistence.cxx index ab0c929b8..eb7775e5a 100644 --- a/src/Launcher/Launcher_XML_Persistence.cxx +++ b/src/Launcher/Launcher_XML_Persistence.cxx @@ -152,6 +152,8 @@ XML_Persistence::addJobToXmlDocument(xmlNodePtr root_node, const Job & job) addNode(node, "local_directory", job.getLocalDirectory()); if (!job.getResultDirectory().empty()) addNode(node, "result_directory", job.getResultDirectory()); + if (!job.getPreCommand().empty()) + addNode(node, "pre_command", job.getPreCommand()); // Parameters for COORM if (!job.getLauncherFile().empty()) @@ -300,6 +302,8 @@ XML_Persistence::parseUserNode(Job * new_job, xmlNodePtr user_node) } else if (node_name == "env_file") new_job->setEnvFile(getNodeContent(current_node)); + else if (node_name == "pre_command") + new_job->setPreCommand(getNodeContent(current_node)); else if (node_name == "work_directory") new_job->setWorkDirectory(getNodeContent(current_node)); else if (node_name == "local_directory") diff --git a/src/Launcher/SALOME_Launcher.cxx b/src/Launcher/SALOME_Launcher.cxx index 33a961d04..39447a7fd 100644 --- a/src/Launcher/SALOME_Launcher.cxx +++ b/src/Launcher/SALOME_Launcher.cxx @@ -139,6 +139,7 @@ SALOME_Launcher::createJob(const Engines::JobParameters & job_parameters) INFOS(ex.msg.c_str()); THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR); } + new_job->setPreCommand(job_parameters.pre_command.in()); // Files std::string env_file = job_parameters.env_file.in(); @@ -518,6 +519,7 @@ SALOME_Launcher::getJobParameters(CORBA::Long job_id) job_parameters->work_directory = CORBA::string_dup(job->getWorkDirectory().c_str()); job_parameters->local_directory = CORBA::string_dup(job->getLocalDirectory().c_str()); job_parameters->result_directory = CORBA::string_dup(job->getResultDirectory().c_str()); + job_parameters->pre_command = CORBA::string_dup(job->getPreCommand().c_str()); // Parameters for COORM job_parameters->launcher_file = CORBA::string_dup(job->getLauncherFile().c_str()); diff --git a/src/Launcher/Test/test_launcher.py b/src/Launcher/Test/test_launcher.py index 546b0057f..b0bee509d 100755 --- a/src/Launcher/Test/test_launcher.py +++ b/src/Launcher/Test/test_launcher.py @@ -61,6 +61,13 @@ class TestCompo(unittest.TestCase): except IOError,ex: self.fail("IO exception:" + str(ex)); + def create_JobParameters(self): + job_params = salome.JobParameters() + job_params.wckey="P11U50:CARBONES" #needed by edf clusters + job_params.resource_required = salome.ResourceParameters() + job_params.resource_required.nb_proc = 1 + return job_params + ############################## # test of python_salome job ############################## @@ -96,13 +103,11 @@ f.close() f.close() local_result_dir = os.path.join(case_test_dir, "result_py_job-") - job_params = salome.JobParameters() + job_params = self.create_JobParameters() job_params.job_type = "python_salome" job_params.job_file = job_script_file job_params.in_files = [] job_params.out_files = ["result.txt", "subdir"] - job_params.resource_required = salome.ResourceParameters() - job_params.resource_required.nb_proc = 1 launcher = salome.naming_service.Resolve('/SalomeLauncher') @@ -194,15 +199,13 @@ f.close() # job params local_result_dir = os.path.join(case_test_dir, "result_com_job-") - job_params = salome.JobParameters() + job_params = self.create_JobParameters() job_params.job_type = "command" job_params.job_file = script_file job_params.env_file = env_file job_params.in_files = [data_file] job_params.out_files = ["result.txt", "copie"] job_params.local_directory = case_test_dir - job_params.resource_required = salome.ResourceParameters() - job_params.resource_required.nb_proc = 1 # create and launch the job launcher = salome.naming_service.Resolve('/SalomeLauncher') @@ -299,7 +302,7 @@ f.close() f.close() local_result_dir = os.path.join(case_test_dir, "result_yacs_job-") - job_params = salome.JobParameters() + job_params = self.create_JobParameters() job_params.job_type = "yacs_file" job_params.job_file = job_script_file job_params.env_file = os.path.join(case_test_dir,env_file) @@ -308,8 +311,6 @@ f.close() # define the interval between two YACS schema dumps (3 seconds) import Engines job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")] - job_params.resource_required = salome.ResourceParameters() - job_params.resource_required.nb_proc = 1 launcher = salome.naming_service.Resolve('/SalomeLauncher') resManager= salome.lcc.getResourcesManager() @@ -412,10 +413,9 @@ f.close() f.close() local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-") - job_params = salome.JobParameters() + job_params = self.create_JobParameters() job_params.job_type = "yacs_file" job_params.job_file = job_script_file - #job_params.env_file = os.path.join(case_test_dir,env_file) job_params.out_files = ["result.txt"] # define the interval between two YACS schema dumps (3 seconds) @@ -423,8 +423,6 @@ f.close() job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions", "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")] expected_result="i=5,d=3.7,b=False,s=lili" - job_params.resource_required = salome.ResourceParameters() - job_params.resource_required.nb_proc = 1 launcher = salome.naming_service.Resolve('/SalomeLauncher') resManager= salome.lcc.getResourcesManager() @@ -460,6 +458,78 @@ f.close() self.verifyFile(os.path.join(job_params.result_directory, "result.txt"), expected_result) + ############################################ + # test of command job type with pre_command + ############################################ + def test_command_pre(self): + case_test_dir = os.path.join(TestCompo.test_dir, "command_pre") + mkdir_p(case_test_dir) + + # command to be run before the job + pre_command = "echo 'it works!' > in.txt" + + # job script + script_file = "myTestScript.py" + script_text = """#! /usr/bin/env python +# -*- coding: utf-8 -*- + +in_f = open("in.txt", "r") +in_text = in_f.read() +in_f.close() + +f = open('result.txt', 'w') +f.write(in_text) +f.close() +""" + abs_script_file = os.path.join(case_test_dir, script_file) + f = open(abs_script_file, "w") + f.write(script_text) + f.close() + os.chmod(abs_script_file, 0o755) + + # job params + local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-") + job_params = self.create_JobParameters() + job_params.job_type = "command" + job_params.job_file = script_file + job_params.pre_command = pre_command + job_params.in_files = [] + job_params.out_files = ["result.txt"] + job_params.local_directory = case_test_dir + + # create and launch the job + launcher = salome.naming_service.Resolve('/SalomeLauncher') + resManager= salome.lcc.getResourcesManager() + + for resource in self.ressources: + print "Testing command job on ", resource + job_params.result_directory = local_result_dir + resource + job_params.job_name = "CommandPreJob_" + resource + job_params.resource_required.name = resource + + # use the working directory of the resource + resParams = resManager.GetResourceDefinition(resource) + wd = os.path.join(resParams.working_directory, + "CommandPreJob" + self.suffix) + job_params.work_directory = wd + + job_id = launcher.createJob(job_params) + launcher.launchJob(job_id) + # wait for the end of the job + jobState = launcher.getJobState(job_id) + print "Job %d state: %s" % (job_id,jobState) + while jobState != "FINISHED" and jobState != "FAILED" : + time.sleep(3) + jobState = launcher.getJobState(job_id) + print "Job %d state: %s" % (job_id,jobState) + pass + + # verify the results + self.assertEqual(jobState, "FINISHED") + launcher.getJobResults(job_id, "") + self.verifyFile(os.path.join(job_params.result_directory, "result.txt"), + "it works!\n") + if __name__ == '__main__': # creat study import salome diff --git a/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py b/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py index 2087b1bcf..bfb357a79 100644 --- a/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py +++ b/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py @@ -58,7 +58,7 @@ class ResourceParameters (Engines.ResourceParameters): nb_proc_per_node, policy, resList) class JobParameters (Engines.JobParameters): - def __init__(self, job_name="", job_type="", job_file="", env_file="", in_files=None, out_files=None, + def __init__(self, job_name="", job_type="", job_file="", pre_command="", env_file="", in_files=None, out_files=None, work_directory="", local_directory="", result_directory="", maximum_duration="", resource_required=None, queue="", exclusive = False, mem_per_cpu = 0, wckey = "", extra_params = "", @@ -69,7 +69,7 @@ class JobParameters (Engines.JobParameters): out_files = [] if specific_parameters is None: specific_parameters = [] - Engines.JobParameters.__init__(self, job_name, job_type, job_file, env_file, in_files, out_files, + Engines.JobParameters.__init__(self, job_name, job_type, job_file, pre_command, env_file, in_files, out_files, work_directory, local_directory, result_directory, maximum_duration, resource_required, queue, exclusive, mem_per_cpu, wckey, extra_params,