Salome HOME
Merge from BR_LIBBATCH_2_0
[tools/libbatch.git] / src / Core / BatchManager.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34
35 #include "Constants.hxx"
36 #include "Job.hxx"
37 #include "JobId.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
41 #include "Utils.hxx"
42 #include "NotYetImplementedException.hxx"
43 #include "Log.hxx"
44
45 using namespace std;
46
47 namespace Batch {
48
49   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50                              const char * username,
51                              CommunicationProtocolType protocolType, const char* mpiImpl)
52     : _hostname(host), jobid_map(), _type(parent->getType()),
53       _protocol(CommunicationProtocol::getInstance(protocolType)),
54       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
55   {
56   }
57
58
59   // Destructeur
60   BatchManager::~BatchManager()
61   {
62     delete _mpiImpl;
63   }
64
65   string BatchManager::__repr__() const
66   {
67     ostringstream oss;
68     oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
69     return oss.str();
70   }
71
72   // Recupere le l'identifiant d'un job deja soumis au BatchManager
73   const JobId BatchManager::getJobIdByReference(const char * ref)
74   {
75     return JobId(this, ref);
76   }
77
78   // Methode pour le controle des jobs : soumet un job au gestionnaire
79   const JobId BatchManager::submitJob(const Job & job)
80   {
81     throw NotYetImplementedException("Method submitJob not implemented by Batch Manager \"" + _type + "\"");
82   }
83
84   // Methode pour le controle des jobs : retire un job du gestionnaire
85   void BatchManager::deleteJob(const JobId & jobid)
86   {
87     throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
88   }
89
90   // Methode pour le controle des jobs : suspend un job en file d'attente
91   void BatchManager::holdJob(const JobId & jobid)
92   {
93     throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
94   }
95
96   // Methode pour le controle des jobs : relache un job suspendu
97   void BatchManager::releaseJob(const JobId & jobid)
98   {
99     throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
100   }
101
102   // Methode pour le controle des jobs : modifie un job en file d'attente
103   void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
104   {
105     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
106   }
107
108   // Methode pour le controle des jobs : modifie un job en file d'attente
109   void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
110   {
111     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
112   }
113
114   // Methode pour le controle des jobs : modifie un job en file d'attente
115   void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
116   {
117     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
118   }
119
120   // Methode pour le controle des jobs : renvoie l'etat du job
121   JobInfo BatchManager::queryJob(const JobId & jobid)
122   {
123     throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
124   }
125
126   const JobId BatchManager::addJob(const Job & job, const string & reference)
127   {
128     return JobId(this, reference);
129   }
130
131   //! Wait for the end of a job
132   /*!
133    *  This method is a simple way to wait for a job to end. It will query the job state at
134    *  increasing intervals and return when the job is finished (whether successfully or not) or
135    *  when the timeout is reached. This method is not intended to be generic. In many cases you
136    *  will have to write your own loop to wait for the end of a job.
137    *  \param jobid ID of the job to wait for.
138    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
139    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
140    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
141    *  \return The job state as returned by the last query.
142    */
143   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
144                                      long initSleepTime, long maxSleepTime)
145   {
146     int time = 0;
147     int sleeptime = initSleepTime;
148     bool testTimeout = (timeout > -1);
149     bool timeoutReached = (testTimeout && time >= timeout);
150     JobInfo jinfo = jobid.queryJob();
151     string state = jinfo.getParametre()[STATE].str();
152     while (!timeoutReached && state != FINISHED && state != FAILED) {
153       LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
154       Utils::sleep(sleeptime);
155       time += sleeptime;
156       timeoutReached = (testTimeout && time >= timeout);
157       sleeptime *= 2;
158       if (testTimeout && sleeptime > timeout - time)
159         sleeptime = timeout - time;
160       if (sleeptime > maxSleepTime)
161         sleeptime = maxSleepTime;
162       jinfo = jobid.queryJob();
163       state = jinfo.getParametre()[STATE].str();
164     }
165     LOG("State is \"" << state << "\"");
166     return state;
167   }
168
169
170   void BatchManager::exportInputFiles(const Job& job)
171   {
172     int status;
173     Parametre params = job.getParametre();
174     const Versatile & V = params[INFILE];
175     Versatile::const_iterator Vit;
176
177     // Create remote directories
178     string logdir = string(params[WORKDIR]) + "/logs";
179     status = _protocol.makeDirectory(logdir, _hostname, _username);
180     if (status) {
181       std::ostringstream oss;
182       oss << "Cannot create directory " << logdir << " on host " << _hostname;
183       oss << ". Return status is " << status;
184       throw RunTimeException(oss.str());
185     }
186
187     // Copy the file to execute into the remote working directory
188     string executeFile = params[EXECUTABLE];
189     if (executeFile.size() != 0) {
190       status = _protocol.copyFile(executeFile, "", "",
191                                   params[WORKDIR], _hostname, _username);
192       if (status) {
193         std::ostringstream oss;
194         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
195         oss << ". Return status is " << status;
196         throw RunTimeException(oss.str());
197       }
198
199 #ifdef WIN32
200       if (_protocol.getType() != SH) {
201         // On Windows, we make the remote file executable afterward because
202         // pscp does not preserve access permissions on files
203
204         string remoteExec = string(params[EXECUTABLE]);
205         remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
206         remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
207
208         string subCommand = string("chmod u+x ") + remoteExec;
209         string command = _protocol.getExecCommand(subCommand, _hostname, _username);
210         LOG(command);
211         status = system(command.c_str());
212         if (status) {
213           std::ostringstream oss;
214           oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
215           oss << ". Return status is " << status;
216           throw RunTimeException(oss.str());
217         }
218       }
219 #endif
220     }
221
222     // Copy input files into the remote working directory
223     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
224       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
225       Couple inputFile = cpt;
226       string remotePath = inputFile.getRemote();
227       if (!Utils::isAbsolutePath(remotePath)) {
228         remotePath = params[WORKDIR].str() + "/" + remotePath;
229       }
230       status = _protocol.copyFile(inputFile.getLocal(), "", "",
231                                   remotePath, _hostname, _username);
232       if (status) {
233         std::ostringstream oss;
234         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
235         oss << ". Return status is " << status;
236         throw RunTimeException(oss.str());
237       }
238     }
239
240   }
241
242   void BatchManager::importOutputFiles( const Job & job, const string directory )
243   {
244     Parametre params = job.getParametre();
245     const Versatile & V = params[OUTFILE];
246     Versatile::const_iterator Vit;
247
248     // Create local result directory
249     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
250     if (status)
251       LOG("Directory creation failed. Status is: " << status);
252
253     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
254       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
255       Couple outputFile = cpt;
256       string remotePath = outputFile.getRemote();
257       if (!Utils::isAbsolutePath(remotePath)) {
258         remotePath = params[WORKDIR].str() + "/" + remotePath;
259       }
260       string localPath = outputFile.getLocal();
261       if (!Utils::isAbsolutePath(localPath)) {
262         localPath = directory + "/" + localPath;
263       }
264       status = _protocol.copyFile(remotePath, _hostname, _username,
265                                   localPath, "", "");
266       if (status)
267         LOG("Copy command failed. Status is: " << status);
268     }
269
270     // Copy logs
271     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
272                                 directory, "", "");
273     if (status)
274       LOG("Copy logs directory failed. Status is: " << status);
275   }
276
277   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
278   {
279     Parametre params = job.getParametre();
280
281     // Create local result directory
282     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
283     if (status)
284       LOG("Directory creation failed. Status is: " << status);
285
286     bool ret = true;
287     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
288                                 directory, "", "");
289     if (status) {
290       LOG("Copy command failed. Status is: " << status);
291       ret = false;
292     }
293     return ret;
294   }
295
296   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
297   {
298     if(mpiImpl == "lam")
299       return new MpiImpl_LAM();
300     else if(mpiImpl == "mpich1")
301       return new MpiImpl_MPICH1();
302     else if(mpiImpl == "mpich2")
303       return new MpiImpl_MPICH2();
304     else if(mpiImpl == "openmpi")
305       return new MpiImpl_OPENMPI();
306     else if(mpiImpl == "ompi")
307       return new MpiImpl_OMPI();
308     else if(mpiImpl == "slurm")
309       return new MpiImpl_SLURM();
310     else if(mpiImpl == "prun")
311       return new MpiImpl_PRUN();
312     else if(mpiImpl == "nompi")
313       return NULL;
314     else{
315       ostringstream oss;
316       oss << mpiImpl << " : not yet implemented";
317       throw RunTimeException(oss.str().c_str());
318     }
319   }
320
321   const CommunicationProtocol & BatchManager::getProtocol() const
322   {
323     return _protocol;
324   }
325
326 }