1 // Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Date : Septembre 2003
41 #include "Batch_Constants.hxx"
42 #include "Batch_Job.hxx"
43 #include "Batch_JobId.hxx"
44 #include "Batch_JobInfo.hxx"
45 #include "Batch_InvalidArgumentException.hxx"
46 #include "Batch_FactBatchManager.hxx"
47 #include "Batch_BatchManager.hxx"
48 #include "Batch_Utils.hxx"
51 #define sleep(seconds) Sleep((seconds)*1000)
// Constructs a batch manager attached to a factory, a remote host and a user.
// The member initializers below establish that:
//  - _hostname and _username are initialized from the C strings host and username,
//  - jobid_map is default-initialized and _parent keeps the factory pointer as given,
//  - _protocol is obtained from CommunicationProtocol::getInstance(protocolType),
//  - _mpiImpl is produced by FactoryMpiImpl(mpiImpl).
// NOTE(review): host, username and mpiImpl are used here without a null check; if
// the receiving members/parameters are std::string, a null pointer would be
// undefined behavior -- confirm that callers never pass one.
// NOTE(review): the initializers are not necessarily written in member declaration
// order; _mpiImpl depends on a member function call, so confirm against the header.
58 BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
59 const char * username,
60 CommunicationProtocolType protocolType, const char* mpiImpl)
61 : _hostname(host), jobid_map(), _parent(parent),
62 _protocol(CommunicationProtocol::getInstance(protocolType)),
63 _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
// Destructor.
// NOTE(review): _mpiImpl is initialized from FactoryMpiImpl(), whose branches return
// objects created with `new`; confirm that this destructor releases it.
69 BatchManager::~BatchManager()
// Builds a human-readable description of this manager: the factory type reported by
// _parent->getType(), or "unknown (no factory)" when _parent is null, followed by
// the host name stored in _hostname.
// NOTE(review): presumably the text accumulated in `oss` is what gets returned -- confirm.
74 string BatchManager::__repr__() const
77 oss << "<BatchManager of type '" << (_parent ? _parent->getType() : "unknown (no factory)") << "' connected to server '" << _hostname << "'>";
81 // Retrieves the identifier of a job that was already submitted to the BatchManager
82 // const JobId BatchManager::getJobIdByReference(const string & ref)
84 // return JobId(this, ref);
// Wraps a job reference in a JobId bound to this manager.
// \param ref Job reference as a C string.
//            NOTE(review): presumably the reference handed out by the batch system
//            when the job was submitted -- confirm.
// \return A JobId constructed from (this, ref).
86 const JobId BatchManager::getJobIdByReference(const char * ref)
88 return JobId(this, ref);
91 // // Methode pour le controle des jobs : soumet un job au gestionnaire
92 // const JobId BatchManager::submitJob(const Job & job)
94 // static int idx = 0;
95 // //MEDMEM::STRING sst;
97 // sst << "Jobid_" << idx++;
98 // JobId id(this, sst.str());
102 // // Methode pour le controle des jobs : retire un job du gestionnaire
103 // void BatchManager::deleteJob(const JobId & jobid)
108 // // Methode pour le controle des jobs : suspend un job en file d'attente
109 // void BatchManager::holdJob(const JobId & jobid)
114 // // Methode pour le controle des jobs : relache un job suspendu
115 // void BatchManager::releaseJob(const JobId & jobid)
120 // // Methode pour le controle des jobs : modifie un job en file d'attente
121 // void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
126 // // Methode pour le controle des jobs : modifie un job en file d'attente
127 // void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
132 // // Methode pour le controle des jobs : modifie un job en file d'attente
133 // void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
138 // // Methode pour le controle des jobs : renvoie l'etat du job
139 // JobInfo BatchManager::queryJob(const JobId & jobid)
144 //! Wait for the end of a job
146 * This method is a simple way to wait for a job to end. It will query the job state at
147 * increasing intervals and return when the job is finished (whether successfully or not) or
148 * when the timeout is reached. This method is not intended to be generic. In many cases you
149 * will have to write your own loop to wait for the end of a job.
150 * \param jobid ID of the job to wait for.
151 * \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
152 * \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
153 * \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
154 * \return The job state as returned by the last query.
// Polls the state of `jobid` until it equals FINISHED or FAILED, or until the
// timeout is reached. Progress messages are written to cout.
156 string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
157 long initSleepTime, long maxSleepTime)
// NOTE(review): initSleepTime is a long but is stored in an int here; very large
// values would be narrowed.
160 int sleeptime = initSleepTime;
// The timeout is only enforced when timeout > -1, i.e. for any non-negative value.
161 bool testTimeout = (timeout > -1);
162 bool timeoutReached = (testTimeout && time >= timeout);
// First query: the job state is read from the STATE entry of the job parameters.
163 JobInfo jinfo = jobid.queryJob();
164 string state = jinfo.getParametre()[STATE].str();
165 cout << "State is \"" << state << "\"";
166 while (!timeoutReached && state != FINISHED && state != FAILED) {
// NOTE(review): the first interval announced here is initSleepTime as given; the
// clamping against the remaining time and maxSleepTime below only happens after
// this point, so the first wait can exceed both limits -- confirm this is intended.
167 cout << ", sleeping " << sleeptime << "s..." << endl;
170 timeoutReached = (testTimeout && time >= timeout);
// Never plan to wait longer than the time remaining before the timeout...
172 if (testTimeout && sleeptime > timeout - time)
173 sleeptime = timeout - time;
// ...nor longer than the configured maximum interval.
174 if (sleeptime > maxSleepTime)
175 sleeptime = maxSleepTime;
// Refresh the job state for the next loop test.
176 jinfo = jobid.queryJob();
177 state = jinfo.getParametre()[STATE].str();
178 cout << "State is \"" << state << "\"";
185 void BatchManager::exportInputFiles(const Job& job)
188 Parametre params = job.getParametre();
189 const Versatile & V = params[INFILE];
190 Versatile::const_iterator Vit;
192 // Create remote directories
193 string logdir = string(params[WORKDIR]) + "/logs";
194 status = _protocol.makeDirectory(logdir, _hostname, _username);
196 std::ostringstream oss;
197 oss << "Cannot create directory " << logdir << " on host " << _hostname;
198 oss << ". Return status is " << status;
199 throw RunTimeException(oss.str());
202 // Copy the file to execute into the remote working directory
203 string executeFile = params[EXECUTABLE];
204 if (executeFile.size() != 0) {
205 status = _protocol.copyFile(executeFile, "", "",
206 params[WORKDIR], _hostname, _username);
208 std::ostringstream oss;
209 oss << "Cannot copy file " << executeFile << " on host " << _hostname;
210 oss << ". Return status is " << status;
211 throw RunTimeException(oss.str());
215 // On Windows, we make the remote file executable afterward because
216 // pscp does not preserve access permissions on files
218 string remoteExec = string(params[EXECUTABLE]);
219 remoteExec = remoteExec.substr(remoteExec.rfind("\\") + 1, remoteExec.length());
220 remoteExec = string(params[WORKDIR]) + "/" + executable;
222 string subCommand = string("chmod u+x ") + remoteExec;
223 string command = _protocol.getExecCommand(subCommand, _hostname, _username);
224 cerr << command.c_str() << endl;
225 status = system(command.c_str());
227 std::ostringstream oss;
228 oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
229 oss << ". Return status is " << status;
230 throw RunTimeException(oss.str());
235 // Copy input files into the remote working directory
236 for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
237 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
238 Couple inputFile = cpt;
239 status = _protocol.copyFile(inputFile.getLocal(), "", "",
240 inputFile.getRemote(), _hostname, _username);
242 std::ostringstream oss;
243 oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
244 oss << ". Return status is " << status;
245 throw RunTimeException(oss.str());
// Retrieves the output files of a job, and the remote "logs" directory, from the
// remote host.
// \param job Job whose OUTFILE and WORKDIR parameters are read.
// \param directory Local directory that is created first and that receives the
//                  output files whose local path is not absolute.
// Failures are reported on cerr together with the command status; the statements
// shown here do not throw (the throw below is commented out).
251 void BatchManager::importOutputFiles( const Job & job, const string directory )
253 Parametre params = job.getParametre();
254 const Versatile & V = params[OUTFILE];
255 Versatile::const_iterator Vit;
257 // Create local result directory
// The SH protocol instance is used with empty host and user arguments.
// NOTE(review): presumably this means the directory is created on the local
// machine -- confirm.
258 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
260 string mess("Directory creation failed. Status is :");
261 ostringstream status_str;
262 status_str << status;
263 mess += status_str.str();
264 cerr << mess << endl;
// Each element of the OUTFILE parameter is read as a (local, remote) couple.
267 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
268 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
269 Couple outputFile = cpt;
270 string localPath = outputFile.getLocal();
// Relative local paths are resolved against `directory`.
271 if (!Utils::isAbsolutePath(localPath)) {
272 localPath = directory + "/" + localPath;
// The source of the copy is the remote path on _hostname for user _username.
274 status = _protocol.copyFile(outputFile.getRemote(), _hostname, _username,
277 // Try to get what we can (logs files)
278 // throw BatchException("Error of connection on remote host");
279 std::string mess("Copy command failed ! status is :");
280 ostringstream status_str;
281 status_str << status;
282 mess += status_str.str();
283 cerr << mess << endl;
// The "logs" sub-directory of the remote working directory is copied as well.
288 status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
291 std::string mess("Copy logs directory failed ! status is :");
292 ostringstream status_str;
293 status_str << status;
294 mess += status_str.str();
295 cerr << mess << endl;
// Retrieves the files matching "dumpState*.xml" from the remote working directory
// of a job.
// \param job Job whose WORKDIR parameter is read.
// \param directory Local directory that is created before the copy.
// \return NOTE(review): presumably whether the copy succeeded -- confirm.
// Failures are reported on cerr together with the command status; the statements
// shown here do not throw (the throw below is commented out).
300 bool BatchManager::importDumpStateFile( const Job & job, const string directory )
302 Parametre params = job.getParametre();
304 // Create local result directory
// The SH protocol instance is used with empty host and user arguments.
// NOTE(review): presumably this means the directory is created on the local
// machine -- confirm.
305 int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
307 string mess("Directory creation failed. Status is :");
308 ostringstream status_str;
309 status_str << status;
310 mess += status_str.str();
311 cerr << mess << endl;
// The wildcard is part of the remote source path handed to the copy command.
315 status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
318 // Try to get what we can (logs files)
319 // throw BatchException("Error of connection on remote host");
320 std::string mess("Copy command failed ! status is :");
321 ostringstream status_str;
322 status_str << status;
323 mess += status_str.str();
324 cerr << mess << endl;
// Creates the MPI implementation helper selected by the name `mpiImpl`.
// The name is compared against a fixed list of identifiers and the matching
// MpiImpl_* object is allocated with `new`.
// \param mpiImpl Name of the MPI implementation ("mpich1", "mpich2", "openmpi",
//                "ompi", "slurm", "prun", "nompi", ...).
// \return Pointer to the newly allocated object.
//         NOTE(review): presumably the caller owns it and must delete it -- confirm.
// \throw RunTimeException with the message "<name> : not yet implemented".
//        NOTE(review): presumably raised for names matching no branch -- confirm.
330 MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
333 return new MpiImpl_LAM();
334 else if(mpiImpl == "mpich1")
335 return new MpiImpl_MPICH1();
336 else if(mpiImpl == "mpich2")
337 return new MpiImpl_MPICH2();
338 else if(mpiImpl == "openmpi")
339 return new MpiImpl_OPENMPI();
340 else if(mpiImpl == "ompi")
341 return new MpiImpl_OMPI();
342 else if(mpiImpl == "slurm")
343 return new MpiImpl_SLURM();
344 else if(mpiImpl == "prun")
345 return new MpiImpl_PRUN();
// NOTE(review): the value returned for "nompi" is not shown here -- confirm that
// callers handle it (it may be a null pointer).
346 else if(mpiImpl == "nompi")
350 oss << mpiImpl << " : not yet implemented";
351 throw RunTimeException(oss.str().c_str());
355 const CommunicationProtocol & BatchManager::getProtocol() const