# Copyright (C) 2010-2013 CEA/DEN
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
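# Note: these two xsl stylesheets are copied next to the generated xml reports
# (see the run() function below) so that a browser can render the global view
# and the per-board views directly from the log directory.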
parser = src.options.Options()
parser.add_option('n', 'name', 'string', 'jobs_cfg',
                  _('Mandatory: The name of the config file that contains'
                    ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('Optional: the list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('Optional: list all available config files.'))
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Optional: try to connect to the machines. "
                    "Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Optional: generate an xml file that can be read in a "
                    "browser to display the jobs status."),
parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
                  "the path to the csv file that contains "
                  "the expected boards."), "")
parser.add_option('', 'completion', 'boolean', 'no_label',
                  _("Optional (internal use): do not print labels. Works only "
class Machine(object):
    '''Class to manage an SSH connection on a machine
                 sat_path="salomeTools"):
        self.distribution = None  # Will be filled after copying SAT to the machine
        self.password = passwd
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine
        :param logger src.logger.Logger: The logger instance
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host,
                             password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
            message = ( _("Error connecting or establishing an SSH session"))
            self._connection_successful = True
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeeded
        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeeded, False if not
        if self._connection_successful is None:
            message = _("Warning: trying to ask if the connection to "
                        "(name: %s, host: %s, port: %s, user: %s) is OK although "
                        "there was no connection request" %
                        (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
            # open an sftp connection
            self.sftp = self.ssh.open_sftp()
            # Create the sat directory on the remote machine if it does not exist
            self.mkdir(self.sat_path, ignore_existing=True)
            self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
            # put the job configuration file in order to make it reachable
            # on the remote machine
            self.sftp.put(job_file, os.path.join(self.sat_path,
                                                 ".jobs_command_file.pyconf"))
        except Exception as e:
            self._connection_successful = False
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exist. All sub-directories in source are
            created under target.
        for item in os.listdir(source):
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                linkto = os.readlink(source_path)
                    self.sftp.symlink(linkto, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                if os.path.isfile(source_path):
                    self.sftp.put(source_path, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                    self.mkdir(destination_path, ignore_existing=True)
                    self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
            self.sftp.mkdir(path, mode)
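            # Note: the default mode 511 is simply the decimal form of the
            # octal permission mask 0o777; the permissions actually obtained on
            # the remote side also depend on the server's umask.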
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine
        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                 paramiko.channel.ChannelFile)
            # Do not wait for the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
            return (stdin, stdout, stderr)
        '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the information about the machine in the logger
        (terminal traces and log file)
        :param logger src.logger.Logger: The logger instance
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
    '''Class to manage one job
    def __init__(self, name, machine, application, board,
                 commands, timeout, config, logger, after=None, prefix=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        self.cancelled = False
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None  # Store the command inputs field
        self._stdout = None  # Store the command outputs field
        self._stderr = None  # Store the command errors field
        self.commands = commands
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config " +
                        os.path.join(self.machine.sat_path,
                                     ".jobs_command_file.pyconf") +
            self.command = prefix + ' "' + self.command + '"'
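        # At this point self.command is the full "sat ... job --jobs_config
        # <sat_path>/.jobs_command_file.pyconf ..." invocation that will be run
        # on the remote machine; when a prefix is configured, the whole command
        # is wrapped in quotes, presumably so that it can be run through a
        # wrapper such as a dedicated shell (this last point is an assumption).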
285 """ Get the pid(s) corresponding to the command that have been launched
286 On the remote machine
288 :return: The list of integers corresponding to the found pids
292 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
293 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
294 pids_cmd = out_pid.readlines()
295 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
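        # Note: the pipeline above lists every remote process whose command
        # line matches self.command and keeps the second column (the pid);
        # depending on the remote shell, the grep process itself can show up,
        # so the returned list may contain one extra pid.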
    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.
        :return: (the output of the kill, the error of the kill)
        pids = self.get_pids()
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
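        # "kill -2" sends SIGINT (the Ctrl-C signal) to each pid, so the remote
        # sat process can stop as if it had been interrupted by hand.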
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)
        '''Returns True if the job has already begun
        :return: True if the job has already begun
        return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.
        :return: True if the job has already finished
        # If the method has already been called and returned True
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out += self._stdout.read().decode()
            self.err += self._stderr.read().decode()
            self._Tf = time.time()
            # And get the remote command status and log files
        return self._has_finished
    def get_log_files(self):
        """Get the log files produced by the command launched
        on the remote machine, and put them in the log directory of the user,
        so they can be accessible from
        # Do not get the files if the command is not finished
        if not self.has_finished():
            msg = _("Trying to get log files although the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        # First get the file that contains the list of log files to get
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
        self.machine.sftp.get(
        # Read the file and get the result of the command and all the log files
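        # Expected layout of list_log_files.txt (deduced from the code below):
        # the first line holds the job result ("0" for success, "1" for
        # failure) and every following line holds the absolute remote path of
        # one log file produced by the commands.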
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        os.remove(tmp_file_path)
            # The first line is the result of the command (0 success or 1 fail)
            self.res_job = file_lines[0]
        except Exception as e:
            self.err += _("Unable to get status from remote file %s: %s" %
                          (remote_path, str(e)))
        for i, job_path_remote in enumerate(file_lines[1:]):
                # For each command, there are two files to get:
                # 1- The xml file describing the command and giving the
                # 2- The txt file containing the system command traces (like
                #    traces produced by the "make" command)
                # 3- In case of the test command, there is another file to get:
                #    the xml board that contains the test results
                dirname = os.path.basename(os.path.dirname(job_path_remote))
                if dirname != 'OUT' and dirname != 'TEST':
                    local_path = os.path.join(os.path.dirname(
                                              self.logger.logFilePath),
                                              os.path.basename(job_path_remote))
                    if i == 0:  # The first is the job command
                        self.logger.add_link(os.path.basename(job_path_remote),
                elif dirname == 'OUT':
                    local_path = os.path.join(os.path.dirname(
                                              self.logger.logFilePath),
                                              os.path.basename(job_path_remote))
                elif dirname == 'TEST':
                    local_path = os.path.join(os.path.dirname(
                                              self.logger.logFilePath),
                                              os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                    self.remote_log_files.append(local_path)
            except Exception as e:
                self.err += _("Unable to get %s log file from remote: %s" %
                              (str(job_path_remote),
    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a timeout.
        :return: True if the job has failed
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        if self.res_job == "1":
        """In case of a failing job, every job that depends on it has to be
        cancelled. This method marks the job as failed; it will not be executed.
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out += _("This job was not launched because its father has failed.")
        self.err += _("This job was not launched because its father has failed.")
    def is_running(self):
        '''Returns True if the job commands are running
        :return: True if the job is running
        return self.has_begun() and not self.has_finished()
    def is_timeout(self):
        '''Returns True if the job commands have finished with a timeout
        :return: True if the job has finished with a timeout
        return self._has_timouted
    def time_elapsed(self):
        """Get the time elapsed since the job was launched
        :return: The number of seconds
        if not self.has_begun():
        return T_now - self._T0
    def check_time(self):
        """Verify that the job has not exceeded its timeout.
        If it has, kill the remote command and consider the job as finished.
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            (out_kill, _) = self.kill_remote_process()
            self.out += "TIMEOUT \n" + out_kill.read().decode()
            self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
            except Exception as e:
                self.err += _("Unable to get remote log files: %s" % e)
    def total_duration(self):
        """Give the total duration of the job
        :return: the total duration of the job in seconds
        return self._Tf - self._T0
511 """Launch the job by executing the remote command.
514 # Prevent multiple run
516 msg = _("Warning: A job can only be launched one time")
517 msg2 = _("Trying to launch the job \"%s\" whereas it has "
518 "already been launched." % self.name)
519 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
523 # Do not execute the command if the machine could not be reached
524 if not self.machine.successfully_connected(self.logger):
525 self._has_finished = True
527 self.err += ("Connection to machine (name : %s, host: %s, port:"
528 " %s, user: %s) has failed\nUse the log command "
529 "to get more information."
530 % (self.machine.name,
535 # Usual case : Launch the command on remote machine
536 self._T0 = time.time()
537 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
540 # If the results are not initialized, finish the job
541 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
542 self._has_finished = True
543 self._Tf = time.time()
545 self.err += "The server failed to execute the command"
547 # Put the beginning flag to true.
548 self._has_begun = True
    def write_results(self):
        """Display on the terminal all the job's information
        self.logger.write("name : " + self.name + "\n")
        self.logger.write("after : %s\n" % self.after)
        self.logger.write("Time elapsed : %4imin %2is \n" %
                          (self.total_duration() // 60, self.total_duration() % 60))
            self.logger.write("Begin time : %s\n" %
                              time.strftime('%Y-%m-%d %H:%M:%S',
                                            time.localtime(self._T0)))
            self.logger.write("End time : %s\n\n" %
                              time.strftime('%Y-%m-%d %H:%M:%S',
                                            time.localtime(self._Tf)))
        machine_head = "Information about connection:\n"
        underline = (len(machine_head) - 2) * "-"
        self.logger.write(src.printcolors.printcInfo(
                          machine_head + underline + "\n"))
        self.machine.write_info(self.logger)
        self.logger.write(src.printcolors.printcInfo("out : \n"))
            self.logger.write("Unable to get output\n")
            self.logger.write(self.out + "\n")
        self.logger.write(src.printcolors.printcInfo("err : \n"))
        self.logger.write(self.err + "\n")
    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)
        :return: The current status of the job
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                        time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machines (host, port) that will be used today
        # (the same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        self.len_columns = lenght_columns
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already run
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []
        self.determine_jobs_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from the class Machine)
        and returns the job instance corresponding to the definition.
        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        cmmnds = job_def.commands
        if "timeout" not in job_def:
            timeout = 4*60*60  # default timeout = 4h
            timeout = job_def.timeout
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'board' in job_def:
            board = job_def.board
        if "prefix" in job_def:
            prefix = job_def.prefix
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
        the machines and jobs to be done today.
        today = datetime.date.weekday(datetime.date.today())
        for job_def in self.cfg_jobs.jobs:
            if "machine" not in job_def:
                msg = _('WARNING: The job "%s" does not have the key '
                        '"machine"; this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            for mach in self.lmachines:
                if mach.name == name_machine:
            if a_machine is None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            if a_machine is None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched"
                        % {"job_name": job_def.name, "machine_name": name_machine})
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            if today in job_def.when:
                self.ljobs.append(a_job)
            else:  # today not in job_def.when
                self.ljobs_not_today.append(a_job)
        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that does the ssh connection to every machine
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
                endline = (pad - len(begin_line)) * "." + " "
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            # the call to the method that initiates the ssh connection
            msg = machine.connect(self.logger)
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                step = _("Remove SAT")
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                (__, out_dist, __) = machine.exec_command(
                                  "rm -rf %s" % machine.sat_path,
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # get the remote machine distribution using a sat command
                (__, out_dist, __) = machine.exec_command(
                          os.path.join(machine.sat_path,
                              "sat config --value VARS.dist --no_label"),
                machine.distribution = out_dist.read().decode().replace("\n",
                # Print the status of the copy
                    self.logger.write('\r%s' %
                                      ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' %
                                      src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' %
                                      ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                                      src.printcolors.printc(src.KO_STATUS),
                                      _("Copy of SAT failed: %s" % res_copy)), 3)
                self.logger.write('\r%s' %
                                  ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                  src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns the job that is currently running on
        the machine defined by its host and its port.
        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
                 or False if there is no job running on the host.
        for jb in self.ljobs:
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
        running jobs and the jobs that have already finished.
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.ljobs:
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
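        # Return True when at least one more job has finished since the
        # previous call; run_jobs() uses this to know when to refresh the
        # xml files and the status display.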
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        for job in self.ljobs:
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job is not None and father_job.has_failed():
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.
        :param name str: a job name
        :return: the job that has the name.
        for jb in self.ljobs:
        # the following is executed only if the job was not found
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
        the closest string of length "length".
        :param text str: any string
        :param length int: a length for the returned string
        :return: the closest string of length "length"
        if len(text) > length:
            text_out = text[:length-3] + '...'
            diff = length - len(text)
            before = " " * (diff // 2)
            after = " " * (diff // 2 + diff % 2)
            text_out = before + text + after
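            # e.g. str_of_length("job_a", 11) gives "   job_a   " (centered
            # with spaces), while a text longer than "length" is truncated and
            # suffixed with "..." ("job_a" is just an illustrative name).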
    def display_status(self, len_col):
        '''Takes a length and constructs the display of the current status
        of the jobs in an array that has a column for each host.
        It displays the job that is currently running on the host
        :param len_col int: the size of the column
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb:  # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        self.logger.write("\r" + display_line + "|")
        '''The main method. Runs all the jobs on every host.
        For each host, at a given time, only one job can be running.
        The jobs that have the field after (that contains the job that has
        to be run before them) are run after the previous job.
        This method stops when all the jobs are finished.
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        for host_port in self.lhosts:
            if port == 22:  # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "(" + host + ", " + str(port) + ")", self.len_columns)
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        # The infinite loop that runs the jobs
        l_jobs_not_started = src.deepcopy_list(self.ljobs)
        while len(self._l_jobs_finished) != len(self.ljobs):
            new_job_start = False
            for host_port in self.lhosts:
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before is None:
                            msg = _("This job was not launched because its "
                                    "father is not in the jobs list.")
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
                            new_job_start = True
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            if new_job_start or new_job_finished:
                self.gui.update_xml_files(self.ljobs)
                # Display the current status
                self.display_status(self.len_columns)
            # Make sure that the processor is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")
        self.gui.update_xml_files(self.ljobs)
        self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs' outputs.
        for jb in self.ljobs:
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            self.logger.write("\n\n")
    '''Class to manage the xml data that can be displayed in a browser to
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, prefix, file_boards=""):
        :param xml_dir_path str: The path to the directory where to put
                                 the xml resulting files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        :param file_boards str: the file path from which to read the
        # The prefix to add to the xml files : date_hour
        self.prefix = prefix
        # The path of the csv files to read to fill the expected boards
        self.file_boards = file_boards
        if file_boards != "":
            today = datetime.date.weekday(datetime.date.today())
            self.parse_csv_boards(today)
            self.d_input_boards = {}
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        self.global_name = "global_report"
        xml_global_path = os.path.join(self.xml_dir_path,
                                       self.global_name + ".xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # Find history for each job
        self.find_history(l_jobs, l_jobs_not_today)
        # The xml files that correspond to the boards.
        # {name_board : xml_object}
        self.d_xml_board_files = {}
        # Create the lines and columns
        self.initialize_boards(l_jobs, l_jobs_not_today)
        # Write the xml files
        self.update_xml_files(l_jobs)
    def add_xml_board(self, name):
        '''Add a board to the board list
        :param name str: the board name
        xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
        self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
        self.d_xml_board_files[name].add_simple_node("distributions")
        self.d_xml_board_files[name].add_simple_node("applications")
        self.d_xml_board_files[name].add_simple_node("board", text=name)
    def initialize_boards(self, l_jobs, l_jobs_not_today):
        '''Get all the first information needed for each file and write the
        first version of the files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        # Get the boards to fill and put them in a dictionary
        # {board_name : xml instance corresponding to the board}
        for job in l_jobs + l_jobs_not_today:
            if (board is not None and
                            board not in self.d_xml_board_files.keys()):
                self.add_xml_board(board)
        # Verify that the boards given as input are done
        for board in list(self.d_input_boards.keys()):
            if board not in self.d_xml_board_files:
                self.add_xml_board(board)
            root_node = self.d_xml_board_files[board].xmlroot
            src.xmlManager.append_node_attrib(root_node,
                                              {"input_file" : self.file_boards})
        # Loop over all jobs in order to get the lines and columns for each
        for board in self.d_xml_board_files:
            d_application[board] = []
        for job in l_jobs + l_jobs_not_today:
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
            distrib = job.machine.distribution
            application = job.application
            board_job = job.board
            for board in self.d_xml_board_files:
                if board_job == board:
                    if distrib is not None and distrib not in d_dist[board]:
                        d_dist[board].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : distrib})
                if board_job == board:
                    if (application is not None and
                                    application not in d_application[board]):
                        d_application[board].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            "name" : application})
        # Verify that there are no missing applications or distributions in the
        # xml board files (regarding the input boards)
        for board in self.d_xml_board_files:
            l_dist = d_dist[board]
            if board not in self.d_input_boards.keys():
            for dist in self.d_input_boards[board]["rows"]:
                if dist not in l_dist:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : dist})
            l_appli = d_application[board]
            for appli in self.d_input_boards[board]["columns"]:
                if appli not in l_appli:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : appli})
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node(
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but
            # that will not be launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            # add also the infos node
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})
            # and put the history node
            history_node = xml_file.add_simple_node("history")
            name_board = os.path.basename(xml_file.logFile)[:-len(".xml")]
            # search for board files
            expression = "^[0-9]{8}_+[0-9]{6}_" + name_board + ".xml$"
            oExpr = re.compile(expression)
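            # This expression matches the dated copies produced by
            # write_xml_file(): file names starting with an 8-digit date and a
            # 6-digit time, e.g. "20130705_143211_<board>.xml" (the exact
            # value is only illustrative).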
            # Get the list of xml board files that are in the log directory
            for file_name in os.listdir(self.xml_dir_path):
                if oExpr.search(file_name):
                    date = os.path.basename(file_name).split("_")[0]
                    file_path = os.path.join(self.xml_dir_path, file_name)
                    src.xmlManager.add_simple_node(history_node,
                                                   attrib={"date" : date})
        # Find in each board the squares that need to be filled regarding the
        # input csv files but that are not covered by a job today
        for board in self.d_input_boards.keys():
            xml_root_board = self.d_xml_board_files[board].xmlroot
            # Find the missing jobs for today
            xml_missing = src.xmlManager.add_simple_node(xml_root_board,
            for row, column in self.d_input_boards[board]["jobs"]:
                    if (job.application == column and
                        job.machine.distribution == row):
                    src.xmlManager.add_simple_node(xml_missing,
                                                   attrib={"distribution" : row,
                                                           "application" : column})
            # Find the missing jobs not today
            xml_missing_not_today = src.xmlManager.add_simple_node(
                                                    "missing_jobs_not_today")
            for row, column in self.d_input_boards[board]["jobs_not_today"]:
                for job in l_jobs_not_today:
                    if (job.application == column and
                        job.machine.distribution == row):
                    src.xmlManager.add_simple_node(xml_missing_not_today,
                                                   attrib={"distribution" : row,
                                                           "application" : column})
    def find_history(self, l_jobs, l_jobs_not_today):
        """Find, for each job, the results for that job in the existing xml
        boards. Store the results in the dictionary self.history = {name_job :
        list of (date, status, list of links)}
        :param l_jobs List: the list of jobs to run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        # load all the history
        expression = "^[0-9]{8}_+[0-9]{6}_" + self.global_name + ".xml$"
        oExpr = re.compile(expression)
        # Get the list of global xml files that are in the log directory
        for file_name in os.listdir(self.xml_dir_path):
            if oExpr.search(file_name):
                file_path = os.path.join(self.xml_dir_path, file_name)
                global_xml = src.xmlManager.ReadXmlFile(file_path)
                l_globalxml.append(global_xml)
        # Construct the dictionary self.history
        for job in l_jobs + l_jobs_not_today:
            for global_xml in l_globalxml:
                date = os.path.basename(global_xml.filePath).split("_")[0]
                global_root_node = global_xml.xmlroot.find("jobs")
                job_node = src.xmlManager.find_node_by_attrib(
                    if job_node.find("remote_log_file_path") is not None:
                        link = job_node.find("remote_log_file_path").text
                        res_job = job_node.find("res").text
                        if link != "nothing":
                            l_links.append((date, res_job, link))
            l_links = sorted(l_links, reverse=True)
            self.history[job.name] = l_links
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Put the information about the jobs that will not be launched
        today in the given jobs xml node
        :param xml_node_jobs etree.Element: the node corresponding to the jobs
        :param l_jobs_not_today List: the list of jobs that do not run today
        for job in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            xml_history = src.xmlManager.add_simple_node(xmlj, "history")
            for i, (date, res_job, link) in enumerate(self.history[job.name]):
                    # tag the first one (the most recent one)
                    src.xmlManager.add_simple_node(xml_history,
                                                   attrib={"date" : date,
                    src.xmlManager.add_simple_node(xml_history,
                                                   attrib={"date" : date,
    def parse_csv_boards(self, today):
        """ Parse the csv file that describes the boards to produce and fill
        the dict d_input_boards with its contents
        :param today int: the current day of the week
        # open the csv file and read its content
        with open(self.file_boards, 'r') as f:
            reader = csv.reader(f, delimiter=CSV_DELIMITER)
        # get the delimiter for the boards (empty line)
        boards_delimiter = [''] * len(l_read[0])
        # Make the list of boards, by splitting with the delimiter
        l_boards = [list(y) for x, y in itertools.groupby(l_read,
                    lambda z: z == boards_delimiter) if not x]
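        # Expected csv layout (as read by the loop below, values illustrative):
        # one block per board, blocks separated by an empty line; in each block
        # the first cell is the board name, the rest of the first row lists the
        # applications (columns), the first cell of every other row names a
        # distribution (row), and each remaining cell holds the week-day
        # numbers, separated by DAYS_SEPARATOR, that are compared with the
        # current weekday.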
        # loop over the csv lists of lines and get the rows, columns and jobs
        for input_board in l_boards:
            board_name = input_board[0][0]
            columns = input_board[0][1:]
            for line in input_board[1:]:
                for i, square in enumerate(line[1:]):
                    days = square.split(DAYS_SEPARATOR)
                    days = [int(day) for day in days]
                    job = (row, columns[i])
                        jobs_not_today.append(job)
            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns,
                                    "jobs_not_today" : jobs_not_today}
        self.d_input_boards = d_boards
    def update_xml_files(self, l_jobs):
        '''Write all the xml files with updated information about the jobs
        :param l_jobs List: the list of jobs that run today
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            self.update_xml_file(l_jobs, xml_file)
        self.write_xml_files()
    def update_xml_file(self, l_jobs, xml_file):
        '''Update the information about the jobs in the file xml_file
        :param l_jobs List: the list of jobs that run today
        :param xml_file xmlManager.XmlLogFile: the xml instance to update
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
            # Find the node corresponding to the job and delete it
            # in order to recreate it
            for xmljob in xml_node_jobs.findall('job'):
                if xmljob.attrib['name'] == job.name:
                    xml_node_jobs.remove(xmljob)
                T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._T0))
                Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._Tf))
            # recreate the job node
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            xml_history = src.xmlManager.add_simple_node(xmlj, "history")
            for date, res_job, link in self.history[job.name]:
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj, "distribution",
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
            src.xmlManager.add_simple_node(xmlj, "begin", T0)
            src.xmlManager.add_simple_node(xmlj, "end", Tf)
            src.xmlManager.add_simple_node(xmlj, "out",
                                           src.printcolors.cleancolor(job.out))
            src.xmlManager.add_simple_node(xmlj, "err",
                                           src.printcolors.cleancolor(job.err))
            src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
            if len(job.remote_log_files) > 0:
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
                                               job.remote_log_files[0])
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
            xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
            # get the job father
            if job.after is not None:
                    if jb.name == job.after:
                if (job_father is not None and
                        len(job_father.remote_log_files) > 0):
                    link = job_father.remote_log_files[0]
            src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
            # Verify that the job is to be done today regarding the input csv
            if job.board and job.board in self.d_input_boards.keys():
                for dist, appli in self.d_input_boards[job.board]["jobs"]:
                    if (job.machine.distribution == dist
                            and job.application == appli):
                        src.xmlManager.add_simple_node(xmlj,
                src.xmlManager.add_simple_node(xmlj,
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                       datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
    def last_update(self, finish_status = "finished"):
        '''Write the final status of the jobs command in the xml files
        :param finish_status str: the status to write in the "infos" node
        for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
            xml_node_infos = xml_file.xmlroot.find('infos')
            src.xmlManager.append_node_attrib(xml_node_infos,
                          attrib={"JobsCommandStatus" : finish_status})
        self.write_xml_files()
    def write_xml_file(self, xml_file, stylesheet):
        ''' Write one xml file and the same file with prefix
        xml_file.write_tree(stylesheet)
        file_path = xml_file.logFile
        file_dir = os.path.dirname(file_path)
        file_name = os.path.basename(file_path)
        file_name_with_prefix = self.prefix + "_" + file_name
        xml_file.write_tree(stylesheet, os.path.join(file_dir,
                                                     file_name_with_prefix))
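        # Each report therefore exists twice on disk: a "latest" copy (e.g.
        # global_report.xml) and a dated copy prefixed with self.prefix, which
        # is the copy that find_history() looks up on later runs.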
    def write_xml_files(self):
        ''' Write the xml files
        self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
        for xml_file in self.d_xml_board_files.values():
            self.write_xml_file(xml_file, STYLESHEET_BOARD)
# Describes the command
    return _("The jobs command launches the maintenance jobs described"
             " in the dedicated jobs configuration file.\n\nexample:\nsat "
             "jobs --name my_jobs --publish")
def run(args, runner, logger):
    (options, args) = parser.parse_args(args)
    l_cfg_dir = runner.cfg.PATHS.JOBPATH
    # list option : display all the available config files
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" %
                             src.printcolors.printcHeader(cfg_dir))
            for f in sorted(os.listdir(cfg_dir)):
                if not f.endswith('.pyconf'):
                logger.write("%s\n" % cfilename)
    # Make sure the jobs_config option has been given
    if not options.jobs_cfg:
        message = _("The option --name (-n) is required\n")
        src.printcolors.printcError(message)
    # Find the file in the directories, unless it is a full path
    if os.path.exists(options.jobs_cfg):
        file_jobs_cfg = options.jobs_cfg
        for cfg_dir in l_cfg_dir:
            file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
            if not file_jobs_cfg.endswith('.pyconf'):
                file_jobs_cfg += '.pyconf'
            if not os.path.exists(file_jobs_cfg):
            msg = _("The configuration file %(name_file)s was not found."
                    "\nUse the --list option to get the possible files.")
            src.printcolors.printcError(msg)
        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)
    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                          "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb
    today_jobs = Jobs(runner,
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
        logger.write(src.printcolors.printcInfo(
                                        _("Initialize the xml boards : ")), 5)
        # Copy the stylesheets to the log directory
        log_dir = runner.cfg.USER.log_dir
        xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
        files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
        files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
        files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
        for file_path in files_to_copy:
            shutil.copy2(file_path, log_dir)
        # Instantiate the Gui in order to produce the xml files that contain all
        gui = Gui(runner.cfg.USER.log_dir,
                  today_jobs.ljobs_not_today,
                  runner.cfg.VARS.datehour,
                  file_boards = options.input_boards)
        logger.write(src.printcolors.printcSuccess("OK"), 5)
        logger.write("\n\n", 5)
        # Display the list of the xml files
        logger.write(src.printcolors.printcInfo(("Here is the list of published"
        logger.write("%s\n" % gui.xml_global_file.logFile, 4)
        for board in gui.d_xml_board_files.keys():
            file_path = gui.d_xml_board_files[board].logFile
            file_name = os.path.basename(file_path)
            logger.write("%s\n" % file_path, 4)
            logger.add_link(file_name, "board", 0, board)
        logger.write("\n", 4)
    today_jobs.gui = gui
        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        logger.write("\n\n%s\n\n" %
                     (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        msg = _("Killing the running jobs and trying"
                " to get the corresponding logs\n")
        logger.write(src.printcolors.printcWarning(msg))
        # find the potentially unfinished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                    jb.kill_remote_process()
                except Exception as e:
                    msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
                    logger.write(src.printcolors.printcWarning(msg))
            if jb.res_job != "0":
                today_jobs.gui.last_update(_("Forced interruption"))
                today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()