3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Command-line options of the "jobs" command.
# NOTE(review): the defaults of --test_connection and --publish (presumably
# `False)`) are missing from this chunk; both add_option calls are left
# unterminated below -- confirm against the full file.
parser = src.options.Options()
parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('list all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("do not print labels, Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                  "display the jobs status."),
class Machine(object):
    '''Class to manage a ssh connection on a machine.

    NOTE(review): several lines of this class are missing from this chunk
    (try/except framing, else branches, some attribute assignments); the
    hedged notes below mark the apparent gaps -- confirm against the
    full file.
    '''
    def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
        # NOTE(review): assignments of self.name / self.host / self.user /
        # self.port are not visible here, yet the other methods read them --
        # TODO confirm against the full file
        self.password = passwd
        self.sat_path = sat_path            # remote directory for salomeTools
        self.ssh = paramiko.SSHClient()     # connection is deferred to connect()
        self._connection_successful = None  # None = no connection attempt yet

    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        '''
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # auto-accept unknown host keys (no strict host-key checking)
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # NOTE(review): a `try:` header appears to be missing before this call
        self.ssh.connect(self.host,
                         password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
        # NOTE(review): the generic `except:` header and the `else:` branch
        # appear to be missing around the next two lines
            message = ( _("Error connecting or establishing an SSH session"))
            self._connection_successful = True

    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeed

        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeed, False if not
        '''
        if self._connection_successful == None:
            # FIXME: only the first string is assigned to `message`; the two
            # following lines are bare expression statements, so the host /
            # port / user details never reach the warning. The whole text
            # should be wrapped in one pair of parentheses.
            message = "Warning : trying to ask if the connection to "
            "(host: %s, port: %s, user: %s) is OK whereas there were"
            " no connection request" % \
            (self.host, self.port, self.user)
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful

    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path

        :param sat_local_path str: local path of the salomeTools sources
        :param job_file str: path of the job configuration file to upload
        '''
        self.sftp = self.ssh.open_sftp()
        self.mkdir(self.sat_path, ignore_existing=True)
        # upload everything except the .git directory
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        job_file_name = os.path.basename(job_file)
        # NOTE(review): the closing argument of this call (presumably
        # `job_file_name))`) and the surrounding `try:` are missing here
        self.sftp.put(job_file, os.path.join(self.sat_path,
        except Exception as e:
            # any failure during the copy marks the machine as unusable
            self._connection_successful = False

    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All subdirectories in source are
            created under target.

        NOTE(review): `filters = []` is a mutable default argument; it is
        only read here, but `filters=None` would be the safer idiom.
        '''
        for item in os.listdir(source):
            # NOTE(review): the filter check (skipping items in `filters`)
            # is not visible in this chunk
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                # recreate symlinks instead of copying their targets
                linkto = os.readlink(source_path)
                self.sftp.symlink(linkto, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            if os.path.isfile(source_path):
                # plain file: upload it and replicate the permission bits
                self.sftp.put(source_path, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            # NOTE(review): an `else:` (directory) branch appears to be
            # missing before the recursive descent below
            self.mkdir(destination_path, ignore_existing=True)
            self.put_dir(source_path, destination_path)

    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail

        :param path str: the remote directory to create
        :param mode int: permission bits (511 == 0o777)
        :param ignore_existing bool: when True an existing directory is
                                     not treated as an error
        '''
        # NOTE(review): the try/except IOError wrapper honouring
        # `ignore_existing` appears to be missing from this chunk
        self.sftp.mkdir(path, mode)

    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
                 or (None, None, None) on failure
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                 paramiko.channel.ChannelFile)
        '''
        # NOTE(review): a `try:` header appears to be missing before this call
        # Does not wait the end of the command
        (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
        # NOTE(review): a second `except` header appears to be missing
        # above the next two lines
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        return (stdin, stdout, stderr)

    # NOTE(review): the `def close(self):` header belonging to the following
    # docstring is missing from this chunk
        '''Close the ssh connection
        '''

    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        # NOTE(review): an `else:` line appears to be missing here
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
# NOTE(review): the `class Job` header is missing from this chunk; the
# docstring and methods below belong to it.
'''Class to manage one job
'''
def __init__(self, name, machine, application, distribution,
             commands, timeout, config, logger, job_file, after=None):
    # NOTE(review): assignments such as self.name / self.logger /
    # self.config / self.after are not visible here although other
    # methods read them -- confirm against the full file
    self.machine = machine            # Machine instance the job runs on
    self.timeout = timeout            # maximum duration, in seconds
    self.application = application
    self.distribution = distribution
    # The list of log files to download from the remote machine
    self.remote_log_files = []
    # The remote command status
    # -1 means that it has not been launched,
    # 0 means success and 1 means fail
    self.cancelled = False
    self._has_begun = False
    self._has_finished = False
    self._has_timouted = False        # (sic) original spelling kept
    self._stdin = None # Store the command inputs field
    self._stdout = None # Store the command outputs field
    self._stderr = None # Store the command errors field
    self.out = None # Contains something only if the job is finished
    self.err = None # Contains something only if the job is finished
    self.commands = commands
    # Build the remote "sat job" command line.
    # NOTE(review): parts of this expression are missing from this chunk;
    # the parenthesis is left unterminated below.
    self.command = (os.path.join(self.machine.sat_path, "sat") +
                    os.path.join(self.machine.sat_path,
                                 "list_log_files.txt") +
                    " job --jobs_config " +

# NOTE(review): the `def get_pids(self):` header is missing before this body
    # grep the remote process list for this job's command line
    cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
    (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
    pids_cmd = out_pid.readlines()
    # keep only the numeric part (the pid) of each line
    pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
def kill_remote_process(self):
    '''Kills the process on the remote machine.

    :return: (the output of the kill, the error of the kill)
    '''
    pids = self.get_pids()
    # send SIGINT (kill -2) to every matching pid in one remote command
    cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
    # NOTE(review): the closing argument of this call (presumably
    # `self.logger)`) is missing from this chunk
    (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
    return (out_kill, err_kill)
# NOTE(review): the `def has_begun(self):` header belonging to the
# following docstring/body is missing from this chunk
'''Returns True if the job has already begun

:return: True if the job has already begun
'''
return self._has_begun

def has_finished(self):
    '''Returns True if the job has already finished
    (i.e. all the commands have been executed)
    If it is finished, the outputs are stored in the fields out and err.

    :return: True if the job has already finished
    '''
    # If the method has already been called and returned True
    # NOTE(review): the early `return` statements of the next two guards
    # are missing from this chunk
    if self._has_finished:
    # If the job has not begun yet
    if not self.has_begun():
    # the remote channel being closed means the command has completed
    if self._stdout.channel.closed:
        self._has_finished = True
        # Store the result outputs
        self.out = self._stdout.read()
        self.err = self._stderr.read()
        # record the end time
        self._Tf = time.time()
        # And get the remote command status and log files
        # NOTE(review): the call fetching them is missing from this chunk
    return self._has_finished
def get_log_files(self):
    '''Download the remote job's log files next to the local log file and
    read the remote command status from list_log_files.txt.
    '''
    if not self.has_finished():
        msg = _("Trying to get log files whereas the job is not finished.")
        self.logger.write(src.printcolors.printcWarning(msg))
    # fetch the remote list of produced log files into a local temp file
    tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
    # NOTE(review): the closing argument of this call (presumably
    # `tmp_file_path)`) is missing from this chunk
    self.machine.sftp.get(
        os.path.join(self.machine.sat_path, "list_log_files.txt"),
    fstream_tmp = open(tmp_file_path, "r")
    file_lines = fstream_tmp.readlines()
    file_lines = [line.replace("\n", "") for line in file_lines]
    # NOTE(review): fstream_tmp is never closed -- a with-statement or an
    # explicit close() before the remove would be safer
    os.remove(tmp_file_path)
    # first line holds the remote command result ("0" ok / "1" ko)
    self.res_job = file_lines[0]
    for job_path_remote in file_lines[1:]:
        # files outside an OUT directory go next to the local log file
        if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
            local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                      os.path.basename(job_path_remote))
            if not os.path.exists(local_path):
                self.machine.sftp.get(job_path_remote, local_path)
            # NOTE(review): an `else:` branch (OUT-directory case) appears
            # to be missing around the duplicated block below
            local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                      os.path.basename(job_path_remote))
            if not os.path.exists(local_path):
                self.machine.sftp.get(job_path_remote, local_path)
            self.remote_log_files.append(local_path)
def has_failed(self):
    '''Returns True if the job has failed.
    A job is considered as failed if the machine could not be reached,
    if the remote command failed,
    or if the job finished with a time out.

    :return: True if the job has failed
    '''
    # NOTE(review): the `return` statements of the following guards are
    # missing from this chunk (presumably False, True, True, True)
    if not self.has_finished():
    if not self.machine.successfully_connected(self.logger):
    if self.is_timeout():
    if self.res_job == "1":

# NOTE(review): the `def cancel(self):` header belonging to the following
# docstring/body is missing from this chunk
    """In case of a failing job, one has to cancel every job that depend
    on it. This method put the job as failed and will not be executed.
    """
    # mark the job as already done so the scheduler skips it
    self._has_begun = True
    self._has_finished = True
    self.cancelled = True
    self.out = _("This job was not launched because its father has failed.")
    self.err = _("This job was not launched because its father has failed.")
def is_running(self):
    '''Tell whether the job commands are currently executing.

    :return: True if the job is running
    '''
    # a job is running when it has started but not yet completed;
    # keep the short-circuit: has_finished() is only queried once begun
    if not self.has_begun():
        return False
    return not self.has_finished()
def is_timeout(self):
    '''Tell whether the job commands ended because the timeout expired.

    :return: True if the job has finished with timeout
    '''
    # the flag is set by check_time(); attribute name keeps the original
    # (misspelled) spelling used everywhere else in the class
    timed_out = self._has_timouted
    return timed_out
def time_elapsed(self):
    '''Return the seconds elapsed since the job started.'''
    # NOTE(review): the body of this guard (presumably `return -1`) and the
    # `T_now = time.time()` assignment are missing from this chunk
    if not self.has_begun():
    return T_now - self._T0

def check_time(self):
    '''Kill the remote process and mark the job as timed out when its
    elapsed time exceeds the configured timeout.'''
    # NOTE(review): the body of this guard (presumably `return`) is missing
    if not self.has_begun():
    if self.time_elapsed() > self.timeout:
        self._has_finished = True
        self._has_timouted = True
        self._Tf = time.time()
        # stop the remote command and keep its kill output as the job output
        (out_kill, _) = self.kill_remote_process()
        self.out = "TIMEOUT \n" + out_kill.read()
        self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
def total_duration(self):
    '''Give the total wall-clock duration of the job.

    :return: elapsed seconds between the recorded start and end times
    '''
    start_time = self._T0
    end_time = self._Tf
    return end_time - start_time
def run(self, logger):
    '''Launch the job's command on its remote machine (non-blocking).

    :param logger src.logger.Logger: The logger instance
    '''
    # NOTE(review): the `if self.has_begun():` guard around this warning
    # appears to be missing from this chunk
    print("Warn the user that a job can only be launched one time")
    if not self.machine.successfully_connected(logger):
        # no ssh connection: fail immediately without running anything
        self._has_finished = True
        # NOTE(review): the tail of this %-tuple (host, port, user) and an
        # `else:` before the launch section are missing from this chunk
        self.err = ("Connection to machine (name : %s, host: %s, port:"
                    " %s, user: %s) has failed\nUse the log command "
                    "to get more information."
                    % (self.machine.name,
    # record the start time, then fire the command without waiting
    self._T0 = time.time()
    self._stdin, self._stdout, self._stderr = self.machine.exec_command(
                                                      self.command, logger)
    if (self._stdin, self._stdout, self._stderr) == (None, None, None):
        # the server refused the command: mark as finished in error
        self._has_finished = True
        self._Tf = time.time()
        self.err = "The server failed to execute the command"
    self._has_begun = True
def write_results(self, logger):
    '''Print the job's identity, timing, machine info and outputs.

    :param logger src.logger.Logger: The logger instance
    '''
    logger.write("name : " + self.name + "\n")
    # NOTE(review): a guard (`if self.after:`?) appears to be missing here
    logger.write("after : %s\n" % self.after)
    logger.write("Time elapsed : %4imin %2is \n" %
                 (self.total_duration()/60 , self.total_duration()%60))
    logger.write("Begin time : %s\n" %
                 time.strftime('%Y-%m-%d %H:%M:%S',
                               time.localtime(self._T0)) )
    logger.write("End time : %s\n\n" %
                 time.strftime('%Y-%m-%d %H:%M:%S',
                               time.localtime(self._Tf)) )
    machine_head = "Informations about connection :\n"
    underline = (len(machine_head) - 2) * "-"
    logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
    self.machine.write_info(logger)
    logger.write(src.printcolors.printcInfo("out : \n"))
    # NOTE(review): the `if self.out == None:` / `else:` framing around the
    # next two pairs of writes is missing from this chunk
    logger.write("Unable to get output\n")
    logger.write(self.out + "\n")
    logger.write(src.printcolors.printcInfo("err : \n"))
    logger.write("Unable to get error\n")
    logger.write(self.err + "\n")
def get_status(self):
    '''Return a human-readable status string for the job.

    :return: one of "SSH connection KO", "Not launched", "running since ...",
             "Timeout since ...", "Finished since ..."
    :rtype: str
    '''
    if not self.machine.successfully_connected(self.logger):
        return "SSH connection KO"
    if not self.has_begun():
        return "Not launched"
    # NOTE(review): a `if self.cancelled:` branch appears to be missing here
    if self.is_running():
        return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                time.localtime(self._T0))
    if self.has_finished():
        if self.is_timeout():
            return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
        return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                 time.localtime(self._Tf))
# NOTE(review): the `class Jobs` header and the beginning of the __init__
# signature (presumably `def __init__(self, runner, logger, job_file,
# job_file_path, config_jobs, ...)`) are missing from this chunk.
'''Class to manage the jobs to be run
'''
             lenght_columns = 20):
    # The jobs configuration
    self.cfg_jobs = config_jobs
    self.job_file = job_file
    self.job_file_path = job_file_path
    # The machine that will be used today
    # NOTE(review): `self.lmachines = []` appears to be missing here
    # The list of machine (hosts, port) that will be used today
    # (a same host can have several machine instances since there
    # can be several ssh parameters)
    # The jobs to be launched today
    # NOTE(review): `self.lhosts = []` / `self.ljobs = []` appear missing
    # The jobs that will not be launched today
    self.ljobsdef_not_today = []
    # The correlation dictionary between jobs and machines
    self.dic_job_machine = {}
    self.len_columns = lenght_columns   # (sic) "lenght" spelling kept
    # the list of jobs that have not been run yet
    self._l_jobs_not_started = []
    # the list of jobs that have already ran
    self._l_jobs_finished = []
    # the list of jobs that are running
    self._l_jobs_running = []
    # populate lmachines / ljobs from the configuration immediately
    self.determine_products_and_machines()
def define_job(self, job_def, machine):
    '''Takes a pyconf job definition and a machine (from class machine)
    and returns the job instance corresponding to the definition.

    :param job_def src.config.Mapping: a job definition
    :param machine machine: the machine on which the job will run
    :return: The corresponding job in a job class instance
    '''
    cmmnds = job_def.commands
    timeout = job_def.timeout
    # optional fields: present only when defined in the pyconf
    # NOTE(review): the None-initialisations of after / application /
    # distribution and the final `return Job(...)` are missing from
    # this chunk
    if 'after' in job_def:
        after = job_def.after
    if 'application' in job_def:
        application = job_def.application
    if 'distribution' in job_def:
        distribution = job_def.distribution
def determine_products_and_machines(self):
    '''Function that reads the pyconf jobs definition and instantiates all
    the machines and jobs to be done today.
    '''
    # weekday(): Monday == 0 ... Sunday == 6
    today = datetime.date.weekday(datetime.date.today())
    # NOTE(review): the `host_list = []` initialisation is missing here
    for job_def in self.cfg_jobs.jobs :
        if today in job_def.when:
            name_machine = job_def.machine
            # reuse an already-instantiated machine when possible
            # NOTE(review): the `a_machine = None` initialisation and the
            # loop body (`a_machine = mach; break`) are missing here
            for mach in self.lmachines:
                if mach.name == name_machine:
            if a_machine == None:
                # not instantiated yet: look the machine up in the jobs
                # configuration, defaulting absent fields to local values
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        # NOTE(review): the `else:` lines pairing each of
                        # the following guards, the defaults for port and
                        # password, and the `a_machine = Machine(...)`
                        # construction are missing from this chunk
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        # remember each distinct (host, port) pair once
                        if (host, port) not in host_list:
                            host_list.append((host, port))
                        self.lmachines.append(a_machine)
            if a_machine == None:
                # NOTE(review): msg is written without applying the
                # %-substitution, so the placeholders appear verbatim
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            self.dic_job_machine[a_job] = a_machine
            self.ljobs.append(a_job)
        else: # today in job_def.when
            self.ljobsdef_not_today.append(job_def)
    self.lhosts = host_list
def ssh_connection_all_machines(self, pad=50):
    '''Function that do the ssh connection to every machine
    to be used today, then copies salomeTools onto each.

    :param pad int: width of the "Connection to ..." trace column
    '''
    self.logger.write(src.printcolors.printcInfo((
                    "Establishing connection with all the machines :\n")))
    for machine in self.lmachines:
        # little algorithm in order to display traces
        begin_line = (_("Connection to %s: " % machine.name))
        # NOTE(review): the body of this guard (presumably resetting
        # `endline`) is missing from this chunk
        if pad - len(begin_line) < 0:
        endline = (pad - len(begin_line)) * "." + " "
        step = "SSH connection"
        self.logger.write( begin_line + endline + step)
        # the call to the method that initiate the ssh connection
        msg = machine.connect(self.logger)
        # Copy salomeTools to the remote machine
        if machine.successfully_connected(self.logger):
            # NOTE(review): several continuation lines (the second argument
            # of copy_sat, the %-tuples of the status writes, the `if
            # res_copy:` / `else:` framing and the failure branch header)
            # are missing in the rest of this method
            self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
            self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
            res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
            # Print the status of the copy
            self.logger.write('\r%s' %
                              ((len(begin_line)+len(endline)+20) * " "), 3)
            self.logger.write('\r%s%s%s' %
                              src.printcolors.printc(src.OK_STATUS)), 3)
            self.logger.write('\r%s' %
                              ((len(begin_line)+len(endline)+20) * " "), 3)
            self.logger.write('\r%s%s%s %s' %
                              src.printcolors.printc(src.OK_STATUS),
                              _("Copy of SAT failed")), 3)
            self.logger.write('\r%s' %
                              ((len(begin_line)+len(endline)+20) * " "), 3)
            self.logger.write('\r%s%s%s %s' %
                              src.printcolors.printc(src.KO_STATUS),
        self.logger.write("\n", 3)
    self.logger.write("\n")
def is_occupied(self, hostname):
    '''Function that returns True if a job is running on
    the machine defined by its host and its port.

    :param hostname (str, int): the pair (host, port)
    :return: the job that is running on the host,
             or false if there is no job running on the host.
    '''
    # NOTE(review): the `host, port = hostname` unpacking, the body of the
    # inner condition (`return jb` when jb.is_running()) and the final
    # `return False` are missing from this chunk
    for jb in self.dic_job_machine:
        if jb.machine.host == host and jb.machine.port == port:
def update_jobs_states_list(self):
    '''Function that updates the lists that store the currently
    running jobs and the jobs that have already finished.

    :return: True if at least one new job finished since the last call
    :rtype: bool
    '''
    jobs_finished_list = []
    jobs_running_list = []
    for jb in self.dic_job_machine:
        # NOTE(review): an `if jb.is_running():` guard appears to be
        # missing above the next line
        jobs_running_list.append(jb)
        if jb.has_finished():
            jobs_finished_list.append(jb)
    # compare the finished count before/after to detect fresh completions
    nb_job_finished_before = len(self._l_jobs_finished)
    self._l_jobs_finished = jobs_finished_list
    self._l_jobs_running = jobs_running_list
    nb_job_finished_now = len(self._l_jobs_finished)
    return nb_job_finished_now > nb_job_finished_before
def cancel_dependencies_of_failing_jobs(self):
    '''Function that cancels all the jobs that depend on a failing one.
    '''
    for job in self.ljobs:
        # jobs without a parent cannot be cancelled transitively
        # NOTE(review): the `continue` body of this guard and the
        # `job.cancel()` call after the failure check are missing
        if job.after is None:
        father_job = self.find_job_that_has_name(job.after)
        if father_job.has_failed():
def find_job_that_has_name(self, name):
    '''Returns the job by its name.

    :param name str: a job name
    :return: the job that has the name.
    :raise SatException: when no job carries that name
    '''
    # NOTE(review): the loop body (`if jb.name == name: return jb`) is
    # missing from this chunk
    for jb in self.ljobs:
    # the following is executed only if the job was not found
    msg = _('The job "%s" seems to be nonexistent') % name
    raise src.SatException(msg)
def str_of_length(self, text, length):
    '''Takes a string text of any length and returns
    the most close string of length "length".

    Longer strings are truncated with a trailing "..." marker; shorter
    ones are centered with spaces (the extra space, if any, goes after
    the text).

    :param text str: any string
    :param length int: a length for the returned string
    :return: the most close string of length "length"
    :rtype: str
    '''
    if len(text) > length:
        # keep room for the three-dot truncation marker
        text_out = text[:length - 3] + '...'
    else:
        # center the text; floor division (//) keeps this correct on
        # both Python 2 and Python 3 (the original relied on the
        # Python-2 integer '/')
        diff = length - len(text)
        before = " " * (diff // 2)
        after = " " * (diff // 2 + diff % 2)
        text_out = before + text + after
    return text_out
def display_status(self, len_col):
    '''Takes a lenght and construct the display of the current status
    of the jobs in an array that has a column for each host.
    It displays the job that is currently running on the host
    of the column.

    :param len_col int: the size of the column
    '''
    # NOTE(review): the `display_line = ""` initialisation is missing here
    for host_port in self.lhosts:
        jb = self.is_occupied(host_port)
        if not jb: # nothing running on the host
            empty = self.str_of_length("empty", len_col)
            display_line += "|" + empty
        # NOTE(review): an `else:` line appears to be missing here
            display_line += "|" + src.printcolors.printcInfo(
                                self.str_of_length(jb.name, len_col))
    # \r rewrites the same terminal line on every refresh
    self.logger.write("\r" + display_line + "|")
# NOTE(review): the `def run_jobs(self):` header belonging to the following
# docstring/body is missing from this chunk
'''The main method. Runs all the jobs on every host.
For each host, at a given time, only one job can be running.
The jobs that have the field after (that contain the job that has
to be run before it) are run after the previous job.
This method stops when all the jobs are finished.
'''
# Print the table header: one column per (host, port)
self.logger.write(src.printcolors.printcInfo(
                            _('Executing the jobs :\n')))
for host_port in self.lhosts:
    # NOTE(review): the `host, port = host_port` unpacking and the
    # initialisation of `text_line` are missing from this chunk
    if port == 22: # default value
        text_line += "|" + self.str_of_length(host, self.len_columns)
    # NOTE(review): an `else:` line appears to be missing here
        text_line += "|" + self.str_of_length(
                            "("+host+", "+str(port)+")", self.len_columns)
tiret_line = " " + "-"*(len(text_line)-1) + "\n"
self.logger.write(tiret_line)
self.logger.write(text_line + "|\n")
self.logger.write(tiret_line)
# The infinite loop that runs the jobs
# NOTE(review): under Python 3 keys() is a view; list() would be required
# before the remove() calls below -- this code is Python-2 style
l_jobs_not_started = self.dic_job_machine.keys()
while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
    new_job_start = False
    for host_port in self.lhosts:
        # NOTE(review): the `continue` body of this guard is missing
        if self.is_occupied(host_port):
        for jb in l_jobs_not_started:
            # NOTE(review): the `continue` body of this guard, the
            # `jb.run(...)` launch calls and the `new_job_start = True`
            # updates are missing around the next lines
            if (jb.machine.host, jb.machine.port) != host_port:
            l_jobs_not_started.remove(jb)
            jb_before = self.find_job_that_has_name(jb.after)
            if jb_before.has_finished():
                l_jobs_not_started.remove(jb)
    self.cancel_dependencies_of_failing_jobs()
    new_job_finished = self.update_jobs_states_list()
    if new_job_start or new_job_finished:
        # refresh the published xml report on every state change
        self.gui.update_xml_file(self.ljobs)
        # Display the current status
        self.display_status(self.len_columns)
    # Make sure that the proc is not entirely busy
    # NOTE(review): the time.sleep(...) call is missing from this chunk
self.logger.write("\n")
self.logger.write(tiret_line)
self.logger.write("\n\n")
# final refresh and close of the xml report
self.gui.update_xml_file(self.ljobs)
self.gui.last_update()
def write_all_results(self):
    '''Display all the jobs outputs in the logger, one labelled
    section per job.
    '''
    header_fmt = "#------- Results for job %s -------#\n"
    for current_job in self.dic_job_machine.keys():
        label = src.printcolors.printcLabel(header_fmt % current_job.name)
        self.logger.write(label)
        current_job.write_results(self.logger)
        self.logger.write("\n\n")
# NOTE(review): the `class Gui` header is missing from this chunk; the
# docstring and methods below belong to it.
'''Class to manage the xml data that can be displayed in a browser to
   follow the jobs states.  Example of the produced xml:

<?xml version='1.0' encoding='utf-8'?>
<?xml-stylesheet type='text/xsl' href='job_report.xsl'?>

<info name="generated" value="2016-06-02 07:06:45"/>

<host name=is221553 port=22 distribution=UB12.04/>
<host name=is221560 port=22/>
<host name=is221553 port=22 distribution=FD20/>

<application name=SALOME-7.8.0/>
<application name=SALOME-master/>
<application name=MED-STANDALONE-master/>
<application name=CORPUS/>

<job name="7.8.0 FD22">
    <host>is228809</host>
    <application>SALOME-7.8.0</application>
    <user>adminuser</user>
    <timeout>240</timeout>
        export DISPLAY=is221560
        scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
        tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
    <state>Not launched</state>

<job name="master MG05">
    <host>is221560</host>
    <application>SALOME-master</application>
    <timeout>240</timeout>
        export DISPLAY=is221560
        scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
        sat prepare SALOME-master
        sat compile SALOME-master
        sat check SALOME-master
        sat launcher SALOME-master
        sat test SALOME-master
    <state>Running since 23 min</state>
    <!-- <state>time out</state> -->
    <!-- <state>OK</state> -->
    <!-- <state>KO</state> -->
    <begin>10/05/2016 20h32</begin>
    <end>10/05/2016 22h59</end>
'''
def __init__(self, xml_file_path, l_jobs, l_jobs_not_today, stylesheet):
    # The path of the xml file
    self.xml_file_path = xml_file_path
    # The xsl stylesheet applied when writing the file
    self.stylesheet = stylesheet
    # Open the file in a writing stream
    self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
    # Create the lines and columns
    self.initialize_array(l_jobs, l_jobs_not_today)
    # write the initial state of every job immediately
    self.update_xml_file(l_jobs)
def initialize_array(self, l_jobs, l_jobs_not_today):
    '''Create the static structure of the xml report: the distinct
    distributions and applications nodes, the jobs container node and
    the info node.

    :param l_jobs list: the jobs scheduled today
    :param l_jobs_not_today list: the pyconf job definitions skipped today
    '''
    # Collect the distinct distributions and applications of today's jobs
    l_dist = []
    l_applications = []
    for job in l_jobs:
        distrib = job.distribution
        if distrib is not None and distrib not in l_dist:
            l_dist.append(distrib)
        application = job.application
        if application is not None and application not in l_applications:
            l_applications.append(application)
    # Also scan the job definitions that are not scheduled today
    for job_def in l_jobs_not_today:
        distrib = src.get_cfg_param(job_def, "distribution", "nothing")
        # bug fix: the original compared with `is not "nothing"`, which is
        # an identity test on a string literal and therefore unreliable;
        # equality (!=) is the intended comparison
        if distrib != "nothing" and distrib not in l_dist:
            l_dist.append(distrib)
        application = src.get_cfg_param(job_def, "application", "nothing")
        if application != "nothing" and application not in l_applications:
            l_applications.append(application)
    self.l_dist = l_dist
    self.l_applications = l_applications
    # Update the hosts node
    self.xmldists = self.xml_file.add_simple_node("distributions")
    for dist_name in self.l_dist:
        src.xmlManager.add_simple_node(self.xmldists, "dist",
                                       attrib={"name" : dist_name})
    # Update the applications node
    self.xmlapplications = self.xml_file.add_simple_node("applications")
    for application in self.l_applications:
        src.xmlManager.add_simple_node(self.xmlapplications, "application",
                                       attrib={"name" : application})
    # Initialize the jobs node
    self.xmljobs = self.xml_file.add_simple_node("jobs")
    self.put_jobs_not_today(l_jobs_not_today)
    # Initialize the info node (when generated)
    self.xmlinfos = self.xml_file.add_simple_node(
                        "infos",
                        attrib={"name" : "last update",
                                "JobsCommandStatus" : "running"})
def put_jobs_not_today(self, l_jobs_not_today):
    '''Add one "job" node, marked "Not today", for every job definition
    that is not scheduled today.

    :param l_jobs_not_today list: pyconf job definitions skipped today
    '''
    for job_def in l_jobs_not_today:
        node = src.xmlManager.add_simple_node(self.xmljobs, "job",
                                              attrib={"name" : job_def.name})
        application = src.get_cfg_param(job_def, "application", "nothing")
        src.xmlManager.add_simple_node(node, "application", application)
        distribution = src.get_cfg_param(job_def, "distribution", "nothing")
        src.xmlManager.add_simple_node(node, "distribution", distribution)
        joined_commands = " ; ".join(job_def.commands)
        src.xmlManager.add_simple_node(node, "commands", joined_commands)
        src.xmlManager.add_simple_node(node, "state", "Not today")
def update_xml_file(self, l_jobs):
    '''Regenerate the "job" nodes from the current state of every job,
    refresh the "last update" info attribute and rewrite the file.

    :param l_jobs list: the jobs to report
    '''
    # NOTE(review): the `for job in l_jobs:` loop header enclosing the
    # following body is missing from this chunk
    # Update the job names and status node
    # Find the node corresponding to the job and delete it
    # in order to recreate it
    for xmljob in self.xmljobs.findall('job'):
        if xmljob.attrib['name'] == job.name:
            self.xmljobs.remove(xmljob)
    # NOTE(review): the "nothing" defaults and the `if job.has_begun():` /
    # `if job.has_finished():` guards around the two strftime calls are
    # missing from this chunk -- as written they would fail before launch
    T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                       time.localtime(job._T0))
    Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                       time.localtime(job._Tf))
    # recreate the job node
    xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
    src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
    src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
    src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
    src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
    src.xmlManager.add_simple_node(xmlj, "application", job.application)
    src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
    src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
    src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
    src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
    src.xmlManager.add_simple_node(xmlj, "begin", T0)
    src.xmlManager.add_simple_node(xmlj, "end", Tf)
    # strip ansi color escapes before storing the outputs in the xml
    src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
    src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
    src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
    if len(job.remote_log_files) > 0:
        src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
    # NOTE(review): an `else:` line appears to be missing here
        src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")
    xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
    # get the job father
    if job.after is not None:
        # NOTE(review): the `job_father = None` initialisation and the
        # `for jb in l_jobs:` loop header are missing above the next line
        if jb.name == job.after:
        if job_father is None:
            msg = _("The job %(father_name)s that is parent of "
                    "%(son_name)s is not in the job list." %
                    {"father_name" : job.after , "son_name" : job.name})
            raise src.SatException(msg)
        # link the "after" node to the father's log file when available
        if len(job_father.remote_log_files) > 0:
            link = job_father.remote_log_files[0]
            src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
    # refresh the "last update" timestamp on the info node
    src.xmlManager.append_node_attrib(self.xmlinfos,
        # NOTE(review): the beginning of the attrib dict is missing here
                datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
    self.write_xml_file()
def last_update(self, finish_status = "finished"):
    '''Record the final status of the jobs command in the info node and
    flush the xml file one last time.

    :param finish_status str: the final status to record
    '''
    final_attrib = {"JobsCommandStatus" : finish_status}
    src.xmlManager.append_node_attrib(self.xmlinfos, attrib=final_attrib)
    self.write_xml_file()
def write_xml_file(self):
    '''Flush the in-memory xml tree to disk, applying the configured
    xsl stylesheet.
    '''
    stylesheet = self.stylesheet
    self.xml_file.write_tree(stylesheet)
# Describes the command
# NOTE(review): the `def description():` header belonging to the following
# return statement is missing from this chunk
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the jobs command: parse the options, locate and read
    the jobs configuration file, connect to the machines and run the jobs.

    :param args list: the command-line arguments after "jobs"
    :param runner: the sat runner giving access to the configuration
    :param logger src.logger.Logger: The logger instance
    '''
    (options, args) = parser.parse_args(args)
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
    # jobs config files can live in the site dir or in the sat data dir
    l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
    # Make sure the path to the jobs config files directory exists
    src.ensure_path_exists(jobs_cfg_files_dir)
    # list option : display all the available config files
    # NOTE(review): the `if options.list:` guard, the existence check on
    # cfg_dir, the `continue` body and the cfilename assignment are
    # missing from this chunk
    for cfg_dir in l_cfg_dir:
        if not options.no_label:
            logger.write("------ %s\n" %
                         src.printcolors.printcHeader(cfg_dir))
        for f in sorted(os.listdir(cfg_dir)):
            if not f.endswith('.pyconf'):
            logger.write("%s\n" % cfilename)
    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        raise src.SatException( message )
    # Find the file in the directories
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'
        # NOTE(review): the body of this guard (continue / found flag) and
        # the not-found error handling framing are missing from this chunk;
        # note also that msg is built but never %-substituted with the
        # file name before being printed
        if not os.path.exists(file_jobs_cfg):
    msg = _("The file configuration %(name_file)s was not found."
            "\nUse the --list option to get the possible files.")
    src.printcolors.printcError(msg)
    # NOTE(review): the `info = [` opening of this list is missing here
    (_("Platform"), runner.cfg.VARS.dist),
    (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)
    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        # keep only the jobs explicitly requested on the command line
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            # NOTE(review): the `l_jb.append(jb, ...)` call enclosing the
            # following string is missing from this chunk
            if jb.name in options.only_jobs:
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb
    today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    # NOTE(review): the `return 0` body of this guard is missing
    if options.test_connection:
    # FIXME: hard-coded, user-specific output path for the xml report --
    # should come from the configuration
    gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, today_jobs.ljobsdef_not_today, "job_report.xsl")
    today_jobs.gui = gui
    # NOTE(review): the `try:` header matching the except below is missing
    # Run all the jobs contained in config_jobs
    today_jobs.run_jobs()
    except KeyboardInterrupt:
        logger.write("\n\n%s\n\n" %
                     (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
        # NOTE(review): the guard distinguishing the interrupted case from
        # the normal one (around the two last_update calls) is missing
        today_jobs.gui.last_update(_("Forced interruption"))
        today_jobs.gui.last_update()
    # Output the results
    today_jobs.write_all_results()