Salome HOME
4645ffbb554f0ae26ca24cb9c41f9c7f4849d009
[tools/libbatch.git] / src / Core / BatchManager.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34
35 #include "Constants.hxx"
36 #include "Job.hxx"
37 #include "JobId.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
41 #include "Utils.hxx"
42 #include "NotYetImplementedException.hxx"
43 #include "Log.hxx"
44
45 using namespace std;
46
47 namespace Batch {
48
49   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50                              const char * username,
51                              CommunicationProtocolType protocolType, const char* mpiImpl)
52     : _hostname(host), jobid_map(), _type(parent->getType()),
53       _protocol(CommunicationProtocol::getInstance(protocolType)),
54       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
55   {
56   }
57
58
59   // Destructeur
60   BatchManager::~BatchManager()
61   {
62     delete _mpiImpl;
63   }
64
65   string BatchManager::__repr__() const
66   {
67     ostringstream oss;
68     oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
69     return oss.str();
70   }
71
72   // Recupere le l'identifiant d'un job deja soumis au BatchManager
73   const JobId BatchManager::getJobIdByReference(const char * ref)
74   {
75     return JobId(this, ref);
76   }
77
78   // Methode pour le controle des jobs : soumet un job au gestionnaire
79   const JobId BatchManager::submitJob(const Job & job)
80   {
81     throw NotYetImplementedException("Method submitJob not implemented by Batch Manager \"" + _type + "\"");
82   }
83
84   // Methode pour le controle des jobs : retire un job du gestionnaire
85   void BatchManager::deleteJob(const JobId & jobid)
86   {
87     throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
88   }
89
90   // Methode pour le controle des jobs : suspend un job en file d'attente
91   void BatchManager::holdJob(const JobId & jobid)
92   {
93     throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
94   }
95
96   // Methode pour le controle des jobs : relache un job suspendu
97   void BatchManager::releaseJob(const JobId & jobid)
98   {
99     throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
100   }
101
102   // Methode pour le controle des jobs : modifie un job en file d'attente
103   void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
104   {
105     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
106   }
107
108   // Methode pour le controle des jobs : modifie un job en file d'attente
109   void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
110   {
111     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
112   }
113
114   // Methode pour le controle des jobs : modifie un job en file d'attente
115   void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
116   {
117     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
118   }
119
120   // Methode pour le controle des jobs : renvoie l'etat du job
121   JobInfo BatchManager::queryJob(const JobId & jobid)
122   {
123     throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
124   }
125
126   const JobId BatchManager::addJob(const Job & job, const string & reference)
127   {
128     return JobId(this, reference);
129   }
130
131   //! Wait for the end of a job
132   /*!
133    *  This method is a simple way to wait for a job to end. It will query the job state at
134    *  increasing intervals and return when the job is finished (whether successfully or not) or
135    *  when the timeout is reached. This method is not intended to be generic. In many cases you
136    *  will have to write your own loop to wait for the end of a job.
137    *  \param jobid ID of the job to wait for.
138    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
139    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
140    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
141    *  \return The job state as returned by the last query.
142    */
143   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
144                                      long initSleepTime, long maxSleepTime)
145   {
146     int time = 0;
147     int sleeptime = initSleepTime;
148     bool testTimeout = (timeout > -1);
149     bool timeoutReached = (testTimeout && time >= timeout);
150     JobInfo jinfo = jobid.queryJob();
151     string state = jinfo.getParametre()[STATE].str();
152     while (!timeoutReached && state != FINISHED && state != FAILED) {
153       LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
154       Utils::sleep(sleeptime);
155       time += sleeptime;
156       timeoutReached = (testTimeout && time >= timeout);
157       sleeptime *= 2;
158       if (testTimeout && sleeptime > timeout - time)
159         sleeptime = timeout - time;
160       if (sleeptime > maxSleepTime)
161         sleeptime = maxSleepTime;
162       jinfo = jobid.queryJob();
163       state = jinfo.getParametre()[STATE].str();
164     }
165     LOG("State is \"" << state << "\"");
166     return state;
167   }
168
169
170   void BatchManager::exportInputFiles(const Job& job)
171   {
172     int status;
173     Parametre params = job.getParametre();
174     const Versatile & V = params[INFILE];
175     Versatile::const_iterator Vit;
176
177     // Create remote directories
178     string logdir = string(params[WORKDIR]) + "/logs";
179     status = _protocol.makeDirectory(logdir, _hostname, _username);
180     if (status) {
181       std::ostringstream oss;
182       oss << "Cannot create directory " << logdir << " on host " << _hostname;
183       oss << ". Return status is " << status;
184       throw RunTimeException(oss.str());
185     }
186
187     // Copy the file to execute into the remote working directory
188     string executeFile = params[EXECUTABLE];
189     if (executeFile.size() != 0) {
190       status = _protocol.copyFile(executeFile, "", "",
191                                   params[WORKDIR], _hostname, _username);
192       if (status) {
193         std::ostringstream oss;
194         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
195         oss << ". Return status is " << status;
196         throw RunTimeException(oss.str());
197       }
198
199 #ifdef WIN32
200       if (_protocol.getType() != SH) {
201         // On Windows, we make the remote file executable afterward because
202         // pscp does not preserve access permissions on files
203
204         string remoteExec = string(params[EXECUTABLE]);
205         remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
206         remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
207
208         string subCommand = string("chmod u+x ") + remoteExec;
209         string command = _protocol.getExecCommand(subCommand, _hostname, _username);
210         LOG(command);
211         status = system(command.c_str());
212         if (status) {
213           std::ostringstream oss;
214           oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
215           oss << ". Return status is " << status;
216           throw RunTimeException(oss.str());
217         }
218       }
219 #endif
220     }
221
222     // Copy input files into the remote working directory
223     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
224       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
225       Couple inputFile = cpt;
226       string remotePath = inputFile.getRemote();
227       if (!Utils::isAbsolutePath(remotePath)) {
228         remotePath = params[WORKDIR].str() + "/" + remotePath;
229       }
230       status = _protocol.copyFile(inputFile.getLocal(), "", "",
231                                   remotePath, _hostname, _username);
232       if (status) {
233         std::ostringstream oss;
234         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
235         oss << ". Return status is " << status;
236         throw RunTimeException(oss.str());
237       }
238     }
239
240   }
241
242   void BatchManager::importOutputFiles( const Job & job, const string directory )
243   {
244     Parametre params = job.getParametre();
245     const Versatile & V = params[OUTFILE];
246     Versatile::const_iterator Vit;
247
248     // Create local result directory
249     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
250     if (status)
251       LOG("Directory creation failed. Status is: " << status);
252
253     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
254       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
255       Couple outputFile = cpt;
256       string remotePath = outputFile.getRemote();
257       if (!Utils::isAbsolutePath(remotePath)) {
258         remotePath = params[WORKDIR].str() + "/" + remotePath;
259       }
260       string localPath = outputFile.getLocal();
261       if (!Utils::isAbsolutePath(localPath)) {
262         localPath = directory + "/" + localPath;
263       }
264       status = _protocol.copyFile(remotePath, _hostname, _username,
265                                   localPath, "", "");
266       if (status)
267         LOG("Copy command failed. Status is: " << status);
268     }
269
270     // Copy logs
271     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
272                                 directory, "", "");
273     if (status)
274       LOG("Copy logs directory failed. Status is: " << status);
275   }
276
277   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
278   {
279     Parametre params = job.getParametre();
280
281     // Create local result directory
282     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
283     if (status)
284       LOG("Directory creation failed. Status is: " << status);
285
286     bool ret = true;
287     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
288                                 directory, "", "");
289     if (status) {
290       LOG("Copy command failed. Status is: " << status);
291       ret = false;
292     }
293     return ret;
294   }
295
296   bool BatchManager::importWorkFile( const Job & job,
297                                      const std::string& work_file,
298                                      const std::string& directory )
299   {
300     Parametre params = job.getParametre();
301
302     // Create local result directory
303     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
304     if (status)
305       LOG("Directory creation failed. Status is: " << status);
306
307     bool ret = true;
308     status = _protocol.copyFile(string(params[WORKDIR]) + "/" + work_file,
309                                 _hostname, _username,
310                                 directory, "", "");
311     if (status) {
312       LOG("Copy command failed. Status is: " << status);
313       ret = false;
314     }
315     return ret;
316   }
317
318   void BatchManager::clearWorkingDir( const Job & job)
319   {
320     Parametre params = job.getParametre();
321
322     string wd = params[WORKDIR];
323     if(!wd.empty() && wd != "/")
324     {
325       int status = _protocol.removeDirectory(wd, _hostname, _username);
326       if (status)
327         LOG("removeDirectory command failed. Status is: " << status);
328     }
329     else
330       LOG("removeDirectory command failed. Invalid working directory: " << wd);
331   }
332
333   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
334   {
335     if(mpiImpl == "lam")
336       return new MpiImpl_LAM();
337     else if(mpiImpl == "mpich1")
338       return new MpiImpl_MPICH1();
339     else if(mpiImpl == "mpich2")
340       return new MpiImpl_MPICH2();
341     else if(mpiImpl == "openmpi")
342       return new MpiImpl_OPENMPI();
343     else if(mpiImpl == "ompi")
344       return new MpiImpl_OMPI();
345     else if(mpiImpl == "slurm")
346       return new MpiImpl_SLURM();
347     else if(mpiImpl == "prun")
348       return new MpiImpl_PRUN();
349     else if(mpiImpl == "nompi")
350       return NULL;
351     else{
352       ostringstream oss;
353       oss << mpiImpl << " : not yet implemented";
354       throw RunTimeException(oss.str().c_str());
355     }
356   }
357
358   const CommunicationProtocol & BatchManager::getProtocol() const
359   {
360     return _protocol;
361   }
362
363 }