3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets referenced by the generated job-report XML files so they
# can be rendered in a browser (one global report, one per-board report).
29 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
30 STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command-line option declarations for the "jobs" command.
# NOTE(review): this extract is missing source lines (the inner numbering
# jumps), so several add_option(...) calls appear truncated below — e.g.
# trailing commas with no following default argument. Do not read the
# truncation as the real signatures; confirm against the full file.
32 parser = src.options.Options()
34 parser.add_option('n', 'name', 'string', 'jobs_cfg',
35 _('The name of the config file that contains'
36 ' the jobs configuration'))
37 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
38 _('Optional: the list of jobs to launch, by their name. '))
39 parser.add_option('l', 'list', 'boolean', 'list',
40 _('Optional: list all available config files.'))
41 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
42 _("Optional: try to connect to the machines. "
43 "Not executing the jobs."),
45 parser.add_option('p', 'publish', 'boolean', 'publish',
46 _("Optional: generate an xml file that can be read in a "
47 "browser to display the jobs status."),
# NOTE(review): the short option 'n' below duplicates the 'n' used for
# 'name' above — looks like a latent conflict; verify against src.options.
49 parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
50 "the path to csv file that contain "
51 "the expected boards."),"")
52 parser.add_option('n', 'completion', 'boolean', 'no_label',
53 _("Optional (internal use): do not print labels, Works only "
# Wraps one paramiko SSH connection to a remote host and the SFTP operations
# (copying salomeTools, creating directories, running commands) used by jobs.
# NOTE(review): many lines are missing from this view (inner numbering jumps):
# the __init__ signature head, the try: statements that pair with the
# except clauses below, and several method bodies are truncated.
57 class Machine(object):
58 '''Class to manage a ssh connection on a machine
66 sat_path="salomeTools"):
# distribution stays None until filled from a remote "sat config" call
# made elsewhere (see ssh_connection_all_machines).
70 self.distribution = None # Will be filled after copying SAT on the machine
72 self.password = passwd
73 self.sat_path = sat_path
74 self.ssh = paramiko.SSHClient()
# tri-state: None = no connection attempted yet, then True/False.
75 self._connection_successful = None
# Open the ssh session; on any paramiko failure only a message is built
# and _connection_successful stays False (set True on success at the end).
77 def connect(self, logger):
78 '''Initiate the ssh connection to the remote machine
80 :param logger src.logger.Logger: The logger instance
85 self._connection_successful = False
86 self.ssh.load_system_host_keys()
# AutoAddPolicy silently trusts unknown host keys — acceptable for a
# build farm, but worth noting from a security standpoint.
87 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
89 self.ssh.connect(self.host,
92 password = self.password)
93 except paramiko.AuthenticationException:
94 message = src.KO_STATUS + _("Authentication failed")
95 except paramiko.BadHostKeyException:
96 message = (src.KO_STATUS +
97 _("The server's host key could not be verified"))
98 except paramiko.SSHException:
99 message = ( _("SSHException error connecting or "
100 "establishing an SSH session"))
102 message = ( _("Error connecting or establishing an SSH session"))
104 self._connection_successful = True
# Accessor for the tri-state flag; warns if queried before connect().
108 def successfully_connected(self, logger):
109 '''Verify if the connection to the remote machine has succeed
111 :param logger src.logger.Logger: The logger instance
112 :return: True if the connection has succeed, False if not
115 if self._connection_successful == None:
116 message = _("Warning : trying to ask if the connection to "
117 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
118 " no connection request" %
119 (self.name, self.host, self.port, self.user))
120 logger.write( src.printcolors.printcWarning(message))
121 return self._connection_successful
# Pushes the local salomeTools tree plus the job pyconf to the remote
# machine over SFTP; sets _connection_successful False on any error.
123 def copy_sat(self, sat_local_path, job_file):
124 '''Copy salomeTools to the remote machine in self.sat_path
128 # open a sftp connection
129 self.sftp = self.ssh.open_sftp()
130 # Create the sat directory on remote machine if it is not existing
131 self.mkdir(self.sat_path, ignore_existing=True)
# '.git' is filtered out to avoid copying the repository history.
133 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
134 # put the job configuration file in order to make it reachable
135 # on the remote machine
136 self.sftp.put(job_file, os.path.join(".salomeTools",
138 ".jobs_command_file.pyconf"))
139 except Exception as e:
141 self._connection_successful = False
# Recursive SFTP upload preserving file modes; symlinks are re-created
# remotely rather than followed.
145 def put_dir(self, source, target, filters = []):
146 ''' Uploads the contents of the source directory to the target path. The
147 target directory needs to exists. All sub-directories in source are
148 created under target.
150 for item in os.listdir(source):
153 source_path = os.path.join(source, item)
154 destination_path = os.path.join(target, item)
155 if os.path.islink(source_path):
156 linkto = os.readlink(source_path)
158 self.sftp.symlink(linkto, destination_path)
159 self.sftp.chmod(destination_path,
160 os.stat(source_path).st_mode)
164 if os.path.isfile(source_path):
165 self.sftp.put(source_path, destination_path)
166 self.sftp.chmod(destination_path,
167 os.stat(source_path).st_mode)
# presumably the directory branch — recurse into sub-directories.
169 self.mkdir(destination_path, ignore_existing=True)
170 self.put_dir(source_path, destination_path)
# mkdir wrapper tolerating an already-existing remote directory
# (mode 511 decimal == 0o777).
172 def mkdir(self, path, mode=511, ignore_existing=False):
173 ''' Augments mkdir by adding an option to not fail
177 self.sftp.mkdir(path, mode)
# Non-blocking remote execution: returns the three paramiko channel
# files, or (None, None, None) when the exec itself fails.
184 def exec_command(self, command, logger):
185 '''Execute the command on the remote machine
187 :param command str: The command to be run
188 :param logger src.logger.Logger: The logger instance
189 :return: the stdin, stdout, and stderr of the executing command,
191 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
192 paramiko.channel.ChannelFile)
195 # Does not wait the end of the command
196 (stdin, stdout, stderr) = self.ssh.exec_command(command)
197 except paramiko.SSHException:
198 message = src.KO_STATUS + _(
199 ": the server failed to execute the command\n")
200 logger.write( src.printcolors.printcError(message))
201 return (None, None, None)
203 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
204 return (None, None, None)
206 return (stdin, stdout, stderr)
209 '''Close the ssh connection
# Human-readable connection summary written to terminal and log file.
215 def write_info(self, logger):
216 '''Prints the informations relative to the machine in the logger
217 (terminal traces and log file)
219 :param logger src.logger.Logger: The logger instance
223 logger.write("host : " + self.host + "\n")
224 logger.write("port : " + str(self.port) + "\n")
225 logger.write("user : " + str(self.user) + "\n")
226 if self.successfully_connected(logger):
227 status = src.OK_STATUS
229 status = src.KO_STATUS
230 logger.write("Connection : " + status + "\n\n")
234 '''Class to manage one job
# Job.__init__: stores the machine/application/timeout configuration and
# builds the remote "sat job" command line to be launched later by run().
# NOTE(review): lines missing from this view — several attribute
# assignments (name, board, config, logger, out/err, result fields) and
# the tail of the self.command expression are not visible here.
236 def __init__(self, name, machine, application, board,
237 commands, timeout, config, logger, after=None):
240 self.machine = machine
242 self.timeout = timeout
243 self.application = application
247 # The list of log files to download from the remote machine
248 self.remote_log_files = []
250 # The remote command status
251 # -1 means that it has not been launched,
252 # 0 means success and 1 means fail
254 self.cancelled = False
# life-cycle flags consumed by has_begun()/has_finished()/is_timeout().
258 self._has_begun = False
259 self._has_finished = False
260 self._has_timouted = False
261 self._stdin = None # Store the command inputs field
262 self._stdout = None # Store the command outputs field
263 self._stderr = None # Store the command errors field
268 self.commands = commands
# The full remote command; it redirects a list of produced log files to
# list_log_files.txt in the remote sat_path (read back by get_log_files).
269 self.command = (os.path.join(self.machine.sat_path, "sat") +
271 os.path.join(self.machine.sat_path,
272 "list_log_files.txt") +
273 " job --jobs_config .jobs_command_file" +
# get_pids (def line not visible in this extract): greps "ps aux" output on
# the remote machine for self.command and extracts the matching pid column.
278 """ Get the pid(s) corresponding to the command that have been launched
279 On the remote machine
281 :return: The list of integers corresponding to the found pids
# NOTE(review): this grep also matches the grep process itself and any
# command containing the same string — presumably acceptable since the
# pids feed kill -2; confirm in the full source.
285 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
286 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
287 pids_cmd = out_pid.readlines()
288 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
# Sends SIGINT (kill -2) to every remote pid found for this job's command.
# NOTE(review): the exec_command call is truncated here (missing
# continuation line), and the `wait` parameter's use is not visible —
# presumably a sleep between kill and return; confirm in the full source.
292 def kill_remote_process(self, wait=1):
293 '''Kills the process on the remote machine.
295 :return: (the output of the kill, the error of the kill)
299 pids = self.get_pids()
300 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
301 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
304 return (out_kill, err_kill)
# has_begun (def line not visible in this extract): plain accessor for the
# _has_begun flag set by run().
307 '''Returns True if the job has already begun
309 :return: True if the job has already begun
312 return self._has_begun
# Polls the paramiko stdout channel: when it is closed the command is done,
# so the outputs are drained into self.out/self.err and the end time stored.
# Memoized — once True it stays True without re-reading the channels.
314 def has_finished(self):
315 '''Returns True if the job has already finished
316 (i.e. all the commands have been executed)
317 If it is finished, the outputs are stored in the fields out and err.
319 :return: True if the job has already finished
323 # If the method has already been called and returned True
324 if self._has_finished:
327 # If the job has not begun yet
328 if not self.has_begun():
331 if self._stdout.channel.closed:
332 self._has_finished = True
333 # Store the result outputs
334 self.out += self._stdout.read().decode()
335 self.err += self._stderr.read().decode()
# record wall-clock end time for total_duration()/get_status().
337 self._Tf = time.time()
338 # And get the remote command status and log files
341 return self._has_finished
# Downloads the job's result file and log files from the remote machine:
# first list_log_files.txt (result code on line 1, then one log path per
# line), then each referenced xml/txt/TEST file into the local log dir.
# Failures are accumulated into self.err instead of raising.
343 def get_log_files(self):
344 """Get the log files produced by the command launched
345 on the remote machine, and put it in the log directory of the user,
346 so they can be accessible from
348 # Do not get the files if the command is not finished
349 if not self.has_finished():
350 msg = _("Trying to get log files whereas the job is not finished.")
351 self.logger.write(src.printcolors.printcWarning(msg))
354 # First get the file that contains the list of log files to get
355 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
356 remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
357 self.machine.sftp.get(
361 # Read the file and get the result of the command and all the log files
363 fstream_tmp = open(tmp_file_path, "r")
364 file_lines = fstream_tmp.readlines()
365 file_lines = [line.replace("\n", "") for line in file_lines]
# the temp copy is only needed long enough to parse it.
367 os.remove(tmp_file_path)
370 # The first line is the result of the command (0 success or 1 fail)
371 self.res_job = file_lines[0]
372 except Exception as e:
373 self.err += _("Unable to get status from remote file %s: %s" %
374 (remote_path, str(e)))
376 for i, job_path_remote in enumerate(file_lines[1:]):
378 # For each command, there is two files to get :
379 # 1- The xml file describing the command and giving the
381 # 2- The txt file containing the system command traces (like
382 # traces produced by the "make" command)
383 # 3- In case of the test command, there is another file to get :
384 # the xml board that contain the test results
# the parent directory name ('OUT', 'TEST' or other) decides where
# the file lands locally.
385 dirname = os.path.basename(os.path.dirname(job_path_remote))
386 if dirname != 'OUT' and dirname != 'TEST':
388 local_path = os.path.join(os.path.dirname(
389 self.logger.logFilePath),
390 os.path.basename(job_path_remote))
391 if i==0: # The first is the job command
392 self.logger.add_link(os.path.basename(job_path_remote),
396 elif dirname == 'OUT':
398 local_path = os.path.join(os.path.dirname(
399 self.logger.logFilePath),
401 os.path.basename(job_path_remote))
402 elif dirname == 'TEST':
404 local_path = os.path.join(os.path.dirname(
405 self.logger.logFilePath),
407 os.path.basename(job_path_remote))
# only fetch files we do not already have locally.
410 if not os.path.exists(local_path):
411 self.machine.sftp.get(job_path_remote, local_path)
412 self.remote_log_files.append(local_path)
413 except Exception as e:
414 self.err += _("Unable to get %s log file from remote: %s" %
415 (job_path_remote, str(e)))
# Failure = unfinished-path short-circuit, unreachable machine, timeout, or
# remote result code "1" (res_job read from list_log_files.txt line 1).
# NOTE(review): the return statements between the if's are missing from
# this view.
417 def has_failed(self):
418 '''Returns True if the job has failed.
419 A job is considered as failed if the machine could not be reached,
420 if the remote command failed,
421 or if the job finished with a time out.
423 :return: True if the job has failed
426 if not self.has_finished():
428 if not self.machine.successfully_connected(self.logger):
430 if self.is_timeout():
432 if self.res_job == "1":
# cancel (def line not visible in this extract): marks the job as begun,
# finished and cancelled so the scheduler skips it when its "after" parent
# job has failed; the explanation is appended to both out and err.
437 """In case of a failing job, one has to cancel every job that depend
438 on it. This method put the job as failed and will not be executed.
440 self._has_begun = True
441 self._has_finished = True
442 self.cancelled = True
443 self.out += _("This job was not launched because its father has failed.")
444 self.err += _("This job was not launched because its father has failed.")
# Running = started but not yet finished (both derived from the flags).
446 def is_running(self):
447 '''Returns True if the job commands are running
449 :return: True if the job is running
452 return self.has_begun() and not self.has_finished()
# Accessor for the timeout flag set by check_time(). (Attribute name keeps
# the original misspelling "_has_timouted".)
454 def is_timeout(self):
455 '''Returns True if the job commands has finished with timeout
457 :return: True if the job has finished with timeout
460 return self._has_timouted
# Seconds since run() stored self._T0.
# NOTE(review): the early-return branch for a not-begun job and the
# `T_now = time.time()` assignment are missing from this view.
462 def time_elapsed(self):
463 """Get the time elapsed since the job launching
465 :return: The number of seconds
468 if not self.has_begun():
471 return T_now - self._T0
# Timeout watchdog called by the scheduler loop: when elapsed time exceeds
# self.timeout, mark the job finished+timed-out, SIGINT the remote process
# and keep the kill output in self.out; log retrieval errors go to self.err.
473 def check_time(self):
474 """Verify that the job has not exceeded its timeout.
475 If it has, kill the remote command and consider the job as finished.
477 if not self.has_begun():
479 if self.time_elapsed() > self.timeout:
480 self._has_finished = True
481 self._has_timouted = True
482 self._Tf = time.time()
484 (out_kill, _) = self.kill_remote_process()
485 self.out += "TIMEOUT \n" + out_kill.read().decode()
486 self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
# presumably a get_log_files() call sits in the missing lines here.
489 except Exception as e:
490 self.err += _("Unable to get remote log files: %s" % e)
# End-to-end duration in seconds; valid only once _Tf has been set
# (finished or timed out).
492 def total_duration(self):
493 """Give the total duration of the job
495 :return: the total duration of the job in seconds
498 return self._Tf - self._T0
# run (def line not visible in this extract): launches the remote command.
# Guards against double launch; fails fast when the ssh connection is KO or
# when exec_command returned (None, None, None); otherwise records T0 and
# the three channel files, then flips _has_begun.
501 """Launch the job by executing the remote command.
504 # Prevent multiple run
506 msg = _("Warning: A job can only be launched one time")
507 msg2 = _("Trying to launch the job \"%s\" whereas it has "
508 "already been launched." % self.name)
509 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
513 # Do not execute the command if the machine could not be reached
514 if not self.machine.successfully_connected(self.logger):
515 self._has_finished = True
517 self.err += ("Connection to machine (name : %s, host: %s, port:"
518 " %s, user: %s) has failed\nUse the log command "
519 "to get more information."
520 % (self.machine.name,
525 # Usual case : Launch the command on remote machine
526 self._T0 = time.time()
527 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
530 # If the results are not initialized, finish the job
531 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
532 self._has_finished = True
533 self._Tf = time.time()
535 self.err += "The server failed to execute the command"
537 # Put the beginning flag to true.
538 self._has_begun = True
# Terminal/log dump of one job: identity, timing, the machine's connection
# info, then the captured stdout/stderr.
540 def write_results(self):
541 """Display all the job's information on the terminal.
543 self.logger.write("name : " + self.name + "\n")
545 self.logger.write("after : %s\n" % self.after)
546 self.logger.write("Time elapsed : %4imin %2is \n" %
547 (self.total_duration()//60 , self.total_duration()%60))
549 self.logger.write("Begin time : %s\n" %
550 time.strftime('%Y-%m-%d %H:%M:%S',
551 time.localtime(self._T0)) )
553 self.logger.write("End time : %s\n\n" %
554 time.strftime('%Y-%m-%d %H:%M:%S',
555 time.localtime(self._Tf)) )
557 machine_head = "Informations about connection :\n"
# underline sized to the header text minus its trailing " :\n" overhang.
558 underline = (len(machine_head) - 2) * "-"
559 self.logger.write(src.printcolors.printcInfo(
560 machine_head+underline+"\n"))
561 self.machine.write_info(self.logger)
563 self.logger.write(src.printcolors.printcInfo("out : \n"))
# NOTE(review): the condition guarding this branch (presumably
# "if self.out == ''" or similar) is missing from this view.
565 self.logger.write("Unable to get output\n")
567 self.logger.write(self.out + "\n")
568 self.logger.write(src.printcolors.printcInfo("err : \n"))
569 self.logger.write(self.err + "\n")
# Maps the job's flags to a short human-readable state string consumed by
# the Gui xml files: SSH KO / not launched / running / timeout / finished,
# each stamped with the relevant local time.
571 def get_status(self):
572 """Get the status of the job (used by the Gui for xml display)
574 :return: The current status of the job
577 if not self.machine.successfully_connected(self.logger):
578 return "SSH connection KO"
579 if not self.has_begun():
580 return "Not launched"
583 if self.is_running():
584 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
585 time.localtime(self._T0))
586 if self.has_finished():
587 if self.is_timeout():
588 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
589 time.localtime(self._Tf))
590 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
591 time.localtime(self._Tf))
# Jobs class scheduler state (class statement and __init__ signature head
# are not visible in this extract): holds the jobs configuration, the
# machines/hosts used today, today's job list vs. skipped jobs, and the
# bookkeeping lists used by the run loop.
594 '''Class to manage the jobs to be run
601 lenght_columns = 20):
602 # The jobs configuration
603 self.cfg_jobs = config_jobs
604 self.job_file_path = job_file_path
605 # The machine that will be used today
607 # The list of machine (hosts, port) that will be used today
608 # (a same host can have several machine instances since there
609 # can be several ssh parameters)
611 # The jobs to be launched today
613 # The jobs that will not be launched today
614 self.ljobs_not_today = []
# column width for the status table (parameter name keeps the original
# misspelling "lenght_columns").
617 self.len_columns = lenght_columns
619 # the list of jobs that have not been run yet
620 self._l_jobs_not_started = []
621 # the list of jobs that have already ran
622 self._l_jobs_finished = []
623 # the list of jobs that are running
624 self._l_jobs_running = []
# build machines and today's job lists from the pyconf right away.
626 self.determine_jobs_and_machines()
# Factory: turns one pyconf job definition plus an already-built Machine
# into a Job instance. Optional keys (after/application/board) default to
# values set in lines missing from this view (presumably None).
628 def define_job(self, job_def, machine):
629 '''Takes a pyconf job definition and a machine (from class machine)
630 and returns the job instance corresponding to the definition.
632 :param job_def src.config.Mapping: a job definition
633 :param machine machine: the machine on which the job will run
634 :return: The corresponding job in a job class instance
638 cmmnds = job_def.commands
639 timeout = job_def.timeout
641 if 'after' in job_def:
642 after = job_def.after
644 if 'application' in job_def:
645 application = job_def.application
647 if 'board' in job_def:
648 board = job_def.board
# Reads cfg_jobs: for each job definition, finds or creates the Machine it
# targets (reusing one already built for the same name), then sorts the Job
# into today's list or ljobs_not_today based on its "when" weekdays.
# Jobs without a "machine" key, or naming an undefined machine, are
# skipped with a warning.
660 def determine_jobs_and_machines(self):
661 '''Function that reads the pyconf jobs definition and instantiates all
662 the machines and jobs to be done today.
# weekday(): Monday == 0 ... Sunday == 6, matched against job_def.when.
667 today = datetime.date.weekday(datetime.date.today())
670 for job_def in self.cfg_jobs.jobs :
672 if not "machine" in job_def:
673 msg = _('WARNING: The job "%s" do not have the key '
674 '"machine", this job is ignored.\n\n' % job_def.name)
675 self.logger.write(src.printcolors.printcWarning(msg))
677 name_machine = job_def.machine
# reuse an already-instantiated machine with the same name if any.
680 for mach in self.lmachines:
681 if mach.name == name_machine:
685 if a_machine == None:
686 for machine_def in self.cfg_jobs.machines:
687 if machine_def.name == name_machine:
# every machine field falls back to a local default when the
# pyconf omits it (current host/user, default port, etc.).
688 if 'host' not in machine_def:
689 host = self.runner.cfg.VARS.hostname
691 host = machine_def.host
693 if 'user' not in machine_def:
694 user = self.runner.cfg.VARS.user
696 user = machine_def.user
698 if 'port' not in machine_def:
701 port = machine_def.port
703 if 'password' not in machine_def:
706 passwd = machine_def.password
708 if 'sat_path' not in machine_def:
709 sat_path = "salomeTools"
711 sat_path = machine_def.sat_path
722 self.lmachines.append(a_machine)
723 if (host, port) not in host_list:
724 host_list.append((host, port))
# NOTE(review): msg uses %(job_name)s placeholders but no visible
# % substitution before being written — looks like a latent bug;
# confirm against the full source.
726 if a_machine == None:
727 msg = _("WARNING: The job \"%(job_name)s\" requires the "
728 "machine \"%(machine_name)s\" but this machine "
729 "is not defined in the configuration file.\n"
730 "The job will not be launched")
731 self.logger.write(src.printcolors.printcWarning(msg))
733 a_job = self.define_job(job_def, a_machine)
735 if today in job_def.when:
736 self.ljobs.append(a_job)
737 else: # today in job_def.when
738 self.ljobs_not_today.append(a_job)
740 self.lhosts = host_list
# Connects to every machine, copies salomeTools there, and queries the
# remote distribution via "sat config --value VARS.dist"; progress is
# painted with padded dotted lines and \r rewrites at verbosity level 3.
742 def ssh_connection_all_machines(self, pad=50):
743 '''Function that do the ssh connection to every machine
749 self.logger.write(src.printcolors.printcInfo((
750 "Establishing connection with all the machines :\n")))
751 for machine in self.lmachines:
752 # little algorithm in order to display traces
753 begin_line = (_("Connection to %s: " % machine.name))
754 if pad - len(begin_line) < 0:
757 endline = (pad - len(begin_line)) * "." + " "
759 step = "SSH connection"
760 self.logger.write( begin_line + endline + step)
762 # the call to the method that initiate the ssh connection
763 msg = machine.connect(self.logger)
765 # Copy salomeTools to the remote machine
766 if machine.successfully_connected(self.logger):
768 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
769 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
771 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
773 # get the remote machine distribution using a sat command
774 (__, out_dist, __) = machine.exec_command(
775 os.path.join(machine.sat_path,
776 "sat config --value VARS.dist --no_label"),
778 machine.distribution = out_dist.read().decode().replace("\n",
780 # Print the status of the copy
# three display branches: copy OK, copy failed, connection failed
# (several condition lines are missing from this view).
782 self.logger.write('\r%s' %
783 ((len(begin_line)+len(endline)+20) * " "), 3)
784 self.logger.write('\r%s%s%s' %
787 src.printcolors.printc(src.OK_STATUS)), 3)
789 self.logger.write('\r%s' %
790 ((len(begin_line)+len(endline)+20) * " "), 3)
791 self.logger.write('\r%s%s%s %s' %
794 src.printcolors.printc(src.OK_STATUS),
795 _("Copy of SAT failed")), 3)
797 self.logger.write('\r%s' %
798 ((len(begin_line)+len(endline)+20) * " "), 3)
799 self.logger.write('\r%s%s%s %s' %
802 src.printcolors.printc(src.KO_STATUS),
804 self.logger.write("\n", 3)
806 self.logger.write("\n")
# Despite the name, returns the running Job occupying (host, port), or a
# falsy value when the slot is free — callers use it both as a boolean
# and to fetch the job. (Unpacking of hostname into host/port and the
# return statements are in lines missing from this view.)
809 def is_occupied(self, hostname):
810 '''Function that returns True if a job is running on
811 the machine defined by its host and its port.
813 :param hostname (str, int): the pair (host, port)
814 :return: the job that is running on the host,
815 or false if there is no job running on the host.
820 for jb in self.ljobs:
821 if jb.machine.host == host and jb.machine.port == port:
# Rebuilds the running/finished bookkeeping lists from scratch and returns
# True when at least one job newly finished since the previous call
# (compared by list length) — used to decide whether to refresh the GUI.
826 def update_jobs_states_list(self):
827 '''Function that updates the lists that store the currently
828 running jobs and the jobs that have already finished.
833 jobs_finished_list = []
834 jobs_running_list = []
835 for jb in self.ljobs:
837 jobs_running_list.append(jb)
839 if jb.has_finished():
840 jobs_finished_list.append(jb)
842 nb_job_finished_before = len(self._l_jobs_finished)
843 self._l_jobs_finished = jobs_finished_list
844 self._l_jobs_running = jobs_running_list
846 nb_job_finished_now = len(self._l_jobs_finished)
848 return nb_job_finished_now > nb_job_finished_before
# For every job with an "after" dependency whose father job has failed,
# cancel it (presumably via job.cancel(), in a line missing from this view)
# so it is never launched.
850 def cancel_dependencies_of_failing_jobs(self):
851 '''Function that cancels all the jobs that depend on a failing one.
857 for job in self.ljobs:
858 if job.after is None:
860 father_job = self.find_job_that_has_name(job.after)
861 if father_job is not None and father_job.has_failed():
# Linear lookup of a today-job by name; the match/return lines are missing
# from this view (presumably returns None when no job matches).
864 def find_job_that_has_name(self, name):
865 '''Returns the job by its name.
867 :param name str: a job name
868 :return: the job that has the name.
871 for jb in self.ljobs:
874 # the following is executed only if the job was not found
# Fits text into an exact column width: truncates with '...' when too long,
# otherwise centers it with spaces (extra space goes after the text when
# the padding is odd). The else/return lines are missing from this view.
877 def str_of_length(self, text, length):
878 '''Takes a string text of any length and returns
879 the most close string of length "length".
881 :param text str: any string
882 :param length int: a length for the returned string
883 :return: the most close string of length "length"
886 if len(text) > length:
887 text_out = text[:length-3] + '...'
889 diff = length - len(text)
890 before = " " * (diff//2)
891 after = " " * (diff//2 + diff%2)
892 text_out = before + text + after
# Redraws (with \r, no newline) one table row showing which job currently
# occupies each host column, or "empty" for idle hosts.
896 def display_status(self, len_col):
897 '''Takes a lenght and construct the display of the current status
898 of the jobs in an array that has a column for each host.
899 It displays the job that is currently running on the host
902 :param len_col int: the size of the column
908 for host_port in self.lhosts:
909 jb = self.is_occupied(host_port)
910 if not jb: # nothing running on the host
911 empty = self.str_of_length("empty", len_col)
912 display_line += "|" + empty
914 display_line += "|" + src.printcolors.printcInfo(
915 self.str_of_length(jb.name, len_col))
917 self.logger.write("\r" + display_line + "|")
# run_jobs (def line not visible in this extract): the scheduler main loop.
# Prints the host-column table header, then loops until every job is
# finished: per free (host, port) slot it starts the next eligible job
# (respecting "after" dependencies), cancels dependents of failed jobs,
# refreshes the GUI xml on any state change, and redraws the status row.
922 '''The main method. Runs all the jobs on every host.
923 For each host, at a given time, only one job can be running.
924 The jobs that have the field after (that contain the job that has
925 to be run before it) are run after the previous job.
926 This method stops when all the jobs are finished.
933 self.logger.write(src.printcolors.printcInfo(
934 _('Executing the jobs :\n')))
936 for host_port in self.lhosts:
# header cell: bare host name for default ssh port, "(host, port)"
# otherwise.
939 if port == 22: # default value
940 text_line += "|" + self.str_of_length(host, self.len_columns)
942 text_line += "|" + self.str_of_length(
943 "("+host+", "+str(port)+")", self.len_columns)
945 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
946 self.logger.write(tiret_line)
947 self.logger.write(text_line + "|\n")
948 self.logger.write(tiret_line)
951 # The infinite loop that runs the jobs
952 l_jobs_not_started = src.deepcopy_list(self.ljobs)
953 while len(self._l_jobs_finished) != len(self.ljobs):
954 new_job_start = False
955 for host_port in self.lhosts:
957 if self.is_occupied(host_port):
960 for jb in l_jobs_not_started:
961 if (jb.machine.host, jb.machine.port) != host_port:
965 l_jobs_not_started.remove(jb)
# dependency handling: a job with "after" waits for its father;
# a missing father cancels the job with an explanatory message.
969 jb_before = self.find_job_that_has_name(jb.after)
970 if jb_before is None:
972 msg = _("This job was not launched because its "
973 "father is not in the jobs list.")
977 if jb_before.has_finished():
979 l_jobs_not_started.remove(jb)
982 self.cancel_dependencies_of_failing_jobs()
983 new_job_finished = self.update_jobs_states_list()
985 if new_job_start or new_job_finished:
987 self.gui.update_xml_files(self.ljobs)
988 # Display the current status
989 self.display_status(self.len_columns)
991 # Make sure that the proc is not entirely busy
# presumably a time.sleep(...) sits in the missing lines here.
994 self.logger.write("\n")
995 self.logger.write(tiret_line)
996 self.logger.write("\n\n")
999 self.gui.update_xml_files(self.ljobs)
1000 self.gui.last_update()
# Prints a labeled results section per job (presumably delegating to
# jb.write_results() in a line missing from this view).
1002 def write_all_results(self):
1003 '''Display all the jobs outputs.
1009 for jb in self.ljobs:
1010 self.logger.write(src.printcolors.printcLabel(
1011 "#------- Results for job %s -------#\n" % jb.name))
1013 self.logger.write("\n\n")
# Gui class (class statement not visible in this extract): produces the
# browser-viewable xml report files — one global file plus one per board.
1016 '''Class to manage the the xml data that can be displayed in a browser to
1020 def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, file_boards=""):
1023 :param xml_dir_path str: The path to the directory where to put
1024 the xml resulting files
1025 :param l_jobs List: the list of jobs that run today
1026 :param l_jobs_not_today List: the list of jobs that do not run today
1027 :param file_boards str: the file path from which to read the
1030 # The path of the csv files to read to fill the expected boards
1031 self.file_boards = file_boards
# weekday(): Monday == 0 ... Sunday == 6; used to filter csv squares.
1033 today = datetime.date.weekday(datetime.date.today())
1034 self.parse_csv_boards(today)
1036 # The path of the global xml file
1037 self.xml_dir_path = xml_dir_path
1038 # Initialize the xml files
1039 xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
1040 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1042 # The xml files that corresponds to the boards.
1043 # {name_board : xml_object}}
1044 self.d_xml_board_files = {}
1045 # Create the lines and columns
1046 self.initialize_boards(l_jobs, l_jobs_not_today)
1048 # Write the xml file
1049 self.update_xml_files(l_jobs)
# Creates the per-board xml file "<name>.xml" and seeds it with its
# distributions/applications container nodes and the board name node.
1051 def add_xml_board(self, name):
1052 xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
1053 self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
1056 self.d_xml_board_files[name].add_simple_node("distributions")
1057 self.d_xml_board_files[name].add_simple_node("applications")
1058 self.d_xml_board_files[name].add_simple_node("board", text=name)
# Builds the static structure of every xml file: one board file per board
# seen in the jobs or required by the input csv boards, the rows
# (distributions) and columns (applications) of each board, the global
# hosts_ports node, the "jobs" node (pre-filled with not-today jobs), an
# "infos" status node, and per-board "missing" squares that the csv
# expects but no today-job covers.
1060 def initialize_boards(self, l_jobs, l_jobs_not_today):
1061 '''Get all the first information needed for each file and write the
1062 first version of the files
1063 :param l_jobs List: the list of jobs that run today
1064 :param l_jobs_not_today List: the list of jobs that do not run today
1066 # Get the boards to fill and put it in a dictionary
1067 # {board_name : xml instance corresponding to the board}
1068 for job in l_jobs + l_jobs_not_today:
1070 if (board is not None and
1071 board not in self.d_xml_board_files.keys()):
1072 self.add_xml_board(board)
1074 # Verify that the boards given as input are done
1075 for board in list(self.d_input_boards.keys()):
1076 if board not in self.d_xml_board_files:
1077 self.add_xml_board(board)
1079 # Loop over all jobs in order to get the lines and columns for each
# d_dist / d_application track which rows/columns were already added
# to each board, to avoid duplicate xml nodes.
1083 for board in self.d_xml_board_files:
1085 d_application[board] = []
1089 for job in l_jobs + l_jobs_not_today:
1091 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1092 l_hosts_ports.append((job.machine.host, job.machine.port))
1094 distrib = job.machine.distribution
1095 application = job.application
1097 board_job = job.board
1100 for board in self.d_xml_board_files:
1101 if board_job == board:
1102 if distrib is not None and distrib not in d_dist[board]:
1103 d_dist[board].append(distrib)
1104 src.xmlManager.add_simple_node(
1105 self.d_xml_board_files[board].xmlroot.find(
1108 attrib={"name" : distrib})
1110 if board_job == board:
1111 if (application is not None and
1112 application not in d_application[board]):
1113 d_application[board].append(application)
1114 src.xmlManager.add_simple_node(
1115 self.d_xml_board_files[board].xmlroot.find(
1119 "name" : application})
1121 # Verify that there are no missing application or distribution in the
1122 # xml board files (regarding the input boards)
1123 for board in self.d_xml_board_files:
1124 l_dist = d_dist[board]
1125 if board not in self.d_input_boards.keys():
1127 for dist in self.d_input_boards[board]["rows"]:
1128 if dist not in l_dist:
1129 src.xmlManager.add_simple_node(
1130 self.d_xml_board_files[board].xmlroot.find(
1133 attrib={"name" : dist})
1134 l_appli = d_application[board]
1135 for appli in self.d_input_boards[board]["columns"]:
1136 if appli not in l_appli:
1137 src.xmlManager.add_simple_node(
1138 self.d_xml_board_files[board].xmlroot.find(
1141 attrib={"name" : appli})
1143 # Initialize the hosts_ports node for the global file
1144 self.xmlhosts_ports = self.xml_global_file.add_simple_node(
1146 for host, port in l_hosts_ports:
1147 host_port = "%s:%i" % (host, port)
1148 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1150 attrib={"name" : host_port})
1152 # Initialize the jobs node in all files
1153 for xml_file in [self.xml_global_file] + list(
1154 self.d_xml_board_files.values()):
1155 xml_jobs = xml_file.add_simple_node("jobs")
1156 # Get the jobs present in the config file but
1157 # that will not be launched today
1158 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1160 xml_file.add_simple_node("infos",
1161 attrib={"name" : "last update",
1162 "JobsCommandStatus" : "running"})
1164 # Find in each board the squares that needs to be filled regarding the
1165 # input csv files but that are not covered by a today job
1166 for board in self.d_input_boards.keys():
1167 xml_root_board = self.d_xml_board_files[board].xmlroot
1168 xml_missing = src.xmlManager.add_simple_node(xml_root_board,
1170 for row, column in self.d_input_boards[board]["jobs"]:
# mark (row, column) missing unless some today-job covers it
# (the loop over l_jobs and the found/not-found flag are in
# lines missing from this view).
1173 if (job.application == column and
1174 job.machine.distribution == row):
1178 src.xmlManager.add_simple_node(xml_missing,
1180 attrib={"distribution" : row,
1181 "application" : column })
# Serializes each not-today job under xml_node_jobs with state "Not today"
# plus its application, distribution, board, commands and machine info.
1183 def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1184 '''Get all the first information needed for each file and write the
1185 first version of the files
1187 :param xml_node_jobs etree.Element: the node corresponding to a job
1188 :param l_jobs_not_today List: the list of jobs that do not run today
1190 for job in l_jobs_not_today:
1191 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1193 attrib={"name" : job.name})
1194 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1195 src.xmlManager.add_simple_node(xmlj,
1197 job.machine.distribution)
1198 src.xmlManager.add_simple_node(xmlj, "board", job.board)
1199 src.xmlManager.add_simple_node(xmlj,
1200 "commands", " ; ".join(job.commands))
1201 src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1202 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1203 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1204 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1205 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1206 src.xmlManager.add_simple_node(xmlj, "sat_path",
1207 job.machine.sat_path)
    def parse_csv_boards(self, today):
        """ Parse the csv file that describes the boards to produce and fill
            the dict self.d_input_boards with its content.

        :param today int: the current day of the week
        """
        # loop over each csv file and read its content
        with open(self.file_boards, 'r') as f:
            reader = csv.reader(f,delimiter=';')
        # NOTE(review): 'l_read' is used below but never defined in the
        # visible code — the loop that accumulates the reader rows appears
        # to be missing from this view; confirm against VCS.
        # get the delimiter for the boards (empty line)
        boards_delimiter = [''] * len(l_read[0])
        # Make the list of boards, by splitting with the delimiter
        l_boards = [list(y) for x, y in itertools.groupby(l_read,
                                    lambda z: z == boards_delimiter) if not x]

        # loop over the csv lists of lines and get the rows, columns and jobs
        # NOTE(review): 'd_boards', 'rows', 'columns_out' and 'row' are also
        # undefined in the visible code — their initializations seem elided.
        for input_board in l_boards:
            # first cell of the first line is the board name
            board_name = input_board[0][0]
            # rest of the first line gives the column headers
            columns = input_board[0][1:]
            for line in input_board[1:]:
                for i, square in enumerate(line[1:]):
                    # a square holds the comma-separated week days on which
                    # the (row, column) job is expected to run
                    days = square.split(',')
                    days = [int(day) for day in days]
                    if columns[i] not in columns_out:
                        columns_out.append(columns[i])
                    job = (row, columns[i])
            # NOTE(review): the dict literal below is not closed in the
            # visible code (the "jobs" entry appears to be missing).
            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns_out,
        self.d_input_boards = d_boards
1260 def update_xml_files(self, l_jobs):
1261 '''Write all the xml files with updated information about the jobs
1263 :param l_jobs List: the list of jobs that run today
1265 for xml_file in [self.xml_global_file] + list(
1266 self.d_xml_board_files.values()):
1267 self.update_xml_file(l_jobs, xml_file)
1270 self.write_xml_files()
    def update_xml_file(self, l_jobs, xml_file):
        '''update information about the jobs for the file xml_file

        :param l_jobs List: the list of jobs that run today
        :param xml_file xmlManager.XmlLogFile: the xml instance to update
        '''
        # the <jobs> container node created at initialization time
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
        # NOTE(review): a `for job in l_jobs:` loop header appears to be
        # missing from this view — `job` is otherwise undefined below;
        # confirm against VCS.
        # Find the node corresponding to the job and delete it
        # in order to recreate it
        for xmljob in xml_node_jobs.findall('job'):
            if xmljob.attrib['name'] == job.name:
                xml_node_jobs.remove(xmljob)

        # human-readable begin/end timestamps of the job
        T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._T0))
        Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._Tf))

        # recreate the job node
        # NOTE(review): the tag-name argument (presumably "job") is not
        # visible in this call — confirm against VCS.
        xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                              attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        src.xmlManager.add_simple_node(xmlj, "sat_path",
                                       job.machine.sat_path)
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj, "distribution",
                                       job.machine.distribution)
        src.xmlManager.add_simple_node(xmlj, "board", job.board)
        src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
        src.xmlManager.add_simple_node(xmlj, "commands",
                                       " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
        src.xmlManager.add_simple_node(xmlj, "begin", T0)
        src.xmlManager.add_simple_node(xmlj, "end", Tf)
        # stdout/stderr are stripped of terminal colour escape codes
        src.xmlManager.add_simple_node(xmlj, "out",
                                       src.printcolors.cleancolor(job.out))
        src.xmlManager.add_simple_node(xmlj, "err",
                                       src.printcolors.cleancolor(job.err))
        src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
        if len(job.remote_log_files) > 0:
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
                                           job.remote_log_files[0])
        # NOTE(review): the call below is unclosed in the visible code —
        # an `else:` branch and its final argument appear to be elided.
        src.xmlManager.add_simple_node(xmlj,
                                       "remote_log_file_path",
        xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
        # get the job father
        # NOTE(review): the `job_father = None` initialization, the loop
        # over l_jobs and the assignment body of the test below are not
        # visible here — confirm against VCS.
        if job.after is not None:
            if jb.name == job.after:
        if (job_father is not None and
            len(job_father.remote_log_files) > 0):
            link = job_father.remote_log_files[0]
        src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

        # Verify that the job is to be done today regarding the input csv
        if job.board and job.board in self.d_input_boards.keys():
            for dist, appli in self.d_input_boards[job.board]["jobs"]:
                if (job.machine.distribution == dist
                    and job.application == appli):
                    # NOTE(review): both calls below are unclosed in the
                    # visible code (tag names/attributes elided).
                    src.xmlManager.add_simple_node(xmlj,
                    src.xmlManager.add_simple_node(xmlj,
        # Update the date of last update in the <infos> node
        xml_node_infos = xml_file.xmlroot.find('infos')
        # NOTE(review): the attrib= opening of this call is not visible.
        src.xmlManager.append_node_attrib(xml_node_infos,
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1372 def last_update(self, finish_status = "finished"):
1373 '''update information about the jobs for the file xml_file
1375 :param l_jobs List: the list of jobs that run today
1376 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1378 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1379 xml_node_infos = xml_file.xmlroot.find('infos')
1380 src.xmlManager.append_node_attrib(xml_node_infos,
1381 attrib={"JobsCommandStatus" : finish_status})
1383 self.write_xml_files()
1385 def write_xml_files(self):
1386 ''' Write the xml files
1388 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1389 for xml_file in self.d_xml_board_files.values():
1390 xml_file.write_tree(STYLESHEET_BOARD)
# Describes the command
# NOTE(review): the `def description():` header that normally introduces
# this return statement is not visible in this view — confirm via VCS.
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the jobs command.

    :param args List: the command-line arguments of the jobs command
    :param runner: the sat runner giving access to the configuration
    :param logger: the logger instance used for all output
    '''
    # Parse the options given to the jobs command
    (options, args) = parser.parse_args(args)

    # Directories in which the jobs configuration files are searched
    l_cfg_dir = runner.cfg.PATHS.JOBPATH

    # list option : display all the available config files
    # NOTE(review): the `if options.list:` guard and its early return that
    # normally wrap this loop are not visible in this view — confirm via VCS.
    for cfg_dir in l_cfg_dir:
        if not options.no_label:
            logger.write("------ %s\n" %
                         src.printcolors.printcHeader(cfg_dir))
        for f in sorted(os.listdir(cfg_dir)):
            if not f.endswith('.pyconf'):
            # NOTE(review): the body of the test above (a `continue`) and
            # the computation of 'cfilename' appear elided here.
            logger.write("%s\n" % cfilename)

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        src.printcolors.printcError(message)
        # NOTE(review): an error return is expected here (lines elided?)

    # Find the file in the directories
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'

        if not os.path.exists(file_jobs_cfg):
        # NOTE(review): the body of the existence test and the not-found
        # bookkeeping appear elided; the message below is normally only
        # reached when no configuration file was found at all.
        msg = _("The file configuration %(name_file)s was not found."
                "\nUse the --list option to get the possible files.")
        src.printcolors.printcError(msg)

    # Header of the command output: platform and configuration file used
    # NOTE(review): the opening of the 'info' list is not visible here.
        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)

    # Restrict the jobs to launch to the ones given with --only_jobs
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                # NOTE(review): the l_jb.append(...) call seems elided; the
                # dangling string below is its second argument.
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    # Initialization of the jobs to run today
    # NOTE(review): the remaining Jobs(...) constructor arguments are not
    # visible in this view.
    today_jobs = Jobs(runner,

    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
    # NOTE(review): body elided — with --test_connection the command
    # normally returns right after the connection test.

    # Copy the stylesheets in the log directory
    log_dir = runner.cfg.SITE.log.log_dir
    xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
    # NOTE(review): the initialization of 'files_to_copy' is not visible.
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
    files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
    for file_path in files_to_copy:
        shutil.copy2(file_path, log_dir)

    # Instanciate the Gui in order to produce the xml files that contain all
    # NOTE(review): the today_jobs.ljobs argument usually passed between the
    # two visible arguments appears elided.
    gui = Gui(runner.cfg.SITE.log.log_dir,
              today_jobs.ljobs_not_today,
              file_boards = options.input_boards)

    # Display the list of the xml files
    logger.write(src.printcolors.printcInfo(("Here is the list of published"
    logger.write("%s\n" % gui.xml_global_file.logFile, 4)
    for board in gui.d_xml_board_files.keys():
        logger.write("%s\n" % gui.d_xml_board_files[board].logFile, 4)
    logger.write("\n", 4)

    today_jobs.gui = gui

    # Run all the jobs contained in config_jobs
    # NOTE(review): a `try:` normally opens here, matched by the
    # KeyboardInterrupt handler and the final-update lines below.
    today_jobs.run_jobs()
    except KeyboardInterrupt:
        # The user interrupted the command: report it prominently
        logger.write("\n\n%s\n\n" %
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        msg = _("Killing the running jobs and trying"
                " to get the corresponding logs\n")
        logger.write(src.printcolors.printcWarning(msg))

        # find the potential not finished jobs and kill them
        # NOTE(review): a `try:` matching the `except` below appears elided.
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
            except Exception as e:
                msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
                logger.write(src.printcolors.printcWarning(msg))
        # Flag the reports with the interruption status
        today_jobs.gui.last_update(_("Forced interruption"))
        today_jobs.gui.last_update()
    # Output the results
    today_jobs.write_all_results()