Salome HOME
Rename table to board for jobs publishing
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
27 STYLESHEET_BOARD = "jobs_board_report.xsl"
28
29 parser = src.options.Options()
30
31 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
32                   _('The name of the config file that contains'
33                   ' the jobs configuration'))
34 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
35                   _('The list of jobs to launch, by their name. '))
36 parser.add_option('l', 'list', 'boolean', 'list', 
37                   _('list all available config files.'))
38 parser.add_option('n', 'no_label', 'boolean', 'no_label',
39                   _("do not print labels, Works only with --list."), False)
40 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
41                   _("Try to connect to the machines. Not executing the jobs."),
42                   False)
43 parser.add_option('p', 'publish', 'boolean', 'publish',
44                   _("Generate an xml file that can be read in a browser to "
45                     "display the jobs status."),
46                   False)
47
48 class Machine(object):
49     '''Class to manage a ssh connection on a machine
50     '''
51     def __init__(self,
52                  name,
53                  host,
54                  user,
55                  port=22,
56                  passwd=None,
57                  sat_path="salomeTools"):
58         self.name = name
59         self.host = host
60         self.port = port
61         self.distribution = None # Will be filled after copying SAT on the machine
62         self.user = user
63         self.password = passwd
64         self.sat_path = sat_path
65         self.ssh = paramiko.SSHClient()
66         self._connection_successful = None
67     
68     def connect(self, logger):
69         '''Initiate the ssh connection to the remote machine
70         
71         :param logger src.logger.Logger: The logger instance 
72         :return: Nothing
73         :rtype: N\A
74         '''
75
76         self._connection_successful = False
77         self.ssh.load_system_host_keys()
78         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
79         try:
80             self.ssh.connect(self.host,
81                              port=self.port,
82                              username=self.user,
83                              password = self.password)
84         except paramiko.AuthenticationException:
85             message = src.KO_STATUS + _("Authentication failed")
86         except paramiko.BadHostKeyException:
87             message = (src.KO_STATUS + 
88                        _("The server's host key could not be verified"))
89         except paramiko.SSHException:
90             message = ( _("SSHException error connecting or "
91                           "establishing an SSH session"))            
92         except:
93             message = ( _("Error connecting or establishing an SSH session"))
94         else:
95             self._connection_successful = True
96             message = ""
97         return message
98     
99     def successfully_connected(self, logger):
100         '''Verify if the connection to the remote machine has succeed
101         
102         :param logger src.logger.Logger: The logger instance 
103         :return: True if the connection has succeed, False if not
104         :rtype: bool
105         '''
106         if self._connection_successful == None:
107             message = _("Warning : trying to ask if the connection to "
108             "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
109             " no connection request" % 
110                         (self.name, self.host, self.port, self.user))
111             logger.write( src.printcolors.printcWarning(message))
112         return self._connection_successful
113
114     def copy_sat(self, sat_local_path, job_file):
115         '''Copy salomeTools to the remote machine in self.sat_path
116         '''
117         res = 0
118         try:
119             # open a sftp connection
120             self.sftp = self.ssh.open_sftp()
121             # Create the sat directory on remote machine if it is not existing
122             self.mkdir(self.sat_path, ignore_existing=True)
123             # Put sat
124             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
125             # put the job configuration file in order to make it reachable 
126             # on the remote machine
127             job_file_name = os.path.basename(job_file)
128             self.sftp.put(job_file, os.path.join(self.sat_path,
129                                                  "data",
130                                                  "jobs",
131                                                  job_file_name))
132         except Exception as e:
133             res = str(e)
134             self._connection_successful = False
135         
136         return res
137         
138     def put_dir(self, source, target, filters = []):
139         ''' Uploads the contents of the source directory to the target path. The
140             target directory needs to exists. All subdirectories in source are 
141             created under target.
142         '''
143         for item in os.listdir(source):
144             if item in filters:
145                 continue
146             source_path = os.path.join(source, item)
147             destination_path = os.path.join(target, item)
148             if os.path.islink(source_path):
149                 linkto = os.readlink(source_path)
150                 try:
151                     self.sftp.symlink(linkto, destination_path)
152                     self.sftp.chmod(destination_path,
153                                     os.stat(source_path).st_mode)
154                 except IOError:
155                     pass
156             else:
157                 if os.path.isfile(source_path):
158                     self.sftp.put(source_path, destination_path)
159                     self.sftp.chmod(destination_path,
160                                     os.stat(source_path).st_mode)
161                 else:
162                     self.mkdir(destination_path, ignore_existing=True)
163                     self.put_dir(source_path, destination_path)
164
165     def mkdir(self, path, mode=511, ignore_existing=False):
166         ''' Augments mkdir by adding an option to not fail 
167             if the folder exists 
168         '''
169         try:
170             self.sftp.mkdir(path, mode)
171         except IOError:
172             if ignore_existing:
173                 pass
174             else:
175                 raise       
176     
177     def exec_command(self, command, logger):
178         '''Execute the command on the remote machine
179         
180         :param command str: The command to be run
181         :param logger src.logger.Logger: The logger instance 
182         :return: the stdin, stdout, and stderr of the executing command,
183                  as a 3-tuple
184         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
185                 paramiko.channel.ChannelFile)
186         '''
187         try:        
188             # Does not wait the end of the command
189             (stdin, stdout, stderr) = self.ssh.exec_command(command)
190         except paramiko.SSHException:
191             message = src.KO_STATUS + _(
192                             ": the server failed to execute the command\n")
193             logger.write( src.printcolors.printcError(message))
194             return (None, None, None)
195         except:
196             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
197             return (None, None, None)
198         else:
199             return (stdin, stdout, stderr)
200
201     def close(self):
202         '''Close the ssh connection
203         
204         :rtype: N\A
205         '''
206         self.ssh.close()
207      
208     def write_info(self, logger):
209         '''Prints the informations relative to the machine in the logger 
210            (terminal traces and log file)
211         
212         :param logger src.logger.Logger: The logger instance
213         :return: Nothing
214         :rtype: N\A
215         '''
216         logger.write("host : " + self.host + "\n")
217         logger.write("port : " + str(self.port) + "\n")
218         logger.write("user : " + str(self.user) + "\n")
219         if self.successfully_connected(logger):
220             status = src.OK_STATUS
221         else:
222             status = src.KO_STATUS
223         logger.write("Connection : " + status + "\n\n") 
224
225
226 class Job(object):
227     '''Class to manage one job
228     '''
229     def __init__(self, name, machine, application, board, 
230                  commands, timeout, config, logger, job_file, after=None):
231
232         self.name = name
233         self.machine = machine
234         self.after = after
235         self.timeout = timeout
236         self.application = application
237         self.board = board
238         self.config = config
239         self.logger = logger
240         # The list of log files to download from the remote machine 
241         self.remote_log_files = []
242         
243         # The remote command status
244         # -1 means that it has not been launched, 
245         # 0 means success and 1 means fail
246         self.res_job = "-1"
247         self.cancelled = False
248         
249         self._T0 = -1
250         self._Tf = -1
251         self._has_begun = False
252         self._has_finished = False
253         self._has_timouted = False
254         self._stdin = None # Store the command inputs field
255         self._stdout = None # Store the command outputs field
256         self._stderr = None # Store the command errors field
257
258         self.out = None # Contains something only if the job is finished
259         self.err = None # Contains something only if the job is finished    
260                
261         self.commands = commands
262         self.command = (os.path.join(self.machine.sat_path, "sat") +
263                         " -l " +
264                         os.path.join(self.machine.sat_path,
265                                      "list_log_files.txt") +
266                         " job --jobs_config " +
267                         job_file +
268                         " --name " +
269                         self.name)
270     
271     def get_pids(self):
272         """ Get the pid(s) corresponding to the command that have been launched
273             On the remote machine
274         
275         :return: The list of integers corresponding to the found pids
276         :rtype: List
277         """
278         pids = []
279         cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
280         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
281         pids_cmd = out_pid.readlines()
282         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
283         pids+=pids_cmd
284         return pids
285     
286     def kill_remote_process(self, wait=1):
287         '''Kills the process on the remote machine.
288         
289         :return: (the output of the kill, the error of the kill)
290         :rtype: (str, str)
291         '''
292         
293         pids = self.get_pids()
294         cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
295         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
296                                                             self.logger)
297         time.sleep(wait)
298         return (out_kill, err_kill)
299             
300     def has_begun(self):
301         '''Returns True if the job has already begun
302         
303         :return: True if the job has already begun
304         :rtype: bool
305         '''
306         return self._has_begun
307     
308     def has_finished(self):
309         '''Returns True if the job has already finished 
310            (i.e. all the commands have been executed)
311            If it is finished, the outputs are stored in the fields out and err.
312         
313         :return: True if the job has already finished
314         :rtype: bool
315         '''
316         
317         # If the method has already been called and returned True
318         if self._has_finished:
319             return True
320         
321         # If the job has not begun yet
322         if not self.has_begun():
323             return False
324         
325         if self._stdout.channel.closed:
326             self._has_finished = True
327             # Store the result outputs
328             self.out = self._stdout.read().decode()
329             self.err = self._stderr.read().decode()
330             # Put end time
331             self._Tf = time.time()
332             # And get the remote command status and log files
333             self.get_log_files()
334         
335         return self._has_finished
336           
337     def get_log_files(self):
338         """Get the log files produced by the command launched 
339            on the remote machine.
340         """
341         # Do not get the files if the command is not finished
342         if not self.has_finished():
343             msg = _("Trying to get log files whereas the job is not finished.")
344             self.logger.write(src.printcolors.printcWarning(msg))
345             return
346         
347         # First get the file that contains the list of log files to get
348         tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
349         self.machine.sftp.get(
350                     os.path.join(self.machine.sat_path, "list_log_files.txt"),
351                     tmp_file_path)
352         
353         # Read the file and get the result of the command and all the log files
354         # to get
355         fstream_tmp = open(tmp_file_path, "r")
356         file_lines = fstream_tmp.readlines()
357         file_lines = [line.replace("\n", "") for line in file_lines]
358         fstream_tmp.close()
359         os.remove(tmp_file_path)
360         # The first line is the result of the command (0 success or 1 fail)
361         self.res_job = file_lines[0]
362
363         for i, job_path_remote in enumerate(file_lines[1:]):
364             try:
365                 # For each command, there is two files to get :
366                 # 1- The xml file describing the command and giving the 
367                 # internal traces.
368                 # 2- The txt file containing the system command traces (like 
369                 # traces produced by the "make" command)
370                 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
371                     # Case 1-
372                     local_path = os.path.join(os.path.dirname(
373                                                         self.logger.logFilePath),
374                                               os.path.basename(job_path_remote))
375                     if i==0: # The first is the job command
376                         self.logger.add_link(os.path.basename(job_path_remote),
377                                              "job",
378                                              self.res_job,
379                                              self.command) 
380                 else:
381                     # Case 2-
382                     local_path = os.path.join(os.path.dirname(
383                                                         self.logger.logFilePath),
384                                               'OUT',
385                                               os.path.basename(job_path_remote))
386                 # Get the file
387                 if not os.path.exists(local_path):
388                     self.machine.sftp.get(job_path_remote, local_path)
389                 self.remote_log_files.append(local_path)
390             except Exception as e:
391                 self.err += _("Unable to get %s log file from remote: %s" % 
392                                                     (job_path_remote, str(e)))
393
394     def has_failed(self):
395         '''Returns True if the job has failed. 
396            A job is considered as failed if the machine could not be reached,
397            if the remote command failed, 
398            or if the job finished with a time out.
399         
400         :return: True if the job has failed
401         :rtype: bool
402         '''
403         if not self.has_finished():
404             return False
405         if not self.machine.successfully_connected(self.logger):
406             return True
407         if self.is_timeout():
408             return True
409         if self.res_job == "1":
410             return True
411         return False
412     
413     def cancel(self):
414         """In case of a failing job, one has to cancel every job that depend 
415            on it. This method put the job as failed and will not be executed.
416         """
417         self._has_begun = True
418         self._has_finished = True
419         self.cancelled = True
420         self.out = _("This job was not launched because its father has failed.")
421         self.err = _("This job was not launched because its father has failed.")
422
423     def is_running(self):
424         '''Returns True if the job commands are running 
425         
426         :return: True if the job is running
427         :rtype: bool
428         '''
429         return self.has_begun() and not self.has_finished()
430
431     def is_timeout(self):
432         '''Returns True if the job commands has finished with timeout 
433         
434         :return: True if the job has finished with timeout
435         :rtype: bool
436         '''
437         return self._has_timouted
438
439     def time_elapsed(self):
440         """Get the time elapsed since the job launching
441         
442         :return: The number of seconds
443         :rtype: int
444         """
445         if not self.has_begun():
446             return -1
447         T_now = time.time()
448         return T_now - self._T0
449     
450     def check_time(self):
451         """Verify that the job has not exceeded its timeout.
452            If it has, kill the remote command and consider the job as finished.
453         """
454         if not self.has_begun():
455             return
456         if self.time_elapsed() > self.timeout:
457             self._has_finished = True
458             self._has_timouted = True
459             self._Tf = time.time()
460             self.get_pids()
461             (out_kill, _) = self.kill_remote_process()
462             self.out = "TIMEOUT \n" + out_kill.read().decode()
463             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
464             try:
465                 self.get_log_files()
466             except Exception as e:
467                 self.err += _("Unable to get remote log files: %s" % e)
468             
469     def total_duration(self):
470         """Give the total duration of the job
471         
472         :return: the total duration of the job in seconds
473         :rtype: int
474         """
475         return self._Tf - self._T0
476         
477     def run(self):
478         """Launch the job by executing the remote command.
479         """
480         
481         # Prevent multiple run
482         if self.has_begun():
483             msg = _("Warning: A job can only be launched one time")
484             msg2 = _("Trying to launch the job \"%s\" whereas it has "
485                      "already been launched." % self.name)
486             self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
487                                                                         msg2)))
488             return
489         
490         # Do not execute the command if the machine could not be reached
491         if not self.machine.successfully_connected(self.logger):
492             self._has_finished = True
493             self.out = "N\A"
494             self.err = ("Connection to machine (name : %s, host: %s, port:"
495                         " %s, user: %s) has failed\nUse the log command "
496                         "to get more information."
497                         % (self.machine.name,
498                            self.machine.host,
499                            self.machine.port,
500                            self.machine.user))
501         else:
502             # Usual case : Launch the command on remote machine
503             self._T0 = time.time()
504             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
505                                                         self.command, self.logger)
506             # If the results are not initialized, finish the job
507             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
508                 self._has_finished = True
509                 self._Tf = time.time()
510                 self.out = "N\A"
511                 self.err = "The server failed to execute the command"
512         
513         # Put the beginning flag to true.
514         self._has_begun = True
515     
516     def write_results(self):
517         """Display on the terminal all the job's information
518         """
519         self.logger.write("name : " + self.name + "\n")
520         if self.after:
521             self.logger.write("after : %s\n" % self.after)
522         self.logger.write("Time elapsed : %4imin %2is \n" % 
523                      (self.total_duration()//60 , self.total_duration()%60))
524         if self._T0 != -1:
525             self.logger.write("Begin time : %s\n" % 
526                          time.strftime('%Y-%m-%d %H:%M:%S', 
527                                        time.localtime(self._T0)) )
528         if self._Tf != -1:
529             self.logger.write("End time   : %s\n\n" % 
530                          time.strftime('%Y-%m-%d %H:%M:%S', 
531                                        time.localtime(self._Tf)) )
532         
533         machine_head = "Informations about connection :\n"
534         underline = (len(machine_head) - 2) * "-"
535         self.logger.write(src.printcolors.printcInfo(
536                                                 machine_head+underline+"\n"))
537         self.machine.write_info(self.logger)
538         
539         self.logger.write(src.printcolors.printcInfo("out : \n"))
540         if self.out is None:
541             self.logger.write("Unable to get output\n")
542         else:
543             self.logger.write(self.out + "\n")
544         self.logger.write(src.printcolors.printcInfo("err : \n"))
545         if self.err is None:
546             self.logger.write("Unable to get error\n")
547         else:
548             self.logger.write(self.err + "\n")
549         
550     def get_status(self):
551         """Get the status of the job (used by the Gui for xml display)
552         
553         :return: The current status of the job
554         :rtype: String
555         """
556         if not self.machine.successfully_connected(self.logger):
557             return "SSH connection KO"
558         if not self.has_begun():
559             return "Not launched"
560         if self.cancelled:
561             return "Cancelled"
562         if self.is_running():
563             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
564                                                     time.localtime(self._T0))        
565         if self.has_finished():
566             if self.is_timeout():
567                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
568                                                     time.localtime(self._Tf))
569             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
570                                                      time.localtime(self._Tf))
571     
572 class Jobs(object):
573     '''Class to manage the jobs to be run
574     '''
575     def __init__(self,
576                  runner,
577                  logger,
578                  job_file,
579                  job_file_path,
580                  config_jobs,
581                  lenght_columns = 20):
582         # The jobs configuration
583         self.cfg_jobs = config_jobs
584         self.job_file = job_file
585         self.job_file_path = job_file_path
586         # The machine that will be used today
587         self.lmachines = []
588         # The list of machine (hosts, port) that will be used today 
589         # (a same host can have several machine instances since there 
590         # can be several ssh parameters) 
591         self.lhosts = []
592         # The jobs to be launched today 
593         self.ljobs = []
594         # The jobs that will not be launched today
595         self.ljobs_not_today = []
596         self.runner = runner
597         self.logger = logger
598         self.len_columns = lenght_columns
599         
600         # the list of jobs that have not been run yet
601         self._l_jobs_not_started = []
602         # the list of jobs that have already ran 
603         self._l_jobs_finished = []
604         # the list of jobs that are running 
605         self._l_jobs_running = [] 
606                 
607         self.determine_jobs_and_machines()
608     
609     def define_job(self, job_def, machine):
610         '''Takes a pyconf job definition and a machine (from class machine)
611            and returns the job instance corresponding to the definition.
612         
613         :param job_def src.config.Mapping: a job definition 
614         :param machine machine: the machine on which the job will run
615         :return: The corresponding job in a job class instance
616         :rtype: job
617         '''
618         name = job_def.name
619         cmmnds = job_def.commands
620         timeout = job_def.timeout
621         after = None
622         if 'after' in job_def:
623             after = job_def.after
624         application = None
625         if 'application' in job_def:
626             application = job_def.application
627         board = None
628         if 'board' in job_def:
629             board = job_def.board
630             
631         return Job(name,
632                    machine,
633                    application,
634                    board,
635                    cmmnds,
636                    timeout,
637                    self.runner.cfg,
638                    self.logger,
639                    self.job_file,
640                    after = after)
641     
642     def determine_jobs_and_machines(self):
643         '''Function that reads the pyconf jobs definition and instantiates all
644            the machines and jobs to be done today.
645
646         :return: Nothing
647         :rtype: N\A
648         '''
649         today = datetime.date.weekday(datetime.date.today())
650         host_list = []
651                
652         for job_def in self.cfg_jobs.jobs :
653                 
654             if not "machine" in job_def:
655                 msg = _('WARNING: The job "%s" do not have the key '
656                        '"machine", this job is ignored.\n\n' % job_def.name)
657                 self.logger.write(src.printcolors.printcWarning(msg))
658                 continue
659             name_machine = job_def.machine
660             
661             a_machine = None
662             for mach in self.lmachines:
663                 if mach.name == name_machine:
664                     a_machine = mach
665                     break
666             
667             if a_machine == None:
668                 for machine_def in self.cfg_jobs.machines:
669                     if machine_def.name == name_machine:
670                         if 'host' not in machine_def:
671                             host = self.runner.cfg.VARS.hostname
672                         else:
673                             host = machine_def.host
674
675                         if 'user' not in machine_def:
676                             user = self.runner.cfg.VARS.user
677                         else:
678                             user = machine_def.user
679
680                         if 'port' not in machine_def:
681                             port = 22
682                         else:
683                             port = machine_def.port
684             
685                         if 'password' not in machine_def:
686                             passwd = None
687                         else:
688                             passwd = machine_def.password    
689                             
690                         if 'sat_path' not in machine_def:
691                             sat_path = "salomeTools"
692                         else:
693                             sat_path = machine_def.sat_path
694                         
695                         a_machine = Machine(
696                                             machine_def.name,
697                                             host,
698                                             user,
699                                             port=port,
700                                             passwd=passwd,
701                                             sat_path=sat_path
702                                             )
703                         
704                         self.lmachines.append(a_machine)
705                         if (host, port) not in host_list:
706                             host_list.append((host, port))
707                 
708                 if a_machine == None:
709                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
710                             "machine \"%(machine_name)s\" but this machine "
711                             "is not defined in the configuration file.\n"
712                             "The job will not be launched")
713                     self.logger.write(src.printcolors.printcWarning(msg))
714                                   
715             a_job = self.define_job(job_def, a_machine)
716                 
717             if today in job_def.when:    
718                 self.ljobs.append(a_job)
719             else: # today in job_def.when
720                 self.ljobs_not_today.append(a_job)
721                
722         self.lhosts = host_list
723         
724     def ssh_connection_all_machines(self, pad=50):
725         '''Function that do the ssh connection to every machine 
726            to be used today.
727
728         :return: Nothing
729         :rtype: N\A
730         '''
731         self.logger.write(src.printcolors.printcInfo((
732                         "Establishing connection with all the machines :\n")))
733         for machine in self.lmachines:
734             # little algorithm in order to display traces
735             begin_line = (_("Connection to %s: " % machine.name))
736             if pad - len(begin_line) < 0:
737                 endline = " "
738             else:
739                 endline = (pad - len(begin_line)) * "." + " "
740             
741             step = "SSH connection"
742             self.logger.write( begin_line + endline + step)
743             self.logger.flush()
744             # the call to the method that initiate the ssh connection
745             msg = machine.connect(self.logger)
746             
747             # Copy salomeTools to the remote machine
748             if machine.successfully_connected(self.logger):
749                 step = _("Copy SAT")
750                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
751                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
752                 self.logger.flush()
753                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
754                                             self.job_file_path)
755                 # get the remote machine distribution using a sat command
756                 (__, out_dist, __) = machine.exec_command(
757                                 os.path.join(machine.sat_path,
758                                     "sat config --value VARS.dist --no_label"),
759                                 self.logger)
760                 machine.distribution = out_dist.read().decode().replace("\n",
761                                                                         "")
762                 # Print the status of the copy
763                 if res_copy == 0:
764                     self.logger.write('\r%s' % 
765                                 ((len(begin_line)+len(endline)+20) * " "), 3)
766                     self.logger.write('\r%s%s%s' % 
767                         (begin_line, 
768                          endline, 
769                          src.printcolors.printc(src.OK_STATUS)), 3)
770                 else:
771                     self.logger.write('\r%s' % 
772                             ((len(begin_line)+len(endline)+20) * " "), 3)
773                     self.logger.write('\r%s%s%s %s' % 
774                         (begin_line,
775                          endline,
776                          src.printcolors.printc(src.OK_STATUS),
777                          _("Copy of SAT failed")), 3)
778             else:
779                 self.logger.write('\r%s' % 
780                                   ((len(begin_line)+len(endline)+20) * " "), 3)
781                 self.logger.write('\r%s%s%s %s' % 
782                     (begin_line,
783                      endline,
784                      src.printcolors.printc(src.KO_STATUS),
785                      msg), 3)
786             self.logger.write("\n", 3)
787                 
788         self.logger.write("\n")
789         
790
791     def is_occupied(self, hostname):
792         '''Function that returns True if a job is running on 
793            the machine defined by its host and its port.
794         
795         :param hostname (str, int): the pair (host, port)
796         :return: the job that is running on the host, 
797                 or false if there is no job running on the host. 
798         :rtype: job / bool
799         '''
800         host = hostname[0]
801         port = hostname[1]
802         for jb in self.ljobs:
803             if jb.machine.host == host and jb.machine.port == port:
804                 if jb.is_running():
805                     return jb
806         return False
807     
808     def update_jobs_states_list(self):
809         '''Function that updates the lists that store the currently
810            running jobs and the jobs that have already finished.
811         
812         :return: Nothing. 
813         :rtype: N\A
814         '''
815         jobs_finished_list = []
816         jobs_running_list = []
817         for jb in self.ljobs:
818             if jb.is_running():
819                 jobs_running_list.append(jb)
820                 jb.check_time()
821             if jb.has_finished():
822                 jobs_finished_list.append(jb)
823         
824         nb_job_finished_before = len(self._l_jobs_finished)
825         self._l_jobs_finished = jobs_finished_list
826         self._l_jobs_running = jobs_running_list
827         
828         nb_job_finished_now = len(self._l_jobs_finished)
829         
830         return nb_job_finished_now > nb_job_finished_before
831     
832     def cancel_dependencies_of_failing_jobs(self):
833         '''Function that cancels all the jobs that depend on a failing one.
834         
835         :return: Nothing. 
836         :rtype: N\A
837         '''
838         
839         for job in self.ljobs:
840             if job.after is None:
841                 continue
842             father_job = self.find_job_that_has_name(job.after)
843             if father_job is not None and father_job.has_failed():
844                 job.cancel()
845     
846     def find_job_that_has_name(self, name):
847         '''Returns the job by its name.
848         
849         :param name str: a job name
850         :return: the job that has the name. 
851         :rtype: job
852         '''
853         for jb in self.ljobs:
854             if jb.name == name:
855                 return jb
856         # the following is executed only if the job was not found
857         return None
858     
859     def str_of_length(self, text, length):
860         '''Takes a string text of any length and returns 
861            the most close string of length "length".
862         
863         :param text str: any string
864         :param length int: a length for the returned string
865         :return: the most close string of length "length"
866         :rtype: str
867         '''
868         if len(text) > length:
869             text_out = text[:length-3] + '...'
870         else:
871             diff = length - len(text)
872             before = " " * (diff//2)
873             after = " " * (diff//2 + diff%2)
874             text_out = before + text + after
875             
876         return text_out
877     
878     def display_status(self, len_col):
879         '''Takes a lenght and construct the display of the current status 
880            of the jobs in an array that has a column for each host.
881            It displays the job that is currently running on the host 
882            of the column.
883         
884         :param len_col int: the size of the column 
885         :return: Nothing
886         :rtype: N\A
887         '''
888         
889         display_line = ""
890         for host_port in self.lhosts:
891             jb = self.is_occupied(host_port)
892             if not jb: # nothing running on the host
893                 empty = self.str_of_length("empty", len_col)
894                 display_line += "|" + empty 
895             else:
896                 display_line += "|" + src.printcolors.printcInfo(
897                                         self.str_of_length(jb.name, len_col))
898         
899         self.logger.write("\r" + display_line + "|")
900         self.logger.flush()
901     
902
903     def run_jobs(self):
904         '''The main method. Runs all the jobs on every host. 
905            For each host, at a given time, only one job can be running.
906            The jobs that have the field after (that contain the job that has
907            to be run before it) are run after the previous job.
908            This method stops when all the jobs are finished.
909         
910         :return: Nothing
911         :rtype: N\A
912         '''
913
914         # Print header
915         self.logger.write(src.printcolors.printcInfo(
916                                                 _('Executing the jobs :\n')))
917         text_line = ""
918         for host_port in self.lhosts:
919             host = host_port[0]
920             port = host_port[1]
921             if port == 22: # default value
922                 text_line += "|" + self.str_of_length(host, self.len_columns)
923             else:
924                 text_line += "|" + self.str_of_length(
925                                 "("+host+", "+str(port)+")", self.len_columns)
926         
927         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
928         self.logger.write(tiret_line)
929         self.logger.write(text_line + "|\n")
930         self.logger.write(tiret_line)
931         self.logger.flush()
932         
933         # The infinite loop that runs the jobs
934         l_jobs_not_started = src.deepcopy_list(self.ljobs)
935         while len(self._l_jobs_finished) != len(self.ljobs):
936             new_job_start = False
937             for host_port in self.lhosts:
938                 
939                 if self.is_occupied(host_port):
940                     continue
941              
942                 for jb in l_jobs_not_started:
943                     if (jb.machine.host, jb.machine.port) != host_port:
944                         continue 
945                     if jb.after == None:
946                         jb.run()
947                         l_jobs_not_started.remove(jb)
948                         new_job_start = True
949                         break
950                     else:
951                         jb_before = self.find_job_that_has_name(jb.after)
952                         if jb_before is None:
953                             jb.cancel()
954                             msg = _("This job was not launched because its father is not in the jobs list.")
955                             jb.out = msg
956                             jb.err = msg
957                             break
958                         if jb_before.has_finished():
959                             jb.run()
960                             l_jobs_not_started.remove(jb)
961                             new_job_start = True
962                             break
963             self.cancel_dependencies_of_failing_jobs()
964             new_job_finished = self.update_jobs_states_list()
965             
966             if new_job_start or new_job_finished:
967                 self.gui.update_xml_files(self.ljobs)            
968                 # Display the current status     
969                 self.display_status(self.len_columns)
970             
971             # Make sure that the proc is not entirely busy
972             time.sleep(0.001)
973         
974         self.logger.write("\n")    
975         self.logger.write(tiret_line)                   
976         self.logger.write("\n\n")
977         
978         self.gui.update_xml_files(self.ljobs)
979         self.gui.last_update()
980
981     def write_all_results(self):
982         '''Display all the jobs outputs.
983         
984         :return: Nothing
985         :rtype: N\A
986         '''
987         
988         for jb in self.ljobs:
989             self.logger.write(src.printcolors.printcLabel(
990                         "#------- Results for job %s -------#\n" % jb.name))
991             jb.write_results()
992             self.logger.write("\n\n")
993
994 class Gui(object):
995     '''Class to manage the the xml data that can be displayed in a browser to
996        see the jobs states
997     '''
998    
999     def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
1000         '''Initialization
1001         
1002         :param xml_dir_path str: The path to the directory where to put 
1003                                  the xml resulting files
1004         :param l_jobs List: the list of jobs that run today
1005         :param l_jobs_not_today List: the list of jobs that do not run today
1006         '''
1007         # The path of the global xml file
1008         self.xml_dir_path = xml_dir_path
1009         # Initialize the xml files
1010         xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
1011         self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1012                                                          "JobsReport")
1013         # The xml files that corresponds to the boards.
1014         # {name_board : xml_object}}
1015         self.d_xml_board_files = {}
1016         # Create the lines and columns
1017         self.initialize_arrays(l_jobs, l_jobs_not_today)
1018         # Write the xml file
1019         self.update_xml_files(l_jobs)
1020     
1021     def initialize_arrays(self, l_jobs, l_jobs_not_today):
1022         '''Get all the first information needed for each file and write the 
1023            first version of the files   
1024         :param l_jobs List: the list of jobs that run today
1025         :param l_jobs_not_today List: the list of jobs that do not run today
1026         '''
1027         # Get the boards to fill and put it in a dictionary
1028         # {board_name : xml instance corresponding to the board}
1029         for job in l_jobs + l_jobs_not_today:
1030             board = job.board
1031             if (board is not None and 
1032                     board not in self.d_xml_board_files.keys()):
1033                 xml_board_path = os.path.join(self.xml_dir_path, board + ".xml")
1034                 self.d_xml_board_files[board] =  src.xmlManager.XmlLogFile(
1035                                                             xml_board_path,
1036                                                             "JobsReport")
1037                 self.d_xml_board_files[board].add_simple_node("distributions")
1038                 self.d_xml_board_files[board].add_simple_node("applications")
1039                 self.d_xml_board_files[board].add_simple_node("board", text=board)
1040         
1041         # Loop over all jobs in order to get the lines and columns for each 
1042         # xml file
1043         d_dist = {}
1044         d_application = {}
1045         for board in self.d_xml_board_files:
1046             d_dist[board] = []
1047             d_application[board] = []
1048             
1049         l_hosts_ports = []
1050             
1051         for job in l_jobs + l_jobs_not_today:
1052             
1053             if (job.machine.host, job.machine.port) not in l_hosts_ports:
1054                 l_hosts_ports.append((job.machine.host, job.machine.port))
1055                 
1056             distrib = job.machine.distribution
1057             application = job.application
1058             
1059             board_job = job.board
1060             if board is None:
1061                 continue
1062             for board in self.d_xml_board_files:
1063                 if board_job == board:
1064                     if distrib is not None and distrib not in d_dist[board]:
1065                         d_dist[board].append(distrib)
1066                         src.xmlManager.add_simple_node(
1067                             self.d_xml_board_files[board].xmlroot.find('distributions'),
1068                                                    "dist",
1069                                                    attrib={"name" : distrib})
1070                     
1071                 if board_job == board:
1072                     if (application is not None and 
1073                                     application not in d_application[board]):
1074                         d_application[board].append(application)
1075                         src.xmlManager.add_simple_node(
1076                             self.d_xml_board_files[board].xmlroot.find('applications'),
1077                                                    "application",
1078                                                    attrib={"name" : application})
1079
1080         # Initialize the hosts_ports node for the global file
1081         self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1082         for host, port in l_hosts_ports:
1083             host_port = "%s:%i" % (host, port)
1084             src.xmlManager.add_simple_node(self.xmlhosts_ports,
1085                                            "host_port",
1086                                            attrib={"name" : host_port})
1087         
1088         # Initialize the jobs node in all files
1089         for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1090             xml_jobs = xml_file.add_simple_node("jobs")      
1091             # Get the jobs present in the config file but 
1092             # that will not be launched today
1093             self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1094             
1095             xml_file.add_simple_node("infos",
1096                                      attrib={"name" : "last update",
1097                                              "JobsCommandStatus" : "running"})
1098
1099     
1100     def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1101         '''Get all the first information needed for each file and write the 
1102            first version of the files   
1103
1104         :param xml_node_jobs etree.Element: the node corresponding to a job
1105         :param l_jobs_not_today List: the list of jobs that do not run today
1106         '''
1107         for job in l_jobs_not_today:
1108             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1109                                                  "job",
1110                                                  attrib={"name" : job.name})
1111             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1112             src.xmlManager.add_simple_node(xmlj,
1113                                            "distribution",
1114                                            job.machine.distribution)
1115             src.xmlManager.add_simple_node(xmlj, "board", job.board)
1116             src.xmlManager.add_simple_node(xmlj,
1117                                        "commands", " ; ".join(job.commands))
1118             src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1119             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1120             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1121             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1122             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1123             src.xmlManager.add_simple_node(xmlj, "sat_path",
1124                                                         job.machine.sat_path)
1125     
1126     def update_xml_files(self, l_jobs):
1127         '''Write all the xml files with updated information about the jobs   
1128
1129         :param l_jobs List: the list of jobs that run today
1130         '''
1131         for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1132             self.update_xml_file(l_jobs, xml_file)
1133             
1134         # Write the file
1135         self.write_xml_files()
1136             
1137     def update_xml_file(self, l_jobs, xml_file):      
1138         '''update information about the jobs for the file xml_file   
1139
1140         :param l_jobs List: the list of jobs that run today
1141         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1142         '''
1143         
1144         xml_node_jobs = xml_file.xmlroot.find('jobs')
1145         # Update the job names and status node
1146         for job in l_jobs:
1147             # Find the node corresponding to the job and delete it
1148             # in order to recreate it
1149             for xmljob in xml_node_jobs.findall('job'):
1150                 if xmljob.attrib['name'] == job.name:
1151                     xml_node_jobs.remove(xmljob)
1152             
1153             T0 = str(job._T0)
1154             if T0 != "-1":
1155                 T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
1156                                        time.localtime(job._T0))
1157             Tf = str(job._Tf)
1158             if Tf != "-1":
1159                 Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
1160                                        time.localtime(job._Tf))
1161             
1162             # recreate the job node
1163             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1164                                                   "job",
1165                                                   attrib={"name" : job.name})
1166             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1167             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1168             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1169             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1170             src.xmlManager.add_simple_node(xmlj, "sat_path",
1171                                            job.machine.sat_path)
1172             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1173             src.xmlManager.add_simple_node(xmlj, "distribution",
1174                                            job.machine.distribution)
1175             src.xmlManager.add_simple_node(xmlj, "board", job.board)
1176             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1177             src.xmlManager.add_simple_node(xmlj, "commands",
1178                                            " ; ".join(job.commands))
1179             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1180             src.xmlManager.add_simple_node(xmlj, "begin", T0)
1181             src.xmlManager.add_simple_node(xmlj, "end", Tf)
1182             src.xmlManager.add_simple_node(xmlj, "out",
1183                                            src.printcolors.cleancolor(job.out))
1184             src.xmlManager.add_simple_node(xmlj, "err",
1185                                            src.printcolors.cleancolor(job.err))
1186             src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1187             if len(job.remote_log_files) > 0:
1188                 src.xmlManager.add_simple_node(xmlj,
1189                                                "remote_log_file_path",
1190                                                job.remote_log_files[0])
1191             else:
1192                 src.xmlManager.add_simple_node(xmlj,
1193                                                "remote_log_file_path",
1194                                                "nothing")           
1195             
1196             xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1197             # get the job father
1198             if job.after is not None:
1199                 job_father = None
1200                 for jb in l_jobs:
1201                     if jb.name == job.after:
1202                         job_father = jb
1203                 
1204                 if (job_father is not None and 
1205                         len(job_father.remote_log_files) > 0):
1206                     link = job_father.remote_log_files[0]
1207                 else:
1208                     link = "nothing"
1209                 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1210             
1211         
1212         # Update the date
1213         xml_node_infos = xml_file.xmlroot.find('infos')
1214         src.xmlManager.append_node_attrib(xml_node_infos,
1215                     attrib={"value" : 
1216                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1217                
1218
1219     
1220     def last_update(self, finish_status = "finished"):
1221         '''update information about the jobs for the file xml_file   
1222
1223         :param l_jobs List: the list of jobs that run today
1224         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1225         '''
1226         for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1227             xml_node_infos = xml_file.xmlroot.find('infos')
1228             src.xmlManager.append_node_attrib(xml_node_infos,
1229                         attrib={"JobsCommandStatus" : finish_status})
1230         # Write the file
1231         self.write_xml_files()
1232     
1233     def write_xml_files(self):
1234         ''' Write the xml files   
1235         '''
1236         self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1237         for xml_file in self.d_xml_board_files.values():
1238             xml_file.write_tree(STYLESHEET_BOARD)
1239         
1240 ##
1241 # Describes the command
1242 def description():
1243     return _("The jobs command launches maintenances that are described"
1244              " in the dedicated jobs configuration file.")
1245
1246 ##
1247 # Runs the command.
1248 def run(args, runner, logger):
1249        
1250     (options, args) = parser.parse_args(args)
1251     
1252     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1253     
1254     l_cfg_dir = [os.path.join(runner.cfg.VARS.datadir, "jobs"),
1255                  jobs_cfg_files_dir]
1256     
1257     # Make sure the path to the jobs config files directory exists 
1258     src.ensure_path_exists(jobs_cfg_files_dir)   
1259     
1260     # list option : display all the available config files
1261     if options.list:
1262         for cfg_dir in l_cfg_dir:
1263             if not options.no_label:
1264                 logger.write("------ %s\n" % 
1265                                  src.printcolors.printcHeader(cfg_dir))
1266     
1267             for f in sorted(os.listdir(cfg_dir)):
1268                 if not f.endswith('.pyconf'):
1269                     continue
1270                 cfilename = f[:-7]
1271                 logger.write("%s\n" % cfilename)
1272         return 0
1273
1274     # Make sure the jobs_config option has been called
1275     if not options.jobs_cfg:
1276         message = _("The option --jobs_config is required\n")      
1277         src.printcolors.printcError(message)
1278         return 1
1279     
1280     # Find the file in the directories
1281     found = False
1282     for cfg_dir in l_cfg_dir:
1283         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1284         if not file_jobs_cfg.endswith('.pyconf'):
1285             file_jobs_cfg += '.pyconf'
1286         
1287         if not os.path.exists(file_jobs_cfg):
1288             continue
1289         else:
1290             found = True
1291             break
1292     
1293     if not found:
1294         msg = _("The file configuration %(name_file)s was not found."
1295                 "\nUse the --list option to get the possible files.")
1296         src.printcolors.printcError(msg)
1297         return 1
1298     
1299     info = [
1300         (_("Platform"), runner.cfg.VARS.dist),
1301         (_("File containing the jobs configuration"), file_jobs_cfg)
1302     ]    
1303     src.print_info(logger, info)
1304     
1305     # Read the config that is in the file
1306     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1307     if options.only_jobs:
1308         l_jb = src.pyconf.Sequence()
1309         for jb in config_jobs.jobs:
1310             if jb.name in options.only_jobs:
1311                 l_jb.append(jb,
1312                 "Adding a job that was given in only_jobs option parameters")
1313         config_jobs.jobs = l_jb
1314      
1315     # Initialization
1316     today_jobs = Jobs(runner,
1317                       logger,
1318                       options.jobs_cfg,
1319                       file_jobs_cfg,
1320                       config_jobs)
1321     # SSH connection to all machines
1322     today_jobs.ssh_connection_all_machines()
1323     if options.test_connection:
1324         return 0
1325     
1326     gui = None
1327     if options.publish:
1328         gui = Gui(runner.cfg.SITE.log.log_dir,
1329                   today_jobs.ljobs,
1330                   today_jobs.ljobs_not_today,)
1331     
1332     today_jobs.gui = gui
1333     
1334     interruped = False
1335     try:
1336         # Run all the jobs contained in config_jobs
1337         today_jobs.run_jobs()
1338     except KeyboardInterrupt:
1339         interruped = True
1340         logger.write("\n\n%s\n\n" % 
1341                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1342     finally:
1343         if interruped:
1344             msg = _("Killing the running jobs and trying"
1345                     " to get the corresponding logs\n")
1346             logger.write(src.printcolors.printcWarning(msg))
1347             
1348         # find the potential not finished jobs and kill them
1349         for jb in today_jobs.ljobs:
1350             if not jb.has_finished():
1351                 jb.kill_remote_process()
1352         if interruped:
1353             today_jobs.gui.last_update(_("Forced interruption"))
1354         else:
1355             today_jobs.gui.last_update()
1356         # Output the results
1357         today_jobs.write_all_results()