From 1f02c81118d5a867fa34056b79b2433620899a10 Mon Sep 17 00:00:00 2001 From: Serge Rehbinder Date: Mon, 27 Jun 2016 11:30:46 +0200 Subject: [PATCH] port jobs command to python3. Get the remote distributions using the ssh in jobs command. Put links to remote commands in the regular log of the jobs command --- commands/compile.py | 18 +----- commands/job.py | 2 +- commands/jobs.py | 89 +++++++++++++++++------------- complete_sat.sh | 4 +- salomeTools.py | 12 ++-- src/__init__.py | 14 ++++- src/internal_config/distrib.pyconf | 1 + src/logger.py | 21 +++++++ 8 files changed, 97 insertions(+), 64 deletions(-) diff --git a/commands/compile.py b/commands/compile.py index 5ab1c50..f0f8afd 100644 --- a/commands/compile.py +++ b/commands/compile.py @@ -202,7 +202,7 @@ def sort_products(config, p_infos): :param config Config: The global configuration :param p_infos list: List of (str, Config) => (product_name, product_info) """ - l_prod_sorted = deepcopy_list(p_infos) + l_prod_sorted = src.deepcopy_list(p_infos) for prod in p_infos: l_fathers = get_recursive_fathers(config, prod, @@ -219,21 +219,9 @@ def sort_products(config, p_infos): break return l_prod_sorted - -def deepcopy_list(input_list): - """ Do a deep copy of a list - - :param input_list List: The list to copy - :return: The copy of the list - :rtype: List - """ - res = [] - for elem in input_list: - res.append(elem) - return res def extend_with_fathers(config, p_infos): - p_infos_res = deepcopy_list(p_infos) + p_infos_res = src.deepcopy_list(p_infos) for p_name_p_info in p_infos: fathers = get_recursive_fathers(config, p_name_p_info, @@ -244,7 +232,7 @@ def extend_with_fathers(config, p_infos): return p_infos_res def extend_with_children(config, p_infos): - p_infos_res = deepcopy_list(p_infos) + p_infos_res = src.deepcopy_list(p_infos) for p_name_p_info in p_infos: children = get_recursive_children(config, p_name_p_info, diff --git a/commands/job.py b/commands/job.py index d8a10a0..ca56a01 100644 --- 
a/commands/job.py +++ b/commands/job.py @@ -25,7 +25,7 @@ parser = src.options.Options() parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', _('The name of the config file that contains' ' the jobs configuration')) -parser.add_option('', 'job', 'string', 'job', +parser.add_option('', 'name', 'string', 'job', _('The job name from which to execute commands.'), "") def description(): diff --git a/commands/jobs.py b/commands/jobs.py index a62e410..edbdffc 100644 --- a/commands/jobs.py +++ b/commands/jobs.py @@ -58,6 +58,7 @@ class Machine(object): self.name = name self.host = host self.port = port + self.distribution = None # Will be filled after copying SAT on the machine self.user = user self.password = passwd self.sat_path = sat_path @@ -115,9 +116,14 @@ class Machine(object): ''' res = 0 try: + # open a sftp connection self.sftp = self.ssh.open_sftp() + # Create the sat directory on remote machine if it is not existing self.mkdir(self.sat_path, ignore_existing=True) + # Put sat self.put_dir(sat_local_path, self.sat_path, filters = ['.git']) + # put the job configuration file in order to make it reachable + # on the remote machine job_file_name = os.path.basename(job_file) self.sftp.put(job_file, os.path.join(self.sat_path, "data", @@ -220,7 +226,7 @@ class Machine(object): class Job(object): '''Class to manage one job ''' - def __init__(self, name, machine, application, distribution, table, + def __init__(self, name, machine, application, table, commands, timeout, config, logger, job_file, after=None): self.name = name @@ -228,7 +234,6 @@ class Job(object): self.after = after self.timeout = timeout self.application = application - self.distribution = distribution self.table = table self.config = config self.logger = logger @@ -260,7 +265,7 @@ class Job(object): "list_log_files.txt") + " job --jobs_config " + job_file + - " --job " + + " --name " + self.name) def get_pids(self): @@ -320,8 +325,8 @@ class Job(object): if self._stdout.channel.closed: 
self._has_finished = True # Store the result outputs - self.out = self._stdout.read() - self.err = self._stderr.read() + self.out = self._stdout.read().decode() + self.err = self._stderr.read().decode() # Put end time self._Tf = time.time() # And get the remote command status and log files @@ -354,7 +359,8 @@ class Job(object): os.remove(tmp_file_path) # The first line is the result of the command (0 success or 1 fail) self.res_job = file_lines[0] - for job_path_remote in file_lines[1:]: + + for i, job_path_remote in enumerate(file_lines[1:]): try: # For each command, there is two files to get : # 1- The xml file describing the command and giving the @@ -366,6 +372,11 @@ class Job(object): local_path = os.path.join(os.path.dirname( self.logger.logFilePath), os.path.basename(job_path_remote)) + if i==0: # The first is the job command + self.logger.add_link(os.path.basename(job_path_remote), + "job", + self.res_job, + self.command) else: # Case 2- local_path = os.path.join(os.path.dirname( @@ -448,7 +459,7 @@ class Job(object): self._Tf = time.time() self.get_pids() (out_kill, _) = self.kill_remote_process() - self.out = "TIMEOUT \n" + out_kill.read() + self.out = "TIMEOUT \n" + out_kill.read().decode() self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout) try: self.get_log_files() @@ -472,7 +483,8 @@ class Job(object): msg = _("Warning: A job can only be launched one time") msg2 = _("Trying to launch the job \"%s\" whereas it has " "already been launched." 
% self.name) - self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, msg2))) + self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, + msg2))) return # Do not execute the command if the machine could not be reached @@ -508,7 +520,7 @@ class Job(object): if self.after: self.logger.write("after : %s\n" % self.after) self.logger.write("Time elapsed : %4imin %2is \n" % - (self.total_duration()/60 , self.total_duration()%60)) + (self.total_duration()//60 , self.total_duration()%60)) if self._T0 != -1: self.logger.write("Begin time : %s\n" % time.strftime('%Y-%m-%d %H:%M:%S', @@ -583,8 +595,6 @@ class Jobs(object): self.ljobs_not_today = [] self.runner = runner self.logger = logger - # The correlation dictionary between jobs and machines - self.dic_job_machine = {} self.len_columns = lenght_columns # the list of jobs that have not been run yet @@ -614,9 +624,6 @@ class Jobs(object): application = None if 'application' in job_def: application = job_def.application - distribution = None - if 'distribution' in job_def: - distribution = job_def.distribution table = None if 'table' in job_def: table = job_def.table @@ -624,7 +631,6 @@ class Jobs(object): return Job(name, machine, application, - distribution, table, cmmnds, timeout, @@ -707,13 +713,12 @@ class Jobs(object): self.logger.write(src.printcolors.printcWarning(msg)) a_job = self.define_job(job_def, a_machine) - self.dic_job_machine[a_job] = a_machine if today in job_def.when: self.ljobs.append(a_job) else: # today in job_def.when self.ljobs_not_today.append(a_job) - + self.lhosts = host_list def ssh_connection_all_machines(self, pad=50): @@ -747,6 +752,13 @@ class Jobs(object): self.logger.flush() res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path) + # get the remote machine distribution using a sat command + (__, out_dist, __) = machine.exec_command( + os.path.join(machine.sat_path, + "sat config --value VARS.dist --no_label"), + self.logger) + 
machine.distribution = out_dist.read().decode().replace("\n", + "") # Print the status of the copy if res_copy == 0: self.logger.write('\r%s' % @@ -787,7 +799,7 @@ class Jobs(object): ''' host = hostname[0] port = hostname[1] - for jb in self.dic_job_machine: + for jb in self.ljobs: if jb.machine.host == host and jb.machine.port == port: if jb.is_running(): return jb @@ -802,7 +814,7 @@ class Jobs(object): ''' jobs_finished_list = [] jobs_running_list = [] - for jb in self.dic_job_machine: + for jb in self.ljobs: if jb.is_running(): jobs_running_list.append(jb) jb.check_time() @@ -841,7 +853,6 @@ class Jobs(object): for jb in self.ljobs: if jb.name == name: return jb - # the following is executed only if the job was not found msg = _('The job "%s" seems to be nonexistent') % name raise src.SatException(msg) @@ -859,8 +870,8 @@ class Jobs(object): text_out = text[:length-3] + '...' else: diff = length - len(text) - before = " " * (diff/2) - after = " " * (diff/2 + diff%2) + before = " " * (diff//2) + after = " " * (diff//2 + diff%2) text_out = before + text + after return text_out @@ -921,8 +932,8 @@ class Jobs(object): self.logger.flush() # The infinite loop that runs the jobs - l_jobs_not_started = self.dic_job_machine.keys() - while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()): + l_jobs_not_started = src.deepcopy_list(self.ljobs) + while len(self._l_jobs_finished) != len(self.ljobs): new_job_start = False for host_port in self.lhosts: @@ -969,7 +980,7 @@ class Jobs(object): :rtype: N\A ''' - for jb in self.dic_job_machine.keys(): + for jb in self.ljobs: self.logger.write(src.printcolors.printcLabel( "#------- Results for job %s -------#\n" % jb.name)) jb.write_results() @@ -1037,7 +1048,7 @@ class Gui(object): if (job.machine.host, job.machine.port) not in l_hosts_ports: l_hosts_ports.append((job.machine.host, job.machine.port)) - distrib = job.distribution + distrib = job.machine.distribution application = job.application table_job = job.table 
@@ -1053,9 +1064,11 @@ class Gui(object): attrib={"name" : distrib}) if table_job == table: - if application is not None and application not in d_application[table]: + if (application is not None and + application not in d_application[table]): d_application[table].append(application) - src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'), + src.xmlManager.add_simple_node( + self.d_xml_table_files[table].xmlroot.find('applications'), "application", attrib={"name" : application}) @@ -1068,13 +1081,15 @@ class Gui(object): attrib={"name" : host_port}) # Initialize the jobs node in all files - for xml_file in [self.xml_global_file] + self.d_xml_table_files.values(): + for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()): xml_jobs = xml_file.add_simple_node("jobs") - # Get the jobs present in the config file but that will not be launched - # today + # Get the jobs present in the config file but + # that will not be launched today self.put_jobs_not_today(l_jobs_not_today, xml_jobs) - xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"}) + xml_file.add_simple_node("infos", + attrib={"name" : "last update", + "JobsCommandStatus" : "running"}) def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs): @@ -1091,7 +1106,7 @@ class Gui(object): src.xmlManager.add_simple_node(xmlj, "application", job.application) src.xmlManager.add_simple_node(xmlj, "distribution", - job.distribution) + job.machine.distribution) src.xmlManager.add_simple_node(xmlj, "table", job.table) src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands)) @@ -1108,7 +1123,7 @@ class Gui(object): :param l_jobs List: the list of jobs that run today ''' - for xml_file in [self.xml_global_file] + self.d_xml_table_files.values(): + for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()): self.update_xml_file(l_jobs, xml_file) # Write the file @@ -1151,7 +1166,7 
@@ class Gui(object): job.machine.sat_path) src.xmlManager.add_simple_node(xmlj, "application", job.application) src.xmlManager.add_simple_node(xmlj, "distribution", - job.distribution) + job.machine.distribution) src.xmlManager.add_simple_node(xmlj, "table", job.table) src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout)) src.xmlManager.add_simple_node(xmlj, "commands", @@ -1207,7 +1222,7 @@ class Gui(object): :param l_jobs List: the list of jobs that run today :param xml_file xmlManager.XmlLogFile: the xml instance to update ''' - for xml_file in [self.xml_global_file] + self.d_xml_table_files.values(): + for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()): xml_node_infos = xml_file.xmlroot.find('infos') src.xmlManager.append_node_attrib(xml_node_infos, attrib={"JobsCommandStatus" : finish_status}) @@ -1309,7 +1324,7 @@ def run(args, runner, logger): gui = None if options.publish: - gui = Gui("/export/home/serioja/LOGS", + gui = Gui(runner.cfg.SITE.log.log_dir, today_jobs.ljobs, today_jobs.ljobs_not_today,) diff --git a/complete_sat.sh b/complete_sat.sh index a915f60..2762e7b 100755 --- a/complete_sat.sh +++ b/complete_sat.sh @@ -41,7 +41,7 @@ _show_applications() opts2=$(echo --command --sat $opts2) ;; job) - opts2=$(echo --jobs_config --job $opts2) + opts2=$(echo --jobs_config --name $opts2) ;; esac @@ -209,7 +209,7 @@ _salomeTools_complete() return 0 ;; job) - opts="--jobs_config --job" + opts="--jobs_config --name" COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) return 0 ;; diff --git a/salomeTools.py b/salomeTools.py index 38badf8..6bf491e 100755 --- a/salomeTools.py +++ b/salomeTools.py @@ -261,14 +261,10 @@ class Sat(object): # Add a link to the parent command if logger_add_link is not None: - xmlLinks = logger_add_link.xmlFile.xmlroot.find( - "Links") - src.xmlManager.add_simple_node(xmlLinks, - "link", - text = logger_command.logFileName, - attrib = {"command" : __nameCmd__, - "passed" : res, - "launchedCommand" : 
launchedCommand}) + logger_add_link.add_link(logger_command.logFileName, + __nameCmd__, + res, + launchedCommand) logger_add_link.l_logFiles += logger_command.l_logFiles finally: diff --git a/src/__init__.py b/src/__init__.py index 45da5cc..3a39375 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -255,4 +255,16 @@ def handleRemoveReadonly(func, path, exc): os.chmod(path, stat.S_IRWXU| stat.S_IRWXG| stat.S_IRWXO) # 0777 func(path) else: - raise \ No newline at end of file + raise + +def deepcopy_list(input_list): + """ Do a deep copy of a list + + :param input_list List: The list to copy + :return: The copy of the list + :rtype: List + """ + res = [] + for elem in input_list: + res.append(elem) + return res \ No newline at end of file diff --git a/src/internal_config/distrib.pyconf b/src/internal_config/distrib.pyconf index 18ea1b4..b1cdae3 100644 --- a/src/internal_config/distrib.pyconf +++ b/src/internal_config/distrib.pyconf @@ -54,5 +54,6 @@ VERSIONS : "CO": { "7.1.1503": "7.1" + "CO7.2.1511": "7.2" } } diff --git a/src/logger.py b/src/logger.py index a87a771..3e1e5a1 100644 --- a/src/logger.py +++ b/src/logger.py @@ -115,6 +115,27 @@ class Logger(object): # command self.xmlFile.add_simple_node("Links") + def add_link(self, + log_file_name, + command_name, + command_res, + full_launched_command): + '''Add a link to another log file. + + :param log_file_name str: The file name of the link. + :param command_name str: The name of the command linked. + :param command_res str: The result of the command linked. 
"0" or "1" + :param full_launched_command str: The full launch command + ("sat command ...") + ''' + xmlLinks = self.xmlFile.xmlroot.find("Links") + src.xmlManager.add_simple_node(xmlLinks, + "link", + text = log_file_name, + attrib = {"command" : command_name, + "passed" : command_res, + "launchedCommand" : full_launched_command}) + def write(self, message, level=None, screenOnly=False): '''the function used in the commands that will print in the terminal and the log file. -- 2.39.2