Salome HOME
Pre process command is now a script.
[modules/yacs.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2017  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25 #include <boost/filesystem.hpp>
26
27 #ifdef WITH_LIBBATCH
28 #include <libbatch/Constants.hxx>
29 #endif
30
31 #include <sstream>
32
33 Launcher::Job::Job()
34 {
35   _number = -1;
36   _state = "CREATED";
37   _launch_date = getLaunchDate();
38
39   _env_file = "";
40   _job_name = "";
41   _job_file = "";
42   _job_file_name = "";
43   _job_file_name_complete = "";
44   _pre_command = "";
45   _work_directory = "";
46   _local_directory = "";
47   _result_directory = "";
48   _maximum_duration = "";
49   _maximum_duration_in_second = -1;
50   _queue = "";
51   _job_type = "";
52   _exclusive = false;
53   _mem_per_cpu = 0;
54
55   // Parameters for COORM
56   _launcher_file = "";
57   _launcher_args = "";
58
59 #ifdef WITH_LIBBATCH
60   _batch_job = new Batch::Job();
61 #endif
62 }
63
64 Launcher::Job::~Job()
65 {
66   LAUNCHER_MESSAGE("Deleting job number: " << _number);
67 #ifdef WITH_LIBBATCH
68   if (_batch_job)
69     delete _batch_job;
70 #endif
71 }
72
73 void
74 Launcher::Job::stopJob()
75 {
76   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
77   setState("FAILED");
78 #ifdef WITH_LIBBATCH
79   if (_batch_job_id.getReference() != "undefined")
80   {
81     try
82     {
83       _batch_job_id.deleteJob();
84     }
85     catch (const Batch::GenericException &ex)
86     {
87       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
88     }
89   }
90 #endif
91 }
92
93
94 void
95 Launcher::Job::removeJob()
96 {
97   LAUNCHER_MESSAGE("Removing job number: " << _number);
98 #ifdef WITH_LIBBATCH
99   if (_batch_job_id.getReference() != "undefined")
100   {
101     try
102     {
103       _batch_job_id.deleteJob();
104     }
105     catch (const Batch::GenericException &ex)
106     {
107       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
108     }
109   }
110 #endif
111 }
112
113 std::string
114 Launcher::Job::getJobType() const
115 {
116   return _job_type;
117 }
118
119 void
120 Launcher::Job::setJobName(const std::string & job_name)
121 {
122   _job_name = job_name;
123 }
124
125 std::string
126 Launcher::Job::getJobName() const
127 {
128   return _job_name;
129 }
130
131 void
132 Launcher::Job::setState(const std::string & state)
133 {
134   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
135   if (state != "CREATED" &&
136       state != "IN_PROCESS" &&
137       state != "QUEUED" &&
138       state != "RUNNING" &&
139       state != "PAUSED" &&
140       state != "FINISHED" &&
141       state != "FAILED" &&
142       state != "ERROR")
143   {
144     throw LauncherException("Bad state, this state does not exist: " + state);
145   }
146   _state = state;
147 }
148
149 std::string
150 Launcher::Job::getState() const
151 {
152   return _state;
153 }
154
155 // Get names or ids of hosts assigned to the job
156 std::string
157 Launcher::Job::getAssignedHostnames()
158 {
159   return _assigned_hostnames;
160 }
161
162 void
163 Launcher::Job::setNumber(const int & number)
164 {
165   if (_number != -1)
166     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
167   _number = number;
168 }
169
170 int
171 Launcher::Job::getNumber()
172 {
173   return _number;
174 }
175
176 void
177 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
178 {
179   // Check machine_definition
180   std::string user_name = "";
181   if (resource_definition.UserName == "")
182   {
183     user_name = getenv("USER");
184     if (user_name == "")
185       user_name = getenv("LOGNAME");
186     if (user_name == "")
187     {
188       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
189       throw LauncherException(mess);
190     }
191   }
192   else
193     user_name = resource_definition.UserName;
194
195   _resource_definition = resource_definition;
196   _resource_definition.UserName = user_name;
197 }
198
199 ParserResourcesType
200 Launcher::Job::getResourceDefinition() const
201 {
202   return _resource_definition;
203 }
204
205 void
206 Launcher::Job::setJobFile(const std::string & job_file)
207 {
208   // Check job file
209   if (job_file == "")
210   {
211     std::string mess = "Empty Job File is forbidden !";
212     throw LauncherException(mess);
213   }
214
215   _job_file = job_file;
216   std::string::size_type p1 = _job_file.find_last_of("/");
217   std::string::size_type p2 = _job_file.find_last_of(".");
218   _job_file_name_complete = _job_file.substr(p1+1);
219   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
220 }
221
222 std::string
223 Launcher::Job::getJobFile() const
224 {
225   return _job_file;
226 }
227 void
228 Launcher::Job::setEnvFile(const std::string & env_file)
229 {
230   _env_file = env_file;
231 }
232
233 std::string
234 Launcher::Job::getEnvFile() const
235 {
236   return _env_file;
237 }
238
239 void
240 Launcher::Job::setWorkDirectory(const std::string & work_directory)
241 {
242   _work_directory = work_directory;
243 }
244
245 void
246 Launcher::Job::setLocalDirectory(const std::string & local_directory)
247 {
248   _local_directory = local_directory;
249 }
250
251 void
252 Launcher::Job::setResultDirectory(const std::string & result_directory)
253 {
254   _result_directory = result_directory;
255 }
256
257 void
258 Launcher::Job::add_in_file(const std::string & file)
259 {
260   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
261   if (it == _in_files.end())
262     _in_files.push_back(file);
263   else
264     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
265 }
266
267 void
268 Launcher::Job::add_out_file(const std::string & file)
269 {
270   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
271   if (it == _out_files.end())
272     _out_files.push_back(file);
273   else
274     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
275 }
276
277 void
278 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
279 {
280   checkMaximumDuration(maximum_duration);
281   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
282   _maximum_duration = maximum_duration;
283 }
284
285 // For COORM
286 void
287 Launcher::Job::setLauncherFile(const std::string & launcher_file)
288 {
289         _launcher_file = launcher_file;
290 }
291 void
292 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
293 {
294         _launcher_args = launcher_args;
295 }
296
297 void
298 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
299 {
300   checkResourceRequiredParams(resource_required_params);
301   _resource_required_params = resource_required_params;
302 }
303
304 void
305 Launcher::Job::setQueue(const std::string & queue)
306 {
307   _queue = queue;
308 }
309
310 void
311 Launcher::Job::setExclusive(bool exclusive)
312 {
313   _exclusive = exclusive;
314 }
315
316 void
317 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
318 {
319   if (exclusiveStr == "true")
320     _exclusive = true;
321   else if (exclusiveStr == "false")
322     _exclusive = false;
323   else
324     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
325 }
326
327 void
328 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
329 {
330   _mem_per_cpu = mem_per_cpu;
331 }
332
333 void
334 Launcher::Job::setWCKey(const std::string & wckey)
335 {
336   _wckey = wckey;
337 }
338
339 void
340 Launcher::Job::setExtraParams(const std::string & extra_params)
341 {
342   _extra_params = extra_params;
343 }
344
345 void
346 Launcher::Job::setReference(const std::string & reference)
347 {
348   _reference = reference;
349 }
350
351 std::string
352 Launcher::Job::getWorkDirectory() const
353 {
354   return _work_directory;
355 }
356
357 std::string
358 Launcher::Job::getLocalDirectory() const
359 {
360   return _local_directory;
361 }
362
363 std::string
364 Launcher::Job::getResultDirectory() const
365 {
366   return _result_directory;
367 }
368
369 const std::list<std::string> &
370 Launcher::Job::get_in_files() const
371 {
372   return _in_files;
373 }
374
375 const std::list<std::string> &
376 Launcher::Job::get_out_files() const
377 {
378   return _out_files;
379 }
380
381 std::string
382 Launcher::Job::getMaximumDuration() const
383 {
384   return _maximum_duration;
385 }
386
387 // For COORM
388 std::string
389 Launcher::Job::getLauncherFile() const
390 {
391         return _launcher_file;
392 }
393 std::string
394 Launcher::Job::getLauncherArgs() const
395 {
396         return _launcher_args;
397 }
398
399 resourceParams
400 Launcher::Job::getResourceRequiredParams() const
401 {
402   return _resource_required_params;
403 }
404
405 std::string
406 Launcher::Job::getQueue() const
407 {
408   return _queue;
409 }
410
411 bool
412 Launcher::Job::getExclusive() const
413 {
414   return _exclusive;
415 }
416
417 std::string
418 Launcher::Job::getExclusiveStr() const
419 {
420   return _exclusive ? "true" : "false";
421 }
422
423 unsigned long
424 Launcher::Job::getMemPerCpu() const
425 {
426   return _mem_per_cpu;
427 }
428
429 std::string
430 Launcher::Job::getWCKey() const
431 {
432   return _wckey;
433 }
434
435 std::string
436 Launcher::Job::getExtraParams() const
437 {
438   return _extra_params;
439 }
440
441 std::string
442 Launcher::Job::getReference() const
443 {
444   return _reference;
445 }
446
447 void
448 Launcher::Job::setPreCommand(const std::string & preCommand)
449 {
450   _pre_command = preCommand;
451 }
452
453 std::string
454 Launcher::Job::getPreCommand() const
455 {
456   return _pre_command;
457 }
458
459 void
460 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
461 {
462   std::string result("");
463   std::string edt_value = maximum_duration;
464   std::size_t pos = edt_value.find(":");
465
466   if (edt_value != "") {
467     if (pos == edt_value.npos) {
468       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
469     }
470     std::string begin_edt_value = edt_value.substr(0, pos);
471     std::string mid_edt_value = edt_value.substr(pos, 1);
472     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
473
474     long value;
475     std::istringstream iss(begin_edt_value);
476     if (!(iss >> value)) {
477       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
478     }
479     else if (value < 0) {
480       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
481     }
482     std::istringstream iss_2(end_edt_value);
483     if (!(iss_2 >> value)) {
484       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
485     }
486     else if (value < 0) {
487       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
488     }
489     if (mid_edt_value != ":") {
490       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
491     }
492   }
493   if (result != "")
494     throw LauncherException(result);
495 }
496
497 void
498 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
499 {
500   // nb_proc has be to > 0
501   if (resource_required_params.nb_proc <= 0)
502   {
503     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
504     throw LauncherException(message);
505   }
506 }
507
508 long
509 Launcher::Job::convertMaximumDuration(const std::string & edt)
510 {
511   long hh, mm, ret;
512
513   if( edt.size() == 0 )
514     return -1;
515
516   std::string::size_type pos = edt.find(":");
517   std::string h = edt.substr(0,pos);
518   std::string m = edt.substr(pos+1,edt.size()-pos+1);
519   std::istringstream issh(h);
520   issh >> hh;
521   std::istringstream issm(m);
522   issm >> mm;
523   ret = hh*60 + mm;
524   ret = ret * 60;
525
526   return ret;
527 }
528
529 std::string
530 Launcher::Job::getLaunchDate() const
531 {
532   time_t rawtime;
533   time(&rawtime);
534   std::string launch_date = ctime(&rawtime);
535   size_t i = 0 ;
536   for (;i < launch_date.size(); i++)
537     if (launch_date[i] == '/' ||
538         launch_date[i] == '-' ||
539         launch_date[i] == ':' ||
540         launch_date[i] == ' ')
541       launch_date[i] = '_';
542   launch_date.erase(--launch_date.end()); // Last caracter is a \n
543
544   return launch_date;
545 }
546
547 std::string
548 Launcher::Job::updateJobState()
549 {
550
551   if (_state != "FINISHED" &&
552       _state != "ERROR"    &&
553       _state != "FAILED")
554   {
555 #ifdef WITH_LIBBATCH
556     if (_batch_job_id.getReference() != "undefined")
557     {
558       // A batch manager has been affected to the job
559       Batch::JobInfo job_info = _batch_job_id.queryJob();
560       Batch::Parametre par = job_info.getParametre();
561       _state = par[Batch::STATE].str();
562       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
563                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
564       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
565     }
566 #endif
567   }
568   return _state;
569 }
570
571 #ifdef WITH_LIBBATCH
572 Batch::Job *
573 Launcher::Job::getBatchJob()
574 {
575   update_job();
576   return _batch_job;
577 }
578
579 Batch::Parametre
580 Launcher::Job::common_job_params()
581 {
582   Batch::Parametre params;
583
584   params[Batch::NAME] = getJobName();
585   params[Batch::NBPROC] = _resource_required_params.nb_proc;
586   params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
587
588   // Memory in megabytes
589   if (_resource_required_params.mem_mb > 0)
590   {
591     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
592   }
593   else if (_mem_per_cpu > 0)
594   {
595     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
596   }
597
598   // We define a default directory
599   if (_work_directory == "")
600   {
601     const size_t BUFSIZE = 32;
602     char date[BUFSIZE];
603     time_t curtime = time(NULL);
604     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
605     if(!_resource_definition.working_directory.empty())
606     {
607       std::string date_dir = std::string("/job_") + date;
608       std::ostringstream str_pid;
609       str_pid << ::getpid();
610       std::string job_dir = date_dir + "-" + str_pid.str();
611
612       _work_directory = _resource_definition.working_directory + job_dir;
613     }
614     else
615     {
616       _work_directory = std::string("/$HOME/Batch/workdir_");
617       _work_directory += date;
618     }
619   }
620   params[Batch::WORKDIR] = _work_directory;
621   std::string libbatch_pre_command("");
622   if(!_pre_command.empty())
623   {
624     boost::filesystem::path pre_command_path(_pre_command);
625     libbatch_pre_command += "./" + pre_command_path.filename().string();
626   }
627   params[Batch::PREPROCESS] = libbatch_pre_command;
628
629   // Parameters for COORM
630   params[Batch::LAUNCHER_FILE] = _launcher_file;
631   params[Batch::LAUNCHER_ARGS] = _launcher_args;
632
633   // If result_directory is not defined, we use HOME environnement
634   if (_result_directory == "")
635     _result_directory = getenv("HOME");
636
637   // _in_files
638   std::list<std::string> in_files(_in_files);
639   in_files.push_back(_job_file);
640   if (_env_file != "")
641           in_files.push_back(_env_file);
642   if(!_pre_command.empty())
643      in_files.push_back(_pre_command);
644   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
645   {
646     std::string file = *it;
647
648     // local file -> If file is not an absolute path, we apply _local_directory
649     std::string local_file;
650     if (file.substr(0, 1) == std::string("/"))
651       local_file = file;
652     else
653 #ifndef WIN32
654       local_file = _local_directory + "/" + file;
655 #else
656       local_file = file;
657 #endif
658
659     // remote file -> get only file name from in_files
660     size_t found = file.find_last_of("/");
661     std::string remote_file = _work_directory + "/" + file.substr(found+1);
662
663     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
664   }
665
666   // _out_files
667   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
668   {
669     std::string file = *it;
670     // remote file -> If file is not an absolute path, we apply _work_directory
671     std::string remote_file;
672     std::string local_file;
673     if (file.substr(0, 1) == std::string("/"))
674     {
675       remote_file = file;
676       size_t found = file.find_last_of("/");
677       local_file = file.substr(found+1);
678     }
679     else
680     {
681       remote_file = _work_directory + "/" + file;
682       local_file = file;
683     }
684
685     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
686   }
687
688   // Time
689   if (_maximum_duration_in_second != -1)
690     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
691
692   // Queue
693   if (_queue != "")
694     params[Batch::QUEUE] = _queue;
695
696   // Exclusive
697   if (getExclusive())
698     params[Batch::EXCLUSIVE] = true;
699
700   // WC Key
701   if (_wckey != "")
702     params[Batch::WCKEY] = _wckey;
703
704   // Extra params
705   if (_extra_params != "")
706     params[Batch::EXTRAPARAMS] = _extra_params;
707
708   // Specific parameters
709   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
710   if (it != _specific_parameters.end())
711     params["LL_JOBTYPE"] = it->second;
712   return params;
713 }
714
715 void
716 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
717 {
718   _batch_job_id = batch_manager_job_id;
719 }
720
721 Batch::JobId
722 Launcher::Job::getBatchManagerJobId() const
723 {
724   return _batch_job_id;
725 }
726 #endif
727
728 void
729 Launcher::Job::addSpecificParameter(const std::string & name,
730                                       const std::string & value)
731 {
732   _specific_parameters[name] = value;
733 }
734
735 const std::map<std::string, std::string> &
736 Launcher::Job::getSpecificParameters() const
737 {
738   return _specific_parameters;
739 }
740
741 void
742 Launcher::Job::checkSpecificParameters()
743 {
744 }