Salome HOME
[EDF30356] : Extend management of maximum_time attribute format from pylauncher to...
[modules/kernel.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2024  CEA, EDF, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25
26 #include <Basics_DirUtils.hxx>
27
28 #include <boost/filesystem.hpp>
29
30 #ifdef WITH_LIBBATCH
31 #include <libbatch/Constants.hxx>
32 #endif
33
34 #include <sstream>
35 #ifdef WIN32
36   static const char SEPARATOR = '\\';
37 #include <process.h>
38 #else
39   static const char SEPARATOR = '/';
40 #include <pwd.h>
41 #endif
42
43 Launcher::Job::Job()
44 {
45   _number = -1;
46   _state = "CREATED";
47   _launch_date = getLaunchDate();
48
49   _env_file = "";
50   _job_name = "";
51   _job_file = "";
52   _job_file_name = "";
53   _job_file_name_complete = "";
54   _pre_command = "";
55   _work_directory = "";
56   _local_directory = "";
57   _result_directory = "";
58   _maximum_duration = "";
59   _maximum_duration_in_second = -1;
60   _queue = "";
61   _partition = "";
62   _job_type = "";
63   _exclusive = false;
64   _mem_per_cpu = 0;
65
66   // Parameters for COORM
67   _launcher_file = "";
68   _launcher_args = "";
69
70 #ifdef WITH_LIBBATCH
71   _batch_job = new Batch::Job();
72 #endif
73 }
74
75 Launcher::Job::~Job()
76 {
77   LAUNCHER_MESSAGE("Deleting job number: " << _number);
78 #ifdef WITH_LIBBATCH
79   if (_batch_job)
80     delete _batch_job;
81 #endif
82 }
83
84 void
85 Launcher::Job::stopJob()
86 {
87   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
88   setState("FAILED");
89 #ifdef WITH_LIBBATCH
90   if (_batch_job_id.getReference() != "undefined")
91   {
92     try
93     {
94       _batch_job_id.deleteJob();
95     }
96     catch (const Batch::GenericException &ex)
97     {
98       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
99     }
100   }
101 #endif
102 }
103
104
105 void
106 Launcher::Job::removeJob()
107 {
108   LAUNCHER_MESSAGE("Removing job number: " << _number);
109 #ifdef WITH_LIBBATCH
110   if (_batch_job_id.getReference() != "undefined")
111   {
112     try
113     {
114       _batch_job_id.deleteJob();
115     }
116     catch (const Batch::GenericException &ex)
117     {
118       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
119     }
120   }
121 #endif
122 }
123
124 std::string
125 Launcher::Job::getJobType() const
126 {
127   return _job_type;
128 }
129
130 void
131 Launcher::Job::setJobName(const std::string & job_name)
132 {
133   _job_name = job_name;
134 }
135
136 std::string
137 Launcher::Job::getJobName() const
138 {
139   return _job_name;
140 }
141
142 void
143 Launcher::Job::setState(const std::string & state)
144 {
145   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
146   if (state != "CREATED" &&
147       state != "IN_PROCESS" &&
148       state != "QUEUED" &&
149       state != "RUNNING" &&
150       state != "PAUSED" &&
151       state != "FINISHED" &&
152       state != "FAILED" &&
153       state != "ERROR")
154   {
155     throw LauncherException("Bad state, this state does not exist: " + state);
156   }
157   _state = state;
158 }
159
160 std::string
161 Launcher::Job::getState() const
162 {
163   return _state;
164 }
165
166 // Get names or ids of hosts assigned to the job
167 std::string
168 Launcher::Job::getAssignedHostnames()
169 {
170   return _assigned_hostnames;
171 }
172
173 void
174 Launcher::Job::setNumber(const int & number)
175 {
176   if (_number != -1)
177     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
178   _number = number;
179 }
180
181 int
182 Launcher::Job::getNumber()
183 {
184   return _number;
185 }
186
187 void
188 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
189 {
190   // Check machine_definition
191   std::string user_name = "";
192   if (resource_definition.UserName == "")
193   {
194 #ifndef WIN32
195     struct passwd *pwd = getpwuid(getuid());
196     if (pwd) {
197       user_name = std::string(pwd->pw_name);
198     }
199     if (user_name == "")
200       user_name = getenv("USER");
201 #else
202     user_name = getenv("USERNAME");
203 #endif
204     if (user_name == "")
205       user_name = getenv("LOGNAME");
206     if (user_name == "")
207     {
208       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
209       throw LauncherException(mess);
210     }
211   }
212   else
213     user_name = resource_definition.UserName;
214
215   _resource_definition = resource_definition;
216   _resource_definition.UserName = user_name;
217 }
218
219 ParserResourcesType
220 Launcher::Job::getResourceDefinition() const
221 {
222   return _resource_definition;
223 }
224
225 void
226 Launcher::Job::setJobFile(const std::string & job_file)
227 {
228   // Check job file
229   if (job_file == "")
230   {
231     std::string mess = "Empty Job File is forbidden !";
232     throw LauncherException(mess);
233   }
234
235   _job_file = job_file;
236   std::string::size_type p1 = _job_file.find_last_of(SEPARATOR);
237   std::string::size_type p2 = _job_file.find_last_of(".");
238   _job_file_name_complete = _job_file.substr(p1+1);
239   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
240 }
241
242 std::string
243 Launcher::Job::getJobFile() const
244 {
245   return _job_file;
246 }
247 void
248 Launcher::Job::setEnvFile(const std::string & env_file)
249 {
250   _env_file = env_file;
251 }
252
253 std::string
254 Launcher::Job::getEnvFile() const
255 {
256   return _env_file;
257 }
258
259 void
260 Launcher::Job::setWorkDirectory(const std::string & work_directory)
261 {
262   _work_directory = work_directory;
263 }
264
265 void
266 Launcher::Job::setLocalDirectory(const std::string & local_directory)
267 {
268   _local_directory = local_directory;
269 }
270
271 void
272 Launcher::Job::setResultDirectory(const std::string & result_directory)
273 {
274   _result_directory = result_directory;
275 }
276
277 void
278 Launcher::Job::add_in_file(const std::string & file)
279 {
280   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
281   if (it == _in_files.end())
282     _in_files.push_back(file);
283   else
284     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
285 }
286
287 void
288 Launcher::Job::add_out_file(const std::string & file)
289 {
290   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
291   if (it == _out_files.end())
292     _out_files.push_back(file);
293   else
294     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
295 }
296
297 void
298 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
299 {
300   checkMaximumDuration(maximum_duration);
301   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
302   _maximum_duration = maximum_duration;
303 }
304
305 // For COORM
306 void
307 Launcher::Job::setLauncherFile(const std::string & launcher_file)
308 {
309         _launcher_file = launcher_file;
310 }
311 void
312 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
313 {
314         _launcher_args = launcher_args;
315 }
316
317 void
318 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
319 {
320   checkResourceRequiredParams(resource_required_params);
321   _resource_required_params = resource_required_params;
322 }
323
324 void
325 Launcher::Job::setQueue(const std::string & queue)
326 {
327   _queue = queue;
328 }
329
330 void
331 Launcher::Job::setPartition(const std::string & partition)
332 {
333   _partition = partition;
334 }
335
336 void
337 Launcher::Job::setExclusive(bool exclusive)
338 {
339   _exclusive = exclusive;
340 }
341
342 void
343 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
344 {
345   if (exclusiveStr == "true")
346     _exclusive = true;
347   else if (exclusiveStr == "false")
348     _exclusive = false;
349   else
350     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
351 }
352
353 void
354 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
355 {
356   _mem_per_cpu = mem_per_cpu;
357 }
358
359 void
360 Launcher::Job::setWCKey(const std::string & wckey)
361 {
362   _wckey = wckey;
363 }
364
365 void
366 Launcher::Job::setExtraParams(const std::string & extra_params)
367 {
368   _extra_params = extra_params;
369 }
370
371 void
372 Launcher::Job::setReference(const std::string & reference)
373 {
374   _reference = reference;
375 }
376
377 std::string
378 Launcher::Job::getWorkDirectory() const
379 {
380   return _work_directory;
381 }
382
383 std::string
384 Launcher::Job::getLocalDirectory() const
385 {
386   return _local_directory;
387 }
388
389 std::string
390 Launcher::Job::getResultDirectory() const
391 {
392   return _result_directory;
393 }
394
395 const std::list<std::string> &
396 Launcher::Job::get_in_files() const
397 {
398   return _in_files;
399 }
400
401 const std::list<std::string> &
402 Launcher::Job::get_out_files() const
403 {
404   return _out_files;
405 }
406
407 std::string
408 Launcher::Job::getMaximumDuration() const
409 {
410   return _maximum_duration;
411 }
412
413 // For COORM
414 std::string
415 Launcher::Job::getLauncherFile() const
416 {
417         return _launcher_file;
418 }
419 std::string
420 Launcher::Job::getLauncherArgs() const
421 {
422         return _launcher_args;
423 }
424
425 resourceParams
426 Launcher::Job::getResourceRequiredParams() const
427 {
428   return _resource_required_params;
429 }
430
431 std::string
432 Launcher::Job::getQueue() const
433 {
434   return _queue;
435 }
436
437 std::string
438 Launcher::Job::getPartition() const
439 {
440   return _partition;
441 }
442
443 bool
444 Launcher::Job::getExclusive() const
445 {
446   return _exclusive;
447 }
448
449 std::string
450 Launcher::Job::getExclusiveStr() const
451 {
452   return _exclusive ? "true" : "false";
453 }
454
455 unsigned long
456 Launcher::Job::getMemPerCpu() const
457 {
458   return _mem_per_cpu;
459 }
460
461 std::string
462 Launcher::Job::getWCKey() const
463 {
464   return _wckey;
465 }
466
467 std::string
468 Launcher::Job::getExtraParams() const
469 {
470   return _extra_params;
471 }
472
473 std::string
474 Launcher::Job::getReference() const
475 {
476   return _reference;
477 }
478
479 void
480 Launcher::Job::setPreCommand(const std::string & preCommand)
481 {
482   _pre_command = preCommand;
483 }
484
485 std::string
486 Launcher::Job::getPreCommand() const
487 {
488   return _pre_command;
489 }
490
491 void
492 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
493 {
494   std::string result("");
495   std::string edt_value = maximum_duration;
496   std::size_t pos = edt_value.find(":");
497
498   if (edt_value != "") {
499     if (pos == edt_value.npos) {
500       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
501     }
502     std::string begin_edt_value = edt_value.substr(0, pos);
503     std::string mid_edt_value = edt_value.substr(pos, 1);
504     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
505
506     long value;
507     std::istringstream iss(begin_edt_value);
508     if (!(iss >> value)) {
509       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
510     }
511     else if (value < 0) {
512       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
513     }
514     std::istringstream iss_2(end_edt_value);
515     if (!(iss_2 >> value)) {
516       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
517     }
518     else if (value < 0) {
519       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
520     }
521     if (mid_edt_value != ":") {
522       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
523     }
524   }
525   if (result != "")
526     throw LauncherException(result);
527 }
528
529 void
530 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
531 {
532   // nb_proc has be to > 0
533   if (resource_required_params.nb_proc <= 0)
534   {
535     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
536     throw LauncherException(message);
537   }
538 }
539
540 long
541 Launcher::Job::convertMaximumDuration(const std::string & edt)
542 {
543   long dd(0), hh(0), mm(0);
544
545   if( edt.size() == 0 )
546     return -1;
547
548   std::string remain( edt );
549
550   auto pos_day = edt.find('-');
551   if( pos_day != std::string::npos)
552   {
553     std::string d = edt.substr(0,pos_day);
554     if(pos_day == edt.size()-1)
555       return -1;
556     remain = edt.substr(pos_day+1);
557     std::istringstream issd(d);
558     issd >> dd;
559   }
560   std::string::size_type pos = remain.find(':');
561   std::string h = remain.substr(0,pos);
562   std::string m = remain.substr(pos+1,remain.size()-pos+1);
563   std::istringstream issh(h);
564   issh >> hh;
565   std::istringstream issm(m);
566   issm >> mm;
567   long ret = dd*60*24 + hh*60 + mm;
568   return 60*ret;
569 }
570
571 std::string
572 Launcher::Job::getLaunchDate() const
573 {
574   time_t rawtime;
575   time(&rawtime);
576   std::string launch_date = ctime(&rawtime);
577   size_t i = 0 ;
578   for (;i < launch_date.size(); i++)
579     if (launch_date[i] == '/' ||
580         launch_date[i] == '-' ||
581         launch_date[i] == ':' ||
582         launch_date[i] == ' ')
583       launch_date[i] = '_';
584   launch_date.erase(--launch_date.end()); // Last character is a \n
585
586   return launch_date;
587 }
588
589 std::string
590 Launcher::Job::updateJobState()
591 {
592
593   if (_state != "FINISHED" &&
594       _state != "ERROR"    &&
595       _state != "FAILED")
596   {
597 #ifdef WITH_LIBBATCH
598     if (_batch_job_id.getReference() != "undefined")
599     {
600       // A batch manager has been affected to the job
601       Batch::JobInfo job_info = _batch_job_id.queryJob();
602       Batch::Parametre par = job_info.getParametre();
603       _state = par[Batch::STATE].str();
604       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
605                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
606       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
607     }
608 #endif
609   }
610   return _state;
611 }
612
613 #ifdef WITH_LIBBATCH
614 Batch::Job *
615 Launcher::Job::getBatchJob()
616 {
617   update_job();
618   return _batch_job;
619 }
620
621 Batch::Parametre
622 Launcher::Job::common_job_params()
623 {
624   Batch::Parametre params;
625
626   params[Batch::NAME] = getJobName();
627   params[Batch::NBPROC] = _resource_required_params.nb_proc;
628   if(_resource_required_params.nb_proc_per_node > 0)
629     params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
630
631   if(_resource_required_params.nb_node > 0)
632     params[Batch::NBNODE] = _resource_required_params.nb_node;
633
634   // Memory in megabytes
635   if (_resource_required_params.mem_mb > 0)
636   {
637     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
638   }
639   else if (_mem_per_cpu > 0)
640   {
641     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
642   }
643
644   // We define a default directory
645   if (_work_directory == "")
646   {
647     const size_t BUFSIZE = 32;
648     char date[BUFSIZE];
649     time_t curtime = time(NULL);
650     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
651     if(!_resource_definition.working_directory.empty())
652     {
653       std::string date_dir = std::string("/job_") + date;
654       std::ostringstream str_pid;
655 #ifdef WIN32
656           str_pid << _getpid();
657 #else
658       str_pid << ::getpid();
659 #endif
660       std::string job_dir = date_dir + "-" + str_pid.str();
661
662       _work_directory = _resource_definition.working_directory + job_dir;
663     }
664     else
665     {
666 #ifndef WIN32
667       _work_directory = std::string("/$HOME/Batch/workdir_");
668 #else
669       _work_directory = std::string("%USERPROFILE%\\Batch\\workdir_");
670 #endif
671       _work_directory += date;
672     }
673   }
674   params[Batch::WORKDIR] = _work_directory;
675   std::string libbatch_pre_command("");
676   if(!_pre_command.empty())
677   {
678     boost::filesystem::path pre_command_path(_pre_command);
679     libbatch_pre_command += "./" + pre_command_path.filename().string();
680   }
681   params[Batch::PREPROCESS] = libbatch_pre_command;
682
683   // Parameters for COORM
684   params[Batch::LAUNCHER_FILE] = _launcher_file;
685   params[Batch::LAUNCHER_ARGS] = _launcher_args;
686
687   // If result_directory is not defined, we use HOME environment
688   if (_result_directory == "")
689     _result_directory = Kernel_Utils::HomePath();
690
691   // _in_files
692   std::list<std::string> in_files(_in_files);
693   in_files.push_back(_job_file);
694   if (_env_file != "")
695           in_files.push_back(_env_file);
696   if(!_pre_command.empty())
697      in_files.push_back(_pre_command);
698   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
699   {
700     std::string file = *it;
701
702     // local file -> If file is not an absolute path, we apply _local_directory
703     std::string local_file;
704 #ifndef WIN32
705     if (file.substr(0, 1) == std::string("/"))
706 #else
707     // On Windows, absolute paths may begin with something like "C:"
708     if (file.substr(1, 1) == std::string(":"))
709 #endif
710       local_file = file;
711     else if (file.substr(0, 1) == std::string("-")) // using rsync options
712       local_file = file;
713     else
714 #ifndef WIN32
715       // '/./' is used by rsync to find the root of the relative path
716       // /a/b/./c/f -> _working_directory/c/f
717       local_file = _local_directory + "/./" + file;
718 #else
719       local_file = _local_directory + SEPARATOR + file;
720 #endif
721
722     // remote file -> get only file name from in_files
723     std::string remote_file = _work_directory + "/";
724
725     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
726   }
727
728   // _out_files
729   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
730   {
731     std::string file = *it;
732     // remote file -> If file is not an absolute path, we apply _work_directory
733     std::string remote_file;
734     std::string local_file;
735     if (file.substr(0, 1) == std::string("/"))
736     {
737       remote_file = file;
738       size_t found = file.find_last_of("/");
739       local_file = file.substr(found+1);
740     }
741     else if (file.substr(0, 1) == std::string("-")) // using rsync options
742     {
743       remote_file = file;
744       local_file = "";
745     }
746     else
747     {
748       // '/./' is used by rsync to find the root of the relative path
749       remote_file = _work_directory + "/./" + file;
750       local_file = "";
751     }
752
753     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
754   }
755
756   // Time
757   if (_maximum_duration_in_second != -1)
758     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
759
760   // Queue
761   if (_queue != "")
762     params[Batch::QUEUE] = _queue;
763
764   // Partition
765   if (_partition != "")
766     params[Batch::PARTITION] = _partition;
767
768   // Exclusive
769   if (getExclusive())
770     params[Batch::EXCLUSIVE] = true;
771
772   // WC Key
773   if (_wckey != "")
774     params[Batch::WCKEY] = _wckey;
775
776   // Extra params
777   if (_extra_params != "")
778     params[Batch::EXTRAPARAMS] = _extra_params;
779
780   // Specific parameters
781   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
782   if (it != _specific_parameters.end())
783     params["LL_JOBTYPE"] = it->second;
784   return params;
785 }
786
787 void
788 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
789 {
790   _batch_job_id = batch_manager_job_id;
791 }
792
793 Batch::JobId
794 Launcher::Job::getBatchManagerJobId() const
795 {
796   return _batch_job_id;
797 }
798 #endif
799
800 void
801 Launcher::Job::addSpecificParameter(const std::string & name,
802                                       const std::string & value)
803 {
804   _specific_parameters[name] = value;
805 }
806
807 const std::map<std::string, std::string> &
808 Launcher::Job::getSpecificParameters() const
809 {
810   return _specific_parameters;
811 }
812
813 void
814 Launcher::Job::checkSpecificParameters()
815 {
816 }