Salome HOME
1af2bec3e2edc2e743a4f671b60738e610de0650
[tools/libbatch.git] / src / Core / Batch_BatchManager.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34 #ifdef WIN32
35 # include<winsock2.h>
36 #else
37 #include <unistd.h>
38 # include <netdb.h>
39 #endif
40
41 #include "Batch_Constants.hxx"
42 #include "Batch_Job.hxx"
43 #include "Batch_JobId.hxx"
44 #include "Batch_JobInfo.hxx"
45 #include "Batch_InvalidArgumentException.hxx"
46 #include "Batch_FactBatchManager.hxx"
47 #include "Batch_BatchManager.hxx"
48 #include "Batch_Utils.hxx"
49
50 #ifdef WIN32
51 #define sleep(seconds) Sleep((seconds)*1000)
52 #endif
53
54 using namespace std;
55
56 namespace Batch {
57
58   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
59                              const char * username,
60                              CommunicationProtocolType protocolType, const char* mpiImpl)
61     : _hostname(host), jobid_map(), _parent(parent),
62       _protocol(CommunicationProtocol::getInstance(protocolType)),
63       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
64   {
65   }
66
67
68   // Destructeur
69   BatchManager::~BatchManager()
70   {
71     delete _mpiImpl;
72   }
73
74   string BatchManager::__repr__() const
75   {
76     ostringstream oss;
77     oss << "<BatchManager of type '" << (_parent ? _parent->getType() : "unknown (no factory)") << "' connected to server '" << _hostname << "'>";
78     return oss.str();
79   }
80
81   // Recupere le l'identifiant d'un job deja soumis au BatchManager
82 //   const JobId BatchManager::getJobIdByReference(const string & ref)
83 //   {
84 //     return JobId(this, ref);
85 //   }
86   const JobId BatchManager::getJobIdByReference(const char * ref)
87   {
88     return JobId(this, ref);
89   }
90
91 //   // Methode pour le controle des jobs : soumet un job au gestionnaire
92 //   const JobId BatchManager::submitJob(const Job & job)
93 //   {
94 //     static int idx = 0;
95 //     //MEDMEM::STRING sst;
96 //     ostringstream sst;
97 //     sst << "Jobid_" << idx++;
98 //     JobId id(this, sst.str());
99 //     return id;
100 //   }
101
102 //   // Methode pour le controle des jobs : retire un job du gestionnaire
103 //   void BatchManager::deleteJob(const JobId & jobid)
104 //   {
105 //     // Nothing to do
106 //   }
107
108 //   // Methode pour le controle des jobs : suspend un job en file d'attente
109 //   void BatchManager::holdJob(const JobId & jobid)
110 //   {
111 //     // Nothing to do
112 //   }
113
114 //   // Methode pour le controle des jobs : relache un job suspendu
115 //   void BatchManager::releaseJob(const JobId & jobid)
116 //   {
117 //     // Nothing to do
118 //   }
119
120 //   // Methode pour le controle des jobs : modifie un job en file d'attente
121 //   void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
122 //   {
123 //     // Nothing to do
124 //   }
125
126 //   // Methode pour le controle des jobs : modifie un job en file d'attente
127 //   void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
128 //   {
129 //     // Nothing to do
130 //   }
131
132 //   // Methode pour le controle des jobs : modifie un job en file d'attente
133 //   void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
134 //   {
135 //     // Nothing to do
136 //   }
137
138 //   // Methode pour le controle des jobs : renvoie l'etat du job
139 //   JobInfo BatchManager::queryJob(const JobId & jobid)
140 //   {
141 //     return JobInfo();
142 //   }
143
144   //! Wait for the end of a job
145   /*!
146    *  This method is a simple way to wait for a job to end. It will query the job state at
147    *  increasing intervals and return when the job is finished (whether successfully or not) or
148    *  when the timeout is reached. This method is not intended to be generic. In many cases you
149    *  will have to write your own loop to wait for the end of a job.
150    *  \param jobid ID of the job to wait for.
151    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
152    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
153    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
154    *  \return The job state as returned by the last query.
155    */
156   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
157                                      long initSleepTime, long maxSleepTime)
158   {
159     int time = 0;
160     int sleeptime = initSleepTime;
161     bool testTimeout = (timeout > -1);
162     bool timeoutReached = (testTimeout && time >= timeout);
163     JobInfo jinfo = jobid.queryJob();
164     string state = jinfo.getParametre()[STATE].str();
165     cout << "State is \"" << state << "\"";
166     while (!timeoutReached && state != FINISHED && state != FAILED) {
167       cout << ", sleeping " << sleeptime << "s..." << endl;
168       sleep(sleeptime);
169       time += sleeptime;
170       timeoutReached = (testTimeout && time >= timeout);
171       sleeptime *= 2;
172       if (testTimeout && sleeptime > timeout - time)
173         sleeptime = timeout - time;
174       if (sleeptime > maxSleepTime)
175         sleeptime = maxSleepTime;
176       jinfo = jobid.queryJob();
177       state = jinfo.getParametre()[STATE].str();
178       cout << "State is \"" << state << "\"";
179     }
180     cout << endl;
181     return state;
182   }
183
184
185   void BatchManager::exportInputFiles(const Job& job)
186   {
187     int status;
188     Parametre params = job.getParametre();
189     const Versatile & V = params[INFILE];
190     Versatile::const_iterator Vit;
191
192     // Create remote directories
193     string logdir = string(params[WORKDIR]) + "/logs";
194     status = _protocol.makeDirectory(logdir, _hostname, _username);
195     if (status) {
196       std::ostringstream oss;
197       oss << "Cannot create directory " << logdir << " on host " << _hostname;
198       oss << ". Return status is " << status;
199       throw RunTimeException(oss.str());
200     }
201
202     // Copy the file to execute into the remote working directory
203     string executeFile = params[EXECUTABLE];
204     if (executeFile.size() != 0) {
205       status = _protocol.copyFile(executeFile, "", "",
206                                   params[WORKDIR], _hostname, _username);
207       if (status) {
208         std::ostringstream oss;
209         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
210         oss << ". Return status is " << status;
211         throw RunTimeException(oss.str());
212       }
213
214 #ifdef WIN32
215       // On Windows, we make the remote file executable afterward because
216       // pscp does not preserve access permissions on files
217
218       string remoteExec = string(params[EXECUTABLE]);
219       remoteExec = remoteExec.substr(remoteExec.rfind("\\") + 1, remoteExec.length());
220       remoteExec = string(params[WORKDIR]) + "/" + executable;
221
222       string subCommand = string("chmod u+x ") + remoteExec;
223       string command = _protocol.getExecCommand(subCommand, _hostname, _username);
224       cerr << command.c_str() << endl;
225       status = system(command.c_str());
226       if (status) {
227         std::ostringstream oss;
228         oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
229         oss << ". Return status is " << status;
230         throw RunTimeException(oss.str());
231       }
232 #endif
233     }
234
235     // Copy input files into the remote working directory
236     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
237       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
238       Couple inputFile = cpt;
239       string remotePath = inputFile.getRemote();
240       if (!Utils::isAbsolutePath(remotePath)) {
241         remotePath = params[WORKDIR].str() + "/" + remotePath;
242       }
243       status = _protocol.copyFile(inputFile.getLocal(), "", "",
244                                   remotePath, _hostname, _username);
245       if (status) {
246         std::ostringstream oss;
247         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
248         oss << ". Return status is " << status;
249         throw RunTimeException(oss.str());
250       }
251     }
252
253   }
254
255   void BatchManager::importOutputFiles( const Job & job, const string directory )
256   {
257     Parametre params = job.getParametre();
258     const Versatile & V = params[OUTFILE];
259     Versatile::const_iterator Vit;
260
261     // Create local result directory
262     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
263     if (status) {
264       string mess("Directory creation failed. Status is :");
265       ostringstream status_str;
266       status_str << status;
267       mess += status_str.str();
268       cerr << mess << endl;
269     }
270
271     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
272       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
273       Couple outputFile = cpt;
274       string remotePath = outputFile.getRemote();
275       if (!Utils::isAbsolutePath(remotePath)) {
276         remotePath = params[WORKDIR].str() + "/" + remotePath;
277       }
278       string localPath = outputFile.getLocal();
279       if (!Utils::isAbsolutePath(localPath)) {
280         localPath = directory + "/" + localPath;
281       }
282       status = _protocol.copyFile(remotePath, _hostname, _username,
283                                   localPath, "", "");
284       if (status) {
285         // Try to get what we can (logs files)
286         // throw BatchException("Error of connection on remote host");
287         std::string mess("Copy command failed ! status is :");
288         ostringstream status_str;
289         status_str << status;
290         mess += status_str.str();
291         cerr << mess << endl;
292       }
293     }
294
295     // Copy logs
296     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
297                                 directory, "", "");
298     if (status) {
299       std::string mess("Copy logs directory failed ! status is :");
300       ostringstream status_str;
301       status_str << status;
302       mess += status_str.str();
303       cerr << mess << endl;
304     }
305
306   }
307
308   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
309   {
310     Parametre params = job.getParametre();
311
312     // Create local result directory
313     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
314     if (status) {
315       string mess("Directory creation failed. Status is :");
316       ostringstream status_str;
317       status_str << status;
318       mess += status_str.str();
319       cerr << mess << endl;
320     }
321
322     bool ret = true;
323     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
324                                 directory, "", "");
325     if (status) {
326       // Try to get what we can (logs files)
327       // throw BatchException("Error of connection on remote host");
328       std::string mess("Copy command failed ! status is :");
329       ostringstream status_str;
330       status_str << status;
331       mess += status_str.str();
332       cerr << mess << endl;
333       ret = false;
334     }
335     return ret;
336   }
337
338   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
339   {
340     if(mpiImpl == "lam")
341       return new MpiImpl_LAM();
342     else if(mpiImpl == "mpich1")
343       return new MpiImpl_MPICH1();
344     else if(mpiImpl == "mpich2")
345       return new MpiImpl_MPICH2();
346     else if(mpiImpl == "openmpi")
347       return new MpiImpl_OPENMPI();
348     else if(mpiImpl == "ompi")
349       return new MpiImpl_OMPI();
350     else if(mpiImpl == "slurm")
351       return new MpiImpl_SLURM();
352     else if(mpiImpl == "prun")
353       return new MpiImpl_PRUN();
354     else if(mpiImpl == "nompi")
355       return NULL;
356     else{
357       ostringstream oss;
358       oss << mpiImpl << " : not yet implemented";
359       throw RunTimeException(oss.str().c_str());
360     }
361   }
362
363   const CommunicationProtocol & BatchManager::getProtocol() const
364   {
365     return _protocol;
366   }
367
368 }