3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Names of the XSL stylesheets attached to the generated XML report files
# so that a browser can render them (one global report, one per table).
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_TABLE = "jobs_table_report.xsl"
# Command-line options accepted by the "jobs" command.
# NOTE(review): the listing this was recovered from was truncated; the
# trailing default arguments (False) of the last two options are
# reconstructed — confirm against the pristine file.
parser = src.options.Options()

parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                    ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('list all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("do not print labels, Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Not executing the jobs."),
                  False)
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                    "display the jobs status."),
                  False)
# NOTE(review): this listing is corrupted -- the original file's own line
# numbers are fused onto every line, indentation is lost and many lines are
# missing (the __init__ signature, the try: statements matching the except
# clauses, several else: branches). Comments below describe intent only and
# must be confirmed against the pristine file.
#
# Machine wraps a paramiko SSH/SFTP connection to one remote host and the
# deployment of salomeTools (SAT) onto it.
48 class Machine(object):
49 '''Class to manage a ssh connection on a machine
# __init__: stores the connection parameters; _connection_successful stays
# None until connect() is called (tri-state: None / False / True).
57 sat_path="salomeTools"):
61 self.distribution = None # Will be filled after copying SAT on the machine
63 self.password = passwd
64 self.sat_path = sat_path
65 self.ssh = paramiko.SSHClient()
66 self._connection_successful = None
# connect: opens the ssh session; on any paramiko failure a KO message is
# built, otherwise _connection_successful is set to True.
68 def connect(self, logger):
69 '''Initiate the ssh connection to the remote machine
71 :param logger src.logger.Logger: The logger instance
76 self._connection_successful = False
77 self.ssh.load_system_host_keys()
78 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# presumably wrapped in a try: whose line is missing from this listing
80 self.ssh.connect(self.host,
83 password = self.password)
84 except paramiko.AuthenticationException:
85 message = src.KO_STATUS + _("Authentication failed")
86 except paramiko.BadHostKeyException:
87 message = (src.KO_STATUS +
88 _("The server's host key could not be verified"))
89 except paramiko.SSHException:
90 message = ( _("SSHException error connecting or "
91 "establishing an SSH session"))
93 message = ( _("Error connecting or establishing an SSH session"))
95 self._connection_successful = True
# successfully_connected: read back the tri-state flag; warns when queried
# before connect() was ever attempted.
99 def successfully_connected(self, logger):
100 '''Verify if the connection to the remote machine has succeed
102 :param logger src.logger.Logger: The logger instance
103 :return: True if the connection has succeed, False if not
106 if self._connection_successful == None:
107 message = _("Warning : trying to ask if the connection to "
108 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
109 " no connection request" %
110 (self.name, self.host, self.port, self.user))
111 logger.write( src.printcolors.printcWarning(message))
112 return self._connection_successful
# copy_sat: uploads the local SAT tree plus the job configuration file over
# SFTP; any failure flips _connection_successful to False.
114 def copy_sat(self, sat_local_path, job_file):
115 '''Copy salomeTools to the remote machine in self.sat_path
119 # open a sftp connection
120 self.sftp = self.ssh.open_sftp()
121 # Create the sat directory on remote machine if it is not existing
122 self.mkdir(self.sat_path, ignore_existing=True)
124 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
125 # put the job configuration file in order to make it reachable
126 # on the remote machine
127 job_file_name = os.path.basename(job_file)
128 self.sftp.put(job_file, os.path.join(self.sat_path,
132 except Exception as e:
134 self._connection_successful = False
# put_dir: recursive SFTP upload preserving permissions; symlinks are
# re-created as remote symlinks, directories recurse.
138 def put_dir(self, source, target, filters = []):
139 ''' Uploads the contents of the source directory to the target path. The
140 target directory needs to exists. All subdirectories in source are
141 created under target.
143 for item in os.listdir(source):
146 source_path = os.path.join(source, item)
147 destination_path = os.path.join(target, item)
148 if os.path.islink(source_path):
149 linkto = os.readlink(source_path)
151 self.sftp.symlink(linkto, destination_path)
152 self.sftp.chmod(destination_path,
153 os.stat(source_path).st_mode)
157 if os.path.isfile(source_path):
158 self.sftp.put(source_path, destination_path)
159 self.sftp.chmod(destination_path,
160 os.stat(source_path).st_mode)
162 self.mkdir(destination_path, ignore_existing=True)
163 self.put_dir(source_path, destination_path)
# mkdir: like sftp.mkdir but can tolerate an already-existing directory
# (mode 511 decimal == 0o777).
165 def mkdir(self, path, mode=511, ignore_existing=False):
166 ''' Augments mkdir by adding an option to not fail
170 self.sftp.mkdir(path, mode)
# exec_command: fire-and-forget remote execution; returns the three channel
# files, or (None, None, None) on failure.
177 def exec_command(self, command, logger):
178 '''Execute the command on the remote machine
180 :param command str: The command to be run
181 :param logger src.logger.Logger: The logger instance
182 :return: the stdin, stdout, and stderr of the executing command,
184 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
185 paramiko.channel.ChannelFile)
188 # Does not wait the end of the command
189 (stdin, stdout, stderr) = self.ssh.exec_command(command)
190 except paramiko.SSHException:
191 message = src.KO_STATUS + _(
192 ": the server failed to execute the command\n")
193 logger.write( src.printcolors.printcError(message))
194 return (None, None, None)
196 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
197 return (None, None, None)
199 return (stdin, stdout, stderr)
# close: def line missing from this listing -- presumably closes self.ssh.
202 '''Close the ssh connection
# write_info: dumps host/port/user and the connection status to the logger.
208 def write_info(self, logger):
209 '''Prints the informations relative to the machine in the logger
210 (terminal traces and log file)
212 :param logger src.logger.Logger: The logger instance
216 logger.write("host : " + self.host + "\n")
217 logger.write("port : " + str(self.port) + "\n")
218 logger.write("user : " + str(self.user) + "\n")
219 if self.successfully_connected(logger):
220 status = src.OK_STATUS
222 status = src.KO_STATUS
223 logger.write("Connection : " + status + "\n\n")
# NOTE(review): corrupted listing (fused line numbers, missing lines).
# This span is the head of the Job class: its docstring, __init__, get_pids
# (whose def line is missing) and kill_remote_process.
227 '''Class to manage one job
# __init__: stores the job parameters and builds self.command, the full
# remote "sat job" command line (its tail is truncated in this listing).
229 def __init__(self, name, machine, application, table,
230 commands, timeout, config, logger, job_file, after=None):
233 self.machine = machine
235 self.timeout = timeout
236 self.application = application
240 # The list of log files to download from the remote machine
241 self.remote_log_files = []
243 # The remote command status
244 # -1 means that it has not been launched,
245 # 0 means success and 1 means fail
247 self.cancelled = False
251 self._has_begun = False
252 self._has_finished = False
253 self._has_timouted = False
254 self._stdin = None # Store the command inputs field
255 self._stdout = None # Store the command outputs field
256 self._stderr = None # Store the command errors field
258 self.out = None # Contains something only if the job is finished
259 self.err = None # Contains something only if the job is finished
261 self.commands = commands
262 self.command = (os.path.join(self.machine.sat_path, "sat") +
264 os.path.join(self.machine.sat_path,
265 "list_log_files.txt") +
266 " job --jobs_config " +
# get_pids (def line missing): greps ps output for self.command and keeps
# only the numeric pid column.
272 """ Get the pid(s) corresponding to the command that have been launched
273 On the remote machine
275 :return: The list of integers corresponding to the found pids
279 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
280 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
281 pids_cmd = out_pid.readlines()
282 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
# kill_remote_process: sends SIGINT (kill -2) to every pid found above.
286 def kill_remote_process(self, wait=1):
287 '''Kills the process on the remote machine.
289 :return: (the output of the kill, the error of the kill)
293 pids = self.get_pids()
294 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
295 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
298 return (out_kill, err_kill)
def has_begun(self):
    '''Tell whether the job has already been launched.

    :return: True if the job has already begun
    :rtype: bool
    '''
    return self._has_begun
# NOTE(review): corrupted listing (fused line numbers, missing lines such as
# the early "return" statements and the try: around the sftp transfers).
# has_finished: lazily detects command completion by inspecting the paramiko
# stdout channel; on completion it captures out/err, stamps _Tf and
# (presumably, line missing) calls get_log_files().
308 def has_finished(self):
309 '''Returns True if the job has already finished
310 (i.e. all the commands have been executed)
311 If it is finished, the outputs are stored in the fields out and err.
313 :return: True if the job has already finished
317 # If the method has already been called and returned True
318 if self._has_finished:
321 # If the job has not begun yet
322 if not self.has_begun():
325 if self._stdout.channel.closed:
326 self._has_finished = True
327 # Store the result outputs
328 self.out = self._stdout.read().decode()
329 self.err = self._stderr.read().decode()
331 self._Tf = time.time()
332 # And get the remote command status and log files
335 return self._has_finished
# get_log_files: downloads list_log_files.txt, reads the remote command
# status from its first line, then fetches each referenced log file (xml
# command log + txt traces) next to the local log file.
337 def get_log_files(self):
338 """Get the log files produced by the command launched
339 on the remote machine.
341 # Do not get the files if the command is not finished
342 if not self.has_finished():
343 msg = _("Trying to get log files whereas the job is not finished.")
344 self.logger.write(src.printcolors.printcWarning(msg))
347 # First get the file that contains the list of log files to get
348 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
349 self.machine.sftp.get(
350 os.path.join(self.machine.sat_path, "list_log_files.txt"),
353 # Read the file and get the result of the command and all the log files
355 fstream_tmp = open(tmp_file_path, "r")
356 file_lines = fstream_tmp.readlines()
357 file_lines = [line.replace("\n", "") for line in file_lines]
359 os.remove(tmp_file_path)
360 # The first line is the result of the command (0 success or 1 fail)
361 self.res_job = file_lines[0]
363 for i, job_path_remote in enumerate(file_lines[1:]):
365 # For each command, there is two files to get :
366 # 1- The xml file describing the command and giving the
368 # 2- The txt file containing the system command traces (like
369 # traces produced by the "make" command)
370 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
372 local_path = os.path.join(os.path.dirname(
373 self.logger.logFilePath),
374 os.path.basename(job_path_remote))
375 if i==0: # The first is the job command
376 self.logger.add_link(os.path.basename(job_path_remote),
382 local_path = os.path.join(os.path.dirname(
383 self.logger.logFilePath),
385 os.path.basename(job_path_remote))
387 if not os.path.exists(local_path):
388 self.machine.sftp.get(job_path_remote, local_path)
389 self.remote_log_files.append(local_path)
390 except Exception as e:
391 self.err += _("Unable to get %s log file from remote: %s" %
392 (job_path_remote, str(e)))
def has_failed(self):
    '''Tell whether the job must be considered as failed.

    A job is failed when the machine could not be reached, when the
    remote command returned status "1", or when it ended on a timeout.

    :return: True if the job has failed
    :rtype: bool
    '''
    # A job that is still running cannot be declared failed yet.
    if not self.has_finished():
        return False
    if not self.machine.successfully_connected(self.logger):
        return True
    if self.is_timeout():
        return True
    if self.res_job == "1":
        return True
    return False
def cancel(self):
    """Mark the job as failed without ever running it.

    When a job fails, every job that depends on it has to be cancelled:
    a cancelled job is flagged as begun and finished so the scheduler
    will never launch it.
    """
    self._has_begun = True
    self._has_finished = True
    self.cancelled = True
    self.out = _("This job was not launched because its father has failed.")
    self.err = _("This job was not launched because its father has failed.")
def is_running(self):
    '''Tell whether the job commands are currently executing.

    :return: True if the job has begun but not yet finished
    :rtype: bool
    '''
    return self.has_begun() and not self.has_finished()
def is_timeout(self):
    '''Tell whether the job commands ended because of a timeout.

    :return: True if the job has finished with timeout
    :rtype: bool
    '''
    # Note: "_has_timouted" spelling is kept as-is; it is the attribute
    # name set by check_time().
    return self._has_timouted
def time_elapsed(self):
    """Give the time elapsed since the job was launched.

    :return: the elapsed seconds, or -1 when the job has not begun yet
             (early-return value reconstructed -- confirm against the
             pristine file)
    :rtype: int
    """
    if not self.has_begun():
        return -1
    T_now = time.time()
    return T_now - self._T0
# NOTE(review): corrupted listing -- the early return after has_begun()
# and the try: around the log-file retrieval are missing lines.
# check_time: when the elapsed time exceeds self.timeout, flags the job as
# finished-with-timeout, kills the remote process (SIGINT via
# kill_remote_process) and stores TIMEOUT markers in out/err.
450 def check_time(self):
451 """Verify that the job has not exceeded its timeout.
452 If it has, kill the remote command and consider the job as finished.
454 if not self.has_begun():
456 if self.time_elapsed() > self.timeout:
457 self._has_finished = True
458 self._has_timouted = True
459 self._Tf = time.time()
461 (out_kill, _) = self.kill_remote_process()
462 self.out = "TIMEOUT \n" + out_kill.read().decode()
463 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
466 except Exception as e:
467 self.err += _("Unable to get remote log files: %s" % e)
def total_duration(self):
    """Give the total wall-clock duration of the job.

    :return: the duration, in seconds, between launch (_T0) and end (_Tf)
    :rtype: float
    """
    return self._Tf - self._T0
# NOTE(review): corrupted listing -- the def line of run() and several
# returns/else branches are missing. This span holds Job.run and
# Job.write_results.
# run (def line missing): launches the remote command exactly once; refuses
# a second launch, short-circuits when the ssh connection failed, and marks
# the job finished immediately when exec_command returned (None, None, None).
478 """Launch the job by executing the remote command.
481 # Prevent multiple run
483 msg = _("Warning: A job can only be launched one time")
484 msg2 = _("Trying to launch the job \"%s\" whereas it has "
485 "already been launched." % self.name)
486 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
490 # Do not execute the command if the machine could not be reached
491 if not self.machine.successfully_connected(self.logger):
492 self._has_finished = True
494 self.err = ("Connection to machine (name : %s, host: %s, port:"
495 " %s, user: %s) has failed\nUse the log command "
496 "to get more information."
497 % (self.machine.name,
502 # Usual case : Launch the command on remote machine
503 self._T0 = time.time()
504 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
505 self.command, self.logger)
506 # If the results are not initialized, finish the job
507 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
508 self._has_finished = True
509 self._Tf = time.time()
511 self.err = "The server failed to execute the command"
513 # Put the beginning flag to true.
514 self._has_begun = True
# write_results: dumps name, dependency, durations, timestamps, machine
# connection info and the captured out/err streams to the logger.
516 def write_results(self):
517 """Display on the terminal all the job's information
519 self.logger.write("name : " + self.name + "\n")
521 self.logger.write("after : %s\n" % self.after)
522 self.logger.write("Time elapsed : %4imin %2is \n" %
523 (self.total_duration()//60 , self.total_duration()%60))
525 self.logger.write("Begin time : %s\n" %
526 time.strftime('%Y-%m-%d %H:%M:%S',
527 time.localtime(self._T0)) )
529 self.logger.write("End time : %s\n\n" %
530 time.strftime('%Y-%m-%d %H:%M:%S',
531 time.localtime(self._Tf)) )
533 machine_head = "Informations about connection :\n"
534 underline = (len(machine_head) - 2) * "-"
535 self.logger.write(src.printcolors.printcInfo(
536 machine_head+underline+"\n"))
537 self.machine.write_info(self.logger)
539 self.logger.write(src.printcolors.printcInfo("out : \n"))
541 self.logger.write("Unable to get output\n")
543 self.logger.write(self.out + "\n")
544 self.logger.write(src.printcolors.printcInfo("err : \n"))
546 self.logger.write("Unable to get error\n")
548 self.logger.write(self.err + "\n")
def get_status(self):
    """Get a human readable status of the job (used by the Gui for xml display).

    :return: The current status of the job
    :rtype: str
    """
    # Order matters: a broken connection masks every other state.
    if not self.machine.successfully_connected(self.logger):
        return "SSH connection KO"
    if not self.has_begun():
        return "Not launched"
    # NOTE(review): the cancelled branch was missing from the recovered
    # listing and is reconstructed -- confirm against the pristine file.
    if self.cancelled:
        return "Cancelled"
    if self.is_running():
        return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                time.localtime(self._T0))
    if self.has_finished():
        if self.is_timeout():
            return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
        return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                 time.localtime(self._Tf))
# NOTE(review): corrupted listing (fused line numbers, many missing lines:
# the class line, the __init__ signature, the Machine(...) instantiation,
# several else: branches). This span is the head of the Jobs scheduler
# class: docstring, __init__, define_job, determine_jobs_and_machines and
# ssh_connection_all_machines. Comments describe intent only.
573 '''Class to manage the jobs to be run
# __init__ (signature truncated): stores the jobs pyconf, builds the
# machine/job lists and immediately resolves today's schedule.
581 lenght_columns = 20):
582 # The jobs configuration
583 self.cfg_jobs = config_jobs
584 self.job_file = job_file
585 self.job_file_path = job_file_path
586 # The machine that will be used today
588 # The list of machine (hosts, port) that will be used today
589 # (a same host can have several machine instances since there
590 # can be several ssh parameters)
592 # The jobs to be launched today
594 # The jobs that will not be launched today
595 self.ljobs_not_today = []
598 self.len_columns = lenght_columns
600 # the list of jobs that have not been run yet
601 self._l_jobs_not_started = []
602 # the list of jobs that have already ran
603 self._l_jobs_finished = []
604 # the list of jobs that are running
605 self._l_jobs_running = []
607 self.determine_jobs_and_machines()
# define_job: maps one pyconf job definition onto a Job instance (the final
# Job(...) construction and return are missing from this listing).
609 def define_job(self, job_def, machine):
610 '''Takes a pyconf job definition and a machine (from class machine)
611 and returns the job instance corresponding to the definition.
613 :param job_def src.config.Mapping: a job definition
614 :param machine machine: the machine on which the job will run
615 :return: The corresponding job in a job class instance
619 cmmnds = job_def.commands
620 timeout = job_def.timeout
622 if 'after' in job_def:
623 after = job_def.after
625 if 'application' in job_def:
626 application = job_def.application
628 if 'table' in job_def:
629 table = job_def.table
# determine_jobs_and_machines: resolves, for every configured job, the
# Machine it runs on (creating it on first use, with per-key defaults taken
# from the runner config) and dispatches the job into ljobs /
# ljobs_not_today depending on today's weekday.
642 def determine_jobs_and_machines(self):
643 '''Function that reads the pyconf jobs definition and instantiates all
644 the machines and jobs to be done today.
649 today = datetime.date.weekday(datetime.date.today())
652 for job_def in self.cfg_jobs.jobs :
654 if not "machine" in job_def:
655 msg = _('WARNING: The job "%s" do not have the key '
656 '"machine", this job is ignored.\n\n' % job_def.name)
657 self.logger.write(src.printcolors.printcWarning(msg))
659 name_machine = job_def.machine
# reuse an already-instantiated Machine when the name matches
662 for mach in self.lmachines:
663 if mach.name == name_machine:
667 if a_machine == None:
668 for machine_def in self.cfg_jobs.machines:
669 if machine_def.name == name_machine:
670 if 'host' not in machine_def:
671 host = self.runner.cfg.VARS.hostname
673 host = machine_def.host
675 if 'user' not in machine_def:
676 user = self.runner.cfg.VARS.user
678 user = machine_def.user
680 if 'port' not in machine_def:
683 port = machine_def.port
685 if 'password' not in machine_def:
688 passwd = machine_def.password
690 if 'sat_path' not in machine_def:
691 sat_path = "salomeTools"
693 sat_path = machine_def.sat_path
704 self.lmachines.append(a_machine)
705 if (host, port) not in host_list:
706 host_list.append((host, port))
708 if a_machine == None:
709 msg = _("WARNING: The job \"%(job_name)s\" requires the "
710 "machine \"%(machine_name)s\" but this machine "
711 "is not defined in the configuration file.\n"
712 "The job will not be launched")
713 self.logger.write(src.printcolors.printcWarning(msg))
715 a_job = self.define_job(job_def, a_machine)
717 if today in job_def.when:
718 self.ljobs.append(a_job)
719 else: # today in job_def.when
720 self.ljobs_not_today.append(a_job)
722 self.lhosts = host_list
# ssh_connection_all_machines: connects every machine, copies SAT onto the
# ones that answered, queries their distribution via "sat config", and
# renders a dotted progress line per machine on the terminal.
724 def ssh_connection_all_machines(self, pad=50):
725 '''Function that do the ssh connection to every machine
731 self.logger.write(src.printcolors.printcInfo((
732 "Establishing connection with all the machines :\n")))
733 for machine in self.lmachines:
734 # little algorithm in order to display traces
735 begin_line = (_("Connection to %s: " % machine.name))
736 if pad - len(begin_line) < 0:
739 endline = (pad - len(begin_line)) * "." + " "
741 step = "SSH connection"
742 self.logger.write( begin_line + endline + step)
744 # the call to the method that initiate the ssh connection
745 msg = machine.connect(self.logger)
747 # Copy salomeTools to the remote machine
748 if machine.successfully_connected(self.logger):
750 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
751 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
753 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
755 # get the remote machine distribution using a sat command
756 (__, out_dist, __) = machine.exec_command(
757 os.path.join(machine.sat_path,
758 "sat config --value VARS.dist --no_label"),
760 machine.distribution = out_dist.read().decode().replace("\n",
762 # Print the status of the copy
764 self.logger.write('\r%s' %
765 ((len(begin_line)+len(endline)+20) * " "), 3)
766 self.logger.write('\r%s%s%s' %
769 src.printcolors.printc(src.OK_STATUS)), 3)
771 self.logger.write('\r%s' %
772 ((len(begin_line)+len(endline)+20) * " "), 3)
773 self.logger.write('\r%s%s%s %s' %
776 src.printcolors.printc(src.OK_STATUS),
777 _("Copy of SAT failed")), 3)
779 self.logger.write('\r%s' %
780 ((len(begin_line)+len(endline)+20) * " "), 3)
781 self.logger.write('\r%s%s%s %s' %
784 src.printcolors.printc(src.KO_STATUS),
786 self.logger.write("\n", 3)
788 self.logger.write("\n")
def is_occupied(self, hostname):
    '''Tell which job, if any, is currently running on the given machine.

    :param hostname (str, int): the pair (host, port)
    :return: the job that is running on the host, or False when nothing
             is running on it
    :rtype: job / bool
    '''
    host = hostname[0]
    port = hostname[1]
    for jb in self.ljobs:
        if jb.machine.host == host and jb.machine.port == port:
            if jb.is_running():
                return jb
    return False
# NOTE(review): corrupted listing -- the condition guarding the running
# branch (presumably "if jb.is_running():") and one line of its body are
# missing. Rebuilds the running/finished lists from scratch on every call
# and reports whether a job finished since the previous call.
808 def update_jobs_states_list(self):
809 '''Function that updates the lists that store the currently
810 running jobs and the jobs that have already finished.
815 jobs_finished_list = []
816 jobs_running_list = []
817 for jb in self.ljobs:
819 jobs_running_list.append(jb)
821 if jb.has_finished():
822 jobs_finished_list.append(jb)
824 nb_job_finished_before = len(self._l_jobs_finished)
825 self._l_jobs_finished = jobs_finished_list
826 self._l_jobs_running = jobs_running_list
828 nb_job_finished_now = len(self._l_jobs_finished)
# True when at least one job moved to the finished state since last call
830 return nb_job_finished_now > nb_job_finished_before
def cancel_dependencies_of_failing_jobs(self):
    '''Cancel every job whose father job (its "after" field) has failed.
    '''
    for job in self.ljobs:
        if job.after is None:
            continue
        father_job = self.find_job_that_has_name(job.after)
        if father_job.has_failed():
            job.cancel()
def find_job_that_has_name(self, name):
    '''Return the job registered under the given name.

    :param name str: a job name
    :return: the job that has the name
    :rtype: job
    :raises SatException: when no job carries the given name
    '''
    for jb in self.ljobs:
        if jb.name == name:
            return jb
    # the following is executed only if the job was not found
    msg = _('The job "%s" seems to be nonexistent') % name
    raise src.SatException(msg)
def str_of_length(self, text, length):
    '''Fit a text into a fixed-width cell.

    Texts longer than *length* are truncated with a trailing ellipsis;
    shorter texts are centred with padding spaces (the extra space, if
    any, goes after the text).

    :param text str: any string
    :param length int: a length for the returned string
    :return: the most close string of length "length"
    :rtype: str
    '''
    if len(text) > length:
        return text[:length - 3] + '...'
    diff = length - len(text)
    before = " " * (diff // 2)
    after = " " * (diff // 2 + diff % 2)
    return before + text + after
# NOTE(review): corrupted listing -- the def line of run_jobs and many
# loop/branch lines are missing. This span holds Jobs.display_status,
# Jobs.run_jobs and Jobs.write_all_results.
# display_status: one table cell per (host, port); shows the running job's
# name (highlighted) or "empty".
879 def display_status(self, len_col):
880 '''Takes a lenght and construct the display of the current status
881 of the jobs in an array that has a column for each host.
882 It displays the job that is currently running on the host
885 :param len_col int: the size of the column
891 for host_port in self.lhosts:
892 jb = self.is_occupied(host_port)
893 if not jb: # nothing running on the host
894 empty = self.str_of_length("empty", len_col)
895 display_line += "|" + empty
897 display_line += "|" + src.printcolors.printcInfo(
898 self.str_of_length(jb.name, len_col))
# "\r" rewrites the same terminal line on every refresh
900 self.logger.write("\r" + display_line + "|")
# run_jobs (def line missing): the main scheduling loop -- one job at a
# time per host, dependency ("after") ordering, polling until every job in
# self.ljobs is finished, refreshing the Gui xml files on state changes.
905 '''The main method. Runs all the jobs on every host.
906 For each host, at a given time, only one job can be running.
907 The jobs that have the field after (that contain the job that has
908 to be run before it) are run after the previous job.
909 This method stops when all the jobs are finished.
916 self.logger.write(src.printcolors.printcInfo(
917 _('Executing the jobs :\n')))
# header of the status table: one column per host (port shown only when
# it is not the ssh default 22)
919 for host_port in self.lhosts:
922 if port == 22: # default value
923 text_line += "|" + self.str_of_length(host, self.len_columns)
925 text_line += "|" + self.str_of_length(
926 "("+host+", "+str(port)+")", self.len_columns)
928 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
929 self.logger.write(tiret_line)
930 self.logger.write(text_line + "|\n")
931 self.logger.write(tiret_line)
934 # The infinite loop that runs the jobs
935 l_jobs_not_started = src.deepcopy_list(self.ljobs)
936 while len(self._l_jobs_finished) != len(self.ljobs):
937 new_job_start = False
938 for host_port in self.lhosts:
940 if self.is_occupied(host_port):
943 for jb in l_jobs_not_started:
944 if (jb.machine.host, jb.machine.port) != host_port:
948 l_jobs_not_started.remove(jb)
# a dependent job is launched only once its father has finished
952 jb_before = self.find_job_that_has_name(jb.after)
953 if jb_before.has_finished():
955 l_jobs_not_started.remove(jb)
958 self.cancel_dependencies_of_failing_jobs()
959 new_job_finished = self.update_jobs_states_list()
961 if new_job_start or new_job_finished:
962 self.gui.update_xml_files(self.ljobs)
963 # Display the current status
964 self.display_status(self.len_columns)
966 # Make sure that the proc is not entirely busy
969 self.logger.write("\n")
970 self.logger.write(tiret_line)
971 self.logger.write("\n\n")
973 self.gui.update_xml_files(self.ljobs)
974 self.gui.last_update()
# write_all_results: per-job result dump after the scheduling loop ended
# (presumably delegates to jb.write_results(); that line is missing).
976 def write_all_results(self):
977 '''Display all the jobs outputs.
983 for jb in self.ljobs:
984 self.logger.write(src.printcolors.printcLabel(
985 "#------- Results for job %s -------#\n" % jb.name))
987 self.logger.write("\n\n")
# NOTE(review): corrupted listing -- the class line ("class Gui" presumably)
# and several statement lines are missing. This span is the head of the Gui
# class: docstring, __init__, initialize_arrays and put_jobs_not_today.
990 '''Class to manage the the xml data that can be displayed in a browser to
# __init__: creates the global xml report plus one xml file per table,
# seeds them via initialize_arrays and writes a first snapshot.
994 def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
997 :param xml_dir_path str: The path to the directory where to put
998 the xml resulting files
999 :param l_jobs List: the list of jobs that run today
1000 :param l_jobs_not_today List: the list of jobs that do not run today
1002 # The path of the global xml file
1003 self.xml_dir_path = xml_dir_path
1004 # Initialize the xml files
1005 xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
1006 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1008 # The xml files that corresponds to the tables.
1009 # {name_table : xml_object}}
1010 self.d_xml_table_files = {}
1011 # Create the lines and columns
1012 self.initialize_arrays(l_jobs, l_jobs_not_today)
1013 # Write the xml file
1014 self.update_xml_files(l_jobs)
# initialize_arrays: discovers the set of tables, distributions,
# applications and (host, port) pairs across all jobs and creates the
# corresponding skeleton nodes in each xml file.
1016 def initialize_arrays(self, l_jobs, l_jobs_not_today):
1017 '''Get all the first information needed for each file and write the
1018 first version of the files
1019 :param l_jobs List: the list of jobs that run today
1020 :param l_jobs_not_today List: the list of jobs that do not run today
1022 # Get the tables to fill and put it in a dictionary
1023 # {table_name : xml instance corresponding to the table}
1024 for job in l_jobs + l_jobs_not_today:
1026 if (table is not None and
1027 table not in self.d_xml_table_files.keys()):
1028 xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
1029 self.d_xml_table_files[table] = src.xmlManager.XmlLogFile(
1032 self.d_xml_table_files[table].add_simple_node("distributions")
1033 self.d_xml_table_files[table].add_simple_node("applications")
1034 self.d_xml_table_files[table].add_simple_node("table", text=table)
1036 # Loop over all jobs in order to get the lines and columns for each
1040 for table in self.d_xml_table_files:
1042 d_application[table] = []
1046 for job in l_jobs + l_jobs_not_today:
1048 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1049 l_hosts_ports.append((job.machine.host, job.machine.port))
1051 distrib = job.machine.distribution
1052 application = job.application
1054 table_job = job.table
1057 for table in self.d_xml_table_files:
1058 if table_job == table:
1059 if distrib is not None and distrib not in d_dist[table]:
1060 d_dist[table].append(distrib)
1061 src.xmlManager.add_simple_node(
1062 self.d_xml_table_files[table].xmlroot.find('distributions'),
1064 attrib={"name" : distrib})
1066 if table_job == table:
1067 if (application is not None and
1068 application not in d_application[table]):
1069 d_application[table].append(application)
1070 src.xmlManager.add_simple_node(
1071 self.d_xml_table_files[table].xmlroot.find('applications'),
1073 attrib={"name" : application})
1075 # Initialize the hosts_ports node for the global file
1076 self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1077 for host, port in l_hosts_ports:
1078 host_port = "%s:%i" % (host, port)
1079 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1081 attrib={"name" : host_port})
1083 # Initialize the jobs node in all files
1084 for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
1085 xml_jobs = xml_file.add_simple_node("jobs")
1086 # Get the jobs present in the config file but
1087 # that will not be launched today
1088 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1090 xml_file.add_simple_node("infos",
1091 attrib={"name" : "last update",
1092 "JobsCommandStatus" : "running"})
# put_jobs_not_today: records the jobs skipped today with a fixed
# "Not today" state node.
1095 def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1096 '''Get all the first information needed for each file and write the
1097 first version of the files
1099 :param xml_node_jobs etree.Element: the node corresponding to a job
1100 :param l_jobs_not_today List: the list of jobs that do not run today
1102 for job in l_jobs_not_today:
1103 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1105 attrib={"name" : job.name})
1106 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1107 src.xmlManager.add_simple_node(xmlj,
1109 job.machine.distribution)
1110 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1111 src.xmlManager.add_simple_node(xmlj,
1112 "commands", " ; ".join(job.commands))
1113 src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1114 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1115 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1116 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1117 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1118 src.xmlManager.add_simple_node(xmlj, "sat_path",
1119 job.machine.sat_path)
def update_xml_files(self, l_jobs):
    '''Refresh every xml file (the global one and each per-table one)
    with the current state of the jobs, then write them to disk.

    :param l_jobs List: the list of jobs that run today
    '''
    for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
        self.update_xml_file(l_jobs, xml_file)
    self.write_xml_files()
# NOTE(review): corrupted listing -- the per-job loop header, the guards
# around T0/Tf formatting, the "job" tag argument of add_simple_node and
# a few else: branches are missing lines. Rewrites each job's <job> node
# from scratch (delete-then-recreate) so the xml always reflects the
# current state, then stamps the "infos" node with the update time.
1132 def update_xml_file(self, l_jobs, xml_file):
1133 '''update information about the jobs for the file xml_file
1135 :param l_jobs List: the list of jobs that run today
1136 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1139 xml_node_jobs = xml_file.xmlroot.find('jobs')
1140 # Update the job names and status node
1142 # Find the node corresponding to the job and delete it
1143 # in order to recreate it
1144 for xmljob in xml_node_jobs.findall('job'):
1145 if xmljob.attrib['name'] == job.name:
1146 xml_node_jobs.remove(xmljob)
# begin/end timestamps formatted for display (guards presumably skip
# jobs that have no _T0/_Tf yet -- lines missing)
1150 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1151 time.localtime(job._T0))
1154 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1155 time.localtime(job._Tf))
1157 # recreate the job node
1158 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1160 attrib={"name" : job.name})
1161 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1162 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1163 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1164 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1165 src.xmlManager.add_simple_node(xmlj, "sat_path",
1166 job.machine.sat_path)
1167 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1168 src.xmlManager.add_simple_node(xmlj, "distribution",
1169 job.machine.distribution)
1170 src.xmlManager.add_simple_node(xmlj, "table", job.table)
1171 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1172 src.xmlManager.add_simple_node(xmlj, "commands",
1173 " ; ".join(job.commands))
1174 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1175 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1176 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1177 src.xmlManager.add_simple_node(xmlj, "out",
1178 src.printcolors.cleancolor(job.out))
1179 src.xmlManager.add_simple_node(xmlj, "err",
1180 src.printcolors.cleancolor(job.err))
1181 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1182 if len(job.remote_log_files) > 0:
1183 src.xmlManager.add_simple_node(xmlj,
1184 "remote_log_file_path",
1185 job.remote_log_files[0])
1187 src.xmlManager.add_simple_node(xmlj,
1188 "remote_log_file_path",
# dependency link: point the "after" node at the father's first log file
1191 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1192 # get the job father
1193 if job.after is not None:
1196 if jb.name == job.after:
1198 if job_father is None:
1199 msg = _("The job %(father_name)s that is parent of "
1200 "%(son_name)s is not in the job list." %
1201 {"father_name" : job.after , "son_name" : job.name})
1202 raise src.SatException(msg)
1204 if len(job_father.remote_log_files) > 0:
1205 link = job_father.remote_log_files[0]
1208 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
# refresh the "last update" timestamp on the infos node
1212 xml_node_infos = xml_file.xmlroot.find('infos')
1213 src.xmlManager.append_node_attrib(xml_node_infos,
1215 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
    def last_update(self, finish_status = "finished"):
        '''Record the final status of the jobs command in every xml report
           (global and per-table), then write the files one last time.

        :param finish_status str: the terminal status stored in the
                                  "JobsCommandStatus" attribute of each
                                  report's "infos" node
        '''
        for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
            xml_node_infos = xml_file.xmlroot.find('infos')
            src.xmlManager.append_node_attrib(xml_node_infos,
                        attrib={"JobsCommandStatus" : finish_status})
        # Write the files
        self.write_xml_files()
1232 def write_xml_files(self):
1233 ''' Write the xml files
1235 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1236 for xml_file in self.d_xml_table_files.values():
1237 xml_file.write_tree(STYLESHEET_TABLE)
# Describes the command
# NOTE(review): the `def description():` header is elided in this extract;
# the return below is the body of the command-description hook.
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the jobs command: parse the options, locate the jobs
       configuration file, build the jobs and execute them.

    NOTE(review): several interior lines (the `if options.list:` header,
    `return` statements, the `try:`/`finally:` headers, some `else:`
    branches) are elided in this extract — confirm the exact control flow
    against the full file.

    :param args List: the command line arguments after "jobs"
    :param runner: the sat runner, giving access to the configuration
    :param logger Logger: the logger instance used for all output
    '''
    (options, args) = parser.parse_args(args)

    # directory holding the user-defined jobs configuration files
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path

    # search path: the user directory first, then the data directory
    # shipped with the tool
    l_cfg_dir = [jobs_cfg_files_dir,
                 os.path.join(runner.cfg.VARS.datadir, "jobs")]

    # Make sure the path to the jobs config files directory exists
    src.ensure_path_exists(jobs_cfg_files_dir)

    # list option : display all the available config files
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" %
                                 src.printcolors.printcHeader(cfg_dir))

            # only .pyconf files are jobs configurations; display them
            # without their extension
            for f in sorted(os.listdir(cfg_dir)):
                if not f.endswith('.pyconf'):
                logger.write("%s\n" % cfilename)

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        src.printcolors.printcError(message)

    # Find the file in the directories; the .pyconf extension may be
    # omitted on the command line
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'

        if not os.path.exists(file_jobs_cfg):
        # reached when no candidate file exists in any directory
        msg = _("The file configuration %(name_file)s was not found."
                "\nUse the --list option to get the possible files.")
        src.printcolors.printcError(msg)

        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    # keep only the jobs requested through --only_jobs, if any
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    # build today's jobs from the configuration
    today_jobs = Jobs(runner,

    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    # with --test_connection, stop after the connection attempt
    if options.test_connection:

        # gui is built only when --publish is given (elided branch)
        gui = Gui(runner.cfg.SITE.log.log_dir,
                  today_jobs.ljobs_not_today,)

    today_jobs.gui = gui

        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        logger.write("\n\n%s\n\n" %
            (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
            # interrupted run: record the forced interruption in the reports
            today_jobs.gui.last_update(_("Forced interruption"))
            today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()