3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Names of the XSL stylesheets applied to the generated xml job reports so
# they can be rendered in a web browser (global report and per-board reports).
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command line options accepted by the "jobs" command.
# NOTE(review): several add_option calls appear truncated in this view
# (missing trailing default arguments / closing parentheses) — confirm
# against the full source.
parser = src.options.Options()
parser.add_option('n', 'name', 'string', 'jobs_cfg',
                  _('Mandatory: The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('Optional: the list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('Optional: list all available config files.'))
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Optional: try to connect to the machines. "
                  "Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Optional: generate an xml file that can be read in a "
                  "browser to display the jobs status."),
parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
                  "the path to csv file that contain "
                  "the expected boards."),"")
parser.add_option('', 'completion', 'boolean', 'no_label',
                  _("Optional (internal use): do not print labels, Works only "
class Machine(object):
    '''Class to manage a ssh connection on a machine
    '''
    # NOTE(review): the __init__ signature is elided in this view; only its
    # last default parameter and the tail of its body are visible.
                 sat_path="salomeTools"):
        self.distribution = None # Will be filled after copying SAT on the machine
        self.password = passwd
        self.sat_path = sat_path
        # The paramiko client that carries the ssh session used by all methods
        self.ssh = paramiko.SSHClient()
        # Tri-state flag: None = connect() never called, False = connection
        # failed, True = connection succeeded.
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        :return: None
        '''
        self._connection_successful = False
        # Accept unknown host keys automatically so the connection never
        # blocks on an interactive confirmation prompt.
        self.ssh.load_system_host_keys()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # NOTE(review): the 'try:' wrapping this connect call and some of its
        # keyword arguments are elided in this view.
        self.ssh.connect(self.host,
                         password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
        # NOTE(review): the 'except' line of this generic handler and the
        # 'else:' before the success flag are elided in this view.
            message = ( _("Error connecting or establishing an SSH session"))
        self._connection_successful = True
112 def successfully_connected(self, logger):
113 '''Verify if the connection to the remote machine has succeed
115 :param logger src.logger.Logger: The logger instance
116 :return: True if the connection has succeed, False if not
119 if self._connection_successful == None:
120 message = _("Warning : trying to ask if the connection to "
121 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
122 " no connection request" %
123 (self.name, self.host, self.port, self.user))
124 logger.write( src.printcolors.printcWarning(message))
125 return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path

        :param sat_local_path str: the local directory of salomeTools to upload
        :param job_file str: the path of the job configuration file to upload
        '''
        # NOTE(review): the 'try:' wrapping the whole transfer is elided in
        # this view.
        # open a sftp connection
        self.sftp = self.ssh.open_sftp()
        # Create the sat directory on remote machine if it is not existing
        self.mkdir(self.sat_path, ignore_existing=True)
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        # put the job configuration file in order to make it reachable
        # on the remote machine
        self.sftp.put(job_file, os.path.join(".salomeTools",
                                             ".jobs_command_file.pyconf"))
        except Exception as e:
            # Any failure during the copy marks the machine as unusable
            self._connection_successful = False
    # NOTE(review): mutable default argument 'filters=[]' — harmless only if
    # the list is never mutated; consider 'filters=None' in the full source.
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All sub-directories in source are
            created under target.

        :param source str: the local directory to upload
        :param target str: the remote destination directory
        :param filters list: entry names to skip (usage elided in this view)
        '''
        for item in os.listdir(source):
            # NOTE(review): the filter check on 'item' is elided in this view.
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                # Recreate symbolic links remotely instead of copying content
                linkto = os.readlink(source_path)
                self.sftp.symlink(linkto, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            # NOTE(review): an 'else:' and a try/except around the symlink
            # handling are elided in this view.
            if os.path.isfile(source_path):
                # Plain file: upload and replicate the local permission bits
                self.sftp.put(source_path, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
                # NOTE(review): the 'else:' introducing the directory case is
                # elided; this recursion handles sub-directories.
                self.mkdir(destination_path, ignore_existing=True)
                self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail

        :param path str: the remote directory to create
        :param mode int: permission bits for the new directory (511 == 0o777)
        :param ignore_existing bool: presumably suppresses the failure when
                                     the directory already exists — the
                                     try/except honouring it is elided here
        '''
        self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
                 or (None, None, None) when the execution failed
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        '''
        # NOTE(review): the 'try:' line is elided in this view.
        # Does not wait the end of the command
        (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
        # NOTE(review): the 'except' line of this second handler is elided.
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        # Success: hand the three channel files back to the caller
        return (stdin, stdout, stderr)
        # NOTE(review): this is the docstring of the 'close' method; its
        # 'def close(self):' line is elided in this view.
        '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        :return: None
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        # NOTE(review): the 'else:' line is elided in this view.
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
    # NOTE(review): the 'class Job' line is elided in this view.
    '''Class to manage one job
    '''
    def __init__(self, name, machine, application, board,
                 commands, timeout, config, logger, after=None):
        # NOTE(review): several assignments are elided in this view (at least
        # name, board, config, logger, after, res_job and out/err buffers,
        # all of which are read by other methods of this class).
        self.machine = machine
        self.timeout = timeout
        self.application = application
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        self.cancelled = False
        # Life-cycle flags of the job
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        self.commands = commands
        # The full remote "sat job" command line to execute
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config .jobs_command_file" +
        # NOTE(review): the enclosing 'def get_pids(self):' line and the
        # final return of the pid list are elided in this view.
        """ Get the pid(s) corresponding to the command that have been launched
            On the remote machine

        :return: The list of integers corresponding to the found pids
        """
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        # Keep only the numeric part (the pid column) of each ps output line
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.

        :param wait int: delay used after the kill (its use is elided here)
        :return: (the output of the kill, the error of the kill)
        '''
        pids = self.get_pids()
        # Send SIGINT ("kill -2") to every pid found for the remote command
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        # NOTE(review): the closing arguments of this call are elided here.
        return (out_kill, err_kill)
        # NOTE(review): the enclosing 'def has_begun(self):' line is elided.
        '''Returns True if the job has already begun

        :return: True if the job has already begun
        :rtype: bool
        '''
        return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        :rtype: bool
        '''
        # If the method has already been called and returned True
        if self._has_finished:
        # NOTE(review): the early 'return True' is elided in this view.
        # If the job has not begun yet
        if not self.has_begun():
        # NOTE(review): the 'return False' and the check of _stdout for None
        # are elided in this view.
        # A closed stdout channel means the remote command has terminated
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out += self._stdout.read().decode()
            self.err += self._stderr.read().decode()
            # Put end time
            self._Tf = time.time()
            # And get the remote command status and log files
            # NOTE(review): the call retrieving them is elided in this view.
        return self._has_finished
    def get_log_files(self):
        """Get the log files produced by the command launched
        on the remote machine, and put it in the log directory of the user,
        so they can be accessible from
        """
        # Do not get the files if the command is not finished
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        # First get the file that contains the list of log files to get
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
        # NOTE(review): the closing arguments of this sftp.get call are
        # elided in this view.
        self.machine.sftp.get(
        # Read the file and get the result of the command and all the log files
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        # The temporary local copy is no longer needed once parsed
        os.remove(tmp_file_path)
        # NOTE(review): the 'try:' matching this except is elided here.
        # The first line is the result of the command (0 success or 1 fail)
        self.res_job = file_lines[0]
        except Exception as e:
            self.err += _("Unable to get status from remote file %s: %s" %
                          (remote_path, str(e)))
        for i, job_path_remote in enumerate(file_lines[1:]):
            # For each command, there is two files to get :
            # 1- The xml file describing the command and giving the
            # 2- The txt file containing the system command traces (like
            # traces produced by the "make" command)
            # 3- In case of the test command, there is another file to get :
            # the xml board that contain the test results
            dirname = os.path.basename(os.path.dirname(job_path_remote))
            if dirname != 'OUT' and dirname != 'TEST':
                # Case 1- : the xml file describing the command
                local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                        os.path.basename(job_path_remote))
                if i==0: # The first is the job command
                    self.logger.add_link(os.path.basename(job_path_remote),
            elif dirname == 'OUT':
                # Case 2- : the txt file with the system command traces
                local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                        os.path.basename(job_path_remote))
            elif dirname == 'TEST':
                # Case 3- : the xml board with the test results
                local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                        os.path.basename(job_path_remote))
            # Download the file only once, then remember it locally
            if not os.path.exists(local_path):
                self.machine.sftp.get(job_path_remote, local_path)
                self.remote_log_files.append(local_path)
            except Exception as e:
                self.err += _("Unable to get %s log file from remote: %s" %
                              (str(job_path_remote),
    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered as failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a time out.

        :return: True if the job has failed
        :rtype: bool
        '''
        # NOTE(review): the return statements of each branch below are elided
        # in this view.
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        if self.res_job == "1":
        # NOTE(review): the enclosing 'def cancel(self):' line is elided.
        """In case of a failing job, one has to cancel every job that depend
        on it. This method put the job as failed and will not be executed.
        """
        # Mark the job as begun and finished so the scheduler skips it
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out += _("This job was not launched because its father has failed.")
        self.err += _("This job was not launched because its father has failed.")
453 def is_running(self):
454 '''Returns True if the job commands are running
456 :return: True if the job is running
459 return self.has_begun() and not self.has_finished()
461 def is_timeout(self):
462 '''Returns True if the job commands has finished with timeout
464 :return: True if the job has finished with timeout
467 return self._has_timouted
    def time_elapsed(self):
        """Get the time elapsed since the job launching

        :return: The number of seconds
        """
        if not self.has_begun():
        # NOTE(review): the not-begun return value and the assignment of
        # T_now (presumably time.time()) are elided in this view.
        return T_now - self._T0
    def check_time(self):
        """Verify that the job has not exceeded its timeout.
        If it has, kill the remote command and consider the job as finished.
        """
        if not self.has_begun():
        # NOTE(review): the early return of this guard is elided in this view.
        if self.time_elapsed() > self.timeout:
            # Timeout exceeded: close the job and record the end time
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            # Interrupt the remote process and keep its kill output
            (out_kill, _) = self.kill_remote_process()
            self.out += "TIMEOUT \n" + out_kill.read().decode()
            self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
        # NOTE(review): the 'try:' matching this except (presumably around a
        # get_log_files() call) is elided in this view.
        except Exception as e:
            self.err += _("Unable to get remote log files: %s" % e)
499 def total_duration(self):
500 """Give the total duration of the job
502 :return: the total duration of the job in seconds
505 return self._Tf - self._T0
        # NOTE(review): the enclosing 'def run(self):' line is elided.
        """Launch the job by executing the remote command.
        """
        # Prevent multiple run
        # NOTE(review): the has_begun() guard introducing this branch is
        # elided in this view.
            msg = _("Warning: A job can only be launched one time")
            msg2 = _("Trying to launch the job \"%s\" whereas it has "
                     "already been launched." % self.name)
            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
        # Do not execute the command if the machine could not be reached
        if not self.machine.successfully_connected(self.logger):
            self._has_finished = True
            self.err += ("Connection to machine (name : %s, host: %s, port:"
                         " %s, user: %s) has failed\nUse the log command "
                         "to get more information."
                         % (self.machine.name,
        # Usual case : Launch the command on remote machine
        self._T0 = time.time()
        self._stdin, self._stdout, self._stderr = self.machine.exec_command(
        # If the results are not initialized, finish the job
        if (self._stdin, self._stdout, self._stderr) == (None, None, None):
            self._has_finished = True
            self._Tf = time.time()
            self.err += "The server failed to execute the command"
        # Put the beginning flag to true.
        self._has_begun = True
    def write_results(self):
        """Display on the terminal all the job's information
        (name, dependency, timings, connection info and outputs)
        """
        self.logger.write("name : " + self.name + "\n")
        self.logger.write("after : %s\n" % self.after)
        self.logger.write("Time elapsed : %4imin %2is \n" %
                          (self.total_duration()//60 , self.total_duration()%60))
        self.logger.write("Begin time : %s\n" %
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(self._T0)) )
        self.logger.write("End time : %s\n\n" %
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(self._Tf)) )
        machine_head = "Informations about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        self.logger.write(src.printcolors.printcInfo(
                                                machine_head+underline+"\n"))
        self.machine.write_info(self.logger)
        self.logger.write(src.printcolors.printcInfo("out : \n"))
        # NOTE(review): the conditional choosing between these two writes
        # (presumably testing self.out) is elided in this view.
        self.logger.write("Unable to get output\n")
        self.logger.write(self.out + "\n")
        self.logger.write(src.printcolors.printcInfo("err : \n"))
        self.logger.write(self.err + "\n")
    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)

        :return: The current status of the job
        :rtype: str
        """
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        # NOTE(review): a branch (presumably for cancelled jobs) is elided
        # here in this view.
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    # NOTE(review): the 'class Jobs' line is elided in this view, as is the
    # beginning of the __init__ signature below.
    '''Class to manage the jobs to be run
    '''
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today
        # (a same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        # Width of a column of the status display (note: original spelling
        # 'lenght_columns' kept for compatibility)
        self.len_columns = lenght_columns
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []
        # Build machines and jobs from the configuration right away
        self.determine_jobs_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
        and returns the job instance corresponding to the definition.

        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        '''
        # NOTE(review): the 'else:' branches of the options below and the
        # final construction/return of the Job instance are elided here.
        cmmnds = job_def.commands
        if not "timeout" in job_def:
            timeout = 4*60*60 # default timeout = 4h
            timeout = job_def.timeout
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'board' in job_def:
            board = job_def.board
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
        the machines and jobs to be done today.

        :return: None
        '''
        # NOTE(review): several statements are elided in this view (the
        # host_list/a_machine initialisations, 'continue's, 'else:' branches
        # and the Machine(...) construction).
        # Index of today's weekday, matched against each job's "when" list
        today = datetime.date.weekday(datetime.date.today())
        for job_def in self.cfg_jobs.jobs :
            # Ignore job definitions without a target machine
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" do not have the key '
                        '"machine", this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            # Reuse an already instantiated machine when possible
            for mach in self.lmachines:
                if mach.name == name_machine:
            if a_machine == None:
                # Machine not yet instantiated: look it up in the config
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        # Each parameter falls back to a default when absent
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            if a_machine == None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            # Dispatch the job in today's list or the "not today" list
            if today in job_def.when:
                self.ljobs.append(a_job)
            else: # today in job_def.when
                self.ljobs_not_today.append(a_job)
        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine

        :param pad int: target width of the "Connection to ..." trace line
        :return: None
        '''
        # NOTE(review): several lines are elided in this view (guard bodies,
        # call closing arguments, and the conditionals around the OK/KO
        # status traces).
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            # Pad the trace with dots up to the requested width
            endline = (pad - len(begin_line)) * "." + " "
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # get the remote machine distribution using a sat command
                (__, out_dist, __) = machine.exec_command(
                                os.path.join(machine.sat_path,
                                    "sat config --value VARS.dist --no_label"),
                machine.distribution = out_dist.read().decode().replace("\n",
                # Print the status of the copy
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' %
                                src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                                src.printcolors.printc(src.KO_STATUS),
                                _("Copy of SAT failed: %s" % res_copy)), 3)
                self.logger.write('\r%s' %
                                  ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
        the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
                 or false if there is no job running on the host.
        '''
        # NOTE(review): the unpacking of hostname into (host, port) and the
        # two return statements are elided in this view.
        for jb in self.ljobs:
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
        running jobs and the jobs that have already finished.

        :return: True if at least one job finished since the last call
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.ljobs:
            # NOTE(review): the is_running() condition guarding this append
            # is elided in this view.
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        # Compare the finished count before/after to detect new completions
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.

        :return: None
        '''
        for job in self.ljobs:
            # Jobs without a dependency cannot be cancelled by this rule
            if job.after is None:
            # NOTE(review): the 'continue' of this guard and the job.cancel()
            # call at the end are elided in this view.
            father_job = self.find_job_that_has_name(job.after)
            if father_job is not None and father_job.has_failed():
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.

        :param name str: a job name
        :return: the job that has the name.
        '''
        for jb in self.ljobs:
            # NOTE(review): the name comparison and its 'return jb' are
            # elided in this view.
        # the following is executed only if the job was not found
        # NOTE(review): the not-found handling is elided in this view.
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
        the most close string of length "length".

        :param text str: any string
        :param length int: a length for the returned string
        :return: the most close string of length "length"
        '''
        if len(text) > length:
            # Too long: truncate and mark the cut with an ellipsis
            text_out = text[:length-3] + '...'
        # NOTE(review): the 'else:' line and the final return of text_out are
        # elided in this view.
            # Too short: center the text by padding spaces on both sides
            diff = length - len(text)
            before = " " * (diff//2)
            after = " " * (diff//2 + diff%2)
            text_out = before + text + after
    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status
        of the jobs in an array that has a column for each host.
        It displays the job that is currently running on the host

        :param len_col int: the size of the column
        :return: None
        '''
        # NOTE(review): the initialisation of display_line and the 'else:'
        # of the branch below are elided in this view.
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        # '\r' rewrites the same terminal line on every refresh
        self.logger.write("\r" + display_line + "|")
        # NOTE(review): the enclosing 'def run_jobs(self):' line is elided,
        # as are several guards and call arguments throughout this method.
        '''The main method. Runs all the jobs on every host.
        For each host, at a given time, only one job can be running.
        The jobs that have the field after (that contain the job that has
        to be run before it) are run after the previous job.
        This method stops when all the jobs are finished.
        '''
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        # Build the header line of the status array, one column per host
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        # The infinite loop that runs the jobs
        l_jobs_not_started = src.deepcopy_list(self.ljobs)
        while len(self._l_jobs_finished) != len(self.ljobs):
            new_job_start = False
            for host_port in self.lhosts:
                # Only one job at a time per (host, port)
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    # Skip the jobs targeting another host
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        # Dependent job: locate its father first
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before is None:
                            msg = _("This job was not launched because its "
                                    "father is not in the jobs list.")
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
            # Propagate failures to the dependent jobs
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            if new_job_start or new_job_finished:
                self.gui.update_xml_files(self.ljobs)
                # Display the current status
                self.display_status(self.len_columns)
            # Make sure that the proc is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")
        # Final refresh of the xml files once every job is done
        self.gui.update_xml_files(self.ljobs)
        self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs outputs.

        :return: None
        '''
        for jb in self.ljobs:
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            # NOTE(review): the per-job write_results() call is presumably
            # elided here in this view.
            self.logger.write("\n\n")
    # NOTE(review): the 'class Gui' line is elided in this view.
    '''Class to manage the xml data that can be displayed in a browser to
    '''
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, prefix, file_boards=""):
        # NOTE(review): the opening lines of the constructor docstring are
        # elided in this view.
        :param xml_dir_path str: The path to the directory where to put
                                 the xml resulting files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        :param file_boards str: the file path from which to read the
        # The prefix to add to the xml files : date_hour
        self.prefix = prefix
        # The path of the csv files to read to fill the expected boards
        self.file_boards = file_boards
        if file_boards != "":
            today = datetime.date.weekday(datetime.date.today())
            self.parse_csv_boards(today)
        # NOTE(review): the 'else:' of this branch is elided in this view.
            self.d_input_boards = {}
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        self.global_name = "global_report"
        xml_global_path = os.path.join(self.xml_dir_path,
                                       self.global_name + ".xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # Find history for each job
        self.find_history(l_jobs, l_jobs_not_today)
        # The xml files that corresponds to the boards.
        # {name_board : xml_object}}
        self.d_xml_board_files = {}
        # Create the lines and columns
        self.initialize_boards(l_jobs, l_jobs_not_today)
        # Write the xml file
        self.update_xml_files(l_jobs)
    def add_xml_board(self, name):
        '''Add a board to the board list
        :param name str: the board name
        '''
        xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
        # NOTE(review): the closing arguments of this XmlLogFile construction
        # are elided in this view.
        self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
        # Pre-create the structural nodes that initialize_boards() fills in
        self.d_xml_board_files[name].add_simple_node("distributions")
        self.d_xml_board_files[name].add_simple_node("applications")
        self.d_xml_board_files[name].add_simple_node("board", text=name)
    def initialize_boards(self, l_jobs, l_jobs_not_today):
        '''Get all the first information needed for each file and write the
        first version of the files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        # NOTE(review): this method is heavily elided in this view (several
        # initialisations, node-name arguments and guard bodies are missing).
        # Get the boards to fill and put it in a dictionary
        # {board_name : xml instance corresponding to the board}
        for job in l_jobs + l_jobs_not_today:
            if (board is not None and
                            board not in self.d_xml_board_files.keys()):
                self.add_xml_board(board)
        # Verify that the boards given as input are done
        for board in list(self.d_input_boards.keys()):
            if board not in self.d_xml_board_files:
                self.add_xml_board(board)
            # Record on the board which csv file drove its expected content
            root_node = self.d_xml_board_files[board].xmlroot
            src.xmlManager.append_node_attrib(root_node,
                                              {"input_file" : self.file_boards})
        # Loop over all jobs in order to get the lines and columns for each
        for board in self.d_xml_board_files:
            d_application[board] = []
        for job in l_jobs + l_jobs_not_today:
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
            distrib = job.machine.distribution
            application = job.application
            board_job = job.board
            # Register the job's distribution on its board (deduplicated)
            for board in self.d_xml_board_files:
                if board_job == board:
                    if distrib is not None and distrib not in d_dist[board]:
                        d_dist[board].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : distrib})
                # Register the job's application on its board (deduplicated)
                if board_job == board:
                    if (application is not None and
                                    application not in d_application[board]):
                        d_application[board].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                                       "name" : application})
        # Verify that there are no missing application or distribution in the
        # xml board files (regarding the input boards)
        for board in self.d_xml_board_files:
            l_dist = d_dist[board]
            if board not in self.d_input_boards.keys():
            # NOTE(review): the 'continue' of this guard is elided here.
            for dist in self.d_input_boards[board]["rows"]:
                if dist not in l_dist:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : dist})
            l_appli = d_application[board]
            for appli in self.d_input_boards[board]["columns"]:
                if appli not in l_appli:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : appli})
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node(
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but
            # that will not be launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            # add also the infos node
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})
            # and put the history node
            history_node = xml_file.add_simple_node("history")
            name_board = os.path.basename(xml_file.logFile)[:-len(".xml")]
            # search for board files
            expression = "^[0-9]{8}_+[0-9]{6}_" + name_board + ".xml$"
            oExpr = re.compile(expression)
            # Get the list of xml board files that are in the log directory
            for file_name in os.listdir(self.xml_dir_path):
                if oExpr.search(file_name):
                    date = os.path.basename(file_name).split("_")[0]
                    file_path = os.path.join(self.xml_dir_path, file_name)
                    src.xmlManager.add_simple_node(history_node,
                                                   attrib={"date" : date})
        # Find in each board the squares that needs to be filled regarding the
        # input csv files but that are not covered by a today job
        for board in self.d_input_boards.keys():
            xml_root_board = self.d_xml_board_files[board].xmlroot
            xml_missing = src.xmlManager.add_simple_node(xml_root_board,
            for row, column in self.d_input_boards[board]["jobs"]:
                # NOTE(review): the loop over jobs and the found/not-found
                # logic around this condition are elided in this view.
                if (job.application == column and
                        job.machine.distribution == row):
                src.xmlManager.add_simple_node(xml_missing,
                                               attrib={"distribution" : row,
                                                       "application" : column })
def find_history(self, l_jobs, l_jobs_not_today):
    """Find, for each job, the results stored in the already existing xml
       boards and store them in the dictionary
       self.history = {name_job : list of (date, status, link)}.

    :param l_jobs List: the list of jobs to run today
    :param l_jobs_not_today List: the list of jobs that do not run today
    """
    # The global xml files of previous runs are named
    # <YYYYMMDD>_<HHMMSS>_<global_name>.xml in the log directory
    expression = "^[0-9]{8}_+[0-9]{6}_" + self.global_name + ".xml$"
    oExpr = re.compile(expression)
    # Get the list of global xml files that are in the log directory
    l_globalxml = []
    for file_name in os.listdir(self.xml_dir_path):
        if oExpr.search(file_name):
            file_path = os.path.join(self.xml_dir_path, file_name)
            global_xml = src.xmlManager.ReadXmlFile(file_path)
            l_globalxml.append(global_xml)

    # Construct the dictionary self.history
    for job in l_jobs + l_jobs_not_today:
        l_links = []
        for global_xml in l_globalxml:
            # the date is the first underscore-separated token of the name
            date = os.path.basename(global_xml.filePath).split("_")[0]
            global_root_node = global_xml.xmlroot.find("jobs")
            job_node = src.xmlManager.find_node_by_attrib(global_root_node,
                                                          "job",
                                                          "name",
                                                          job.name)
            if job_node is not None:
                if job_node.find("remote_log_file_path") is not None:
                    link = job_node.find("remote_log_file_path").text
                    res_job = job_node.find("res").text
                    # "nothing" is the sentinel for "no remote log produced"
                    if link != "nothing":
                        l_links.append((date, res_job, link))

        self.history[job.name] = l_links
def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
    '''Add to the jobs node a <job> element for every job that is not
       scheduled today, with its static description and state "Not today".

    :param l_jobs_not_today List: the list of jobs that do not run today
    :param xml_node_jobs etree.Element: the node under which the job
                                        elements are created
    '''
    for job in l_jobs_not_today:
        xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                              "job",
                                              attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj,
                                       "distribution",
                                       job.machine.distribution)
        src.xmlManager.add_simple_node(xmlj, "board", job.board)
        src.xmlManager.add_simple_node(xmlj,
                                       "commands", " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", "Not today")
        src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        src.xmlManager.add_simple_node(xmlj, "sat_path",
                                       job.machine.sat_path)
        # attach the links to the previous results of this job
        xml_history = src.xmlManager.add_simple_node(xmlj, "history")
        for date, res_job, link in self.history[job.name]:
            src.xmlManager.add_simple_node(xml_history,
                                           "link",
                                           text=link,
                                           attrib={"date" : date,
                                                   "res" : res_job})
def parse_csv_boards(self, today):
    """Parse the csv file that describes the boards to produce and fill
       the dict self.d_input_boards with its content.

    :param today int: the current day of the week
    """
    # open the csv file and read its content
    l_read = []
    with open(self.file_boards, 'r') as f:
        reader = csv.reader(f, delimiter=CSV_DELIMITER)
        for line in reader:
            l_read.append(line)
    # get the delimiter for the boards (empty line)
    boards_delimiter = [''] * len(l_read[0])
    # Make the list of boards, by splitting with the delimiter
    l_boards = [list(y) for x, y in itertools.groupby(l_read,
                                lambda z: z == boards_delimiter) if not x]

    # loop over the csv lists of lines and get the rows, columns and jobs
    d_boards = {}
    for input_board in l_boards:
        # get the board name (first cell of the first line)
        board_name = input_board[0][0]

        # get the columns (rest of the first line)
        columns = input_board[0][1:]

        rows = []
        jobs = []
        for line in input_board[1:]:
            row = line[0]
            for i, square in enumerate(line[1:]):
                # an empty square means "never run this combination"
                if square == '':
                    continue
                # a square contains the week days on which the job runs
                days = square.split(DAYS_SEPARATOR)
                days = [int(day) for day in days]
                if today in days:
                    if row not in rows:
                        rows.append(row)
                    job = (row, columns[i])
                    jobs.append(job)

        d_boards[board_name] = {"rows" : rows,
                                "columns" : columns,
                                "jobs" : jobs}

    self.d_input_boards = d_boards
def update_xml_files(self, l_jobs):
    '''Update all the xml files (global and boards) with the current
       information about the jobs, then write them to disk.

    :param l_jobs List: the list of jobs that run today
    '''
    for xml_file in [self.xml_global_file] + list(
                                        self.d_xml_board_files.values()):
        self.update_xml_file(l_jobs, xml_file)

    # Write the updated trees to disk
    self.write_xml_files()
def update_xml_file(self, l_jobs, xml_file):
    '''Update the information about the jobs in the tree of xml_file.

    :param l_jobs List: the list of jobs that run today
    :param xml_file xmlManager.XmlLogFile: the xml instance to update
    '''

    xml_node_jobs = xml_file.xmlroot.find('jobs')
    # Update the job names and status node
    for job in l_jobs:
        # Find the node corresponding to the job and delete it
        # in order to recreate it
        for xmljob in xml_node_jobs.findall('job'):
            if xmljob.attrib['name'] == job.name:
                xml_node_jobs.remove(xmljob)

        # _T0/_Tf stay at -1 until the job starts/finishes
        T0 = str(job._T0)
        if T0 != "-1":
            T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                               time.localtime(job._T0))
        Tf = str(job._Tf)
        if Tf != "-1":
            Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                               time.localtime(job._Tf))

        # recreate the job node
        xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                              "job",
                                              attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        # links to the previous results of this job
        xml_history = src.xmlManager.add_simple_node(xmlj, "history")
        for date, res_job, link in self.history[job.name]:
            src.xmlManager.add_simple_node(xml_history,
                                           "link",
                                           text=link,
                                           attrib={"date" : date,
                                                   "res" : res_job})

        src.xmlManager.add_simple_node(xmlj, "sat_path",
                                       job.machine.sat_path)
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj, "distribution",
                                       job.machine.distribution)
        src.xmlManager.add_simple_node(xmlj, "board", job.board)
        src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
        src.xmlManager.add_simple_node(xmlj, "commands",
                                       " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
        src.xmlManager.add_simple_node(xmlj, "begin", T0)
        src.xmlManager.add_simple_node(xmlj, "end", Tf)
        # strip terminal color escapes before storing the outputs
        src.xmlManager.add_simple_node(xmlj, "out",
                                       src.printcolors.cleancolor(job.out))
        src.xmlManager.add_simple_node(xmlj, "err",
                                       src.printcolors.cleancolor(job.err))
        src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
        if len(job.remote_log_files) > 0:
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
                                           job.remote_log_files[0])
        else:
            # "nothing" is the sentinel for "no remote log produced"
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
                                           "nothing")

        xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
        # get the job father and link to its remote log if it has one
        if job.after is not None:
            job_father = None
            for jb in l_jobs:
                if jb.name == job.after:
                    job_father = jb

            if (job_father is not None and
                    len(job_father.remote_log_files) > 0):
                link = job_father.remote_log_files[0]
            else:
                link = "nothing"
            src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

        # Verify that the job is to be done today regarding the input csv
        # files; mark jobs not requested by the boards as "extra"
        if job.board and job.board in self.d_input_boards.keys():
            found = False
            for dist, appli in self.d_input_boards[job.board]["jobs"]:
                if (job.machine.distribution == dist
                        and job.application == appli):
                    found = True
                    src.xmlManager.add_simple_node(xmlj,
                                                   "extra_job",
                                                   "no")
                    break
            if not found:
                src.xmlManager.add_simple_node(xmlj,
                                               "extra_job",
                                               "yes")

    # Update the "last update" date shown by the report
    xml_node_infos = xml_file.xmlroot.find('infos')
    src.xmlManager.append_node_attrib(xml_node_infos,
              attrib={"value" :
              datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
def last_update(self, finish_status = "finished"):
    '''Set the final status of the jobs command in the 'infos' node of
       every xml file (global and boards) and write the files to disk.

    :param finish_status str: the value stored in the JobsCommandStatus
                              attribute (e.g. "finished" or
                              "Forced interruption")
    '''
    for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                        attrib={"JobsCommandStatus" : finish_status})
    # Write the files
    self.write_xml_files()
def write_xml_file(self, xml_file, stylesheet):
    '''Write one xml file, and a copy of it whose name carries the
       command prefix (so successive runs do not overwrite each other).

    :param xml_file xmlManager.XmlLogFile: the xml instance to write
    :param stylesheet str: the xsl stylesheet to reference in the file
    '''
    xml_file.write_tree(stylesheet)
    file_path = xml_file.logFile
    file_dir = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    file_name_with_prefix = self.prefix + "_" + file_name
    xml_file.write_tree(stylesheet, os.path.join(file_dir,
                                                 file_name_with_prefix))
def write_xml_files(self):
    '''Write all the xml files: the global one and one file per board,
       each with its dedicated stylesheet.
    '''
    self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
    for xml_file in self.d_xml_board_files.values():
        self.write_xml_file(xml_file, STYLESHEET_BOARD)
# Describes the command
def description():
    '''method called when salomeTools is called with --help option.

    :return: The text to display for the jobs command description.
    :rtype: str
    '''
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.\n\nexample:\nsat "
             "jobs --name my_jobs --publish")
1512 def run(args, runner, logger):
1514 (options, args) = parser.parse_args(args)
1516 l_cfg_dir = runner.cfg.PATHS.JOBPATH
1518 # list option : display all the available config files
1520 for cfg_dir in l_cfg_dir:
1521 if not options.no_label:
1522 logger.write("------ %s\n" %
1523 src.printcolors.printcHeader(cfg_dir))
1525 for f in sorted(os.listdir(cfg_dir)):
1526 if not f.endswith('.pyconf'):
1529 logger.write("%s\n" % cfilename)
1532 # Make sure the jobs_config option has been called
1533 if not options.jobs_cfg:
1534 message = _("The option --jobs_config is required\n")
1535 src.printcolors.printcError(message)
1538 # Find the file in the directories
1540 for cfg_dir in l_cfg_dir:
1541 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1542 if not file_jobs_cfg.endswith('.pyconf'):
1543 file_jobs_cfg += '.pyconf'
1545 if not os.path.exists(file_jobs_cfg):
1552 msg = _("The file configuration %(name_file)s was not found."
1553 "\nUse the --list option to get the possible files.")
1554 src.printcolors.printcError(msg)
1558 (_("Platform"), runner.cfg.VARS.dist),
1559 (_("File containing the jobs configuration"), file_jobs_cfg)
1561 src.print_info(logger, info)
1563 # Read the config that is in the file
1564 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1565 if options.only_jobs:
1566 l_jb = src.pyconf.Sequence()
1567 for jb in config_jobs.jobs:
1568 if jb.name in options.only_jobs:
1570 "Adding a job that was given in only_jobs option parameters")
1571 config_jobs.jobs = l_jb
1574 today_jobs = Jobs(runner,
1578 # SSH connection to all machines
1579 today_jobs.ssh_connection_all_machines()
1580 if options.test_connection:
1585 # Copy the stylesheets in the log directory
1586 log_dir = runner.cfg.USER.log_dir
1587 xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
1589 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
1590 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
1591 files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
1592 for file_path in files_to_copy:
1593 shutil.copy2(file_path, log_dir)
1595 # Instanciate the Gui in order to produce the xml files that contain all
1597 gui = Gui(runner.cfg.USER.log_dir,
1599 today_jobs.ljobs_not_today,
1600 runner.cfg.VARS.datehour,
1601 file_boards = options.input_boards)
1603 # Display the list of the xml files
1604 logger.write(src.printcolors.printcInfo(("Here is the list of published"
1606 logger.write("%s\n" % gui.xml_global_file.logFile, 4)
1607 for board in gui.d_xml_board_files.keys():
1608 file_path = gui.d_xml_board_files[board].logFile
1609 file_name = os.path.basename(file_path)
1610 logger.write("%s\n" % file_path, 4)
1611 logger.add_link(file_name, "board", 0, board)
1613 logger.write("\n", 4)
1615 today_jobs.gui = gui
1619 # Run all the jobs contained in config_jobs
1620 today_jobs.run_jobs()
1621 except KeyboardInterrupt:
1623 logger.write("\n\n%s\n\n" %
1624 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1627 msg = _("Killing the running jobs and trying"
1628 " to get the corresponding logs\n")
1629 logger.write(src.printcolors.printcWarning(msg))
1631 # find the potential not finished jobs and kill them
1632 for jb in today_jobs.ljobs:
1633 if not jb.has_finished():
1635 jb.kill_remote_process()
1636 except Exception as e:
1637 msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
1638 logger.write(src.printcolors.printcWarning(msg))
1641 today_jobs.gui.last_update(_("Forced interruption"))
1644 today_jobs.gui.last_update()
1645 # Output the results
1646 today_jobs.write_all_results()