1 // Copyright (C) 2007-2015 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Date : Septembre 2003
35 #include "Constants.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
42 #include "NotYetImplementedException.hxx"
// Constructs a batch manager that talks to `host` on behalf of `username`.
// - _type is taken from parent->getType(), so `parent` must not be null.
// - _protocol is the instance returned by
//   CommunicationProtocol::getInstance(protocolType).
// - _mpiImpl receives the object created by FactoryMpiImpl(mpiImpl). That
//   function takes a std::string, so `mpiImpl` must not be null. It allocates
//   the implementations with `new` and throws RunTimeException for names it
//   does not recognize.
49 BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50 const char * username,
51 CommunicationProtocolType protocolType, const char* mpiImpl)
52 : _hostname(host), jobid_map(), _type(parent->getType()),
53 _protocol(CommunicationProtocol::getInstance(protocolType)),
54 _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
// Destructor.
// NOTE(review): the body is not visible in this view. _mpiImpl holds an
// object allocated with `new` by FactoryMpiImpl, so the destructor is
// presumably responsible for deleting it — confirm. Also confirm that the
// class disables (or correctly implements) copying, so the pointer cannot be
// deleted twice.
60 BatchManager::~BatchManager()
// Builds a human-readable description of this manager of the form
//   <BatchManager of type 'TYPE' connected to server 'HOST'>
// (the return statement is outside this view; presumably it returns oss.str()).
65 string BatchManager::__repr__() const
68 oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
72 // Retrieves the identifier of a job already submitted to the BatchManager.
// No lookup is performed: a JobId is built directly from `ref` and this manager.
73 const JobId BatchManager::getJobIdByReference(const char * ref)
75 return JobId(this, ref);
78 // Job control method: submits a job to the batch manager.
// The first step copies the job's input files to the remote host
// (exportInputFiles). The remaining steps are outside this view; presumably
// they run preprocess() and runJob() — confirm.
79 const JobId BatchManager::submitJob(const Job & job)
81 exportInputFiles(job);
86 // Job control method: removes a job from the batch manager.
// Default implementation: always throws NotYetImplementedException
// (presumably overridden by concrete batch managers).
87 void BatchManager::deleteJob(const JobId & jobid)
89 throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
92 // Job control method: holds (suspends) a queued job.
// Default implementation: always throws NotYetImplementedException.
93 void BatchManager::holdJob(const JobId & jobid)
95 throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
98 // Job control method: releases a previously held job.
// Default implementation: always throws NotYetImplementedException.
99 void BatchManager::releaseJob(const JobId & jobid)
101 throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
104 // Job control method: modifies a queued job (parameters and environment).
// Default implementation: always throws NotYetImplementedException.
105 void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
107 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
110 // Job control method: modifies a queued job (parameters only).
// Default implementation: always throws NotYetImplementedException.
111 void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
113 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
116 // Job control method: modifies a queued job (environment only).
// Default implementation: always throws NotYetImplementedException.
117 void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
119 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
122 // Job control method: returns the current state of the job.
// Default implementation: always throws NotYetImplementedException.
123 JobInfo BatchManager::queryJob(const JobId & jobid)
125 throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
// Registers an already-existing job under `reference`.
// In this implementation `job` is not used: the result is simply a JobId
// wrapping `reference` and this manager.
128 const JobId BatchManager::addJob(const Job & job, const string & reference)
130 return JobId(this, reference);
133 //! Wait for the end of a job
135 * This method is a simple way to wait for a job to end. It will query the job state at
136 * increasing intervals and return when the job is finished (whether successfully or not) or
137 * when the timeout is reached. This method is not intended to be generic. In many cases you
138 * will have to write your own loop to wait for the end of a job.
139 * \param jobid ID of the job to wait for.
140 * \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
141 * \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
142 * \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
143 * \return The job state as returned by the last query.
145 string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
146 long initSleepTime, long maxSleepTime)
// NOTE(review): the lines that declare and advance the elapsed-time counter
// `time`, and the one that grows `sleeptime` between polls, are not visible
// in this view. The "increasing intervals" and timeout behavior described in
// this method's header depend on them — confirm before editing.
// NOTE(review): `sleeptime` is an int initialized from the long
// `initSleepTime`, so values that do not fit in an int would be narrowed.
149 int sleeptime = initSleepTime;
// Any negative timeout (the default is -1) disables the timeout check.
150 bool testTimeout = (timeout > -1);
151 bool timeoutReached = (testTimeout && time >= timeout);
152 JobInfo jinfo = jobid.queryJob();
153 string state = jinfo.getParametre()[STATE].str();
// Poll until the job reports FINISHED or FAILED, or the timeout expires.
154 while (!timeoutReached && state != FINISHED && state != FAILED) {
155 LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
156 Utils::sleep(sleeptime);
158 timeoutReached = (testTimeout && time >= timeout);
// Shorten the next sleep so it does not overshoot the timeout, then cap it
// at maxSleepTime.
160 if (testTimeout && sleeptime > timeout - time)
161 sleeptime = timeout - time;
162 if (sleeptime > maxSleepTime)
163 sleeptime = maxSleepTime;
164 jinfo = jobid.queryJob();
165 state = jinfo.getParametre()[STATE].str();
167 LOG("State is \"" << state << "\"");
// Prepares the remote side of a job before submission:
//  1. creates "<WORKDIR>/logs" on the remote host;
//  2. if EXECUTABLE is non-empty, copies it into WORKDIR and, when the
//     protocol is not SH, runs "chmod u+x" on the remote copy;
//  3. copies the local file of each INFILE couple to its remote path, where
//     relative remote paths are resolved against WORKDIR.
// Each step can throw RunTimeException carrying the failing status. The
// status checks that guard each throw are outside this view.
172 void BatchManager::exportInputFiles(const Job& job)
175 Parametre params = job.getParametre();
176 const Versatile & V = params[INFILE];
177 Versatile::const_iterator Vit;
179 // Create remote directories
180 string logdir = string(params[WORKDIR]) + "/logs";
181 status = _protocol.makeDirectory(logdir, _hostname, _username);
183 std::ostringstream oss;
184 oss << "Cannot create directory " << logdir << " on host " << _hostname;
185 oss << ". Return status is " << status;
186 throw RunTimeException(oss.str());
189 // Copy the file to execute into the remote working directory
190 string executeFile = params[EXECUTABLE];
191 if (executeFile.size() != 0) {
192 status = _protocol.copyFile(executeFile, "", "",
193 params[WORKDIR], _hostname, _username);
195 std::ostringstream oss;
196 oss << "Cannot copy file " << executeFile << " on host " << _hostname;
197 oss << ". Return status is " << status;
198 throw RunTimeException(oss.str());
202 if (_protocol.getType() != SH) {
203 // On Windows, we make the remote file executable afterward because
204 // pscp does not preserve access permissions on files
// (Any platform guard, such as an #ifdef, is outside this view.)
206 string remoteExec = string(params[EXECUTABLE]);
// Keep only the file name. If EXECUTABLE contains no '/', rfind returns npos
// and npos + 1 wraps to 0, so the whole string is kept.
207 remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
208 remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
// NOTE(review): remoteExec is inserted into a shell command without quoting.
// Paths containing spaces or shell metacharacters will break the command or
// be interpreted by the shell.
210 string subCommand = string("chmod u+x ") + remoteExec;
211 string command = _protocol.getExecCommand(subCommand, _hostname, _username);
// system() returns an implementation-defined, wait-style status, not a plain
// exit code.
213 status = system(command.c_str());
215 std::ostringstream oss;
216 oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
217 oss << ". Return status is " << status;
218 throw RunTimeException(oss.str());
224 // Copy input files into the remote working directory
225 for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
// Each INFILE element is assumed to hold a CoupleType. static_cast performs
// no runtime check.
226 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
227 Couple inputFile = cpt;
228 string remotePath = inputFile.getRemote();
229 if (!Utils::isAbsolutePath(remotePath)) {
230 remotePath = params[WORKDIR].str() + "/" + remotePath;
232 status = _protocol.copyFile(inputFile.getLocal(), "", "",
233 remotePath, _hostname, _username);
235 std::ostringstream oss;
236 oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
237 oss << ". Return status is " << status;
238 throw RunTimeException(oss.str());
// Retrieves a job's results into the local `directory`:
//  - creates `directory` locally using the SH protocol instance;
//  - for each OUTFILE couple, resolves a relative remote path against WORKDIR
//    and a relative local path against `directory`, creates the local parent
//    directory, then copies the remote file back;
//  - finally copies the remote "<WORKDIR>/logs" directory (the destination
//    arguments are outside this view).
// In the visible code every failure is only logged, never thrown, so the
// import is best effort.
244 void BatchManager::importOutputFiles( const Job & job, const string directory )
246 Parametre params = job.getParametre();
247 const Versatile & V = params[OUTFILE];
248 Versatile::const_iterator Vit;
250 // Create local result directory
251 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
253 LOG("Directory creation failed. Status is: " << status);
255 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
// Each OUTFILE element is assumed to hold a CoupleType (unchecked cast).
256 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
257 Couple outputFile = cpt;
258 string remotePath = outputFile.getRemote();
259 if (!Utils::isAbsolutePath(remotePath)) {
260 remotePath = params[WORKDIR].str() + "/" + remotePath;
262 string localPath = outputFile.getLocal();
263 if (!Utils::isAbsolutePath(localPath)) {
264 localPath = directory + "/" + localPath;
// Make sure the local parent directory of the destination file exists.
266 status = CommunicationProtocol::getInstance(SH).makeDirectory(
267 Utils::dirname(localPath), "", "");
269 LOG("Directory creation failed. Status is: " << status);
270 status = _protocol.copyFile(remotePath, _hostname, _username,
273 LOG("Copy command failed. Status is: " << status);
277 status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
280 LOG("Copy logs directory failed. Status is: " << status);
// Creates `directory` locally and copies the remote
// "<WORKDIR>/dumpState*.xml" file(s) from the job's host. Failures are logged.
// NOTE(review): the statements computing the bool result are outside this
// view; presumably it returns true on success — confirm.
283 bool BatchManager::importDumpStateFile( const Job & job, const string directory )
285 Parametre params = job.getParametre();
287 // Create local result directory
288 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
290 LOG("Directory creation failed. Status is: " << status);
293 status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
296 LOG("Copy command failed. Status is: " << status);
// Creates `directory` locally and copies "<WORKDIR>/<work_file>" from the
// job's host. Failures are logged.
// NOTE(review): the copy destination and the statements computing the bool
// result are outside this view; presumably the file lands in `directory` and
// the function returns true on success — confirm.
302 bool BatchManager::importWorkFile( const Job & job,
303 const std::string& work_file,
304 const std::string& directory )
306 Parametre params = job.getParametre();
308 // Create local result directory
309 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
311 LOG("Directory creation failed. Status is: " << status);
314 status = _protocol.copyFile(string(params[WORKDIR]) + "/" + work_file,
315 _hostname, _username,
318 LOG("Copy command failed. Status is: " << status);
// Removes the job's working directory (WORKDIR) on the remote host.
// As a safety guard, an empty WORKDIR or "/" is refused: nothing is removed
// and the invalid directory is logged. Removal failures are logged, not thrown.
324 void BatchManager::clearWorkingDir( const Job & job)
326 Parametre params = job.getParametre();
328 string wd = params[WORKDIR];
329 if(!wd.empty() && wd != "/")
331 int status = _protocol.removeDirectory(wd, _hostname, _username);
333 LOG("removeDirectory command failed. Status is: " << status);
336 LOG("removeDirectory command failed. Invalid working directory: " << wd);
// Maps an MPI implementation name to a newly allocated MpiImpl object.
// Names visible here: "mpich1", "mpich2", "openmpi", "ompi", "slurm", "prun"
// and "nompi". The condition selecting MpiImpl_LAM is outside this view
// (presumably "lam"), as is the value returned for "nompi".
// Any other name throws RunTimeException("<name> : not yet implemented").
// The returned objects are allocated with `new`; the caller (the constructor
// stores the result in _mpiImpl) is responsible for releasing them.
339 MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
342 return new MpiImpl_LAM();
343 else if(mpiImpl == "mpich1")
344 return new MpiImpl_MPICH1();
345 else if(mpiImpl == "mpich2")
346 return new MpiImpl_MPICH2();
347 else if(mpiImpl == "openmpi")
348 return new MpiImpl_OPENMPI();
349 else if(mpiImpl == "ompi")
350 return new MpiImpl_OMPI();
351 else if(mpiImpl == "slurm")
352 return new MpiImpl_SLURM();
353 else if(mpiImpl == "prun")
354 return new MpiImpl_PRUN();
355 else if(mpiImpl == "nompi")
359 oss << mpiImpl << " : not yet implemented";
360 throw RunTimeException(oss.str().c_str());
// Returns the communication protocol this manager uses to reach the remote
// host (the body is outside this view; presumably it returns _protocol).
364 const CommunicationProtocol & BatchManager::getProtocol() const
// Runs the job's optional preprocessing command on the remote host.
// Only when both the PREPROCESS and WORKDIR parameters are set and non-empty,
// it executes "cd <WORKDIR>; <PREPROCESS>" through the protocol's exec
// command. On failure it throws RunTimeException containing the full command
// and its captured output (the status check is outside this view).
// NOTE(review): the declaration of `workDir` and of `output` is outside this
// view.
369 void BatchManager::preprocess(const Batch::Job & job)
371 std::string preproCommand;
373 Parametre params = job.getParametre();
374 if (params.find(PREPROCESS) != params.end())
375 preproCommand = params[PREPROCESS].str();
376 if (params.find(WORKDIR) != params.end())
377 workDir = params[WORKDIR].str();
379 if(!preproCommand.empty() && !workDir.empty())
// NOTE(review): workDir is inserted into the shell command without quoting,
// so a working directory containing spaces or shell metacharacters will break
// the "cd".
381 std::string subCommand = "cd " + workDir + "; " + preproCommand;
382 std::string command = _protocol.getExecCommand(subCommand, _hostname, _username);
388 int status = Utils::getCommandOutput(command, output);
391 throw RunTimeException("Error when executing: " + command +
392 "\nOutput:" + output);
// Starts the job on the batch system.
// Default implementation: always throws NotYetImplementedException
// (presumably overridden by concrete batch managers).
396 const Batch::JobId BatchManager::runJob(const Batch::Job & job)
398 throw NotYetImplementedException("Method runJob not implemented by Batch Manager \"" + _type + "\"");