3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Command-line options of the jobs command. run() reads the parsed values
# through the destination name (fourth argument), e.g. options.jobs_cfg,
# options.only_jobs, options.no_label, options.test_connection.
28 parser = src.options.Options()
30 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
31 _('The name of the config file that contains'
32 ' the jobs configuration'))
33 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
34 _('The list of jobs to launch, by their name. '))
35 parser.add_option('l', 'list', 'boolean', 'list',
36 _('list all available config files.'))
37 parser.add_option('n', 'no_label', 'boolean', 'no_label',
38 _("do not print labels, Works only with --list."), False)
39 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
40 _("Try to connect to the machines. Not executing the jobs."),
42 parser.add_option('p', 'publish', 'boolean', 'publish',
43 _("Generate an xml file that can be read in a browser to "
44 "display the jobs status."),
class machine(object):
    '''Class to manage a ssh connection on a machine.

    It holds the ssh parameters (host, port, user, password), the
    paramiko.SSHClient used to reach the machine and the result of the
    last connection attempt.
    '''
    def __init__(self, host, user, port=22, passwd=None):
        self.host = host
        self.port = port
        self.user = user
        self.password = passwd
        self.ssh = paramiko.SSHClient()
        # None until connect() is called, then True (success) or False
        self._connection_successful = None

    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine.

        The outcome (OK, or KO with the reason) is written in the logger
        and remembered for successfully_connected().

        :param logger src.logger.Logger: The logger instance
        :return: Nothing
        :rtype: None
        '''
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # Hosts missing from the known hosts are accepted and recorded
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(self.host,
                             port=self.port,
                             username=self.user,
                             password=self.password)
        # AuthenticationException and BadHostKeyException derive from
        # SSHException, so they have to be caught before it.
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _(": authentication failed\n")
            logger.write(src.printcolors.printcError(message))
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _(": the server's host key could not be verified\n"))
            logger.write(src.printcolors.printcError(message))
        except paramiko.SSHException:
            message = (src.KO_STATUS +
                       _(": error connecting or establishing an SSH session\n"))
            logger.write(src.printcolors.printcError(message))
        except Exception:
            # Any other failure (e.g. socket error, unknown host name).
            # KeyboardInterrupt is deliberately not swallowed here.
            logger.write(src.printcolors.printcError(src.KO_STATUS + '\n'))
        else:
            self._connection_successful = True
            logger.write(src.printcolors.printcSuccess(src.OK_STATUS) + '\n')

    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeeded.

        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeeded, False if it failed,
                 None (after writing a warning) if connect() was never called
        :rtype: bool
        '''
        if self._connection_successful is None:
            # The % operator applies to the whole (implicitly concatenated)
            # message, and the values come from this instance.
            message = ("Warning : trying to ask if the connection to "
                       "(host: %s, port: %s, user: %s) is OK whereas there were"
                       " no connection request\n" %
                       (self.host, self.port, self.user))
            logger.write(src.printcolors.printcWarning(message))
        return self._connection_successful

    def close(self):
        '''Close the ssh connection.

        :return: Nothing
        :rtype: None
        '''
        self.ssh.close()

    def exec_command(self, command, logger):
        '''Execute the command on the remote machine.

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
                 or (None, None, None) if the command could not be sent
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                 paramiko.channel.ChannelFile)
        '''
        try:
            # Does not wait for the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                ": the server failed to execute the command\n")
            logger.write(src.printcolors.printcError(message))
            return (None, None, None)
        except Exception:
            # presumably e.g. a machine that was never connected
            logger.write(src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        return (stdin, stdout, stderr)

    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file).

        :param logger src.logger.Logger: The logger instance
        :return: Nothing
        :rtype: None
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        # Never write the password itself: this report also goes to the log
        # file. Only whether a password was given is shown.
        if self.password is None:
            shown_password = "None"
        else:
            shown_password = "********"
        logger.write("password : " + shown_password + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        else:
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
# Class job: one job, i.e. a list of shell commands run as a single command
# line on one machine through its ssh connection (see class machine).
158 '''Class to manage one job
160 def __init__(self, name, machine, commands, timeout, logger, after=None):
# Parameters (as passed by Jobs.define_job):
#   name: the job name (findJobThatHasName looks jobs up by it)
#   machine: the machine instance the job runs on
#   commands: the list of shell commands, joined below with " ; "
#   timeout: compared with time_elapsed() in check_time(), i.e. seconds
#   logger: the logger instance
#   after: name of a job that must be finished before this one starts,
#          or None when there is no such constraint
163 self.machine = machine
165 self.timeout = timeout
170 self._has_begun = False
171 self._has_finished = False
172 self._stdin = None # Store the command inputs field
173 self._stdout = None # Store the command outputs field
174 self._stderr = None # Store the command errors field
176 self.out = None # Contains something only if the job is finished
177 self.err = None # Contains something only if the job is finished
# One single remote command line; the pid lookup used by
# kill_remote_process() splits it back on the same " ; " separator.
179 self.commands = " ; ".join(commands)
# Pid lookup used by kill_remote_process() (presumably the body of
# get_pids()): for every command of the job, list the remote processes
# whose "ps aux" line contains the command text and keep the pid column.
183 for cmd in self.commands.split(" ; "):
# NOTE(review): the grep process itself also contains cmd in its command
# line, so its (already finished) pid may be returned as well; a cmd that
# contains a double quote breaks the shell quoting.
184 cmd_pid = 'ps aux | grep "' + cmd + '" | awk \'{print $2}\''
# NOTE(review): machine.exec_command returns (None, None, None) on
# failure, in which case out_pid.readlines() raises AttributeError.
185 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
186 pids_cmd = out_pid.readlines()
# src.only_numbers presumably keeps only the digits of each line (drops
# the trailing newline) -- TODO confirm.
187 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
def kill_remote_process(self):
    '''Kill the remote processes of the job with "kill -9".

    The pids come from get_pids(); one "kill -9 <pid>" command per pid is
    chained with " ; " and sent through the machine's ssh connection.

    :return: (the output of the kill, the error of the kill), as given by
             machine.exec_command (both None if the command was not sent)
    :rtype: tuple
    '''
    kill_cmds = []
    for pid in self.get_pids():
        kill_cmds.append("kill -9 " + pid)
    full_cmd = " ; ".join(kill_cmds)
    streams = self.machine.exec_command(full_cmd, self.logger)
    out_kill, err_kill = streams[1], streams[2]
    return (out_kill, err_kill)
205 '''Returns True if the job has already begun
207 :return: True if the job has already begun
# _has_begun starts False (job.__init__) and is set to True at the end of
# job.run().
210 return self._has_begun
212 def has_finished(self):
213 '''Returns True if the job has already finished
214 (i.e. all the commands have been executed)
215 If it is finished, the outputs are stored in the fields out and err.
217 :return: True if the job has already finished
221 # If the job is already known as finished (previous call, timeout in check_time() or failure in run())
222 if self._has_finished:
225 # If the job has not begun yet
226 if not self.has_begun():
# The remote command is over once its stdout channel is closed.
229 if self._stdout.channel.closed:
230 self._has_finished = True
231 # And store the result outputs
# NOTE(review): on Python 3 paramiko's read() presumably returns bytes,
# while write_results() concatenates self.out / self.err with str -- TODO
# confirm the target Python version.
232 self.out = self._stdout.read()
233 self.err = self._stderr.read()
235 self._Tf = time.time()
237 return self._has_finished
def is_running(self):
    '''Tell whether the job commands are currently running, i.e. the job
    was launched and is not finished yet.

    has_finished() is only consulted for a job that has begun.

    :return: True if the job is running
    :rtype: bool
    '''
    if not self.has_begun():
        return False
    return not self.has_finished()
247 def time_elapsed(self):
# Number of seconds since the job was launched: job.run() stores the start
# time in self._T0 with time.time(); T_now is presumably time.time() as
# well -- TODO confirm.
248 if not self.has_begun():
251 return T_now - self._T0
253 def check_time(self):
# Enforce the job timeout: once more than self.timeout seconds have
# elapsed, mark the job as finished, kill its remote processes and store
# "TIMEOUT" messages in self.out / self.err.
254 if not self.has_begun():
256 if self.time_elapsed() > self.timeout:
257 self._has_finished = True
258 self._Tf = time.time()
260 (out_kill, _) = self.kill_remote_process()
# NOTE(review): out_kill is None when the kill command could not be sent
# (machine.exec_command returns (None, None, None)); .read() then raises
# AttributeError.
261 self.out = "TIMEOUT \n" + out_kill.read()
262 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
def total_duration(self):
    '''Return the duration of the job in seconds: the end time stored in
    self._Tf minus the start time stored in self._T0 (both taken with
    time.time()).
    '''
    start, end = self._T0, self._Tf
    return end - start
267 def run(self, logger):
# Launch the job: send the whole command line to the remote machine
# (machine.exec_command does not wait for its end) and record the start
# time in self._T0. When the machine is not connected or the command
# cannot be sent, the job is marked finished with an explanation in
# self.err.
# NOTE(review): placeholder message written with print() instead of the
# logger.
269 print("Warn the user that a job can only be launched one time")
272 if not self.machine.successfully_connected(logger):
273 self._has_finished = True
# NOTE(review): this branch sets neither self._T0 nor self._Tf, while
# write_results() uses both through total_duration().
275 self.err = ("Connection to machine (host: %s, port: %s, user: %s) has failed"
276 % (self.machine.host, self.machine.port, self.machine.user))
278 self._T0 = time.time()
279 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
280 self.commands, logger)
# (None, None, None) is the failure value of machine.exec_command.
281 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
282 self._has_finished = True
283 self._Tf = time.time()
285 self.err = "The server failed to execute the command"
287 self._has_begun = True
289 def write_results(self, logger):
# Write the report of the job in the logger: name, dependency ("after"),
# duration, begin/end dates, connection information (machine.write_info)
# and the stored outputs self.out / self.err.
290 logger.write("name : " + self.name + "\n")
292 logger.write("after : %s\n" % self.after)
# total_duration() is in seconds: shown as minutes and remaining seconds.
293 logger.write("Time elapsed : %4imin %2is \n" %
294 (self.total_duration()/60 , self.total_duration()%60))
296 logger.write("Begin time : %s\n" %
297 time.strftime('%Y-%m-%d %H:%M:%S',
298 time.localtime(self._T0)) )
300 logger.write("End time : %s\n\n" %
301 time.strftime('%Y-%m-%d %H:%M:%S',
302 time.localtime(self._Tf)) )
304 machine_head = "Informations about connection :\n"
305 underline = (len(machine_head) - 2) * "-"
306 logger.write(src.printcolors.printcInfo(machine_head + underline + "\n"))
307 self.machine.write_info(logger)
309 logger.write(src.printcolors.printcInfo("out : \n"))
# Fallback text when no output was retrieved (presumably when self.out is
# None -- TODO confirm).
311 logger.write("Unable to get output\n")
313 logger.write(self.out + "\n")
314 logger.write(src.printcolors.printcInfo("err : \n"))
316 logger.write("Unable to get error\n")
318 logger.write(self.err + "\n")
323 '''Class to manage the jobs to be run
325 def __init__(self, runner, logger, config_jobs, lenght_columns = 20):
# :param runner: the runner; its config (runner.cfg.VARS) gives the
#   default host name and user of the jobs
# :param logger src.logger.Logger: The logger instance
# :param config_jobs: the jobs configuration read from the .pyconf file
# :param lenght_columns int: width of a column of the status display
326 # The jobs configuration
327 self.cfg_jobs = config_jobs
328 # The machines that will be used today
330 # The list of machines (hosts, port) that will be used today
331 # (a same host can have several machine instances since there
332 # can be several ssh parameters)
334 # The jobs to be launched today
338 # The correlation dictionary between jobs and machines
339 self.dic_job_machine = {}
340 self.len_columns = lenght_columns
342 # the list of jobs that have not been run yet
# NOTE(review): run_jobs() keeps its own local list instead of this one.
343 self._l_jobs_not_started = []
344 # the list of jobs that have already run
345 self._l_jobs_finished = []
346 # the list of jobs that are running
347 self._l_jobs_running = []
# Fill lmachines, ljobs, dic_job_machine and lhosts from the configuration.
349 self.determine_products_and_machines()
def define_job(self, job_def, machine):
    '''Build the job instance described by a pyconf job definition.

    :param job_def src.config.Mapping: a job definition; its name, commands
                                       and timeout fields are read, and its
                                       optional "after" field when present
    :param machine machine: the machine on which the job will run
    :return: The corresponding job in a job class instance
    :rtype: job
    '''
    name = job_def.name
    commands = job_def.commands
    timeout = job_def.timeout
    # Optional: name of the job that has to be finished before this one
    after = job_def.after if 'after' in job_def else None
    return job(name, machine, commands, timeout, self.logger, after=after)
369 def determine_products_and_machines(self):
370 '''Function that reads the pyconf jobs definition and instantiates all
371 the machines and jobs to be done today.
# datetime.date.weekday: Monday is 0 ... Sunday is 6, so job_def.when is
# presumably a list of such day numbers.
376 today = datetime.date.weekday(datetime.date.today())
379 for job_def in self.cfg_jobs.jobs :
# Only the jobs planned for today are instantiated.
380 if today in job_def.when:
# Missing ssh parameters fall back on the runner configuration.
381 if 'host' not in job_def:
382 host = self.runner.cfg.VARS.hostname
386 if 'port' not in job_def:
# Each distinct (host, port) pair gets one column in the status display.
391 if (host, port) not in host_list:
392 host_list.append((host, port))
394 if 'user' not in job_def:
395 user = self.runner.cfg.VARS.user
399 if 'password' not in job_def:
402 passwd = job_def.password
# One machine instance (hence one ssh client) per job, even when several
# jobs share the same host.
404 a_machine = machine(host, user, port=port, passwd=passwd)
406 self.lmachines.append(a_machine)
408 a_job = self.define_job(job_def, a_machine)
410 self.ljobs.append(a_job)
412 self.dic_job_machine[a_job] = a_machine
414 self.lhosts = host_list
416 def ssh_connection_all_machines(self, pad=50):
417 '''Function that does the ssh connection to every machine
# :param pad int: width of the "(host: ..., port: ..., user: ...)" label
#   plus its dotted filler, so that the connection statuses line up.
423 self.logger.write(src.printcolors.printcInfo((
424 "Establishing connection with all the machines :\n")))
# NOTE(review): the loop variable shadows the class name "machine" in
# this method.
425 for machine in self.lmachines:
426 # little algorithm in order to display traces
427 begin_line = ("(host: %s, port: %s, user: %s)" %
428 (machine.host, machine.port, machine.user))
429 if pad - len(begin_line) < 0:
# Fill the gap up to pad characters with dots before the status.
432 endline = (pad - len(begin_line)) * "." + " "
433 self.logger.write( begin_line + endline )
435 # the call to the method that initiates the ssh connection
436 machine.connect(self.logger)
437 self.logger.write("\n")
def is_occupied(self, hostname):
    '''Find the job currently running on a (host, port) machine.

    :param hostname (str, int): the pair (host, port)
    :return: the job that is running on the host,
             or False if there is no job running on the host.
    '''
    host, port = hostname[0], hostname[1]
    for jb in self.dic_job_machine:
        same_machine = (jb.machine.host == host and jb.machine.port == port)
        # is_running() is only asked for jobs of that machine
        if same_machine and jb.is_running():
            return jb
    return False
def update_jobs_states_list(self):
    '''Refresh the lists of running jobs (self._l_jobs_running) and of
    finished jobs (self._l_jobs_finished).

    Each job is asked is_running() then has_finished(), one job after the
    other.
    '''
    running = []
    finished = []
    for jb in self.dic_job_machine:
        if jb.is_running():
            running.append(jb)
        if jb.has_finished():
            finished.append(jb)
    self._l_jobs_finished = finished
    self._l_jobs_running = running
def findJobThatHasName(self, name):
    '''Return the first job of self.ljobs whose name is `name`.

    :param name str: a job name
    :return: the job that has the name.
    :raise src.SatException: if no job has that name
    '''
    for candidate in self.ljobs:
        if candidate.name != name:
            continue
        return candidate
    # the following is executed only if the job was not found
    msg = _('The job "%s" seems to be nonexistent') % name
    raise src.SatException(msg)
def str_of_length(self, text, length):
    '''Takes a string text of any length and returns the closest string
    of length "length": a too long text is cut and ends with "...", a
    shorter one is centered between spaces.

    :param text str: any string
    :param length int: a length for the returned string
    :return: the closest string of length "length"
    :rtype: str
    '''
    if len(text) > length:
        if length < 3:
            # No room for the ellipsis: plain truncation keeps the length.
            return text[:length]
        return text[:length - 3] + '...'
    diff = length - len(text)
    # Floor division: "diff/2" is a float on Python 3, and str * float
    # raises TypeError.
    before = " " * (diff // 2)
    after = " " * (diff // 2 + diff % 2)
    return before + text + after
509 def display_status(self, len_col):
510 '''Takes a length and constructs the display of the current status
511 of the jobs in an array that has a column for each host.
512 It displays the job that is currently running on the host
515 :param len_col int: the size of the column
# One cell per (host, port) of self.lhosts: "empty" when no job runs
# there, otherwise the name of the running job.
521 for host_port in self.lhosts:
522 jb = self.is_occupied(host_port)
523 if not jb: # nothing running on the host
524 empty = self.str_of_length("empty", len_col)
525 display_line += "|" + empty
527 display_line += "|" + src.printcolors.printcInfo(
528 self.str_of_length(jb.name, len_col))
# "\r" goes back to the beginning of the line, so each call overwrites
# the previous status line on a terminal.
530 self.logger.write("\r" + display_line + "|")
535 '''The main method. Runs all the jobs on every host.
536 For each host, at a given time, only one job can be running.
537 The jobs that have the field after (that contain the job that has
538 to be run before it) are run after the previous job.
539 This method stops when all the jobs are finished.
546 self.logger.write(src.printcolors.printcInfo(
547 _('Executing the jobs :\n')))
# Header of the status array: one column per (host, port), titled with
# the host alone when the port is the default ssh port 22.
549 for host_port in self.lhosts:
552 if port == 22: # default value
553 text_line += "|" + self.str_of_length(host, self.len_columns)
555 text_line += "|" + self.str_of_length(
556 "("+host+", "+str(port)+")", self.len_columns)
558 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
559 self.logger.write(tiret_line)
560 self.logger.write(text_line + "|\n")
561 self.logger.write(tiret_line)
564 # The loop that runs the jobs until they are all finished
# NOTE(review): on Python 3 dict.keys() is a view without remove();
# list(self.dic_job_machine) would be needed there.
565 l_jobs_not_started = self.dic_job_machine.keys()
566 while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
567 for host_port in self.lhosts:
# A host runs at most one job at a time.
569 if self.is_occupied(host_port):
572 for jb in l_jobs_not_started:
573 if (jb.machine.host, jb.machine.port) != host_port:
# NOTE(review): removing from the list being iterated skips the next
# element unless the loop is exited right after the removal.
577 l_jobs_not_started.remove(jb)
# findJobThatHasName raises SatException when the "after" job is not
# among today's jobs (e.g. filtered out by --only_jobs).
580 jb_before = self.findJobThatHasName(jb.after)
581 if jb_before.has_finished():
583 l_jobs_not_started.remove(jb)
586 self.update_jobs_states_list()
588 # Display the current status
589 self.display_status(self.len_columns)
591 # Make sure that the proc is not entirely busy
594 self.logger.write("\n")
595 self.logger.write(tiret_line)
596 self.logger.write("\n\n")
598 def write_all_results(self):
599 '''Display all the jobs outputs.
# For every job: a title line, then the job's own report
# (job.write_results).
# NOTE(review): iterating the dict follows its key order, which is
# arbitrary before Python 3.7; self.ljobs keeps the configuration order.
605 for jb in self.dic_job_machine.keys():
606 self.logger.write(src.printcolors.printcLabel(
607 "#------- Results for job %s -------#\n" % jb.name))
608 jb.write_results(self.logger)
609 self.logger.write("\n\n")
612 '''Class to manage the xml data that can be displayed in a browser to
615 def __init__(self, xml_file_path, l_jobs):
# :param xml_file_path str: path of the xml report to write
# :param l_jobs list: the job instances to report on
616 # The path of the xml file
617 self.xml_file_path = xml_file_path
618 # Open the file in a writing stream
619 self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
620 # Create the lines and columns
621 self.initialize_array(l_jobs)
# Write the initial report, every job being "Not launched".
623 self.update_xml_file()
625 def initialize_array(self, l_jobs):
# Build self.l_hosts, the distinct (host, port) pairs of the jobs, and
# self.l_job_names_status, (job name, (host, port), status) entries whose
# status starts as "Not launched" (presumably one entry per job -- TODO
# confirm the nesting of the append).
627 l_job_names_status = []
# NOTE(review): "job" is presumably the loop variable over l_jobs; it
# shadows the class name "job" in this method.
629 host = (job.machine.host, job.machine.port)
630 if host not in l_hosts:
632 l_job_names_status.append((job.name, host, "Not launched"))
633 self.l_hosts = l_hosts
634 self.l_job_names_status = l_job_names_status
def update_xml_file(self):
    '''Fill the xml file with the hosts and the jobs status, then write it.

    A "hosts" node gets one attribute per host (name -> port), and one
    "job" node per job holds its status as text with its name and
    "host:port" as attributes. The tree is then written with the
    "job_report.xsl" stylesheet.
    '''
    # Update the hosts node
    self.xml_file.add_simple_node("hosts")
    for host_name, host_port in self.l_hosts:
        # str(): the port is an int, and xml attribute values are strings
        # (presumably serialized with ElementTree, which rejects other
        # types); the job nodes below convert the port the same way.
        # NOTE(review): two ports on the same host share one attribute key.
        self.xml_file.append_node_attrib("hosts", {host_name: str(host_port)})
    # Update the job names and status node
    for jname, jhost, jstatus in self.l_job_names_status:
        self.xml_file.add_simple_node(
            "job", jstatus,
            {"name": jname, "host": jhost[0] + ":" + str(jhost[1])})
    self.xml_file.write_tree("job_report.xsl")
649 def print_info(logger, arch, JobsFilePath):
650 '''Prints an information header.
652 :param logger src.logger.Logger: The logger instance
653 :param arch str: a string that gives the architecture of the machine on
654 which the command is launched
655 :param JobsFilePath str: The path of the file
656 that contains the jobs configuration
661 (_("Platform"), arch),
662 (_("File containing the jobs configuration"), JobsFilePath)
# Right-align the labels on the longest one.
665 smax = max(map(lambda l: len(l[0]), info))
667 sp = " " * (smax - len(i[0]))
# The trailing 2 is presumably the verbosity level of the message -- TODO
# confirm against src.printcolors / src.logger.
668 src.printcolors.print_value(logger, sp + i[0], i[1], 2)
669 logger.write("\n", 2)
672 # Describes the command
# Returns the short description of the jobs command, wrapped in _()
# (presumably gettext translation).
674 return _("The jobs command launches maintenances that are described"
675 " in the dedicated jobs configuration file.")
679 def run(args, runner, logger):
# Entry point of the jobs command.
# :param args list: the command line arguments, parsed with the module parser
# :param runner: the runner; runner.cfg gives the jobs config directory
#   (SITE.jobs.config_path) and the platform (VARS.dist)
# :param logger src.logger.Logger: The logger instance
680 (options, args) = parser.parse_args(args)
682 jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
684 # Make sure the path to the jobs config files directory exists
685 if not os.path.exists(jobs_cfg_files_dir):
686 logger.write(_("Creating directory %s\n") %
687 src.printcolors.printcLabel(jobs_cfg_files_dir), 1)
# NOTE(review): os.mkdir does not create missing parent directories.
688 os.mkdir(jobs_cfg_files_dir)
690 # list option : display all the available config files
693 if not options.no_label:
694 sys.stdout.write("------ %s\n" %
695 src.printcolors.printcHeader(jobs_cfg_files_dir))
697 for f in sorted(os.listdir(jobs_cfg_files_dir)):
698 if not f.endswith('.pyconf'):
701 lcfiles.append(cfilename)
702 sys.stdout.write("%s\n" % cfilename)
705 # Make sure the jobs_config option has been given
706 if not options.jobs_cfg:
707 message = _("The option --jobs_config is required\n")
708 raise src.SatException( message )
710 # Make sure the invoked file exists
711 file_jobs_cfg = os.path.join(jobs_cfg_files_dir, options.jobs_cfg)
712 if not file_jobs_cfg.endswith('.pyconf'):
713 file_jobs_cfg += '.pyconf'
715 if not os.path.exists(file_jobs_cfg):
716 message = _("The file %s does not exist.\n") % file_jobs_cfg
717 logger.write(src.printcolors.printcError(message), 1)
718 message = _("The possible files are :\n")
719 logger.write( src.printcolors.printcInfo(message), 1)
720 for f in sorted(os.listdir(jobs_cfg_files_dir)):
721 if not f.endswith('.pyconf'):
724 sys.stdout.write("%s\n" % jobscfgname)
725 raise src.SatException( _("No corresponding file") )
727 print_info(logger, runner.cfg.VARS.dist, file_jobs_cfg)
729 # Read the config that is in the file
730 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
# Keep only the jobs named in the --only_jobs option.
731 if options.only_jobs:
732 l_jb = src.pyconf.Sequence()
733 for jb in config_jobs.jobs:
734 if jb.name in options.only_jobs:
736 "Adding a job that was given in only_jobs option parameters")
737 config_jobs.jobs = l_jb
740 today_jobs = Jobs(runner, logger, config_jobs)
741 # SSH connection to all machines
742 today_jobs.ssh_connection_all_machines()
743 if options.test_connection:
# NOTE(review): hard-coded, user-specific path for the xml report; it
# should presumably come from the configuration.
747 Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs)
750 # Run all the jobs contained in config_jobs
751 today_jobs.run_jobs()
752 except KeyboardInterrupt:
753 logger.write("\n\n%s\n\n" %
754 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
756 # find the potential not finished jobs and kill them
# NOTE(review): a job that has not begun is not finished either, so
# kill_remote_process() also runs for it and kills any remote process
# whose command line matches its commands; jb.is_running() is presumably
# the intended test.
757 for jb in today_jobs.ljobs:
758 if not jb.has_finished():
759 jb.kill_remote_process()
762 today_jobs.write_all_results()