1 // Copyright (C) 2007-2015 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Date : Septembre 2003
35 #include "Constants.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
42 #include "NotYetImplementedException.hxx"
49 BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50 const char * username,
51 CommunicationProtocolType protocolType, const char* mpiImpl)
52 : _hostname(host), jobid_map(), _type(parent->getType()),
53 _protocol(CommunicationProtocol::getInstance(protocolType)),
54 _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
60 BatchManager::~BatchManager()
65 string BatchManager::__repr__() const
68 oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
72 // Recupere le l'identifiant d'un job deja soumis au BatchManager
73 const JobId BatchManager::getJobIdByReference(const char * ref)
75 return JobId(this, ref);
78 // Methode pour le controle des jobs : soumet un job au gestionnaire
79 const JobId BatchManager::submitJob(const Job & job)
81 throw NotYetImplementedException("Method submitJob not implemented by Batch Manager \"" + _type + "\"");
84 // Methode pour le controle des jobs : retire un job du gestionnaire
85 void BatchManager::deleteJob(const JobId & jobid)
87 throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
90 // Methode pour le controle des jobs : suspend un job en file d'attente
91 void BatchManager::holdJob(const JobId & jobid)
93 throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
96 // Methode pour le controle des jobs : relache un job suspendu
97 void BatchManager::releaseJob(const JobId & jobid)
99 throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
102 // Methode pour le controle des jobs : modifie un job en file d'attente
103 void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
105 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
108 // Methode pour le controle des jobs : modifie un job en file d'attente
109 void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
111 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
114 // Methode pour le controle des jobs : modifie un job en file d'attente
115 void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
117 throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
120 // Methode pour le controle des jobs : renvoie l'etat du job
121 JobInfo BatchManager::queryJob(const JobId & jobid)
123 throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
126 const JobId BatchManager::addJob(const Job & job, const string & reference)
128 return JobId(this, reference);
131 //! Wait for the end of a job
133 * This method is a simple way to wait for a job to end. It will query the job state at
134 * increasing intervals and return when the job is finished (whether successfully or not) or
135 * when the timeout is reached. This method is not intended to be generic. In many cases you
136 * will have to write your own loop to wait for the end of a job.
137 * \param jobid ID of the job to wait for.
138 * \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
139 * \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
140 * \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
141 * \return The job state as returned by the last query.
143 string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
144 long initSleepTime, long maxSleepTime)
147 int sleeptime = initSleepTime;
148 bool testTimeout = (timeout > -1);
149 bool timeoutReached = (testTimeout && time >= timeout);
150 JobInfo jinfo = jobid.queryJob();
151 string state = jinfo.getParametre()[STATE].str();
152 while (!timeoutReached && state != FINISHED && state != FAILED) {
153 LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
154 Utils::sleep(sleeptime);
156 timeoutReached = (testTimeout && time >= timeout);
158 if (testTimeout && sleeptime > timeout - time)
159 sleeptime = timeout - time;
160 if (sleeptime > maxSleepTime)
161 sleeptime = maxSleepTime;
162 jinfo = jobid.queryJob();
163 state = jinfo.getParametre()[STATE].str();
165 LOG("State is \"" << state << "\"");
170 void BatchManager::exportInputFiles(const Job& job)
173 Parametre params = job.getParametre();
174 const Versatile & V = params[INFILE];
175 Versatile::const_iterator Vit;
177 // Create remote directories
178 string logdir = string(params[WORKDIR]) + "/logs";
179 status = _protocol.makeDirectory(logdir, _hostname, _username);
181 std::ostringstream oss;
182 oss << "Cannot create directory " << logdir << " on host " << _hostname;
183 oss << ". Return status is " << status;
184 throw RunTimeException(oss.str());
187 // Copy the file to execute into the remote working directory
188 string executeFile = params[EXECUTABLE];
189 if (executeFile.size() != 0) {
190 status = _protocol.copyFile(executeFile, "", "",
191 params[WORKDIR], _hostname, _username);
193 std::ostringstream oss;
194 oss << "Cannot copy file " << executeFile << " on host " << _hostname;
195 oss << ". Return status is " << status;
196 throw RunTimeException(oss.str());
200 if (_protocol.getType() != SH) {
201 // On Windows, we make the remote file executable afterward because
202 // pscp does not preserve access permissions on files
204 string remoteExec = string(params[EXECUTABLE]);
205 remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
206 remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
208 string subCommand = string("chmod u+x ") + remoteExec;
209 string command = _protocol.getExecCommand(subCommand, _hostname, _username);
211 status = system(command.c_str());
213 std::ostringstream oss;
214 oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
215 oss << ". Return status is " << status;
216 throw RunTimeException(oss.str());
222 // Copy input files into the remote working directory
223 for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
224 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
225 Couple inputFile = cpt;
226 string remotePath = inputFile.getRemote();
227 if (!Utils::isAbsolutePath(remotePath)) {
228 remotePath = params[WORKDIR].str() + "/" + remotePath;
230 status = _protocol.copyFile(inputFile.getLocal(), "", "",
231 remotePath, _hostname, _username);
233 std::ostringstream oss;
234 oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
235 oss << ". Return status is " << status;
236 throw RunTimeException(oss.str());
242 void BatchManager::importOutputFiles( const Job & job, const string directory )
244 Parametre params = job.getParametre();
245 const Versatile & V = params[OUTFILE];
246 Versatile::const_iterator Vit;
248 // Create local result directory
249 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
251 LOG("Directory creation failed. Status is: " << status);
253 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
254 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
255 Couple outputFile = cpt;
256 string remotePath = outputFile.getRemote();
257 if (!Utils::isAbsolutePath(remotePath)) {
258 remotePath = params[WORKDIR].str() + "/" + remotePath;
260 string localPath = outputFile.getLocal();
261 if (!Utils::isAbsolutePath(localPath)) {
262 localPath = directory + "/" + localPath;
264 status = _protocol.copyFile(remotePath, _hostname, _username,
267 LOG("Copy command failed. Status is: " << status);
271 status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
274 LOG("Copy logs directory failed. Status is: " << status);
277 bool BatchManager::importDumpStateFile( const Job & job, const string directory )
279 Parametre params = job.getParametre();
281 // Create local result directory
282 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
284 LOG("Directory creation failed. Status is: " << status);
287 status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
290 LOG("Copy command failed. Status is: " << status);
296 bool BatchManager::importWorkFile( const Job & job,
297 const std::string& work_file,
298 const std::string& directory )
300 Parametre params = job.getParametre();
302 // Create local result directory
303 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
305 LOG("Directory creation failed. Status is: " << status);
308 status = _protocol.copyFile(string(params[WORKDIR]) + "/" + work_file,
309 _hostname, _username,
312 LOG("Copy command failed. Status is: " << status);
318 void BatchManager::clearWorkingDir( const Job & job)
320 Parametre params = job.getParametre();
322 string wd = params[WORKDIR];
323 if(!wd.empty() && wd != "/")
325 int status = _protocol.removeDirectory(wd, _hostname, _username);
327 LOG("removeDirectory command failed. Status is: " << status);
330 LOG("removeDirectory command failed. Invalid working directory: " << wd);
333 MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
336 return new MpiImpl_LAM();
337 else if(mpiImpl == "mpich1")
338 return new MpiImpl_MPICH1();
339 else if(mpiImpl == "mpich2")
340 return new MpiImpl_MPICH2();
341 else if(mpiImpl == "openmpi")
342 return new MpiImpl_OPENMPI();
343 else if(mpiImpl == "ompi")
344 return new MpiImpl_OMPI();
345 else if(mpiImpl == "slurm")
346 return new MpiImpl_SLURM();
347 else if(mpiImpl == "prun")
348 return new MpiImpl_PRUN();
349 else if(mpiImpl == "nompi")
353 oss << mpiImpl << " : not yet implemented";
354 throw RunTimeException(oss.str().c_str());
358 const CommunicationProtocol & BatchManager::getProtocol() const