self.name = name
self.host = host
self.port = port
+ self.distribution = None # Will be filled after copying SAT on the machine
self.user = user
self.password = passwd
self.sat_path = sat_path
'''
res = 0
try:
+ # open an SFTP connection
self.sftp = self.ssh.open_sftp()
+ # Create the sat directory on the remote machine if it does not exist
self.mkdir(self.sat_path, ignore_existing=True)
+ # Put sat
self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
+ # put the job configuration file in order to make it reachable
+ # on the remote machine
job_file_name = os.path.basename(job_file)
self.sftp.put(job_file, os.path.join(self.sat_path,
"data",
class Job(object):
'''Class to manage one job
'''
- def __init__(self, name, machine, application, distribution, table,
+ def __init__(self, name, machine, application, table,
commands, timeout, config, logger, job_file, after=None):
self.name = name
self.after = after
self.timeout = timeout
self.application = application
- self.distribution = distribution
self.table = table
self.config = config
self.logger = logger
"list_log_files.txt") +
" job --jobs_config " +
job_file +
- " --job " +
+ " --name " +
self.name)
def get_pids(self):
if self._stdout.channel.closed:
self._has_finished = True
# Store the result outputs
- self.out = self._stdout.read()
- self.err = self._stderr.read()
+ self.out = self._stdout.read().decode()
+ self.err = self._stderr.read().decode()
# Put end time
self._Tf = time.time()
# And get the remote command status and log files
os.remove(tmp_file_path)
# The first line is the result of the command (0 success or 1 fail)
self.res_job = file_lines[0]
- for job_path_remote in file_lines[1:]:
+
+ for i, job_path_remote in enumerate(file_lines[1:]):
try:
# For each command, there are two files to get:
# 1- The xml file describing the command and giving the
local_path = os.path.join(os.path.dirname(
self.logger.logFilePath),
os.path.basename(job_path_remote))
+ if i==0: # The first is the job command
+ self.logger.add_link(os.path.basename(job_path_remote),
+ "job",
+ self.res_job,
+ self.command)
else:
# Case 2-
local_path = os.path.join(os.path.dirname(
self._Tf = time.time()
self.get_pids()
(out_kill, _) = self.kill_remote_process()
- self.out = "TIMEOUT \n" + out_kill.read()
+ self.out = "TIMEOUT \n" + out_kill.read().decode()
self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
try:
self.get_log_files()
msg = _("Warning: A job can only be launched one time")
msg2 = _("Trying to launch the job \"%s\" whereas it has "
"already been launched." % self.name)
- self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, msg2)))
+ self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
+ msg2)))
return
# Do not execute the command if the machine could not be reached
if self.after:
self.logger.write("after : %s\n" % self.after)
self.logger.write("Time elapsed : %4imin %2is \n" %
- (self.total_duration()/60 , self.total_duration()%60))
+ (self.total_duration()//60 , self.total_duration()%60))
if self._T0 != -1:
self.logger.write("Begin time : %s\n" %
time.strftime('%Y-%m-%d %H:%M:%S',
self.ljobs_not_today = []
self.runner = runner
self.logger = logger
- # The correlation dictionary between jobs and machines
- self.dic_job_machine = {}
self.len_columns = lenght_columns
# the list of jobs that have not been run yet
application = None
if 'application' in job_def:
application = job_def.application
- distribution = None
- if 'distribution' in job_def:
- distribution = job_def.distribution
table = None
if 'table' in job_def:
table = job_def.table
return Job(name,
machine,
application,
- distribution,
table,
cmmnds,
timeout,
self.logger.write(src.printcolors.printcWarning(msg))
a_job = self.define_job(job_def, a_machine)
- self.dic_job_machine[a_job] = a_machine
if today in job_def.when:
self.ljobs.append(a_job)
else: # today in job_def.when
self.ljobs_not_today.append(a_job)
-
+
self.lhosts = host_list
def ssh_connection_all_machines(self, pad=50):
self.logger.flush()
res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
self.job_file_path)
+ # get the remote machine distribution using a sat command
+ (__, out_dist, __) = machine.exec_command(
+ os.path.join(machine.sat_path,
+ "sat config --value VARS.dist --no_label"),
+ self.logger)
+ machine.distribution = out_dist.read().decode().replace("\n",
+ "")
# Print the status of the copy
if res_copy == 0:
self.logger.write('\r%s' %
'''
host = hostname[0]
port = hostname[1]
- for jb in self.dic_job_machine:
+ for jb in self.ljobs:
if jb.machine.host == host and jb.machine.port == port:
if jb.is_running():
return jb
'''
jobs_finished_list = []
jobs_running_list = []
- for jb in self.dic_job_machine:
+ for jb in self.ljobs:
if jb.is_running():
jobs_running_list.append(jb)
jb.check_time()
for jb in self.ljobs:
if jb.name == name:
return jb
-
# the following is executed only if the job was not found
msg = _('The job "%s" seems to be nonexistent') % name
raise src.SatException(msg)
text_out = text[:length-3] + '...'
else:
diff = length - len(text)
- before = " " * (diff/2)
- after = " " * (diff/2 + diff%2)
+ before = " " * (diff//2)
+ after = " " * (diff//2 + diff%2)
text_out = before + text + after
return text_out
self.logger.flush()
# The infinite loop that runs the jobs
- l_jobs_not_started = self.dic_job_machine.keys()
- while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
+ l_jobs_not_started = src.deepcopy_list(self.ljobs)
+ while len(self._l_jobs_finished) != len(self.ljobs):
new_job_start = False
for host_port in self.lhosts:
:rtype: N\A
'''
- for jb in self.dic_job_machine.keys():
+ for jb in self.ljobs:
self.logger.write(src.printcolors.printcLabel(
"#------- Results for job %s -------#\n" % jb.name))
jb.write_results()
if (job.machine.host, job.machine.port) not in l_hosts_ports:
l_hosts_ports.append((job.machine.host, job.machine.port))
- distrib = job.distribution
+ distrib = job.machine.distribution
application = job.application
table_job = job.table
attrib={"name" : distrib})
if table_job == table:
- if application is not None and application not in d_application[table]:
+ if (application is not None and
+ application not in d_application[table]):
d_application[table].append(application)
- src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
+ src.xmlManager.add_simple_node(
+ self.d_xml_table_files[table].xmlroot.find('applications'),
"application",
attrib={"name" : application})
attrib={"name" : host_port})
# Initialize the jobs node in all files
- for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+ for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
xml_jobs = xml_file.add_simple_node("jobs")
- # Get the jobs present in the config file but that will not be launched
- # today
+ # Get the jobs present in the config file but
+ # that will not be launched today
self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
- xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
+ xml_file.add_simple_node("infos",
+ attrib={"name" : "last update",
+ "JobsCommandStatus" : "running"})
def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
src.xmlManager.add_simple_node(xmlj, "application", job.application)
src.xmlManager.add_simple_node(xmlj,
"distribution",
- job.distribution)
+ job.machine.distribution)
src.xmlManager.add_simple_node(xmlj, "table", job.table)
src.xmlManager.add_simple_node(xmlj,
"commands", " ; ".join(job.commands))
:param l_jobs List: the list of jobs that run today
'''
- for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+ for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
self.update_xml_file(l_jobs, xml_file)
# Write the file
job.machine.sat_path)
src.xmlManager.add_simple_node(xmlj, "application", job.application)
src.xmlManager.add_simple_node(xmlj, "distribution",
- job.distribution)
+ job.machine.distribution)
src.xmlManager.add_simple_node(xmlj, "table", job.table)
src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
src.xmlManager.add_simple_node(xmlj, "commands",
:param l_jobs List: the list of jobs that run today
:param xml_file xmlManager.XmlLogFile: the xml instance to update
'''
- for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+ for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
xml_node_infos = xml_file.xmlroot.find('infos')
src.xmlManager.append_node_attrib(xml_node_infos,
attrib={"JobsCommandStatus" : finish_status})
gui = None
if options.publish:
- gui = Gui("/export/home/serioja/LOGS",
+ gui = Gui(runner.cfg.SITE.log.log_dir,
today_jobs.ljobs,
today_jobs.ljobs_not_today,)