3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# --- Module-level constants and CLI option declarations for the "jobs" command ---
# NOTE(review): this extract is non-contiguous (the embedded per-line numbers
# jump), so several add_option(...) calls below are visibly truncated — default
# value arguments and closing parentheses are missing. Restore from the full
# source before editing; code left byte-identical here.
# XSL stylesheets used to render the generated XML reports in a browser.
30 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
31 STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command-line options of the "jobs" command (parsed by the sat framework).
36 parser = src.options.Options()
38 parser.add_option('n', 'name', 'string', 'jobs_cfg',
39 _('Mandatory: The name of the config file that contains'
40 ' the jobs configuration'))
41 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
42 _('Optional: the list of jobs to launch, by their name. '))
43 parser.add_option('l', 'list', 'boolean', 'list',
44 _('Optional: list all available config files.'))
45 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
46 _("Optional: try to connect to the machines. "
47 "Not executing the jobs."),
# NOTE(review): the default-value argument / closing paren of the option above
# is not visible in this extract.
49 parser.add_option('p', 'publish', 'boolean', 'publish',
50 _("Optional: generate an xml file that can be read in a "
51 "browser to display the jobs status."),
53 parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
54 "the path to csv file that contain "
55 "the expected boards."),"")
56 parser.add_option('', 'completion', 'boolean', 'no_label',
57 _("Optional (internal use): do not print labels, Works only "
# NOTE(review): non-contiguous extract — `try:` lines matching the `except`
# clauses below, several `else:` branches, and at least one method header
# (`close`, whose docstring is visible near the end) are missing from view.
# Code left byte-identical; comments only.
61 class Machine(object):
62 '''Class to manage a ssh connection on a machine
70 sat_path="salomeTools"):
74 self.distribution = None # Will be filled after copying SAT on the machine
76 self.password = passwd
77 self.sat_path = sat_path
# paramiko client; the actual connection is opened later in connect().
78 self.ssh = paramiko.SSHClient()
# Tri-state: None = not attempted yet, False = failed, True = connected.
79 self._connection_successful = None
81 def connect(self, logger):
82 '''Initiate the ssh connection to the remote machine
84 :param logger src.logger.Logger: The logger instance
89 self._connection_successful = False
90 self.ssh.load_system_host_keys()
# Auto-accept unknown host keys (no interactive prompt on first connection).
91 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# NOTE(review): the `try:` guarding this connect call is not visible here.
93 self.ssh.connect(self.host,
96 password = self.password)
97 except paramiko.AuthenticationException:
98 message = src.KO_STATUS + _("Authentication failed")
99 except paramiko.BadHostKeyException:
100 message = (src.KO_STATUS +
101 _("The server's host key could not be verified"))
102 except paramiko.SSHException:
103 message = ( _("SSHException error connecting or "
104 "establishing an SSH session"))
106 message = ( _("Error connecting or establishing an SSH session"))
# Only reached when no exception occurred — mark the connection as usable.
108 self._connection_successful = True
112 def successfully_connected(self, logger):
113 '''Verify if the connection to the remote machine has succeed
115 :param logger src.logger.Logger: The logger instance
116 :return: True if the connection has succeed, False if not
# Warn when queried before connect() was ever called (flag still None).
119 if self._connection_successful == None:
120 message = _("Warning : trying to ask if the connection to "
121 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
122 " no connection request" %
123 (self.name, self.host, self.port, self.user))
124 logger.write( src.printcolors.printcWarning(message))
125 return self._connection_successful
127 def copy_sat(self, sat_local_path, job_file):
128 '''Copy salomeTools to the remote machine in self.sat_path
132 # open a sftp connection
133 self.sftp = self.ssh.open_sftp()
134 # Create the sat directory on remote machine if it is not existing
135 self.mkdir(self.sat_path, ignore_existing=True)
# Upload the SAT tree, skipping '.git' entries via the filters argument.
137 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
138 # put the job configuration file in order to make it reachable
139 # on the remote machine
140 self.sftp.put(job_file, os.path.join(self.sat_path,
141 ".jobs_command_file.pyconf"))
# Any failure during the copy invalidates the connection as a whole.
142 except Exception as e:
144 self._connection_successful = False
148 def put_dir(self, source, target, filters = []):
149 ''' Uploads the contents of the source directory to the target path. The
150 target directory needs to exists. All sub-directories in source are
151 created under target.
153 for item in os.listdir(source):
156 source_path = os.path.join(source, item)
157 destination_path = os.path.join(target, item)
# Symlinks are re-created remotely rather than copied as files.
158 if os.path.islink(source_path):
159 linkto = os.readlink(source_path)
161 self.sftp.symlink(linkto, destination_path)
162 self.sftp.chmod(destination_path,
163 os.stat(source_path).st_mode)
# Regular files are uploaded with their local permission bits preserved.
167 if os.path.isfile(source_path):
168 self.sftp.put(source_path, destination_path)
169 self.sftp.chmod(destination_path,
170 os.stat(source_path).st_mode)
# Directories are created remotely, then recursed into.
172 self.mkdir(destination_path, ignore_existing=True)
173 self.put_dir(source_path, destination_path)
# mode=511 is decimal for octal 0777.
175 def mkdir(self, path, mode=511, ignore_existing=False):
176 ''' Augments mkdir by adding an option to not fail
180 self.sftp.mkdir(path, mode)
187 def exec_command(self, command, logger):
188 '''Execute the command on the remote machine
190 :param command str: The command to be run
191 :param logger src.logger.Logger: The logger instance
192 :return: the stdin, stdout, and stderr of the executing command,
194 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
195 paramiko.channel.ChannelFile)
198 # Does not wait the end of the command
199 (stdin, stdout, stderr) = self.ssh.exec_command(command)
200 except paramiko.SSHException:
201 message = src.KO_STATUS + _(
202 ": the server failed to execute the command\n")
203 logger.write( src.printcolors.printcError(message))
# (None, None, None) is the sentinel callers check for execution failure.
204 return (None, None, None)
206 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
207 return (None, None, None)
209 return (stdin, stdout, stderr)
# NOTE(review): the `def close(...)` header belonging to this docstring is
# not visible in this extract.
212 '''Close the ssh connection
218 def write_info(self, logger):
219 '''Prints the informations relative to the machine in the logger
220 (terminal traces and log file)
222 :param logger src.logger.Logger: The logger instance
226 logger.write("host : " + self.host + "\n")
227 logger.write("port : " + str(self.port) + "\n")
228 logger.write("user : " + str(self.user) + "\n")
229 if self.successfully_connected(logger):
230 status = src.OK_STATUS
232 status = src.KO_STATUS
233 logger.write("Connection : " + status + "\n\n")
# NOTE(review): interior of the Job class — the `class` statement itself is not
# visible in this non-contiguous extract, and several method headers
# (`get_pids`, `has_begun`, `cancel`, `run`) as well as docstring terminators
# and some statements are missing from view. Code left byte-identical.
237 '''Class to manage one job
239 def __init__(self, name, machine, application, board,
240 commands, timeout, config, logger, after=None, prefix=None):
243 self.machine = machine
245 self.timeout = timeout
246 self.application = application
250 # The list of log files to download from the remote machine
251 self.remote_log_files = []
253 # The remote command status
254 # -1 means that it has not been launched,
255 # 0 means success and 1 means fail
257 self.cancelled = False
# Lifecycle flags driving has_begun()/has_finished()/is_timeout().
261 self._has_begun = False
262 self._has_finished = False
263 self._has_timouted = False
264 self._stdin = None # Store the command inputs field
265 self._stdout = None # Store the command outputs field
266 self._stderr = None # Store the command errors field
# Build the single remote shell command that runs all the job's sat commands.
271 name_remote_jobs_pyconf = ".jobs_command_file.pyconf"
272 self.commands = commands
273 self.command = (os.path.join(self.machine.sat_path, "sat") +
275 os.path.join(self.machine.sat_path,
276 "list_log_files.txt") +
277 " job --jobs_config " +
278 os.path.join(self.machine.sat_path,
279 ".jobs_command_file.pyconf") +
# Optional wrapper: the whole command is quoted and passed to the prefix.
283 self.command = prefix + ' "' + self.command +'"'
# NOTE(review): the `def get_pids(...)` header for this docstring is missing.
286 """ Get the pid(s) corresponding to the command that have been launched
287 On the remote machine
289 :return: The list of integers corresponding to the found pids
# Grep the full command line in `ps aux` and keep only the PID column.
293 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
294 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
295 pids_cmd = out_pid.readlines()
296 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
300 def kill_remote_process(self, wait=1):
301 '''Kills the process on the remote machine.
303 :return: (the output of the kill, the error of the kill)
# SIGINT (kill -2) each pid found for this job's command.
307 pids = self.get_pids()
308 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
309 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
312 return (out_kill, err_kill)
# NOTE(review): the `def has_begun(...)` header for this docstring is missing.
315 '''Returns True if the job has already begun
317 :return: True if the job has already begun
320 return self._has_begun
322 def has_finished(self):
323 '''Returns True if the job has already finished
324 (i.e. all the commands have been executed)
325 If it is finished, the outputs are stored in the fields out and err.
327 :return: True if the job has already finished
331 # If the method has already been called and returned True
332 if self._has_finished:
335 # If the job has not begun yet
336 if not self.has_begun():
# A closed stdout channel means the remote command has terminated.
339 if self._stdout.channel.closed:
340 self._has_finished = True
341 # Store the result outputs
342 self.out += self._stdout.read().decode()
343 self.err += self._stderr.read().decode()
345 self._Tf = time.time()
346 # And get the remote command status and log files
349 return self._has_finished
351 def get_log_files(self):
352 """Get the log files produced by the command launched
353 on the remote machine, and put it in the log directory of the user,
354 so they can be accessible from
356 # Do not get the files if the command is not finished
357 if not self.has_finished():
358 msg = _("Trying to get log files whereas the job is not finished.")
359 self.logger.write(src.printcolors.printcWarning(msg))
362 # First get the file that contains the list of log files to get
363 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
364 remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
365 self.machine.sftp.get(
369 # Read the file and get the result of the command and all the log files
371 fstream_tmp = open(tmp_file_path, "r")
372 file_lines = fstream_tmp.readlines()
373 file_lines = [line.replace("\n", "") for line in file_lines]
375 os.remove(tmp_file_path)
378 # The first line is the result of the command (0 success or 1 fail)
379 self.res_job = file_lines[0]
380 except Exception as e:
# Best-effort: record the problem in err instead of raising.
381 self.err += _("Unable to get status from remote file %s: %s" %
382 (remote_path, str(e)))
# Remaining lines are remote log file paths, one per file to download.
384 for i, job_path_remote in enumerate(file_lines[1:]):
386 # For each command, there is two files to get :
387 # 1- The xml file describing the command and giving the
389 # 2- The txt file containing the system command traces (like
390 # traces produced by the "make" command)
391 # 3- In case of the test command, there is another file to get :
392 # the xml board that contain the test results
393 dirname = os.path.basename(os.path.dirname(job_path_remote))
394 if dirname != 'OUT' and dirname != 'TEST':
396 local_path = os.path.join(os.path.dirname(
397 self.logger.logFilePath),
398 os.path.basename(job_path_remote))
399 if i==0: # The first is the job command
400 self.logger.add_link(os.path.basename(job_path_remote),
404 elif dirname == 'OUT':
406 local_path = os.path.join(os.path.dirname(
407 self.logger.logFilePath),
409 os.path.basename(job_path_remote))
410 elif dirname == 'TEST':
412 local_path = os.path.join(os.path.dirname(
413 self.logger.logFilePath),
415 os.path.basename(job_path_remote))
# Download only files not already present locally.
418 if not os.path.exists(local_path):
419 self.machine.sftp.get(job_path_remote, local_path)
420 self.remote_log_files.append(local_path)
421 except Exception as e:
422 self.err += _("Unable to get %s log file from remote: %s" %
423 (str(job_path_remote),
426 def has_failed(self):
427 '''Returns True if the job has failed.
428 A job is considered as failed if the machine could not be reached,
429 if the remote command failed,
430 or if the job finished with a time out.
432 :return: True if the job has failed
435 if not self.has_finished():
437 if not self.machine.successfully_connected(self.logger):
439 if self.is_timeout():
# res_job is read from the remote status file as a string ("0"/"1").
441 if self.res_job == "1":
# NOTE(review): the `def cancel(...)` header for this docstring is missing.
446 """In case of a failing job, one has to cancel every job that depend
447 on it. This method put the job as failed and will not be executed.
451 self._has_begun = True
452 self._has_finished = True
453 self.cancelled = True
454 self.out += _("This job was not launched because its father has failed.")
455 self.err += _("This job was not launched because its father has failed.")
457 def is_running(self):
'''Returns True if the job commands are running
460 :return: True if the job is running
463 return self.has_begun() and not self.has_finished()
465 def is_timeout(self):
'''Returns True if the job commands has finished with timeout
468 :return: True if the job has finished with timeout
471 return self._has_timouted
473 def time_elapsed(self):
"""Get the time elapsed since the job launching
476 :return: The number of seconds
479 if not self.has_begun():
482 return T_now - self._T0
484 def check_time(self):
"""Verify that the job has not exceeded its timeout.
486 If it has, kill the remote command and consider the job as finished.
488 if not self.has_begun():
490 if self.time_elapsed() > self.timeout:
491 self._has_finished = True
492 self._has_timouted = True
493 self._Tf = time.time()
# Kill the remote process and record the timeout in out/err.
495 (out_kill, _) = self.kill_remote_process()
496 self.out += "TIMEOUT \n" + out_kill.read().decode()
497 self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
500 except Exception as e:
501 self.err += _("Unable to get remote log files: %s" % e)
503 def total_duration(self):
"""Give the total duration of the job
506 :return: the total duration of the job in seconds
509 return self._Tf - self._T0
# NOTE(review): the `def run(...)` header for this docstring is missing.
512 """Launch the job by executing the remote command.
515 # Prevent multiple run
517 msg = _("Warning: A job can only be launched one time")
518 msg2 = _("Trying to launch the job \"%s\" whereas it has "
519 "already been launched." % self.name)
520 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
524 # Do not execute the command if the machine could not be reached
525 if not self.machine.successfully_connected(self.logger):
526 self._has_finished = True
528 self.err += ("Connection to machine (name : %s, host: %s, port:"
529 " %s, user: %s) has failed\nUse the log command "
530 "to get more information."
531 % (self.machine.name,
536 # Usual case : Launch the command on remote machine
537 self._T0 = time.time()
538 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
541 # If the results are not initialized, finish the job
542 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
543 self._has_finished = True
544 self._Tf = time.time()
546 self.err += "The server failed to execute the command"
548 # Put the beginning flag to true.
549 self._has_begun = True
551 def write_results(self):
"""Display on the terminal all the job's information
554 self.logger.write("name : " + self.name + "\n")
556 self.logger.write("after : %s\n" % self.after)
557 self.logger.write("Time elapsed : %4imin %2is \n" %
558 (self.total_duration()//60 , self.total_duration()%60))
560 self.logger.write("Begin time : %s\n" %
561 time.strftime('%Y-%m-%d %H:%M:%S',
562 time.localtime(self._T0)) )
564 self.logger.write("End time : %s\n\n" %
565 time.strftime('%Y-%m-%d %H:%M:%S',
566 time.localtime(self._Tf)) )
568 machine_head = "Informations about connection :\n"
569 underline = (len(machine_head) - 2) * "-"
570 self.logger.write(src.printcolors.printcInfo(
571 machine_head+underline+"\n"))
572 self.machine.write_info(self.logger)
574 self.logger.write(src.printcolors.printcInfo("out : \n"))
576 self.logger.write("Unable to get output\n")
578 self.logger.write(self.out + "\n")
579 self.logger.write(src.printcolors.printcInfo("err : \n"))
580 self.logger.write(self.err + "\n")
582 def get_status(self):
"""Get the status of the job (used by the Gui for xml display)
585 :return: The current status of the job
588 if not self.machine.successfully_connected(self.logger):
589 return "SSH connection KO"
590 if not self.has_begun():
591 return "Not launched"
594 if self.is_running():
595 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
596 time.localtime(self._T0))
597 if self.has_finished():
598 if self.is_timeout():
599 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
600 time.localtime(self._Tf))
601 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
602 time.localtime(self._Tf))
# NOTE(review): interior of the Jobs class — the `class` statement and the
# `def __init__(...)` header are not visible in this non-contiguous extract,
# and the `def run_jobs(...)` header is also missing. Code left byte-identical.
605 '''Class to manage the jobs to be run
612 lenght_columns = 20):
613 # The jobs configuration
614 self.cfg_jobs = config_jobs
615 self.job_file_path = job_file_path
616 # The machine that will be used today
618 # The list of machine (hosts, port) that will be used today
619 # (a same host can have several machine instances since there
620 # can be several ssh parameters)
622 # The jobs to be launched today
624 # The jobs that will not be launched today
625 self.ljobs_not_today = []
628 self.len_columns = lenght_columns
630 # the list of jobs that have not been run yet
631 self._l_jobs_not_started = []
632 # the list of jobs that have already ran
633 self._l_jobs_finished = []
634 # the list of jobs that are running
635 self._l_jobs_running = []
# Populate machines and today's job list from the pyconf configuration.
637 self.determine_jobs_and_machines()
639 def define_job(self, job_def, machine):
'''Takes a pyconf job definition and a machine (from class machine)
641 and returns the job instance corresponding to the definition.
643 :param job_def src.config.Mapping: a job definition
644 :param machine machine: the machine on which the job will run
645 :return: The corresponding job in a job class instance
# Optional keys fall back to defaults when absent from the definition.
649 cmmnds = job_def.commands
650 if not "timeout" in job_def:
651 timeout = 4*60*60 # default timeout = 4h
653 timeout = job_def.timeout
655 if 'after' in job_def:
656 after = job_def.after
658 if 'application' in job_def:
659 application = job_def.application
661 if 'board' in job_def:
662 board = job_def.board
664 if "prefix" in job_def:
665 prefix = job_def.prefix
678 def determine_jobs_and_machines(self):
'''Function that reads the pyconf jobs definition and instantiates all
680 the machines and jobs to be done today.
# weekday(): 0 = Monday ... 6 = Sunday; used to filter job_def.when below.
685 today = datetime.date.weekday(datetime.date.today())
688 for job_def in self.cfg_jobs.jobs :
690 if not "machine" in job_def:
691 msg = _('WARNING: The job "%s" do not have the key '
692 '"machine", this job is ignored.\n\n' % job_def.name)
693 self.logger.write(src.printcolors.printcWarning(msg))
695 name_machine = job_def.machine
# Reuse an already-instantiated machine with the same name when possible.
698 for mach in self.lmachines:
699 if mach.name == name_machine:
703 if a_machine == None:
704 for machine_def in self.cfg_jobs.machines:
705 if machine_def.name == name_machine:
# Each connection parameter falls back to a local/default value when absent.
706 if 'host' not in machine_def:
707 host = self.runner.cfg.VARS.hostname
709 host = machine_def.host
711 if 'user' not in machine_def:
712 user = self.runner.cfg.VARS.user
714 user = machine_def.user
716 if 'port' not in machine_def:
719 port = machine_def.port
721 if 'password' not in machine_def:
724 passwd = machine_def.password
726 if 'sat_path' not in machine_def:
727 sat_path = "salomeTools"
729 sat_path = machine_def.sat_path
740 self.lmachines.append(a_machine)
741 if (host, port) not in host_list:
742 host_list.append((host, port))
744 if a_machine == None:
745 msg = _("WARNING: The job \"%(job_name)s\" requires the "
746 "machine \"%(machine_name)s\" but this machine "
747 "is not defined in the configuration file.\n"
748 "The job will not be launched")
749 self.logger.write(src.printcolors.printcWarning(msg))
751 a_job = self.define_job(job_def, a_machine)
# Split jobs between today's schedule and the rest.
753 if today in job_def.when:
754 self.ljobs.append(a_job)
755 else: # today in job_def.when
756 self.ljobs_not_today.append(a_job)
758 self.lhosts = host_list
760 def ssh_connection_all_machines(self, pad=50):
'''Function that do the ssh connection to every machine
767 self.logger.write(src.printcolors.printcInfo((
768 "Establishing connection with all the machines :\n")))
769 for machine in self.lmachines:
770 # little algorithm in order to display traces
771 begin_line = (_("Connection to %s: " % machine.name))
772 if pad - len(begin_line) < 0:
# Pad with dots so status columns line up across machines.
775 endline = (pad - len(begin_line)) * "." + " "
777 step = "SSH connection"
778 self.logger.write( begin_line + endline + step)
780 # the call to the method that initiate the ssh connection
781 msg = machine.connect(self.logger)
783 # Copy salomeTools to the remote machine
784 if machine.successfully_connected(self.logger):
785 step = _("Remove SAT")
786 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
787 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
# Wipe any previous SAT installation before copying the fresh one.
788 (__, out_dist, __) = machine.exec_command(
789 "rm -rf %s" % machine.sat_path,
795 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
796 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
798 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
800 # get the remote machine distribution using a sat command
801 (__, out_dist, __) = machine.exec_command(
802 os.path.join(machine.sat_path,
803 "sat config --value VARS.dist --no_label"),
805 machine.distribution = out_dist.read().decode().replace("\n",
807 # Print the status of the copy
809 self.logger.write('\r%s' %
810 ((len(begin_line)+len(endline)+20) * " "), 3)
811 self.logger.write('\r%s%s%s' %
814 src.printcolors.printc(src.OK_STATUS)), 3)
816 self.logger.write('\r%s' %
817 ((len(begin_line)+len(endline)+20) * " "), 3)
818 self.logger.write('\r%s%s%s %s' %
821 src.printcolors.printc(src.KO_STATUS),
822 _("Copy of SAT failed: %s" % res_copy)), 3)
824 self.logger.write('\r%s' %
825 ((len(begin_line)+len(endline)+20) * " "), 3)
826 self.logger.write('\r%s%s%s %s' %
829 src.printcolors.printc(src.KO_STATUS),
831 self.logger.write("\n", 3)
833 self.logger.write("\n")
836 def is_occupied(self, hostname):
'''Function that returns True if a job is running on
838 the machine defined by its host and its port.
840 :param hostname (str, int): the pair (host, port)
841 :return: the job that is running on the host,
842 or false if there is no job running on the host.
847 for jb in self.ljobs:
848 if jb.machine.host == host and jb.machine.port == port:
853 def update_jobs_states_list(self):
'''Function that updates the lists that store the currently
855 running jobs and the jobs that have already finished.
860 jobs_finished_list = []
861 jobs_running_list = []
862 for jb in self.ljobs:
864 jobs_running_list.append(jb)
866 if jb.has_finished():
867 jobs_finished_list.append(jb)
# Compare sizes before/after to detect newly finished jobs.
869 nb_job_finished_before = len(self._l_jobs_finished)
870 self._l_jobs_finished = jobs_finished_list
871 self._l_jobs_running = jobs_running_list
873 nb_job_finished_now = len(self._l_jobs_finished)
875 return nb_job_finished_now > nb_job_finished_before
877 def cancel_dependencies_of_failing_jobs(self):
'''Function that cancels all the jobs that depend on a failing one.
884 for job in self.ljobs:
885 if job.after is None:
887 father_job = self.find_job_that_has_name(job.after)
888 if father_job is not None and father_job.has_failed():
891 def find_job_that_has_name(self, name):
'''Returns the job by its name.
894 :param name str: a job name
895 :return: the job that has the name.
898 for jb in self.ljobs:
901 # the following is executed only if the job was not found
904 def str_of_length(self, text, length):
'''Takes a string text of any length and returns
906 the most close string of length "length".
908 :param text str: any string
909 :param length int: a length for the returned string
910 :return: the most close string of length "length"
# Truncate with an ellipsis when too long, otherwise center-pad with spaces.
913 if len(text) > length:
914 text_out = text[:length-3] + '...'
916 diff = length - len(text)
917 before = " " * (diff//2)
918 after = " " * (diff//2 + diff%2)
919 text_out = before + text + after
923 def display_status(self, len_col):
'''Takes a lenght and construct the display of the current status
925 of the jobs in an array that has a column for each host.
926 It displays the job that is currently running on the host
929 :param len_col int: the size of the column
935 for host_port in self.lhosts:
936 jb = self.is_occupied(host_port)
937 if not jb: # nothing running on the host
938 empty = self.str_of_length("empty", len_col)
939 display_line += "|" + empty
941 display_line += "|" + src.printcolors.printcInfo(
942 self.str_of_length(jb.name, len_col))
# '\r' rewrites the same terminal line on every refresh.
944 self.logger.write("\r" + display_line + "|")
# NOTE(review): the `def run_jobs(...)` header for this docstring is missing.
949 '''The main method. Runs all the jobs on every host.
950 For each host, at a given time, only one job can be running.
951 The jobs that have the field after (that contain the job that has
952 to be run before it) are run after the previous job.
953 This method stops when all the jobs are finished.
960 self.logger.write(src.printcolors.printcInfo(
961 _('Executing the jobs :\n')))
# Build the table header: one column per (host, port) pair.
963 for host_port in self.lhosts:
966 if port == 22: # default value
967 text_line += "|" + self.str_of_length(host, self.len_columns)
969 text_line += "|" + self.str_of_length(
970 "("+host+", "+str(port)+")", self.len_columns)
972 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
973 self.logger.write(tiret_line)
974 self.logger.write(text_line + "|\n")
975 self.logger.write(tiret_line)
978 # The infinite loop that runs the jobs
979 l_jobs_not_started = src.deepcopy_list(self.ljobs)
980 while len(self._l_jobs_finished) != len(self.ljobs):
981 new_job_start = False
982 for host_port in self.lhosts:
# Only one job at a time per (host, port); skip busy hosts.
984 if self.is_occupied(host_port):
987 for jb in l_jobs_not_started:
988 if (jb.machine.host, jb.machine.port) != host_port:
992 l_jobs_not_started.remove(jb)
# Jobs with an 'after' dependency wait for their father job to finish.
996 jb_before = self.find_job_that_has_name(jb.after)
997 if jb_before is None:
999 msg = _("This job was not launched because its "
1000 "father is not in the jobs list.")
1004 if jb_before.has_finished():
1006 l_jobs_not_started.remove(jb)
1007 new_job_start = True
1009 self.cancel_dependencies_of_failing_jobs()
1010 new_job_finished = self.update_jobs_states_list()
# Refresh the published xml only when something actually changed.
1012 if new_job_start or new_job_finished:
1014 self.gui.update_xml_files(self.ljobs)
1015 # Display the current status
1016 self.display_status(self.len_columns)
1018 # Make sure that the proc is not entirely busy
1021 self.logger.write("\n")
1022 self.logger.write(tiret_line)
1023 self.logger.write("\n\n")
# Final xml refresh once all jobs are done.
1026 self.gui.update_xml_files(self.ljobs)
1027 self.gui.last_update()
1029 def write_all_results(self):
'''Display all the jobs outputs.
1036 for jb in self.ljobs:
1037 self.logger.write(src.printcolors.printcLabel(
1038 "#------- Results for job %s -------#\n" % jb.name))
1040 self.logger.write("\n\n")
# NOTE(review): interior of the Gui class — the `class` statement and the
# `def __init__(...)` header are not visible in this non-contiguous extract.
# Code left byte-identical; comments only.
1043 '''Class to manage the the xml data that can be displayed in a browser to
1056 :param xml_dir_path str: The path to the directory where to put
1057 the xml resulting files
1058 :param l_jobs List: the list of jobs that run today
1059 :param l_jobs_not_today List: the list of jobs that do not run today
1060 :param file_boards str: the file path from which to read the
1063 # The logging instance
1064 self.logger = logger
1066 # The prefix to add to the xml files : date_hour
1067 self.prefix = prefix
1069 # The path of the csv files to read to fill the expected boards
1070 self.file_boards = file_boards
# An input csv is optional; without it the expected boards stay empty.
1072 if file_boards != "":
1073 today = datetime.date.weekday(datetime.date.today())
1074 self.parse_csv_boards(today)
1076 self.d_input_boards = {}
1078 # The path of the global xml file
1079 self.xml_dir_path = xml_dir_path
1080 # Initialize the xml files
1081 self.global_name = "global_report"
1082 xml_global_path = os.path.join(self.xml_dir_path,
1083 self.global_name + ".xml")
1084 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1087 # Find history for each job
1089 self.find_history(l_jobs, l_jobs_not_today)
1091 # The xml files that corresponds to the boards.
1092 # {name_board : xml_object}}
1093 self.d_xml_board_files = {}
1095 # Create the lines and columns
1096 self.initialize_boards(l_jobs, l_jobs_not_today)
1098 # Write the xml file
1099 self.update_xml_files(l_jobs)
1101 def add_xml_board(self, name):
'''Add a board to the board list
1103 :param name str: the board name
1105 xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
1106 self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
# Skeleton nodes every board file starts with.
1109 self.d_xml_board_files[name].add_simple_node("distributions")
1110 self.d_xml_board_files[name].add_simple_node("applications")
1111 self.d_xml_board_files[name].add_simple_node("board", text=name)
1113 def initialize_boards(self, l_jobs, l_jobs_not_today):
'''Get all the first information needed for each file and write the
1115 first version of the files
1116 :param l_jobs List: the list of jobs that run today
1117 :param l_jobs_not_today List: the list of jobs that do not run today
1119 # Get the boards to fill and put it in a dictionary
1120 # {board_name : xml instance corresponding to the board}
1121 for job in l_jobs + l_jobs_not_today:
1123 if (board is not None and
1124 board not in self.d_xml_board_files.keys()):
1125 self.add_xml_board(board)
1127 # Verify that the boards given as input are done
1128 for board in list(self.d_input_boards.keys()):
1129 if board not in self.d_xml_board_files:
1130 self.add_xml_board(board)
1131 root_node = self.d_xml_board_files[board].xmlroot
1132 src.xmlManager.append_node_attrib(root_node,
1133 {"input_file" : self.file_boards})
1135 # Loop over all jobs in order to get the lines and columns for each
# Per-board accumulators for distributions (rows) and applications (columns).
1139 for board in self.d_xml_board_files:
1141 d_application[board] = []
1145 for job in l_jobs + l_jobs_not_today:
1147 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1148 l_hosts_ports.append((job.machine.host, job.machine.port))
1150 distrib = job.machine.distribution
1151 application = job.application
1153 board_job = job.board
# Register each new distribution under the board's "distributions" node.
1156 for board in self.d_xml_board_files:
1157 if board_job == board:
1158 if distrib is not None and distrib not in d_dist[board]:
1159 d_dist[board].append(distrib)
1160 src.xmlManager.add_simple_node(
1161 self.d_xml_board_files[board].xmlroot.find(
1164 attrib={"name" : distrib})
# Same for applications under the board's "applications" node.
1166 if board_job == board:
1167 if (application is not None and
1168 application not in d_application[board]):
1169 d_application[board].append(application)
1170 src.xmlManager.add_simple_node(
1171 self.d_xml_board_files[board].xmlroot.find(
1175 "name" : application})
1177 # Verify that there are no missing application or distribution in the
1178 # xml board files (regarding the input boards)
1179 for board in self.d_xml_board_files:
1180 l_dist = d_dist[board]
1181 if board not in self.d_input_boards.keys():
1183 for dist in self.d_input_boards[board]["rows"]:
1184 if dist not in l_dist:
1185 src.xmlManager.add_simple_node(
1186 self.d_xml_board_files[board].xmlroot.find(
1189 attrib={"name" : dist})
1190 l_appli = d_application[board]
1191 for appli in self.d_input_boards[board]["columns"]:
1192 if appli not in l_appli:
1193 src.xmlManager.add_simple_node(
1194 self.d_xml_board_files[board].xmlroot.find(
1197 attrib={"name" : appli})
1199 # Initialize the hosts_ports node for the global file
1200 self.xmlhosts_ports = self.xml_global_file.add_simple_node(
1202 for host, port in l_hosts_ports:
1203 host_port = "%s:%i" % (host, port)
1204 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1206 attrib={"name" : host_port})
1208 # Initialize the jobs node in all files
1209 for xml_file in [self.xml_global_file] + list(
1210 self.d_xml_board_files.values()):
1211 xml_jobs = xml_file.add_simple_node("jobs")
1212 # Get the jobs present in the config file but
1213 # that will not be launched today
1214 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1216 # add also the infos node
1217 xml_file.add_simple_node("infos",
1218 attrib={"name" : "last update",
1219 "JobsCommandStatus" : "running"})
1221 # and put the history node
1222 history_node = xml_file.add_simple_node("history")
1223 name_board = os.path.basename(xml_file.logFile)[:-len(".xml")]
1224 # search for board files
# Matches dated board files named like YYYYMMDD_HHMMSS_<board>.xml.
1225 expression = "^[0-9]{8}_+[0-9]{6}_" + name_board + ".xml$"
1226 oExpr = re.compile(expression)
1227 # Get the list of xml board files that are in the log directory
1228 for file_name in os.listdir(self.xml_dir_path):
1229 if oExpr.search(file_name):
1230 date = os.path.basename(file_name).split("_")[0]
1231 file_path = os.path.join(self.xml_dir_path, file_name)
1232 src.xmlManager.add_simple_node(history_node,
1235 attrib={"date" : date})
1238 # Find in each board the squares that needs to be filled regarding the
1239 # input csv files but that are not covered by a today job
1240 for board in self.d_input_boards.keys():
1241 xml_root_board = self.d_xml_board_files[board].xmlroot
1242 # Find the missing jobs for today
1243 xml_missing = src.xmlManager.add_simple_node(xml_root_board,
1245 for row, column in self.d_input_boards[board]["jobs"]:
1248 if (job.application == column and
1249 job.machine.distribution == row):
1253 src.xmlManager.add_simple_node(xml_missing,
1255 attrib={"distribution" : row,
1256 "application" : column })
1257 # Find the missing jobs not today
1258 xml_missing_not_today = src.xmlManager.add_simple_node(
1260 "missing_jobs_not_today")
1261 for row, column in self.d_input_boards[board]["jobs_not_today"]:
1263 for job in l_jobs_not_today:
1264 if (job.application == column and
1265 job.machine.distribution == row):
1269 src.xmlManager.add_simple_node(xml_missing_not_today,
1271 attrib={"distribution" : row,
1272 "application" : column })
# NOTE(review): embedded line numbers in this excerpt are non-consecutive —
# some statements (e.g. the initializations of l_globalxml and l_links, and
# the try: opener) are missing from view. Code kept byte-identical.
1274 def find_history(self, l_jobs, l_jobs_not_today):
1275 """find, for each job, in the existent xml boards the results for the
1276 job. Store the results in the dictionnary self.history = {name_job :
1277 list of (date, status, list links)}
1279 :param l_jobs List: the list of jobs to run today
1280 :param l_jobs_not_today List: the list of jobs that do not run today
# Collect every previous global xml report (YYYYMMDD_HHMMSS_<name>.xml)
# found in the log directory.
1282 # load all the history
1283 expression = "^[0-9]{8}_+[0-9]{6}_" + self.global_name + ".xml$"
1284 oExpr = re.compile(expression)
1285 # Get the list of global xml that are in the log directory
1287 for file_name in os.listdir(self.xml_dir_path):
1288 if oExpr.search(file_name):
1289 file_path = os.path.join(self.xml_dir_path, file_name)
1291 global_xml = src.xmlManager.ReadXmlFile(file_path)
1292 l_globalxml.append(global_xml)
# Unreadable report files are skipped with a warning instead of aborting.
1293 except Exception as e:
1294 msg = _("\nWARNING: the file %s can not be read, it will be "
1295 "ignored\n%s" % (file_path, e))
1296 self.logger.write("%s\n" % src.printcolors.printcWarning(
# For every job (today's and not-today's alike), walk the collected reports
# and gather (date, result, remote log link) triples; "nothing" links are
# filtered out, and the list is sorted newest date first.
1300 # Construct the dictionnary self.history
1301 for job in l_jobs + l_jobs_not_today:
1303 for global_xml in l_globalxml:
1304 date = os.path.basename(global_xml.filePath).split("_")[0]
1305 global_root_node = global_xml.xmlroot.find("jobs")
1306 job_node = src.xmlManager.find_node_by_attrib(
1312 if job_node.find("remote_log_file_path") is not None:
1313 link = job_node.find("remote_log_file_path").text
1314 res_job = job_node.find("res").text
1315 if link != "nothing":
1316 l_links.append((date, res_job, link))
1317 l_links = sorted(l_links, reverse=True)
1318 self.history[job.name] = l_links
# NOTE(review): some lines are missing from this excerpt (non-consecutive
# embedded numbering) — e.g. the node-name arguments of several
# add_simple_node calls and the branch that tags the first history entry.
1320 def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1321 '''Get all the first information needed for each file and write the
1322 first version of the files
1324 :param xml_node_jobs etree.Element: the node corresponding to a job
1325 :param l_jobs_not_today List: the list of jobs that do not run today
# For each job that is configured but not scheduled today, create a job
# node carrying its static description (machine, commands, board, ...).
1327 for job in l_jobs_not_today:
1328 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1330 attrib={"name" : job.name})
1331 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1332 src.xmlManager.add_simple_node(xmlj,
1334 job.machine.distribution)
1335 src.xmlManager.add_simple_node(xmlj, "board", job.board)
1336 src.xmlManager.add_simple_node(xmlj,
1337 "commands", " ; ".join(job.commands))
# State is fixed: these jobs will not run today.
1338 src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1339 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1340 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1341 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1342 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1343 src.xmlManager.add_simple_node(xmlj, "sat_path",
1344 job.machine.sat_path)
# Attach the job's past results collected by find_history(); the first
# (most recent) entry is presumably tagged differently — the branch test
# is on a line missing from this excerpt.
1345 xml_history = src.xmlManager.add_simple_node(xmlj, "history")
1346 for i, (date, res_job, link) in enumerate(self.history[job.name]):
1348 # tag the first one (the last one)
1349 src.xmlManager.add_simple_node(xml_history,
1352 attrib={"date" : date,
1356 src.xmlManager.add_simple_node(xml_history,
1359 attrib={"date" : date,
# NOTE(review): fragmented excerpt — the initializations of l_read,
# d_boards, rows/jobs/jobs_not_today and the per-square day test are on
# lines missing from view. Code kept byte-identical; comments only.
1363 def parse_csv_boards(self, today):
1364 """ Parse the csv file that describes the boards to produce and fill
1365 the dict d_input_boards that contain the csv file contain
1367 :param today int: the current day of the week
1369 # open the csv file and read its content
1371 with open(self.file_boards, 'r') as f:
1372 reader = csv.reader(f,delimiter=CSV_DELIMITER)
# Boards are separated in the csv by an empty line (a row of '' cells of
# the same width as the header); groupby splits on those separator rows.
1375 # get the delimiter for the boards (empty line)
1376 boards_delimiter = [''] * len(l_read[0])
1377 # Make the list of boards, by splitting with the delimiter
1378 l_boards = [list(y) for x, y in itertools.groupby(l_read,
1379 lambda z: z == boards_delimiter) if not x]
# For each board: first row = board name + application columns; following
# rows = one distribution per line, each square listing the week days
# (separated by DAYS_SEPARATOR) on which the (row, column) job runs.
1381 # loop over the csv lists of lines and get the rows, columns and jobs
1383 for input_board in l_boards:
1385 board_name = input_board[0][0]
1388 columns = input_board[0][1:]
1393 for line in input_board[1:]:
1396 for i, square in enumerate(line[1:]):
1399 days = square.split(DAYS_SEPARATOR)
1400 days = [int(day) for day in days]
1401 job = (row, columns[i])
# Jobs whose day list does not include `today` end up here — the
# test itself is on a line missing from this excerpt.
1405 jobs_not_today.append(job)
1407 d_boards[board_name] = {"rows" : rows,
1408 "columns" : columns,
1410 "jobs_not_today" : jobs_not_today}
# Publish the parsed structure for the rest of the Gui to consume.
1412 self.d_input_boards = d_boards
1414 def update_xml_files(self, l_jobs):
1415 '''Write all the xml files with updated information about the jobs
1417 :param l_jobs List: the list of jobs that run today
# Refresh the global file and every board file, then write them to disk.
1419 for xml_file in [self.xml_global_file] + list(
1420 self.d_xml_board_files.values()):
1421 self.update_xml_file(l_jobs, xml_file)
1424 self.write_xml_files()
# NOTE(review): fragmented excerpt — several lines are missing (e.g. the
# outer `for job in l_jobs:` loop opener, the guards around T0/Tf, the
# else-branches for remote_log_file_path and the extra-job marking).
# Code kept byte-identical; comments only.
1426 def update_xml_file(self, l_jobs, xml_file):
1427 '''update information about the jobs for the file xml_file
1429 :param l_jobs List: the list of jobs that run today
1430 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1433 xml_node_jobs = xml_file.xmlroot.find('jobs')
1434 # Update the job names and status node
# Replace-in-place strategy: drop the stale node for this job, then
# rebuild it from scratch with current values.
1436 # Find the node corresponding to the job and delete it
1437 # in order to recreate it
1438 for xmljob in xml_node_jobs.findall('job'):
1439 if xmljob.attrib['name'] == job.name:
1440 xml_node_jobs.remove(xmljob)
# Format begin/end timestamps from the job's internal epoch markers.
1444 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1445 time.localtime(job._T0))
1448 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1449 time.localtime(job._Tf))
1451 # recreate the job node
1452 xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1454 attrib={"name" : job.name})
1455 src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1456 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1457 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1458 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
# Past results for this job, collected earlier by find_history().
1459 xml_history = src.xmlManager.add_simple_node(xmlj, "history")
1460 for date, res_job, link in self.history[job.name]:
1461 src.xmlManager.add_simple_node(xml_history,
1464 attrib={"date" : date,
1467 src.xmlManager.add_simple_node(xmlj, "sat_path",
1468 job.machine.sat_path)
1469 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1470 src.xmlManager.add_simple_node(xmlj, "distribution",
1471 job.machine.distribution)
1472 src.xmlManager.add_simple_node(xmlj, "board", job.board)
1473 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1474 src.xmlManager.add_simple_node(xmlj, "commands",
1475 " ; ".join(job.commands))
1476 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1477 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1478 src.xmlManager.add_simple_node(xmlj, "end", Tf)
# Strip terminal color escapes before embedding command output in xml.
1479 src.xmlManager.add_simple_node(xmlj, "out",
1480 src.printcolors.cleancolor(job.out))
1481 src.xmlManager.add_simple_node(xmlj, "err",
1482 src.printcolors.cleancolor(job.err))
1483 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1484 if len(job.remote_log_files) > 0:
1485 src.xmlManager.add_simple_node(xmlj,
1486 "remote_log_file_path",
1487 job.remote_log_files[0])
# Fallback value when no remote log exists — the else: line is on a
# line missing from this excerpt.
1489 src.xmlManager.add_simple_node(xmlj,
1490 "remote_log_file_path",
# Dependency info: record which job this one runs after, and if that
# parent job has a remote log, expose it as a clickable link attribute.
1493 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1494 # get the job father
1495 if job.after is not None:
1498 if jb.name == job.after:
1501 if (job_father is not None and
1502 len(job_father.remote_log_files) > 0):
1503 link = job_father.remote_log_files[0]
1506 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
# Flag jobs that run today but are not requested by the input csv board.
1508 # Verify that the job is to be done today regarding the input csv
1510 if job.board and job.board in self.d_input_boards.keys():
1512 for dist, appli in self.d_input_boards[job.board]["jobs"]:
1513 if (job.machine.distribution == dist
1514 and job.application == appli):
1516 src.xmlManager.add_simple_node(xmlj,
1521 src.xmlManager.add_simple_node(xmlj,
# Stamp the "last update" time on the infos node of this file.
1527 xml_node_infos = xml_file.xmlroot.find('infos')
1528 src.xmlManager.append_node_attrib(xml_node_infos,
1530 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1534 def last_update(self, finish_status = "finished"):
# NOTE(review): the docstring below looks copied from update_xml_file —
# the actual parameter is finish_status (str), the final value written
# as the "JobsCommandStatus" attribute of every file's infos node.
1535 '''update information about the jobs for the file xml_file
1537 :param l_jobs List: the list of jobs that run today
1538 :param xml_file xmlManager.XmlLogFile: the xml instance to update
# Stamp the final command status into the global file and every board
# file, then write them all to disk.
1540 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1541 xml_node_infos = xml_file.xmlroot.find('infos')
1542 src.xmlManager.append_node_attrib(xml_node_infos,
1543 attrib={"JobsCommandStatus" : finish_status})
1545 self.write_xml_files()
1547 def write_xml_file(self, xml_file, stylesheet):
1548 ''' Write one xml file and the same file with prefix
# Write the file twice: once at its normal path, once under the same
# directory with "<self.prefix>_" prepended to the file name. Both copies
# reference the given xsl stylesheet.
1550 xml_file.write_tree(stylesheet)
1551 file_path = xml_file.logFile
1552 file_dir = os.path.dirname(file_path)
1553 file_name = os.path.basename(file_path)
1554 file_name_with_prefix = self.prefix + "_" + file_name
1555 xml_file.write_tree(stylesheet, os.path.join(file_dir,
1556 file_name_with_prefix))
1558 def write_xml_files(self):
1559 ''' Write the xml files
# The global report uses the global stylesheet; each board report uses
# the board stylesheet.
1561 self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
1562 for xml_file in self.d_xml_board_files.values():
1563 self.write_xml_file(xml_file, STYLESHEET_BOARD)
# NOTE(review): the function header (embedded line 1568, presumably
# "def description():" — the standard sat command help hook) is missing
# from this excerpt; only its body remains visible below.
1567 # Describes the command
1569 return _("The jobs command launches maintenances that are described"
1570 " in the dedicated jobs configuration file.\n\nexample:\nsat "
1571 "jobs --name my_jobs --publish")
# NOTE(review): fragmented excerpt — the embedded line numbers are
# non-consecutive, so guards, else-branches and early returns are missing
# from view, and the function may continue past the end of this excerpt.
# Code kept byte-identical; comments only.
1575 def run(args, runner, logger):
1577 (options, args) = parser.parse_args(args)
1579 l_cfg_dir = runner.cfg.PATHS.JOBPATH
# --list: just enumerate the .pyconf job configuration files found in
# each configured jobs directory, then (presumably) return early.
1581 # list option : display all the available config files
1583 for cfg_dir in l_cfg_dir:
1584 if not options.no_label:
1585 logger.write("------ %s\n" %
1586 src.printcolors.printcHeader(cfg_dir))
1587 if not os.path.exists(cfg_dir):
1589 for f in sorted(os.listdir(cfg_dir)):
1590 if not f.endswith('.pyconf'):
1593 logger.write("%s\n" % cfilename)
1596 # Make sure the jobs_config option has been called
1597 if not options.jobs_cfg:
1598 message = _("The option --jobs_config is required\n")
1599 src.printcolors.printcError(message)
# Resolve the jobs configuration file: accept a direct path, otherwise
# search the configured directories, appending ".pyconf" if needed.
1602 # Find the file in the directories, unless it is a full path
1604 if os.path.exists(options.jobs_cfg):
1606 file_jobs_cfg = options.jobs_cfg
1608 for cfg_dir in l_cfg_dir:
1609 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1610 if not file_jobs_cfg.endswith('.pyconf'):
1611 file_jobs_cfg += '.pyconf'
1613 if not os.path.exists(file_jobs_cfg):
1620 msg = _("The file configuration %(name_file)s was not found."
1621 "\nUse the --list option to get the possible files.")
1622 src.printcolors.printcError(msg)
1626 (_("Platform"), runner.cfg.VARS.dist),
1627 (_("File containing the jobs configuration"), file_jobs_cfg)
1629 src.print_info(logger, info)
# Load the job definitions; --only_jobs restricts them to the named subset.
1631 # Read the config that is in the file
1632 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1633 if options.only_jobs:
1634 l_jb = src.pyconf.Sequence()
1635 for jb in config_jobs.jobs:
1636 if jb.name in options.only_jobs:
1638 "Adding a job that was given in only_jobs option parameters")
1639 config_jobs.jobs = l_jb
1642 today_jobs = Jobs(runner,
# With --test_connection, stop after verifying ssh access (the early
# return is on a line missing from this excerpt).
1646 # SSH connection to all machines
1647 today_jobs.ssh_connection_all_machines()
1648 if options.test_connection:
1653 logger.write(src.printcolors.printcInfo(
1654 _("Initialize the xml boards : ")), 5)
# Ship the xsl stylesheets (and spinner gif) next to the xml reports so
# they render in a browser.
1657 # Copy the stylesheets in the log directory
1658 log_dir = runner.cfg.USER.log_dir
1659 xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
1661 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
1662 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
1663 files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
1664 for file_path in files_to_copy:
1665 shutil.copy2(file_path, log_dir)
# Build the Gui, which produces the global and per-board xml report files.
1667 # Instantiate the Gui in order to produce the xml files that contain all
1669 gui = Gui(runner.cfg.USER.log_dir,
1671 today_jobs.ljobs_not_today,
1672 runner.cfg.VARS.datehour,
1674 file_boards = options.input_boards)
1676 logger.write(src.printcolors.printcSuccess("OK"), 5)
1677 logger.write("\n\n", 5)
1680 # Display the list of the xml files
1681 logger.write(src.printcolors.printcInfo(("Here is the list of published"
1683 logger.write("%s\n" % gui.xml_global_file.logFile, 4)
1684 for board in gui.d_xml_board_files.keys():
1685 file_path = gui.d_xml_board_files[board].logFile
1686 file_name = os.path.basename(file_path)
1687 logger.write("%s\n" % file_path, 4)
1688 logger.add_link(file_name, "board", 0, board)
1690 logger.write("\n", 4)
1692 today_jobs.gui = gui
# Execute the scheduled jobs; Ctrl-C triggers a cleanup path that kills
# the still-running remote processes and salvages their logs.
1696 # Run all the jobs contained in config_jobs
1697 today_jobs.run_jobs()
1698 except KeyboardInterrupt:
1700 logger.write("\n\n%s\n\n" %
1701 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1706 msg = _("Killing the running jobs and trying"
1707 " to get the corresponding logs\n")
1708 logger.write(src.printcolors.printcWarning(msg))
1710 # find the potential not finished jobs and kill them
1711 for jb in today_jobs.ljobs:
1712 if not jb.has_finished():
1715 jb.kill_remote_process()
# A kill failure is logged as a warning, not fatal — best effort.
1716 except Exception as e:
1717 msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
1718 logger.write(src.printcolors.printcWarning(msg))
1719 if jb.res_job != "0":
# Record the final command status in the xml reports: interrupted vs
# normal completion.
1723 today_jobs.gui.last_update(_("Forced interruption"))
1726 today_jobs.gui.last_update()
1727 # Output the results
1728 today_jobs.write_all_results()