X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=commands%2Fjobs.py;h=81a08db1248d5a5484a32a3d1602113d757918bb;hb=981fd24a20e3158c625ac025d5b3687e375e9ff2;hp=04a238513598757ca1f6a644d5ebb3faef1b7120;hpb=7f04114e1b2d7a21f0a979a4f969cc5c6d59d9ee;p=tools%2Fsat.git diff --git a/commands/jobs.py b/commands/jobs.py index 04a2385..81a08db 100644 --- a/commands/jobs.py +++ b/commands/jobs.py @@ -19,37 +19,58 @@ import os import datetime import time +import csv +import shutil +import itertools import paramiko import src +STYLESHEET_GLOBAL = "jobs_global_report.xsl" +STYLESHEET_BOARD = "jobs_board_report.xsl" + +DAYS_SEPARATOR = "," +CSV_DELIMITER = ";" parser = src.options.Options() -parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', +parser.add_option('n', 'name', 'string', 'jobs_cfg', _('The name of the config file that contains' ' the jobs configuration')) parser.add_option('o', 'only_jobs', 'list2', 'only_jobs', - _('The list of jobs to launch, by their name. ')) + _('Optional: the list of jobs to launch, by their name. ')) parser.add_option('l', 'list', 'boolean', 'list', - _('list all available config files.')) -parser.add_option('n', 'no_label', 'boolean', 'no_label', - _("do not print labels, Works only with --list."), False) + _('Optional: list all available config files.')) parser.add_option('t', 'test_connection', 'boolean', 'test_connection', - _("Try to connect to the machines. Not executing the jobs."), + _("Optional: try to connect to the machines. " + "Not executing the jobs."), False) parser.add_option('p', 'publish', 'boolean', 'publish', - _("Generate an xml file that can be read in a browser to " - "display the jobs status."), + _("Optional: generate an xml file that can be read in a " + "browser to display the jobs status."), + False) +parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: " + "the path to csv file that contain " + "the expected boards."),"") +parser.add_option('n', 'completion', 'boolean', 'no_label', + _("Optional (internal use): do not print labels, Works only " + "with --list."), False) -class machine(object): +class Machine(object): '''Class to manage a ssh connection on a machine ''' - def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"): + def __init__(self, + name, + host, + user, + port=22, + passwd=None, + sat_path="salomeTools"): self.name = name self.host = host self.port = port + self.distribution = None # Will be filled after copying SAT on the machine self.user = user self.password = passwd self.sat_path = sat_path @@ -95,10 +116,10 @@ class machine(object): :rtype: bool ''' if self._connection_successful == None: - message = "Warning : trying to ask if the connection to " - "(host: %s, port: %s, user: %s) is OK whereas there were" - " no connection request" % \ - (machine.host, machine.port, machine.user) + message = _("Warning : trying to ask if the connection to " + "(name: %s host: %s, port: %s, user: %s) is OK whereas there were" + " no connection request" % + (self.name, self.host, self.port, self.user)) logger.write( src.printcolors.printcWarning(message)) return self._connection_successful @@ -107,14 +128,17 @@ class machine(object): ''' res = 0 try: + # open a sftp connection self.sftp = self.ssh.open_sftp() + # Create the sat directory on remote machine if it is not existing self.mkdir(self.sat_path, ignore_existing=True) + # Put sat self.put_dir(sat_local_path, self.sat_path, filters = ['.git']) - job_file_name = os.path.basename(job_file) - self.sftp.put(job_file, os.path.join(self.sat_path, - "data", - "jobs", - job_file_name)) + # put the job configuration file in order to make it reachable + # on the remote machine + self.sftp.put(job_file, os.path.join(".salomeTools", + "Jobs", + ".jobs_command_file.pyconf")) except Exception as e: res = str(e) self._connection_successful = False @@ -123,7 +147,7 @@ class machine(object): def put_dir(self, source, target, filters = []): ''' Uploads the contents of the source directory to the target path. The - target directory needs to exists. All subdirectories in source are + target directory needs to exists. All sub-directories in source are created under target. ''' for item in os.listdir(source): @@ -209,18 +233,18 @@ class machine(object): logger.write("Connection : " + status + "\n\n") -class job(object): +class Job(object): '''Class to manage one job ''' - def __init__(self, name, machine, application, distribution, - commands, timeout, config, logger, job_file, after=None): + def __init__(self, name, machine, application, board, + commands, timeout, config, logger, after=None): self.name = name self.machine = machine self.after = after self.timeout = timeout self.application = application - self.distribution = distribution + self.board = board self.config = config self.logger = logger # The list of log files to download from the remote machine @@ -241,20 +265,25 @@ class job(object): self._stdout = None # Store the command outputs field self._stderr = None # Store the command errors field - self.out = None # Contains something only if the job is finished - self.err = None # Contains something only if the job is finished + self.out = "" + self.err = "" self.commands = commands self.command = (os.path.join(self.machine.sat_path, "sat") + " -l " + os.path.join(self.machine.sat_path, "list_log_files.txt") + - " job --jobs_config " + - job_file + - " --job " + + " job --jobs_config .jobs_command_file" + + " --name " + self.name) def get_pids(self): + """ Get the pid(s) corresponding to the command that have been launched + On the remote machine + + :return: The list of integers corresponding to the found pids + :rtype: List + """ pids = [] cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\'' (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger) @@ -263,7 +292,7 @@ class job(object): pids+=pids_cmd return pids - def kill_remote_process(self): + def kill_remote_process(self, wait=1): '''Kills the process on the remote machine. :return: (the output of the kill, the error of the kill) @@ -274,6 +303,7 @@ class job(object): cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids]) (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, self.logger) + time.sleep(wait) return (out_kill, err_kill) def has_begun(self): @@ -304,8 +334,8 @@ class job(object): if self._stdout.channel.closed: self._has_finished = True # Store the result outputs - self.out = self._stdout.read() - self.err = self._stderr.read() + self.out += self._stdout.read().decode() + self.err += self._stderr.read().decode() # Put end time self._Tf = time.time() # And get the remote command status and log files @@ -314,37 +344,79 @@ class job(object): return self._has_finished def get_log_files(self): + """Get the log files produced by the command launched + on the remote machine, and put it in the log directory of the user, + so they can be accessible from + """ + # Do not get the files if the command is not finished if not self.has_finished(): msg = _("Trying to get log files whereas the job is not finished.") self.logger.write(src.printcolors.printcWarning(msg)) return + # First get the file that contains the list of log files to get tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt") + remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt") self.machine.sftp.get( - os.path.join(self.machine.sat_path, "list_log_files.txt"), + remote_path, tmp_file_path) + # Read the file and get the result of the command and all the log files + # to get fstream_tmp = open(tmp_file_path, "r") file_lines = fstream_tmp.readlines() file_lines = [line.replace("\n", "") for line in file_lines] fstream_tmp.close() os.remove(tmp_file_path) - self.res_job = file_lines[0] - for job_path_remote in file_lines[1:]: - if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT': - local_path = os.path.join(os.path.dirname( - self.logger.logFilePath), - os.path.basename(job_path_remote)) - if not os.path.exists(local_path): - self.machine.sftp.get(job_path_remote, local_path) - else: - local_path = os.path.join(os.path.dirname( - self.logger.logFilePath), - 'OUT', - os.path.basename(job_path_remote)) + + try : + # The first line is the result of the command (0 success or 1 fail) + self.res_job = file_lines[0] + except Exception as e: + self.err += _("Unable to get status from remote file %s: %s" % + (remote_path, str(e))) + + for i, job_path_remote in enumerate(file_lines[1:]): + try: + # For each command, there is two files to get : + # 1- The xml file describing the command and giving the + # internal traces. + # 2- The txt file containing the system command traces (like + # traces produced by the "make" command) + # 3- In case of the test command, there is another file to get : + # the xml board that contain the test results + dirname = os.path.basename(os.path.dirname(job_path_remote)) + if dirname != 'OUT' and dirname != 'TEST': + # Case 1- + local_path = os.path.join(os.path.dirname( + self.logger.logFilePath), + os.path.basename(job_path_remote)) + if i==0: # The first is the job command + self.logger.add_link(os.path.basename(job_path_remote), + "job", + self.res_job, + self.command) + elif dirname == 'OUT': + # Case 2- + local_path = os.path.join(os.path.dirname( + self.logger.logFilePath), + 'OUT', + os.path.basename(job_path_remote)) + elif dirname == 'TEST': + # Case 3- + local_path = os.path.join(os.path.dirname( + self.logger.logFilePath), + 'TEST', + os.path.basename(job_path_remote)) + + # Get the file if not os.path.exists(local_path): self.machine.sftp.get(job_path_remote, local_path) - self.remote_log_files.append(local_path) + self.remote_log_files.append(local_path) + except Exception as e: + self.err += _("Unable to get %s log file from remote: %s" % + (str(job_path_remote), + str(e))) def has_failed(self): '''Returns True if the job has failed. @@ -372,8 +444,8 @@ class job(object): self._has_begun = True self._has_finished = True self.cancelled = True - self.out = _("This job was not launched because its father has failed.") - self.err = _("This job was not launched because its father has failed.") + self.out += _("This job was not launched because its father has failed.") + self.err += _("This job was not launched because its father has failed.") def is_running(self): '''Returns True if the job commands are running @@ -392,12 +464,20 @@ class job(object): return self._has_timouted def time_elapsed(self): + """Get the time elapsed since the job launching + + :return: The number of seconds + :rtype: int + """ if not self.has_begun(): return -1 T_now = time.time() return T_now - self._T0 def check_time(self): + """Verify that the job has not exceeded its timeout. + If it has, kill the remote command and consider the job as finished. + """ if not self.has_begun(): return if self.time_elapsed() > self.timeout: @@ -406,21 +486,39 @@ class job(object): self._Tf = time.time() self.get_pids() (out_kill, _) = self.kill_remote_process() - self.out = "TIMEOUT \n" + out_kill.read() - self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout) - + self.out += "TIMEOUT \n" + out_kill.read().decode() + self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout) + try: + self.get_log_files() + except Exception as e: + self.err += _("Unable to get remote log files: %s" % e) + def total_duration(self): + """Give the total duration of the job + + :return: the total duration of the job in seconds + :rtype: int + """ return self._Tf - self._T0 - def run(self, logger): + def run(self): + """Launch the job by executing the remote command. + """ + + # Prevent multiple run if self.has_begun(): - print("Warn the user that a job can only be launched one time") + msg = _("Warning: A job can only be launched one time") + msg2 = _("Trying to launch the job \"%s\" whereas it has " + "already been launched." % self.name) + self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, + msg2))) return - if not self.machine.successfully_connected(logger): + # Do not execute the command if the machine could not be reached + if not self.machine.successfully_connected(self.logger): self._has_finished = True self.out = "N\A" - self.err = ("Connection to machine (name : %s, host: %s, port:" + self.err += ("Connection to machine (name : %s, host: %s, port:" " %s, user: %s) has failed\nUse the log command " "to get more information." % (self.machine.name, @@ -428,49 +526,58 @@ class job(object): self.machine.port, self.machine.user)) else: + # Usual case : Launch the command on remote machine self._T0 = time.time() self._stdin, self._stdout, self._stderr = self.machine.exec_command( - self.command, logger) + self.command, + self.logger) + # If the results are not initialized, finish the job if (self._stdin, self._stdout, self._stderr) == (None, None, None): self._has_finished = True self._Tf = time.time() - self.out = "N\A" - self.err = "The server failed to execute the command" + self.out += "N\A" + self.err += "The server failed to execute the command" + # Put the beginning flag to true. self._has_begun = True - def write_results(self, logger): - logger.write("name : " + self.name + "\n") + def write_results(self): + """Display on the terminal all the job's information + """ + self.logger.write("name : " + self.name + "\n") if self.after: - logger.write("after : %s\n" % self.after) - logger.write("Time elapsed : %4imin %2is \n" % - (self.total_duration()/60 , self.total_duration()%60)) + self.logger.write("after : %s\n" % self.after) + self.logger.write("Time elapsed : %4imin %2is \n" % + (self.total_duration()//60 , self.total_duration()%60)) if self._T0 != -1: - logger.write("Begin time : %s\n" % + self.logger.write("Begin time : %s\n" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._T0)) ) if self._Tf != -1: - logger.write("End time : %s\n\n" % + self.logger.write("End time : %s\n\n" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf)) ) machine_head = "Informations about connection :\n" underline = (len(machine_head) - 2) * "-" - logger.write(src.printcolors.printcInfo(machine_head+underline+"\n")) - self.machine.write_info(logger) + self.logger.write(src.printcolors.printcInfo( + machine_head+underline+"\n")) + self.machine.write_info(self.logger) - logger.write(src.printcolors.printcInfo("out : \n")) - if self.out is None: - logger.write("Unable to get output\n") + self.logger.write(src.printcolors.printcInfo("out : \n")) + if self.out == "": + self.logger.write("Unable to get output\n") else: - logger.write(self.out + "\n") - logger.write(src.printcolors.printcInfo("err : \n")) - if self.err is None: - logger.write("Unable to get error\n") - else: - logger.write(self.err + "\n") + self.logger.write(self.out + "\n") + self.logger.write(src.printcolors.printcInfo("err : \n")) + self.logger.write(self.err + "\n") def get_status(self): + """Get the status of the job (used by the Gui for xml display) + + :return: The current status of the job + :rtype: String + """ if not self.machine.successfully_connected(self.logger): return "SSH connection KO" if not self.has_begun(): @@ -493,13 +600,11 @@ class Jobs(object): def __init__(self, runner, logger, - job_file, job_file_path, config_jobs, lenght_columns = 20): # The jobs configuration self.cfg_jobs = config_jobs - self.job_file = job_file self.job_file_path = job_file_path # The machine that will be used today self.lmachines = [] @@ -510,11 +615,9 @@ class Jobs(object): # The jobs to be launched today self.ljobs = [] # The jobs that will not be launched today - self.ljobsdef_not_today = [] + self.ljobs_not_today = [] self.runner = runner self.logger = logger - # The correlation dictionary between jobs and machines - self.dic_job_machine = {} self.len_columns = lenght_columns # the list of jobs that have not been run yet @@ -524,7 +627,7 @@ class Jobs(object): # the list of jobs that are running self._l_jobs_running = [] - self.determine_products_and_machines() + self.determine_jobs_and_machines() def define_job(self, job_def, machine): '''Takes a pyconf job definition and a machine (from class machine) @@ -537,29 +640,31 @@ class Jobs(object): ''' name = job_def.name cmmnds = job_def.commands - timeout = job_def.timeout + if not "timeout" in job_def: + timeout = 4*60*60 # default timeout = 4h + else: + timeout = job_def.timeout after = None if 'after' in job_def: after = job_def.after application = None if 'application' in job_def: application = job_def.application - distribution = None - if 'distribution' in job_def: - distribution = job_def.distribution + board = None + if 'board' in job_def: + board = job_def.board - return job(name, + return Job(name, machine, application, - distribution, + board, cmmnds, timeout, self.runner.cfg, self.logger, - self.job_file, after = after) - def determine_products_and_machines(self): + def determine_jobs_and_machines(self): '''Function that reads the pyconf jobs definition and instantiates all the machines and jobs to be done today. @@ -570,57 +675,60 @@ class Jobs(object): host_list = [] for job_def in self.cfg_jobs.jobs : - if today in job_def.when: - - name_machine = job_def.machine - - a_machine = None - for mach in self.lmachines: - if mach.name == name_machine: - a_machine = mach - break - if a_machine == None: - for machine_def in self.cfg_jobs.machines: - if machine_def.name == name_machine: - if 'host' not in machine_def: - host = self.runner.cfg.VARS.hostname - else: - host = machine_def.host + if not "machine" in job_def: + msg = _('WARNING: The job "%s" do not have the key ' + '"machine", this job is ignored.\n\n' % job_def.name) + self.logger.write(src.printcolors.printcWarning(msg)) + continue + name_machine = job_def.machine + + a_machine = None + for mach in self.lmachines: + if mach.name == name_machine: + a_machine = mach + break + + if a_machine == None: + for machine_def in self.cfg_jobs.machines: + if machine_def.name == name_machine: + if 'host' not in machine_def: + host = self.runner.cfg.VARS.hostname + else: + host = machine_def.host - if 'user' not in machine_def: - user = self.runner.cfg.VARS.user - else: - user = machine_def.user + if 'user' not in machine_def: + user = self.runner.cfg.VARS.user + else: + user = machine_def.user - if 'port' not in machine_def: - port = 22 - else: - port = machine_def.port - - if 'password' not in machine_def: - passwd = None - else: - passwd = machine_def.password - - if 'sat_path' not in machine_def: - sat_path = "salomeTools" - else: - sat_path = machine_def.sat_path - - a_machine = machine( - machine_def.name, - host, - user, - port=port, - passwd=passwd, - sat_path=sat_path - ) + if 'port' not in machine_def: + port = 22 + else: + port = machine_def.port + + if 'password' not in machine_def: + passwd = None + else: + passwd = machine_def.password - if (host, port) not in host_list: - host_list.append((host, port)) - - self.lmachines.append(a_machine) + if 'sat_path' not in machine_def: + sat_path = "salomeTools" + else: + sat_path = machine_def.sat_path + + a_machine = Machine( + machine_def.name, + host, + user, + port=port, + passwd=passwd, + sat_path=sat_path + ) + + self.lmachines.append(a_machine) + if (host, port) not in host_list: + host_list.append((host, port)) if a_machine == None: msg = _("WARNING: The job \"%(job_name)s\" requires the " @@ -629,13 +737,13 @@ class Jobs(object): "The job will not be launched") self.logger.write(src.printcolors.printcWarning(msg)) - a_job = self.define_job(job_def, a_machine) - self.dic_job_machine[a_job] = a_machine + a_job = self.define_job(job_def, a_machine) + if today in job_def.when: self.ljobs.append(a_job) else: # today in job_def.when - self.ljobsdef_not_today.append(job_def) - + self.ljobs_not_today.append(a_job) + self.lhosts = host_list def ssh_connection_all_machines(self, pad=50): @@ -669,6 +777,13 @@ class Jobs(object): self.logger.flush() res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path) + # get the remote machine distribution using a sat command + (__, out_dist, __) = machine.exec_command( + os.path.join(machine.sat_path, + "sat config --value VARS.dist --no_label"), + self.logger) + machine.distribution = out_dist.read().decode().replace("\n", + "") # Print the status of the copy if res_copy == 0: self.logger.write('\r%s' % @@ -709,7 +824,7 @@ class Jobs(object): ''' host = hostname[0] port = hostname[1] - for jb in self.dic_job_machine: + for jb in self.ljobs: if jb.machine.host == host and jb.machine.port == port: if jb.is_running(): return jb @@ -724,7 +839,7 @@ class Jobs(object): ''' jobs_finished_list = [] jobs_running_list = [] - for jb in self.dic_job_machine: + for jb in self.ljobs: if jb.is_running(): jobs_running_list.append(jb) jb.check_time() @@ -750,7 +865,7 @@ class Jobs(object): if job.after is None: continue father_job = self.find_job_that_has_name(job.after) - if father_job.has_failed(): + if father_job is not None and father_job.has_failed(): job.cancel() def find_job_that_has_name(self, name): @@ -763,10 +878,8 @@ class Jobs(object): for jb in self.ljobs: if jb.name == name: return jb - # the following is executed only if the job was not found - msg = _('The job "%s" seems to be nonexistent') % name - raise src.SatException(msg) + return None def str_of_length(self, text, length): '''Takes a string text of any length and returns @@ -781,8 +894,8 @@ class Jobs(object): text_out = text[:length-3] + '...' else: diff = length - len(text) - before = " " * (diff/2) - after = " " * (diff/2 + diff%2) + before = " " * (diff//2) + after = " " * (diff//2 + diff%2) text_out = before + text + after return text_out @@ -843,8 +956,8 @@ class Jobs(object): self.logger.flush() # The infinite loop that runs the jobs - l_jobs_not_started = self.dic_job_machine.keys() - while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()): + l_jobs_not_started = src.deepcopy_list(self.ljobs) + while len(self._l_jobs_finished) != len(self.ljobs): new_job_start = False for host_port in self.lhosts: @@ -855,14 +968,21 @@ class Jobs(object): if (jb.machine.host, jb.machine.port) != host_port: continue if jb.after == None: - jb.run(self.logger) + jb.run() l_jobs_not_started.remove(jb) new_job_start = True break else: - jb_before = self.find_job_that_has_name(jb.after) + jb_before = self.find_job_that_has_name(jb.after) + if jb_before is None: + jb.cancel() + msg = _("This job was not launched because its " + "father is not in the jobs list.") + jb.out = msg + jb.err = msg + break if jb_before.has_finished(): - jb.run(self.logger) + jb.run() l_jobs_not_started.remove(jb) new_job_start = True break @@ -870,7 +990,8 @@ class Jobs(object): new_job_finished = self.update_jobs_states_list() if new_job_start or new_job_finished: - self.gui.update_xml_file(self.ljobs) + if self.gui: + self.gui.update_xml_files(self.ljobs) # Display the current status self.display_status(self.len_columns) @@ -881,8 +1002,9 @@ class Jobs(object): self.logger.write(tiret_line) self.logger.write("\n\n") - self.gui.update_xml_file(self.ljobs) - self.gui.last_update() + if self.gui: + self.gui.update_xml_files(self.ljobs) + self.gui.last_update() def write_all_results(self): '''Display all the jobs outputs. @@ -891,151 +1013,284 @@ class Jobs(object): :rtype: N\A ''' - for jb in self.dic_job_machine.keys(): + for jb in self.ljobs: self.logger.write(src.printcolors.printcLabel( "#------- Results for job %s -------#\n" % jb.name)) - jb.write_results(self.logger) + jb.write_results() self.logger.write("\n\n") class Gui(object): '''Class to manage the the xml data that can be displayed in a browser to see the jobs states ''' - - """ - - - - - - - - - - - - - - - - - - - - - is228809 - 2200 - SALOME-7.8.0 - adminuser - 240 - - export DISPLAY=is221560 - scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser - tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser - - Not launched - - - - is221560 - 22 - SALOME-master - salome - 240 - - export DISPLAY=is221560 - scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser - sat prepare SALOME-master - sat compile SALOME-master - sat check SALOME-master - sat launcher SALOME-master - sat test SALOME-master - - Running since 23 min - - - - 10/05/2016 20h32 - 10/05/2016 22h59 - - - - - - """ - - def __init__(self, xml_file_path, l_jobs, l_jobs_not_today, stylesheet): - # The path of the xml file - self.xml_file_path = xml_file_path - # The stylesheet - self.stylesheet = stylesheet - # Open the file in a writing stream - self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport") + + def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, file_boards=""): + '''Initialization + + :param xml_dir_path str: The path to the directory where to put + the xml resulting files + :param l_jobs List: the list of jobs that run today + :param l_jobs_not_today List: the list of jobs that do not run today + :param file_boards str: the file path from which to read the + expected boards + ''' + # The path of the csv files to read to fill the expected boards + self.file_boards = file_boards + + today = datetime.date.weekday(datetime.date.today()) + self.parse_csv_boards(today) + + # The path of the global xml file + self.xml_dir_path = xml_dir_path + # Initialize the xml files + xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml") + self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path, + "JobsReport") + # The xml files that corresponds to the boards. + # {name_board : xml_object}} + self.d_xml_board_files = {} # Create the lines and columns - self.initialize_array(l_jobs, l_jobs_not_today) - # Write the wml file - self.update_xml_file(l_jobs) + self.initialize_boards(l_jobs, l_jobs_not_today) + + # Write the xml file + self.update_xml_files(l_jobs) - def initialize_array(self, l_jobs, l_jobs_not_today): - l_dist = [] - l_applications = [] - for job in l_jobs: - distrib = job.distribution - if distrib is not None and distrib not in l_dist: - l_dist.append(distrib) + def add_xml_board(self, name): + xml_board_path = os.path.join(self.xml_dir_path, name + ".xml") + self.d_xml_board_files[name] = src.xmlManager.XmlLogFile( + xml_board_path, + "JobsReport") + self.d_xml_board_files[name].add_simple_node("distributions") + self.d_xml_board_files[name].add_simple_node("applications") + self.d_xml_board_files[name].add_simple_node("board", text=name) + + def initialize_boards(self, l_jobs, l_jobs_not_today): + '''Get all the first information needed for each file and write the + first version of the files + :param l_jobs List: the list of jobs that run today + :param l_jobs_not_today List: the list of jobs that do not run today + ''' + # Get the boards to fill and put it in a dictionary + # {board_name : xml instance corresponding to the board} + for job in l_jobs + l_jobs_not_today: + board = job.board + if (board is not None and + board not in self.d_xml_board_files.keys()): + self.add_xml_board(board) + + # Verify that the boards given as input are done + for board in list(self.d_input_boards.keys()): + if board not in self.d_xml_board_files: + self.add_xml_board(board) + root_node = self.d_xml_board_files[board].xmlroot + src.xmlManager.append_node_attrib(root_node, + {"input_file" : self.file_boards}) + + # Loop over all jobs in order to get the lines and columns for each + # xml file + d_dist = {} + d_application = {} + for board in self.d_xml_board_files: + d_dist[board] = [] + d_application[board] = [] + + l_hosts_ports = [] + + for job in l_jobs + l_jobs_not_today: + if (job.machine.host, job.machine.port) not in l_hosts_ports: + l_hosts_ports.append((job.machine.host, job.machine.port)) + + distrib = job.machine.distribution application = job.application - if application is not None and application not in l_applications: - l_applications.append(application) + + board_job = job.board + if board is None: + continue + for board in self.d_xml_board_files: + if board_job == board: + if distrib is not None and distrib not in d_dist[board]: + d_dist[board].append(distrib) + src.xmlManager.add_simple_node( + self.d_xml_board_files[board].xmlroot.find( + 'distributions'), + "dist", + attrib={"name" : distrib}) + + if board_job == board: + if (application is not None and + application not in d_application[board]): + d_application[board].append(application) + src.xmlManager.add_simple_node( + self.d_xml_board_files[board].xmlroot.find( + 'applications'), + "application", + attrib={ + "name" : application}) - for job_def in l_jobs_not_today: - distrib = src.get_cfg_param(job_def, "distribution", "nothing") - if distrib is not "nothing" and distrib not in l_dist: - l_dist.append(distrib) + # Verify that there are no missing application or distribution in the + # xml board files (regarding the input boards) + for board in self.d_xml_board_files: + l_dist = d_dist[board] + if board not in self.d_input_boards.keys(): + continue + for dist in self.d_input_boards[board]["rows"]: + if dist not in l_dist: + src.xmlManager.add_simple_node( + self.d_xml_board_files[board].xmlroot.find( + 'distributions'), + "dist", + attrib={"name" : dist}) + l_appli = d_application[board] + for appli in self.d_input_boards[board]["columns"]: + if appli not in l_appli: + src.xmlManager.add_simple_node( + self.d_xml_board_files[board].xmlroot.find( + 'applications'), + "application", + attrib={"name" : appli}) - application = src.get_cfg_param(job_def, "application", "nothing") - if application is not "nothing" and application not in l_applications: - l_applications.append(application) + # Initialize the hosts_ports node for the global file + self.xmlhosts_ports = self.xml_global_file.add_simple_node( + "hosts_ports") + for host, port in l_hosts_ports: + host_port = "%s:%i" % (host, port) + src.xmlManager.add_simple_node(self.xmlhosts_ports, + "host_port", + attrib={"name" : host_port}) - self.l_dist = l_dist - self.l_applications = l_applications - - # Update the hosts node - self.xmldists = self.xml_file.add_simple_node("distributions") - for dist_name in self.l_dist: - src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name}) + # Initialize the jobs node in all files + for xml_file in [self.xml_global_file] + list( + self.d_xml_board_files.values()): + xml_jobs = xml_file.add_simple_node("jobs") + # Get the jobs present in the config file but + # that will not be launched today + self.put_jobs_not_today(l_jobs_not_today, xml_jobs) - # Update the applications node - self.xmlapplications = self.xml_file.add_simple_node("applications") - for application in self.l_applications: - src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application}) - - # Initialize the jobs node - self.xmljobs = self.xml_file.add_simple_node("jobs") - - # - self.put_jobs_not_today(l_jobs_not_today) + xml_file.add_simple_node("infos", + attrib={"name" : "last update", + "JobsCommandStatus" : "running"}) - # Initialize the info node (when generated) - self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"}) + # Find in each board the squares that needs to be filled regarding the + # input csv files but that are not covered by a today job + for board in self.d_input_boards.keys(): + xml_root_board = self.d_xml_board_files[board].xmlroot + xml_missing = src.xmlManager.add_simple_node(xml_root_board, + "missing_jobs") + for row, column in self.d_input_boards[board]["jobs"]: + found = False + for job in l_jobs: + if (job.application == column and + job.machine.distribution == row): + found = True + break + if not found: + src.xmlManager.add_simple_node(xml_missing, + "job", + attrib={"distribution" : row, + "application" : column }) - def put_jobs_not_today(self, l_jobs_not_today): - for job_def in l_jobs_not_today: - xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job_def.name}) - src.xmlManager.add_simple_node(xmlj, "application", src.get_cfg_param(job_def, "application", "nothing")) - src.xmlManager.add_simple_node(xmlj, "distribution", src.get_cfg_param(job_def, "distribution", "nothing")) - src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job_def.commands)) - src.xmlManager.add_simple_node(xmlj, "state", "Not today") + def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs): + '''Get all the first information needed for each file and write the + first version of the files + + :param xml_node_jobs etree.Element: the node corresponding to a job + :param l_jobs_not_today List: the list of jobs that do not run today + ''' + for job in l_jobs_not_today: + xmlj = src.xmlManager.add_simple_node(xml_node_jobs, + "job", + attrib={"name" : job.name}) + src.xmlManager.add_simple_node(xmlj, "application", job.application) + src.xmlManager.add_simple_node(xmlj, + "distribution", + job.machine.distribution) + src.xmlManager.add_simple_node(xmlj, "board", job.board) + src.xmlManager.add_simple_node(xmlj, + "commands", " ; ".join(job.commands)) + src.xmlManager.add_simple_node(xmlj, "state", "Not today") + src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name) + src.xmlManager.add_simple_node(xmlj, "host", job.machine.host) + src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port)) + src.xmlManager.add_simple_node(xmlj, "user", job.machine.user) + src.xmlManager.add_simple_node(xmlj, "sat_path", + job.machine.sat_path) + + def parse_csv_boards(self, today): + """ Parse the csv file that describes the boards to produce and fill + the dict d_input_boards that contain the csv file contain - def update_xml_file(self, l_jobs): + :param today int: the current day of the week + """ + # open the csv file and read its content + l_read = [] + with open(self.file_boards, 'r') as f: + reader = csv.reader(f,delimiter=CSV_DELIMITER) + for row in reader: + l_read.append(row) + # get the delimiter for the boards (empty line) + boards_delimiter = [''] * len(l_read[0]) + # Make the list of boards, by splitting with the delimiter + l_boards = [list(y) for x, y in itertools.groupby(l_read, + lambda z: z == boards_delimiter) if not x] + + # loop over the csv lists of lines and get the rows, columns and jobs + d_boards = {} + for input_board in l_boards: + # get board name + board_name = input_board[0][0] + + # Get columns list + columns = input_board[0][1:] + + rows = [] + jobs = [] + for line in input_board[1:]: + row = line[0] + for i, square in enumerate(line[1:]): + if square=='': + continue + days = square.split(DAYS_SEPARATOR) + days = [int(day) for day in days] + if today in days: + if row not in rows: + rows.append(row) + job = (row, columns[i]) + jobs.append(job) + + d_boards[board_name] = {"rows" : rows, + "columns" : columns, + "jobs" : jobs} + self.d_input_boards = d_boards + + def update_xml_files(self, l_jobs): + '''Write all the xml files with updated information about the jobs + + :param l_jobs List: the list of jobs that run today + ''' + for xml_file in [self.xml_global_file] + list( + self.d_xml_board_files.values()): + self.update_xml_file(l_jobs, xml_file) + + # Write the file + self.write_xml_files() + + def update_xml_file(self, l_jobs, xml_file): + '''update information about the jobs for the file xml_file + + :param l_jobs List: the list of jobs that run today + :param xml_file xmlManager.XmlLogFile: the xml instance to update + ''' + + xml_node_jobs = xml_file.xmlroot.find('jobs') # Update the job names and status node for job in l_jobs: # Find the node corresponding to the job and delete it # in order to recreate it - for xmljob in self.xmljobs.findall('job'): + for xmljob in xml_node_jobs.findall('job'): if xmljob.attrib['name'] == job.name: - self.xmljobs.remove(xmljob) + xml_node_jobs.remove(xmljob) T0 = str(job._T0) if T0 != "-1": @@ -1047,25 +1302,38 @@ class Gui(object): time.localtime(job._Tf)) # recreate the job node - xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name}) + xmlj = src.xmlManager.add_simple_node(xml_node_jobs, + "job", + attrib={"name" : job.name}) + src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name) src.xmlManager.add_simple_node(xmlj, "host", job.machine.host) src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port)) src.xmlManager.add_simple_node(xmlj, "user", job.machine.user) - src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path) + src.xmlManager.add_simple_node(xmlj, "sat_path", + job.machine.sat_path) src.xmlManager.add_simple_node(xmlj, "application", job.application) - src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution) + src.xmlManager.add_simple_node(xmlj, "distribution", + job.machine.distribution) + src.xmlManager.add_simple_node(xmlj, "board", job.board) src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout)) - src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands)) + src.xmlManager.add_simple_node(xmlj, "commands", + " ; ".join(job.commands)) src.xmlManager.add_simple_node(xmlj, "state", job.get_status()) src.xmlManager.add_simple_node(xmlj, "begin", T0) src.xmlManager.add_simple_node(xmlj, "end", Tf) - src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out)) - src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err)) + src.xmlManager.add_simple_node(xmlj, "out", + src.printcolors.cleancolor(job.out)) + src.xmlManager.add_simple_node(xmlj, "err", + src.printcolors.cleancolor(job.err)) src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job)) if len(job.remote_log_files) > 0: - src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0]) + src.xmlManager.add_simple_node(xmlj, + "remote_log_file_path", + job.remote_log_files[0]) else: - src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing") + src.xmlManager.add_simple_node(xmlj, + "remote_log_file_path", + "nothing") xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after) # get the job father @@ -1074,35 +1342,59 @@ class Gui(object): for jb in l_jobs: if jb.name == job.after: job_father = jb - if job_father is None: - msg = _("The job %(father_name)s that is parent of " - "%(son_name)s is not in the job list." % - {"father_name" : job.after , "son_name" : job.name}) - raise src.SatException(msg) - if len(job_father.remote_log_files) > 0: + if (job_father is not None and + len(job_father.remote_log_files) > 0): link = job_father.remote_log_files[0] else: link = "nothing" src.xmlManager.append_node_attrib(xmlafter, {"link" : link}) + # Verify that the job is to be done today regarding the input csv + # files + if job.board and job.board in self.d_input_boards.keys(): + found = False + for dist, appli in self.d_input_boards[job.board]["jobs"]: + if (job.machine.distribution == dist + and job.application == appli): + found = True + src.xmlManager.add_simple_node(xmlj, + "extra_job", + "no") + break + if not found: + src.xmlManager.add_simple_node(xmlj, + "extra_job", + "yes") + # Update the date - src.xmlManager.append_node_attrib(self.xmlinfos, + xml_node_infos = xml_file.xmlroot.find('infos') + src.xmlManager.append_node_attrib(xml_node_infos, attrib={"value" : datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) - # Write the file - self.write_xml_file() + def last_update(self, finish_status = "finished"): - src.xmlManager.append_node_attrib(self.xmlinfos, - attrib={"JobsCommandStatus" : finish_status}) + '''update information about the jobs for the file xml_file + + :param l_jobs List: the list of jobs that run today + :param xml_file xmlManager.XmlLogFile: the xml instance to update + ''' + for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()): + xml_node_infos = xml_file.xmlroot.find('infos') + src.xmlManager.append_node_attrib(xml_node_infos, + attrib={"JobsCommandStatus" : finish_status}) # Write the file - self.write_xml_file() + self.write_xml_files() - def write_xml_file(self): - self.xml_file.write_tree(self.stylesheet) + def write_xml_files(self): + ''' Write the xml files + ''' + self.xml_global_file.write_tree(STYLESHEET_GLOBAL) + for xml_file in self.d_xml_board_files.values(): + xml_file.write_tree(STYLESHEET_BOARD) ## # Describes the command @@ -1116,12 +1408,7 @@ def run(args, runner, logger): (options, args) = parser.parse_args(args) - jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path - - l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")] - - # Make sure the path to the jobs config files directory exists - src.ensure_path_exists(jobs_cfg_files_dir) + l_cfg_dir = runner.cfg.PATHS.JOBPATH # list option : display all the available config files if options.list: @@ -1140,7 +1427,8 @@ def run(args, runner, logger): # Make sure the jobs_config option has been called if not options.jobs_cfg: message = _("The option --jobs_config is required\n") - raise src.SatException( message ) + src.printcolors.printcError(message) + return 1 # Find the file in the directories found = False @@ -1166,7 +1454,7 @@ def run(args, runner, logger): (_("File containing the jobs configuration"), file_jobs_cfg) ] src.print_info(logger, info) - + # Read the config that is in the file config_jobs = src.read_config_from_a_file(file_jobs_cfg) if options.only_jobs: @@ -1176,9 +1464,12 @@ def run(args, runner, logger): l_jb.append(jb, "Adding a job that was given in only_jobs option parameters") config_jobs.jobs = l_jb - + # Initialization - today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs) + today_jobs = Jobs(runner, + logger, + file_jobs_cfg, + config_jobs) # SSH connection to all machines today_jobs.ssh_connection_all_machines() if options.test_connection: @@ -1186,7 +1477,34 @@ def run(args, runner, logger): gui = None if options.publish: - gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, today_jobs.ljobsdef_not_today, "job_report.xsl") + # Copy the stylesheets in the log directory + log_dir = runner.cfg.USER.log_dir + xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl') + files_to_copy = [] + files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL)) + files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD)) + files_to_copy.append(os.path.join(xsl_dir, "running.gif")) + for file_path in files_to_copy: + shutil.copy2(file_path, log_dir) + + # Instanciate the Gui in order to produce the xml files that contain all + # the boards + gui = Gui(runner.cfg.USER.log_dir, + today_jobs.ljobs, + today_jobs.ljobs_not_today, + file_boards = options.input_boards) + + # Display the list of the xml files + logger.write(src.printcolors.printcInfo(("Here is the list of published" + " files :\n")), 4) + logger.write("%s\n" % gui.xml_global_file.logFile, 4) + for board in gui.d_xml_board_files.keys(): + file_path = gui.d_xml_board_files[board].logFile + file_name = os.path.basename(file_path) + logger.write("%s\n" % file_path, 4) + logger.add_link(file_name, "board", 0, board) + + logger.write("\n", 4) today_jobs.gui = gui @@ -1198,15 +1516,25 @@ def run(args, runner, logger): interruped = True logger.write("\n\n%s\n\n" % (src.printcolors.printcWarning(_("Forced interruption"))), 1) - finally: + if interruped: + msg = _("Killing the running jobs and trying" + " to get the corresponding logs\n") + logger.write(src.printcolors.printcWarning(msg)) + # find the potential not finished jobs and kill them for jb in today_jobs.ljobs: if not jb.has_finished(): - jb.kill_remote_process() + try: + jb.kill_remote_process() + except Exception as e: + msg = _("Failed to kill job %s: %s\n" % (jb.name, e)) + logger.write(src.printcolors.printcWarning(msg)) if interruped: - today_jobs.gui.last_update(_("Forced interruption")) + if today_jobs.gui: + today_jobs.gui.last_update(_("Forced interruption")) else: - today_jobs.gui.last_update() + if today_jobs.gui: + today_jobs.gui.last_update() # Output the results today_jobs.write_all_results()