3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets applied to the generated XML reports (global report and
# per-table report) — see Gui.write_xml_files below.
26 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
27 STYLESHEET_TABLE = "jobs_table_report.xsl"
# Command-line options of the "jobs" command.
# NOTE(review): this excerpt is incomplete (the fused original numbering
# jumps), e.g. the default values of --test_connection and --publish are
# not visible here — confirm against the full file.
29 parser = src.options.Options()
31 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
32 _('The name of the config file that contains'
33 ' the jobs configuration'))
34 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
35 _('The list of jobs to launch, by their name. '))
36 parser.add_option('l', 'list', 'boolean', 'list',
37 _('list all available config files.'))
38 parser.add_option('n', 'no_label', 'boolean', 'no_label',
39 _("do not print labels, Works only with --list."), False)
40 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
41 _("Try to connect to the machines. Not executing the jobs."),
43 parser.add_option('p', 'publish', 'boolean', 'publish',
44 _("Generate an xml file that can be read in a browser to "
45 "display the jobs status."),
# Wrapper around one paramiko SSH connection to a remote host: connect,
# upload salomeTools via SFTP, run commands, report status.
# NOTE(review): this excerpt is incomplete (original numbering jumps) —
# e.g. the __init__ signature, the try: before ssh.connect, and the
# close() def line are not visible here.
48 class Machine(object):
49 '''Class to manage a ssh connection on a machine
57 sat_path="salomeTools"):
62 self.password = passwd
63 self.sat_path = sat_path
64 self.ssh = paramiko.SSHClient()
# None means "no connection attempt made yet"; connect() sets True/False.
65 self._connection_successful = None
67 def connect(self, logger):
68 '''Initiate the ssh connection to the remote machine
70 :param logger src.logger.Logger: The logger instance
75 self._connection_successful = False
76 self.ssh.load_system_host_keys()
# Auto-accept unknown host keys (no interactive host-key confirmation).
77 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
79 self.ssh.connect(self.host,
82 password = self.password)
# Each failure mode is mapped to a human-readable message; the success
# path (below the excepts) marks the connection as successful.
83 except paramiko.AuthenticationException:
84 message = src.KO_STATUS + _("Authentication failed")
85 except paramiko.BadHostKeyException:
86 message = (src.KO_STATUS +
87 _("The server's host key could not be verified"))
88 except paramiko.SSHException:
89 message = ( _("SSHException error connecting or "
90 "establishing an SSH session"))
92 message = ( _("Error connecting or establishing an SSH session"))
94 self._connection_successful = True
98 def successfully_connected(self, logger):
99 '''Verify if the connection to the remote machine has succeed
101 :param logger src.logger.Logger: The logger instance
102 :return: True if the connection has succeed, False if not
# Warn when queried before connect() was ever called (flag still None).
105 if self._connection_successful == None:
106 message = _("Warning : trying to ask if the connection to "
107 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
108 " no connection request" %
109 (self.name, self.host, self.port, self.user))
110 logger.write( src.printcolors.printcWarning(message))
111 return self._connection_successful
113 def copy_sat(self, sat_local_path, job_file):
114 '''Copy salomeTools to the remote machine in self.sat_path
118 self.sftp = self.ssh.open_sftp()
119 self.mkdir(self.sat_path, ignore_existing=True)
# Upload the salomeTools sources, skipping the .git directory.
120 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
121 job_file_name = os.path.basename(job_file)
122 self.sftp.put(job_file, os.path.join(self.sat_path,
# Any failure during the copy invalidates the connection flag so the
# caller treats this machine as unusable.
126 except Exception as e:
128 self._connection_successful = False
# NOTE(review): mutable default argument (filters=[]) — apparently never
# mutated here, but worth replacing with None in a real fix.
132 def put_dir(self, source, target, filters = []):
133 ''' Uploads the contents of the source directory to the target path. The
134 target directory needs to exists. All subdirectories in source are
135 created under target.
137 for item in os.listdir(source):
140 source_path = os.path.join(source, item)
141 destination_path = os.path.join(target, item)
# Symlinks are recreated remotely with the local permission bits.
142 if os.path.islink(source_path):
143 linkto = os.readlink(source_path)
145 self.sftp.symlink(linkto, destination_path)
146 self.sftp.chmod(destination_path,
147 os.stat(source_path).st_mode)
151 if os.path.isfile(source_path):
152 self.sftp.put(source_path, destination_path)
153 self.sftp.chmod(destination_path,
154 os.stat(source_path).st_mode)
# Directory entry: create it remotely and recurse into it.
156 self.mkdir(destination_path, ignore_existing=True)
157 self.put_dir(source_path, destination_path)
# mode=511 is decimal for octal 0777 (full permissions).
159 def mkdir(self, path, mode=511, ignore_existing=False):
160 ''' Augments mkdir by adding an option to not fail
164 self.sftp.mkdir(path, mode)
171 def exec_command(self, command, logger):
172 '''Execute the command on the remote machine
174 :param command str: The command to be run
175 :param logger src.logger.Logger: The logger instance
176 :return: the stdin, stdout, and stderr of the executing command,
178 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
179 paramiko.channel.ChannelFile)
182 # Does not wait the end of the command
183 (stdin, stdout, stderr) = self.ssh.exec_command(command)
184 except paramiko.SSHException:
185 message = src.KO_STATUS + _(
186 ": the server failed to execute the command\n")
187 logger.write( src.printcolors.printcError(message))
# (None, None, None) is the failure sentinel checked by Job.run().
188 return (None, None, None)
190 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
191 return (None, None, None)
193 return (stdin, stdout, stderr)
196 '''Close the ssh connection
202 def write_info(self, logger):
203 '''Prints the informations relative to the machine in the logger
204 (terminal traces and log file)
206 :param logger src.logger.Logger: The logger instance
210 logger.write("host : " + self.host + "\n")
211 logger.write("port : " + str(self.port) + "\n")
212 logger.write("user : " + str(self.user) + "\n")
213 if self.successfully_connected(logger):
214 status = src.OK_STATUS
216 status = src.KO_STATUS
217 logger.write("Connection : " + status + "\n\n")
# Fragment of the Job class: one remote job — its machine, the remote "sat"
# command line, timing (_T0/_Tf), state flags, outputs and status reporting.
# NOTE(review): the excerpt is incomplete (original numbering jumps); the
# "class Job" header, the get_pids/has_begun def lines and several branch
# bodies are not visible here.
221 '''Class to manage one job
223 def __init__(self, name, machine, application, distribution, table,
224 commands, timeout, config, logger, job_file, after=None):
227 self.machine = machine
229 self.timeout = timeout
230 self.application = application
231 self.distribution = distribution
235 # The list of log files to download from the remote machine
236 self.remote_log_files = []
238 # The remote command status
239 # -1 means that it has not been launched,
240 # 0 means success and 1 means fail
242 self.cancelled = False
246 self._has_begun = False
247 self._has_finished = False
# NOTE(review): "timouted" is a typo but is used consistently below.
248 self._has_timouted = False
249 self._stdin = None # Store the command inputs field
250 self._stdout = None # Store the command outputs field
251 self._stderr = None # Store the command errors field
253 self.out = None # Contains something only if the job is finished
254 self.err = None # Contains something only if the job is finished
256 self.commands = commands
# Build the remote command line: run "sat job --jobs_config ..." on the
# uploaded salomeTools, writing log paths into list_log_files.txt.
257 self.command = (os.path.join(self.machine.sat_path, "sat") +
259 os.path.join(self.machine.sat_path,
260 "list_log_files.txt") +
261 " job --jobs_config " +
# get_pids (fragment): grep the remote process table for this job's
# command and keep only the numeric pid column.
268 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
269 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
270 pids_cmd = out_pid.readlines()
271 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
275 def kill_remote_process(self, wait=1):
276 '''Kills the process on the remote machine.
278 :return: (the output of the kill, the error of the kill)
# Send SIGINT (kill -2) to every pid found by get_pids().
282 pids = self.get_pids()
283 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
284 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
287 return (out_kill, err_kill)
290 '''Returns True if the job has already begun
292 :return: True if the job has already begun
295 return self._has_begun
297 def has_finished(self):
298 '''Returns True if the job has already finished
299 (i.e. all the commands have been executed)
300 If it is finished, the outputs are stored in the fields out and err.
302 :return: True if the job has already finished
306 # If the method has already been called and returned True
307 if self._has_finished:
310 # If the job has not begun yet
311 if not self.has_begun():
# A closed stdout channel means the remote command has terminated.
314 if self._stdout.channel.closed:
315 self._has_finished = True
316 # Store the result outputs
317 self.out = self._stdout.read()
318 self.err = self._stderr.read()
320 self._Tf = time.time()
321 # And get the remote command status and log files
324 return self._has_finished
326 def get_log_files(self):
327 if not self.has_finished():
328 msg = _("Trying to get log files whereas the job is not finished.")
329 self.logger.write(src.printcolors.printcWarning(msg))
# Download the list of log file paths the remote sat run produced.
332 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
333 self.machine.sftp.get(
334 os.path.join(self.machine.sat_path, "list_log_files.txt"),
337 fstream_tmp = open(tmp_file_path, "r")
338 file_lines = fstream_tmp.readlines()
339 file_lines = [line.replace("\n", "") for line in file_lines]
341 os.remove(tmp_file_path)
# First line is the remote command status; remaining lines are log paths.
342 self.res_job = file_lines[0]
343 for job_path_remote in file_lines[1:]:
345 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
346 local_path = os.path.join(os.path.dirname(
347 self.logger.logFilePath),
348 os.path.basename(job_path_remote))
349 if not os.path.exists(local_path):
350 self.machine.sftp.get(job_path_remote, local_path)
352 local_path = os.path.join(os.path.dirname(
353 self.logger.logFilePath),
355 os.path.basename(job_path_remote))
356 if not os.path.exists(local_path):
357 self.machine.sftp.get(job_path_remote, local_path)
358 self.remote_log_files.append(local_path)
360 self.err += _("Unable to get %s log file from remote.") % job_path_remote
362 def has_failed(self):
363 '''Returns True if the job has failed.
364 A job is considered as failed if the machine could not be reached,
365 if the remote command failed,
366 or if the job finished with a time out.
368 :return: True if the job has failed
371 if not self.has_finished():
373 if not self.machine.successfully_connected(self.logger):
375 if self.is_timeout():
# res_job "1" means the remote command itself failed.
377 if self.res_job == "1":
# cancel (fragment): mark this job as finished/cancelled without running.
382 """In case of a failing job, one has to cancel every job that depend
383 on it. This method put the job as failed and will not be executed.
385 self._has_begun = True
386 self._has_finished = True
387 self.cancelled = True
388 self.out = _("This job was not launched because its father has failed.")
389 self.err = _("This job was not launched because its father has failed.")
391 def is_running(self):
392 '''Returns True if the job commands are running
394 :return: True if the job is running
397 return self.has_begun() and not self.has_finished()
399 def is_timeout(self):
400 '''Returns True if the job commands has finished with timeout
402 :return: True if the job has finished with timeout
405 return self._has_timouted
407 def time_elapsed(self):
408 if not self.has_begun():
411 return T_now - self._T0
413 def check_time(self):
414 if not self.has_begun():
# Past the timeout: mark finished/timed-out and kill the remote process.
416 if self.time_elapsed() > self.timeout:
417 self._has_finished = True
418 self._has_timouted = True
419 self._Tf = time.time()
421 (out_kill, _) = self.kill_remote_process()
422 self.out = "TIMEOUT \n" + out_kill.read()
423 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
427 self.err += _("Unable to get remote log files")
429 def total_duration(self):
430 return self._Tf - self._T0
432 def run(self, logger):
# NOTE(review): this print looks like a leftover placeholder for a real
# "job already launched" warning — confirm against the full file.
434 print("Warn the user that a job can only be launched one time")
437 if not self.machine.successfully_connected(logger):
# Machine unreachable: mark the job finished/failed without running it.
438 self._has_finished = True
440 self.err = ("Connection to machine (name : %s, host: %s, port:"
441 " %s, user: %s) has failed\nUse the log command "
442 "to get more information."
443 % (self.machine.name,
448 self._T0 = time.time()
449 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
450 self.command, logger)
# (None, None, None) is Machine.exec_command's failure sentinel.
451 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
452 self._has_finished = True
453 self._Tf = time.time()
455 self.err = "The server failed to execute the command"
457 self._has_begun = True
459 def write_results(self, logger):
460 logger.write("name : " + self.name + "\n")
462 logger.write("after : %s\n" % self.after)
463 logger.write("Time elapsed : %4imin %2is \n" %
464 (self.total_duration()/60 , self.total_duration()%60))
466 logger.write("Begin time : %s\n" %
467 time.strftime('%Y-%m-%d %H:%M:%S',
468 time.localtime(self._T0)) )
470 logger.write("End time : %s\n\n" %
471 time.strftime('%Y-%m-%d %H:%M:%S',
472 time.localtime(self._Tf)) )
474 machine_head = "Informations about connection :\n"
475 underline = (len(machine_head) - 2) * "-"
476 logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
477 self.machine.write_info(logger)
479 logger.write(src.printcolors.printcInfo("out : \n"))
481 logger.write("Unable to get output\n")
483 logger.write(self.out + "\n")
484 logger.write(src.printcolors.printcInfo("err : \n"))
486 logger.write("Unable to get error\n")
488 logger.write(self.err + "\n")
490 def get_status(self):
491 if not self.machine.successfully_connected(self.logger):
492 return "SSH connection KO"
493 if not self.has_begun():
494 return "Not launched"
497 if self.is_running():
498 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
499 time.localtime(self._T0))
500 if self.has_finished():
501 if self.is_timeout():
502 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
503 time.localtime(self._Tf))
504 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
505 time.localtime(self._Tf))
# Fragment of the Jobs class: scheduler that reads the pyconf jobs
# configuration, instantiates Machine/Job objects, opens the ssh
# connections and runs the jobs (one at a time per host).
# NOTE(review): the excerpt is incomplete (original numbering jumps); the
# "class Jobs" header, the __init__ signature and the run_jobs def line
# are not visible here.
508 '''Class to manage the jobs to be run
516 lenght_columns = 20):
517 # The jobs configuration
518 self.cfg_jobs = config_jobs
519 self.job_file = job_file
520 self.job_file_path = job_file_path
521 # The machine that will be used today
523 # The list of machine (hosts, port) that will be used today
524 # (a same host can have several machine instances since there
525 # can be several ssh parameters)
527 # The jobs to be launched today
529 # The jobs that will not be launched today
530 self.ljobs_not_today = []
533 # The correlation dictionary between jobs and machines
534 self.dic_job_machine = {}
# NOTE(review): "lenght" is a typo in the parameter name (kept — it is
# part of the visible interface).
535 self.len_columns = lenght_columns
537 # the list of jobs that have not been run yet
538 self._l_jobs_not_started = []
539 # the list of jobs that have already ran
540 self._l_jobs_finished = []
541 # the list of jobs that are running
542 self._l_jobs_running = []
544 self.determine_jobs_and_machines()
546 def define_job(self, job_def, machine):
547 '''Takes a pyconf job definition and a machine (from class machine)
548 and returns the job instance corresponding to the definition.
550 :param job_def src.config.Mapping: a job definition
551 :param machine machine: the machine on which the job will run
552 :return: The corresponding job in a job class instance
556 cmmnds = job_def.commands
557 timeout = job_def.timeout
# Optional keys of the job definition (after/application/distribution/
# table); the defaults assigned when absent are not visible here.
559 if 'after' in job_def:
560 after = job_def.after
562 if 'application' in job_def:
563 application = job_def.application
565 if 'distribution' in job_def:
566 distribution = job_def.distribution
568 if 'table' in job_def:
569 table = job_def.table
583 def determine_jobs_and_machines(self):
584 '''Function that reads the pyconf jobs definition and instantiates all
585 the machines and jobs to be done today.
# weekday(): Monday is 0 ... Sunday is 6; compared to job_def.when below.
590 today = datetime.date.weekday(datetime.date.today())
593 for job_def in self.cfg_jobs.jobs :
595 if not "machine" in job_def:
596 msg = _('WARNING: The job "%s" do not have the key '
597 '"machine", this job is ignored.\n\n' % job_def.name)
598 self.logger.write(src.printcolors.printcWarning(msg))
600 name_machine = job_def.machine
# Reuse an already-instantiated Machine with the same name, if any.
603 for mach in self.lmachines:
604 if mach.name == name_machine:
608 if a_machine == None:
609 for machine_def in self.cfg_jobs.machines:
610 if machine_def.name == name_machine:
# Each machine field falls back to a local default when absent
# from the configuration.
611 if 'host' not in machine_def:
612 host = self.runner.cfg.VARS.hostname
614 host = machine_def.host
616 if 'user' not in machine_def:
617 user = self.runner.cfg.VARS.user
619 user = machine_def.user
621 if 'port' not in machine_def:
624 port = machine_def.port
626 if 'password' not in machine_def:
629 passwd = machine_def.password
631 if 'sat_path' not in machine_def:
632 sat_path = "salomeTools"
634 sat_path = machine_def.sat_path
645 self.lmachines.append(a_machine)
646 if (host, port) not in host_list:
647 host_list.append((host, port))
649 if a_machine == None:
650 msg = _("WARNING: The job \"%(job_name)s\" requires the "
651 "machine \"%(machine_name)s\" but this machine "
652 "is not defined in the configuration file.\n"
653 "The job will not be launched")
654 self.logger.write(src.printcolors.printcWarning(msg))
656 a_job = self.define_job(job_def, a_machine)
657 self.dic_job_machine[a_job] = a_machine
# Partition jobs by weekday: scheduled today vs. not today.
659 if today in job_def.when:
660 self.ljobs.append(a_job)
661 else: # today in job_def.when
662 self.ljobs_not_today.append(a_job)
664 self.lhosts = host_list
666 def ssh_connection_all_machines(self, pad=50):
667 '''Function that do the ssh connection to every machine
673 self.logger.write(src.printcolors.printcInfo((
674 "Establishing connection with all the machines :\n")))
675 for machine in self.lmachines:
676 # little algorithm in order to display traces
677 begin_line = (_("Connection to %s: " % machine.name))
678 if pad - len(begin_line) < 0:
# Dotted padding aligns the per-machine status column.
681 endline = (pad - len(begin_line)) * "." + " "
683 step = "SSH connection"
684 self.logger.write( begin_line + endline + step)
686 # the call to the method that initiate the ssh connection
687 msg = machine.connect(self.logger)
689 # Copy salomeTools to the remote machine
690 if machine.successfully_connected(self.logger):
# '\r' rewrites the current line to update the displayed step.
692 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
693 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
695 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
697 # Print the status of the copy
699 self.logger.write('\r%s' %
700 ((len(begin_line)+len(endline)+20) * " "), 3)
701 self.logger.write('\r%s%s%s' %
704 src.printcolors.printc(src.OK_STATUS)), 3)
706 self.logger.write('\r%s' %
707 ((len(begin_line)+len(endline)+20) * " "), 3)
708 self.logger.write('\r%s%s%s %s' %
711 src.printcolors.printc(src.OK_STATUS),
712 _("Copy of SAT failed")), 3)
714 self.logger.write('\r%s' %
715 ((len(begin_line)+len(endline)+20) * " "), 3)
716 self.logger.write('\r%s%s%s %s' %
719 src.printcolors.printc(src.KO_STATUS),
721 self.logger.write("\n", 3)
723 self.logger.write("\n")
726 def is_occupied(self, hostname):
727 '''Function that returns True if a job is running on
728 the machine defined by its host and its port.
730 :param hostname (str, int): the pair (host, port)
731 :return: the job that is running on the host,
732 or false if there is no job running on the host.
737 for jb in self.dic_job_machine:
738 if jb.machine.host == host and jb.machine.port == port:
743 def update_jobs_states_list(self):
744 '''Function that updates the lists that store the currently
745 running jobs and the jobs that have already finished.
750 jobs_finished_list = []
751 jobs_running_list = []
752 for jb in self.dic_job_machine:
754 jobs_running_list.append(jb)
756 if jb.has_finished():
757 jobs_finished_list.append(jb)
# Return whether at least one job finished since the previous call.
759 nb_job_finished_before = len(self._l_jobs_finished)
760 self._l_jobs_finished = jobs_finished_list
761 self._l_jobs_running = jobs_running_list
763 nb_job_finished_now = len(self._l_jobs_finished)
765 return nb_job_finished_now > nb_job_finished_before
767 def cancel_dependencies_of_failing_jobs(self):
768 '''Function that cancels all the jobs that depend on a failing one.
774 for job in self.ljobs:
775 if job.after is None:
777 father_job = self.find_job_that_has_name(job.after)
778 if father_job.has_failed():
781 def find_job_that_has_name(self, name):
782 '''Returns the job by its name.
784 :param name str: a job name
785 :return: the job that has the name.
788 for jb in self.ljobs:
792 # the following is executed only if the job was not found
793 msg = _('The job "%s" seems to be nonexistent') % name
794 raise src.SatException(msg)
796 def str_of_length(self, text, length):
797 '''Takes a string text of any length and returns
798 the most close string of length "length".
800 :param text str: any string
801 :param length int: a length for the returned string
802 :return: the most close string of length "length"
805 if len(text) > length:
806 text_out = text[:length-3] + '...'
808 diff = length - len(text)
# NOTE(review): diff/2 relies on Python 2 integer division; under
# Python 3 it yields a float and " " * float raises TypeError —
# should be diff//2 when porting.
809 before = " " * (diff/2)
810 after = " " * (diff/2 + diff%2)
811 text_out = before + text + after
815 def display_status(self, len_col):
816 '''Takes a lenght and construct the display of the current status
817 of the jobs in an array that has a column for each host.
818 It displays the job that is currently running on the host
821 :param len_col int: the size of the column
827 for host_port in self.lhosts:
828 jb = self.is_occupied(host_port)
829 if not jb: # nothing running on the host
830 empty = self.str_of_length("empty", len_col)
831 display_line += "|" + empty
833 display_line += "|" + src.printcolors.printcInfo(
834 self.str_of_length(jb.name, len_col))
836 self.logger.write("\r" + display_line + "|")
# run_jobs (def line not visible in this excerpt):
841 '''The main method. Runs all the jobs on every host.
842 For each host, at a given time, only one job can be running.
843 The jobs that have the field after (that contain the job that has
844 to be run before it) are run after the previous job.
845 This method stops when all the jobs are finished.
852 self.logger.write(src.printcolors.printcInfo(
853 _('Executing the jobs :\n')))
# Print the table header: one column per (host, port).
855 for host_port in self.lhosts:
858 if port == 22: # default value
859 text_line += "|" + self.str_of_length(host, self.len_columns)
861 text_line += "|" + self.str_of_length(
862 "("+host+", "+str(port)+")", self.len_columns)
864 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
865 self.logger.write(tiret_line)
866 self.logger.write(text_line + "|\n")
867 self.logger.write(tiret_line)
870 # The infinite loop that runs the jobs
# NOTE(review): under Python 3, dict.keys() is a view — the
# l_jobs_not_started.remove(jb) calls below would fail; wrap in
# list(...) when porting.
871 l_jobs_not_started = self.dic_job_machine.keys()
872 while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
873 new_job_start = False
874 for host_port in self.lhosts:
876 if self.is_occupied(host_port):
# Pick the next startable job for this free host: either it
# has no "after" dependency, or its predecessor has finished.
879 for jb in l_jobs_not_started:
880 if (jb.machine.host, jb.machine.port) != host_port:
884 l_jobs_not_started.remove(jb)
888 jb_before = self.find_job_that_has_name(jb.after)
889 if jb_before.has_finished():
891 l_jobs_not_started.remove(jb)
894 self.cancel_dependencies_of_failing_jobs()
895 new_job_finished = self.update_jobs_states_list()
# Refresh the browser-facing XML only when something changed.
897 if new_job_start or new_job_finished:
898 self.gui.update_xml_files(self.ljobs)
899 # Display the current status
900 self.display_status(self.len_columns)
902 # Make sure that the proc is not entirely busy
905 self.logger.write("\n")
906 self.logger.write(tiret_line)
907 self.logger.write("\n\n")
909 self.gui.update_xml_files(self.ljobs)
910 self.gui.last_update()
912 def write_all_results(self):
913 '''Display all the jobs outputs.
919 for jb in self.dic_job_machine.keys():
920 self.logger.write(src.printcolors.printcLabel(
921 "#------- Results for job %s -------#\n" % jb.name))
922 jb.write_results(self.logger)
923 self.logger.write("\n\n")
# Fragment of the Gui class: maintains the XML report files (one global
# file plus one per "table") that a browser renders via the XSL
# stylesheets declared at module top.
# NOTE(review): the excerpt is incomplete (original numbering jumps); the
# "class Gui" header and several statements are not visible here.
926 '''Class to manage the the xml data that can be displayed in a browser to
930 def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
933 :param xml_dir_path str: The path to the directory where to put
934 the xml resulting files
935 :param l_jobs List: the list of jobs that run today
936 :param l_jobs_not_today List: the list of jobs that do not run today
938 # The path of the global xml file
939 self.xml_dir_path = xml_dir_path
940 # Initialize the xml files
941 xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
942 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
944 # The xml files that corresponds to the tables.
945 # {name_table : xml_object}}
946 self.d_xml_table_files = {}
947 # Create the lines and columns
948 self.initialize_arrays(l_jobs, l_jobs_not_today)
950 self.update_xml_files(l_jobs)
952 def initialize_arrays(self, l_jobs, l_jobs_not_today):
953 '''Get all the first information needed for each file and write the
954 first version of the files
955 :param l_jobs List: the list of jobs that run today
956 :param l_jobs_not_today List: the list of jobs that do not run today
958 # Get the tables to fill and put it in a dictionary
959 # {table_name : xml instance corresponding to the table}
960 for job in l_jobs + l_jobs_not_today:
962 if (table is not None and
963 table not in self.d_xml_table_files.keys()):
964 xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
965 self.d_xml_table_files[table] = src.xmlManager.XmlLogFile(
968 self.d_xml_table_files[table].add_simple_node("distributions")
969 self.d_xml_table_files[table].add_simple_node("applications")
970 self.d_xml_table_files[table].add_simple_node("table", text=table)
972 # Loop over all jobs in order to get the lines and columns for each
976 for table in self.d_xml_table_files:
978 d_application[table] = []
982 for job in l_jobs + l_jobs_not_today:
# Collect the distinct (host, port) pairs across all jobs.
984 if (job.machine.host, job.machine.port) not in l_hosts_ports:
985 l_hosts_ports.append((job.machine.host, job.machine.port))
987 distrib = job.distribution
988 application = job.application
990 table_job = job.table
# Register each new distribution / application under its table node.
993 for table in self.d_xml_table_files:
994 if table_job == table:
995 if distrib is not None and distrib not in d_dist[table]:
996 d_dist[table].append(distrib)
997 src.xmlManager.add_simple_node(
998 self.d_xml_table_files[table].xmlroot.find('distributions'),
1000 attrib={"name" : distrib})
1002 if table_job == table:
1003 if application is not None and application not in d_application[table]:
1004 d_application[table].append(application)
1005 src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
1007 attrib={"name" : application})
1009 # Initialize the hosts_ports node for the global file
1010 self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1011 for host, port in l_hosts_ports:
1012 host_port = "%s:%i" % (host, port)
1013 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1015 attrib={"name" : host_port})
1017 # Initialize the jobs node in all files
# NOTE(review): list + dict.values() concatenation breaks under
# Python 3 (values() is a view); wrap in list(...) when porting.
# Same pattern appears in update_xml_files and last_update below.
1018 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1019 xml_jobs = xml_file.add_simple_node("jobs")
1020 # Get the jobs present in the config file but that will not be launched
1022 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1024 xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
1027 def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1028 '''Get all the first information needed for each file and write the
1029 first version of the files
1031 :param xml_node_jobs etree.Element: the node corresponding to a job
1032 :param l_jobs_not_today List: the list of jobs that do not run today
1034 for job in l_jobs_not_today:
1035 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1037 attrib={"name" : job.name})
1038 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1039 src.xmlManager.add_simple_node(xmlj,
1042 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1043 src.xmlManager.add_simple_node(xmlj,
1044 "commands", " ; ".join(job.commands))
1045 src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1046 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1047 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1048 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1049 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1050 src.xmlManager.add_simple_node(xmlj, "sat_path",
1051 job.machine.sat_path)
1053 def update_xml_files(self, l_jobs):
1054 '''Write all the xml files with updated information about the jobs
1056 :param l_jobs List: the list of jobs that run today
1058 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1059 self.update_xml_file(l_jobs, xml_file)
1062 self.write_xml_files()
1064 def update_xml_file(self, l_jobs, xml_file):
1065 '''update information about the jobs for the file xml_file
1067 :param l_jobs List: the list of jobs that run today
1068 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1071 xml_node_jobs = xml_file.xmlroot.find('jobs')
1072 # Update the job names and status node
1074 # Find the node corresponding to the job and delete it
1075 # in order to recreate it
1076 for xmljob in xml_node_jobs.findall('job'):
1077 if xmljob.attrib['name'] == job.name:
1078 xml_node_jobs.remove(xmljob)
# Begin/end times formatted for display (fallbacks when the job has
# not started/finished are not visible in this excerpt).
1082 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1083 time.localtime(job._T0))
1086 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1087 time.localtime(job._Tf))
1089 # recreate the job node
1090 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1092 attrib={"name" : job.name})
1093 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1094 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1095 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1096 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1097 src.xmlManager.add_simple_node(xmlj, "sat_path",
1098 job.machine.sat_path)
1099 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1100 src.xmlManager.add_simple_node(xmlj, "distribution",
1102 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1103 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1104 src.xmlManager.add_simple_node(xmlj, "commands",
1105 " ; ".join(job.commands))
1106 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1107 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1108 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1109 src.xmlManager.add_simple_node(xmlj, "out",
1110 src.printcolors.cleancolor(job.out))
1111 src.xmlManager.add_simple_node(xmlj, "err",
1112 src.printcolors.cleancolor(job.err))
1113 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1114 if len(job.remote_log_files) > 0:
1115 src.xmlManager.add_simple_node(xmlj,
1116 "remote_log_file_path",
1117 job.remote_log_files[0])
1119 src.xmlManager.add_simple_node(xmlj,
1120 "remote_log_file_path",
1123 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1124 # get the job father
1125 if job.after is not None:
1128 if jb.name == job.after:
1130 if job_father is None:
1131 msg = _("The job %(father_name)s that is parent of "
1132 "%(son_name)s is not in the job list." %
1133 {"father_name" : job.after , "son_name" : job.name})
1134 raise src.SatException(msg)
# Link the "after" node to the father's log file when available.
1136 if len(job_father.remote_log_files) > 0:
1137 link = job_father.remote_log_files[0]
1140 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
# Refresh the "last update" timestamp in the infos node.
1144 xml_node_infos = xml_file.xmlroot.find('infos')
1145 src.xmlManager.append_node_attrib(xml_node_infos,
1147 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1151 def last_update(self, finish_status = "finished"):
1152 '''update information about the jobs for the file xml_file
1154 :param l_jobs List: the list of jobs that run today
1155 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1157 for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1158 xml_node_infos = xml_file.xmlroot.find('infos')
1159 src.xmlManager.append_node_attrib(xml_node_infos,
1160 attrib={"JobsCommandStatus" : finish_status})
1162 self.write_xml_files()
1164 def write_xml_files(self):
1165 ''' Write the xml files
# Serialize every report with its XSL stylesheet reference.
1167 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1168 for xml_file in self.d_xml_table_files.values():
1169 xml_file.write_tree(STYLESHEET_TABLE)
# description() fragment: one-line help text for the "jobs" command.
# NOTE(review): the def line itself is missing from this excerpt.
1172 # Describes the command
1174 return _("The jobs command launches maintenances that are described"
1175 " in the dedicated jobs configuration file.")
# Entry point of the "jobs" command: parse options, locate the jobs pyconf
# file, build the Jobs scheduler, connect, run and publish results.
# NOTE(review): this excerpt is incomplete (original numbering jumps), so
# several branch bodies (e.g. early returns for --list/--test_connection)
# are not visible here.
1179 def run(args, runner, logger):
1181 (options, args) = parser.parse_args(args)
1183 jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
# Search path: the site config dir, then the data dir shipped with sat.
1185 l_cfg_dir = [jobs_cfg_files_dir,
1186 os.path.join(runner.cfg.VARS.datadir, "jobs")]
1188 # Make sure the path to the jobs config files directory exists
1189 src.ensure_path_exists(jobs_cfg_files_dir)
1191 # list option : display all the available config files
1193 for cfg_dir in l_cfg_dir:
1194 if not options.no_label:
1195 logger.write("------ %s\n" %
1196 src.printcolors.printcHeader(cfg_dir))
1198 for f in sorted(os.listdir(cfg_dir)):
1199 if not f.endswith('.pyconf'):
1202 logger.write("%s\n" % cfilename)
1205 # Make sure the jobs_config option has been called
1206 if not options.jobs_cfg:
1207 message = _("The option --jobs_config is required\n")
1208 raise src.SatException( message )
1210 # Find the file in the directories
1212 for cfg_dir in l_cfg_dir:
1213 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
# The .pyconf extension may be omitted on the command line.
1214 if not file_jobs_cfg.endswith('.pyconf'):
1215 file_jobs_cfg += '.pyconf'
1217 if not os.path.exists(file_jobs_cfg):
1224 msg = _("The file configuration %(name_file)s was not found."
1225 "\nUse the --list option to get the possible files.")
1226 src.printcolors.printcError(msg)
1230 (_("Platform"), runner.cfg.VARS.dist),
1231 (_("File containing the jobs configuration"), file_jobs_cfg)
1233 src.print_info(logger, info)
1235 # Read the config that is in the file
1236 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
# --only_jobs: restrict the configuration to the named jobs.
1237 if options.only_jobs:
1238 l_jb = src.pyconf.Sequence()
1239 for jb in config_jobs.jobs:
1240 if jb.name in options.only_jobs:
1242 "Adding a job that was given in only_jobs option parameters")
1243 config_jobs.jobs = l_jb
1246 today_jobs = Jobs(runner,
1251 # SSH connection to all machines
1252 today_jobs.ssh_connection_all_machines()
1253 if options.test_connection:
# NOTE(review): hard-coded user-specific output directory — looks
# like leftover debug configuration; should come from the config.
1258 gui = Gui("/export/home/serioja/LOGS",
1260 today_jobs.ljobs_not_today,)
1262 today_jobs.gui = gui
1266 # Run all the jobs contained in config_jobs
1267 today_jobs.run_jobs()
1268 except KeyboardInterrupt:
# Ctrl-C: report the interruption, then clean up remote processes.
1270 logger.write("\n\n%s\n\n" %
1271 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1274 # find the potential not finished jobs and kill them
1275 for jb in today_jobs.ljobs:
1276 if not jb.has_finished():
1277 jb.kill_remote_process()
1279 today_jobs.gui.last_update(_("Forced interruption"))
1281 today_jobs.gui.last_update()
1282 # Output the results
1283 today_jobs.write_all_results()