Salome HOME
Merge class BatchManager_eClient into class BatchManager
[tools/libbatch.git] / src / Core / Batch_BatchManager.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34 #ifdef WIN32
35 # include<winsock2.h>
36 #else
37 #include <unistd.h>
38 # include <netdb.h>
39 #endif
40
41 #include "Batch_Constants.hxx"
42 #include "Batch_Job.hxx"
43 #include "Batch_JobId.hxx"
44 #include "Batch_JobInfo.hxx"
45 #include "Batch_InvalidArgumentException.hxx"
46 #include "Batch_FactBatchManager.hxx"
47 #include "Batch_BatchManager.hxx"
48 #include "Batch_Utils.hxx"
49
50 #ifdef WIN32
51 #define sleep(seconds) Sleep((seconds)*1000)
52 #endif
53
54 using namespace std;
55
56 namespace Batch {
57
58   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
59                              const char * username,
60                              CommunicationProtocolType protocolType, const char* mpiImpl)
61     : _hostname(host), jobid_map(), _parent(parent),
62       _protocol(CommunicationProtocol::getInstance(protocolType)),
63       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
64   {
65   }
66
67
68   // Destructeur
69   BatchManager::~BatchManager()
70   {
71     delete _mpiImpl;
72   }
73
74   string BatchManager::__repr__() const
75   {
76     ostringstream oss;
77     oss << "<BatchManager of type '" << (_parent ? _parent->getType() : "unknown (no factory)") << "' connected to server '" << _hostname << "'>";
78     return oss.str();
79   }
80
81   // Recupere le l'identifiant d'un job deja soumis au BatchManager
82 //   const JobId BatchManager::getJobIdByReference(const string & ref)
83 //   {
84 //     return JobId(this, ref);
85 //   }
86   const JobId BatchManager::getJobIdByReference(const char * ref)
87   {
88     return JobId(this, ref);
89   }
90
91 //   // Methode pour le controle des jobs : soumet un job au gestionnaire
92 //   const JobId BatchManager::submitJob(const Job & job)
93 //   {
94 //     static int idx = 0;
95 //     //MEDMEM::STRING sst;
96 //     ostringstream sst;
97 //     sst << "Jobid_" << idx++;
98 //     JobId id(this, sst.str());
99 //     return id;
100 //   }
101
102 //   // Methode pour le controle des jobs : retire un job du gestionnaire
103 //   void BatchManager::deleteJob(const JobId & jobid)
104 //   {
105 //     // Nothing to do
106 //   }
107
108 //   // Methode pour le controle des jobs : suspend un job en file d'attente
109 //   void BatchManager::holdJob(const JobId & jobid)
110 //   {
111 //     // Nothing to do
112 //   }
113
114 //   // Methode pour le controle des jobs : relache un job suspendu
115 //   void BatchManager::releaseJob(const JobId & jobid)
116 //   {
117 //     // Nothing to do
118 //   }
119
120 //   // Methode pour le controle des jobs : modifie un job en file d'attente
121 //   void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
122 //   {
123 //     // Nothing to do
124 //   }
125
126 //   // Methode pour le controle des jobs : modifie un job en file d'attente
127 //   void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
128 //   {
129 //     // Nothing to do
130 //   }
131
132 //   // Methode pour le controle des jobs : modifie un job en file d'attente
133 //   void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
134 //   {
135 //     // Nothing to do
136 //   }
137
138 //   // Methode pour le controle des jobs : renvoie l'etat du job
139 //   JobInfo BatchManager::queryJob(const JobId & jobid)
140 //   {
141 //     return JobInfo();
142 //   }
143
144   //! Wait for the end of a job
145   /*!
146    *  This method is a simple way to wait for a job to end. It will query the job state at
147    *  increasing intervals and return when the job is finished (whether successfully or not) or
148    *  when the timeout is reached. This method is not intended to be generic. In many cases you
149    *  will have to write your own loop to wait for the end of a job.
150    *  \param jobid ID of the job to wait for.
151    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
152    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
153    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
154    *  \return The job state as returned by the last query.
155    */
156   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
157                                      long initSleepTime, long maxSleepTime)
158   {
159     int time = 0;
160     int sleeptime = initSleepTime;
161     bool testTimeout = (timeout > -1);
162     bool timeoutReached = (testTimeout && time >= timeout);
163     JobInfo jinfo = jobid.queryJob();
164     string state = jinfo.getParametre()[STATE].str();
165     cout << "State is \"" << state << "\"";
166     while (!timeoutReached && state != FINISHED && state != FAILED) {
167       cout << ", sleeping " << sleeptime << "s..." << endl;
168       sleep(sleeptime);
169       time += sleeptime;
170       timeoutReached = (testTimeout && time >= timeout);
171       sleeptime *= 2;
172       if (testTimeout && sleeptime > timeout - time)
173         sleeptime = timeout - time;
174       if (sleeptime > maxSleepTime)
175         sleeptime = maxSleepTime;
176       jinfo = jobid.queryJob();
177       state = jinfo.getParametre()[STATE].str();
178       cout << "State is \"" << state << "\"";
179     }
180     cout << endl;
181     return state;
182   }
183
184
185   void BatchManager::exportInputFiles(const Job& job)
186   {
187     int status;
188     Parametre params = job.getParametre();
189     const Versatile & V = params[INFILE];
190     Versatile::const_iterator Vit;
191
192     // Create remote directories
193     string logdir = string(params[WORKDIR]) + "/logs";
194     status = _protocol.makeDirectory(logdir, _hostname, _username);
195     if (status) {
196       std::ostringstream oss;
197       oss << "Cannot create directory " << logdir << " on host " << _hostname;
198       oss << ". Return status is " << status;
199       throw RunTimeException(oss.str());
200     }
201
202     // Copy the file to execute into the remote working directory
203     string executeFile = params[EXECUTABLE];
204     if (executeFile.size() != 0) {
205       status = _protocol.copyFile(executeFile, "", "",
206                                   params[WORKDIR], _hostname, _username);
207       if (status) {
208         std::ostringstream oss;
209         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
210         oss << ". Return status is " << status;
211         throw RunTimeException(oss.str());
212       }
213
214 #ifdef WIN32
215       // On Windows, we make the remote file executable afterward because
216       // pscp does not preserve access permissions on files
217
218       string remoteExec = string(params[EXECUTABLE]);
219       remoteExec = remoteExec.substr(remoteExec.rfind("\\") + 1, remoteExec.length());
220       remoteExec = string(params[WORKDIR]) + "/" + executable;
221
222       string subCommand = string("chmod u+x ") + remoteExec;
223       string command = _protocol.getExecCommand(subCommand, _hostname, _username);
224       cerr << command.c_str() << endl;
225       status = system(command.c_str());
226       if (status) {
227         std::ostringstream oss;
228         oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
229         oss << ". Return status is " << status;
230         throw RunTimeException(oss.str());
231       }
232 #endif
233     }
234
235     // Copy input files into the remote working directory
236     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
237       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
238       Couple inputFile = cpt;
239       status = _protocol.copyFile(inputFile.getLocal(), "", "",
240                                   inputFile.getRemote(), _hostname, _username);
241       if (status) {
242         std::ostringstream oss;
243         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
244         oss << ". Return status is " << status;
245         throw RunTimeException(oss.str());
246       }
247     }
248
249   }
250
251   void BatchManager::importOutputFiles( const Job & job, const string directory )
252   {
253     Parametre params = job.getParametre();
254     const Versatile & V = params[OUTFILE];
255     Versatile::const_iterator Vit;
256
257     // Create local result directory
258     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
259     if (status) {
260       string mess("Directory creation failed. Status is :");
261       ostringstream status_str;
262       status_str << status;
263       mess += status_str.str();
264       cerr << mess << endl;
265     }
266
267     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
268       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
269       Couple outputFile = cpt;
270       string localPath = outputFile.getLocal();
271       if (!Utils::isAbsolutePath(localPath)) {
272         localPath = directory + "/" + localPath;
273       }
274       status = _protocol.copyFile(outputFile.getRemote(), _hostname, _username,
275                                   localPath, "", "");
276       if (status) {
277         // Try to get what we can (logs files)
278         // throw BatchException("Error of connection on remote host");
279         std::string mess("Copy command failed ! status is :");
280         ostringstream status_str;
281         status_str << status;
282         mess += status_str.str();
283         cerr << mess << endl;
284       }
285     }
286
287     // Copy logs
288     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
289                                 directory, "", "");
290     if (status) {
291       std::string mess("Copy logs directory failed ! status is :");
292       ostringstream status_str;
293       status_str << status;
294       mess += status_str.str();
295       cerr << mess << endl;
296     }
297
298   }
299
300   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
301   {
302     Parametre params = job.getParametre();
303
304     // Create local result directory
305     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
306     if (status) {
307       string mess("Directory creation failed. Status is :");
308       ostringstream status_str;
309       status_str << status;
310       mess += status_str.str();
311       cerr << mess << endl;
312     }
313
314     bool ret = true;
315     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
316                                 directory, "", "");
317     if (status) {
318       // Try to get what we can (logs files)
319       // throw BatchException("Error of connection on remote host");
320       std::string mess("Copy command failed ! status is :");
321       ostringstream status_str;
322       status_str << status;
323       mess += status_str.str();
324       cerr << mess << endl;
325       ret = false;
326     }
327     return ret;
328   }
329
330   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
331   {
332     if(mpiImpl == "lam")
333       return new MpiImpl_LAM();
334     else if(mpiImpl == "mpich1")
335       return new MpiImpl_MPICH1();
336     else if(mpiImpl == "mpich2")
337       return new MpiImpl_MPICH2();
338     else if(mpiImpl == "openmpi")
339       return new MpiImpl_OPENMPI();
340     else if(mpiImpl == "ompi")
341       return new MpiImpl_OMPI();
342     else if(mpiImpl == "slurm")
343       return new MpiImpl_SLURM();
344     else if(mpiImpl == "prun")
345       return new MpiImpl_PRUN();
346     else if(mpiImpl == "nompi")
347       return NULL;
348     else{
349       ostringstream oss;
350       oss << mpiImpl << " : not yet implemented";
351       throw RunTimeException(oss.str().c_str());
352     }
353   }
354
355   const CommunicationProtocol & BatchManager::getProtocol() const
356   {
357     return _protocol;
358   }
359
360 }