From d956938680c84173a623f3f2dc3e27d585e01536 Mon Sep 17 00:00:00 2001 From: Serge Rehbinder Date: Mon, 13 Jun 2016 12:50:44 +0200 Subject: [PATCH] link to remote machine logs in the matrix of jobs command --- commands/config.py | 2 - commands/job.py | 53 +++--- commands/jobs.py | 369 +++++++++++++++++++++++++++-------------- salomeTools.py | 8 + src/printcolors.py | 3 + src/xmlManager.py | 6 + src/xsl/job_report.xsl | 13 +- 7 files changed, 295 insertions(+), 159 deletions(-) diff --git a/commands/config.py b/commands/config.py index fbb4cb9..f839f61 100644 --- a/commands/config.py +++ b/commands/config.py @@ -403,8 +403,6 @@ class ConfigManager: f = open(cfg_name, 'w') user_cfg.__save__(f) f.close() - print(_("You can edit it to configure salomeTools " - "(use: sat config --edit).\n")) return user_cfg diff --git a/commands/job.py b/commands/job.py index cbaf3ea..207c81c 100644 --- a/commands/job.py +++ b/commands/job.py @@ -19,7 +19,6 @@ import os import src -import jobs # Define all possible option for the make command : sat make parser = src.options.Options() @@ -47,40 +46,40 @@ def run(args, runner, logger): jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path + l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")] + # Make sure the path to the jobs config files directory exists - if not os.path.exists(jobs_cfg_files_dir): - logger.write(_("Creating directory %s\n") % - src.printcolors.printcLabel(jobs_cfg_files_dir), 1) - os.mkdir(jobs_cfg_files_dir) - + src.ensure_path_exists(jobs_cfg_files_dir) + # Make sure the jobs_config option has been called if not options.jobs_cfg: message = _("The option --jobs_config is required\n") raise src.SatException( message ) - # Make sure the job option has been called - if not options.job: - message = _("The option --job is required\n") - raise src.SatException( message ) - - # Make sure the invoked file exists - file_jobs_cfg = os.path.join(jobs_cfg_files_dir, options.jobs_cfg) - if not file_jobs_cfg.endswith('.pyconf'): - file_jobs_cfg += '.pyconf' + # Find the file in the directories + found = False + for cfg_dir in l_cfg_dir: + file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg) + if not file_jobs_cfg.endswith('.pyconf'): + file_jobs_cfg += '.pyconf' - if not os.path.exists(file_jobs_cfg): - message = _("The file %s does not exist.\n") % file_jobs_cfg - logger.write(src.printcolors.printcError(message), 1) - message = _("The possible files are :\n") - logger.write( src.printcolors.printcInfo(message), 1) - for f in sorted(os.listdir(jobs_cfg_files_dir)): - if not f.endswith('.pyconf'): - continue - jobscfgname = f[:-7] - logger.write("%s\n" % jobscfgname) - raise src.SatException( _("No corresponding file") ) + if not os.path.exists(file_jobs_cfg): + continue + else: + found = True + break + + if not found: + msg = _("The file configuration %(name_file)s was not found." + "\nUse the --list option to get the possible files.") + src.printcolors.printcError(msg) + return 1 - jobs.print_info(logger, runner.cfg.VARS.dist, file_jobs_cfg) + info = [ + (_("Platform"), runner.cfg.VARS.dist), + (_("File containing the jobs configuration"), file_jobs_cfg) + ] + src.print_info(logger, info) # Read the config that is in the file config_jobs = src.read_config_from_a_file(file_jobs_cfg) diff --git a/commands/jobs.py b/commands/jobs.py index cda3516..506f0ca 100644 --- a/commands/jobs.py +++ b/commands/jobs.py @@ -17,7 +17,6 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import sys import datetime import time import paramiko @@ -47,11 +46,13 @@ parser.add_option('p', 'publish', 'boolean', 'publish', class machine(object): '''Class to manage a ssh connection on a machine ''' - def __init__(self, host, user, port=22, passwd=None): + def __init__(self, name, host, user, port=22, passwd=None, sat_path="./"): + self.name = name self.host = host self.port = port self.user = user self.password = passwd + self.sat_path = sat_path self.ssh = paramiko.SSHClient() self._connection_successful = None @@ -72,21 +73,18 @@ class machine(object): username=self.user, password = self.password) except paramiko.AuthenticationException: - message = src.KO_STATUS + _(": authentication failed\n") - logger.write( src.printcolors.printcError(message)) + message = src.KO_STATUS + _("Authentication failed") except paramiko.BadHostKeyException: message = (src.KO_STATUS + - _(": the server's host key could not be verified\n")) - logger.write( src.printcolors.printcError(message)) + _("The server's host key could not be verified")) except paramiko.SSHException: - message = (src.KO_STATUS + - _(": error connecting or establishing an SSH session\n")) - logger.write( src.printcolors.printcError(message)) + message = ( _("SSHException error connecting or establishing an SSH session")) except: - logger.write( src.printcolors.printcError(src.KO_STATUS + '\n')) + message = ( _("Error connecting or establishing an SSH session")) else: self._connection_successful = True - logger.write( src.printcolors.printcSuccess(src.OK_STATUS) + '\n') + message = "" + return message def successfully_connected(self, logger): '''Verify if the connection to the remote machine has succeed @@ -102,14 +100,58 @@ class machine(object): (machine.host, machine.port, machine.user) logger.write( src.printcolors.printcWarning(message)) return self._connection_successful - - - def close(self): - '''Close the ssh connection + + def copy_sat(self, sat_local_path, job_file): + '''Copy salomeTools to the remote machine in self.sat_path + ''' + res = 0 + try: + self.sftp = self.ssh.open_sftp() + self.mkdir(self.sat_path, ignore_existing=True) + self.put_dir(sat_local_path, self.sat_path, filters = ['.git']) + job_file_name = os.path.basename(job_file) + self.sftp.put(job_file, os.path.join(self.sat_path, "data", "jobs", job_file_name)) + except Exception as e: + res = str(e) + self._connection_successful = False - :rtype: N\A + return res + + def put_dir(self, source, target, filters = []): + ''' Uploads the contents of the source directory to the target path. The + target directory needs to exists. All subdirectories in source are + created under target. ''' - self.ssh.close() + for item in os.listdir(source): + if item in filters: + continue + source_path = os.path.join(source, item) + destination_path = os.path.join(target, item) + if os.path.islink(source_path): + linkto = os.readlink(source_path) + try: + self.sftp.remove(destination_path) + self.sftp.symlink(linkto, destination_path) + self.sftp.chmod(destination_path, os.stat(source_path).st_mode) + except IOError: + pass + else: + if os.path.isfile(source_path): + self.sftp.put(source_path, destination_path) + self.sftp.chmod(destination_path, os.stat(source_path).st_mode) + else: + self.mkdir(destination_path, ignore_existing=True) + self.put_dir(source_path, destination_path) + + def mkdir(self, path, mode=511, ignore_existing=False): + ''' Augments mkdir by adding an option to not fail if the folder exists ''' + try: + self.sftp.mkdir(path, mode) + except IOError: + if ignore_existing: + pass + else: + raise def exec_command(self, command, logger): '''Execute the command on the remote machine @@ -134,7 +176,14 @@ class machine(object): return (None, None, None) else: return (stdin, stdout, stderr) - + + def close(self): + '''Close the ssh connection + + :rtype: N\A + ''' + self.ssh.close() + def write_info(self, logger): '''Prints the informations relative to the machine in the logger (terminal traces and log file) @@ -156,7 +205,8 @@ class machine(object): class job(object): '''Class to manage one job ''' - def __init__(self, name, machine, application, distribution, commands, timeout, logger, after=None): + def __init__(self, name, machine, application, distribution, + commands, timeout, logger, job_file, after=None): self.name = name self.machine = machine @@ -165,6 +215,7 @@ class job(object): self.application = application self.distribution = distribution self.logger = logger + self.remote_log_files = [] self._T0 = -1 self._Tf = -1 @@ -178,16 +229,16 @@ class job(object): self.out = None # Contains something only if the job is finished self.err = None # Contains something only if the job is finished - self.commands = " ; ".join(commands) + self.commands = commands + self.command = os.path.join(self.machine.sat_path, "sat") + " -v1 job --jobs_config " + job_file + " --job " + self.name def get_pids(self): pids = [] - for cmd in self.commands.split(" ; "): - cmd_pid = 'ps aux | grep "' + cmd + '" | awk \'{print $2}\'' - (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger) - pids_cmd = out_pid.readlines() - pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd] - pids+=pids_cmd + cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\'' + (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger) + pids_cmd = out_pid.readlines() + pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd] + pids+=pids_cmd return pids def kill_remote_process(self): @@ -230,14 +281,33 @@ class job(object): if self._stdout.channel.closed: self._has_finished = True - # And store the result outputs + # Store the result outputs self.out = self._stdout.read() self.err = self._stderr.read() - # And put end time + # Put end time self._Tf = time.time() + # And get the remote command status and log files + self.get_log_files() return self._has_finished + def get_log_files(self): + if not self.has_finished(): + msg = _("Trying to get log files whereas the job is not finished.") + self.logger.write(src.printcolors.printcWarning(msg)) + return + out_lines = self.out.split("\n") + out_lines = [line for line in out_lines if line != ''] + self.res_job = out_lines[0] + for job_path_remote in out_lines[1:]: + if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT': + local_path = os.path.join(os.path.dirname(self.logger.logFilePath), os.path.basename(job_path_remote)) + self.machine.sftp.get(job_path_remote, local_path) + else: + local_path = os.path.join(os.path.dirname(self.logger.logFilePath), 'OUT', os.path.basename(job_path_remote)) + self.machine.sftp.get(job_path_remote, local_path) + self.remote_log_files.append(local_path) + def is_running(self): '''Returns True if the job commands are running @@ -283,12 +353,12 @@ class job(object): if not self.machine.successfully_connected(logger): self._has_finished = True self.out = "N\A" - self.err = ("Connection to machine (host: %s, port: %s, user: %s) has failed" - % (self.machine.host, self.machine.port, self.machine.user)) + self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information." + % (self.machine.name, self.machine.host, self.machine.port, self.machine.user)) else: self._T0 = time.time() self._stdin, self._stdout, self._stderr = self.machine.exec_command( - self.commands, logger) + self.command, logger) if (self._stdin, self._stdout, self._stderr) == (None, None, None): self._has_finished = True self._Tf = time.time() @@ -343,9 +413,11 @@ class job(object): class Jobs(object): '''Class to manage the jobs to be run ''' - def __init__(self, runner, logger, config_jobs, lenght_columns = 20): + def __init__(self, runner, logger, job_file, job_file_path, config_jobs, lenght_columns = 20): # The jobs configuration self.cfg_jobs = config_jobs + self.job_file = job_file + self.job_file_path = job_file_path # The machine that will be used today self.lmachines = [] # The list of machine (hosts, port) that will be used today @@ -391,7 +463,7 @@ class Jobs(object): if 'distribution' in job_def: distribution = job_def.distribution - return job(name, machine, application, distribution, cmmnds, timeout, self.logger, after = after) + return job(name, machine, application, distribution, cmmnds, timeout, self.logger, self.job_file , after = after) def determine_products_and_machines(self): '''Function that reads the pyconf jobs definition and instantiates all @@ -402,42 +474,72 @@ class Jobs(object): ''' today = datetime.date.weekday(datetime.date.today()) host_list = [] - + for job_def in self.cfg_jobs.jobs : - if today in job_def.when: - if 'host' not in job_def: - host = self.runner.cfg.VARS.hostname - else: - host = job_def.host - - if 'port' not in job_def: - port = 22 - else: - port = job_def.port + if today in job_def.when: - if (host, port) not in host_list: - host_list.append((host, port)) + name_machine = job_def.machine - if 'user' not in job_def: - user = self.runner.cfg.VARS.user - else: - user = job_def.user + a_machine = None + for mach in self.lmachines: + if mach.name == name_machine: + a_machine = mach + break - if 'password' not in job_def: - passwd = None - else: - passwd = job_def.password - - a_machine = machine(host, user, port=port, passwd=passwd) + if a_machine == None: + for machine_def in self.cfg_jobs.machines: + if machine_def.name == name_machine: + if 'host' not in machine_def: + host = self.runner.cfg.VARS.hostname + else: + host = machine_def.host + + if 'user' not in machine_def: + user = self.runner.cfg.VARS.user + else: + user = machine_def.user + + if 'port' not in machine_def: + port = 22 + else: + port = machine_def.port - self.lmachines.append(a_machine) + if 'password' not in machine_def: + passwd = None + else: + passwd = machine_def.password + + if 'sat_path' not in machine_def: + sat_path = "./" + else: + sat_path = machine_def.sat_path + + a_machine = machine( + machine_def.name, + host, + user, + port=port, + passwd=passwd, + sat_path=sat_path + ) + + if (host, port) not in host_list: + host_list.append((host, port)) + + self.lmachines.append(a_machine) + if a_machine == None: + msg = _("WARNING: The job \"%(job_name)s\" requires the " + "machine \"%(machine_name)s\" but this machine " + "is not defined in the configuration file.\n" + "The job will not be launched") + self.logger.write(src.printcolors.printcWarning(msg)) + a_job = self.define_job(job_def, a_machine) + self.dic_job_machine[a_job] = a_machine self.ljobs.append(a_job) - - self.dic_job_machine[a_job] = a_machine - + self.lhosts = host_list def ssh_connection_all_machines(self, pad=50): @@ -451,16 +553,37 @@ class Jobs(object): "Establishing connection with all the machines :\n"))) for machine in self.lmachines: # little algorithm in order to display traces - begin_line = ("(host: %s, port: %s, user: %s)" % - (machine.host, machine.port, machine.user)) + begin_line = (_("Connection to %s: " % machine.name)) if pad - len(begin_line) < 0: endline = " " else: endline = (pad - len(begin_line)) * "." + " " - self.logger.write( begin_line + endline ) + + step = "SSH connection" + self.logger.write( begin_line + endline + step) self.logger.flush() # the call to the method that initiate the ssh connection - machine.connect(self.logger) + msg = machine.connect(self.logger) + + # Copy salomeTools to the remote machine + if machine.successfully_connected(self.logger): + step = _("Copy SAT") + self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3) + self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3) + self.logger.flush() + res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path) + # Print the status of the copy + if res_copy == 0: + self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3) + self.logger.write('\r%s%s%s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS)), 3) + else: + self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3) + self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS), _("Copy of SAT failed")), 3) + else: + self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3) + self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.KO_STATUS), msg), 3) + self.logger.write("\n", 3) + self.logger.write("\n") @@ -778,13 +901,16 @@ class Gui(object): src.xmlManager.add_simple_node(xmlj, "application", job.application) src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution) src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout)) - src.xmlManager.add_simple_node(xmlj, "commands", job.commands) + src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands)) src.xmlManager.add_simple_node(xmlj, "state", job.get_status()) src.xmlManager.add_simple_node(xmlj, "begin", str(job._T0)) src.xmlManager.add_simple_node(xmlj, "end", str(job._Tf)) - src.xmlManager.add_simple_node(xmlj, "out", job.out) - src.xmlManager.add_simple_node(xmlj, "err", job.err) - + src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out)) + src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err)) + if len(job.remote_log_files) > 0: + src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0]) + else: + src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing") # Update the date src.xmlManager.append_node_attrib(self.xmlinfos, attrib={"value" : @@ -793,37 +919,15 @@ class Gui(object): # Write the file self.write_xml_file() - def last_update(self): + def last_update(self, finish_status = "finished"): src.xmlManager.append_node_attrib(self.xmlinfos, - attrib={"JobsCommandStatus" : "finished"}) + attrib={"JobsCommandStatus" : finish_status}) # Write the file self.write_xml_file() def write_xml_file(self): self.xml_file.write_tree(self.stylesheet) -def print_info(logger, arch, JobsFilePath): - '''Prints information header.. - - :param logger src.logger.Logger: The logger instance - :param arch str: a string that gives the architecture of the machine on - which the command is launched - :param JobsFilePath str: The path of the file - that contains the jobs configuration - :return: Nothing - :rtype: N\A - ''' - info = [ - (_("Platform"), arch), - (_("File containing the jobs configuration"), JobsFilePath) - ] - - smax = max(map(lambda l: len(l[0]), info)) - for i in info: - sp = " " * (smax - len(i[0])) - src.printcolors.print_value(logger, sp + i[0], i[1], 2) - logger.write("\n", 2) - ## # Describes the command def description(): @@ -833,29 +937,28 @@ def description(): ## # Runs the command. def run(args, runner, logger): + (options, args) = parser.parse_args(args) jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path + l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")] + # Make sure the path to the jobs config files directory exists - if not os.path.exists(jobs_cfg_files_dir): - logger.write(_("Creating directory %s\n") % - src.printcolors.printcLabel(jobs_cfg_files_dir), 1) - os.mkdir(jobs_cfg_files_dir) - + src.ensure_path_exists(jobs_cfg_files_dir) + # list option : display all the available config files if options.list: - lcfiles = [] - if not options.no_label: - sys.stdout.write("------ %s\n" % - src.printcolors.printcHeader(jobs_cfg_files_dir)) - - for f in sorted(os.listdir(jobs_cfg_files_dir)): - if not f.endswith('.pyconf'): - continue - cfilename = f[:-7] - lcfiles.append(cfilename) - sys.stdout.write("%s\n" % cfilename) + for cfg_dir in l_cfg_dir: + if not options.no_label: + logger.write("------ %s\n" % + src.printcolors.printcHeader(cfg_dir)) + + for f in sorted(os.listdir(cfg_dir)): + if not f.endswith('.pyconf'): + continue + cfilename = f[:-7] + logger.write("%s\n" % cfilename) return 0 # Make sure the jobs_config option has been called @@ -863,24 +966,30 @@ def run(args, runner, logger): message = _("The option --jobs_config is required\n") raise src.SatException( message ) - # Make sure the invoked file exists - file_jobs_cfg = os.path.join(jobs_cfg_files_dir, options.jobs_cfg) - if not file_jobs_cfg.endswith('.pyconf'): - file_jobs_cfg += '.pyconf' - - if not os.path.exists(file_jobs_cfg): - message = _("The file %s does not exist.\n") % file_jobs_cfg - logger.write(src.printcolors.printcError(message), 1) - message = _("The possible files are :\n") - logger.write( src.printcolors.printcInfo(message), 1) - for f in sorted(os.listdir(jobs_cfg_files_dir)): - if not f.endswith('.pyconf'): - continue - jobscfgname = f[:-7] - sys.stdout.write("%s\n" % jobscfgname) - raise src.SatException( _("No corresponding file") ) + # Find the file in the directories + found = False + for cfg_dir in l_cfg_dir: + file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg) + if not file_jobs_cfg.endswith('.pyconf'): + file_jobs_cfg += '.pyconf' + + if not os.path.exists(file_jobs_cfg): + continue + else: + found = True + break - print_info(logger, runner.cfg.VARS.dist, file_jobs_cfg) + if not found: + msg = _("The file configuration %(name_file)s was not found." + "\nUse the --list option to get the possible files.") + src.printcolors.printcError(msg) + return 1 + + info = [ + (_("Platform"), runner.cfg.VARS.dist), + (_("File containing the jobs configuration"), file_jobs_cfg) + ] + src.print_info(logger, info) # Read the config that is in the file config_jobs = src.read_config_from_a_file(file_jobs_cfg) @@ -893,7 +1002,7 @@ def run(args, runner, logger): config_jobs.jobs = l_jb # Initialization - today_jobs = Jobs(runner, logger, config_jobs) + today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs) # SSH connection to all machines today_jobs.ssh_connection_all_machines() if options.test_connection: @@ -905,17 +1014,23 @@ def run(args, runner, logger): today_jobs.gui = gui + interruped = False try: # Run all the jobs contained in config_jobs today_jobs.run_jobs() except KeyboardInterrupt: + interruped = True logger.write("\n\n%s\n\n" % (src.printcolors.printcWarning(_("Forced interruption"))), 1) + finally: # find the potential not finished jobs and kill them for jb in today_jobs.ljobs: if not jb.has_finished(): jb.kill_remote_process() - + if interruped: + today_jobs.gui.last_update(_("Forced interruption")) + else: + today_jobs.gui.last_update() # Output the results today_jobs.write_all_results() diff --git a/salomeTools.py b/salomeTools.py index e384fc4..9edcf15 100755 --- a/salomeTools.py +++ b/salomeTools.py @@ -137,6 +137,14 @@ class Sat(object): ''' # loop on the commands name for nameCmd in lCommand: + + # Exception for the jobs command that require the paramiko module + if nameCmd == "jobs": + try: + import paramiko + except: + continue + # load the module that has name nameCmd in dirPath (file_, pathname, description) = imp.find_module(nameCmd, [dirPath]) module = imp.load_module(nameCmd, file_, pathname, description) diff --git a/src/printcolors.py b/src/printcolors.py index 68e0a8b..52d41f6 100644 --- a/src/printcolors.py +++ b/src/printcolors.py @@ -136,6 +136,9 @@ def cleancolor(message): :return: The cleaned text. :rtype: str ''' + if message == None: + return message + message = message.replace('\033[0m', '') for i in __code_range__: message = message.replace('\033[%dm' % i, '') diff --git a/src/xmlManager.py b/src/xmlManager.py index 3ef103e..ee3e425 100644 --- a/src/xmlManager.py +++ b/src/xmlManager.py @@ -17,6 +17,12 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os +try: # For python2 + import sys + reload(sys) + sys.setdefaultencoding('utf8') +except: + pass import src from . import ElementTree as etree diff --git a/src/xsl/job_report.xsl b/src/xsl/job_report.xsl index dd9753a..a89e378 100644 --- a/src/xsl/job_report.xsl +++ b/src/xsl/job_report.xsl @@ -145,6 +145,13 @@ javascript:Toggle('')   : + + + remote log + + remote log + +
@@ -162,7 +169,7 @@ - Command status : finished + Command status : @@ -189,9 +196,9 @@

Commands :


-

Out :

+

Out :


-

Err :

+

Err :

-- 2.39.2