Salome HOME
Copyright update 2021
[tools/libbatch.git] / src / Core / BatchManager.cxx
1 // Copyright (C) 2007-2021  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23  * BatchManager.cxx :
24  *
25  * Auteur : Ivan DUTKA-MALEN - EDF R&D
26  * Date   : Septembre 2003
27  * Projet : SALOME 2
28  *
29  */
30
31 #include <iostream>
32 #include <sstream>
33 #include <string>
34
35 #include "Constants.hxx"
36 #include "Job.hxx"
37 #include "JobId.hxx"
38 #include "JobInfo.hxx"
39 #include "FactBatchManager.hxx"
40 #include "BatchManager.hxx"
41 #include "Utils.hxx"
42 #include "NotYetImplementedException.hxx"
43 #include "Log.hxx"
44
45 using namespace std;
46
47 namespace Batch {
48
49   BatchManager::BatchManager(const Batch::FactBatchManager * parent, const char* host,
50                              const char * username,
51                              CommunicationProtocolType protocolType, const char* mpiImpl)
52     : _hostname(host), jobid_map(), _type(parent->getType()),
53       _protocol(CommunicationProtocol::getInstance(protocolType)),
54       _username(username), _mpiImpl(FactoryMpiImpl(mpiImpl))
55   {
56   }
57
58
59   // Destructeur
60   BatchManager::~BatchManager()
61   {
62     delete _mpiImpl;
63   }
64
65   string BatchManager::__repr__() const
66   {
67     ostringstream oss;
68     oss << "<BatchManager of type '" << _type << "' connected to server '" << _hostname << "'>";
69     return oss.str();
70   }
71
72   // Recupere le l'identifiant d'un job deja soumis au BatchManager
73   const JobId BatchManager::getJobIdByReference(const char * ref)
74   {
75     return JobId(this, ref);
76   }
77
78   // Methode pour le controle des jobs : soumet un job au gestionnaire
79   const JobId BatchManager::submitJob(const Job & job)
80   {
81     exportInputFiles(job);
82     preprocess(job);
83     return runJob(job);
84   }
85
86   // Methode pour le controle des jobs : retire un job du gestionnaire
87   void BatchManager::deleteJob(const JobId & /*jobid*/)
88   {
89     throw NotYetImplementedException("Method deleteJob not implemented by Batch Manager \"" + _type + "\"");
90   }
91
92   // Methode pour le controle des jobs : suspend un job en file d'attente
93   void BatchManager::holdJob(const JobId & /*jobid*/)
94   {
95     throw NotYetImplementedException("Method holdJob not implemented by Batch Manager \"" + _type + "\"");
96   }
97
98   // Methode pour le controle des jobs : relache un job suspendu
99   void BatchManager::releaseJob(const JobId & /*jobid*/)
100   {
101     throw NotYetImplementedException("Method releaseJob not implemented by Batch Manager \"" + _type + "\"");
102   }
103
104   // Methode pour le controle des jobs : modifie un job en file d'attente
105   void BatchManager::alterJob(const JobId & /*jobid*/, const Parametre & /*param*/, const Environnement & /*env*/)
106   {
107     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
108   }
109
110   // Methode pour le controle des jobs : modifie un job en file d'attente
111   void BatchManager::alterJob(const JobId & /*jobid*/, const Parametre & /*param*/)
112   {
113     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
114   }
115
116   // Methode pour le controle des jobs : modifie un job en file d'attente
117   void BatchManager::alterJob(const JobId & /*jobid*/, const Environnement & /*env*/)
118   {
119     throw NotYetImplementedException("Method alterJob not implemented by Batch Manager \"" + _type + "\"");
120   }
121
122   // Methode pour le controle des jobs : renvoie l'etat du job
123   JobInfo BatchManager::queryJob(const JobId & /*jobid*/)
124   {
125     throw NotYetImplementedException("Method queryJob not implemented by Batch Manager \"" + _type + "\"");
126   }
127
128   const JobId BatchManager::addJob(const Job & /*job*/, const string & reference)
129   {
130     return JobId(this, reference);
131   }
132
133   //! Wait for the end of a job
134   /*!
135    *  This method is a simple way to wait for a job to end. It will query the job state at
136    *  increasing intervals and return when the job is finished (whether successfully or not) or
137    *  when the timeout is reached. This method is not intended to be generic. In many cases you
138    *  will have to write your own loop to wait for the end of a job.
139    *  \param jobid ID of the job to wait for.
140    *  \param timeout Maximum time to wait in seconds. If -1 (default), wait indefinitely.
141    *  \param initSleepTime Initial time in seconds between two queries for the job state (default is 1).
142    *  \param maxSleepTime Maximum time in seconds between two queries for the job state (default is 600).
143    *  \return The job state as returned by the last query.
144    */
145   string BatchManager::waitForJobEnd(const JobId & jobid, long timeout,
146                                      long initSleepTime, long maxSleepTime)
147   {
148     int time = 0;
149     int sleeptime = initSleepTime;
150     bool testTimeout = (timeout > -1);
151     bool timeoutReached = (testTimeout && time >= timeout);
152     JobInfo jinfo = jobid.queryJob();
153     string state = jinfo.getParametre()[STATE].str();
154     while (!timeoutReached && state != FINISHED && state != FAILED) {
155       LOG("State is \"" << state << "\"" << ", sleeping " << sleeptime << "s...");
156       Utils::sleep(sleeptime);
157       time += sleeptime;
158       timeoutReached = (testTimeout && time >= timeout);
159       sleeptime *= 2;
160       if (testTimeout && sleeptime > timeout - time)
161         sleeptime = timeout - time;
162       if (sleeptime > maxSleepTime)
163         sleeptime = maxSleepTime;
164       jinfo = jobid.queryJob();
165       state = jinfo.getParametre()[STATE].str();
166     }
167     LOG("State is \"" << state << "\"");
168     return state;
169   }
170
171
172   void BatchManager::exportInputFiles(const Job& job)
173   {
174     int status;
175     Parametre params = job.getParametre();
176     const Versatile & V = params[INFILE];
177     Versatile::const_iterator Vit;
178
179     // Create remote directories
180     string logdir = string(params[WORKDIR]) + "/logs";
181     status = _protocol.makeDirectory(logdir, _hostname, _username);
182     if (status) {
183       std::ostringstream oss;
184       oss << "Cannot create directory " << logdir << " on host " << _hostname;
185       oss << ". Return status is " << status;
186       throw RunTimeException(oss.str());
187     }
188
189     // Copy the file to execute into the remote working directory
190     string executeFile = params[EXECUTABLE];
191     if (executeFile.size() != 0) {
192       status = _protocol.copyFile(executeFile, "", "",
193                                   params[WORKDIR], _hostname, _username);
194       if (status) {
195         std::ostringstream oss;
196         oss << "Cannot copy file " << executeFile << " on host " << _hostname;
197         oss << ". Return status is " << status;
198         throw RunTimeException(oss.str());
199       }
200
201 #ifdef WIN32
202       if (_protocol.getType() != SH) {
203         // On Windows, we make the remote file executable afterward because
204         // pscp does not preserve access permissions on files
205
206         string remoteExec = string(params[EXECUTABLE]);
207         remoteExec = remoteExec.substr(remoteExec.rfind("/") + 1, remoteExec.length());
208         remoteExec = string(params[WORKDIR]) + "/" + remoteExec;
209
210         string subCommand = string("chmod u+x ") + remoteExec;
211         string command = _protocol.getExecCommand(subCommand, _hostname, _username);
212         LOG(command);
213         status = system(command.c_str());
214         if (status) {
215           std::ostringstream oss;
216           oss << "Cannot change permissions of file " << remoteExec << " on host " << _hostname;
217           oss << ". Return status is " << status;
218           throw RunTimeException(oss.str());
219         }
220       }
221 #endif
222     }
223
224     // Copy input files into the remote working directory
225     for (Vit=V.begin() ; Vit!=V.end() ; Vit++) {
226       CoupleType cpt = *static_cast< CoupleType * >(*Vit);
227       Couple inputFile = cpt;
228       string remotePath = inputFile.getRemote();
229       if (!Utils::isAbsolutePath(remotePath)) {
230         remotePath = params[WORKDIR].str() + "/" + remotePath;
231       }
232       status = _protocol.copyFile(inputFile.getLocal(), "", "",
233                                   remotePath, _hostname, _username);
234       if (status) {
235         std::ostringstream oss;
236         oss << "Cannot copy file " << inputFile.getLocal() << " on host " << _hostname;
237         oss << ". Return status is " << status;
238         throw RunTimeException(oss.str());
239       }
240     }
241
242   }
243
244   void BatchManager::importOutputFiles( const Job & job, const string directory )
245   {
246     Parametre params = job.getParametre();
247     const Versatile & V = params[OUTFILE];
248     Versatile::const_iterator Vit;
249
250     // Create local result directory
251     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
252     if (status)
253       LOG("Directory creation failed. Status is: " << status);
254
255     for(Vit=V.begin(); Vit!=V.end(); Vit++) {
256       CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
257       Couple outputFile = cpt;
258       string remotePath = outputFile.getRemote();
259       if (!Utils::isAbsolutePath(remotePath) && !Utils::isOption(remotePath)) {
260         // rsync creates the whole tree after /./ in the destination folder
261         remotePath = params[WORKDIR].str() + "/./" + remotePath;
262       }
263       string localPath = outputFile.getLocal();
264       if (!Utils::isAbsolutePath(localPath)) {
265         localPath = directory + "/" + localPath;
266       }
267       status = _protocol.copyFile(remotePath, _hostname, _username,
268                                   localPath, "", "");
269       if (status)
270         LOG("Copy command failed. Status is: " << status);
271     }
272
273     // Copy logs
274     status = _protocol.copyFile(string(params[WORKDIR]) + string("/logs"), _hostname, _username,
275                                 directory, "", "");
276     if (status)
277       LOG("Copy logs directory failed. Status is: " << status);
278   }
279
280   bool BatchManager::importDumpStateFile( const Job & job, const string directory )
281   {
282     Parametre params = job.getParametre();
283
284     // Create local result directory
285     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
286     if (status)
287       LOG("Directory creation failed. Status is: " << status);
288
289     bool ret = true;
290     status = _protocol.copyFile(string(params[WORKDIR]) + string("/dumpState*.xml"), _hostname, _username,
291                                 directory, "", "");
292     if (status) {
293       LOG("Copy command failed. Status is: " << status);
294       ret = false;
295     }
296     return ret;
297   }
298
299   bool BatchManager::importWorkFile( const Job & job,
300                                      const std::string& work_file,
301                                      const std::string& directory )
302   {
303     Parametre params = job.getParametre();
304
305     // Create local result directory
306     int status = CommunicationProtocol::getInstance(SH).makeDirectory(directory, "", "");
307     if (status)
308       LOG("Directory creation failed. Status is: " << status);
309
310     bool ret = true;
311     status = _protocol.copyFile(string(params[WORKDIR]) + "/" + work_file,
312                                 _hostname, _username,
313                                 directory, "", "");
314     if (status) {
315       LOG("Copy command failed. Status is: " << status);
316       ret = false;
317     }
318     return ret;
319   }
320
321   void BatchManager::clearWorkingDir( const Job & job)
322   {
323     Parametre params = job.getParametre();
324
325     string wd = params[WORKDIR];
326     if(!wd.empty() && wd != "/")
327     {
328       int status = _protocol.removeDirectory(wd, _hostname, _username);
329       if (status)
330         LOG("removeDirectory command failed. Status is: " << status);
331     }
332     else
333       LOG("removeDirectory command failed. Invalid working directory: " << wd);
334   }
335
336   MpiImpl *BatchManager::FactoryMpiImpl(string mpiImpl)
337   {
338     if(mpiImpl == "lam")
339       return new MpiImpl_LAM();
340     else if(mpiImpl == "mpich1")
341       return new MpiImpl_MPICH1();
342     else if(mpiImpl == "mpich2")
343       return new MpiImpl_MPICH2();
344     else if(mpiImpl == "openmpi")
345       return new MpiImpl_OPENMPI();
346     else if(mpiImpl == "ompi")
347       return new MpiImpl_OMPI();
348     else if(mpiImpl == "slurm")
349       return new MpiImpl_SLURM();
350     else if(mpiImpl == "prun")
351       return new MpiImpl_PRUN();
352     else if(mpiImpl == "nompi")
353       return NULL;
354     else{
355       ostringstream oss;
356       oss << mpiImpl << " : not yet implemented";
357       throw RunTimeException(oss.str().c_str());
358     }
359   }
360
361   const CommunicationProtocol & BatchManager::getProtocol() const
362   {
363     return _protocol;
364   }
365
366   void BatchManager::preprocess(const Batch::Job & job)
367   {
368     std::string preproCommand;
369     std::string workDir;
370     Parametre params = job.getParametre();
371     if (params.find(PREPROCESS) != params.end())
372       preproCommand = params[PREPROCESS].str();
373     if (params.find(WORKDIR) != params.end())
374       workDir = params[WORKDIR].str();
375
376     if(!preproCommand.empty() && !workDir.empty())
377     {
378       std::string subCommand = "cd " + workDir + "; " + preproCommand;
379       std::string command = _protocol.getExecCommand(subCommand, _hostname, _username);
380       command += " 2>&1";
381       LOG(command);
382
383       // submit job
384       std::string output;
385       int status = Utils::getCommandOutput(command, output);
386       LOG(output);
387       if (status != 0)
388         throw RunTimeException("Error when executing: " + command +
389                                "\nOutput:" + output);
390     }
391   }
392   
393   const Batch::JobId BatchManager::runJob(const Batch::Job & /*job*/)
394   {
395     throw NotYetImplementedException("Method runJob not implemented by Batch Manager \"" + _type + "\"");
396   }
397 }