1 // Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Date : Septembre 2003
41 #include "Constants.hxx"
44 #include "JobInfo.hxx"
45 #include "InvalidArgumentException.hxx"
46 #include "FactBatchManager.hxx"
47 #include "BatchManager.hxx"
51 #define sleep(seconds) Sleep((seconds)*1000)
58 BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
59 const char * username,
60 CommunicationProtocolType protocolType, const char* mpiImpl)
61 : _hostname(host), jobid_map(), _parent(parent),
62 _protocol(CommunicationProtocol::getInstance(protocolType)),
63 _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
69 BatchManager::~BatchManager()
74 string BatchManager::__repr__() const
77 oss << "<BatchManager of type '" << (_parent ? _parent->getType() : "unknown (no factory)") << "' connected to server '" << _hostname << "'>";
81 // Recupere le l'identifiant d'un job deja soumis au BatchManager
82 // const JobId BatchManager::getJobIdByReference(const string & ref)
84 // return JobId(this, ref);
86 const JobId BatchManager::getJobIdByReference(const char * ref)
88 return JobId(this, ref);
91 // // Methode pour le controle des jobs : soumet un job au gestionnaire
92 // const JobId BatchManager::submitJob(const Job & job)
94 // static int idx = 0;
95 // //MEDMEM::STRING sst;
97 // sst << "Jobid_" << idx++;
98 // JobId id(this, sst.str());
102 // // Methode pour le controle des jobs : retire un job du gestionnaire
103 // void BatchManager::deleteJob(const JobId & jobid)
108 // // Methode pour le controle des jobs : suspend un job en file d'attente
109 // void BatchManager::holdJob(const JobId & jobid)
114 // // Methode pour le controle des jobs : relache un job suspendu
115 // void BatchManager::releaseJob(const JobId & jobid)
120 // // Methode pour le controle des jobs : modifie un job en file d'attente
121 // void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
126 // // Methode pour le controle des jobs : modifie un job en file d'attente
127 // void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
132 // // Methode pour le controle des jobs : modifie un job en file d'attente
133 // void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
138 // // Methode pour le controle des jobs : renvoie l'etat du job
139 // JobInfo BatchManager::queryJob(const JobId & jobid)
144 //! Wait for the end of a job
146 * This method is a simple way to wait for a job to end. It will query the job state at
147 * increasing intervals and return when the job is finished (whether successfully or not) or
148 * when the timeout is reached. This method is not intended to be generic. In many cases you
149 * will have to write your own loop to wait for the end of a job.
150 * \param jobid ID of the job to wait for.
151 * \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
152 * \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
153 * \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
154 * \return The job state as returned by the last query.
156 string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
157 long initSleepTime, long maxSleepTime)
160 int sleeptime = initSleepTime;
161 bool testTimeout = (timeout > -1);
162 bool timeoutReached = (testTimeout && time >= timeout);
163 JobInfo jinfo = jobid.queryJob();
164 string state = jinfo.getParametre()[STATE].str();
165 cout << "State is \"" << state << "\"";
166 while (!timeoutReached && state != FINISHED && state != FAILED) {
167 cout << ", sleeping " << sleeptime << "s..." << endl;
170 timeoutReached = (testTimeout && time >= timeout);
172 if (testTimeout && sleeptime > timeout - time)
173 sleeptime = timeout - time;
174 if (sleeptime > maxSleepTime)
175 sleeptime = maxSleepTime;
176 jinfo = jobid.queryJob();
177 state = jinfo.getParametre()[STATE].str();
178 cout << "State is \"" << state << "\"";
185 void BatchManager::exportInputFiles(const Job& job)
188 Parametre params = job.getParametre();
189 const Versatile & V = params[INFILE];
190 Versatile::const_iterator Vit;
192 // Create remote directories
193 string logdir = string(params[WORKDIR]) + "/logs";
194 status = _protocol.makeDirectory(logdir, _hostname, _username);
196 std::ostringstream oss;
197 oss << "Cannot create directory " << logdir << " on host " << _hostname;
198 oss << ". Return status is " << status;
199 throw RunTimeException(oss.str());
202 // Copy the file to execute into the remote working directory
203 string executeFile = params[EXECUTABLE];
204 if (executeFile.size() != 0) {
205 status = _protocol.copyFile(executeFile, "", "",
206 params[WORKDIR], _hostname, _username);
208 std::ostringstream oss;
209 oss << "Cannot copy file " << executeFile << " on host " << _hostname;
210 oss << ". Return status is " << status;
211 throw RunTimeException(oss.str());
215 // On Windows, we make the remote file executable afterward because
216 // pscp does not preserve access permissions on files
218 string remoteExec = string(params[EXECUTABLE]);
219 remoteExec = remoteExec.substr(remoteExec.rfind("\\") + 1, remoteExec.length());
220 remoteExec = string(params[WORKDIR]) + "/" + executable;
222 string subCommand = string("chmod u+x ") + remoteExec;
223 string command = _protocol.getExecCommand(subCommand, _hostname, _username);
224 cerr << command.c_str() << endl;
225 status = system(command.c_str());
227 std::ostringstream oss;
228 oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
229 oss << ". Return status is " << status;
230 throw RunTimeException(oss.str());
235 // Copy input files into the remote working directory
236 for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
237 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
238 Couple inputFile = cpt;
239 string remotePath = inputFile.getRemote();
240 if (!Utils::isAbsolutePath(remotePath)) {
241 remotePath = params[WORKDIR].str() + "/" + remotePath;
243 status = _protocol.copyFile(inputFile.getLocal(), "", "",
244 remotePath, _hostname, _username);
246 std::ostringstream oss;
247 oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
248 oss << ". Return status is " << status;
249 throw RunTimeException(oss.str());
255 void BatchManager::importOutputFiles( const Job & job, const string directory )
257 Parametre params = job.getParametre();
258 const Versatile & V = params[OUTFILE];
259 Versatile::const_iterator Vit;
261 // Create local result directory
262 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
264 string mess("Directory creation failed. Status is :");
265 ostringstream status_str;
266 status_str << status;
267 mess += status_str.str();
268 cerr << mess << endl;
271 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
272 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
273 Couple outputFile = cpt;
274 string remotePath = outputFile.getRemote();
275 if (!Utils::isAbsolutePath(remotePath)) {
276 remotePath = params[WORKDIR].str() + "/" + remotePath;
278 string localPath = outputFile.getLocal();
279 if (!Utils::isAbsolutePath(localPath)) {
280 localPath = directory + "/" + localPath;
282 status = _protocol.copyFile(remotePath, _hostname, _username,
285 // Try to get what we can (logs files)
286 // throw BatchException("Error of connection on remote host");
287 std::string mess("Copy command failed ! status is :");
288 ostringstream status_str;
289 status_str << status;
290 mess += status_str.str();
291 cerr << mess << endl;
296 status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
299 std::string mess("Copy logs directory failed ! status is :");
300 ostringstream status_str;
301 status_str << status;
302 mess += status_str.str();
303 cerr << mess << endl;
308 bool BatchManager::importDumpStateFile( const Job & job, const string directory )
310 Parametre params = job.getParametre();
312 // Create local result directory
313 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
315 string mess("Directory creation failed. Status is :");
316 ostringstream status_str;
317 status_str << status;
318 mess += status_str.str();
319 cerr << mess << endl;
323 status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
326 // Try to get what we can (logs files)
327 // throw BatchException("Error of connection on remote host");
328 std::string mess("Copy command failed ! status is :");
329 ostringstream status_str;
330 status_str << status;
331 mess += status_str.str();
332 cerr << mess << endl;
338 MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
341 return new MpiImpl_LAM();
342 else if(mpiImpl == "mpich1")
343 return new MpiImpl_MPICH1();
344 else if(mpiImpl == "mpich2")
345 return new MpiImpl_MPICH2();
346 else if(mpiImpl == "openmpi")
347 return new MpiImpl_OPENMPI();
348 else if(mpiImpl == "ompi")
349 return new MpiImpl_OMPI();
350 else if(mpiImpl == "slurm")
351 return new MpiImpl_SLURM();
352 else if(mpiImpl == "prun")
353 return new MpiImpl_PRUN();
354 else if(mpiImpl == "nompi")
358 oss << mpiImpl << " : not yet implemented";
359 throw RunTimeException(oss.str().c_str());
363 const CommunicationProtocol & BatchManager::getProtocol() const