3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets referenced by the generated xml reports: one for the
# global report, one for each per-table report (see Gui.write_xml_files).
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_TABLE = "jobs_table_report.xsl"
# Command line options of the "jobs" command.
# NOTE(review): some add_option calls are truncated in this view (missing
# trailing arguments / closing parentheses) -- confirm against the full file.
parser = src.options.Options()
parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('list all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("do not print labels, Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                    "display the jobs status."),
# Wraps a paramiko ssh connection to one remote machine: connect, copy
# salomeTools over sftp, run commands, and report connection status.
class Machine(object):
    '''Class to manage a ssh connection on a machine
            sat_path="salomeTools"):
        self.password = passwd
        # remote directory where salomeTools is deployed
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        # None = connect() never called; True/False after a connect() attempt
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine
        :param logger src.logger.Logger: The logger instance
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host,
                             password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
            message = ( _("Error connecting or establishing an SSH session"))
            self._connection_successful = True
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeed
        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeed, False if not
        if self._connection_successful == None:
            # connect() was never called: warn, the returned value (None)
            # is falsy but does not mean "failed"
            message = _("Warning : trying to ask if the connection to "
            "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
            " no connection request" %
            (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        self.sftp = self.ssh.open_sftp()
        self.mkdir(self.sat_path, ignore_existing=True)
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        job_file_name = os.path.basename(job_file)
        self.sftp.put(job_file, os.path.join(self.sat_path,
        except Exception as e:
            # any failure during the copy invalidates the connection state
            self._connection_successful = False
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All subdirectories in source are
            created under target.
        for item in os.listdir(source):
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                # recreate symlinks remotely instead of copying their target
                linkto = os.readlink(source_path)
                    self.sftp.symlink(linkto, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                if os.path.isfile(source_path):
                    # plain file: upload and preserve the permission bits
                    self.sftp.put(source_path, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                    # directory: create it remotely and recurse
                    self.mkdir(destination_path, ignore_existing=True)
                    self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
        self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine
        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                 paramiko.channel.ChannelFile)
            # Does not wait the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
            return (stdin, stdout, stderr)
        '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file)
        :param logger src.logger.Logger: The logger instance
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
    '''Class to manage one job
    def __init__(self, name, machine, application, distribution, table,
                 commands, timeout, config, logger, job_file, after=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        self.distribution = distribution
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        self.cancelled = False
        # execution state flags
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        self.out = None # Contains something only if the job is finished
        self.err = None # Contains something only if the job is finished
        self.commands = commands
        # full remote command line that launches sat on the remote machine
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config " +
        # NOTE(review): the "def get_pids" line is missing from this view;
        # the following lines grep the remote process list for self.command
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self):
        '''Kills the process on the remote machine.
        :return: (the output of the kill, the error of the kill)
        pids = self.get_pids()
        # send SIGINT (kill -2) to every pid of the remote command
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)
        '''Returns True if the job has already begun
        :return: True if the job has already begun
        return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.
        :return: True if the job has already finished
        # If the method has already been called and returned True
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        # a closed stdout channel means the remote command terminated
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read()
            self.err = self._stderr.read()
            self._Tf = time.time()
            # And get the remote command status and log files
        return self._has_finished
    def get_log_files(self):
        # Download the remote log files listed in list_log_files.txt.
        # Warn if called before the job finished: the list may not exist yet.
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        self.machine.sftp.get(
                    os.path.join(self.machine.sat_path, "list_log_files.txt"),
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        os.remove(tmp_file_path)
        # first line of the file is the remote command result code,
        # the remaining lines are remote log file paths
        self.res_job = file_lines[0]
        for job_path_remote in file_lines[1:]:
            if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                self.remote_log_files.append(local_path)
    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered as failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a time out.
        :return: True if the job has failed
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        if self.res_job == "1":
378 """In case of a failing job, one has to cancel every job that depend
379 on it. This method put the job as failed and will not be executed.
381 self._has_begun = True
382 self._has_finished = True
383 self.cancelled = True
384 self.out = _("This job was not launched because its father has failed.")
385 self.err = _("This job was not launched because its father has failed.")
387 def is_running(self):
388 '''Returns True if the job commands are running
390 :return: True if the job is running
393 return self.has_begun() and not self.has_finished()
395 def is_timeout(self):
396 '''Returns True if the job commands has finished with timeout
398 :return: True if the job has finished with timeout
401 return self._has_timouted
    def time_elapsed(self):
        # Elapsed wall-clock time (seconds) since the job started (_T0).
        # NOTE(review): lines are missing from this view -- T_now is
        # presumably time.time() captured just above; confirm in full file.
        if not self.has_begun():
        return T_now - self._T0
    def check_time(self):
        # Enforce the job timeout: when exceeded, mark the job as finished
        # with timeout, record the end time and kill the remote process.
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            (out_kill, _) = self.kill_remote_process()
            self.out = "TIMEOUT \n" + out_kill.read()
            self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
421 def total_duration(self):
422 return self._Tf - self._T0
    def run(self, logger):
        # Launch the remote command for this job (only once).
            print("Warn the user that a job can only be launched one time")
        if not self.machine.successfully_connected(logger):
            # no usable ssh connection: fail the job immediately
            self._has_finished = True
            self.err = ("Connection to machine (name : %s, host: %s, port:"
                        " %s, user: %s) has failed\nUse the log command "
                        "to get more information."
                        % (self.machine.name,
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
                                                          self.command, logger)
            # exec_command returns (None, None, None) when the server
            # failed to execute the command
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.err = "The server failed to execute the command"
        self._has_begun = True
    def write_results(self, logger):
        # Print the job summary (name, timings, machine info, outputs).
        logger.write("name : " + self.name + "\n")
            logger.write("after : %s\n" % self.after)
        logger.write("Time elapsed : %4imin %2is \n" %
                     (self.total_duration()/60 , self.total_duration()%60))
            logger.write("Begin time : %s\n" %
                         time.strftime('%Y-%m-%d %H:%M:%S',
                                       time.localtime(self._T0)) )
            logger.write("End time : %s\n\n" %
                         time.strftime('%Y-%m-%d %H:%M:%S',
                                       time.localtime(self._Tf)) )
        machine_head = "Informations about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
        self.machine.write_info(logger)
        logger.write(src.printcolors.printcInfo("out : \n"))
            logger.write("Unable to get output\n")
            logger.write(self.out + "\n")
        logger.write(src.printcolors.printcInfo("err : \n"))
            logger.write("Unable to get error\n")
            logger.write(self.err + "\n")
    def get_status(self):
        # Human-readable status string used in the xml reports.
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file = job_file
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today
        # (a same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        # The correlation dictionary between jobs and machines
        self.dic_job_machine = {}
        # display column width (note: "lenght" keeps the original spelling)
        self.len_columns = lenght_columns
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []
        self.determine_jobs_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
        and returns the job instance corresponding to the definition.
        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        cmmnds = job_def.commands
        timeout = job_def.timeout
        # optional keys of the job definition
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'distribution' in job_def:
            distribution = job_def.distribution
        if 'table' in job_def:
            table = job_def.table
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
        the machines and jobs to be done today.
        # weekday(): 0 = Monday ... 6 = Sunday
        today = datetime.date.weekday(datetime.date.today())
        for job_def in self.cfg_jobs.jobs :
            # a job without a machine cannot run: warn and ignore it
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" do not have the key '
                        '"machine", this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            # reuse an already instantiated Machine when possible
            for mach in self.lmachines:
                if mach.name == name_machine:
            if a_machine == None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        # default values for the optional machine keys
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            if a_machine == None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            self.dic_job_machine[a_job] = a_machine
            # split jobs between today's schedule and the rest
            if today in job_def.when:
                self.ljobs.append(a_job)
            else: # today in job_def.when
                self.ljobs_not_today.append(a_job)
        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            endline = (pad - len(begin_line)) * "." + " "
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # Print the status of the copy
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' %
                                 src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                                 src.printcolors.printc(src.OK_STATUS),
                                 _("Copy of SAT failed")), 3)
                # connection failed: report KO for this machine
                self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                 src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
        the machine defined by its host and its port.
        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
                 or false if there is no job running on the host.
        for jb in self.dic_job_machine:
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
        running jobs and the jobs that have already finished.
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.dic_job_machine:
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
        # True when at least one more job finished since the previous call
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        for job in self.ljobs:
            # jobs with no "after" field have no dependency to check
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job.has_failed():
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.
        :param name str: a job name
        :return: the job that has the name.
        for jb in self.ljobs:
        # the following is executed only if the job was not found
        msg = _('The job "%s" seems to be nonexistent') % name
        raise src.SatException(msg)
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
        the most close string of length "length".
        :param text str: any string
        :param length int: a length for the returned string
        :return: the most close string of length "length"
        if len(text) > length:
            # too long: truncate and mark with an ellipsis
            text_out = text[:length-3] + '...'
            # too short: center by padding spaces (python2 integer division)
            diff = length - len(text)
            before = " " * (diff/2)
            after = " " * (diff/2 + diff%2)
            text_out = before + text + after
    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status
        of the jobs in an array that has a column for each host.
        It displays the job that is currently running on the host
        :param len_col int: the size of the column
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        # "\r" rewrites the same terminal line on every refresh
        self.logger.write("\r" + display_line + "|")
        '''The main method. Runs all the jobs on every host.
        For each host, at a given time, only one job can be running.
        The jobs that have the field after (that contain the job that has
        to be run before it) are run after the previous job.
        This method stops when all the jobs are finished.
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        # header of the status table: one column per (host, port)
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        # The infinite loop that runs the jobs
        l_jobs_not_started = self.dic_job_machine.keys()
        while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
            new_job_start = False
            for host_port in self.lhosts:
                # one job at a time per host: skip busy hosts
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        # a dependent job waits for its father to finish
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            if new_job_start or new_job_finished:
                # refresh the xml reports only when something changed
                self.gui.update_xml_files(self.ljobs)
                # Display the current status
                self.display_status(self.len_columns)
            # Make sure that the proc is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")
        self.gui.update_xml_files(self.ljobs)
        self.gui.last_update()
904 def write_all_results(self):
905 '''Display all the jobs outputs.
911 for jb in self.dic_job_machine.keys():
912 self.logger.write(src.printcolors.printcLabel(
913 "#------- Results for job %s -------#\n" % jb.name))
914 jb.write_results(self.logger)
915 self.logger.write("\n\n")
    '''Class to manage the the xml data that can be displayed in a browser to
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # The xml files that corresponds to the tables.
        # {name_table : xml_object}}
        self.d_xml_table_files = {}
        # Create the lines and columns
        self.initialize_arrays(l_jobs, l_jobs_not_today)
        # write an initial state for every job
        self.update_xml_files(l_jobs)
    def initialize_arrays(self, l_jobs, l_jobs_not_today):
        # Build the skeleton of every xml report (tables, hosts, jobs nodes).
        # Get the tables to fill and put it in a dictionary
        # {table_name : xml instance corresponding to the table}
        for job in l_jobs + l_jobs_not_today:
            if (table is not None and
                    table not in self.d_xml_table_files.keys()):
                xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
                self.d_xml_table_files[table] = src.xmlManager.XmlLogFile(
                self.d_xml_table_files[table].add_simple_node("distributions")
                self.d_xml_table_files[table].add_simple_node("applications")
                self.d_xml_table_files[table].add_simple_node("table", text=table)
        for table in self.d_xml_table_files:
            d_application[table] = []
        # collect distinct (host, port), distributions and applications
        for job in l_jobs + l_jobs_not_today:
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
            distrib = job.distribution
            application = job.application
            table_job = job.table
            for table in self.d_xml_table_files:
                if table_job == table:
                    if distrib is not None and distrib not in d_dist[table]:
                        d_dist[table].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_table_files[table].xmlroot.find('distributions'),
                            attrib={"name" : distrib})
                if table_job == table:
                    if application is not None and application not in d_application[table]:
                        d_application[table].append(application)
                        src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
                            attrib={"name" : application})
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but that will not be launched
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        # Add a <job> node with state "Not today" for every job that is
        # defined in the configuration but not scheduled for today.
        for job in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
            src.xmlManager.add_simple_node(xmlj, "table", job.table)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
1026 def update_xml_files(self, l_jobs):
1027 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1028 self.update_xml_file(l_jobs, xml_file)
    def update_xml_file(self, l_jobs, xml_file):
        # Rebuild the <job> nodes of one xml report from current job states.
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
            # Find the node corresponding to the job and delete it
            # in order to recreate it
            for xmljob in xml_node_jobs.findall('job'):
                if xmljob.attrib['name'] == job.name:
                    xml_node_jobs.remove(xmljob)
                T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._T0))
                Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._Tf))
            # recreate the job node
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj, "distribution",
            src.xmlManager.add_simple_node(xmlj, "table", job.table)
            src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
            src.xmlManager.add_simple_node(xmlj, "begin", T0)
            src.xmlManager.add_simple_node(xmlj, "end", Tf)
            # outputs are stripped of terminal color escape sequences
            src.xmlManager.add_simple_node(xmlj, "out",
                                        src.printcolors.cleancolor(job.out))
            src.xmlManager.add_simple_node(xmlj, "err",
                                        src.printcolors.cleancolor(job.err))
            src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
            if len(job.remote_log_files) > 0:
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
                                               job.remote_log_files[0])
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
            xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
            # get the job father
            if job.after is not None:
                    if jb.name == job.after:
                if job_father is None:
                    msg = _("The job %(father_name)s that is parent of "
                            "%(son_name)s is not in the job list." %
                            {"father_name" : job.after , "son_name" : job.name})
                    raise src.SatException(msg)
                if len(job_father.remote_log_files) > 0:
                    link = job_father.remote_log_files[0]
                src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
        # record the time of this update in the <infos> node
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
        self.write_xml_files()
1113 def last_update(self, finish_status = "finished"):
1114 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1115 xml_node_infos = xml_file.xmlroot.find('infos')
1116 src.xmlManager.append_node_attrib(xml_node_infos,
1117 attrib={"JobsCommandStatus" : finish_status})
1119 self.write_xml_files()
1121 def write_xml_files(self):
1122 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1123 for xml_file in self.d_xml_table_files.values():
1124 xml_file.write_tree(STYLESHEET_TABLE)
# Describes the command
# NOTE(review): the "def description():" line is missing from this view;
# the body below returns the help text of the jobs command.
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    # Entry point of the "sat jobs" command: parse options, load the jobs
    # configuration, connect to the machines, run the jobs, report results.
    (options, args) = parser.parse_args(args)
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
    l_cfg_dir = [jobs_cfg_files_dir,
                 os.path.join(runner.cfg.VARS.datadir, "jobs")]
    # Make sure the path to the jobs config files directory exists
    src.ensure_path_exists(jobs_cfg_files_dir)
    # list option : display all the available config files
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" %
                                 src.printcolors.printcHeader(cfg_dir))
            for f in sorted(os.listdir(cfg_dir)):
                if not f.endswith('.pyconf'):
                logger.write("%s\n" % cfilename)
    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        raise src.SatException( message )
    # Find the file in the directories
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'
        if not os.path.exists(file_jobs_cfg):
        msg = _("The file configuration %(name_file)s was not found."
                "\nUse the --list option to get the possible files.")
        src.printcolors.printcError(msg)
         (_("Platform"), runner.cfg.VARS.dist),
         (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)
    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        # keep only the jobs requested on the command line
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                 "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb
    today_jobs = Jobs(runner,
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
    # NOTE(review): hard-coded, user-specific log directory -- should
    # come from the configuration instead
    gui = Gui("/export/home/serioja/LOGS",
              today_jobs.ljobs_not_today,)
    today_jobs.gui = gui
        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        # user interrupted the command: kill the remaining remote jobs
        logger.write("\n\n%s\n\n" %
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
            today_jobs.gui.last_update(_("Forced interruption"))
            today_jobs.gui.last_update()
    # Output the results
    today_jobs.write_all_results()