Salome HOME
New access protocol rsync.
[modules/kernel.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2017  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25 #include <boost/filesystem.hpp>
26
27 #ifdef WITH_LIBBATCH
28 #include <libbatch/Constants.hxx>
29 #endif
30
31 #include <sstream>
32
33 Launcher::Job::Job()
34 {
35   _number = -1;
36   _state = "CREATED";
37   _launch_date = getLaunchDate();
38
39   _env_file = "";
40   _job_name = "";
41   _job_file = "";
42   _job_file_name = "";
43   _job_file_name_complete = "";
44   _pre_command = "";
45   _work_directory = "";
46   _local_directory = "";
47   _result_directory = "";
48   _maximum_duration = "";
49   _maximum_duration_in_second = -1;
50   _queue = "";
51   _partition = "";
52   _job_type = "";
53   _exclusive = false;
54   _mem_per_cpu = 0;
55
56   // Parameters for COORM
57   _launcher_file = "";
58   _launcher_args = "";
59
60 #ifdef WITH_LIBBATCH
61   _batch_job = new Batch::Job();
62 #endif
63 }
64
65 Launcher::Job::~Job()
66 {
67   LAUNCHER_MESSAGE("Deleting job number: " << _number);
68 #ifdef WITH_LIBBATCH
69   if (_batch_job)
70     delete _batch_job;
71 #endif
72 }
73
74 void
75 Launcher::Job::stopJob()
76 {
77   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
78   setState("FAILED");
79 #ifdef WITH_LIBBATCH
80   if (_batch_job_id.getReference() != "undefined")
81   {
82     try
83     {
84       _batch_job_id.deleteJob();
85     }
86     catch (const Batch::GenericException &ex)
87     {
88       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
89     }
90   }
91 #endif
92 }
93
94
95 void
96 Launcher::Job::removeJob()
97 {
98   LAUNCHER_MESSAGE("Removing job number: " << _number);
99 #ifdef WITH_LIBBATCH
100   if (_batch_job_id.getReference() != "undefined")
101   {
102     try
103     {
104       _batch_job_id.deleteJob();
105     }
106     catch (const Batch::GenericException &ex)
107     {
108       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
109     }
110   }
111 #endif
112 }
113
114 std::string
115 Launcher::Job::getJobType() const
116 {
117   return _job_type;
118 }
119
120 void
121 Launcher::Job::setJobName(const std::string & job_name)
122 {
123   _job_name = job_name;
124 }
125
126 std::string
127 Launcher::Job::getJobName() const
128 {
129   return _job_name;
130 }
131
132 void
133 Launcher::Job::setState(const std::string & state)
134 {
135   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
136   if (state != "CREATED" &&
137       state != "IN_PROCESS" &&
138       state != "QUEUED" &&
139       state != "RUNNING" &&
140       state != "PAUSED" &&
141       state != "FINISHED" &&
142       state != "FAILED" &&
143       state != "ERROR")
144   {
145     throw LauncherException("Bad state, this state does not exist: " + state);
146   }
147   _state = state;
148 }
149
150 std::string
151 Launcher::Job::getState() const
152 {
153   return _state;
154 }
155
156 // Get names or ids of hosts assigned to the job
157 std::string
158 Launcher::Job::getAssignedHostnames()
159 {
160   return _assigned_hostnames;
161 }
162
163 void
164 Launcher::Job::setNumber(const int & number)
165 {
166   if (_number != -1)
167     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
168   _number = number;
169 }
170
171 int
172 Launcher::Job::getNumber()
173 {
174   return _number;
175 }
176
177 void
178 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
179 {
180   // Check machine_definition
181   std::string user_name = "";
182   if (resource_definition.UserName == "")
183   {
184     user_name = getenv("USER");
185     if (user_name == "")
186       user_name = getenv("LOGNAME");
187     if (user_name == "")
188     {
189       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
190       throw LauncherException(mess);
191     }
192   }
193   else
194     user_name = resource_definition.UserName;
195
196   _resource_definition = resource_definition;
197   _resource_definition.UserName = user_name;
198 }
199
200 ParserResourcesType
201 Launcher::Job::getResourceDefinition() const
202 {
203   return _resource_definition;
204 }
205
206 void
207 Launcher::Job::setJobFile(const std::string & job_file)
208 {
209   // Check job file
210   if (job_file == "")
211   {
212     std::string mess = "Empty Job File is forbidden !";
213     throw LauncherException(mess);
214   }
215
216   _job_file = job_file;
217   std::string::size_type p1 = _job_file.find_last_of("/");
218   std::string::size_type p2 = _job_file.find_last_of(".");
219   _job_file_name_complete = _job_file.substr(p1+1);
220   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
221 }
222
223 std::string
224 Launcher::Job::getJobFile() const
225 {
226   return _job_file;
227 }
228 void
229 Launcher::Job::setEnvFile(const std::string & env_file)
230 {
231   _env_file = env_file;
232 }
233
234 std::string
235 Launcher::Job::getEnvFile() const
236 {
237   return _env_file;
238 }
239
240 void
241 Launcher::Job::setWorkDirectory(const std::string & work_directory)
242 {
243   _work_directory = work_directory;
244 }
245
246 void
247 Launcher::Job::setLocalDirectory(const std::string & local_directory)
248 {
249   _local_directory = local_directory;
250 }
251
252 void
253 Launcher::Job::setResultDirectory(const std::string & result_directory)
254 {
255   _result_directory = result_directory;
256 }
257
258 void
259 Launcher::Job::add_in_file(const std::string & file)
260 {
261   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
262   if (it == _in_files.end())
263     _in_files.push_back(file);
264   else
265     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
266 }
267
268 void
269 Launcher::Job::add_out_file(const std::string & file)
270 {
271   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
272   if (it == _out_files.end())
273     _out_files.push_back(file);
274   else
275     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
276 }
277
278 void
279 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
280 {
281   checkMaximumDuration(maximum_duration);
282   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
283   _maximum_duration = maximum_duration;
284 }
285
286 // For COORM
287 void
288 Launcher::Job::setLauncherFile(const std::string & launcher_file)
289 {
290         _launcher_file = launcher_file;
291 }
292 void
293 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
294 {
295         _launcher_args = launcher_args;
296 }
297
298 void
299 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
300 {
301   checkResourceRequiredParams(resource_required_params);
302   _resource_required_params = resource_required_params;
303 }
304
305 void
306 Launcher::Job::setQueue(const std::string & queue)
307 {
308   _queue = queue;
309 }
310
311 void
312 Launcher::Job::setPartition(const std::string & partition)
313 {
314   _partition = partition;
315 }
316
317 void
318 Launcher::Job::setExclusive(bool exclusive)
319 {
320   _exclusive = exclusive;
321 }
322
323 void
324 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
325 {
326   if (exclusiveStr == "true")
327     _exclusive = true;
328   else if (exclusiveStr == "false")
329     _exclusive = false;
330   else
331     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
332 }
333
334 void
335 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
336 {
337   _mem_per_cpu = mem_per_cpu;
338 }
339
340 void
341 Launcher::Job::setWCKey(const std::string & wckey)
342 {
343   _wckey = wckey;
344 }
345
346 void
347 Launcher::Job::setExtraParams(const std::string & extra_params)
348 {
349   _extra_params = extra_params;
350 }
351
352 void
353 Launcher::Job::setReference(const std::string & reference)
354 {
355   _reference = reference;
356 }
357
358 std::string
359 Launcher::Job::getWorkDirectory() const
360 {
361   return _work_directory;
362 }
363
364 std::string
365 Launcher::Job::getLocalDirectory() const
366 {
367   return _local_directory;
368 }
369
370 std::string
371 Launcher::Job::getResultDirectory() const
372 {
373   return _result_directory;
374 }
375
376 const std::list<std::string> &
377 Launcher::Job::get_in_files() const
378 {
379   return _in_files;
380 }
381
382 const std::list<std::string> &
383 Launcher::Job::get_out_files() const
384 {
385   return _out_files;
386 }
387
388 std::string
389 Launcher::Job::getMaximumDuration() const
390 {
391   return _maximum_duration;
392 }
393
394 // For COORM
395 std::string
396 Launcher::Job::getLauncherFile() const
397 {
398         return _launcher_file;
399 }
400 std::string
401 Launcher::Job::getLauncherArgs() const
402 {
403         return _launcher_args;
404 }
405
406 resourceParams
407 Launcher::Job::getResourceRequiredParams() const
408 {
409   return _resource_required_params;
410 }
411
412 std::string
413 Launcher::Job::getQueue() const
414 {
415   return _queue;
416 }
417
418 std::string
419 Launcher::Job::getPartition() const
420 {
421   return _partition;
422 }
423
424 bool
425 Launcher::Job::getExclusive() const
426 {
427   return _exclusive;
428 }
429
430 std::string
431 Launcher::Job::getExclusiveStr() const
432 {
433   return _exclusive ? "true" : "false";
434 }
435
436 unsigned long
437 Launcher::Job::getMemPerCpu() const
438 {
439   return _mem_per_cpu;
440 }
441
442 std::string
443 Launcher::Job::getWCKey() const
444 {
445   return _wckey;
446 }
447
448 std::string
449 Launcher::Job::getExtraParams() const
450 {
451   return _extra_params;
452 }
453
454 std::string
455 Launcher::Job::getReference() const
456 {
457   return _reference;
458 }
459
460 void
461 Launcher::Job::setPreCommand(const std::string & preCommand)
462 {
463   _pre_command = preCommand;
464 }
465
466 std::string
467 Launcher::Job::getPreCommand() const
468 {
469   return _pre_command;
470 }
471
472 void
473 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
474 {
475   std::string result("");
476   std::string edt_value = maximum_duration;
477   std::size_t pos = edt_value.find(":");
478
479   if (edt_value != "") {
480     if (pos == edt_value.npos) {
481       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
482     }
483     std::string begin_edt_value = edt_value.substr(0, pos);
484     std::string mid_edt_value = edt_value.substr(pos, 1);
485     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
486
487     long value;
488     std::istringstream iss(begin_edt_value);
489     if (!(iss >> value)) {
490       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
491     }
492     else if (value < 0) {
493       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
494     }
495     std::istringstream iss_2(end_edt_value);
496     if (!(iss_2 >> value)) {
497       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
498     }
499     else if (value < 0) {
500       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
501     }
502     if (mid_edt_value != ":") {
503       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
504     }
505   }
506   if (result != "")
507     throw LauncherException(result);
508 }
509
510 void
511 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
512 {
513   // nb_proc has be to > 0
514   if (resource_required_params.nb_proc <= 0)
515   {
516     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
517     throw LauncherException(message);
518   }
519 }
520
521 long
522 Launcher::Job::convertMaximumDuration(const std::string & edt)
523 {
524   long hh, mm, ret;
525
526   if( edt.size() == 0 )
527     return -1;
528
529   std::string::size_type pos = edt.find(":");
530   std::string h = edt.substr(0,pos);
531   std::string m = edt.substr(pos+1,edt.size()-pos+1);
532   std::istringstream issh(h);
533   issh >> hh;
534   std::istringstream issm(m);
535   issm >> mm;
536   ret = hh*60 + mm;
537   ret = ret * 60;
538
539   return ret;
540 }
541
542 std::string
543 Launcher::Job::getLaunchDate() const
544 {
545   time_t rawtime;
546   time(&rawtime);
547   std::string launch_date = ctime(&rawtime);
548   size_t i = 0 ;
549   for (;i < launch_date.size(); i++)
550     if (launch_date[i] == '/' ||
551         launch_date[i] == '-' ||
552         launch_date[i] == ':' ||
553         launch_date[i] == ' ')
554       launch_date[i] = '_';
555   launch_date.erase(--launch_date.end()); // Last character is a \n
556
557   return launch_date;
558 }
559
560 std::string
561 Launcher::Job::updateJobState()
562 {
563
564   if (_state != "FINISHED" &&
565       _state != "ERROR"    &&
566       _state != "FAILED")
567   {
568 #ifdef WITH_LIBBATCH
569     if (_batch_job_id.getReference() != "undefined")
570     {
571       // A batch manager has been affected to the job
572       Batch::JobInfo job_info = _batch_job_id.queryJob();
573       Batch::Parametre par = job_info.getParametre();
574       _state = par[Batch::STATE].str();
575       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
576                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
577       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
578     }
579 #endif
580   }
581   return _state;
582 }
583
584 #ifdef WITH_LIBBATCH
585 Batch::Job *
586 Launcher::Job::getBatchJob()
587 {
588   update_job();
589   return _batch_job;
590 }
591
592 Batch::Parametre
593 Launcher::Job::common_job_params()
594 {
595   Batch::Parametre params;
596
597   params[Batch::NAME] = getJobName();
598   params[Batch::NBPROC] = _resource_required_params.nb_proc;
599   params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
600
601   if(_resource_required_params.nb_node > 0)
602     params[Batch::NBNODE] = _resource_required_params.nb_node;
603
604   // Memory in megabytes
605   if (_resource_required_params.mem_mb > 0)
606   {
607     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
608   }
609   else if (_mem_per_cpu > 0)
610   {
611     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
612   }
613
614   // We define a default directory
615   if (_work_directory == "")
616   {
617     const size_t BUFSIZE = 32;
618     char date[BUFSIZE];
619     time_t curtime = time(NULL);
620     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
621     if(!_resource_definition.working_directory.empty())
622     {
623       std::string date_dir = std::string("/job_") + date;
624       std::ostringstream str_pid;
625       str_pid << ::getpid();
626       std::string job_dir = date_dir + "-" + str_pid.str();
627
628       _work_directory = _resource_definition.working_directory + job_dir;
629     }
630     else
631     {
632       _work_directory = std::string("/$HOME/Batch/workdir_");
633       _work_directory += date;
634     }
635   }
636   params[Batch::WORKDIR] = _work_directory;
637   std::string libbatch_pre_command("");
638   if(!_pre_command.empty())
639   {
640     boost::filesystem::path pre_command_path(_pre_command);
641     libbatch_pre_command += "./" + pre_command_path.filename().string();
642   }
643   params[Batch::PREPROCESS] = libbatch_pre_command;
644
645   // Parameters for COORM
646   params[Batch::LAUNCHER_FILE] = _launcher_file;
647   params[Batch::LAUNCHER_ARGS] = _launcher_args;
648
649   // If result_directory is not defined, we use HOME environnement
650   if (_result_directory == "")
651     _result_directory = getenv("HOME");
652
653   // _in_files
654   std::list<std::string> in_files(_in_files);
655   in_files.push_back(_job_file);
656   if (_env_file != "")
657           in_files.push_back(_env_file);
658   if(!_pre_command.empty())
659      in_files.push_back(_pre_command);
660   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
661   {
662     std::string file = *it;
663
664     // local file -> If file is not an absolute path, we apply _local_directory
665     std::string local_file;
666     if (file.substr(0, 1) == std::string("/"))
667       local_file = file;
668     else if (file.substr(0, 1) == std::string("-")) // using rsync options
669       local_file = file;
670     else
671 #ifndef WIN32
672       // '/./' is used by rsync to find the root of the relative path
673       // /a/b/./c/f -> _working_directory/c/f
674       local_file = _local_directory + "/./" + file;
675 #else
676       local_file = file;
677 #endif
678
679     // remote file -> get only file name from in_files
680     std::string remote_file = _work_directory + "/";
681
682     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
683   }
684
685   // _out_files
686   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
687   {
688     std::string file = *it;
689     // remote file -> If file is not an absolute path, we apply _work_directory
690     std::string remote_file;
691     std::string local_file;
692     if (file.substr(0, 1) == std::string("/"))
693     {
694       remote_file = file;
695       size_t found = file.find_last_of("/");
696       local_file = file.substr(found+1);
697     }
698     else if (file.substr(0, 1) == std::string("-")) // using rsync options
699     {
700       remote_file = file;
701       local_file = "";
702     }
703     else
704     {
705       // '/./' is used by rsync to find the root of the relative path
706       remote_file = _work_directory + "/./" + file;
707       local_file = "";
708     }
709
710     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
711   }
712
713   // Time
714   if (_maximum_duration_in_second != -1)
715     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
716
717   // Queue
718   if (_queue != "")
719     params[Batch::QUEUE] = _queue;
720
721   // Partition
722   if (_partition != "")
723     params[Batch::PARTITION] = _partition;
724
725   // Exclusive
726   if (getExclusive())
727     params[Batch::EXCLUSIVE] = true;
728
729   // WC Key
730   if (_wckey != "")
731     params[Batch::WCKEY] = _wckey;
732
733   // Extra params
734   if (_extra_params != "")
735     params[Batch::EXTRAPARAMS] = _extra_params;
736
737   // Specific parameters
738   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
739   if (it != _specific_parameters.end())
740     params["LL_JOBTYPE"] = it->second;
741   return params;
742 }
743
744 void
745 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
746 {
747   _batch_job_id = batch_manager_job_id;
748 }
749
750 Batch::JobId
751 Launcher::Job::getBatchManagerJobId() const
752 {
753   return _batch_job_id;
754 }
755 #endif
756
757 void
758 Launcher::Job::addSpecificParameter(const std::string & name,
759                                       const std::string & value)
760 {
761   _specific_parameters[name] = value;
762 }
763
764 const std::map<std::string, std::string> &
765 Launcher::Job::getSpecificParameters() const
766 {
767   return _specific_parameters;
768 }
769
770 void
771 Launcher::Job::checkSpecificParameters()
772 {
773 }