3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets applied to the generated XML reports so that they can be
# rendered in a web browser (used by the Gui class via write_tree below).
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_TABLE = "jobs_table_report.xsl"
# Command-line options of the "jobs" command.
parser = src.options.Options()
parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('list all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("do not print labels, Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Not executing the jobs."),
# NOTE(review): the default-value argument and closing parenthesis of the
# add_option call above appear to be missing -- source looks truncated.
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                  "display the jobs status."),
# NOTE(review): same truncation here -- the call's default argument and
# closing parenthesis are not visible.
48 class Machine(object):
49 '''Class to manage a ssh connection on a machine
57 sat_path="salomeTools"):
62 self.password = passwd
63 self.sat_path = sat_path
64 self.ssh = paramiko.SSHClient()
65 self._connection_successful = None
67 def connect(self, logger):
68 '''Initiate the ssh connection to the remote machine
70 :param logger src.logger.Logger: The logger instance
75 self._connection_successful = False
76 self.ssh.load_system_host_keys()
77 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
79 self.ssh.connect(self.host,
82 password = self.password)
83 except paramiko.AuthenticationException:
84 message = src.KO_STATUS + _("Authentication failed")
85 except paramiko.BadHostKeyException:
86 message = (src.KO_STATUS +
87 _("The server's host key could not be verified"))
88 except paramiko.SSHException:
89 message = ( _("SSHException error connecting or "
90 "establishing an SSH session"))
92 message = ( _("Error connecting or establishing an SSH session"))
94 self._connection_successful = True
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeed
        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeed, False if not
        :rtype: bool
        '''
        # None means connect() has never been called for this machine:
        # warn, because the answer would then be meaningless.
        if self._connection_successful == None:
            message = _("Warning : trying to ask if the connection to "
            "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
            " no connection request" %
            (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful
113 def copy_sat(self, sat_local_path, job_file):
114 '''Copy salomeTools to the remote machine in self.sat_path
118 self.sftp = self.ssh.open_sftp()
119 self.mkdir(self.sat_path, ignore_existing=True)
120 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
121 job_file_name = os.path.basename(job_file)
122 self.sftp.put(job_file, os.path.join(self.sat_path,
126 except Exception as e:
128 self._connection_successful = False
132 def put_dir(self, source, target, filters = []):
133 ''' Uploads the contents of the source directory to the target path. The
134 target directory needs to exists. All subdirectories in source are
135 created under target.
137 for item in os.listdir(source):
140 source_path = os.path.join(source, item)
141 destination_path = os.path.join(target, item)
142 if os.path.islink(source_path):
143 linkto = os.readlink(source_path)
145 self.sftp.symlink(linkto, destination_path)
146 self.sftp.chmod(destination_path,
147 os.stat(source_path).st_mode)
151 if os.path.isfile(source_path):
152 self.sftp.put(source_path, destination_path)
153 self.sftp.chmod(destination_path,
154 os.stat(source_path).st_mode)
156 self.mkdir(destination_path, ignore_existing=True)
157 self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
        if the remote folder already exists.
        NOTE(review): mode=511 is the decimal form of 0o777.
        '''
        # NOTE(review): the ignore_existing handling (presumably a
        # try/except around this call) is not visible here -- source
        # appears truncated; confirm against VCS.
        self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine
        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
            or (None, None, None) if the execution failed
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
            paramiko.channel.ChannelFile)
        '''
        # NOTE(review): a 'try:' line appears to be missing here -- the
        # except clause below is orphaned; source looks truncated.
            # Does not wait the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                                ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
        # NOTE(review): a generic 'except:' line appears to be missing here.
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        return (stdin, stdout, stderr)
196 '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        # Report whether the last connection attempt succeeded.
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        # NOTE(review): an 'else:' line appears to be missing above the KO
        # assignment -- source looks truncated.
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
221 '''Class to manage one job
223 def __init__(self, name, machine, application, distribution, table,
224 commands, timeout, config, logger, job_file, after=None):
227 self.machine = machine
229 self.timeout = timeout
230 self.application = application
231 self.distribution = distribution
235 # The list of log files to download from the remote machine
236 self.remote_log_files = []
238 # The remote command status
239 # -1 means that it has not been launched,
240 # 0 means success and 1 means fail
242 self.cancelled = False
246 self._has_begun = False
247 self._has_finished = False
248 self._has_timouted = False
249 self._stdin = None # Store the command inputs field
250 self._stdout = None # Store the command outputs field
251 self._stderr = None # Store the command errors field
253 self.out = None # Contains something only if the job is finished
254 self.err = None # Contains something only if the job is finished
256 self.commands = commands
257 self.command = (os.path.join(self.machine.sat_path, "sat") +
259 os.path.join(self.machine.sat_path,
260 "list_log_files.txt") +
261 " job --jobs_config " +
267 """ Get the pid(s) corresponding to the command that have been launched
268 On the remote machine
270 :return: The list of integers corresponding to the found pids
274 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
275 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
276 pids_cmd = out_pid.readlines()
277 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.

        :return: (the output of the kill, the error of the kill)
        '''
        pids = self.get_pids()
        # Send SIGINT (kill -2) to every pid found for the running command.
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        # NOTE(review): the continuation of this call (presumably the logger
        # argument and closing parenthesis) is missing -- source truncated.
        return (out_kill, err_kill)
296 '''Returns True if the job has already begun
298 :return: True if the job has already begun
301 return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        :rtype: bool
        '''
        # If the method has already been called and returned True
        if self._has_finished:
        # NOTE(review): 'return True' appears to be missing -- truncated.
        # If the job has not begun yet
        if not self.has_begun():
        # NOTE(review): 'return False' appears to be missing -- truncated.
        # The remote command is over once its stdout channel is closed.
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read()
            self.err = self._stderr.read()
            # Record the end time.
            self._Tf = time.time()
            # And get the remote command status and log files
            # NOTE(review): the call fetching them (presumably
            # self.get_log_files()) is not visible -- truncated.
        return self._has_finished
332 def get_log_files(self):
333 """Get the log files produced by the command launched
334 on the remote machine.
336 # Do not get the files if the command is not finished
337 if not self.has_finished():
338 msg = _("Trying to get log files whereas the job is not finished.")
339 self.logger.write(src.printcolors.printcWarning(msg))
342 # First get the file that contains the list of log files to get
343 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
344 self.machine.sftp.get(
345 os.path.join(self.machine.sat_path, "list_log_files.txt"),
348 # Read the file and get the result of the command and all the log files
350 fstream_tmp = open(tmp_file_path, "r")
351 file_lines = fstream_tmp.readlines()
352 file_lines = [line.replace("\n", "") for line in file_lines]
354 os.remove(tmp_file_path)
355 # The first line is the result of the command (0 success or 1 fail)
356 self.res_job = file_lines[0]
357 for job_path_remote in file_lines[1:]:
359 # For each command, there is two files to get :
360 # 1- The xml file describing the command and giving the
362 # 2- The txt file containing the system command traces (like
363 # traces produced by the "make" command)
364 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
366 local_path = os.path.join(os.path.dirname(
367 self.logger.logFilePath),
368 os.path.basename(job_path_remote))
371 local_path = os.path.join(os.path.dirname(
372 self.logger.logFilePath),
374 os.path.basename(job_path_remote))
376 if not os.path.exists(local_path):
377 self.machine.sftp.get(job_path_remote, local_path)
378 self.remote_log_files.append(local_path)
379 except Exception as e:
380 self.err += _("Unable to get %s log file from remote: %s" %
381 (job_path_remote, str(e)))
    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered as failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a time out.

        :return: True if the job has failed
        :rtype: bool
        '''
        # NOTE(review): the 'return' lines of each guard below are missing --
        # source appears truncated; only the conditions are visible.
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        # res_job holds the remote exit status read back from the log file
        # list ("0" success, "1" fail) -- see get_log_files().
        if self.res_job == "1":
403 """In case of a failing job, one has to cancel every job that depend
404 on it. This method put the job as failed and will not be executed.
406 self._has_begun = True
407 self._has_finished = True
408 self.cancelled = True
409 self.out = _("This job was not launched because its father has failed.")
410 self.err = _("This job was not launched because its father has failed.")
    def is_running(self):
        '''Returns True if the job commands are running

        :return: True if the job is running
        :rtype: bool
        '''
        # Running = launched but not yet finished.
        return self.has_begun() and not self.has_finished()
    def is_timeout(self):
        '''Returns True if the job commands has finished with timeout

        :return: True if the job has finished with timeout
        :rtype: bool
        '''
        # Flag set by check_time() when the timeout is exceeded.
        # (sic: "timouted" spelling is kept for consistency with the setter)
        return self._has_timouted
    def time_elapsed(self):
        """Get the time elapsed since the job launching

        :return: The number of seconds
        """
        if not self.has_begun():
        # NOTE(review): the guard's return value and the
        # 'T_now = time.time()' assignment are missing -- source truncated.
        return T_now - self._T0
439 def check_time(self):
440 """Verify that the job has not exceeded its timeout.
441 If it has, kill the remote command and consider the job as finished.
443 if not self.has_begun():
445 if self.time_elapsed() > self.timeout:
446 self._has_finished = True
447 self._has_timouted = True
448 self._Tf = time.time()
450 (out_kill, _) = self.kill_remote_process()
451 self.out = "TIMEOUT \n" + out_kill.read()
452 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
455 except Exception as e:
456 self.err += _("Unable to get remote log files: %s" % e)
    def total_duration(self):
        """Give the total duration of the job

        :return: the total duration of the job in seconds
        """
        # End time minus start time; only meaningful once _Tf has been set
        # (i.e. after the job finished or timed out).
        return self._Tf - self._T0
467 """Launch the job by executing the remote command.
470 # Prevent multiple run
472 msg = _("Warning: A job can only be launched one time")
473 msg2 = _("Trying to launch the job \"%s\" whereas it has "
474 "already been launched." % self.name)
475 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, msg2)))
478 # Do not execute the command if the machine could not be reached
479 if not self.machine.successfully_connected(self.logger):
480 self._has_finished = True
482 self.err = ("Connection to machine (name : %s, host: %s, port:"
483 " %s, user: %s) has failed\nUse the log command "
484 "to get more information."
485 % (self.machine.name,
490 # Usual case : Launch the command on remote machine
491 self._T0 = time.time()
492 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
493 self.command, self.logger)
494 # If the results are not initialized, finish the job
495 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
496 self._has_finished = True
497 self._Tf = time.time()
499 self.err = "The server failed to execute the command"
501 # Put the beginning flag to true.
502 self._has_begun = True
504 def write_results(self):
505 """Display on the terminal all the job's information
507 self.logger.write("name : " + self.name + "\n")
509 self.logger.write("after : %s\n" % self.after)
510 self.logger.write("Time elapsed : %4imin %2is \n" %
511 (self.total_duration()/60 , self.total_duration()%60))
513 self.logger.write("Begin time : %s\n" %
514 time.strftime('%Y-%m-%d %H:%M:%S',
515 time.localtime(self._T0)) )
517 self.logger.write("End time : %s\n\n" %
518 time.strftime('%Y-%m-%d %H:%M:%S',
519 time.localtime(self._Tf)) )
521 machine_head = "Informations about connection :\n"
522 underline = (len(machine_head) - 2) * "-"
523 self.logger.write(src.printcolors.printcInfo(
524 machine_head+underline+"\n"))
525 self.machine.write_info(self.logger)
527 self.logger.write(src.printcolors.printcInfo("out : \n"))
529 self.logger.write("Unable to get output\n")
531 self.logger.write(self.out + "\n")
532 self.logger.write(src.printcolors.printcInfo("err : \n"))
534 self.logger.write("Unable to get error\n")
536 self.logger.write(self.err + "\n")
    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)

        :return: The current status of the job
        :rtype: str
        """
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        # NOTE(review): an additional case (presumably for cancelled jobs)
        # appears to be missing here -- source looks truncated.
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
561 '''Class to manage the jobs to be run
569 lenght_columns = 20):
570 # The jobs configuration
571 self.cfg_jobs = config_jobs
572 self.job_file = job_file
573 self.job_file_path = job_file_path
574 # The machine that will be used today
576 # The list of machine (hosts, port) that will be used today
577 # (a same host can have several machine instances since there
578 # can be several ssh parameters)
580 # The jobs to be launched today
582 # The jobs that will not be launched today
583 self.ljobs_not_today = []
586 # The correlation dictionary between jobs and machines
587 self.dic_job_machine = {}
588 self.len_columns = lenght_columns
590 # the list of jobs that have not been run yet
591 self._l_jobs_not_started = []
592 # the list of jobs that have already ran
593 self._l_jobs_finished = []
594 # the list of jobs that are running
595 self._l_jobs_running = []
597 self.determine_jobs_and_machines()
599 def define_job(self, job_def, machine):
600 '''Takes a pyconf job definition and a machine (from class machine)
601 and returns the job instance corresponding to the definition.
603 :param job_def src.config.Mapping: a job definition
604 :param machine machine: the machine on which the job will run
605 :return: The corresponding job in a job class instance
609 cmmnds = job_def.commands
610 timeout = job_def.timeout
612 if 'after' in job_def:
613 after = job_def.after
615 if 'application' in job_def:
616 application = job_def.application
618 if 'distribution' in job_def:
619 distribution = job_def.distribution
621 if 'table' in job_def:
622 table = job_def.table
636 def determine_jobs_and_machines(self):
637 '''Function that reads the pyconf jobs definition and instantiates all
638 the machines and jobs to be done today.
643 today = datetime.date.weekday(datetime.date.today())
646 for job_def in self.cfg_jobs.jobs :
648 if not "machine" in job_def:
649 msg = _('WARNING: The job "%s" do not have the key '
650 '"machine", this job is ignored.\n\n' % job_def.name)
651 self.logger.write(src.printcolors.printcWarning(msg))
653 name_machine = job_def.machine
656 for mach in self.lmachines:
657 if mach.name == name_machine:
661 if a_machine == None:
662 for machine_def in self.cfg_jobs.machines:
663 if machine_def.name == name_machine:
664 if 'host' not in machine_def:
665 host = self.runner.cfg.VARS.hostname
667 host = machine_def.host
669 if 'user' not in machine_def:
670 user = self.runner.cfg.VARS.user
672 user = machine_def.user
674 if 'port' not in machine_def:
677 port = machine_def.port
679 if 'password' not in machine_def:
682 passwd = machine_def.password
684 if 'sat_path' not in machine_def:
685 sat_path = "salomeTools"
687 sat_path = machine_def.sat_path
698 self.lmachines.append(a_machine)
699 if (host, port) not in host_list:
700 host_list.append((host, port))
702 if a_machine == None:
703 msg = _("WARNING: The job \"%(job_name)s\" requires the "
704 "machine \"%(machine_name)s\" but this machine "
705 "is not defined in the configuration file.\n"
706 "The job will not be launched")
707 self.logger.write(src.printcolors.printcWarning(msg))
709 a_job = self.define_job(job_def, a_machine)
710 self.dic_job_machine[a_job] = a_machine
712 if today in job_def.when:
713 self.ljobs.append(a_job)
714 else: # today in job_def.when
715 self.ljobs_not_today.append(a_job)
717 self.lhosts = host_list
719 def ssh_connection_all_machines(self, pad=50):
720 '''Function that do the ssh connection to every machine
726 self.logger.write(src.printcolors.printcInfo((
727 "Establishing connection with all the machines :\n")))
728 for machine in self.lmachines:
729 # little algorithm in order to display traces
730 begin_line = (_("Connection to %s: " % machine.name))
731 if pad - len(begin_line) < 0:
734 endline = (pad - len(begin_line)) * "." + " "
736 step = "SSH connection"
737 self.logger.write( begin_line + endline + step)
739 # the call to the method that initiate the ssh connection
740 msg = machine.connect(self.logger)
742 # Copy salomeTools to the remote machine
743 if machine.successfully_connected(self.logger):
745 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
746 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
748 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
750 # Print the status of the copy
752 self.logger.write('\r%s' %
753 ((len(begin_line)+len(endline)+20) * " "), 3)
754 self.logger.write('\r%s%s%s' %
757 src.printcolors.printc(src.OK_STATUS)), 3)
759 self.logger.write('\r%s' %
760 ((len(begin_line)+len(endline)+20) * " "), 3)
761 self.logger.write('\r%s%s%s %s' %
764 src.printcolors.printc(src.OK_STATUS),
765 _("Copy of SAT failed")), 3)
767 self.logger.write('\r%s' %
768 ((len(begin_line)+len(endline)+20) * " "), 3)
769 self.logger.write('\r%s%s%s %s' %
772 src.printcolors.printc(src.KO_STATUS),
774 self.logger.write("\n", 3)
776 self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
        the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
            or false if there is no job running on the host.
        '''
        # NOTE(review): the unpacking of hostname into (host, port) is not
        # visible -- source appears truncated.
        for jb in self.dic_job_machine:
            if jb.machine.host == host and jb.machine.port == port:
        # NOTE(review): the match body ('if jb.is_running(): return jb' or
        # similar) and the final 'return False' are missing as well.
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
        running jobs and the jobs that have already finished.

        :return: True if at least one job finished since the last call
        :rtype: bool
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.dic_job_machine:
            # NOTE(review): the guard above this append (presumably
            # 'if jb.is_running():') is not visible -- source truncated.
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        # Compare the finished count before and after the refresh to detect
        # whether anything new completed.
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        '''
        for job in self.ljobs:
            # Jobs with no "after" dependency cannot be cancelled this way.
            if job.after is None:
            # NOTE(review): 'continue' appears to be missing -- truncated.
            father_job = self.find_job_that_has_name(job.after)
            if father_job.has_failed():
            # NOTE(review): the cancellation call (presumably job.cancel())
            # is not visible -- source truncated.
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.

        :param name str: a job name
        :return: the job that has the name.
        :raise src.SatException: if no job has that name
        '''
        for jb in self.ljobs:
        # NOTE(review): the name comparison and 'return jb' lines are not
        # visible -- source appears truncated.
        # the following is executed only if the job was not found
        msg = _('The job "%s" seems to be nonexistent') % name
        raise src.SatException(msg)
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
        the most close string of length "length".

        :param text str: any string
        :param length int: a length for the returned string
        :return: the most close string of length "length"
        :rtype: str
        '''
        if len(text) > length:
            # Too long: truncate and mark with an ellipsis.
            text_out = text[:length-3] + '...'
        # NOTE(review): an 'else:' line appears to be missing here, and the
        # final 'return text_out' is not visible -- source looks truncated.
            # Too short: pad evenly; the right side gets the odd space.
            # (relies on Python 2 integer division; would need // on Python 3)
            diff = length - len(text)
            before = " " * (diff/2)
            after = " " * (diff/2 + diff%2)
            text_out = before + text + after
868 def display_status(self, len_col):
869 '''Takes a lenght and construct the display of the current status
870 of the jobs in an array that has a column for each host.
871 It displays the job that is currently running on the host
874 :param len_col int: the size of the column
880 for host_port in self.lhosts:
881 jb = self.is_occupied(host_port)
882 if not jb: # nothing running on the host
883 empty = self.str_of_length("empty", len_col)
884 display_line += "|" + empty
886 display_line += "|" + src.printcolors.printcInfo(
887 self.str_of_length(jb.name, len_col))
889 self.logger.write("\r" + display_line + "|")
894 '''The main method. Runs all the jobs on every host.
895 For each host, at a given time, only one job can be running.
896 The jobs that have the field after (that contain the job that has
897 to be run before it) are run after the previous job.
898 This method stops when all the jobs are finished.
905 self.logger.write(src.printcolors.printcInfo(
906 _('Executing the jobs :\n')))
908 for host_port in self.lhosts:
911 if port == 22: # default value
912 text_line += "|" + self.str_of_length(host, self.len_columns)
914 text_line += "|" + self.str_of_length(
915 "("+host+", "+str(port)+")", self.len_columns)
917 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
918 self.logger.write(tiret_line)
919 self.logger.write(text_line + "|\n")
920 self.logger.write(tiret_line)
923 # The infinite loop that runs the jobs
924 l_jobs_not_started = self.dic_job_machine.keys()
925 while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
926 new_job_start = False
927 for host_port in self.lhosts:
929 if self.is_occupied(host_port):
932 for jb in l_jobs_not_started:
933 if (jb.machine.host, jb.machine.port) != host_port:
937 l_jobs_not_started.remove(jb)
941 jb_before = self.find_job_that_has_name(jb.after)
942 if jb_before.has_finished():
944 l_jobs_not_started.remove(jb)
947 self.cancel_dependencies_of_failing_jobs()
948 new_job_finished = self.update_jobs_states_list()
950 if new_job_start or new_job_finished:
951 self.gui.update_xml_files(self.ljobs)
952 # Display the current status
953 self.display_status(self.len_columns)
955 # Make sure that the proc is not entirely busy
958 self.logger.write("\n")
959 self.logger.write(tiret_line)
960 self.logger.write("\n\n")
962 self.gui.update_xml_files(self.ljobs)
963 self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs outputs.
        '''
        for jb in self.dic_job_machine.keys():
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            # NOTE(review): the per-job output call (presumably
            # jb.write_results()) is not visible here -- source truncated.
        self.logger.write("\n\n")
979 '''Class to manage the the xml data that can be displayed in a browser to
983 def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
986 :param xml_dir_path str: The path to the directory where to put
987 the xml resulting files
988 :param l_jobs List: the list of jobs that run today
989 :param l_jobs_not_today List: the list of jobs that do not run today
991 # The path of the global xml file
992 self.xml_dir_path = xml_dir_path
993 # Initialize the xml files
994 xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
995 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
997 # The xml files that corresponds to the tables.
998 # {name_table : xml_object}}
999 self.d_xml_table_files = {}
1000 # Create the lines and columns
1001 self.initialize_arrays(l_jobs, l_jobs_not_today)
1002 # Write the xml file
1003 self.update_xml_files(l_jobs)
1005 def initialize_arrays(self, l_jobs, l_jobs_not_today):
1006 '''Get all the first information needed for each file and write the
1007 first version of the files
1008 :param l_jobs List: the list of jobs that run today
1009 :param l_jobs_not_today List: the list of jobs that do not run today
1011 # Get the tables to fill and put it in a dictionary
1012 # {table_name : xml instance corresponding to the table}
1013 for job in l_jobs + l_jobs_not_today:
1015 if (table is not None and
1016 table not in self.d_xml_table_files.keys()):
1017 xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
1018 self.d_xml_table_files[table] = src.xmlManager.XmlLogFile(
1021 self.d_xml_table_files[table].add_simple_node("distributions")
1022 self.d_xml_table_files[table].add_simple_node("applications")
1023 self.d_xml_table_files[table].add_simple_node("table", text=table)
1025 # Loop over all jobs in order to get the lines and columns for each
1029 for table in self.d_xml_table_files:
1031 d_application[table] = []
1035 for job in l_jobs + l_jobs_not_today:
1037 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1038 l_hosts_ports.append((job.machine.host, job.machine.port))
1040 distrib = job.distribution
1041 application = job.application
1043 table_job = job.table
1046 for table in self.d_xml_table_files:
1047 if table_job == table:
1048 if distrib is not None and distrib not in d_dist[table]:
1049 d_dist[table].append(distrib)
1050 src.xmlManager.add_simple_node(
1051 self.d_xml_table_files[table].xmlroot.find('distributions'),
1053 attrib={"name" : distrib})
1055 if table_job == table:
1056 if application is not None and application not in d_application[table]:
1057 d_application[table].append(application)
1058 src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
1060 attrib={"name" : application})
1062 # Initialize the hosts_ports node for the global file
1063 self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1064 for host, port in l_hosts_ports:
1065 host_port = "%s:%i" % (host, port)
1066 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1068 attrib={"name" : host_port})
1070 # Initialize the jobs node in all files
1071 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1072 xml_jobs = xml_file.add_simple_node("jobs")
1073 # Get the jobs present in the config file but that will not be launched
1075 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1077 xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
1080 def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1081 '''Get all the first information needed for each file and write the
1082 first version of the files
1084 :param xml_node_jobs etree.Element: the node corresponding to a job
1085 :param l_jobs_not_today List: the list of jobs that do not run today
1087 for job in l_jobs_not_today:
1088 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1090 attrib={"name" : job.name})
1091 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1092 src.xmlManager.add_simple_node(xmlj,
1095 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1096 src.xmlManager.add_simple_node(xmlj,
1097 "commands", " ; ".join(job.commands))
1098 src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1099 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1100 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1101 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1102 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1103 src.xmlManager.add_simple_node(xmlj, "sat_path",
1104 job.machine.sat_path)
    def update_xml_files(self, l_jobs):
        '''Write all the xml files with updated information about the jobs

        :param l_jobs List: the list of jobs that run today
        '''
        # Refresh the global report and every per-table report.
        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
            self.update_xml_file(l_jobs, xml_file)
        # Flush the updated trees to disk.
        self.write_xml_files()
1117 def update_xml_file(self, l_jobs, xml_file):
1118 '''update information about the jobs for the file xml_file
1120 :param l_jobs List: the list of jobs that run today
1121 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1124 xml_node_jobs = xml_file.xmlroot.find('jobs')
1125 # Update the job names and status node
1127 # Find the node corresponding to the job and delete it
1128 # in order to recreate it
1129 for xmljob in xml_node_jobs.findall('job'):
1130 if xmljob.attrib['name'] == job.name:
1131 xml_node_jobs.remove(xmljob)
1135 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1136 time.localtime(job._T0))
1139 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1140 time.localtime(job._Tf))
1142 # recreate the job node
1143 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1145 attrib={"name" : job.name})
1146 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1147 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1148 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1149 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1150 src.xmlManager.add_simple_node(xmlj, "sat_path",
1151 job.machine.sat_path)
1152 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1153 src.xmlManager.add_simple_node(xmlj, "distribution",
1155 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1156 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1157 src.xmlManager.add_simple_node(xmlj, "commands",
1158 " ; ".join(job.commands))
1159 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1160 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1161 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1162 src.xmlManager.add_simple_node(xmlj, "out",
1163 src.printcolors.cleancolor(job.out))
1164 src.xmlManager.add_simple_node(xmlj, "err",
1165 src.printcolors.cleancolor(job.err))
1166 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1167 if len(job.remote_log_files) > 0:
1168 src.xmlManager.add_simple_node(xmlj,
1169 "remote_log_file_path",
1170 job.remote_log_files[0])
1172 src.xmlManager.add_simple_node(xmlj,
1173 "remote_log_file_path",
1176 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1177 # get the job father
1178 if job.after is not None:
1181 if jb.name == job.after:
1183 if job_father is None:
1184 msg = _("The job %(father_name)s that is parent of "
1185 "%(son_name)s is not in the job list." %
1186 {"father_name" : job.after , "son_name" : job.name})
1187 raise src.SatException(msg)
1189 if len(job_father.remote_log_files) > 0:
1190 link = job_father.remote_log_files[0]
1193 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1197 xml_node_infos = xml_file.xmlroot.find('infos')
1198 src.xmlManager.append_node_attrib(xml_node_infos,
1200 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1204 def last_update(self, finish_status = "finished"):
1205 '''update information about the jobs for the file xml_file
1207 :param l_jobs List: the list of jobs that run today
1208 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1210 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1211 xml_node_infos = xml_file.xmlroot.find('infos')
1212 src.xmlManager.append_node_attrib(xml_node_infos,
1213 attrib={"JobsCommandStatus" : finish_status})
1215 self.write_xml_files()
1217 def write_xml_files(self):
1218 ''' Write the xml files
1220 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1221 for xml_file in self.d_xml_table_files.values():
1222 xml_file.write_tree(STYLESHEET_TABLE)
1225 # Describes the command
1227 return _("The jobs command launches maintenances that are described"
1228 " in the dedicated jobs configuration file.")
1232 def run(args, runner, logger):
1234 (options, args) = parser.parse_args(args)
1236 jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1238 l_cfg_dir = [jobs_cfg_files_dir,
1239 os.path.join(runner.cfg.VARS.datadir, "jobs")]
1241 # Make sure the path to the jobs config files directory exists
1242 src.ensure_path_exists(jobs_cfg_files_dir)
1244 # list option : display all the available config files
1246 for cfg_dir in l_cfg_dir:
1247 if not options.no_label:
1248 logger.write("------ %s\n" %
1249 src.printcolors.printcHeader(cfg_dir))
1251 for f in sorted(os.listdir(cfg_dir)):
1252 if not f.endswith('.pyconf'):
1255 logger.write("%s\n" % cfilename)
1258 # Make sure the jobs_config option has been called
1259 if not options.jobs_cfg:
1260 message = _("The option --jobs_config is required\n")
1261 src.printcolors.printcError(message)
1264 # Find the file in the directories
1266 for cfg_dir in l_cfg_dir:
1267 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1268 if not file_jobs_cfg.endswith('.pyconf'):
1269 file_jobs_cfg += '.pyconf'
1271 if not os.path.exists(file_jobs_cfg):
1278 msg = _("The file configuration %(name_file)s was not found."
1279 "\nUse the --list option to get the possible files.")
1280 src.printcolors.printcError(msg)
1284 (_("Platform"), runner.cfg.VARS.dist),
1285 (_("File containing the jobs configuration"), file_jobs_cfg)
1287 src.print_info(logger, info)
1289 # Read the config that is in the file
1290 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1291 if options.only_jobs:
1292 l_jb = src.pyconf.Sequence()
1293 for jb in config_jobs.jobs:
1294 if jb.name in options.only_jobs:
1296 "Adding a job that was given in only_jobs option parameters")
1297 config_jobs.jobs = l_jb
1300 today_jobs = Jobs(runner,
1305 # SSH connection to all machines
1306 today_jobs.ssh_connection_all_machines()
1307 if options.test_connection:
1312 gui = Gui("/export/home/serioja/LOGS",
1314 today_jobs.ljobs_not_today,)
1316 today_jobs.gui = gui
1320 # Run all the jobs contained in config_jobs
1321 today_jobs.run_jobs()
1322 except KeyboardInterrupt:
1324 logger.write("\n\n%s\n\n" %
1325 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1328 # find the potential not finished jobs and kill them
1329 for jb in today_jobs.ljobs:
1330 if not jb.has_finished():
1331 jb.kill_remote_process()
1333 today_jobs.gui.last_update(_("Forced interruption"))
1335 today_jobs.gui.last_update()
1336 # Output the results
1337 today_jobs.write_all_results()