Salome HOME
Add the preprocess feature to SALOME_Launcher.
[modules/kernel.git] / src / Launcher / Launcher_Job.cxx
1 // Copyright (C) 2009-2017  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 // Author: AndrĂ© RIBES - EDF R&D
21 //
22 //#define _DEBUG_
23 #include "Launcher_Job.hxx"
24 #include "Launcher.hxx"
25
26 #ifdef WITH_LIBBATCH
27 #include <libbatch/Constants.hxx>
28 #endif
29
30 #include <sstream>
31
32 Launcher::Job::Job()
33 {
34   _number = -1;
35   _state = "CREATED";
36   _launch_date = getLaunchDate();
37
38   _env_file = "";
39   _job_name = "";
40   _job_file = "";
41   _job_file_name = "";
42   _job_file_name_complete = "";
43   _pre_command = "";
44   _work_directory = "";
45   _local_directory = "";
46   _result_directory = "";
47   _maximum_duration = "";
48   _maximum_duration_in_second = -1;
49   _queue = "";
50   _job_type = "";
51   _exclusive = false;
52   _mem_per_cpu = 0;
53
54   // Parameters for COORM
55   _launcher_file = "";
56   _launcher_args = "";
57
58 #ifdef WITH_LIBBATCH
59   _batch_job = new Batch::Job();
60 #endif
61 }
62
63 Launcher::Job::~Job()
64 {
65   LAUNCHER_MESSAGE("Deleting job number: " << _number);
66 #ifdef WITH_LIBBATCH
67   if (_batch_job)
68     delete _batch_job;
69 #endif
70 }
71
72 void
73 Launcher::Job::stopJob()
74 {
75   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
76   setState("FAILED");
77 #ifdef WITH_LIBBATCH
78   if (_batch_job_id.getReference() != "undefined")
79   {
80     try
81     {
82       _batch_job_id.deleteJob();
83     }
84     catch (const Batch::GenericException &ex)
85     {
86       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
87     }
88   }
89 #endif
90 }
91
92
93 void
94 Launcher::Job::removeJob()
95 {
96   LAUNCHER_MESSAGE("Removing job number: " << _number);
97 #ifdef WITH_LIBBATCH
98   if (_batch_job_id.getReference() != "undefined")
99   {
100     try
101     {
102       _batch_job_id.deleteJob();
103     }
104     catch (const Batch::GenericException &ex)
105     {
106       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
107     }
108   }
109 #endif
110 }
111
112 std::string
113 Launcher::Job::getJobType() const
114 {
115   return _job_type;
116 }
117
118 void
119 Launcher::Job::setJobName(const std::string & job_name)
120 {
121   _job_name = job_name;
122 }
123
124 std::string
125 Launcher::Job::getJobName() const
126 {
127   return _job_name;
128 }
129
130 void
131 Launcher::Job::setState(const std::string & state)
132 {
133   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
134   if (state != "CREATED" &&
135       state != "IN_PROCESS" &&
136       state != "QUEUED" &&
137       state != "RUNNING" &&
138       state != "PAUSED" &&
139       state != "FINISHED" &&
140       state != "FAILED" &&
141       state != "ERROR")
142   {
143     throw LauncherException("Bad state, this state does not exist: " + state);
144   }
145   _state = state;
146 }
147
148 std::string
149 Launcher::Job::getState() const
150 {
151   return _state;
152 }
153
154 // Get names or ids of hosts assigned to the job
155 std::string
156 Launcher::Job::getAssignedHostnames()
157 {
158   return _assigned_hostnames;
159 }
160
161 void
162 Launcher::Job::setNumber(const int & number)
163 {
164   if (_number != -1)
165     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
166   _number = number;
167 }
168
169 int
170 Launcher::Job::getNumber()
171 {
172   return _number;
173 }
174
175 void
176 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
177 {
178   // Check machine_definition
179   std::string user_name = "";
180   if (resource_definition.UserName == "")
181   {
182     user_name = getenv("USER");
183     if (user_name == "")
184       user_name = getenv("LOGNAME");
185     if (user_name == "")
186     {
187       std::string mess = "You must define a user name: into your resource description or with one of env variables USER/LOGNAME";
188       throw LauncherException(mess);
189     }
190   }
191   else
192     user_name = resource_definition.UserName;
193
194   _resource_definition = resource_definition;
195   _resource_definition.UserName = user_name;
196 }
197
198 ParserResourcesType
199 Launcher::Job::getResourceDefinition() const
200 {
201   return _resource_definition;
202 }
203
204 void
205 Launcher::Job::setJobFile(const std::string & job_file)
206 {
207   // Check job file
208   if (job_file == "")
209   {
210     std::string mess = "Empty Job File is forbidden !";
211     throw LauncherException(mess);
212   }
213
214   _job_file = job_file;
215   std::string::size_type p1 = _job_file.find_last_of("/");
216   std::string::size_type p2 = _job_file.find_last_of(".");
217   _job_file_name_complete = _job_file.substr(p1+1);
218   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
219 }
220
221 std::string
222 Launcher::Job::getJobFile() const
223 {
224   return _job_file;
225 }
226 void
227 Launcher::Job::setEnvFile(const std::string & env_file)
228 {
229   _env_file = env_file;
230 }
231
232 std::string
233 Launcher::Job::getEnvFile() const
234 {
235   return _env_file;
236 }
237
238 void
239 Launcher::Job::setWorkDirectory(const std::string & work_directory)
240 {
241   _work_directory = work_directory;
242 }
243
244 void
245 Launcher::Job::setLocalDirectory(const std::string & local_directory)
246 {
247   _local_directory = local_directory;
248 }
249
250 void
251 Launcher::Job::setResultDirectory(const std::string & result_directory)
252 {
253   _result_directory = result_directory;
254 }
255
256 void
257 Launcher::Job::add_in_file(const std::string & file)
258 {
259   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
260   if (it == _in_files.end())
261     _in_files.push_back(file);
262   else
263     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
264 }
265
266 void
267 Launcher::Job::add_out_file(const std::string & file)
268 {
269   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
270   if (it == _out_files.end())
271     _out_files.push_back(file);
272   else
273     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
274 }
275
276 void
277 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
278 {
279   checkMaximumDuration(maximum_duration);
280   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
281   _maximum_duration = maximum_duration;
282 }
283
284 // For COORM
285 void
286 Launcher::Job::setLauncherFile(const std::string & launcher_file)
287 {
288         _launcher_file = launcher_file;
289 }
290 void
291 Launcher::Job::setLauncherArgs(const std::string & launcher_args)
292 {
293         _launcher_args = launcher_args;
294 }
295
296 void
297 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
298 {
299   checkResourceRequiredParams(resource_required_params);
300   _resource_required_params = resource_required_params;
301 }
302
303 void
304 Launcher::Job::setQueue(const std::string & queue)
305 {
306   _queue = queue;
307 }
308
309 void
310 Launcher::Job::setExclusive(bool exclusive)
311 {
312   _exclusive = exclusive;
313 }
314
315 void
316 Launcher::Job::setExclusiveStr(const std::string & exclusiveStr)
317 {
318   if (exclusiveStr == "true")
319     _exclusive = true;
320   else if (exclusiveStr == "false")
321     _exclusive = false;
322   else
323     throw LauncherException(std::string("Invalid boolean value for exclusive: ") + exclusiveStr);
324 }
325
326 void
327 Launcher::Job::setMemPerCpu(unsigned long mem_per_cpu)
328 {
329   _mem_per_cpu = mem_per_cpu;
330 }
331
332 void
333 Launcher::Job::setWCKey(const std::string & wckey)
334 {
335   _wckey = wckey;
336 }
337
338 void
339 Launcher::Job::setExtraParams(const std::string & extra_params)
340 {
341   _extra_params = extra_params;
342 }
343
344 void
345 Launcher::Job::setReference(const std::string & reference)
346 {
347   _reference = reference;
348 }
349
350 std::string
351 Launcher::Job::getWorkDirectory() const
352 {
353   return _work_directory;
354 }
355
356 std::string
357 Launcher::Job::getLocalDirectory() const
358 {
359   return _local_directory;
360 }
361
362 std::string
363 Launcher::Job::getResultDirectory() const
364 {
365   return _result_directory;
366 }
367
368 const std::list<std::string> &
369 Launcher::Job::get_in_files() const
370 {
371   return _in_files;
372 }
373
374 const std::list<std::string> &
375 Launcher::Job::get_out_files() const
376 {
377   return _out_files;
378 }
379
380 std::string
381 Launcher::Job::getMaximumDuration() const
382 {
383   return _maximum_duration;
384 }
385
386 // For COORM
387 std::string
388 Launcher::Job::getLauncherFile() const
389 {
390         return _launcher_file;
391 }
392 std::string
393 Launcher::Job::getLauncherArgs() const
394 {
395         return _launcher_args;
396 }
397
398 resourceParams
399 Launcher::Job::getResourceRequiredParams() const
400 {
401   return _resource_required_params;
402 }
403
404 std::string
405 Launcher::Job::getQueue() const
406 {
407   return _queue;
408 }
409
410 bool
411 Launcher::Job::getExclusive() const
412 {
413   return _exclusive;
414 }
415
416 std::string
417 Launcher::Job::getExclusiveStr() const
418 {
419   return _exclusive ? "true" : "false";
420 }
421
422 unsigned long
423 Launcher::Job::getMemPerCpu() const
424 {
425   return _mem_per_cpu;
426 }
427
428 std::string
429 Launcher::Job::getWCKey() const
430 {
431   return _wckey;
432 }
433
434 std::string
435 Launcher::Job::getExtraParams() const
436 {
437   return _extra_params;
438 }
439
440 std::string
441 Launcher::Job::getReference() const
442 {
443   return _reference;
444 }
445
446 void
447 Launcher::Job::setPreCommand(const std::string & preCommand)
448 {
449   _pre_command = preCommand;
450 }
451
452 std::string
453 Launcher::Job::getPreCommand() const
454 {
455   return _pre_command;
456 }
457
458 void
459 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
460 {
461   std::string result("");
462   std::string edt_value = maximum_duration;
463   std::size_t pos = edt_value.find(":");
464
465   if (edt_value != "") {
466     if (pos == edt_value.npos) {
467       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
468     }
469     std::string begin_edt_value = edt_value.substr(0, pos);
470     std::string mid_edt_value = edt_value.substr(pos, 1);
471     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
472
473     long value;
474     std::istringstream iss(begin_edt_value);
475     if (!(iss >> value)) {
476       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
477     }
478     else if (value < 0) {
479       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
480     }
481     std::istringstream iss_2(end_edt_value);
482     if (!(iss_2 >> value)) {
483       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
484     }
485     else if (value < 0) {
486       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
487     }
488     if (mid_edt_value != ":") {
489       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
490     }
491   }
492   if (result != "")
493     throw LauncherException(result);
494 }
495
496 void
497 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
498 {
499   // nb_proc has be to > 0
500   if (resource_required_params.nb_proc <= 0)
501   {
502     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
503     throw LauncherException(message);
504   }
505 }
506
507 long
508 Launcher::Job::convertMaximumDuration(const std::string & edt)
509 {
510   long hh, mm, ret;
511
512   if( edt.size() == 0 )
513     return -1;
514
515   std::string::size_type pos = edt.find(":");
516   std::string h = edt.substr(0,pos);
517   std::string m = edt.substr(pos+1,edt.size()-pos+1);
518   std::istringstream issh(h);
519   issh >> hh;
520   std::istringstream issm(m);
521   issm >> mm;
522   ret = hh*60 + mm;
523   ret = ret * 60;
524
525   return ret;
526 }
527
528 std::string
529 Launcher::Job::getLaunchDate() const
530 {
531   time_t rawtime;
532   time(&rawtime);
533   std::string launch_date = ctime(&rawtime);
534   size_t i = 0 ;
535   for (;i < launch_date.size(); i++)
536     if (launch_date[i] == '/' ||
537         launch_date[i] == '-' ||
538         launch_date[i] == ':' ||
539         launch_date[i] == ' ')
540       launch_date[i] = '_';
541   launch_date.erase(--launch_date.end()); // Last caracter is a \n
542
543   return launch_date;
544 }
545
546 std::string
547 Launcher::Job::updateJobState()
548 {
549
550   if (_state != "FINISHED" &&
551       _state != "ERROR"    &&
552       _state != "FAILED")
553   {
554 #ifdef WITH_LIBBATCH
555     if (_batch_job_id.getReference() != "undefined")
556     {
557       // A batch manager has been affected to the job
558       Batch::JobInfo job_info = _batch_job_id.queryJob();
559       Batch::Parametre par = job_info.getParametre();
560       _state = par[Batch::STATE].str();
561       _assigned_hostnames = (par.find(Batch::ASSIGNEDHOSTNAMES) == par.end())?
562                             "" : par[Batch::ASSIGNEDHOSTNAMES].str();
563       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
564     }
565 #endif
566   }
567   return _state;
568 }
569
570 #ifdef WITH_LIBBATCH
571 Batch::Job *
572 Launcher::Job::getBatchJob()
573 {
574   update_job();
575   return _batch_job;
576 }
577
578 Batch::Parametre
579 Launcher::Job::common_job_params()
580 {
581   Batch::Parametre params;
582
583   params[Batch::NAME] = getJobName();
584   params[Batch::NBPROC] = _resource_required_params.nb_proc;
585   params[Batch::NBPROCPERNODE] = _resource_required_params.nb_proc_per_node;
586
587   // Memory in megabytes
588   if (_resource_required_params.mem_mb > 0)
589   {
590     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
591   }
592   else if (_mem_per_cpu > 0)
593   {
594     params[Batch::MEMPERCPU] = (long)_mem_per_cpu;
595   }
596
597   // We define a default directory
598   if (_work_directory == "")
599   {
600     const size_t BUFSIZE = 32;
601     char date[BUFSIZE];
602     time_t curtime = time(NULL);
603     strftime(date, BUFSIZE, "%Y_%m_%d__%H_%M_%S", localtime(&curtime));
604     if(!_resource_definition.working_directory.empty())
605     {
606       std::string date_dir = std::string("/job_") + date;
607       std::ostringstream str_pid;
608       str_pid << ::getpid();
609       std::string job_dir = date_dir + "-" + str_pid.str();
610
611       _work_directory = _resource_definition.working_directory + job_dir;
612     }
613     else
614     {
615       _work_directory = std::string("/$HOME/Batch/workdir_");
616       _work_directory += date;
617     }
618   }
619   params[Batch::WORKDIR] = _work_directory;
620   params[Batch::PREPROCESS] = _pre_command;
621
622   // Parameters for COORM
623   params[Batch::LAUNCHER_FILE] = _launcher_file;
624   params[Batch::LAUNCHER_ARGS] = _launcher_args;
625
626   // If result_directory is not defined, we use HOME environnement
627   if (_result_directory == "")
628     _result_directory = getenv("HOME");
629
630   // _in_files
631   std::list<std::string> in_files(_in_files);
632   in_files.push_back(_job_file);
633   if (_env_file != "")
634           in_files.push_back(_env_file);
635   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
636   {
637     std::string file = *it;
638
639     // local file -> If file is not an absolute path, we apply _local_directory
640     std::string local_file;
641     if (file.substr(0, 1) == std::string("/"))
642       local_file = file;
643     else
644 #ifndef WIN32
645       local_file = _local_directory + "/" + file;
646 #else
647       local_file = file;
648 #endif
649
650     // remote file -> get only file name from in_files
651     size_t found = file.find_last_of("/");
652     std::string remote_file = _work_directory + "/" + file.substr(found+1);
653
654     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
655   }
656
657   // _out_files
658   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
659   {
660     std::string file = *it;
661     // remote file -> If file is not an absolute path, we apply _work_directory
662     std::string remote_file;
663     std::string local_file;
664     if (file.substr(0, 1) == std::string("/"))
665     {
666       remote_file = file;
667       size_t found = file.find_last_of("/");
668       local_file = file.substr(found+1);
669     }
670     else
671     {
672       remote_file = _work_directory + "/" + file;
673       local_file = file;
674     }
675
676     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
677   }
678
679   // Time
680   if (_maximum_duration_in_second != -1)
681     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
682
683   // Queue
684   if (_queue != "")
685     params[Batch::QUEUE] = _queue;
686
687   // Exclusive
688   if (getExclusive())
689     params[Batch::EXCLUSIVE] = true;
690
691   // WC Key
692   if (_wckey != "")
693     params[Batch::WCKEY] = _wckey;
694
695   // Extra params
696   if (_extra_params != "")
697     params[Batch::EXTRAPARAMS] = _extra_params;
698
699   // Specific parameters
700   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
701   if (it != _specific_parameters.end())
702     params["LL_JOBTYPE"] = it->second;
703   return params;
704 }
705
706 void
707 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
708 {
709   _batch_job_id = batch_manager_job_id;
710 }
711
712 Batch::JobId
713 Launcher::Job::getBatchManagerJobId() const
714 {
715   return _batch_job_id;
716 }
717 #endif
718
719 void
720 Launcher::Job::addSpecificParameter(const std::string & name,
721                                       const std::string & value)
722 {
723   _specific_parameters[name] = value;
724 }
725
726 const std::map<std::string, std::string> &
727 Launcher::Job::getSpecificParameters() const
728 {
729   return _specific_parameters;
730 }
731
732 void
733 Launcher::Job::checkSpecificParameters()
734 {
735 }