3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets used to render the generated xml job reports in a browser.
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command-line options accepted by the "jobs" command.
parser = src.options.Options()
parser.add_option('n', 'name', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('Optional: the list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('Optional: list all available config files.'))
# NOTE(review): the next two add_option calls appear truncated in this
# extract -- their trailing default-value argument and closing parenthesis
# are elided.
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Optional: try to connect to the machines. "
                  "Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Optional: generate an xml file that can be read in a "
                  "browser to display the jobs status."),
parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
                  "the path to csv file that contain "
                  "the expected boards."),"")
# NOTE(review): short option 'n' is already bound to 'name' above; one of the
# two declarations will shadow the other -- confirm against src.options.Options.
parser.add_option('n', 'completion', 'boolean', 'no_label',
                  _("Optional (internal use): do not print labels, Works only "
class Machine(object):
    '''Class to manage a ssh connection on a machine
    '''
    # NOTE(review): the __init__ header and its first parameters are elided
    # in this extract; only the tail of the signature is visible below.
                 sat_path="salomeTools"):
        self.distribution = None # Will be filled after copying SAT on the machine
        self.password = passwd
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        # Tri-state flag: None = connect() never called, True/False = result.
        self._connection_successful = None

    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        '''
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # Do not fail on unknown hosts: record their key automatically.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # NOTE(review): the enclosing "try:" line and the port/user arguments
        # of ssh.connect are elided in this extract.
        self.ssh.connect(self.host,
                         password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
        # NOTE(review): a generic except clause is elided here.
            message = ( _("Error connecting or establishing an SSH session"))
        # Success path: no exception was raised by ssh.connect.
            self._connection_successful = True

    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeed

        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeed, False if not
        '''
        # Warn when asked before any connect() attempt was made.
        if self._connection_successful == None:
            message = _("Warning : trying to ask if the connection to "
            "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
            " no connection request" %
                        (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful

    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        '''
        # NOTE(review): the enclosing "try:" line is elided in this extract.
        # open a sftp connection
        self.sftp = self.ssh.open_sftp()
        # Create the sat directory on remote machine if it is not existing
        self.mkdir(self.sat_path, ignore_existing=True)
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        # put the job configuration file in order to make it reachable
        # on the remote machine
        self.sftp.put(job_file, os.path.join(".salomeTools",
                                             ".jobs_command_file.pyconf"))
        except Exception as e:
            # A failed copy is treated like a failed connection.
            self._connection_successful = False

    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All sub-directories in source are
            created under target.
        '''
        for item in os.listdir(source):
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            # Recreate local symlinks as symlinks on the remote side.
            if os.path.islink(source_path):
                linkto = os.readlink(source_path)
                self.sftp.symlink(linkto, destination_path)
                # Mirror the local permission bits on the remote copy.
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            if os.path.isfile(source_path):
                self.sftp.put(source_path, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            # NOTE(review): the directory-branch header is elided here;
            # the two lines below recurse into sub-directories.
                self.mkdir(destination_path, ignore_existing=True)
                self.put_dir(source_path, destination_path)

    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
        '''
        # NOTE(review): the try/except handling the ignore_existing case is
        # elided in this extract.
        self.sftp.mkdir(path, mode)

    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                 paramiko.channel.ChannelFile)
        '''
        # NOTE(review): the enclosing "try:" line is elided in this extract.
        # Does not wait the end of the command
        (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
        # NOTE(review): a second except clause header is elided here.
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        return (stdin, stdout, stderr)

    # NOTE(review): the close() method header is elided in this extract.
        '''Close the ssh connection
        '''

    def write_info(self, logger):
        '''Prints the information relative to the machine in the logger
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        # NOTE(review): the "else:" line is elided here.
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
    '''Class to manage one job
    '''
    def __init__(self, name, machine, application, board,
                 commands, timeout, config, logger, after=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        # NOTE(review): the res_job assignment itself is elided in this extract.
        self.cancelled = False
        # Life-cycle flags of the job.
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        self.commands = commands
        # The single shell command line launched on the remote machine
        # (a "sat job" call); middle arguments are elided in this extract.
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config .jobs_command_file" +

    # NOTE(review): the get_pids() method header is elided in this extract.
        """ Get the pid(s) corresponding to the command that have been launched
            On the remote machine

        :return: The list of integers corresponding to the found pids
        """
        # Look up the remote pids by grepping the exact command line.
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]

    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.

        :return: (the output of the kill, the error of the kill)
        '''
        pids = self.get_pids()
        # Send SIGINT (kill -2) to every pid so the remote sat can clean up.
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        # NOTE(review): the remaining exec_command arguments are elided.
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)

    # NOTE(review): the has_begun() method header is elided in this extract.
        '''Returns True if the job has already begun

        :return: True if the job has already begun
        '''
        return self._has_begun

    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        '''
        # If the method has already been called and returned True
        # NOTE(review): the early-return bodies of the next two guards are
        # elided in this extract.
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        # A closed stdout channel means the remote command has terminated.
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out += self._stdout.read().decode()
            self.err += self._stderr.read().decode()
            # Put end time
            self._Tf = time.time()
            # And get the remote command status and log files
        return self._has_finished

    def get_log_files(self):
        """Get the log files produced by the command launched
        on the remote machine, and put it in the log directory of the user,
        so they can be accessible from
        """
        # Do not get the files if the command is not finished
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        # First get the file that contains the list of log files to get
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
        # NOTE(review): the sftp.get arguments are elided in this extract.
        self.machine.sftp.get(
        # Read the file and get the result of the command and all the log files
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        os.remove(tmp_file_path)
        # NOTE(review): the "try:" line guarding the status read is elided.
        # The first line is the result of the command (0 success or 1 fail)
        self.res_job = file_lines[0]
        except Exception as e:
            self.err += _("Unable to get status from remote file %s: %s" %
                          (remote_path, str(e)))
        for i, job_path_remote in enumerate(file_lines[1:]):
            # For each command, there is two files to get :
            # 1- The xml file describing the command and giving the
            # 2- The txt file containing the system command traces (like
            # traces produced by the "make" command)
            # 3- In case of the test command, there is another file to get :
            # the xml board that contain the test results
            dirname = os.path.basename(os.path.dirname(job_path_remote))
            if dirname != 'OUT' and dirname != 'TEST':
                local_path = os.path.join(os.path.dirname(
                                          self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if i==0: # The first is the job command
                    # NOTE(review): the add_link arguments are elided here.
                    self.logger.add_link(os.path.basename(job_path_remote),
            elif dirname == 'OUT':
                local_path = os.path.join(os.path.dirname(
                                          self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
            elif dirname == 'TEST':
                local_path = os.path.join(os.path.dirname(
                                          self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
            # Download each remote log file once and remember its local copy.
            if not os.path.exists(local_path):
                self.machine.sftp.get(job_path_remote, local_path)
                self.remote_log_files.append(local_path)
            except Exception as e:
                self.err += _("Unable to get %s log file from remote: %s" %
                              (str(job_path_remote),

    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered as failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a time out.

        :return: True if the job has failed
        '''
        # NOTE(review): the return statements of the four guards below are
        # elided in this extract.
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        if self.res_job == "1":

    # NOTE(review): the cancel() method header is elided in this extract.
        """In case of a failing job, one has to cancel every job that depend
        on it. This method put the job as failed and will not be executed.
        """
        # Mark the job as begun+finished so the scheduler skips it.
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out += _("This job was not launched because its father has failed.")
        self.err += _("This job was not launched because its father has failed.")

    def is_running(self):
        '''Returns True if the job commands are running

        :return: True if the job is running
        '''
        return self.has_begun() and not self.has_finished()

    def is_timeout(self):
        '''Returns True if the job commands has finished with timeout

        :return: True if the job has finished with timeout
        '''
        return self._has_timouted

    def time_elapsed(self):
        """Get the time elapsed since the job launching

        :return: The number of seconds
        """
        # NOTE(review): the guard body and the T_now assignment are elided
        # in this extract.
        if not self.has_begun():
        return T_now - self._T0

    def check_time(self):
        """Verify that the job has not exceeded its timeout.
        If it has, kill the remote command and consider the job as finished.
        """
        # NOTE(review): the guard body is elided in this extract.
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            # Stop the remote command and record a TIMEOUT marker in out/err.
            (out_kill, _) = self.kill_remote_process()
            self.out += "TIMEOUT \n" + out_kill.read().decode()
            self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
            # NOTE(review): the "try:" line guarding the log retrieval is
            # elided here.
            except Exception as e:
                self.err += _("Unable to get remote log files: %s" % e)

    def total_duration(self):
        """Give the total duration of the job

        :return: the total duration of the job in seconds
        """
        return self._Tf - self._T0

    # NOTE(review): the run() method header is elided in this extract.
        """Launch the job by executing the remote command.
        """
        # Prevent multiple run
            msg = _("Warning: A job can only be launched one time")
            msg2 = _("Trying to launch the job \"%s\" whereas it has "
                     "already been launched." % self.name)
            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
        # Do not execute the command if the machine could not be reached
        if not self.machine.successfully_connected(self.logger):
            self._has_finished = True
            self.err += ("Connection to machine (name : %s, host: %s, port:"
                         " %s, user: %s) has failed\nUse the log command "
                         "to get more information."
                         % (self.machine.name,
        # Usual case : Launch the command on remote machine
        self._T0 = time.time()
        # NOTE(review): the exec_command arguments are elided in this extract.
        self._stdin, self._stdout, self._stderr = self.machine.exec_command(
        # If the results are not initialized, finish the job
        if (self._stdin, self._stdout, self._stderr) == (None, None, None):
            self._has_finished = True
            self._Tf = time.time()
            self.err += "The server failed to execute the command"
        # Put the beginning flag to true.
        self._has_begun = True

    def write_results(self):
        """Display on the terminal all the job's information
        """
        self.logger.write("name : " + self.name + "\n")
        self.logger.write("after : %s\n" % self.after)
        self.logger.write("Time elapsed : %4imin %2is \n" %
                          (self.total_duration()//60 , self.total_duration()%60))
        self.logger.write("Begin time : %s\n" %
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(self._T0)) )
        self.logger.write("End time : %s\n\n" %
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(self._Tf)) )
        machine_head = "Informations about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        self.logger.write(src.printcolors.printcInfo(
                          machine_head+underline+"\n"))
        self.machine.write_info(self.logger)
        self.logger.write(src.printcolors.printcInfo("out : \n"))
            # NOTE(review): the condition guarding this branch is elided.
            self.logger.write("Unable to get output\n")
            self.logger.write(self.out + "\n")
        self.logger.write(src.printcolors.printcInfo("err : \n"))
        self.logger.write(self.err + "\n")

    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)

        :return: The current status of the job
        """
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
    '''
    # NOTE(review): the __init__ header is elided in this extract; only the
    # last parameter of the signature is visible below.
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today
        # (a same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        # Width of the display columns in the status table.
        self.len_columns = lenght_columns
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []
        # Read the pyconf configuration and build machines + jobs.
        self.determine_jobs_and_machines()

    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
        and returns the job instance corresponding to the definition.

        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        '''
        cmmnds = job_def.commands
        # NOTE(review): the "else:" lines of the optional-key lookups below
        # are elided in this extract.
        if not "timeout" in job_def:
            timeout = 4*60*60 # default timeout = 4h
            timeout = job_def.timeout
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'board' in job_def:
            board = job_def.board

    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
        the machines and jobs to be done today.
        '''
        # Week day index (0 = Monday) used to filter jobs via job_def.when.
        today = datetime.date.weekday(datetime.date.today())
        for job_def in self.cfg_jobs.jobs :
            # Jobs without a target machine are skipped with a warning.
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" do not have the key '
                        '"machine", this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            # Reuse an already-instantiated machine when possible.
            for mach in self.lmachines:
                if mach.name == name_machine:
            # Otherwise build it from the machines section of the config,
            # falling back to local defaults for missing keys.
            if a_machine == None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        # NOTE(review): the Machine(...) construction lines
                        # are elided in this extract.
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            # The machine referenced by the job does not exist at all.
            if a_machine == None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            # Split the jobs between today's list and the others.
            if today in job_def.when:
                self.ljobs.append(a_job)
            else: # today in job_def.when
                self.ljobs_not_today.append(a_job)
        self.lhosts = host_list

    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine
        '''
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            endline = (pad - len(begin_line)) * "." + " "
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                # NOTE(review): further copy_sat arguments are elided here.
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # get the remote machine distribution using a sat command
                (__, out_dist, __) = machine.exec_command(
                                os.path.join(machine.sat_path,
                                    "sat config --value VARS.dist --no_label"),
                machine.distribution = out_dist.read().decode().replace("\n",
                # Print the status of the copy
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' %
                        src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                        src.printcolors.printc(src.OK_STATUS),
                        _("Copy of SAT failed")), 3)
                self.logger.write('\r%s' %
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                    src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")

    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
        the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
            or false if there is no job running on the host.
        '''
        # NOTE(review): the (host, port) unpacking and the return statements
        # are elided in this extract.
        for jb in self.ljobs:
            if jb.machine.host == host and jb.machine.port == port:

    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
        running jobs and the jobs that have already finished.
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.ljobs:
            # NOTE(review): the is_running() condition line is elided here.
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
        # True when at least one job finished since the previous call.
        return nb_job_finished_now > nb_job_finished_before

    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        '''
        for job in self.ljobs:
            # NOTE(review): the loop-body statements (continue / cancel) are
            # elided in this extract.
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job is not None and father_job.has_failed():

    def find_job_that_has_name(self, name):
        '''Returns the job by its name.

        :param name str: a job name
        :return: the job that has the name.
        '''
        for jb in self.ljobs:
        # the following is executed only if the job was not found

    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
        the most close string of length "length".

        :param text str: any string
        :param length int: a length for the returned string
        :return: the most close string of length "length"
        '''
        # Truncate with an ellipsis, or pad evenly on both sides.
        if len(text) > length:
            text_out = text[:length-3] + '...'
        # NOTE(review): the "else:" line and return are elided here.
            diff = length - len(text)
            before = " " * (diff//2)
            after = " " * (diff//2 + diff%2)
            text_out = before + text + after

    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status
        of the jobs in an array that has a column for each host.
        It displays the job that is currently running on the host

        :param len_col int: the size of the column
        '''
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        # Carriage return: the status line is redrawn in place.
        self.logger.write("\r" + display_line + "|")

    # NOTE(review): the run_jobs() method header is elided in this extract.
        '''The main method. Runs all the jobs on every host.
        For each host, at a given time, only one job can be running.
        The jobs that have the field after (that contain the job that has
        to be run before it) are run after the previous job.
        This method stops when all the jobs are finished.
        '''
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        # Header of the status table: one column per (host, port).
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        # The infinite loop that runs the jobs
        l_jobs_not_started = src.deepcopy_list(self.ljobs)
        while len(self._l_jobs_finished) != len(self.ljobs):
            new_job_start = False
            for host_port in self.lhosts:
                # One job at a time per host: skip busy hosts.
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        # Dependency handling: wait for / check the father job.
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before is None:
                            msg = _("This job was not launched because its "
                                    "father is not in the jobs list.")
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            # Refresh the xml files and the table only when something changed.
            if new_job_start or new_job_finished:
                    self.gui.update_xml_files(self.ljobs)
                # Display the current status
                self.display_status(self.len_columns)
            # Make sure that the proc is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")
            self.gui.update_xml_files(self.ljobs)
            self.gui.last_update()

    def write_all_results(self):
        '''Display all the jobs outputs.
        '''
        for jb in self.ljobs:
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            self.logger.write("\n\n")
    '''Class to manage the xml data that can be displayed in a browser to
    '''
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, prefix, file_boards=""):
        '''
        :param xml_dir_path str: The path to the directory where to put
            the xml resulting files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        :param file_boards str: the file path from which to read the
        '''
        # The prefix to add to the xml files : date_hour
        self.prefix = prefix
        # The path of the csv files to read to fill the expected boards
        self.file_boards = file_boards
        if file_boards != "":
            today = datetime.date.weekday(datetime.date.today())
            self.parse_csv_boards(today)
            # NOTE(review): the "else:" line is elided here -- this is the
            # no-csv-file default.
            self.d_input_boards = {}
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
        # NOTE(review): the remaining XmlLogFile arguments are elided here.
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # The xml files that corresponds to the boards.
        # {name_board : xml_object}}
        self.d_xml_board_files = {}
        # Create the lines and columns
        self.initialize_boards(l_jobs, l_jobs_not_today)
        # Write the xml file
        self.update_xml_files(l_jobs)

    def add_xml_board(self, name):
        # Create one xml file per board and its three top-level nodes.
        xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
        # NOTE(review): the remaining XmlLogFile arguments are elided here.
        self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
        self.d_xml_board_files[name].add_simple_node("distributions")
        self.d_xml_board_files[name].add_simple_node("applications")
        self.d_xml_board_files[name].add_simple_node("board", text=name)

    def initialize_boards(self, l_jobs, l_jobs_not_today):
        '''Get all the first information needed for each file and write the
        first version of the files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        # Get the boards to fill and put it in a dictionary
        # {board_name : xml instance corresponding to the board}
        for job in l_jobs + l_jobs_not_today:
            # NOTE(review): the "board = job.board" line is elided here.
            if (board is not None and
                                board not in self.d_xml_board_files.keys()):
                self.add_xml_board(board)
        # Verify that the boards given as input are done
        for board in list(self.d_input_boards.keys()):
            if board not in self.d_xml_board_files:
                self.add_xml_board(board)
            root_node = self.d_xml_board_files[board].xmlroot
            src.xmlManager.append_node_attrib(root_node,
                                              {"input_file" : self.file_boards})
        # Loop over all jobs in order to get the lines and columns for each
        for board in self.d_xml_board_files:
            d_application[board] = []
        for job in l_jobs + l_jobs_not_today:
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
            distrib = job.machine.distribution
            application = job.application
            board_job = job.board
            # Register the job's distribution in its board, once.
            for board in self.d_xml_board_files:
                if board_job == board:
                    if distrib is not None and distrib not in d_dist[board]:
                        d_dist[board].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : distrib})
                # Register the job's application in its board, once.
                if board_job == board:
                    if (application is not None and
                                    application not in d_application[board]):
                        d_application[board].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            "name" : application})
        # Verify that there are no missing application or distribution in the
        # xml board files (regarding the input boards)
        for board in self.d_xml_board_files:
            l_dist = d_dist[board]
            if board not in self.d_input_boards.keys():
            for dist in self.d_input_boards[board]["rows"]:
                if dist not in l_dist:
                    src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : dist})
            l_appli = d_application[board]
            for appli in self.d_input_boards[board]["columns"]:
                if appli not in l_appli:
                    src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : appli})
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node(
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but
            # that will not be launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})
        # Find in each board the squares that needs to be filled regarding the
        # input csv files but that are not covered by a today job
        for board in self.d_input_boards.keys():
            xml_root_board = self.d_xml_board_files[board].xmlroot
            xml_missing = src.xmlManager.add_simple_node(xml_root_board,
            for row, column in self.d_input_boards[board]["jobs"]:
                # NOTE(review): the enclosing loop over jobs is elided here.
                if (job.application == column and
                        job.machine.distribution == row):
                    src.xmlManager.add_simple_node(xml_missing,
                                attrib={"distribution" : row,
                                        "application" : column })

    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Get all the first information needed for each file and write the
        first version of the files

        :param xml_node_jobs etree.Element: the node corresponding to a job
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        for job in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)

    def parse_csv_boards(self, today):
        """ Parse the csv file that describes the boards to produce and fill
            the dict d_input_boards that contain the csv file contain

        :param today int: the current day of the week
        """
        # open the csv file and read its content
        with open(self.file_boards, 'r') as f:
            reader = csv.reader(f,delimiter=CSV_DELIMITER)
        # get the delimiter for the boards (empty line)
        boards_delimiter = [''] * len(l_read[0])
        # Make the list of boards, by splitting with the delimiter
        l_boards = [list(y) for x, y in itertools.groupby(l_read,
                                    lambda z: z == boards_delimiter) if not x]
        # loop over the csv lists of lines and get the rows, columns and jobs
        for input_board in l_boards:
            # First cell of the first line is the board name.
            board_name = input_board[0][0]
            # Remaining cells of the first line are the column headers.
            columns = input_board[0][1:]
            for line in input_board[1:]:
                for i, square in enumerate(line[1:]):
                    # Each filled square lists the week days the job runs on.
                    days = square.split(DAYS_SEPARATOR)
                    days = [int(day) for day in days]
                    job = (row, columns[i])
            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns,
        self.d_input_boards = d_boards

    def update_xml_files(self, l_jobs):
        '''Write all the xml files with updated information about the jobs

        :param l_jobs List: the list of jobs that run today
        '''
        # NOTE(review): this method continues past the end of this extract.
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
1282 self.update_xml_file(l_jobs, xml_file)
1285 self.write_xml_files()
1287 def update_xml_file(self, l_jobs, xml_file):
1288 '''update information about the jobs for the file xml_file
1290 :param l_jobs List: the list of jobs that run today
1291 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1294 xml_node_jobs = xml_file.xmlroot.find('jobs')
1295 # Update the job names and status node
1297 # Find the node corresponding to the job and delete it
1298 # in order to recreate it
1299 for xmljob in xml_node_jobs.findall('job'):
1300 if xmljob.attrib['name'] == job.name:
1301 xml_node_jobs.remove(xmljob)
1305 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1306 time.localtime(job._T0))
1309 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1310 time.localtime(job._Tf))
1312 # recreate the job node
1313 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1315 attrib={"name" : job.name})
1316 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1317 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1318 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1319 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1320 src.xmlManager.add_simple_node(xmlj, "sat_path",
1321 job.machine.sat_path)
1322 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1323 src.xmlManager.add_simple_node(xmlj, "distribution",
1324 job.machine.distribution)
1325 src.xmlManager.add_simple_node(xmlj, "board", job.board)
1326 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1327 src.xmlManager.add_simple_node(xmlj, "commands",
1328 " ; ".join(job.commands))
1329 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1330 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1331 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1332 src.xmlManager.add_simple_node(xmlj, "out",
1333 src.printcolors.cleancolor(job.out))
1334 src.xmlManager.add_simple_node(xmlj, "err",
1335 src.printcolors.cleancolor(job.err))
1336 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1337 if len(job.remote_log_files) > 0:
1338 src.xmlManager.add_simple_node(xmlj,
1339 "remote_log_file_path",
1340 job.remote_log_files[0])
1342 src.xmlManager.add_simple_node(xmlj,
1343 "remote_log_file_path",
1346 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1347 # get the job father
1348 if job.after is not None:
1351 if jb.name == job.after:
1354 if (job_father is not None and
1355 len(job_father.remote_log_files) > 0):
1356 link = job_father.remote_log_files[0]
1359 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1361 # Verify that the job is to be done today regarding the input csv
1363 if job.board and job.board in self.d_input_boards.keys():
1365 for dist, appli in self.d_input_boards[job.board]["jobs"]:
1366 if (job.machine.distribution == dist
1367 and job.application == appli):
1369 src.xmlManager.add_simple_node(xmlj,
1374 src.xmlManager.add_simple_node(xmlj,
1380 xml_node_infos = xml_file.xmlroot.find('infos')
1381 src.xmlManager.append_node_attrib(xml_node_infos,
1383 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1387 def last_update(self, finish_status = "finished"):
1388 '''update information about the jobs for the file xml_file
1390 :param l_jobs List: the list of jobs that run today
1391 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1393 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1394 xml_node_infos = xml_file.xmlroot.find('infos')
1395 src.xmlManager.append_node_attrib(xml_node_infos,
1396 attrib={"JobsCommandStatus" : finish_status})
1398 self.write_xml_files()
1400 def write_xml_file(self, xml_file, stylesheet):
1401 ''' Write one xml file and the same file with prefix
1403 xml_file.write_tree(stylesheet)
1404 file_path = xml_file.logFile
1405 file_dir = os.path.dirname(file_path)
1406 file_name = os.path.basename(file_path)
1407 file_name_with_prefix = self.prefix + "_" + file_name
1408 xml_file.write_tree(stylesheet, os.path.join(file_dir,
1409 file_name_with_prefix))
1411 def write_xml_files(self):
1412 ''' Write the xml files
1414 self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
1415 for xml_file in self.d_xml_board_files.values():
1416 self.write_xml_file(xml_file, STYLESHEET_BOARD)
##
# Describes the command
def description():
    '''Return the text displayed for the jobs command in the help.

    :return: The description of the jobs command.
    :rtype: str
    '''
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the jobs command: launch the jobs described in the jobs
       configuration file and optionally publish xml reports about them.

    :param args List: the command-line arguments of the jobs command
    :param runner Runner: gives access to the global configuration (cfg)
    :param logger Logger: the logger instance used for the display
    :return: 0 on success, 1 when the jobs configuration cannot be found
    :rtype: int
    '''
    (options, args) = parser.parse_args(args)

    l_cfg_dir = runner.cfg.PATHS.JOBPATH

    # list option : display all the available config files
    if options.list:
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" %
                             src.printcolors.printcHeader(cfg_dir))
            for f in sorted(os.listdir(cfg_dir)):
                # Only the .pyconf files are jobs configuration files
                if not f.endswith('.pyconf'):
                    continue
                cfilename = f[:-len('.pyconf')]
                logger.write("%s\n" % cfilename)
        return 0

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        src.printcolors.printcError(message)
        return 1

    # Find the file in the directories
    found = False
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'

        if os.path.exists(file_jobs_cfg):
            found = True
            break

    if not found:
        msg = _("The file configuration %(name_file)s was not found."
                "\nUse the --list option to get the possible files.")
        # Fill the %(name_file)s placeholder before displaying the error
        src.printcolors.printcError(msg % {"name_file" : options.jobs_cfg})
        return 1

    info = [(_("Platform"), runner.cfg.VARS.dist),
            (_("File containing the jobs configuration"), file_jobs_cfg)]
    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        # Keep only the jobs requested on the command line
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                l_jb.append(jb,
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    # Initialization
    today_jobs = Jobs(runner,
                      logger,
                      file_jobs_cfg,
                      config_jobs)
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
        # The user only wanted to check the connections: stop here
        return 0

    gui = None
    if options.publish:
        # Copy the stylesheets in the log directory
        log_dir = runner.cfg.USER.log_dir
        xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
        files_to_copy = []
        files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
        files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
        files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
        for file_path in files_to_copy:
            shutil.copy2(file_path, log_dir)

        # Instanciate the Gui in order to produce the xml files that contain all
        # the jobs information
        gui = Gui(runner.cfg.USER.log_dir,
                  today_jobs.ljobs,
                  today_jobs.ljobs_not_today,
                  runner.cfg.VARS.datehour,
                  file_boards = options.input_boards)

        # Display the list of the xml files
        logger.write(src.printcolors.printcInfo(("Here is the list of published"
                                                 " files :\n")), 4)
        logger.write("%s\n" % gui.xml_global_file.logFile, 4)
        for board in gui.d_xml_board_files.keys():
            file_path = gui.d_xml_board_files[board].logFile
            file_name = os.path.basename(file_path)
            logger.write("%s\n" % file_path, 4)
            logger.add_link(file_name, "board", 0, board)

        logger.write("\n", 4)

    today_jobs.gui = gui

    interrupted = False
    try:
        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        interrupted = True
        logger.write("\n\n%s\n\n" %
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
    finally:
        if interrupted:
            msg = _("Killing the running jobs and trying"
                    " to get the corresponding logs\n")
            logger.write(src.printcolors.printcWarning(msg))

        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                try:
                    jb.kill_remote_process()
                except Exception as e:
                    msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
                    logger.write(src.printcolors.printcWarning(msg))
        # Final update of the xml reports, if they were produced
        if today_jobs.gui:
            if interrupted:
                today_jobs.gui.last_update(_("Forced interruption"))
            else:
                today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()