Salome HOME
Launcher is now usable outside a SALOME session. Launcher::Job instances lifecycle...
[modules/kernel.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2017  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25 #include <boost/filesystem.hpp>
26
27 #ifdef WITH_LIBBATCH
28 #include <libbatch/Constants.hxx>
29 #endif
30
31 #include <sstream>
32 #ifdef WIN32
33   static const char SEPARATOR = '\\';
34 #include <process.h>
35 #else
36   static const char SEPARATOR = '/';
37 #endif
38
39 Launcher::Job::Job()
40 {
41   _number = -1;
42   _state = "CREATED";
43   _launch_date = getLaunchDate();
44
45   _env_file = "";
46   _job_name = "";
47   _job_file = "";
48   _job_file_name = "";
49   _job_file_name_complete = "";
50   _pre_command = "";
51   _work_directory = "";
52   _local_directory = "";
53   _result_directory = "";
54   _maximum_duration = "";
55   _maximum_duration_in_second = -1;
56   _queue = "";
57   _partition = "";
58   _job_type = "";
59   _exclusive = false;
60   _mem_per_cpu = 0;
61
62   // Parameters for COORM
63   _launcher_file = "";
64   _launcher_args = "";
65
66 #ifdef WITH_LIBBATCH
67   _batch_job = new Batch::Job();
68 #endif
69 }
70
71 Launcher::Job::~Job()
72 {
73   LAUNCHER_MESSAGE("Deleting job number: " << _number);
74 #ifdef WITH_LIBBATCH
75   if (_batch_job)
76     delete _batch_job;
77 #endif
78 }
79
80 bool Launcher::Job::decrRef() const
81 {
82   bool ret=((--_cnt)==0);
83   if(ret)
84     delete this;
85   return ret;
86 }
87
88 void Launcher::Job::incrRef() const
89 {
90   _cnt++;
91 }
92
93 void
94 Launcher::Job::stopJob()
95 {
96   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
97   setState("FAILED");
98 #ifdef WITH_LIBBATCH
99   if (_batch_job_id.getReference() != "undefined")
100   {
101     try
102     {
103       _batch_job_id.deleteJob();
104     }
105     catch (const Batch::GenericException &ex)
106     {
107       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
108     }
109   }
110 #endif
111 }
112
113
114 void
115 Launcher::Job::removeJob()
116 {
117   LAUNCHER_MESSAGE("Removing job number: " << _number);
118 #ifdef WITH_LIBBATCH
119   if (_batch_job_id.getReference() != "undefined")
120   {
121     try
122     {
123       _batch_job_id.deleteJob();
124     }
125     catch (const Batch::GenericException &ex)
126     {
127       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
128     }
129   }
130 #endif
131 }
132
133 std::string
134 Launcher::Job::getJobType() const
135 {
136   return _job_type;
137 }
138
139 void
140 Launcher::Job::setJobName(const std::string & job_name)
141 {
142   _job_name = job_name;
143 }
144
145 std::string
146 Launcher::Job::getJobName() const
147 {
148   return _job_name;
149 }
150
151 void
152 Launcher::Job::setState(const std::string & state)
153 {
154   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
155   if (state != "CREATED" &&
156       state != "IN_PROCESS" &&
157       state != "QUEUED" &&
158       state != "RUNNING" &&
159       state != "PAUSED" &&
160       state != "FINISHED" &&
161       state != "FAILED" &&
162       state != "ERROR")
163   {
164     throw LauncherException("Bad state, this state does not exist: " + state);
165   }
166   _state = state;
167 }
168
169 std::string
170 Launcher::Job::getState() const
171 {
172   return _state;
173 }
174
175 // Get names or ids of hosts assigned to the job
176 std::string
177 Launcher::Job::getAssignedHostnames()
178 {
179   return _assigned_hostnames;
180 }
181
182 void
183 Launcher::Job::setNumber(const int & number)
184 {
185   if (_number != -1)
186     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
187   _number = number;
188 }
189
190 int
191 Launcher::Job::getNumber()
192 {
193   return _number;
194 }
195
196 void
197 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
198 {
199   // Check machine_definition
200   std::string user_name = "";
201   if (resource_definition.UserName == "")
202   {
203 #ifndef WIN32
204     user_name = getenv("USER");
205 #else
206     user_name = getenv("USERNAME");
207 #endif
208     if (user_name == "")
209       user_name = getenv("LOGNAME");
210     if (user_name == "")
211     {
212       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
213       throw LauncherException(mess);
214     }
215   }
216   else
217     user_name = resource_definition.UserName;
218
219   _resource_definition = resource_definition;
220   _resource_definition.UserName = user_name;
221 }
222
223 ParserResourcesType
224 Launcher::Job::getResourceDefinition() const
225 {
226   return _resource_definition;
227 }
228
229 void
230 Launcher::Job::setJobFile(const std::string & job_file)
231 {
232   // Check job file
233   if (job_file == "")
234   {
235     std::string mess = "Empty Job File is forbidden !";
236     throw LauncherException(mess);
237   }
238
239   _job_file = job_file;
240   std::string::size_type p1 = _job_file.find_last_of(SEPARATOR);
241   std::string::size_type p2 = _job_file.find_last_of(".");
242   _job_file_name_complete = _job_file.substr(p1+1);
243   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
244 }
245
246 std::string
247 Launcher::Job::getJobFile() const
248 {
249   return _job_file;
250 }
251 void
252 Launcher::Job::setEnvFile(const std::string & env_file)
253 {
254   _env_file = env_file;
255 }
256
257 std::string
258 Launcher::Job::getEnvFile() const
259 {
260   return _env_file;
261 }
262
263 void
264 Launcher::Job::setWorkDirectory(const std::string & work_directory)
265 {
266   _work_directory = work_directory;
267 }
268
269 void
270 Launcher::Job::setLocalDirectory(const std::string & local_directory)
271 {
272   _local_directory = local_directory;
273 }
274
275 void
276 Launcher::Job::setResultDirectory(const std::string & result_directory)
277 {
278   _result_directory = result_directory;
279 }
280
281 void
282 Launcher::Job::add_in_file(const std::string & file)
283 {
284   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
285   if (it == _in_files.end())
286     _in_files.push_back(file);
287   else
288     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
289 }
290
291 void
292 Launcher::Job::add_out_file(const std::string & file)
293 {
294   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
295   if (it == _out_files.end())
296     _out_files.push_back(file);
297   else
298     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
299 }
300
301 void
302 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
303 {
304   checkMaximumDuration(maximum_duration);
305   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
306   _maximum_duration = maximum_duration;
307 }
308
309 // For COORM
310 void
311 Launcher::Job::setLauncherFile(const std::string & launcher_file)
312 {
313         _launcher_file = launcher_file;
314 }
315 void
316 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
317 {
318         _launcher_args = launcher_args;
319 }
320
321 void
322 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
323 {
324   checkResourceRequiredParams(resource_required_params);
325   _resource_required_params = resource_required_params;
326 }
327
328 void
329 Launcher::Job::setQueue(const std::string & queue)
330 {
331   _queue = queue;
332 }
333
334 void
335 Launcher::Job::setPartition(const std::string & partition)
336 {
337   _partition = partition;
338 }
339
340 void
341 Launcher::Job::setExclusive(bool exclusive)
342 {
343   _exclusive = exclusive;
344 }
345
346 void
347 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
348 {
349   if (exclusiveStr == "true")
350     _exclusive = true;
351   else if (exclusiveStr == "false")
352     _exclusive = false;
353   else
354     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
355 }
356
357 void
358 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
359 {
360   _mem_per_cpu = mem_per_cpu;
361 }
362
363 void
364 Launcher::Job::setWCKey(const std::string & wckey)
365 {
366   _wckey = wckey;
367 }
368
369 void
370 Launcher::Job::setExtraParams(const std::string & extra_params)
371 {
372   _extra_params = extra_params;
373 }
374
375 void
376 Launcher::Job::setReference(const std::string & reference)
377 {
378   _reference = reference;
379 }
380
381 std::string
382 Launcher::Job::getWorkDirectory() const
383 {
384   return _work_directory;
385 }
386
387 std::string
388 Launcher::Job::getLocalDirectory() const
389 {
390   return _local_directory;
391 }
392
393 std::string
394 Launcher::Job::getResultDirectory() const
395 {
396   return _result_directory;
397 }
398
399 const std::list<std::string> &
400 Launcher::Job::get_in_files() const
401 {
402   return _in_files;
403 }
404
405 const std::list<std::string> &
406 Launcher::Job::get_out_files() const
407 {
408   return _out_files;
409 }
410
411 std::string
412 Launcher::Job::getMaximumDuration() const
413 {
414   return _maximum_duration;
415 }
416
417 // For COORM
418 std::string
419 Launcher::Job::getLauncherFile() const
420 {
421         return _launcher_file;
422 }
423 std::string
424 Launcher::Job::getLauncherArgs() const
425 {
426         return _launcher_args;
427 }
428
429 resourceParams
430 Launcher::Job::getResourceRequiredParams() const
431 {
432   return _resource_required_params;
433 }
434
435 std::string
436 Launcher::Job::getQueue() const
437 {
438   return _queue;
439 }
440
441 std::string
442 Launcher::Job::getPartition() const
443 {
444   return _partition;
445 }
446
447 bool
448 Launcher::Job::getExclusive() const
449 {
450   return _exclusive;
451 }
452
453 std::string
454 Launcher::Job::getExclusiveStr() const
455 {
456   return _exclusive ? "true" : "false";
457 }
458
459 unsigned long
460 Launcher::Job::getMemPerCpu() const
461 {
462   return _mem_per_cpu;
463 }
464
465 std::string
466 Launcher::Job::getWCKey() const
467 {
468   return _wckey;
469 }
470
471 std::string
472 Launcher::Job::getExtraParams() const
473 {
474   return _extra_params;
475 }
476
477 std::string
478 Launcher::Job::getReference() const
479 {
480   return _reference;
481 }
482
483 void
484 Launcher::Job::setPreCommand(const std::string & preCommand)
485 {
486   _pre_command = preCommand;
487 }
488
489 std::string
490 Launcher::Job::getPreCommand() const
491 {
492   return _pre_command;
493 }
494
495 void
496 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
497 {
498   std::string result("");
499   std::string edt_value = maximum_duration;
500   std::size_t pos = edt_value.find(":");
501
502   if (edt_value != "") {
503     if (pos == edt_value.npos) {
504       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
505     }
506     std::string begin_edt_value = edt_value.substr(0, pos);
507     std::string mid_edt_value = edt_value.substr(pos, 1);
508     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
509
510     long value;
511     std::istringstream iss(begin_edt_value);
512     if (!(iss >> value)) {
513       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
514     }
515     else if (value < 0) {
516       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
517     }
518     std::istringstream iss_2(end_edt_value);
519     if (!(iss_2 >> value)) {
520       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
521     }
522     else if (value < 0) {
523       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
524     }
525     if (mid_edt_value != ":") {
526       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
527     }
528   }
529   if (result != "")
530     throw LauncherException(result);
531 }
532
533 void
534 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
535 {
536   // nb_proc has be to > 0
537   if (resource_required_params.nb_proc <= 0)
538   {
539     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
540     throw LauncherException(message);
541   }
542 }
543
544 long
545 Launcher::Job::convertMaximumDuration(const std::string & edt)
546 {
547   long hh, mm, ret;
548
549   if( edt.size() == 0 )
550     return -1;
551
552   std::string::size_type pos = edt.find(":");
553   std::string h = edt.substr(0,pos);
554   std::string m = edt.substr(pos+1,edt.size()-pos+1);
555   std::istringstream issh(h);
556   issh >> hh;
557   std::istringstream issm(m);
558   issm >> mm;
559   ret = hh*60 + mm;
560   ret = ret * 60;
561
562   return ret;
563 }
564
565 std::string
566 Launcher::Job::getLaunchDate() const
567 {
568   time_t rawtime;
569   time(&rawtime);
570   std::string launch_date = ctime(&rawtime);
571   size_t i = 0 ;
572   for (;i < launch_date.size(); i++)
573     if (launch_date[i] == '/' ||
574         launch_date[i] == '-' ||
575         launch_date[i] == ':' ||
576         launch_date[i] == ' ')
577       launch_date[i] = '_';
578   launch_date.erase(--launch_date.end()); // Last character is a \n
579
580   return launch_date;
581 }
582
583 std::string
584 Launcher::Job::updateJobState()
585 {
586
587   if (_state != "FINISHED" &&
588       _state != "ERROR"    &&
589       _state != "FAILED")
590   {
591 #ifdef WITH_LIBBATCH
592     if (_batch_job_id.getReference() != "undefined")
593     {
594       // A batch manager has been affected to the job
595       Batch::JobInfo job_info = _batch_job_id.queryJob();
596       Batch::Parametre par = job_info.getParametre();
597       _state = par[Batch::STATE].str();
598       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
599                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
600       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
601     }
602 #endif
603   }
604   return _state;
605 }
606
607 #ifdef WITH_LIBBATCH
608 Batch::Job *
609 Launcher::Job::getBatchJob()
610 {
611   update_job();
612   return _batch_job;
613 }
614
615 Batch::Parametre
616 Launcher::Job::common_job_params()
617 {
618   Batch::Parametre params;
619
620   params[Batch::NAME] = getJobName();
621   params[Batch::NBPROC] = _resource_required_params.nb_proc;
622   params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
623
624   if(_resource_required_params.nb_node > 0)
625     params[Batch::NBNODE] = _resource_required_params.nb_node;
626
627   // Memory in megabytes
628   if (_resource_required_params.mem_mb > 0)
629   {
630     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
631   }
632   else if (_mem_per_cpu > 0)
633   {
634     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
635   }
636
637   // We define a default directory
638   if (_work_directory == "")
639   {
640     const size_t BUFSIZE = 32;
641     char date[BUFSIZE];
642     time_t curtime = time(NULL);
643     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
644     if(!_resource_definition.working_directory.empty())
645     {
646       std::string date_dir = std::string("/job_") + date;
647       std::ostringstream str_pid;
648 #ifdef WIN32
649           str_pid << _getpid();
650 #else
651       str_pid << ::getpid();
652 #endif
653       std::string job_dir = date_dir + "-" + str_pid.str();
654
655       _work_directory = _resource_definition.working_directory + job_dir;
656     }
657     else
658     {
659 #ifndef WIN32
660       _work_directory = std::string("/$HOME/Batch/workdir_");
661 #else
662       _work_directory = std::string("%USERPROFILE%\\Batch\\workdir_");
663 #endif
664       _work_directory += date;
665     }
666   }
667   params[Batch::WORKDIR] = _work_directory;
668   std::string libbatch_pre_command("");
669   if(!_pre_command.empty())
670   {
671     boost::filesystem::path pre_command_path(_pre_command);
672     libbatch_pre_command += "./" + pre_command_path.filename().string();
673   }
674   params[Batch::PREPROCESS] = libbatch_pre_command;
675
676   // Parameters for COORM
677   params[Batch::LAUNCHER_FILE] = _launcher_file;
678   params[Batch::LAUNCHER_ARGS] = _launcher_args;
679
680   // If result_directory is not defined, we use HOME environment
681   if (_result_directory == "")
682 #ifndef WIN32
683     _result_directory = getenv("HOME");
684 #else
685     _result_directory = getenv("USERPROFILE");
686 #endif
687   // _in_files
688   std::list<std::string> in_files(_in_files);
689   in_files.push_back(_job_file);
690   if (_env_file != "")
691           in_files.push_back(_env_file);
692   if(!_pre_command.empty())
693      in_files.push_back(_pre_command);
694   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
695   {
696     std::string file = *it;
697
698     // local file -> If file is not an absolute path, we apply _local_directory
699     std::string local_file;
700 #ifndef WIN32
701     if (file.substr(0, 1) == std::string("/"))
702 #else
703     // On Windows, absolute paths may begin with something like "C:"
704     if (file.substr(1, 1) == std::string(":"))
705 #endif
706       local_file = file;
707     else if (file.substr(0, 1) == std::string("-")) // using rsync options
708       local_file = file;
709     else
710 #ifndef WIN32
711       // '/./' is used by rsync to find the root of the relative path
712       // /a/b/./c/f -> _working_directory/c/f
713       local_file = _local_directory + "/./" + file;
714 #else
715       local_file = _local_directory + SEPARATOR + file;
716 #endif
717
718     // remote file -> get only file name from in_files
719     std::string remote_file = _work_directory + "/";
720
721     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
722   }
723
724   // _out_files
725   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
726   {
727     std::string file = *it;
728     // remote file -> If file is not an absolute path, we apply _work_directory
729     std::string remote_file;
730     std::string local_file;
731     if (file.substr(0, 1) == std::string("/"))
732     {
733       remote_file = file;
734       size_t found = file.find_last_of("/");
735       local_file = file.substr(found+1);
736     }
737     else if (file.substr(0, 1) == std::string("-")) // using rsync options
738     {
739       remote_file = file;
740       local_file = "";
741     }
742     else
743     {
744       // '/./' is used by rsync to find the root of the relative path
745       remote_file = _work_directory + "/./" + file;
746       local_file = "";
747     }
748
749     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
750   }
751
752   // Time
753   if (_maximum_duration_in_second != -1)
754     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
755
756   // Queue
757   if (_queue != "")
758     params[Batch::QUEUE] = _queue;
759
760   // Partition
761   if (_partition != "")
762     params[Batch::PARTITION] = _partition;
763
764   // Exclusive
765   if (getExclusive())
766     params[Batch::EXCLUSIVE] = true;
767
768   // WC Key
769   if (_wckey != "")
770     params[Batch::WCKEY] = _wckey;
771
772   // Extra params
773   if (_extra_params != "")
774     params[Batch::EXTRAPARAMS] = _extra_params;
775
776   // Specific parameters
777   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
778   if (it != _specific_parameters.end())
779     params["LL_JOBTYPE"] = it->second;
780   return params;
781 }
782
783 void
784 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
785 {
786   _batch_job_id = batch_manager_job_id;
787 }
788
789 Batch::JobId
790 Launcher::Job::getBatchManagerJobId() const
791 {
792   return _batch_job_id;
793 }
794 #endif
795
796 void
797 Launcher::Job::addSpecificParameter(const std::string & name,
798                                       const std::string & value)
799 {
800   _specific_parameters[name] = value;
801 }
802
803 const std::map<std::string, std::string> &
804 Launcher::Job::getSpecificParameters() const
805 {
806   return _specific_parameters;
807 }
808
809 void
810 Launcher::Job::checkSpecificParameters()
811 {
812 }