Salome HOME
Merge branch 'omu/multijob'
[modules/kernel.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2020  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25
26 #include <Basics_DirUtils.hxx>
27
28 #include <boost/filesystem.hpp>
29
30 #ifdef WITH_LIBBATCH
31 #include <libbatch/Constants.hxx>
32 #endif
33
34 #include <sstream>
35 #ifdef WIN32
36   static const char SEPARATOR = '\\';
37 #include <process.h>
38 #else
39   static const char SEPARATOR = '/';
40 #endif
41
42 Launcher::Job::Job()
43 {
44   _number = -1;
45   _state = "CREATED";
46   _launch_date = getLaunchDate();
47
48   _env_file = "";
49   _job_name = "";
50   _job_file = "";
51   _job_file_name = "";
52   _job_file_name_complete = "";
53   _pre_command = "";
54   _work_directory = "";
55   _local_directory = "";
56   _result_directory = "";
57   _maximum_duration = "";
58   _maximum_duration_in_second = -1;
59   _queue = "";
60   _partition = "";
61   _job_type = "";
62   _exclusive = false;
63   _mem_per_cpu = 0;
64
65   // Parameters for COORM
66   _launcher_file = "";
67   _launcher_args = "";
68
69 #ifdef WITH_LIBBATCH
70   _batch_job = new Batch::Job();
71 #endif
72 }
73
74 Launcher::Job::~Job()
75 {
76   LAUNCHER_MESSAGE("Deleting job number: " << _number);
77 #ifdef WITH_LIBBATCH
78   if (_batch_job)
79     delete _batch_job;
80 #endif
81 }
82
83 void
84 Launcher::Job::stopJob()
85 {
86   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
87   setState("FAILED");
88 #ifdef WITH_LIBBATCH
89   if (_batch_job_id.getReference() != "undefined")
90   {
91     try
92     {
93       _batch_job_id.deleteJob();
94     }
95     catch (const Batch::GenericException &ex)
96     {
97       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
98     }
99   }
100 #endif
101 }
102
103
104 void
105 Launcher::Job::removeJob()
106 {
107   LAUNCHER_MESSAGE("Removing job number: " << _number);
108 #ifdef WITH_LIBBATCH
109   if (_batch_job_id.getReference() != "undefined")
110   {
111     try
112     {
113       _batch_job_id.deleteJob();
114     }
115     catch (const Batch::GenericException &ex)
116     {
117       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
118     }
119   }
120 #endif
121 }
122
123 std::string
124 Launcher::Job::getJobType() const
125 {
126   return _job_type;
127 }
128
129 void
130 Launcher::Job::setJobName(const std::string & job_name)
131 {
132   _job_name = job_name;
133 }
134
135 std::string
136 Launcher::Job::getJobName() const
137 {
138   return _job_name;
139 }
140
141 void
142 Launcher::Job::setState(const std::string & state)
143 {
144   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
145   if (state != "CREATED" &&
146       state != "IN_PROCESS" &&
147       state != "QUEUED" &&
148       state != "RUNNING" &&
149       state != "PAUSED" &&
150       state != "FINISHED" &&
151       state != "FAILED" &&
152       state != "ERROR")
153   {
154     throw LauncherException("Bad state, this state does not exist: " + state);
155   }
156   _state = state;
157 }
158
159 std::string
160 Launcher::Job::getState() const
161 {
162   return _state;
163 }
164
165 // Get names or ids of hosts assigned to the job
166 std::string
167 Launcher::Job::getAssignedHostnames()
168 {
169   return _assigned_hostnames;
170 }
171
172 void
173 Launcher::Job::setNumber(const int & number)
174 {
175   if (_number != -1)
176     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
177   _number = number;
178 }
179
180 int
181 Launcher::Job::getNumber()
182 {
183   return _number;
184 }
185
186 void
187 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
188 {
189   // Check machine_definition
190   std::string user_name = "";
191   if (resource_definition.UserName == "")
192   {
193 #ifndef WIN32
194     user_name = getenv("USER");
195 #else
196     user_name = getenv("USERNAME");
197 #endif
198     if (user_name == "")
199       user_name = getenv("LOGNAME");
200     if (user_name == "")
201     {
202       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
203       throw LauncherException(mess);
204     }
205   }
206   else
207     user_name = resource_definition.UserName;
208
209   _resource_definition = resource_definition;
210   _resource_definition.UserName = user_name;
211 }
212
213 ParserResourcesType
214 Launcher::Job::getResourceDefinition() const
215 {
216   return _resource_definition;
217 }
218
219 void
220 Launcher::Job::setJobFile(const std::string & job_file)
221 {
222   // Check job file
223   if (job_file == "")
224   {
225     std::string mess = "Empty Job File is forbidden !";
226     throw LauncherException(mess);
227   }
228
229   _job_file = job_file;
230   std::string::size_type p1 = _job_file.find_last_of(SEPARATOR);
231   std::string::size_type p2 = _job_file.find_last_of(".");
232   _job_file_name_complete = _job_file.substr(p1+1);
233   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
234 }
235
236 std::string
237 Launcher::Job::getJobFile() const
238 {
239   return _job_file;
240 }
241 void
242 Launcher::Job::setEnvFile(const std::string & env_file)
243 {
244   _env_file = env_file;
245 }
246
247 std::string
248 Launcher::Job::getEnvFile() const
249 {
250   return _env_file;
251 }
252
253 void
254 Launcher::Job::setWorkDirectory(const std::string & work_directory)
255 {
256   _work_directory = work_directory;
257 }
258
259 void
260 Launcher::Job::setLocalDirectory(const std::string & local_directory)
261 {
262   _local_directory = local_directory;
263 }
264
265 void
266 Launcher::Job::setResultDirectory(const std::string & result_directory)
267 {
268   _result_directory = result_directory;
269 }
270
271 void
272 Launcher::Job::add_in_file(const std::string & file)
273 {
274   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
275   if (it == _in_files.end())
276     _in_files.push_back(file);
277   else
278     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
279 }
280
281 void
282 Launcher::Job::add_out_file(const std::string & file)
283 {
284   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
285   if (it == _out_files.end())
286     _out_files.push_back(file);
287   else
288     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
289 }
290
291 void
292 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
293 {
294   checkMaximumDuration(maximum_duration);
295   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
296   _maximum_duration = maximum_duration;
297 }
298
299 // For COORM
300 void
301 Launcher::Job::setLauncherFile(const std::string & launcher_file)
302 {
303         _launcher_file = launcher_file;
304 }
305 void
306 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
307 {
308         _launcher_args = launcher_args;
309 }
310
311 void
312 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
313 {
314   checkResourceRequiredParams(resource_required_params);
315   _resource_required_params = resource_required_params;
316 }
317
318 void
319 Launcher::Job::setQueue(const std::string & queue)
320 {
321   _queue = queue;
322 }
323
324 void
325 Launcher::Job::setPartition(const std::string & partition)
326 {
327   _partition = partition;
328 }
329
330 void
331 Launcher::Job::setExclusive(bool exclusive)
332 {
333   _exclusive = exclusive;
334 }
335
336 void
337 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
338 {
339   if (exclusiveStr == "true")
340     _exclusive = true;
341   else if (exclusiveStr == "false")
342     _exclusive = false;
343   else
344     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
345 }
346
347 void
348 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
349 {
350   _mem_per_cpu = mem_per_cpu;
351 }
352
353 void
354 Launcher::Job::setWCKey(const std::string & wckey)
355 {
356   _wckey = wckey;
357 }
358
359 void
360 Launcher::Job::setExtraParams(const std::string & extra_params)
361 {
362   _extra_params = extra_params;
363 }
364
365 void
366 Launcher::Job::setReference(const std::string & reference)
367 {
368   _reference = reference;
369 }
370
371 std::string
372 Launcher::Job::getWorkDirectory() const
373 {
374   return _work_directory;
375 }
376
377 std::string
378 Launcher::Job::getLocalDirectory() const
379 {
380   return _local_directory;
381 }
382
383 std::string
384 Launcher::Job::getResultDirectory() const
385 {
386   return _result_directory;
387 }
388
389 const std::list<std::string> &
390 Launcher::Job::get_in_files() const
391 {
392   return _in_files;
393 }
394
395 const std::list<std::string> &
396 Launcher::Job::get_out_files() const
397 {
398   return _out_files;
399 }
400
401 std::string
402 Launcher::Job::getMaximumDuration() const
403 {
404   return _maximum_duration;
405 }
406
407 // For COORM
408 std::string
409 Launcher::Job::getLauncherFile() const
410 {
411         return _launcher_file;
412 }
413 std::string
414 Launcher::Job::getLauncherArgs() const
415 {
416         return _launcher_args;
417 }
418
419 resourceParams
420 Launcher::Job::getResourceRequiredParams() const
421 {
422   return _resource_required_params;
423 }
424
425 std::string
426 Launcher::Job::getQueue() const
427 {
428   return _queue;
429 }
430
431 std::string
432 Launcher::Job::getPartition() const
433 {
434   return _partition;
435 }
436
437 bool
438 Launcher::Job::getExclusive() const
439 {
440   return _exclusive;
441 }
442
443 std::string
444 Launcher::Job::getExclusiveStr() const
445 {
446   return _exclusive ? "true" : "false";
447 }
448
449 unsigned long
450 Launcher::Job::getMemPerCpu() const
451 {
452   return _mem_per_cpu;
453 }
454
455 std::string
456 Launcher::Job::getWCKey() const
457 {
458   return _wckey;
459 }
460
461 std::string
462 Launcher::Job::getExtraParams() const
463 {
464   return _extra_params;
465 }
466
467 std::string
468 Launcher::Job::getReference() const
469 {
470   return _reference;
471 }
472
473 void
474 Launcher::Job::setPreCommand(const std::string & preCommand)
475 {
476   _pre_command = preCommand;
477 }
478
479 std::string
480 Launcher::Job::getPreCommand() const
481 {
482   return _pre_command;
483 }
484
485 void
486 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
487 {
488   std::string result("");
489   std::string edt_value = maximum_duration;
490   std::size_t pos = edt_value.find(":");
491
492   if (edt_value != "") {
493     if (pos == edt_value.npos) {
494       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
495     }
496     std::string begin_edt_value = edt_value.substr(0, pos);
497     std::string mid_edt_value = edt_value.substr(pos, 1);
498     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
499
500     long value;
501     std::istringstream iss(begin_edt_value);
502     if (!(iss >> value)) {
503       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
504     }
505     else if (value < 0) {
506       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
507     }
508     std::istringstream iss_2(end_edt_value);
509     if (!(iss_2 >> value)) {
510       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
511     }
512     else if (value < 0) {
513       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
514     }
515     if (mid_edt_value != ":") {
516       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
517     }
518   }
519   if (result != "")
520     throw LauncherException(result);
521 }
522
523 void
524 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
525 {
526   // nb_proc has be to > 0
527   if (resource_required_params.nb_proc <= 0)
528   {
529     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
530     throw LauncherException(message);
531   }
532 }
533
534 long
535 Launcher::Job::convertMaximumDuration(const std::string & edt)
536 {
537   long hh, mm, ret;
538
539   if( edt.size() == 0 )
540     return -1;
541
542   std::string::size_type pos = edt.find(":");
543   std::string h = edt.substr(0,pos);
544   std::string m = edt.substr(pos+1,edt.size()-pos+1);
545   std::istringstream issh(h);
546   issh >> hh;
547   std::istringstream issm(m);
548   issm >> mm;
549   ret = hh*60 + mm;
550   ret = ret * 60;
551
552   return ret;
553 }
554
555 std::string
556 Launcher::Job::getLaunchDate() const
557 {
558   time_t rawtime;
559   time(&rawtime);
560   std::string launch_date = ctime(&rawtime);
561   size_t i = 0 ;
562   for (;i < launch_date.size(); i++)
563     if (launch_date[i] == '/' ||
564         launch_date[i] == '-' ||
565         launch_date[i] == ':' ||
566         launch_date[i] == ' ')
567       launch_date[i] = '_';
568   launch_date.erase(--launch_date.end()); // Last character is a \n
569
570   return launch_date;
571 }
572
573 std::string
574 Launcher::Job::updateJobState()
575 {
576
577   if (_state != "FINISHED" &&
578       _state != "ERROR"    &&
579       _state != "FAILED")
580   {
581 #ifdef WITH_LIBBATCH
582     if (_batch_job_id.getReference() != "undefined")
583     {
584       // A batch manager has been affected to the job
585       Batch::JobInfo job_info = _batch_job_id.queryJob();
586       Batch::Parametre par = job_info.getParametre();
587       _state = par[Batch::STATE].str();
588       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
589                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
590       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
591     }
592 #endif
593   }
594   return _state;
595 }
596
597 #ifdef WITH_LIBBATCH
598 Batch::Job *
599 Launcher::Job::getBatchJob()
600 {
601   update_job();
602   return _batch_job;
603 }
604
605 Batch::Parametre
606 Launcher::Job::common_job_params()
607 {
608   Batch::Parametre params;
609
610   params[Batch::NAME] = getJobName();
611   params[Batch::NBPROC] = _resource_required_params.nb_proc;
612   if(_resource_required_params.nb_proc_per_node > 0)
613     params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
614
615   if(_resource_required_params.nb_node > 0)
616     params[Batch::NBNODE] = _resource_required_params.nb_node;
617
618   // Memory in megabytes
619   if (_resource_required_params.mem_mb > 0)
620   {
621     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
622   }
623   else if (_mem_per_cpu > 0)
624   {
625     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
626   }
627
628   // We define a default directory
629   if (_work_directory == "")
630   {
631     const size_t BUFSIZE = 32;
632     char date[BUFSIZE];
633     time_t curtime = time(NULL);
634     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
635     if(!_resource_definition.working_directory.empty())
636     {
637       std::string date_dir = std::string("/job_") + date;
638       std::ostringstream str_pid;
639 #ifdef WIN32
640           str_pid << _getpid();
641 #else
642       str_pid << ::getpid();
643 #endif
644       std::string job_dir = date_dir + "-" + str_pid.str();
645
646       _work_directory = _resource_definition.working_directory + job_dir;
647     }
648     else
649     {
650 #ifndef WIN32
651       _work_directory = std::string("/$HOME/Batch/workdir_");
652 #else
653       _work_directory = std::string("%USERPROFILE%\\Batch\\workdir_");
654 #endif
655       _work_directory += date;
656     }
657   }
658   params[Batch::WORKDIR] = _work_directory;
659   std::string libbatch_pre_command("");
660   if(!_pre_command.empty())
661   {
662     boost::filesystem::path pre_command_path(_pre_command);
663     libbatch_pre_command += "./" + pre_command_path.filename().string();
664   }
665   params[Batch::PREPROCESS] = libbatch_pre_command;
666
667   // Parameters for COORM
668   params[Batch::LAUNCHER_FILE] = _launcher_file;
669   params[Batch::LAUNCHER_ARGS] = _launcher_args;
670
671   // If result_directory is not defined, we use HOME environment
672   if (_result_directory == "")
673     _result_directory = Kernel_Utils::HomePath();
674
675   // _in_files
676   std::list<std::string> in_files(_in_files);
677   in_files.push_back(_job_file);
678   if (_env_file != "")
679           in_files.push_back(_env_file);
680   if(!_pre_command.empty())
681      in_files.push_back(_pre_command);
682   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
683   {
684     std::string file = *it;
685
686     // local file -> If file is not an absolute path, we apply _local_directory
687     std::string local_file;
688 #ifndef WIN32
689     if (file.substr(0, 1) == std::string("/"))
690 #else
691     // On Windows, absolute paths may begin with something like "C:"
692     if (file.substr(1, 1) == std::string(":"))
693 #endif
694       local_file = file;
695     else if (file.substr(0, 1) == std::string("-")) // using rsync options
696       local_file = file;
697     else
698 #ifndef WIN32
699       // '/./' is used by rsync to find the root of the relative path
700       // /a/b/./c/f -> _working_directory/c/f
701       local_file = _local_directory + "/./" + file;
702 #else
703       local_file = _local_directory + SEPARATOR + file;
704 #endif
705
706     // remote file -> get only file name from in_files
707     std::string remote_file = _work_directory + "/";
708
709     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
710   }
711
712   // _out_files
713   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
714   {
715     std::string file = *it;
716     // remote file -> If file is not an absolute path, we apply _work_directory
717     std::string remote_file;
718     std::string local_file;
719     if (file.substr(0, 1) == std::string("/"))
720     {
721       remote_file = file;
722       size_t found = file.find_last_of("/");
723       local_file = file.substr(found+1);
724     }
725     else if (file.substr(0, 1) == std::string("-")) // using rsync options
726     {
727       remote_file = file;
728       local_file = "";
729     }
730     else
731     {
732       // '/./' is used by rsync to find the root of the relative path
733       remote_file = _work_directory + "/./" + file;
734       local_file = "";
735     }
736
737     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
738   }
739
740   // Time
741   if (_maximum_duration_in_second != -1)
742     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
743
744   // Queue
745   if (_queue != "")
746     params[Batch::QUEUE] = _queue;
747
748   // Partition
749   if (_partition != "")
750     params[Batch::PARTITION] = _partition;
751
752   // Exclusive
753   if (getExclusive())
754     params[Batch::EXCLUSIVE] = true;
755
756   // WC Key
757   if (_wckey != "")
758     params[Batch::WCKEY] = _wckey;
759
760   // Extra params
761   if (_extra_params != "")
762     params[Batch::EXTRAPARAMS] = _extra_params;
763
764   // Specific parameters
765   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
766   if (it != _specific_parameters.end())
767     params["LL_JOBTYPE"] = it->second;
768   return params;
769 }
770
771 void
772 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
773 {
774   _batch_job_id = batch_manager_job_id;
775 }
776
777 Batch::JobId
778 Launcher::Job::getBatchManagerJobId() const
779 {
780   return _batch_job_id;
781 }
782 #endif
783
784 void
785 Launcher::Job::addSpecificParameter(const std::string & name,
786                                       const std::string & value)
787 {
788   _specific_parameters[name] = value;
789 }
790
791 const std::map<std::string, std::string> &
792 Launcher::Job::getSpecificParameters() const
793 {
794   return _specific_parameters;
795 }
796
797 void
798 Launcher::Job::checkSpecificParameters()
799 {
800 }