#!/usr/bin/env python
#-*- coding:utf-8 -*-
#  Copyright (C) 2010-2013  CEA/DEN
#
#  This library is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2.1 of the License.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA

import os
import datetime
import time
import paramiko

import src


parser = src.options.Options()

parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name.'))
parser.add_option('l', 'list', 'boolean', 'list', 
                  _('List all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("Do not print labels. Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Does not execute the jobs."),
                  False)
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                    "display the jobs status."),
                  False)

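# Typical invocations of this command (illustrative; option names come from the
# parser definitions above, "sat" being the usual entry point):
#   sat jobs --jobs_config my_jobs --only_jobs job_a job_b --publish
#   sat jobs --list --no_label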
class machine(object):
    '''Class to manage an SSH connection to a machine
    '''
    def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
        self.name = name
        self.host = host
        self.port = port
        self.user = user
        self.password = passwd
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        self._connection_successful = None

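    # Minimal usage sketch (illustrative only; "logger" stands for any
    # src.logger.Logger instance available to the caller):
    #   mach = machine("node1", "node1.example.com", "jdoe", port=22)
    #   error_msg = mach.connect(logger)
    #   if mach.successfully_connected(logger):
    #       mach.exec_command("ls", logger)
    #       mach.close()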
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine
        
        :param logger src.logger.Logger: The logger instance 
        :return: The error message, or an empty string if the connection succeeded
        :rtype: str
        '''

        self._connection_successful = False
        self.ssh.load_system_host_keys()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(self.host,
                             port=self.port,
                             username=self.user,
                             password=self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS + 
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = _("SSHException error connecting or "
                        "establishing an SSH session")
        except:
            message = _("Error connecting or establishing an SSH session")
        else:
            self._connection_successful = True
            message = ""
        return message
    
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeeded
        
        :param logger src.logger.Logger: The logger instance 
        :return: True if the connection has succeeded, False if not
        :rtype: bool
        '''
        if self._connection_successful is None:
            message = ("Warning : trying to ask if the connection to "
                       "(host: %s, port: %s, user: %s) is OK whereas there was"
                       " no connection request" %
                       (self.host, self.port, self.user))
            logger.write(src.printcolors.printcWarning(message))
        return self._connection_successful

    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        '''
        res = 0
        try:
            self.sftp = self.ssh.open_sftp()
            self.mkdir(self.sat_path, ignore_existing=True)
            self.put_dir(sat_local_path, self.sat_path, filters=['.git'])
            job_file_name = os.path.basename(job_file)
            self.sftp.put(job_file, os.path.join(self.sat_path,
                                                 "data",
                                                 "jobs",
                                                 job_file_name))
        except Exception as e:
            res = str(e)
            self._connection_successful = False
        
        return res
        
    def put_dir(self, source, target, filters=[]):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exist. All subdirectories in source are 
            created under target.
        '''
        for item in os.listdir(source):
            if item in filters:
                continue
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                linkto = os.readlink(source_path)
                try:
                    self.sftp.symlink(linkto, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                except IOError:
                    pass
            else:
                if os.path.isfile(source_path):
                    self.sftp.put(source_path, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                else:
                    self.mkdir(destination_path, ignore_existing=True)
                    # Propagate the filters so that excluded items are also
                    # skipped in subdirectories
                    self.put_dir(source_path, destination_path, filters=filters)

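    # Example (illustrative): copying a local salomeTools checkout while
    # skipping its .git directory, as copy_sat does above:
    #   self.put_dir("/path/to/salomeTools", self.sat_path, filters=['.git'])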
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail 
            if the folder exists 
        '''
        try:
            self.sftp.mkdir(path, mode)
        except IOError:
            if ignore_existing:
                pass
            else:
                raise
    
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine
        
        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance 
        :return: the stdin, stdout, and stderr of the executing command,
                 as a 3-tuple
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        '''
        try:
            # Does not wait for the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write(src.printcolors.printcError(message))
            return (None, None, None)
        except:
            logger.write(src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        else:
            return (stdin, stdout, stderr)

    def close(self):
        '''Close the ssh connection
        
        :rtype: N/A
        '''
        self.ssh.close()
     
    def write_info(self, logger):
        '''Prints the information relative to the machine in the logger 
           (terminal traces and log file)
        
        :param logger src.logger.Logger: The logger instance
        :return: Nothing
        :rtype: N/A
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        else:
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")


class job(object):
    '''Class to manage one job
    '''
    def __init__(self, name, machine, application, distribution,
                 commands, timeout, config, logger, job_file, after=None):

        self.name = name
        self.machine = machine
        self.after = after
        self.timeout = timeout
        self.application = application
        self.distribution = distribution
        self.config = config
        self.logger = logger
        # The list of log files to download from the remote machine 
        self.remote_log_files = []
        
        # The remote command status
        # -1 means that it has not been launched, 
        # 0 means success and 1 means fail
        self.res_job = "-1"
        self.cancelled = False
        
        self._T0 = -1
        self._Tf = -1
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field

        self.out = None # Contains something only if the job is finished
        self.err = None # Contains something only if the job is finished
               
        self.commands = commands
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        " -l " +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config " +
                        job_file +
                        " --job " +
                        self.name)

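    # The command built above looks roughly like this on the remote machine
    # (illustrative, with <sat_path>, <job_file> and <job_name> standing for
    # the actual values):
    #   <sat_path>/sat -l <sat_path>/list_log_files.txt job --jobs_config <job_file> --job <job_name>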
    def get_pids(self):
        '''Get the pids of the remote command on the machine, by grepping
           the process list for the command string.
        
        :return: The list of pids (as strings)
        :rtype: list
        '''
        pids = []
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
        pids += pids_cmd
        return pids
    
    def kill_remote_process(self):
        '''Kills the process on the remote machine.
        
        :return: (the output of the kill, the error of the kill)
        :rtype: (str, str)
        '''
        
        pids = self.get_pids()
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
                                                            self.logger)
        return (out_kill, err_kill)
            
    def has_begun(self):
        '''Returns True if the job has already begun
        
        :return: True if the job has already begun
        :rtype: bool
        '''
        return self._has_begun
    
    def has_finished(self):
        '''Returns True if the job has already finished 
           (i.e. all the commands have been executed)
           If it is finished, the outputs are stored in the fields out and err.
        
        :return: True if the job has already finished
        :rtype: bool
        '''
        
        # If the method has already been called and returned True
        if self._has_finished:
            return True
        
        # If the job has not begun yet
        if not self.has_begun():
            return False
        
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read()
            self.err = self._stderr.read()
            # Put end time
            self._Tf = time.time()
            # And get the remote command status and log files
            self.get_log_files()
        
        return self._has_finished
          
    def get_log_files(self):
        '''Downloads the log files produced by the remote command and reads
           the remote command status from list_log_files.txt.
        '''
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
            return
        
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        self.machine.sftp.get(
                    os.path.join(self.machine.sat_path, "list_log_files.txt"),
                    tmp_file_path)
        
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        fstream_tmp.close()
        os.remove(tmp_file_path)
        self.res_job = file_lines[0]
        for job_path_remote in file_lines[1:]:
            if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
            else:
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          'OUT',
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
            self.remote_log_files.append(local_path)

    def has_failed(self):
        '''Returns True if the job has failed. 
           A job is considered as failed if the machine could not be reached,
           if the remote command failed, 
           or if the job finished with a time out.
        
        :return: True if the job has failed
        :rtype: bool
        '''
        if not self.has_finished():
            return False
        if not self.machine.successfully_connected(self.logger):
            return True
        if self.is_timeout():
            return True
        if self.res_job == "1":
            return True
        return False
    
    def cancel(self):
        """In case of a failing job, every job that depends on it has to be
           cancelled. This method marks the job as failed; it will not be
           executed.
        """
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out = _("This job was not launched because its father has failed.")
        self.err = _("This job was not launched because its father has failed.")

    def is_running(self):
        '''Returns True if the job commands are running 
        
        :return: True if the job is running
        :rtype: bool
        '''
        return self.has_begun() and not self.has_finished()

    def is_timeout(self):
        '''Returns True if the job commands have finished with a timeout 
        
        :return: True if the job has finished with a timeout
        :rtype: bool
        '''
        return self._has_timouted

    def time_elapsed(self):
        if not self.has_begun():
            return -1
        T_now = time.time()
        return T_now - self._T0
    
    def check_time(self):
        if not self.has_begun():
            return
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            self.get_pids()
            (out_kill, _) = self.kill_remote_process()
            self.out = "TIMEOUT \n" + out_kill.read()
            self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
    
    def total_duration(self):
        return self._Tf - self._T0
        
    def run(self, logger):
        if self.has_begun():
            msg = _("Warning: a job can only be launched one time")
            logger.write(src.printcolors.printcWarning(msg))
            return
        
        if not self.machine.successfully_connected(logger):
            self._has_finished = True
            self.out = "N/A"
            self.err = ("Connection to machine (name : %s, host: %s, port:"
                        " %s, user: %s) has failed\nUse the log command "
                        "to get more information."
                        % (self.machine.name,
                           self.machine.host,
                           self.machine.port,
                           self.machine.user))
        else:
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
                                                        self.command, logger)
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.out = "N/A"
                self.err = "The server failed to execute the command"
        
        self._has_begun = True
    
    def write_results(self, logger):
        logger.write("name : " + self.name + "\n")
        if self.after:
            logger.write("after : %s\n" % self.after)
        logger.write("Time elapsed : %4imin %2is \n" % 
                     (self.total_duration()/60, self.total_duration()%60))
        if self._T0 != -1:
            logger.write("Begin time : %s\n" % 
                         time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(self._T0)))
        if self._Tf != -1:
            logger.write("End time   : %s\n\n" % 
                         time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(self._Tf)))
        
        machine_head = "Information about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
        self.machine.write_info(logger)
        
        logger.write(src.printcolors.printcInfo("out : \n"))
        if self.out is None:
            logger.write("Unable to get output\n")
        else:
            logger.write(self.out + "\n")
        logger.write(src.printcolors.printcInfo("err : \n"))
        if self.err is None:
            logger.write("Unable to get error\n")
        else:
            logger.write(self.err + "\n")
        
    def get_status(self):
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.cancelled:
            return "Cancelled"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    
class Jobs(object):
    '''Class to manage the jobs to be run
    '''
    def __init__(self,
                 runner,
                 logger,
                 job_file,
                 job_file_path,
                 config_jobs,
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file = job_file
        self.job_file_path = job_file_path
        # The machines that will be used today
        self.lmachines = []
        # The list of (host, port) pairs that will be used today 
        # (a same host can have several machine instances since there 
        # can be several ssh parameters) 
        self.lhosts = []
        # The jobs to be launched today 
        self.ljobs = []
        # The jobs that will not be launched today
        self.ljobsdef_not_today = []
        self.runner = runner
        self.logger = logger
        # The correlation dictionary between jobs and machines
        self.dic_job_machine = {} 
        self.len_columns = lenght_columns
        
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already run 
        self._l_jobs_finished = []
        # the list of jobs that are running 
        self._l_jobs_running = [] 
                
        self.determine_products_and_machines()

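    # Typical construction (illustrative; mirrors the call made in run() at
    # the bottom of this file):
    #   today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)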
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
           and returns the job instance corresponding to the definition.
        
        :param job_def src.config.Mapping: a job definition 
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        :rtype: job
        '''
        name = job_def.name
        cmmnds = job_def.commands
        timeout = job_def.timeout
        after = None
        if 'after' in job_def:
            after = job_def.after
        application = None
        if 'application' in job_def:
            application = job_def.application
        distribution = None
        if 'distribution' in job_def:
            distribution = job_def.distribution
            
        return job(name,
                   machine,
                   application,
                   distribution,
                   cmmnds,
                   timeout,
                   self.runner.cfg,
                   self.logger,
                   self.job_file,
                   after = after)

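    # Illustrative sketch of the pyconf structures consumed here (field names
    # are taken from define_job above and determine_products_and_machines
    # below; the concrete values are made up):
    #   machines : [ {name : "node1", host : "node1.example.com", user : "jdoe",
    #                 port : 22, password : "secret", sat_path : "salomeTools"} ]
    #   jobs : [ {name : "nightly build", machine : "node1",
    #             when : [0, 1, 2, 3, 4],  # weekday indices, Monday is 0
    #             commands : ["sat prepare APPLI", "sat compile APPLI"],
    #             timeout : 7200, after : "name of a previous job"} ]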
    def determine_products_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
           the machines and jobs to be done today.

        :return: Nothing
        :rtype: N/A
        '''
        today = datetime.date.weekday(datetime.date.today())
        host_list = []
               
        for job_def in self.cfg_jobs.jobs :
            if today in job_def.when:
                
                name_machine = job_def.machine
                
                a_machine = None
                for mach in self.lmachines:
                    if mach.name == name_machine:
                        a_machine = mach
                        break
                
                if a_machine is None:
                    for machine_def in self.cfg_jobs.machines:
                        if machine_def.name == name_machine:
                            if 'host' not in machine_def:
                                host = self.runner.cfg.VARS.hostname
                            else:
                                host = machine_def.host

                            if 'user' not in machine_def:
                                user = self.runner.cfg.VARS.user
                            else:
                                user = machine_def.user

                            if 'port' not in machine_def:
                                port = 22
                            else:
                                port = machine_def.port
                
                            if 'password' not in machine_def:
                                passwd = None
                            else:
                                passwd = machine_def.password    
                                
                            if 'sat_path' not in machine_def:
                                sat_path = "salomeTools"
                            else:
                                sat_path = machine_def.sat_path
                            
                            a_machine = machine(
                                                machine_def.name,
                                                host,
                                                user,
                                                port=port,
                                                passwd=passwd,
                                                sat_path=sat_path
                                                )
                            
                            if (host, port) not in host_list:
                                host_list.append((host, port))
                                             
                            self.lmachines.append(a_machine)
                
                if a_machine is None:
                    msg = _("WARNING: The job \"%(job_name)s\" requires the "
                            "machine \"%(machine_name)s\" but this machine "
                            "is not defined in the configuration file.\n"
                            "The job will not be launched") % \
                          {"job_name" : job_def.name,
                           "machine_name" : name_machine}
                    self.logger.write(src.printcolors.printcWarning(msg))
                    # Skip the job since it has no machine to run on
                    continue
                                  
                a_job = self.define_job(job_def, a_machine)
                self.dic_job_machine[a_job] = a_machine
                
                self.ljobs.append(a_job)
            else: # today not in job_def.when
                self.ljobsdef_not_today.append(job_def)
                                     
        self.lhosts = host_list
        
    def ssh_connection_all_machines(self, pad=50):
        '''Function that performs the ssh connection to every machine 
           to be used today.

        :return: Nothing
        :rtype: N/A
        '''
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
                endline = " "
            else:
                endline = (pad - len(begin_line)) * "." + " "
            
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            self.logger.flush()
            # the call to the method that initiates the ssh connection
            msg = machine.connect(self.logger)
            
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                step = _("Copy SAT")
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                self.logger.flush()
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                                            self.job_file_path)
                # Print the status of the copy
                if res_copy == 0:
                    self.logger.write('\r%s' % 
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' % 
                        (begin_line, 
                         endline, 
                         src.printcolors.printc(src.OK_STATUS)), 3)
                else:
                    self.logger.write('\r%s' % 
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' % 
                        (begin_line,
                         endline,
                         src.printcolors.printc(src.KO_STATUS),
                         _("Copy of SAT failed")), 3)
            else:
                self.logger.write('\r%s' % 
                                  ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' % 
                    (begin_line,
                     endline,
                     src.printcolors.printc(src.KO_STATUS),
                     msg), 3)
            self.logger.write("\n", 3)
                
        self.logger.write("\n")
        

    def is_occupied(self, hostname):
        '''Function that returns the job that is currently running on 
           the machine defined by its host and its port.
        
        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host, 
                or False if there is no job running on the host. 
        :rtype: job / bool
        '''
        host = hostname[0]
        port = hostname[1]
        for jb in self.dic_job_machine:
            if jb.machine.host == host and jb.machine.port == port:
                if jb.is_running():
                    return jb
        return False
    
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
           running jobs and the jobs that have already finished.
        
        :return: True if at least one job has finished since the last call
        :rtype: bool
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.dic_job_machine:
            if jb.is_running():
                jobs_running_list.append(jb)
                jb.check_time()
            if jb.has_finished():
                jobs_finished_list.append(jb)
        
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        
        nb_job_finished_now = len(self._l_jobs_finished)
        
        return nb_job_finished_now > nb_job_finished_before
    
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        
        :return: Nothing. 
        :rtype: N/A
        '''
        
        for job in self.ljobs:
            if job.after is None:
                continue
            father_job = self.find_job_that_has_name(job.after)
            if father_job.has_failed():
                job.cancel()
    
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.
        
        :param name str: a job name
        :return: the job that has the name. 
        :rtype: job
        '''
        for jb in self.ljobs:
            if jb.name == name:
                return jb

        # the following is executed only if the job was not found
        msg = _('The job "%s" seems to be nonexistent') % name
        raise src.SatException(msg)
    
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns 
           the closest string of length "length".
        
        :param text str: any string
        :param length int: a length for the returned string
        :return: the closest string of length "length"
        :rtype: str
        '''
        if len(text) > length:
            text_out = text[:length-3] + '...'
        else:
            diff = length - len(text)
            before = " " * (diff//2)
            after = " " * (diff//2 + diff%2)
            text_out = before + text + after
            
        return text_out
    
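    # For instance (illustrative):
    #   str_of_length("hello", 9)            returns "  hello  "
    #   str_of_length("a_very_long_name", 9) returns "a_very..."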
    def display_status(self, len_col):
        '''Takes a length and constructs the display of the current status 
           of the jobs in an array that has a column for each host.
           It displays the job that is currently running on the host 
           of the column.
        
        :param len_col int: the size of the column 
        :return: Nothing
        :rtype: N/A
        '''
        
        display_line = ""
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty 
            else:
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        
        self.logger.write("\r" + display_line + "|")
        self.logger.flush()
    

    def run_jobs(self):
        '''The main method. Runs all the jobs on every host. 
           For each host, at a given time, only one job can be running.
           The jobs that have the field "after" (which contains the name of
           the job that has to be run before them) are run after that job.
           This method stops when all the jobs are finished.
        
        :return: Nothing
        :rtype: N/A
        '''

        # Print header
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        text_line = ""
        for host_port in self.lhosts:
            host = host_port[0]
            port = host_port[1]
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
            else:
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        self.logger.flush()
        
        # The infinite loop that runs the jobs
        l_jobs_not_started = list(self.dic_job_machine.keys())
        while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
            new_job_start = False
            for host_port in self.lhosts:
                
                if self.is_occupied(host_port):
                    continue
             
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        continue 
                    if jb.after is None:
                        jb.run(self.logger)
                        l_jobs_not_started.remove(jb)
                        new_job_start = True
                        break
                    else:
                        jb_before = self.find_job_that_has_name(jb.after) 
                        if jb_before.has_finished():
                            jb.run(self.logger)
                            l_jobs_not_started.remove(jb)
                            new_job_start = True
                            break
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            
            if new_job_start or new_job_finished:
                if self.gui is not None:
                    self.gui.update_xml_file(self.ljobs)
                # Display the current status     
                self.display_status(self.len_columns)
            
            # Make sure that the proc is not entirely busy
            time.sleep(0.001)
        
        self.logger.write("\n")    
        self.logger.write(tiret_line)                   
        self.logger.write("\n\n")
        
        if self.gui is not None:
            self.gui.update_xml_file(self.ljobs)
            self.gui.last_update()

    def write_all_results(self):
        '''Display all the jobs outputs.
        
        :return: Nothing
        :rtype: N/A
        '''
        
        for jb in self.dic_job_machine.keys():
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            jb.write_results(self.logger)
            self.logger.write("\n\n")

class Gui(object):
    '''Class to manage the xml data that can be displayed in a browser to
       see the jobs states
    '''
    
    """
    Example of the xml report produced by this class:

    <?xml version='1.0' encoding='utf-8'?>
    <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
    <JobsReport>
      <infos>
        <info name="generated" value="2016-06-02 07:06:45"/>
      </infos>
      <hosts>
          <host name="is221553" port="22" distribution="UB12.04"/>
          <host name="is221560" port="22"/>
          <host name="is221553" port="22" distribution="FD20"/>
      </hosts>
      <applications>
          <application name="SALOME-7.8.0"/>
          <application name="SALOME-master"/>
          <application name="MED-STANDALONE-master"/>
          <application name="CORPUS"/>
      </applications>
      
      <jobs>
          <job name="7.8.0 FD22">
                <host>is228809</host>
                <port>2200</port>
                <application>SALOME-7.8.0</application>
                <user>adminuser</user>
                <timeout>240</timeout>
                <commands>
                    export DISPLAY=is221560
                    scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
                    tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
                </commands>
                <state>Not launched</state>
          </job>

          <job name="master MG05">
                <host>is221560</host>
                <port>22</port>
                <application>SALOME-master</application>
                <user>salome</user>
                <timeout>240</timeout>
                <commands>
                    export DISPLAY=is221560
                    scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
                    sat prepare SALOME-master
                    sat compile SALOME-master
                    sat check SALOME-master
                    sat launcher SALOME-master
                    sat test SALOME-master
                </commands>
                <state>Running since 23 min</state>
                <!-- <state>time out</state> -->
                <!-- <state>OK</state> -->
                <!-- <state>KO</state> -->
                <begin>10/05/2016 20h32</begin>
                <end>10/05/2016 22h59</end>
          </job>

      </jobs>
    </JobsReport>
    
    """
    
    def __init__(self, xml_file_path, l_jobs, l_jobs_not_today, stylesheet):
        # The path of the xml file
        self.xml_file_path = xml_file_path
        # The stylesheet
        self.stylesheet = stylesheet
        # Open the file in a writing stream
        self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
        # Create the lines and columns
        self.initialize_array(l_jobs, l_jobs_not_today)
        # Write the xml file
        self.update_xml_file(l_jobs)
    
    def initialize_array(self, l_jobs, l_jobs_not_today):
        l_dist = []
        l_applications = []
        for job in l_jobs:
            distrib = job.distribution
            if distrib is not None and distrib not in l_dist:
                l_dist.append(distrib)
            
            application = job.application
            if application is not None and application not in l_applications:
                l_applications.append(application)
        
        for job_def in l_jobs_not_today:
            distrib = src.get_cfg_param(job_def, "distribution", "nothing")
            if distrib != "nothing" and distrib not in l_dist:
                l_dist.append(distrib)
                
            application = src.get_cfg_param(job_def, "application", "nothing")
            if application != "nothing" and application not in l_applications:
                l_applications.append(application)
        
        self.l_dist = l_dist
        self.l_applications = l_applications
        
        # Initialize the distributions node
        self.xmldists = self.xml_file.add_simple_node("distributions")
        for dist_name in self.l_dist:
            src.xmlManager.add_simple_node(self.xmldists,
                                           "dist",
                                           attrib={"name" : dist_name})
            
        # Initialize the applications node
        self.xmlapplications = self.xml_file.add_simple_node("applications")
        for application in self.l_applications:
            src.xmlManager.add_simple_node(self.xmlapplications,
                                           "application",
                                           attrib={"name" : application})
        
        # Initialize the jobs node
        self.xmljobs = self.xml_file.add_simple_node("jobs")
        
        # Add the jobs that will not be launched today
        self.put_jobs_not_today(l_jobs_not_today)
        
        # Initialize the info node (when generated)
        self.xmlinfos = self.xml_file.add_simple_node(
                                    "infos",
                                    attrib={"name" : "last update",
                                            "JobsCommandStatus" : "running"})
    
    def put_jobs_not_today(self, l_jobs_not_today):
        for job_def in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(self.xmljobs,
                                                  "job",
                                                  attrib={"name" : job_def.name})
            src.xmlManager.add_simple_node(xmlj, "application",
                        src.get_cfg_param(job_def, "application", "nothing"))
            src.xmlManager.add_simple_node(xmlj, "distribution",
                        src.get_cfg_param(job_def, "distribution", "nothing"))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job_def.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
        
    def update_xml_file(self, l_jobs):      
        
        # Update the job names and status node
        for job in l_jobs:
            # Find the node corresponding to the job and delete it
            # in order to recreate it
            for xmljob in self.xmljobs.findall('job'):
                if xmljob.attrib['name'] == job.name:
                    self.xmljobs.remove(xmljob)
            
            T0 = str(job._T0)
            if T0 != "-1":
                T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
                                   time.localtime(job._T0))
            Tf = str(job._Tf)
            if Tf != "-1":
                Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
                                   time.localtime(job._Tf))
            
            # recreate the job node
            xmlj = src.xmlManager.add_simple_node(self.xmljobs,
                                                  "job",
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
            src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
            src.xmlManager.add_simple_node(xmlj, "begin", T0)
            src.xmlManager.add_simple_node(xmlj, "end", Tf)
            src.xmlManager.add_simple_node(xmlj, "out",
                                           src.printcolors.cleancolor(job.out))
            src.xmlManager.add_simple_node(xmlj, "err",
                                           src.printcolors.cleancolor(job.err))
            src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
            if len(job.remote_log_files) > 0:
                src.xmlManager.add_simple_node(xmlj, "remote_log_file_path",
                                               job.remote_log_files[0])
            else:
                src.xmlManager.add_simple_node(xmlj, "remote_log_file_path",
                                               "nothing")
            
            xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
            # get the job father
            if job.after is not None:
                job_father = None
                for jb in l_jobs:
                    if jb.name == job.after:
                        job_father = jb
                if job_father is None:
                    msg = _("The job %(father_name)s that is parent of "
                            "%(son_name)s is not in the job list." %
                             {"father_name" : job.after, "son_name" : job.name})
                    raise src.SatException(msg)
                
                if len(job_father.remote_log_files) > 0:
                    link = job_father.remote_log_files[0]
                else:
                    link = "nothing"
                src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
            
        
        # Update the date
        src.xmlManager.append_node_attrib(self.xmlinfos,
                    attrib={"value" : 
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
               
        # Write the file
        self.write_xml_file()
    
    def last_update(self, finish_status = "finished"):
        src.xmlManager.append_node_attrib(self.xmlinfos,
                    attrib={"JobsCommandStatus" : finish_status})
        # Write the file
        self.write_xml_file()
    
    def write_xml_file(self):
        self.xml_file.write_tree(self.stylesheet)
        
##
# Describes the command
def description():
    return _("The jobs command launches the maintenance jobs that are "
             "described in the dedicated jobs configuration file.")

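# Note (derived from the run function below): the file given with --jobs_config
# is looked up as <name>.pyconf, first in SITE.jobs.config_path and then in
# <datadir>/jobs; the --list option prints the available files.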
##
# Runs the command.
def run(args, runner, logger):
       
    (options, args) = parser.parse_args(args)
       
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
    
    l_cfg_dir = [jobs_cfg_files_dir,
                 os.path.join(runner.cfg.VARS.datadir, "jobs")]
    
    # Make sure the path to the jobs config files directory exists 
    src.ensure_path_exists(jobs_cfg_files_dir)   
    
    # list option : display all the available config files
    if options.list:
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" % 
                                 src.printcolors.printcHeader(cfg_dir))
    
            for f in sorted(os.listdir(cfg_dir)):
                if not f.endswith('.pyconf'):
                    continue
                cfilename = f[:-7]
                logger.write("%s\n" % cfilename)
        return 0

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")      
        raise src.SatException(message)
    
    # Find the file in the directories
    found = False
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'
        
        if not os.path.exists(file_jobs_cfg):
            continue
        else:
            found = True
            break
    
    if not found:
        msg = _("The configuration file %(name_file)s was not found."
                "\nUse the --list option to get the possible files."
                % {"name_file" : options.jobs_cfg})
        logger.write(src.printcolors.printcError(msg))
        return 1
    
    info = [
        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)
    ]    
    src.print_info(logger, info)
    
    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                l_jb.append(jb,
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb
              
    # Initialization
    today_jobs = Jobs(runner,
                      logger,
                      options.jobs_cfg,
                      file_jobs_cfg,
                      config_jobs)
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
        return 0
    
    gui = None
    if options.publish:
        gui = Gui("/export/home/serioja/LOGS/test.xml",
                  today_jobs.ljobs,
                  today_jobs.ljobsdef_not_today,
                  "job_report.xsl")
    
    today_jobs.gui = gui
    
    interrupted = False
    try:
        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        interrupted = True
        logger.write("\n\n%s\n\n" % 
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        
    finally:
        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
        if today_jobs.gui is not None:
            if interrupted:
                today_jobs.gui.last_update(_("Forced interruption"))
            else:
                today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()