# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
-import sys
import datetime
import time
import paramiko
class machine(object):
'''Class to manage a ssh connection on a machine
'''
- def __init__(self, host, user, port=22, passwd=None):
+ def __init__(self, name, host, user, port=22, passwd=None, sat_path="./"):
+ self.name = name
self.host = host
self.port = port
self.user = user
self.password = passwd
+ self.sat_path = sat_path
self.ssh = paramiko.SSHClient()
self._connection_successful = None
username=self.user,
password = self.password)
except paramiko.AuthenticationException:
- message = src.KO_STATUS + _(": authentication failed\n")
- logger.write( src.printcolors.printcError(message))
+ message = src.KO_STATUS + _("Authentication failed")
except paramiko.BadHostKeyException:
message = (src.KO_STATUS +
- _(": the server's host key could not be verified\n"))
- logger.write( src.printcolors.printcError(message))
+ _("The server's host key could not be verified"))
except paramiko.SSHException:
- message = (src.KO_STATUS +
- _(": error connecting or establishing an SSH session\n"))
- logger.write( src.printcolors.printcError(message))
+ message = ( _("SSHException error connecting or establishing an SSH session"))
except:
- logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
+ message = ( _("Error connecting or establishing an SSH session"))
else:
self._connection_successful = True
- logger.write( src.printcolors.printcSuccess(src.OK_STATUS) + '\n')
+ message = ""
+ return message
def successfully_connected(self, logger):
'''Verify if the connection to the remote machine has succeed
(machine.host, machine.port, machine.user)
logger.write( src.printcolors.printcWarning(message))
return self._connection_successful
-
-
- def close(self):
- '''Close the ssh connection
+
+ def copy_sat(self, sat_local_path, job_file):
+ '''Copy salomeTools to the remote machine in self.sat_path
+ '''
+ res = 0
+ try:
+ self.sftp = self.ssh.open_sftp()
+ self.mkdir(self.sat_path, ignore_existing=True)
+ self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
+ job_file_name = os.path.basename(job_file)
+ self.sftp.put(job_file, os.path.join(self.sat_path, "data", "jobs", job_file_name))
+ except Exception as e:
+ res = str(e)
+ self._connection_successful = False
- :rtype: N\A
+ return res
+
+ def put_dir(self, source, target, filters = []):
+ ''' Uploads the contents of the source directory to the target path. The
+ target directory needs to exists. All subdirectories in source are
+ created under target.
'''
- self.ssh.close()
+ for item in os.listdir(source):
+ if item in filters:
+ continue
+ source_path = os.path.join(source, item)
+ destination_path = os.path.join(target, item)
+ if os.path.islink(source_path):
+ linkto = os.readlink(source_path)
+ try:
+ self.sftp.remove(destination_path)
+ self.sftp.symlink(linkto, destination_path)
+ self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
+ except IOError:
+ pass
+ else:
+ if os.path.isfile(source_path):
+ self.sftp.put(source_path, destination_path)
+ self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
+ else:
+ self.mkdir(destination_path, ignore_existing=True)
+ self.put_dir(source_path, destination_path)
+
+ def mkdir(self, path, mode=511, ignore_existing=False):
+ ''' Augments mkdir by adding an option to not fail if the folder exists '''
+ try:
+ self.sftp.mkdir(path, mode)
+ except IOError:
+ if ignore_existing:
+ pass
+ else:
+ raise
def exec_command(self, command, logger):
'''Execute the command on the remote machine
return (None, None, None)
else:
return (stdin, stdout, stderr)
-
+
+ def close(self):
+ '''Close the ssh connection
+
+ :rtype: N\A
+ '''
+ self.ssh.close()
+
def write_info(self, logger):
'''Prints the informations relative to the machine in the logger
(terminal traces and log file)
class job(object):
'''Class to manage one job
'''
- def __init__(self, name, machine, application, distribution, commands, timeout, logger, after=None):
+ def __init__(self, name, machine, application, distribution,
+ commands, timeout, logger, job_file, after=None):
self.name = name
self.machine = machine
self.application = application
self.distribution = distribution
self.logger = logger
+ self.remote_log_files = []
self._T0 = -1
self._Tf = -1
self.out = None # Contains something only if the job is finished
self.err = None # Contains something only if the job is finished
- self.commands = " ; ".join(commands)
+ self.commands = commands
+ self.command = os.path.join(self.machine.sat_path, "sat") + " -v1 job --jobs_config " + job_file + " --job " + self.name
def get_pids(self):
pids = []
- for cmd in self.commands.split(" ; "):
- cmd_pid = 'ps aux | grep "' + cmd + '" | awk \'{print $2}\''
- (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
- pids_cmd = out_pid.readlines()
- pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
- pids+=pids_cmd
+ cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
+ (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
+ pids_cmd = out_pid.readlines()
+ pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
+ pids+=pids_cmd
return pids
def kill_remote_process(self):
if self._stdout.channel.closed:
self._has_finished = True
- # And store the result outputs
+ # Store the result outputs
self.out = self._stdout.read()
self.err = self._stderr.read()
- # And put end time
+ # Put end time
self._Tf = time.time()
+ # And get the remote command status and log files
+ self.get_log_files()
return self._has_finished
+ def get_log_files(self):
+ if not self.has_finished():
+ msg = _("Trying to get log files whereas the job is not finished.")
+ self.logger.write(src.printcolors.printcWarning(msg))
+ return
+ out_lines = self.out.split("\n")
+ out_lines = [line for line in out_lines if line != '']
+ self.res_job = out_lines[0]
+ for job_path_remote in out_lines[1:]:
+ if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
+ local_path = os.path.join(os.path.dirname(self.logger.logFilePath), os.path.basename(job_path_remote))
+ self.machine.sftp.get(job_path_remote, local_path)
+ else:
+ local_path = os.path.join(os.path.dirname(self.logger.logFilePath), 'OUT', os.path.basename(job_path_remote))
+ self.machine.sftp.get(job_path_remote, local_path)
+ self.remote_log_files.append(local_path)
+
def is_running(self):
'''Returns True if the job commands are running
if not self.machine.successfully_connected(logger):
self._has_finished = True
self.out = "N\A"
- self.err = ("Connection to machine (host: %s, port: %s, user: %s) has failed"
- % (self.machine.host, self.machine.port, self.machine.user))
+ self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information."
+ % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
else:
self._T0 = time.time()
self._stdin, self._stdout, self._stderr = self.machine.exec_command(
- self.commands, logger)
+ self.command, logger)
if (self._stdin, self._stdout, self._stderr) == (None, None, None):
self._has_finished = True
self._Tf = time.time()
class Jobs(object):
'''Class to manage the jobs to be run
'''
- def __init__(self, runner, logger, config_jobs, lenght_columns = 20):
+ def __init__(self, runner, logger, job_file, job_file_path, config_jobs, lenght_columns = 20):
# The jobs configuration
self.cfg_jobs = config_jobs
+ self.job_file = job_file
+ self.job_file_path = job_file_path
# The machine that will be used today
self.lmachines = []
# The list of machine (hosts, port) that will be used today
if 'distribution' in job_def:
distribution = job_def.distribution
- return job(name, machine, application, distribution, cmmnds, timeout, self.logger, after = after)
+ return job(name, machine, application, distribution, cmmnds, timeout, self.logger, self.job_file , after = after)
def determine_products_and_machines(self):
'''Function that reads the pyconf jobs definition and instantiates all
'''
today = datetime.date.weekday(datetime.date.today())
host_list = []
-
+
for job_def in self.cfg_jobs.jobs :
- if today in job_def.when:
- if 'host' not in job_def:
- host = self.runner.cfg.VARS.hostname
- else:
- host = job_def.host
-
- if 'port' not in job_def:
- port = 22
- else:
- port = job_def.port
+ if today in job_def.when:
- if (host, port) not in host_list:
- host_list.append((host, port))
+ name_machine = job_def.machine
- if 'user' not in job_def:
- user = self.runner.cfg.VARS.user
- else:
- user = job_def.user
+ a_machine = None
+ for mach in self.lmachines:
+ if mach.name == name_machine:
+ a_machine = mach
+ break
- if 'password' not in job_def:
- passwd = None
- else:
- passwd = job_def.password
-
- a_machine = machine(host, user, port=port, passwd=passwd)
+ if a_machine == None:
+ for machine_def in self.cfg_jobs.machines:
+ if machine_def.name == name_machine:
+ if 'host' not in machine_def:
+ host = self.runner.cfg.VARS.hostname
+ else:
+ host = machine_def.host
+
+ if 'user' not in machine_def:
+ user = self.runner.cfg.VARS.user
+ else:
+ user = machine_def.user
+
+ if 'port' not in machine_def:
+ port = 22
+ else:
+ port = machine_def.port
- self.lmachines.append(a_machine)
+ if 'password' not in machine_def:
+ passwd = None
+ else:
+ passwd = machine_def.password
+
+ if 'sat_path' not in machine_def:
+ sat_path = "./"
+ else:
+ sat_path = machine_def.sat_path
+
+ a_machine = machine(
+ machine_def.name,
+ host,
+ user,
+ port=port,
+ passwd=passwd,
+ sat_path=sat_path
+ )
+
+ if (host, port) not in host_list:
+ host_list.append((host, port))
+
+ self.lmachines.append(a_machine)
+ if a_machine == None:
+ msg = _("WARNING: The job \"%(job_name)s\" requires the "
+ "machine \"%(machine_name)s\" but this machine "
+ "is not defined in the configuration file.\n"
+ "The job will not be launched")
+ self.logger.write(src.printcolors.printcWarning(msg))
+
a_job = self.define_job(job_def, a_machine)
+ self.dic_job_machine[a_job] = a_machine
self.ljobs.append(a_job)
-
- self.dic_job_machine[a_job] = a_machine
-
+
self.lhosts = host_list
def ssh_connection_all_machines(self, pad=50):
"Establishing connection with all the machines :\n")))
for machine in self.lmachines:
# little algorithm in order to display traces
- begin_line = ("(host: %s, port: %s, user: %s)" %
- (machine.host, machine.port, machine.user))
+ begin_line = (_("Connection to %s: " % machine.name))
if pad - len(begin_line) < 0:
endline = " "
else:
endline = (pad - len(begin_line)) * "." + " "
- self.logger.write( begin_line + endline )
+
+ step = "SSH connection"
+ self.logger.write( begin_line + endline + step)
self.logger.flush()
# the call to the method that initiate the ssh connection
- machine.connect(self.logger)
+ msg = machine.connect(self.logger)
+
+ # Copy salomeTools to the remote machine
+ if machine.successfully_connected(self.logger):
+ step = _("Copy SAT")
+ self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3)
+ self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
+ self.logger.flush()
+ res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path)
+ # Print the status of the copy
+ if res_copy == 0:
+ self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
+ self.logger.write('\r%s%s%s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS)), 3)
+ else:
+ self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
+ self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS), _("Copy of SAT failed")), 3)
+ else:
+ self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
+ self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.KO_STATUS), msg), 3)
+ self.logger.write("\n", 3)
+
self.logger.write("\n")
src.xmlManager.add_simple_node(xmlj, "application", job.application)
src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
- src.xmlManager.add_simple_node(xmlj, "commands", job.commands)
+ src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
src.xmlManager.add_simple_node(xmlj, "begin", str(job._T0))
src.xmlManager.add_simple_node(xmlj, "end", str(job._Tf))
- src.xmlManager.add_simple_node(xmlj, "out", job.out)
- src.xmlManager.add_simple_node(xmlj, "err", job.err)
-
+ src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
+ src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
+ if len(job.remote_log_files) > 0:
+ src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
+ else:
+ src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")
# Update the date
src.xmlManager.append_node_attrib(self.xmlinfos,
attrib={"value" :
# Write the file
self.write_xml_file()
- def last_update(self):
+ def last_update(self, finish_status = "finished"):
src.xmlManager.append_node_attrib(self.xmlinfos,
- attrib={"JobsCommandStatus" : "finished"})
+ attrib={"JobsCommandStatus" : finish_status})
# Write the file
self.write_xml_file()
def write_xml_file(self):
self.xml_file.write_tree(self.stylesheet)
-def print_info(logger, arch, JobsFilePath):
- '''Prints information header..
-
- :param logger src.logger.Logger: The logger instance
- :param arch str: a string that gives the architecture of the machine on
- which the command is launched
- :param JobsFilePath str: The path of the file
- that contains the jobs configuration
- :return: Nothing
- :rtype: N\A
- '''
- info = [
- (_("Platform"), arch),
- (_("File containing the jobs configuration"), JobsFilePath)
- ]
-
- smax = max(map(lambda l: len(l[0]), info))
- for i in info:
- sp = " " * (smax - len(i[0]))
- src.printcolors.print_value(logger, sp + i[0], i[1], 2)
- logger.write("\n", 2)
-
##
# Describes the command
def description():
##
# Runs the command.
def run(args, runner, logger):
+
(options, args) = parser.parse_args(args)
jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
+ l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
+
# Make sure the path to the jobs config files directory exists
- if not os.path.exists(jobs_cfg_files_dir):
- logger.write(_("Creating directory %s\n") %
- src.printcolors.printcLabel(jobs_cfg_files_dir), 1)
- os.mkdir(jobs_cfg_files_dir)
-
+ src.ensure_path_exists(jobs_cfg_files_dir)
+
# list option : display all the available config files
if options.list:
- lcfiles = []
- if not options.no_label:
- sys.stdout.write("------ %s\n" %
- src.printcolors.printcHeader(jobs_cfg_files_dir))
-
- for f in sorted(os.listdir(jobs_cfg_files_dir)):
- if not f.endswith('.pyconf'):
- continue
- cfilename = f[:-7]
- lcfiles.append(cfilename)
- sys.stdout.write("%s\n" % cfilename)
+ for cfg_dir in l_cfg_dir:
+ if not options.no_label:
+ logger.write("------ %s\n" %
+ src.printcolors.printcHeader(cfg_dir))
+
+ for f in sorted(os.listdir(cfg_dir)):
+ if not f.endswith('.pyconf'):
+ continue
+ cfilename = f[:-7]
+ logger.write("%s\n" % cfilename)
return 0
# Make sure the jobs_config option has been called
message = _("The option --jobs_config is required\n")
raise src.SatException( message )
- # Make sure the invoked file exists
- file_jobs_cfg = os.path.join(jobs_cfg_files_dir, options.jobs_cfg)
- if not file_jobs_cfg.endswith('.pyconf'):
- file_jobs_cfg += '.pyconf'
-
- if not os.path.exists(file_jobs_cfg):
- message = _("The file %s does not exist.\n") % file_jobs_cfg
- logger.write(src.printcolors.printcError(message), 1)
- message = _("The possible files are :\n")
- logger.write( src.printcolors.printcInfo(message), 1)
- for f in sorted(os.listdir(jobs_cfg_files_dir)):
- if not f.endswith('.pyconf'):
- continue
- jobscfgname = f[:-7]
- sys.stdout.write("%s\n" % jobscfgname)
- raise src.SatException( _("No corresponding file") )
+ # Find the file in the directories
+ found = False
+ for cfg_dir in l_cfg_dir:
+ file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
+ if not file_jobs_cfg.endswith('.pyconf'):
+ file_jobs_cfg += '.pyconf'
+
+ if not os.path.exists(file_jobs_cfg):
+ continue
+ else:
+ found = True
+ break
- print_info(logger, runner.cfg.VARS.dist, file_jobs_cfg)
+ if not found:
+ msg = _("The file configuration %(name_file)s was not found."
+ "\nUse the --list option to get the possible files.")
+ src.printcolors.printcError(msg)
+ return 1
+
+ info = [
+ (_("Platform"), runner.cfg.VARS.dist),
+ (_("File containing the jobs configuration"), file_jobs_cfg)
+ ]
+ src.print_info(logger, info)
# Read the config that is in the file
config_jobs = src.read_config_from_a_file(file_jobs_cfg)
config_jobs.jobs = l_jb
# Initialization
- today_jobs = Jobs(runner, logger, config_jobs)
+ today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
# SSH connection to all machines
today_jobs.ssh_connection_all_machines()
if options.test_connection:
today_jobs.gui = gui
+ interruped = False
try:
# Run all the jobs contained in config_jobs
today_jobs.run_jobs()
except KeyboardInterrupt:
+ interruped = True
logger.write("\n\n%s\n\n" %
(src.printcolors.printcWarning(_("Forced interruption"))), 1)
+
finally:
# find the potential not finished jobs and kill them
for jb in today_jobs.ljobs:
if not jb.has_finished():
jb.kill_remote_process()
-
+ if interruped:
+ today_jobs.gui.last_update(_("Forced interruption"))
+ else:
+ today_jobs.gui.last_update()
# Output the results
today_jobs.write_all_results()