From: Ovidiu Mircescu Date: Thu, 7 Dec 2017 16:13:38 +0000 (+0100) Subject: Add preprocess feature. X-Git-Tag: V2_4_0~4 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=35e605daf073935d7feecaa7b0dcad2036af37c6;p=tools%2Flibbatch.git Add preprocess feature. This feature allows a command to be run on the front-end node before the batch job is submitted. --- diff --git a/src/CCC/BatchManager_CCC.cxx b/src/CCC/BatchManager_CCC.cxx index d2f05d2..03912ba 100644 --- a/src/CCC/BatchManager_CCC.cxx +++ b/src/CCC/BatchManager_CCC.cxx @@ -73,7 +73,7 @@ namespace Batch { } // Methode pour le controle des jobs : soumet un job au gestionnaire - const JobId BatchManager_CCC::submitJob(const Job & job) + const JobId BatchManager_CCC::runJob(const Job & job) { Parametre params = job.getParametre(); const std::string workDir = params[WORKDIR]; @@ -82,10 +82,6 @@ namespace Batch { string::size_type p2 = fileToExecute.find_last_of("."); std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - // export input files on cluster - LOG("Export des fichiers en entree"); - exportInputFiles(job); - // build batch script for job LOG("Construction du script de batch"); buildBatchScript(job); diff --git a/src/CCC/BatchManager_CCC.hxx b/src/CCC/BatchManager_CCC.hxx index 9319033..916ddcc 100644 --- a/src/CCC/BatchManager_CCC.hxx +++ b/src/CCC/BatchManager_CCC.hxx @@ -53,7 +53,7 @@ namespace Batch { // static string BatchManager_LSF::getDefaultServer(); // Methodes pour le controle des jobs - virtual const JobId submitJob(const Job & job); // soumet un job au gestionnaire + virtual const JobId runJob(const Job & job); // soumet un job au gestionnaire virtual void deleteJob(const JobId & jobid); // retire un job du gestionnaire virtual JobInfo queryJob(const JobId & jobid); // renvoie l'etat du job virtual bool isRunning(const JobId & jobid); // teste si un job est present en machine diff --git a/src/COORM/BatchManager_COORM.cxx b/src/COORM/BatchManager_COORM.cxx index 
24530be..87dd529 100644 --- a/src/COORM/BatchManager_COORM.cxx +++ b/src/COORM/BatchManager_COORM.cxx @@ -31,234 +31,230 @@ using namespace std; namespace Batch { - BatchManager_COORM::BatchManager_COORM(const FactBatchManager * parent, const char * host, - const char * username, - CommunicationProtocolType protocolType, const char * mpiImpl) - : BatchManager(parent, host, username, protocolType, mpiImpl) - { - } - - BatchManager_COORM::~BatchManager_COORM() - { - } - - // Soumet un job au gestionnaire - const JobId BatchManager_COORM::submitJob(const Job & job) - { - Parametre params = job.getParametre(); - const string workDir = params[WORKDIR]; - const string fileToExecute = params[EXECUTABLE]; - - // For CooRM - const string launcherArgs = params[LAUNCHER_ARGS]; - const string launcherFile = params[LAUNCHER_FILE]; - - const string jobName = params[NAME]; - - string::size_type p1 = fileToExecute.find_last_of("/"); - string::size_type p2 = fileToExecute.find_last_of("."); - std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - - // For CooRM - p1 = launcherFile.find_last_of("/"); - std::string fileNameToLaunch = launcherFile.substr(p1+1); - - - // export input files on cluster - exportInputFiles(job); - - // build batch script for job - string scriptFile = buildBatchScript(job); - - // Get REMOTE_COORM_PATH environment variable - const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); - if (remote_coorm_path == NULL) - { - throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); - } - - // We need omniORB to execute launcher.py - const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; - - - // define command to submit batch - string subCommand = set_env_cmd + "python " + workDir + "/" + fileNameToLaunch + " --name="+ jobName + - " --workdir=" + workDir + " --outputs=" + workDir + "/logs/outputs.log" + - " --errors=" + workDir + "/logs/errors.log" + - " --executable=" + scriptFile + " 
" + launcherArgs; - string command = _protocol.getExecCommand(subCommand, _hostname, _username); - command += " 2>&1"; - LOG(command); - - // submit job - string output; - int status = Utils::getCommandOutput(command, output); - LOG(output); - if (status != 0) throw RunTimeException("Can't submit job, error was: " + output); - - // read id of submitted job in output - istringstream logfile(output); - string sline, idline, id; - - if (logfile) - { - while (getline(logfile, sline) && sline != "") - { - idline = sline; - } - - vector tokens; - JobInfo::Tokenize(idline, tokens, "="); - id = tokens[1] ; - } - else - { - throw RunTimeException("Error in the submission of the job on the remote host"); - } - - JobId jobid(this, (string) id); - return jobid; - } - - // retire un job du gestionnaire - void BatchManager_COORM::deleteJob(const JobId & jobid) - { - // Get REMOTE_COORM_PATH environment variable - const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); - if (remote_coorm_path == NULL) - { - throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); - } - - // We need omniORB to execute launcher.py - const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; - - // define command to delete job - string subCommand = set_env_cmd + "python " + string(remote_coorm_path) + "/coormdel.py --jobID=" + jobid.getReference(); - string command = _protocol.getExecCommand(subCommand, _hostname, _username); - LOG(command); - - int status = system(command.c_str()); - if (status) - throw RunTimeException("Can't delete job " + jobid.getReference()); - - LOG("job " << jobid.getReference() << " killed"); - } - - // Renvoie l'etat du job - JobInfo BatchManager_COORM::queryJob(const JobId & jobid) - { - // Get REMOTE_COORM_PATH environment variable - const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); - if (remote_coorm_path == NULL) - { - throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); 
- } - - // We need omniORB to execute launcher.py - const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; - - // define command to query batch - string subCommand = set_env_cmd + "python " + string(remote_coorm_path) + "/coormstat.py --jobID=" + jobid.getReference(); - string command = _protocol.getExecCommand(subCommand, _hostname, _username); - LOG(command); - string output; - int status = Utils::getCommandOutput(command, output); - if(status && status != 153 && status != 256*153) - throw RunTimeException("Error of connection on remote host"); - - JobInfo_COORM jobinfo = JobInfo_COORM(jobid.getReference(), output); - return jobinfo; - } - - string BatchManager_COORM::buildBatchScript(const Job & job) - { - Parametre params = job.getParametre(); - - // Job Parameters - string workDir = ""; - string fileToExecute = ""; - string tmpDir = ""; - int nbproc = 0; - int edt = 0; - int mem = 0; - string queue = ""; - - // Mandatory parameters - if (params.find(WORKDIR) != params.end()) - workDir = params[WORKDIR].str(); - else - throw RunTimeException("params[WORKDIR] is not defined. Please define it, cannot submit this job."); - if (params.find(EXECUTABLE) != params.end()) - fileToExecute = params[EXECUTABLE].str(); - else - throw RunTimeException("params[EXECUTABLE] is not defined. 
Please define it, cannot submit this job."); - - string::size_type p1 = fileToExecute.find_last_of("/"); - string::size_type p2 = fileToExecute.find_last_of("."); - string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - string fileNameToExecute = fileToExecute.substr(p1+1); - - // Create batch submit file - ofstream tempOutputFile; - string tmpFileName = Utils::createAndOpenTemporaryFile("COORM-script", tempOutputFile); - - tempOutputFile << "#!/bin/sh -f" << endl; - tempOutputFile << "export LIBBATCH_NODEFILE=$COORM_NODEFILE" << endl; - // Launch the executable - tempOutputFile << "cd " << tmpDir << endl; - tempOutputFile << "./" + fileNameToExecute << endl; - tempOutputFile.flush(); - tempOutputFile.close(); - - Utils::chmod(tmpFileName.c_str(), 0x1ED); - LOG("Batch script file generated is: " << tmpFileName); - - string remoteFileName = rootNameToExecute + "_Batch.sh"; - int status = _protocol.copyFile(tmpFileName, "", "", - workDir + "/" + remoteFileName, - _hostname, _username); - if (status) - throw RunTimeException("Cannot copy batch submission file on host " + _hostname); - - return remoteFileName; - } - - void BatchManager_COORM::exportInputFiles(const Job & job) - { - BatchManager::exportInputFiles(job); - - int status; - Parametre params = job.getParametre(); - - string launcherFile = params[LAUNCHER_FILE]; - - if (launcherFile.size() != 0) - { - // Copy launcherFile into batch working directory - status = _protocol.copyFile(launcherFile, "", "", - params[WORKDIR], _hostname, _username); - if (status) { - std::ostringstream oss; - oss << "Cannot copy file " << launcherFile << " on host " << _hostname; - oss << ". 
Return status is " << status; - throw RunTimeException(oss.str()); + BatchManager_COORM::BatchManager_COORM(const FactBatchManager * parent, const char * host, + const char * username, + CommunicationProtocolType protocolType, const char * mpiImpl) + : BatchManager(parent, host, username, protocolType, mpiImpl) + { } - string remoteLauncher = launcherFile; - remoteLauncher = remoteLauncher.substr(remoteLauncher.rfind("/") + 1, remoteLauncher.length()); - remoteLauncher = string(params[WORKDIR]) + "/" + remoteLauncher; - - string subCommand = string("chmod u+x ") + remoteLauncher; - string command = _protocol.getExecCommand(subCommand, _hostname, _username); - LOG(command); - status = system(command.c_str()); - if (status) { - std::ostringstream oss; - oss << "Cannot change permissions of file " << remoteLauncher << " on host " << _hostname; - oss << ". Return status is " << status; - throw RunTimeException(oss.str()); + BatchManager_COORM::~BatchManager_COORM() + { + } + + // Soumet un job au gestionnaire + const JobId BatchManager_COORM::runJob(const Job & job) + { + Parametre params = job.getParametre(); + const string workDir = params[WORKDIR]; + const string fileToExecute = params[EXECUTABLE]; + + // For CooRM + const string launcherArgs = params[LAUNCHER_ARGS]; + const string launcherFile = params[LAUNCHER_FILE]; + + const string jobName = params[NAME]; + + string::size_type p1 = fileToExecute.find_last_of("/"); + string::size_type p2 = fileToExecute.find_last_of("."); + std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); + + // For CooRM + p1 = launcherFile.find_last_of("/"); + std::string fileNameToLaunch = launcherFile.substr(p1+1); + + // build batch script for job + string scriptFile = buildBatchScript(job); + + // Get REMOTE_COORM_PATH environment variable + const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); + if (remote_coorm_path == NULL) + { + throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); + 
} + + // We need omniORB to execute launcher.py + const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; + + + // define command to submit batch + string subCommand = set_env_cmd + "python " + workDir + "/" + fileNameToLaunch + " --name="+ jobName + + " --workdir=" + workDir + " --outputs=" + workDir + "/logs/outputs.log" + + " --errors=" + workDir + "/logs/errors.log" + + " --executable=" + scriptFile + " " + launcherArgs; + string command = _protocol.getExecCommand(subCommand, _hostname, _username); + command += " 2>&1"; + LOG(command); + + // submit job + string output; + int status = Utils::getCommandOutput(command, output); + LOG(output); + if (status != 0) throw RunTimeException("Can't submit job, error was: " + output); + + // read id of submitted job in output + istringstream logfile(output); + string sline, idline, id; + + if (logfile) + { + while (getline(logfile, sline) && sline != "") + { + idline = sline; + } + + vector tokens; + JobInfo::Tokenize(idline, tokens, "="); + id = tokens[1] ; + } + else + { + throw RunTimeException("Error in the submission of the job on the remote host"); + } + + JobId jobid(this, (string) id); + return jobid; + } + + // retire un job du gestionnaire + void BatchManager_COORM::deleteJob(const JobId & jobid) + { + // Get REMOTE_COORM_PATH environment variable + const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); + if (remote_coorm_path == NULL) + { + throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); + } + + // We need omniORB to execute launcher.py + const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; + + // define command to delete job + string subCommand = set_env_cmd + "python " + string(remote_coorm_path) + "/coormdel.py --jobID=" + jobid.getReference(); + string command = _protocol.getExecCommand(subCommand, _hostname, _username); + LOG(command); + + int status = system(command.c_str()); + if (status) + 
throw RunTimeException("Can't delete job " + jobid.getReference()); + + LOG("job " << jobid.getReference() << " killed"); + } + + // Renvoie l'etat du job + JobInfo BatchManager_COORM::queryJob(const JobId & jobid) + { + // Get REMOTE_COORM_PATH environment variable + const char * remote_coorm_path = getenv("REMOTE_COORM_PATH"); + if (remote_coorm_path == NULL) + { + throw RunTimeException("Unable to get REMOTE_COORM_PATH environment variable"); + } + + // We need omniORB to execute launcher.py + const string set_env_cmd = "source " + string(remote_coorm_path) + "/coorm_prerequis.sh;"; + + // define command to query batch + string subCommand = set_env_cmd + "python " + string(remote_coorm_path) + "/coormstat.py --jobID=" + jobid.getReference(); + string command = _protocol.getExecCommand(subCommand, _hostname, _username); + LOG(command); + string output; + int status = Utils::getCommandOutput(command, output); + if(status && status != 153 && status != 256*153) + throw RunTimeException("Error of connection on remote host"); + + JobInfo_COORM jobinfo = JobInfo_COORM(jobid.getReference(), output); + return jobinfo; + } + + string BatchManager_COORM::buildBatchScript(const Job & job) + { + Parametre params = job.getParametre(); + + // Job Parameters + string workDir = ""; + string fileToExecute = ""; + string tmpDir = ""; + int nbproc = 0; + int edt = 0; + int mem = 0; + string queue = ""; + + // Mandatory parameters + if (params.find(WORKDIR) != params.end()) + workDir = params[WORKDIR].str(); + else + throw RunTimeException("params[WORKDIR] is not defined. Please define it, cannot submit this job."); + if (params.find(EXECUTABLE) != params.end()) + fileToExecute = params[EXECUTABLE].str(); + else + throw RunTimeException("params[EXECUTABLE] is not defined. 
Please define it, cannot submit this job."); + + string::size_type p1 = fileToExecute.find_last_of("/"); + string::size_type p2 = fileToExecute.find_last_of("."); + string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); + string fileNameToExecute = fileToExecute.substr(p1+1); + + // Create batch submit file + ofstream tempOutputFile; + string tmpFileName = Utils::createAndOpenTemporaryFile("COORM-script", tempOutputFile); + + tempOutputFile << "#!/bin/sh -f" << endl; + tempOutputFile << "export LIBBATCH_NODEFILE=$COORM_NODEFILE" << endl; + // Launch the executable + tempOutputFile << "cd " << tmpDir << endl; + tempOutputFile << "./" + fileNameToExecute << endl; + tempOutputFile.flush(); + tempOutputFile.close(); + + Utils::chmod(tmpFileName.c_str(), 0x1ED); + LOG("Batch script file generated is: " << tmpFileName); + + string remoteFileName = rootNameToExecute + "_Batch.sh"; + int status = _protocol.copyFile(tmpFileName, "", "", + workDir + "/" + remoteFileName, + _hostname, _username); + if (status) + throw RunTimeException("Cannot copy batch submission file on host " + _hostname); + + return remoteFileName; + } + + void BatchManager_COORM::exportInputFiles(const Job & job) + { + BatchManager::exportInputFiles(job); + + int status; + Parametre params = job.getParametre(); + + string launcherFile = params[LAUNCHER_FILE]; + + if (launcherFile.size() != 0) + { + // Copy launcherFile into batch working directory + status = _protocol.copyFile(launcherFile, "", "", + params[WORKDIR], _hostname, _username); + if (status) { + std::ostringstream oss; + oss << "Cannot copy file " << launcherFile << " on host " << _hostname; + oss << ". 
Return status is " << status; + throw RunTimeException(oss.str()); + } + + string remoteLauncher = launcherFile; + remoteLauncher = remoteLauncher.substr(remoteLauncher.rfind("/") + 1, remoteLauncher.length()); + remoteLauncher = string(params[WORKDIR]) + "/" + remoteLauncher; + + string subCommand = string("chmod u+x ") + remoteLauncher; + string command = _protocol.getExecCommand(subCommand, _hostname, _username); + LOG(command); + status = system(command.c_str()); + if (status) { + std::ostringstream oss; + oss << "Cannot change permissions of file " << remoteLauncher << " on host " << _hostname; + oss << ". Return status is " << status; + throw RunTimeException(oss.str()); + } + } } - } - } } diff --git a/src/COORM/BatchManager_COORM.hxx b/src/COORM/BatchManager_COORM.hxx index 9dbc4cf..87abb24 100644 --- a/src/COORM/BatchManager_COORM.hxx +++ b/src/COORM/BatchManager_COORM.hxx @@ -28,42 +28,42 @@ namespace Batch { - class BATCH_EXPORT BatchManager_COORM : public BatchManager - { - public: - // Constructeur - BatchManager_COORM(const FactBatchManager * parent, const char * host="localhost", - const char * username="", - CommunicationProtocolType protocolType = SSH, const char * mpiImpl="nompi"); + class BATCH_EXPORT BatchManager_COORM : public BatchManager + { + public: + // Constructeur + BatchManager_COORM(const FactBatchManager * parent, const char * host="localhost", + const char * username="", + CommunicationProtocolType protocolType = SSH, const char * mpiImpl="nompi"); - // Destructeur - virtual ~BatchManager_COORM(); + // Destructeur + virtual ~BatchManager_COORM(); - // Soumet un job - virtual const JobId submitJob(const Job & job); + // Soumet un job + virtual const JobId runJob(const Job & job); - // Supprime un job - virtual void deleteJob(const JobId & jobid); + // Supprime un job + virtual void deleteJob(const JobId & jobid); - // Donne l'etat du job - virtual JobInfo queryJob(const JobId & jobid); + // Donne l'etat du job + virtual JobInfo 
queryJob(const JobId & jobid); - // Modifie un job en file d'attente - virtual void setParametre(const JobId & jobid, const Parametre & param) { return alterJob(jobid, param); } + // Modifie un job en file d'attente + virtual void setParametre(const JobId & jobid, const Parametre & param) { return alterJob(jobid, param); } - // Modifie un job en file d'attente - virtual void setEnvironnement(const JobId & jobid, const Environnement & env) { return alterJob(jobid, env); } + // Modifie un job en file d'attente + virtual void setEnvironnement(const JobId & jobid, const Environnement & env) { return alterJob(jobid, env); } - protected: - std::string buildBatchScript(const Job & job); - void exportInputFiles(const Job & job); + protected: + std::string buildBatchScript(const Job & job); + void exportInputFiles(const Job & job); #ifdef SWIG - public: - // Recupere le l'identifiant d'un job deja soumis au BatchManager - virtual const JobId getJobIdByReference(const char * ref) { return BatchManager::getJobIdByReference(ref); } + public: + // Recupere le l'identifiant d'un job deja soumis au BatchManager + virtual const JobId getJobIdByReference(const char * ref) { return BatchManager::getJobIdByReference(ref); } #endif - }; + }; } #endif diff --git a/src/Core/BatchManager.cxx b/src/Core/BatchManager.cxx index 4645ffb..c50266f 100644 --- a/src/Core/BatchManager.cxx +++ b/src/Core/BatchManager.cxx @@ -78,7 +78,9 @@ namespace Batch { // Methode pour le controle des jobs : soumet un job au gestionnaire const JobId BatchManager::submitJob(const Job & job) { - throw NotYetImplementedException("Method submitJob not implemented by Batch Manager \"" + _type + "\""); + exportInputFiles(job); + preprocess(job); + return runJob(job); } // Methode pour le controle des jobs : retire un job du gestionnaire @@ -360,4 +362,35 @@ namespace Batch { return _protocol; } + void BatchManager::preprocess(const Batch::Job & job) + { + std::string preproCommand; + std::string workDir; + Parametre 
params = job.getParametre(); + if (params.find(PREPROCESS) != params.end()) + preproCommand = params[PREPROCESS].str(); + if (params.find(WORKDIR) != params.end()) + workDir = params[WORKDIR].str(); + + if(!preproCommand.empty() && !workDir.empty()) + { + std::string subCommand = "cd " + workDir + "; " + preproCommand; + std::string command = _protocol.getExecCommand(subCommand, _hostname, _username); + command += " 2>&1"; + LOG(command); + + // submit job + std::string output; + int status = Utils::getCommandOutput(command, output); + LOG(output); + if (status != 0) + throw RunTimeException("Error when executing: " + command + + "\nOutput:" + output); + } + } + + const Batch::JobId BatchManager::runJob(const Batch::Job & job) + { + throw NotYetImplementedException("Method runJob not implemented by Batch Manager \"" + _type + "\""); + } } diff --git a/src/Core/BatchManager.hxx b/src/Core/BatchManager.hxx index 7d78f9c..9bc9ddd 100644 --- a/src/Core/BatchManager.hxx +++ b/src/Core/BatchManager.hxx @@ -94,7 +94,15 @@ namespace Batch { MpiImpl *_mpiImpl; // Mpi implementation to launch executable in batch script MpiImpl* FactoryMpiImpl(std::string mpiImpl); - void exportInputFiles(const Job & job); + virtual void exportInputFiles(const Job & job); + + // Preprocessing done on the frontal using "PREPROCESS" parameter as a script. + // May throw exceptions in case of failure. 
+ virtual void preprocess(const Batch::Job & job); + + // Submit to batch manager, but we suppose input files are already copied + // and preprocess finished without error + virtual const Batch::JobId runJob(const Batch::Job & job); private: diff --git a/src/Core/Constants.cxx b/src/Core/Constants.cxx index 1352180..480919f 100644 --- a/src/Core/Constants.cxx +++ b/src/Core/Constants.cxx @@ -52,6 +52,7 @@ namespace Batch { def_Constant(MEMPERCPU); def_Constant(WCKEY); def_Constant(EXTRAPARAMS); + def_Constant(PREPROCESS); // Parameters for COORM def_Constant(LAUNCHER_FILE); diff --git a/src/Core/Constants.hxx b/src/Core/Constants.hxx index 6c7552a..ac07e1d 100644 --- a/src/Core/Constants.hxx +++ b/src/Core/Constants.hxx @@ -63,6 +63,7 @@ namespace Batch { decl_extern_Constant(MEMPERCPU); decl_extern_Constant(WCKEY); decl_extern_Constant(EXTRAPARAMS); + decl_extern_Constant(PREPROCESS); // Parameters for COORM decl_extern_Constant(LAUNCHER_FILE); diff --git a/src/Core/ParameterTypeMap.cxx b/src/Core/ParameterTypeMap.cxx index 897eede..8b23095 100644 --- a/src/Core/ParameterTypeMap.cxx +++ b/src/Core/ParameterTypeMap.cxx @@ -65,8 +65,9 @@ namespace Batch { addParameter("MEMPERCPU", LONG, 1); addParameter("WCKEY", STRING, 1); addParameter("EXTRAPARAMS", STRING, 1); + addParameter("PREPROCESS", STRING, 1); - // Parameters for COORM + // Parameters for COORM addParameter("LAUNCHER_FILE", STRING, 1); addParameter("LAUNCHER_ARGS", STRING, 1); } diff --git a/src/Core/RunTimeException.hxx b/src/Core/RunTimeException.hxx index 09a37e1..6bb01e6 100644 --- a/src/Core/RunTimeException.hxx +++ b/src/Core/RunTimeException.hxx @@ -42,7 +42,7 @@ namespace Batch { class BATCH_EXPORT RunTimeException : public GenericException { public: - // Constructeur + // Constructeur RunTimeException(std::string ch = "undefined") : GenericException("RunTimeException", ch) {} }; diff --git a/src/LSF/BatchManager_LSF.cxx b/src/LSF/BatchManager_LSF.cxx index 7956303..732ff0b 100644 --- 
a/src/LSF/BatchManager_LSF.cxx +++ b/src/LSF/BatchManager_LSF.cxx @@ -59,15 +59,11 @@ namespace Batch { } // Methode pour le controle des jobs : soumet un job au gestionnaire - const JobId BatchManager_LSF::submitJob(const Job & job) + const JobId BatchManager_LSF::runJob(const Job & job) { Parametre params = job.getParametre(); const std::string workDir = params[WORKDIR]; - // export input files on cluster - LOG("Export des fichiers en entree"); - exportInputFiles(job); - // build batch script for job LOG("Construction du script de batch"); string scriptFile = buildSubmissionScript(job); diff --git a/src/LSF/BatchManager_LSF.hxx b/src/LSF/BatchManager_LSF.hxx index 3d6fb1e..d521609 100644 --- a/src/LSF/BatchManager_LSF.hxx +++ b/src/LSF/BatchManager_LSF.hxx @@ -53,7 +53,7 @@ namespace Batch { // static string BatchManager_LSF::getDefaultServer(); // Methodes pour le controle des jobs - virtual const JobId submitJob(const Job & job); // soumet un job au gestionnaire + virtual const JobId runJob(const Job & job); // soumet un job au gestionnaire virtual void deleteJob(const JobId & jobid); // retire un job du gestionnaire virtual JobInfo queryJob(const JobId & jobid); // renvoie l'etat du job virtual bool isRunning(const JobId & jobid); // teste si un job est present en machine diff --git a/src/LoadLeveler/BatchManager_LL.cxx b/src/LoadLeveler/BatchManager_LL.cxx index 4f89f04..53396ed 100644 --- a/src/LoadLeveler/BatchManager_LL.cxx +++ b/src/LoadLeveler/BatchManager_LL.cxx @@ -57,14 +57,11 @@ namespace Batch { } // Method to submit a job to the batch manager - const JobId BatchManager_LL::submitJob(const Job & job) + const JobId BatchManager_LL::runJob(const Job & job) { Parametre params = job.getParametre(); const string workDir = params[WORKDIR]; - // export input files on cluster - exportInputFiles(job); - // build command file to submit the job and copy it on the server string cmdFile = buildCommandFile(job); diff --git a/src/LoadLeveler/BatchManager_LL.hxx 
b/src/LoadLeveler/BatchManager_LL.hxx index cbff7f0..716cd19 100644 --- a/src/LoadLeveler/BatchManager_LL.hxx +++ b/src/LoadLeveler/BatchManager_LL.hxx @@ -48,7 +48,7 @@ namespace Batch { virtual ~BatchManager_LL(); // Methods to control jobs - virtual const JobId submitJob(const Job & job); + virtual const JobId runJob(const Job & job); virtual void deleteJob(const JobId & jobid); virtual JobInfo queryJob(const JobId & jobid); diff --git a/src/Local/BatchManager_Local.cxx b/src/Local/BatchManager_Local.cxx index 0c08993..f913ee6 100644 --- a/src/Local/BatchManager_Local.cxx +++ b/src/Local/BatchManager_Local.cxx @@ -93,11 +93,8 @@ namespace Batch { } // Methode pour le controle des jobs : soumet un job au gestionnaire - const JobId BatchManager_Local::submitJob(const Job & job) + const JobId BatchManager_Local::runJob(const Job & job) { - // export input files in the working directory of the execution host - exportInputFiles(job); - Job_Local jobLocal = job; Id id = _idCounter++; ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id); diff --git a/src/Local/BatchManager_Local.hxx b/src/Local/BatchManager_Local.hxx index 4f7e0fb..d4e89d8 100644 --- a/src/Local/BatchManager_Local.hxx +++ b/src/Local/BatchManager_Local.hxx @@ -132,7 +132,7 @@ namespace Batch { // static string BatchManager_Local::getDefaultServer(); // Methodes pour le controle des jobs - virtual const JobId submitJob(const Job & job); // soumet un job au gestionnaire + virtual const JobId runJob(const Job & job); // soumet un job au gestionnaire virtual void deleteJob(const JobId & jobid); // retire un job du gestionnaire virtual void holdJob(const JobId & jobid); // suspend un job en file d'attente virtual void releaseJob(const JobId & jobid); // relache un job suspendu diff --git a/src/OAR/BatchManager_OAR.cxx b/src/OAR/BatchManager_OAR.cxx index 503ad8d..0989bf2 100644 --- a/src/OAR/BatchManager_OAR.cxx +++ b/src/OAR/BatchManager_OAR.cxx @@ -31,36 +31,33 @@ using namespace std; namespace Batch 
{ - BatchManager_OAR::BatchManager_OAR(const FactBatchManager * parent, const char * host, - const char * username, - CommunicationProtocolType protocolType, const char * mpiImpl) - : BatchManager(parent, host, username, protocolType, mpiImpl) - { - } - - BatchManager_OAR::~BatchManager_OAR() - { - } - - // Soumet un job au gestionnaire - const JobId BatchManager_OAR::submitJob(const Job & job) - { - Parametre params = job.getParametre(); - const string workDir = params[WORKDIR]; - const string fileToExecute = params[EXECUTABLE]; - string::size_type p1 = fileToExecute.find_last_of("/"); - string::size_type p2 = fileToExecute.find_last_of("."); - std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - - // export input files on cluster - exportInputFiles(job); - - // build batch script for job - string scriptFile = buildBatchScript(job); + BatchManager_OAR::BatchManager_OAR(const FactBatchManager * parent, const char * host, + const char * username, + CommunicationProtocolType protocolType, const char * mpiImpl) + : BatchManager(parent, host, username, protocolType, mpiImpl) + { + } + + BatchManager_OAR::~BatchManager_OAR() + { + } + + // Soumet un job au gestionnaire + const JobId BatchManager_OAR::runJob(const Job & job) + { + Parametre params = job.getParametre(); + const string workDir = params[WORKDIR]; + const string fileToExecute = params[EXECUTABLE]; + string::size_type p1 = fileToExecute.find_last_of("/"); + string::size_type p2 = fileToExecute.find_last_of("."); + std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); + + // build batch script for job + string scriptFile = buildBatchScript(job); // define command to submit batch - string subCommand = string("oarsub -t allow_classic_ssh -d ") + workDir + " -S " + workDir + "/" + scriptFile; - string command = _protocol.getExecCommand(subCommand, _hostname, _username); + string subCommand = string("oarsub -t allow_classic_ssh -d ") + workDir + " -S " + workDir + "/" + scriptFile; + 
string command = _protocol.getExecCommand(subCommand, _hostname, _username); command += " 2>&1"; LOG(command); @@ -70,33 +67,33 @@ namespace Batch LOG(output); if (status != 0) throw RunTimeException("Can't submit job, error was: " + output); - // read id of submitted job in output - istringstream logfile(output); - string sline, idline, id; - - if (logfile) - { - while (getline(logfile, sline) && sline != "") - { - idline = sline; - } - - vector tokens; - JobInfo::Tokenize(idline, tokens, "="); - id = tokens[1]; - } - else - { - throw RunTimeException("Error in the submission of the job on the remote host"); - } - - JobId jobid(this, id); - return jobid; - } - - // retire un job du gestionnaire - void BatchManager_OAR::deleteJob(const JobId & jobid) - { + // read id of submitted job in output + istringstream logfile(output); + string sline, idline, id; + + if (logfile) + { + while (getline(logfile, sline) && sline != "") + { + idline = sline; + } + + vector tokens; + JobInfo::Tokenize(idline, tokens, "="); + id = tokens[1]; + } + else + { + throw RunTimeException("Error in the submission of the job on the remote host"); + } + + JobId jobid(this, id); + return jobid; + } + + // retire un job du gestionnaire + void BatchManager_OAR::deleteJob(const JobId & jobid) + { // define command to delete job string subCommand = "oardel " + jobid.getReference(); string command = _protocol.getExecCommand(subCommand, _hostname, _username); @@ -107,11 +104,11 @@ namespace Batch throw RunTimeException("Can't delete job " + jobid.getReference()); LOG("job " << jobid.getReference() << " killed"); - } + } - // Renvoie l'etat du job - JobInfo BatchManager_OAR::queryJob(const JobId & jobid) - { + // Renvoie l'etat du job + JobInfo BatchManager_OAR::queryJob(const JobId & jobid) + { // define command to query batch string subCommand = "oarstat -fj " + jobid.getReference(); string command = _protocol.getExecCommand(subCommand, _hostname, _username); @@ -123,19 +120,19 @@ namespace Batch 
JobInfo_OAR jobinfo = JobInfo_OAR(jobid.getReference(), output); return jobinfo; - } + } - string BatchManager_OAR::buildBatchScript(const Job & job) - { - Parametre params = job.getParametre(); + string BatchManager_OAR::buildBatchScript(const Job & job) + { + Parametre params = job.getParametre(); - // Job Parameters - string workDir = ""; - string fileToExecute = ""; - string tmpDir = ""; - int nbproc = 0; - int mem = 0; - string queue = ""; + // Job Parameters + string workDir = ""; + string fileToExecute = ""; + string tmpDir = ""; + int nbproc = 0; + int mem = 0; + string queue = ""; // Mandatory parameters if (params.find(WORKDIR) != params.end()) @@ -147,85 +144,85 @@ namespace Batch else throw RunTimeException("params[EXECUTABLE] is not defined. Please define it, cannot submit this job."); - // Optional parameters - if (params.find(NBPROC) != params.end()) - nbproc = params[NBPROC]; + // Optional parameters + if (params.find(NBPROC) != params.end()) + nbproc = params[NBPROC]; int nbprocpernode = 1; if (params.find(NBPROCPERNODE) != params.end()) nbprocpernode = params[NBPROCPERNODE]; long walltimeSecs = 0; - if (params.find(MAXWALLTIME) != params.end()) - walltimeSecs = (long)params[MAXWALLTIME] * 60; - if (params.find(MAXRAMSIZE) != params.end()) - mem = params[MAXRAMSIZE]; - if (params.find(QUEUE) != params.end()) - queue = params[QUEUE].str(); - - string::size_type p1 = fileToExecute.find_last_of("/"); - string::size_type p2 = fileToExecute.find_last_of("."); - string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - string fileNameToExecute = fileToExecute.substr(p1+1); - - // Create batch submit file + if (params.find(MAXWALLTIME) != params.end()) + walltimeSecs = (long)params[MAXWALLTIME] * 60; + if (params.find(MAXRAMSIZE) != params.end()) + mem = params[MAXRAMSIZE]; + if (params.find(QUEUE) != params.end()) + queue = params[QUEUE].str(); + + string::size_type p1 = fileToExecute.find_last_of("/"); + string::size_type p2 = 
fileToExecute.find_last_of("."); + string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); + string fileNameToExecute = fileToExecute.substr(p1+1); + + // Create batch submit file ofstream tempOutputFile; string tmpFileName = Utils::createAndOpenTemporaryFile("OAR-script", tempOutputFile); - tempOutputFile << "#!/bin/sh -f" << endl; - - int nb_full_nodes(0); - int nb_proc_on_last_node(0); - - if (nbproc > 0) - { - nb_full_nodes = nbproc / nbprocpernode; - nb_proc_on_last_node = nbproc % nbprocpernode; - - // In exclusive mode, we reserve all procs on the nodes - if (params.find(EXCLUSIVE) != params.end() && params[EXCLUSIVE] && nb_proc_on_last_node > 0) - { - nb_full_nodes += 1; - nb_proc_on_last_node = 0; - } - } - - if (nb_full_nodes > 0) - { - tempOutputFile << "#OAR -l nodes=" << nb_full_nodes; - if (walltimeSecs > 0) - { - tempOutputFile << ",walltime=" << convertSecTo_H_M_S(walltimeSecs) << endl; - } - else - { - tempOutputFile << endl; - } - } - else - { - if (walltimeSecs > 0) - { - tempOutputFile << "#OAR -l walltime=" << convertSecTo_H_M_S(walltimeSecs) << endl; - } - } - - if (queue != "") - { - tempOutputFile << "#OAR -q " << queue << endl; - } - - tempOutputFile << "#OAR -O " << tmpDir << "/logs/output.log." << rootNameToExecute << endl; - tempOutputFile << "#OAR -E " << tmpDir << "/logs/error.log." 
<< rootNameToExecute << endl; - - tempOutputFile << "export LIBBATCH_NODEFILE=$OAR_NODEFILE" << endl; - - // Launch the executable - tempOutputFile << "cd " << tmpDir << endl; - tempOutputFile << "./" + fileNameToExecute << endl; - tempOutputFile.flush(); - tempOutputFile.close(); - - Utils::chmod(tmpFileName.c_str(), 0x1ED); - LOG("Batch script file generated is: " << tmpFileName); + tempOutputFile << "#!/bin/sh -f" << endl; + + int nb_full_nodes(0); + int nb_proc_on_last_node(0); + + if (nbproc > 0) + { + nb_full_nodes = nbproc / nbprocpernode; + nb_proc_on_last_node = nbproc % nbprocpernode; + + // In exclusive mode, we reserve all procs on the nodes + if (params.find(EXCLUSIVE) != params.end() && params[EXCLUSIVE] && nb_proc_on_last_node > 0) + { + nb_full_nodes += 1; + nb_proc_on_last_node = 0; + } + } + + if (nb_full_nodes > 0) + { + tempOutputFile << "#OAR -l nodes=" << nb_full_nodes; + if (walltimeSecs > 0) + { + tempOutputFile << ",walltime=" << convertSecTo_H_M_S(walltimeSecs) << endl; + } + else + { + tempOutputFile << endl; + } + } + else + { + if (walltimeSecs > 0) + { + tempOutputFile << "#OAR -l walltime=" << convertSecTo_H_M_S(walltimeSecs) << endl; + } + } + + if (queue != "") + { + tempOutputFile << "#OAR -q " << queue << endl; + } + + tempOutputFile << "#OAR -O " << tmpDir << "/logs/output.log." << rootNameToExecute << endl; + tempOutputFile << "#OAR -E " << tmpDir << "/logs/error.log." 
<< rootNameToExecute << endl; + + tempOutputFile << "export LIBBATCH_NODEFILE=$OAR_NODEFILE" << endl; + + // Launch the executable + tempOutputFile << "cd " << tmpDir << endl; + tempOutputFile << "./" + fileNameToExecute << endl; + tempOutputFile.flush(); + tempOutputFile.close(); + + Utils::chmod(tmpFileName.c_str(), 0x1ED); + LOG("Batch script file generated is: " << tmpFileName); string remoteFileName = rootNameToExecute + "_Batch.sh"; int status = _protocol.copyFile(tmpFileName, "", "", @@ -235,17 +232,17 @@ namespace Batch throw RunTimeException("Cannot copy batch submission file on host " + _hostname); return remoteFileName; - } - - string BatchManager_OAR::convertSecTo_H_M_S(long seconds) const - { - int h(seconds / 3600); - int m((seconds % 3600) / 60); - int s((seconds % 3600) % 60); - - stringstream ss; - ss << h << ":" << m << ":" << s; - - return ss.str(); - } + } + + string BatchManager_OAR::convertSecTo_H_M_S(long seconds) const + { + int h(seconds / 3600); + int m((seconds % 3600) / 60); + int s((seconds % 3600) % 60); + + stringstream ss; + ss << h << ":" << m << ":" << s; + + return ss.str(); + } } diff --git a/src/OAR/BatchManager_OAR.hxx b/src/OAR/BatchManager_OAR.hxx index 9e186d4..8adb876 100644 --- a/src/OAR/BatchManager_OAR.hxx +++ b/src/OAR/BatchManager_OAR.hxx @@ -28,42 +28,42 @@ namespace Batch { - class BATCH_EXPORT BatchManager_OAR : public BatchManager - { - public: - // Constructeur - BatchManager_OAR(const FactBatchManager * parent, const char * host="localhost", - const char * username="", - CommunicationProtocolType protocolType = SSH, const char * mpiImpl="nompi"); + class BATCH_EXPORT BatchManager_OAR : public BatchManager + { + public: + // Constructeur + BatchManager_OAR(const FactBatchManager * parent, const char * host="localhost", + const char * username="", + CommunicationProtocolType protocolType = SSH, const char * mpiImpl="nompi"); - // Destructeur - virtual ~BatchManager_OAR(); + // Destructeur + virtual 
~BatchManager_OAR(); - // Soumet un job - virtual const JobId submitJob(const Job & job); + // Soumet un job + virtual const JobId runJob(const Job & job); - // Supprime un job - virtual void deleteJob(const JobId & jobid); + // Supprime un job + virtual void deleteJob(const JobId & jobid); - // Donne l'etat du job - virtual JobInfo queryJob(const JobId & jobid); + // Donne l'etat du job + virtual JobInfo queryJob(const JobId & jobid); - // Modifie un job en file d'attente - virtual void setParametre(const JobId & jobid, const Parametre & param) { return alterJob(jobid, param); } + // Modifie un job en file d'attente + virtual void setParametre(const JobId & jobid, const Parametre & param) { return alterJob(jobid, param); } - // Modifie un job en file d'attente - virtual void setEnvironnement(const JobId & jobid, const Environnement & env) { return alterJob(jobid, env); } + // Modifie un job en file d'attente + virtual void setEnvironnement(const JobId & jobid, const Environnement & env) { return alterJob(jobid, env); } - protected: - std::string buildBatchScript(const Job & job); - std::string convertSecTo_H_M_S(long seconds) const; + protected: + std::string buildBatchScript(const Job & job); + std::string convertSecTo_H_M_S(long seconds) const; #ifdef SWIG - public: - // Recupere le l'identifiant d'un job deja soumis au BatchManager - virtual const JobId getJobIdByReference(const char * ref) { return BatchManager::getJobIdByReference(ref); } + public: + // Recupere l'identifiant d'un job deja soumis au BatchManager + virtual const JobId getJobIdByReference(const char * ref) { return BatchManager::getJobIdByReference(ref); } #endif - }; + }; } #endif diff --git a/src/PBS/BatchManager_PBS.cxx b/src/PBS/BatchManager_PBS.cxx index 14255ba..f8ab1fe 100644 --- a/src/PBS/BatchManager_PBS.cxx +++ b/src/PBS/BatchManager_PBS.cxx @@ -60,14 +60,11 @@ namespace Batch { } // Methode pour le controle des jobs : soumet un job au gestionnaire - const JobId 
BatchManager_PBS::submitJob(const Job & job) + const JobId BatchManager_PBS::runJob(const Job & job) { Parametre params = job.getParametre(); const std::string workDir = params[WORKDIR]; - // export input files on cluster - exportInputFiles(job); - // build batch script for job string scriptFile = buildSubmissionScript(job); diff --git a/src/PBS/BatchManager_PBS.hxx b/src/PBS/BatchManager_PBS.hxx index 9cbd3a6..2de7f45 100644 --- a/src/PBS/BatchManager_PBS.hxx +++ b/src/PBS/BatchManager_PBS.hxx @@ -53,7 +53,7 @@ namespace Batch { // static string BatchManager_LSF::getDefaultServer(); // Methodes pour le controle des jobs - virtual const JobId submitJob(const Job & job); // soumet un job au gestionnaire + virtual const JobId runJob(const Job & job); // soumet un job au gestionnaire virtual void deleteJob(const JobId & jobid); // retire un job du gestionnaire virtual JobInfo queryJob(const JobId & jobid); // renvoie l'etat du job virtual bool isRunning(const JobId & jobid); // teste si un job est present en machine diff --git a/src/SGE/BatchManager_SGE.cxx b/src/SGE/BatchManager_SGE.cxx index 3eee6b4..f89e2fb 100644 --- a/src/SGE/BatchManager_SGE.cxx +++ b/src/SGE/BatchManager_SGE.cxx @@ -72,7 +72,7 @@ namespace Batch { } // Methode pour le controle des jobs : soumet un job au gestionnaire - const JobId BatchManager_SGE::submitJob(const Job & job) + const JobId BatchManager_SGE::runJob(const Job & job) { Parametre params = job.getParametre(); const std::string workDir = params[WORKDIR]; @@ -81,9 +81,6 @@ namespace Batch { string::size_type p2 = fileToExecute.find_last_of("."); std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1); - // export input files on cluster - exportInputFiles(job); - // build batch script for job buildBatchScript(job); diff --git a/src/SGE/BatchManager_SGE.hxx b/src/SGE/BatchManager_SGE.hxx index e9065a3..8ef6a0f 100644 --- a/src/SGE/BatchManager_SGE.hxx +++ b/src/SGE/BatchManager_SGE.hxx @@ -53,7 +53,7 @@ namespace Batch { // 
static string BatchManager_LSF::getDefaultServer(); // Methodes pour le controle des jobs - virtual const JobId submitJob(const Job & job); // soumet un job au gestionnaire + virtual const JobId runJob(const Job & job); // soumet un job au gestionnaire virtual void deleteJob(const JobId & jobid); // retire un job du gestionnaire virtual JobInfo queryJob(const JobId & jobid); // renvoie l'etat du job virtual bool isRunning(const JobId & jobid); // teste si un job est present en machine diff --git a/src/Slurm/BatchManager_Slurm.cxx b/src/Slurm/BatchManager_Slurm.cxx index b768386..4bb013d 100644 --- a/src/Slurm/BatchManager_Slurm.cxx +++ b/src/Slurm/BatchManager_Slurm.cxx @@ -55,14 +55,11 @@ namespace Batch { } // Method to submit a job to the batch manager - const JobId BatchManager_Slurm::submitJob(const Job & job) + const JobId BatchManager_Slurm::runJob(const Job & job) { Parametre params = job.getParametre(); const string workDir = params[WORKDIR]; - // export input files on cluster - exportInputFiles(job); - // build command file to submit the job and copy it on the server string cmdFile = buildCommandFile(job); diff --git a/src/Slurm/BatchManager_Slurm.hxx b/src/Slurm/BatchManager_Slurm.hxx index 7ee37a2..f21b27d 100644 --- a/src/Slurm/BatchManager_Slurm.hxx +++ b/src/Slurm/BatchManager_Slurm.hxx @@ -51,7 +51,7 @@ namespace Batch { virtual ~BatchManager_Slurm(); // Methods to control jobs - virtual const JobId submitJob(const Job & job); + virtual const JobId runJob(const Job & job); virtual void deleteJob(const JobId & jobid); virtual JobInfo queryJob(const JobId & jobid); diff --git a/src/Vishnu/BatchManager_Vishnu.cxx b/src/Vishnu/BatchManager_Vishnu.cxx index 09aacf0..ef518a4 100644 --- a/src/Vishnu/BatchManager_Vishnu.cxx +++ b/src/Vishnu/BatchManager_Vishnu.cxx @@ -61,11 +61,8 @@ namespace Batch { } // Method to submit a job to the batch manager - const JobId BatchManager_Vishnu::submitJob(const Job & job) + const JobId BatchManager_Vishnu::runJob(const 
Job & job) { - // export input files on cluster - exportInputFiles(job); - // build command file to submit the job string cmdFile = buildCommandFile(job); diff --git a/src/Vishnu/BatchManager_Vishnu.hxx b/src/Vishnu/BatchManager_Vishnu.hxx index 7b4b8fc..5ac9087 100644 --- a/src/Vishnu/BatchManager_Vishnu.hxx +++ b/src/Vishnu/BatchManager_Vishnu.hxx @@ -51,7 +51,7 @@ namespace Batch { virtual ~BatchManager_Vishnu(); // Methods to control jobs - virtual const JobId submitJob(const Job & job); + virtual const JobId runJob(const Job & job); virtual void deleteJob(const JobId & jobid); virtual JobInfo queryJob(const JobId & jobid); virtual void importOutputFiles(const Job & job, const std::string directory);