]> SALOME platform Git repositories - tools/libbatch.git/blob - src/Core/BatchManager.cxx
Salome HOME
Get results from sub directories.
[tools/libbatch.git] / src / Core / BatchManager.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34
35 #include "Constants.hxx"
36 #include "Job.hxx"
37 #include "JobId.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
41 #include "Utils.hxx"
42 #include "NotYetImplementedException.hxx"
43 #include "Log.hxx"
44
45 using namespace std;
46
47 namespace Batch {
48
49   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50                              const char * username,
51                              CommunicationProtocolType protocolType, const char* mpiImpl)
52     : _hostname(host), jobid_map(), _type(parent->getType()),
53       _protocol(CommunicationProtocol::getInstance(protocolType)),
54       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
55   {
56   }
57
58
59   // Destructeur
60   BatchManager::~BatchManager()
61   {
62     delete _mpiImpl;
63   }
64
65   string BatchManager::__repr__() const
66   {
67     ostringstream oss;
68     oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
69     return oss.str();
70   }
71
72   // Recupere le l'identifiant d'un job deja soumis au BatchManager
73   const JobId BatchManager::getJobIdByReference(const char * ref)
74   {
75     return JobId(this, ref);
76   }
77
78   // Methode pour le controle des jobs : soumet un job au gestionnaire
79   const JobId BatchManager::submitJob(const Job & job)
80   {
81     exportInputFiles(job);
82     preprocess(job);
83     return runJob(job);
84   }
85
86   // Methode pour le controle des jobs : retire un job du gestionnaire
87   void BatchManager::deleteJob(const JobId & jobid)
88   {
89     throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
90   }
91
92   // Methode pour le controle des jobs : suspend un job en file d'attente
93   void BatchManager::holdJob(const JobId & jobid)
94   {
95     throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
96   }
97
98   // Methode pour le controle des jobs : relache un job suspendu
99   void BatchManager::releaseJob(const JobId & jobid)
100   {
101     throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
102   }
103
104   // Methode pour le controle des jobs : modifie un job en file d'attente
105   void BatchManager::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
106   {
107     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
108   }
109
110   // Methode pour le controle des jobs : modifie un job en file d'attente
111   void BatchManager::alterJob(const JobId & jobid, const Parametre & param)
112   {
113     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
114   }
115
116   // Methode pour le controle des jobs : modifie un job en file d'attente
117   void BatchManager::alterJob(const JobId & jobid, const Environnement & env)
118   {
119     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
120   }
121
122   // Methode pour le controle des jobs : renvoie l'etat du job
123   JobInfo BatchManager::queryJob(const JobId & jobid)
124   {
125     throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
126   }
127
128   const JobId BatchManager::addJob(const Job & job, const string & reference)
129   {
130     return JobId(this, reference);
131   }
132
133   //! Wait for the end of a job
134   /*!
135    *  This method is a simple way to wait for a job to end. It will query the job state at
136    *  increasing intervals and return when the job is finished (whether successfully or not) or
137    *  when the timeout is reached. This method is not intended to be generic. In many cases you
138    *  will have to write your own loop to wait for the end of a job.
139    *  \param jobid ID of the job to wait for.
140    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
141    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
142    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
143    *  \return The job state as returned by the last query.
144    */
145   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
146                                      long initSleepTime, long maxSleepTime)
147   {
148     int time = 0;
149     int sleeptime = initSleepTime;
150     bool testTimeout = (timeout > -1);
151     bool timeoutReached = (testTimeout && time >= timeout);
152     JobInfo jinfo = jobid.queryJob();
153     string state = jinfo.getParametre()[STATE].str();
154     while (!timeoutReached && state != FINISHED && state != FAILED) {
155       LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
156       Utils::sleep(sleeptime);
157       time += sleeptime;
158       timeoutReached = (testTimeout && time >= timeout);
159       sleeptime *= 2;
160       if (testTimeout && sleeptime > timeout - time)
161         sleeptime = timeout - time;
162       if (sleeptime > maxSleepTime)
163         sleeptime = maxSleepTime;
164       jinfo = jobid.queryJob();
165       state = jinfo.getParametre()[STATE].str();
166     }
167     LOG("State is \"" << state << "\"");
168     return state;
169   }
170
171
172   void BatchManager::exportInputFiles(const Job& job)
173   {
174     int status;
175     Parametre params = job.getParametre();
176     const Versatile & V = params[INFILE];
177     Versatile::const_iterator Vit;
178
179     // Create remote directories
180     string logdir = string(params[WORKDIR]) + "/logs";
181     status = _protocol.makeDirectory(logdir, _hostname, _username);
182     if (status) {
183       std::ostringstream oss;
184       oss << "Cannot create directory " << logdir << " on host " << _hostname;
185       oss << ". Return status is " << status;
186       throw RunTimeException(oss.str());
187     }
188
189     // Copy the file to execute into the remote working directory
190     string executeFile = params[EXECUTABLE];
191     if (executeFile.size() != 0) {
192       status = _protocol.copyFile(executeFile, "", "",
193                                   params[WORKDIR], _hostname, _username);
194       if (status) {
195         std::ostringstream oss;
196         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
197         oss << ". Return status is " << status;
198         throw RunTimeException(oss.str());
199       }
200
201 #ifdef WIN32
202       if (_protocol.getType() != SH) {
203         // On Windows, we make the remote file executable afterward because
204         // pscp does not preserve access permissions on files
205
206         string remoteExec = string(params[EXECUTABLE]);
207         remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
208         remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
209
210         string subCommand = string("chmod u+x ") + remoteExec;
211         string command = _protocol.getExecCommand(subCommand, _hostname, _username);
212         LOG(command);
213         status = system(command.c_str());
214         if (status) {
215           std::ostringstream oss;
216           oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
217           oss << ". Return status is " << status;
218           throw RunTimeException(oss.str());
219         }
220       }
221 #endif
222     }
223
224     // Copy input files into the remote working directory
225     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
226       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
227       Couple inputFile = cpt;
228       string remotePath = inputFile.getRemote();
229       if (!Utils::isAbsolutePath(remotePath)) {
230         remotePath = params[WORKDIR].str() + "/" + remotePath;
231       }
232       status = _protocol.copyFile(inputFile.getLocal(), "", "",
233                                   remotePath, _hostname, _username);
234       if (status) {
235         std::ostringstream oss;
236         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
237         oss << ". Return status is " << status;
238         throw RunTimeException(oss.str());
239       }
240     }
241
242   }
243
244   void BatchManager::importOutputFiles( const Job & job, const string directory )
245   {
246     Parametre params = job.getParametre();
247     const Versatile & V = params[OUTFILE];
248     Versatile::const_iterator Vit;
249
250     // Create local result directory
251     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
252     if (status)
253       LOG("Directory creation failed. Status is: " << status);
254
255     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
256       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
257       Couple outputFile = cpt;
258       string remotePath = outputFile.getRemote();
259       if (!Utils::isAbsolutePath(remotePath)) {
260         remotePath = params[WORKDIR].str() + "/" + remotePath;
261       }
262       string localPath = outputFile.getLocal();
263       if (!Utils::isAbsolutePath(localPath)) {
264         localPath = directory + "/" + localPath;
265       }
266       status = CommunicationProtocol::getInstance(SH).makeDirectory(
267                                              Utils::dirname(localPath), "", "");
268       if (status)
269         LOG("Directory creation failed. Status is: " << status);
270       status = _protocol.copyFile(remotePath, _hostname, _username,
271                                   localPath, "", "");
272       if (status)
273         LOG("Copy command failed. Status is: " << status);
274     }
275
276     // Copy logs
277     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
278                                 directory, "", "");
279     if (status)
280       LOG("Copy logs directory failed. Status is: " << status);
281   }
282
283   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
284   {
285     Parametre params = job.getParametre();
286
287     // Create local result directory
288     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
289     if (status)
290       LOG("Directory creation failed. Status is: " << status);
291
292     bool ret = true;
293     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
294                                 directory, "", "");
295     if (status) {
296       LOG("Copy command failed. Status is: " << status);
297       ret = false;
298     }
299     return ret;
300   }
301
302   bool BatchManager::importWorkFile( const Job & job,
303                                      const std::string& work_file,
304                                      const std::string& directory )
305   {
306     Parametre params = job.getParametre();
307
308     // Create local result directory
309     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
310     if (status)
311       LOG("Directory creation failed. Status is: " << status);
312
313     bool ret = true;
314     status = _protocol.copyFile(string(params[WORKDIR]) + "/" + work_file,
315                                 _hostname, _username,
316                                 directory, "", "");
317     if (status) {
318       LOG("Copy command failed. Status is: " << status);
319       ret = false;
320     }
321     return ret;
322   }
323
324   void BatchManager::clearWorkingDir( const Job & job)
325   {
326     Parametre params = job.getParametre();
327
328     string wd = params[WORKDIR];
329     if(!wd.empty() && wd != "/")
330     {
331       int status = _protocol.removeDirectory(wd, _hostname, _username);
332       if (status)
333         LOG("removeDirectory command failed. Status is: " << status);
334     }
335     else
336       LOG("removeDirectory command failed. Invalid working directory: " << wd);
337   }
338
339   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
340   {
341     if(mpiImpl == "lam")
342       return new MpiImpl_LAM();
343     else if(mpiImpl == "mpich1")
344       return new MpiImpl_MPICH1();
345     else if(mpiImpl == "mpich2")
346       return new MpiImpl_MPICH2();
347     else if(mpiImpl == "openmpi")
348       return new MpiImpl_OPENMPI();
349     else if(mpiImpl == "ompi")
350       return new MpiImpl_OMPI();
351     else if(mpiImpl == "slurm")
352       return new MpiImpl_SLURM();
353     else if(mpiImpl == "prun")
354       return new MpiImpl_PRUN();
355     else if(mpiImpl == "nompi")
356       return NULL;
357     else{
358       ostringstream oss;
359       oss << mpiImpl << " : not yet implemented";
360       throw RunTimeException(oss.str().c_str());
361     }
362   }
363
364   const CommunicationProtocol & BatchManager::getProtocol() const
365   {
366     return _protocol;
367   }
368
369   void BatchManager::preprocess(const Batch::Job & job)
370   {
371     std::string preproCommand;
372     std::string workDir;
373     Parametre params = job.getParametre();
374     if (params.find(PREPROCESS) != params.end())
375       preproCommand = params[PREPROCESS].str();
376     if (params.find(WORKDIR) != params.end())
377       workDir = params[WORKDIR].str();
378
379     if(!preproCommand.empty() && !workDir.empty())
380     {
381       std::string subCommand = "cd " + workDir + "; " + preproCommand;
382       std::string command = _protocol.getExecCommand(subCommand, _hostname, _username);
383       command += " 2>&1";
384       LOG(command);
385
386       // submit job
387       std::string output;
388       int status = Utils::getCommandOutput(command, output);
389       LOG(output);
390       if (status != 0)
391         throw RunTimeException("Error when executing: " + command +
392                                "\nOutput:" + output);
393     }
394   }
395   
396   const Batch::JobId BatchManager::runJob(const Batch::Job & job)
397   {
398     throw NotYetImplementedException("Method runJob not implemented by Batch Manager \"" + _type + "\"");
399   }
400 }