3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets applied to the generated XML reports so that they can be
# rendered directly in a web browser.
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command-line option table of the "jobs" command.
parser = src.options.Options()

parser.add_option('n', 'name', 'string', 'jobs_cfg', 
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('Optional: the list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list', 
                  _('Optional: list all available config files.'))
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Optional: try to connect to the machines. "
                    "Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Optional: generate an xml file that can be read in a "
                    "browser to display the jobs status."),
parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
                                        "the path to csv file that contain "
                                        "the expected boards."),"")
# NOTE(review): the short option 'n' is already used by 'name' above --
# presumably this should be an empty short option; verify against upstream.
parser.add_option('n', 'completion', 'boolean', 'no_label',
                  _("Optional (internal use): do not print labels, Works only "
class Machine(object):
    '''Class to manage a ssh connection on a machine
    '''
                 sat_path="salomeTools"):
        # Filled lazily after SAT is copied over (via "sat config VARS.dist").
        self.distribution = None # Will be filled after copying SAT on the machine
        self.password = passwd
        # Remote directory where salomeTools is (or will be) installed.
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        # Tri-state flag: None = connect() never attempted, True/False =
        # outcome of the last attempt (see successfully_connected()).
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        '''
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # Auto-accept unknown host keys: convenient on a batch farm, but it
        # disables man-in-the-middle protection -- trusted networks only.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host,
                             password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS + 
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
            # Generic catch-all for any other connection failure.
            message = ( _("Error connecting or establishing an SSH session"))
            # Reached only when no exception occurred: machine is usable.
            self._connection_successful = True
111 def successfully_connected(self, logger):
112 '''Verify if the connection to the remote machine has succeed
114 :param logger src.logger.Logger: The logger instance
115 :return: True if the connection has succeed, False if not
118 if self._connection_successful == None:
119 message = _("Warning : trying to ask if the connection to "
120 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
121 " no connection request" %
122 (self.name, self.host, self.port, self.user))
123 logger.write( src.printcolors.printcWarning(message))
124 return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        '''
        # open a sftp connection
        self.sftp = self.ssh.open_sftp()
        # Create the sat directory on remote machine if it is not existing
        self.mkdir(self.sat_path, ignore_existing=True)
        # Recursive upload of the SAT tree, skipping the VCS metadata.
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        # put the job configuration file in order to make it reachable 
        # on the remote machine
        self.sftp.put(job_file, os.path.join(".salomeTools",
                                             ".jobs_command_file.pyconf"))
        except Exception as e:
            # Any failure here invalidates the machine for this session.
            self._connection_successful = False
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All sub-directories in source are 
            created under target.
        '''
        # NOTE(review): mutable default "filters=[]" is shared across calls;
        # harmless as long as it is never mutated, but None would be safer.
        for item in os.listdir(source):
                source_path = os.path.join(source, item)
                destination_path = os.path.join(target, item)
                if os.path.islink(source_path):
                    linkto = os.readlink(source_path)
                        # Recreate the symlink remotely, mirroring its mode.
                        self.sftp.symlink(linkto, destination_path)
                        self.sftp.chmod(destination_path,
                                        os.stat(source_path).st_mode)
                    if os.path.isfile(source_path):
                        # Plain file: upload and mirror its permissions.
                        self.sftp.put(source_path, destination_path)
                        self.sftp.chmod(destination_path,
                                        os.stat(source_path).st_mode)
                        # Otherwise a directory: create remotely and recurse.
                        self.mkdir(destination_path, ignore_existing=True)
                        self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
            if the remote folder already exists
        '''
        # 511 decimal == 0o777 octal: full permissions, subject to the
        # remote umask.
        self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance 
        :return: the stdin, stdout, and stderr of the executing command,
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        '''
            # Does not wait the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
            # Generic failure path: report KO and return empty channels so
            # the caller can detect the error with a (None, None, None) test.
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        # Success: hand the three channel files back to the caller.
        return (stdin, stdout, stderr)
        '''Close the ssh connection
        '''

    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger 
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        # Report connection state with the usual OK/KO markers.
        if self.successfully_connected(logger):
            status = src.OK_STATUS
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n") 
    '''Class to manage one job
    '''
    def __init__(self, name, machine, application, board, 
                 commands, timeout, config, logger, after=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        # The list of log files to download from the remote machine 
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched, 
        # 0 means success and 1 means fail
        self.cancelled = False
        # Lifecycle flags driving is_running()/has_finished()/is_timeout().
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        self.commands = commands
        # Full remote command line: run "sat job" on the uploaded pyconf and
        # record the produced log files into list_log_files.txt.
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config .jobs_command_file" +
        """ Get the pid(s) corresponding to the command that have been launched
            On the remote machine

        :return: The list of integers corresponding to the found pids
        :rtype: list
        """
        # Grep the full command line in the remote process table; awk keeps
        # only the PID column (note: the grep process itself may match too).
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        # Keep only the numeric part of each returned line.
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.

        :return: (the output of the kill, the error of the kill)
        '''
        pids = self.get_pids()
        # Send SIGINT (kill -2) to each pid so the remote sat command can
        # clean up before dying.
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)
        '''Returns True if the job has already begun

        :return: True if the job has already begun
        :rtype: bool
        '''
        return self._has_begun

    def has_finished(self):
        '''Returns True if the job has already finished 
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        :rtype: bool
        '''
        # If the method has already been called and returned True
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        # A closed stdout channel means the remote command has terminated.
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out += self._stdout.read().decode()
            self.err += self._stderr.read().decode()
            # Record the end timestamp used by total_duration().
            self._Tf = time.time()
            # And get the remote command status and log files

        return self._has_finished
    def get_log_files(self):
        """Get the log files produced by the command launched 
           on the remote machine, and put it in the log directory of the user,
           so they can be accessible from the local machine
        """
        # Do not get the files if the command is not finished
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))

        # First get the file that contains the list of log files to get
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
        self.machine.sftp.get(
        # Read the file and get the result of the command and all the log files
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        os.remove(tmp_file_path)

            # The first line is the result of the command (0 success or 1 fail)
            self.res_job = file_lines[0]
        except Exception as e:
            self.err += _("Unable to get status from remote file %s: %s" % 
                          (remote_path, str(e)))

        for i, job_path_remote in enumerate(file_lines[1:]):
                # For each command, there is two files to get :
                # 1- The xml file describing the command and giving the 
                # 2- The txt file containing the system command traces (like 
                #    traces produced by the "make" command)
                # 3- In case of the test command, there is another file to get :
                #    the xml board that contain the test results
                dirname = os.path.basename(os.path.dirname(job_path_remote))
                if dirname != 'OUT' and dirname != 'TEST':
                    local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath), 
                                        os.path.basename(job_path_remote))
                    if i==0: # The first is the job command
                        self.logger.add_link(os.path.basename(job_path_remote),
                elif dirname == 'OUT':
                    local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath), 
                                        os.path.basename(job_path_remote))
                elif dirname == 'TEST':
                    local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath), 
                                        os.path.basename(job_path_remote))
                # Only download each file once, then remember it.
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                self.remote_log_files.append(local_path)
            except Exception as e:
                self.err += _("Unable to get %s log file from remote: %s" % 
                              (job_path_remote, str(e)))
    def has_failed(self):
        '''Returns True if the job has failed. 
        A job is considered as failed if the machine could not be reached,
        if the remote command failed, 
        or if the job finished with a time out.

        :return: True if the job has failed
        :rtype: bool
        '''
        # Each guard below short-circuits with the appropriate verdict.
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        if self.res_job == "1":

        """In case of a failing job, one has to cancel every job that depend 
           on it. This method put the job as failed and will not be executed.
        """
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out += _("This job was not launched because its father has failed.")
        self.err += _("This job was not launched because its father has failed.")
449 def is_running(self):
450 '''Returns True if the job commands are running
452 :return: True if the job is running
455 return self.has_begun() and not self.has_finished()
    def is_timeout(self):
        '''Returns True if the job commands has finished with timeout 

        :return: True if the job has finished with timeout
        :rtype: bool
        '''
        # NOTE(review): the attribute name is misspelled ("timouted") but it
        # is used consistently by check_time(), so it is kept.
        return self._has_timouted

    def time_elapsed(self):
        """Get the time elapsed since the job launching

        :return: The number of seconds
        """
        if not self.has_begun():
        return T_now - self._T0
    def check_time(self):
        """Verify that the job has not exceeded its timeout.
           If it has, kill the remote command and consider the job as finished.
        """
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            # Interrupt the remote process and keep its kill output as trace.
            (out_kill, _) = self.kill_remote_process()
            self.out += "TIMEOUT \n" + out_kill.read().decode()
            self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
        except Exception as e:
            self.err += _("Unable to get remote log files: %s" % e)
    def total_duration(self):
        """Give the total duration of the job

        :return: the total duration of the job in seconds
        """
        # Meaningful only once both timestamps exist: _T0 is set at launch,
        # _Tf when the job finishes or times out.
        return self._Tf - self._T0
        """Launch the job by executing the remote command.
        """
        # Prevent multiple run
            msg = _("Warning: A job can only be launched one time")
            msg2 = _("Trying to launch the job \"%s\" whereas it has "
                     "already been launched." % self.name)
            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
        # Do not execute the command if the machine could not be reached
        if not self.machine.successfully_connected(self.logger):
            self._has_finished = True
            self.err += ("Connection to machine (name : %s, host: %s, port:"
                         " %s, user: %s) has failed\nUse the log command "
                         "to get more information."
                         % (self.machine.name,
            # Usual case : Launch the command on remote machine
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
            # If the results are not initialized, finish the job
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.err += "The server failed to execute the command"
        # Put the beginning flag to true.
        self._has_begun = True
    def write_results(self):
        """Display on the terminal all the job's information
        """
        self.logger.write("name : " + self.name + "\n")
        self.logger.write("after : %s\n" % self.after)
        self.logger.write("Time elapsed : %4imin %2is \n" % 
                     (self.total_duration()//60 , self.total_duration()%60))
        self.logger.write("Begin time : %s\n" % 
                     time.strftime('%Y-%m-%d %H:%M:%S', 
                                   time.localtime(self._T0)) )
        self.logger.write("End time   : %s\n\n" % 
                     time.strftime('%Y-%m-%d %H:%M:%S', 
                                   time.localtime(self._Tf)) )

        machine_head = "Informations about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        self.logger.write(src.printcolors.printcInfo(
                                                machine_head+underline+"\n"))
        self.machine.write_info(self.logger)

        # Then the captured remote stdout/stderr.
        self.logger.write(src.printcolors.printcInfo("out : \n"))
            self.logger.write("Unable to get output\n")
            self.logger.write(self.out + "\n")
        self.logger.write(src.printcolors.printcInfo("err : \n"))
        self.logger.write(self.err + "\n")
    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)

        :return: The current status of the job
        :rtype: str
        """
        # Ordered checks: connection, not started, running, then finished
        # (with or without timeout); each returns a human-readable state.
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))        
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
    '''
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today 
        # (a same host can have several machine instances since there 
        # can be several ssh parameters) 
        # The jobs to be launched today 
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        # NOTE(review): "lenght" is a typo, kept for interface compatibility.
        self.len_columns = lenght_columns
        
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran 
        self._l_jobs_finished = []
        # the list of jobs that are running 
        self._l_jobs_running = [] 
                
        self.determine_jobs_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
           and returns the job instance corresponding to the definition.

        :param job_def src.config.Mapping: a job definition 
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        '''
        cmmnds = job_def.commands
        timeout = job_def.timeout
        # Optional keys default to None when absent from the definition.
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'board' in job_def:
            board = job_def.board
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
           the machines and jobs to be done today.
        '''
        today = datetime.date.weekday(datetime.date.today())
        
        for job_def in self.cfg_jobs.jobs :
            # A job without a target machine cannot be scheduled: skip it.
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" do not have the key '
                       '"machine", this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            
            # Reuse an already-instantiated machine when possible.
            for mach in self.lmachines:
                if mach.name == name_machine:
            # NOTE(review): "== None" should be "is None" (PEP 8).
            if a_machine == None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        # Each connection parameter falls back to a local
                        # default when absent from the machine definition.
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host

                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user

                        if 'port' not in machine_def:
                            port = machine_def.port

                        if 'password' not in machine_def:
                            passwd = machine_def.password

                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            
            if a_machine == None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
                                  
            a_job = self.define_job(job_def, a_machine)
                
            # Schedule the job for today or keep it aside.
            if today in job_def.when:    
                self.ljobs.append(a_job)
            else: # today in job_def.when
                self.ljobs_not_today.append(a_job)
                                     
        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine 

        :param pad int: the width used to align the progress traces
        '''
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            endline = (pad - len(begin_line)) * "." + " "
            
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            
            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)
            
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                # Upload the local SAT tree next to the remote home.
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # get the remote machine distribution using a sat command
                (__, out_dist, __) = machine.exec_command(
                                os.path.join(machine.sat_path,
                                    "sat config --value VARS.dist --no_label"),
                machine.distribution = out_dist.read().decode().replace("\n",
                # Print the status of the copy
                    self.logger.write('\r%s' % 
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' % 
                        src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' % 
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                        src.printcolors.printc(src.OK_STATUS),
                        _("Copy of SAT failed")), 3)
                self.logger.write('\r%s' % 
                                  ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' % 
                    src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
                
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on 
           the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host, 
                or false if there is no job running on the host. 
        '''
        # Scan today's jobs for one bound to this (host, port) pair.
        for jb in self.ljobs:
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
           running jobs and the jobs that have already finished.

        :return: True if at least one job finished since the previous call
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.ljobs:
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        
        # Compare with the previous snapshot to detect newly finished jobs.
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        
        nb_job_finished_now = len(self._l_jobs_finished)
        
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        '''
        for job in self.ljobs:
            # Jobs without a parent are never cancelled here.
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job is not None and father_job.has_failed():

    def find_job_that_has_name(self, name):
        '''Returns the job by its name.

        :param name str: a job name
        :return: the job that has the name. 
        '''
        for jb in self.ljobs:
        # the following is executed only if the job was not found
880 def str_of_length(self, text, length):
881 '''Takes a string text of any length and returns
882 the most close string of length "length".
884 :param text str: any string
885 :param length int: a length for the returned string
886 :return: the most close string of length "length"
889 if len(text) > length:
890 text_out = text[:length-3] + '...'
892 diff = length - len(text)
893 before = " " * (diff//2)
894 after = " " * (diff//2 + diff%2)
895 text_out = before + text + after
    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status 
           of the jobs in an array that has a column for each host.
           It displays the job that is currently running on the host 

        :param len_col int: the size of the column 
        '''
        # One cell per (host, port); "empty" means the host is idle.
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty 
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        
        # Carriage return ("\r") redraws the same terminal line in place.
        self.logger.write("\r" + display_line + "|")
        '''The main method. Runs all the jobs on every host. 
           For each host, at a given time, only one job can be running.
           The jobs that have the field after (that contain the job that has
           to be run before it) are run after the previous job.
           This method stops when all the jobs are finished.
        '''
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        # Build the header of the per-host status table.
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        
        # The infinite loop that runs the jobs
        l_jobs_not_started = src.deepcopy_list(self.ljobs)
        while len(self._l_jobs_finished) != len(self.ljobs):
            new_job_start = False
            for host_port in self.lhosts:
                # One job at a time per host: skip busy hosts.
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        # Dependent job: only start it once its father ended.
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before is None:
                            msg = _("This job was not launched because its "
                                    "father is not in the jobs list.")
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            
            # Refresh the xml reports only when something changed.
            if new_job_start or new_job_finished:
                    self.gui.update_xml_files(self.ljobs)
                # Display the current status     
                self.display_status(self.len_columns)
            
            # Make sure that the proc is not entirely busy
        self.logger.write("\n")    
        self.logger.write(tiret_line)                   
        self.logger.write("\n\n")
        
            self.gui.update_xml_files(self.ljobs)
            self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs outputs.
        '''
        for jb in self.ljobs:
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            self.logger.write("\n\n")
    '''Class to manage the xml data that can be displayed in a browser to
    '''
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, file_boards=""):
        '''Initialization

        :param xml_dir_path str: The path to the directory where to put 
                                 the xml resulting files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        :param file_boards str: the file path from which to read the
        '''
        # The path of the csv files to read to fill the expected boards
        self.file_boards = file_boards
        
        today = datetime.date.weekday(datetime.date.today())
        self.parse_csv_boards(today)
        
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # The xml files that corresponds to the boards.
        # {name_board : xml_object}}
        self.d_xml_board_files = {}
        # Create the lines and columns
        self.initialize_boards(l_jobs, l_jobs_not_today)
        
        # Write the xml file
        self.update_xml_files(l_jobs)
    def add_xml_board(self, name):
        '''Add a board xml file (and its base nodes) to the tracked set.

        :param name str: the board name, used both as file name and node text
        '''
        xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
        self.d_xml_board_files[name] =  src.xmlManager.XmlLogFile(
        # Base structure of every board file.
        self.d_xml_board_files[name].add_simple_node("distributions")
        self.d_xml_board_files[name].add_simple_node("applications")
        self.d_xml_board_files[name].add_simple_node("board", text=name)
    def initialize_boards(self, l_jobs, l_jobs_not_today):
        '''Get all the first information needed for each file and write the 
           first version of the files   
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        # Get the boards to fill and put it in a dictionary
        # {board_name : xml instance corresponding to the board}
        for job in l_jobs + l_jobs_not_today:
            if (board is not None and 
                                board not in self.d_xml_board_files.keys()):
                self.add_xml_board(board)
        
        # Verify that the boards given as input are done
        for board in list(self.d_input_boards.keys()):
            if board not in self.d_xml_board_files:
                self.add_xml_board(board)
            # Remember the csv file each board was generated from.
            root_node = self.d_xml_board_files[board].xmlroot
            src.xmlManager.append_node_attrib(root_node, 
                                              {"input_file" : self.file_boards})
        
        # Loop over all jobs in order to get the lines and columns for each 
        for board in self.d_xml_board_files:
            d_application[board] = []
            
        for job in l_jobs + l_jobs_not_today:
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
                
            distrib = job.machine.distribution
            application = job.application
            
            board_job = job.board
            # Register each new distribution (row) of the job's board.
            for board in self.d_xml_board_files:
                if board_job == board:
                    if distrib is not None and distrib not in d_dist[board]:
                        d_dist[board].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                                                            attrib={"name" : distrib})
            
                # Register each new application (column) of the job's board.
                if board_job == board:
                    if (application is not None and 
                                    application not in d_application[board]):
                        d_application[board].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                                                    "name" : application})
        
        # Verify that there are no missing application or distribution in the
        # xml board files (regarding the input boards)
        for board in self.d_xml_board_files:
            l_dist = d_dist[board]
            if board not in self.d_input_boards.keys():
            for dist in self.d_input_boards[board]["rows"]:
                if dist not in l_dist:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                                                        attrib={"name" : dist})
            l_appli = d_application[board]
            for appli in self.d_input_boards[board]["columns"]:
                if appli not in l_appli:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                                                        attrib={"name" : appli})
                
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node(
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            xml_jobs = xml_file.add_simple_node("jobs")      
            # Get the jobs present in the config file but 
            # that will not be launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})
        
        # Find in each board the squares that needs to be filled regarding the
        # input csv files but that are not covered by a today job
        for board in self.d_input_boards.keys():
            xml_root_board = self.d_xml_board_files[board].xmlroot
            xml_missing = src.xmlManager.add_simple_node(xml_root_board,
            for row, column in self.d_input_boards[board]["jobs"]:
                    if (job.application == column and 
                            job.machine.distribution == row):
                    src.xmlManager.add_simple_node(xml_missing,
                                            attrib={"distribution" : row,
                                                    "application" : column })
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Get all the first information needed for each file and write the 
           first version of the files   

        :param xml_node_jobs etree.Element: the node corresponding to a job
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        for job in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            # One child node per piece of static job information.
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                       "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                                        job.machine.sat_path)
# NOTE(review): numbered listing with gaps — original lines 1218, 1220, 1222,
# 1225-1226, 1232, 1234, 1236, 1238-1244, 1246, 1248-1249, 1252-1254,
# 1258-1259, 1262-1263 and 1265 are absent. The initialisations of l_read,
# rows, columns_out, d_boards, row and the per-board "jobs" list, as well as
# the `today in days` filter, presumably live on those missing lines —
# confirm against the full source before editing.
#
# Visible behavior: reads self.file_boards with csv.reader, splits the rows
# into boards using an all-empty row as delimiter (itertools.groupby), then
# for each board collects its name (first cell), its columns (rest of the
# header row), and the (row, column) squares whose day list matches `today`,
# finally storing everything in self.d_input_boards.
1215 def parse_csv_boards(self, today):
1216 """ Parse the csv files that describes the boards to produce and fill
1217 the dict d_input_boards that contain the csv file contain
1219 :param today int: the current day of the week
1221 # loop over each csv file and read its content
1223 with open(self.file_boards, 'r') as f:
1224 reader = csv.reader(f,delimiter=CSV_DELIMITER)
1227 # get the delimiter for the boards (empty line)
1228 boards_delimiter = [''] * len(l_read[0])
1229 # Make the list of boards, by splitting with the delimiter
1230 l_boards = [list(y) for x, y in itertools.groupby(l_read,
1231 lambda z: z == boards_delimiter) if not x]
1233 # loop over the csv lists of lines and get the rows, columns and jobs
1235 for input_board in l_boards:
1237 board_name = input_board[0][0]
1240 columns = input_board[0][1:]
1245 for line in input_board[1:]:
1247 for i, square in enumerate(line[1:]):
1250 days = square.split(DAYS_SEPARATOR)
1251 days = [int(day) for day in days]
1255 if columns[i] not in columns_out:
1256 columns_out.append(columns[i])
1257 job = (row, columns[i])
1260 d_boards[board_name] = {"rows" : rows,
1261 "columns" : columns_out,
1264 self.d_input_boards = d_boards
# Driver: refresh the job information in the global XML file and in every
# per-board XML file, then write all of them to disk.
# NOTE(review): original lines 1268, 1270 (docstring close), 1274-1275 and
# 1277 are absent from this numbered listing.
1266 def update_xml_files(self, l_jobs):
1267 '''Write all the xml files with updated information about the jobs
1269 :param l_jobs List: the list of jobs that run today
1271 for xml_file in [self.xml_global_file] + list(
1272 self.d_xml_board_files.values()):
1273 self.update_xml_file(l_jobs, xml_file)
1276 self.write_xml_files()
# Rebuild the <job> node of every running job inside xml_file and refresh
# the <infos> node's "last update" timestamp.
# NOTE(review): numbered listing with substantial gaps. Missing lines
# include, among others: 1287 (presumably the `for job in l_jobs:` loop
# header that the block from 1288 onward is indented under — TODO confirm),
# 1293-1295 / 1298-1299 / 1302 (T0/Tf defaults and the guards around
# job._T0 / job._Tf), 1305 (tag name of the recreated node, presumably
# "job"), 1332 / 1335-1336 (the else-branch value of remote_log_file_path),
# 1340-1344 / 1348-1349 (job_father lookup and the default link value),
# 1353 / 1355 / 1359 / 1361-1364 / 1366-1370 (the "extra job" marking and
# its tag names/attributes), and 1373 (the attrib= opening of the final
# append_node_attrib call). Confirm against the full source before editing.
1278 def update_xml_file(self, l_jobs, xml_file):
1279 '''update information about the jobs for the file xml_file
1281 :param l_jobs List: the list of jobs that run today
1282 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1285 xml_node_jobs = xml_file.xmlroot.find('jobs')
1286 # Update the job names and status node
1288 # Find the node corresponding to the job and delete it
1289 # in order to recreate it
1290 for xmljob in xml_node_jobs.findall('job'):
1291 if xmljob.attrib['name'] == job.name:
1292 xml_node_jobs.remove(xmljob)
1296 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1297 time.localtime(job._T0))
1300 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1301 time.localtime(job._Tf))
1303 # recreate the job node
1304 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1306 attrib={"name" : job.name})
1307 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1308 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1309 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1310 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1311 src.xmlManager.add_simple_node(xmlj, "sat_path",
1312 job.machine.sat_path)
1313 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1314 src.xmlManager.add_simple_node(xmlj, "distribution",
1315 job.machine.distribution)
1316 src.xmlManager.add_simple_node(xmlj, "board", job.board)
1317 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1318 src.xmlManager.add_simple_node(xmlj, "commands",
1319 " ; ".join(job.commands))
1320 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1321 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1322 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1323 src.xmlManager.add_simple_node(xmlj, "out",
1324 src.printcolors.cleancolor(job.out))
1325 src.xmlManager.add_simple_node(xmlj, "err",
1326 src.printcolors.cleancolor(job.err))
1327 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1328 if len(job.remote_log_files) > 0:
1329 src.xmlManager.add_simple_node(xmlj,
1330 "remote_log_file_path",
1331 job.remote_log_files[0])
1333 src.xmlManager.add_simple_node(xmlj,
1334 "remote_log_file_path",
1337 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1338 # get the job father
1339 if job.after is not None:
1342 if jb.name == job.after:
1345 if (job_father is not None and
1346 len(job_father.remote_log_files) > 0):
1347 link = job_father.remote_log_files[0]
1350 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1352 # Verify that the job is to be done today regarding the input csv
1354 if job.board and job.board in self.d_input_boards.keys():
1356 for dist, appli in self.d_input_boards[job.board]["jobs"]:
1357 if (job.machine.distribution == dist
1358 and job.application == appli):
1360 src.xmlManager.add_simple_node(xmlj,
1365 src.xmlManager.add_simple_node(xmlj,
1371 xml_node_infos = xml_file.xmlroot.find('infos')
1372 src.xmlManager.append_node_attrib(xml_node_infos,
1374 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
# Stamp the final JobsCommandStatus on the <infos> node of the global file
# and of every board file, then write all XML files to disk.
# NOTE(review): the docstring below looks copy-pasted from update_xml_file —
# it documents l_jobs / xml_file parameters that this method does not take;
# its real parameter is finish_status (default "finished"). The docstring
# closing quotes (original line 1383) and line 1388 are absent from this
# numbered listing.
1378 def last_update(self, finish_status = "finished"):
1379 '''update information about the jobs for the file xml_file
1381 :param l_jobs List: the list of jobs that run today
1382 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1384 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1385 xml_node_infos = xml_file.xmlroot.find('infos')
1386 src.xmlManager.append_node_attrib(xml_node_infos,
1387 attrib={"JobsCommandStatus" : finish_status})
1389 self.write_xml_files()
# Serialize the global report with the global stylesheet and each board
# report with the board stylesheet (STYLESHEET_GLOBAL / STYLESHEET_BOARD,
# defined at module level).
# NOTE(review): the docstring closing quotes (original line 1393) are absent
# from this numbered listing.
1391 def write_xml_files(self):
1392 ''' Write the xml files
1394 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1395 for xml_file in self.d_xml_board_files.values():
1396 xml_file.write_tree(STYLESHEET_BOARD)
1399 # Describes the command
# NOTE(review): the enclosing `def description():` line (original line 1400)
# is absent from this numbered listing — the `return` below is its body.
1401 return _("The jobs command launches maintenances that are described"
1402 " in the dedicated jobs configuration file.")
# Entry point of the `jobs` command: parse options, locate the jobs
# configuration file, connect to the machines over SSH, publish the XML
# report files, run the jobs, and handle forced interruption.
# NOTE(review): numbered listing with many absent lines. Notably missing:
# 1407 / 1409 / 1411 / 1413 (docstring and the `if options.list:` guard,
# presumably), 1418 / 1421-1422 / 1424-1425 (the list-option loop body tail
# and its early return), 1430-1431 / 1433 (return after the missing-option
# error and `found = False`-style bookkeeping, presumably), 1438 / 1440-1445
# (the break/else of the config-file search), 1449-1451 / 1454 / 1456 (error
# return and the `info = [...]` opening), 1463 / 1466-1467 (the l_jb.append
# call), 1469-1471 (remaining Jobs(...) constructor arguments), 1475-1478
# (test_connection early return), 1482 (`files_to_copy = []`, presumably),
# 1490 / 1492 / 1495 / 1498 / 1502 / 1504 (Gui arguments and logger lines),
# 1506-1508 (the `try:` that the except at 1511 belongs to), 1512 /
# 1515-1516 / 1520 (interruption bookkeeping), 1524 (the inner `try:` for
# kill_remote_process), and 1529-1533 (presumably the `finally:` that runs
# last_update). Confirm all of these against the full source before editing.
1406 def run(args, runner, logger):
1408 (options, args) = parser.parse_args(args)
1410 l_cfg_dir = runner.cfg.PATHS.JOBPATH
1412 # list option : display all the available config files
1414 for cfg_dir in l_cfg_dir:
1415 if not options.no_label:
1416 logger.write("------ %s\n" %
1417 src.printcolors.printcHeader(cfg_dir))
1419 for f in sorted(os.listdir(cfg_dir)):
1420 if not f.endswith('.pyconf'):
1423 logger.write("%s\n" % cfilename)
1426 # Make sure the jobs_config option has been called
1427 if not options.jobs_cfg:
1428 message = _("The option --jobs_config is required\n")
1429 src.printcolors.printcError(message)
1432 # Find the file in the directories
1434 for cfg_dir in l_cfg_dir:
1435 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1436 if not file_jobs_cfg.endswith('.pyconf'):
1437 file_jobs_cfg += '.pyconf'
1439 if not os.path.exists(file_jobs_cfg):
1446 msg = _("The file configuration %(name_file)s was not found."
1447 "\nUse the --list option to get the possible files.")
1448 src.printcolors.printcError(msg)
1452 (_("Platform"), runner.cfg.VARS.dist),
1453 (_("File containing the jobs configuration"), file_jobs_cfg)
1455 src.print_info(logger, info)
1457 # Read the config that is in the file
1458 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1459 if options.only_jobs:
1460 l_jb = src.pyconf.Sequence()
1461 for jb in config_jobs.jobs:
1462 if jb.name in options.only_jobs:
1464 "Adding a job that was given in only_jobs option parameters")
1465 config_jobs.jobs = l_jb
1468 today_jobs = Jobs(runner,
1472 # SSH connection to all machines
1473 today_jobs.ssh_connection_all_machines()
1474 if options.test_connection:
1479 # Copy the stylesheets in the log directory
1480 log_dir = runner.cfg.SITE.log.log_dir
1481 xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
1483 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
1484 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
1485 files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
1486 for file_path in files_to_copy:
1487 shutil.copy2(file_path, log_dir)
1489 # Instanciate the Gui in order to produce the xml files that contain all
1491 gui = Gui(runner.cfg.SITE.log.log_dir,
1493 today_jobs.ljobs_not_today,
1494 file_boards = options.input_boards)
1496 # Display the list of the xml files
1497 logger.write(src.printcolors.printcInfo(("Here is the list of published"
1499 logger.write("%s\n" % gui.xml_global_file.logFile, 4)
1500 for board in gui.d_xml_board_files.keys():
1501 logger.write("%s\n" % gui.d_xml_board_files[board].logFile, 4)
1503 logger.write("\n", 4)
1505 today_jobs.gui = gui
1509 # Run all the jobs contained in config_jobs
1510 today_jobs.run_jobs()
1511 except KeyboardInterrupt:
1513 logger.write("\n\n%s\n\n" %
1514 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1517 msg = _("Killing the running jobs and trying"
1518 " to get the corresponding logs\n")
1519 logger.write(src.printcolors.printcWarning(msg))
1521 # find the potential not finished jobs and kill them
1522 for jb in today_jobs.ljobs:
1523 if not jb.has_finished():
1525 jb.kill_remote_process()
1526 except Exception as e:
1527 msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
1528 logger.write(src.printcolors.printcWarning(msg))
1531 today_jobs.gui.last_update(_("Forced interruption"))
1534 today_jobs.gui.last_update()
1535 # Output the results
1536 today_jobs.write_all_results()