3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets applied to the generated xml reports — presumably consumed by
# the --publish output so the xml can be displayed in a browser (TODO confirm).
30 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
31 STYLESHEET_BOARD = "jobs_board_report.xsl"
# Command-line option declarations for the "jobs" command
# (short flag, long flag, value type, destination attribute, help text).
36 parser = src.options.Options()
38 parser.add_option('n', 'name', 'string', 'jobs_cfg',
39 _('Mandatory: The name of the config file that contains'
40 ' the jobs configuration'))
41 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
42 _('Optional: the list of jobs to launch, by their name. '))
43 parser.add_option('l', 'list', 'boolean', 'list',
44 _('Optional: list all available config files.'))
45 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
46 _("Optional: try to connect to the machines. "
47 "Not executing the jobs."),
49 parser.add_option('p', 'publish', 'boolean', 'publish',
50 _("Optional: generate an xml file that can be read in a "
51 "browser to display the jobs status."),
53 parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
54 "the path to csv file that contain "
55 "the expected boards."),"")
56 parser.add_option('', 'completion', 'boolean', 'no_label',
57 _("Optional (internal use): do not print labels, Works only "
61 class Machine(object):
62 '''Class to manage a ssh connection on a machine
70 sat_path="salomeTools"):
74 self.distribution = None # Will be filled after copying SAT on the machine
76 self.password = passwd
77 self.sat_path = sat_path
78 self.ssh = paramiko.SSHClient()
79 self._connection_successful = None
# Open the SSH session to self.host with paramiko.  Sets the tri-state
# self._connection_successful flag (False on entry, True only if no exception
# was raised) and builds a human-readable status message in the error branches.
# NOTE(review): unknown host keys are auto-accepted (AutoAddPolicy) — this
# trades security for convenience; confirm that is acceptable for this tool.
81 def connect(self, logger):
82 '''Initiate the ssh connection to the remote machine
84 :param logger src.logger.Logger: The logger instance
89 self._connection_successful = False
90 self.ssh.load_system_host_keys()
91 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
93 self.ssh.connect(self.host,
96 password = self.password)
# Each paramiko failure mode gets its own message; the most specific
# exceptions are caught first.
97 except paramiko.AuthenticationException:
98 message = src.KO_STATUS + _("Authentication failed")
99 except paramiko.BadHostKeyException:
100 message = (src.KO_STATUS +
101 _("The server's host key could not be verified"))
102 except paramiko.SSHException:
103 message = ( _("SSHException error connecting or "
104 "establishing an SSH session"))
106 message = ( _("Error connecting or establishing an SSH session"))
# Success path: only reached when ssh.connect raised nothing.
108 self._connection_successful = True
# Accessor for the connection flag set by connect().  The flag is tri-state:
# None means connect() was never called (a warning is logged in that case),
# otherwise True/False reflects the outcome of the last connection attempt.
112 def successfully_connected(self, logger):
113 '''Verify if the connection to the remote machine has succeed
115 :param logger src.logger.Logger: The logger instance
116 :return: True if the connection has succeed, False if not
# None means there was no prior call to connect(); warn instead of failing.
119 if self._connection_successful == None:
120 message = _("Warning : trying to ask if the connection to "
121 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
122 " no connection request" %
123 (self.name, self.host, self.port, self.user))
124 logger.write( src.printcolors.printcWarning(message))
# Note: returns None itself in the never-connected case (falsy to callers).
125 return self._connection_successful
# Upload salomeTools and the job configuration file to the remote machine
# over SFTP.  On any failure the connection flag is downgraded to False so the
# job will be considered unreachable.
127 def copy_sat(self, sat_local_path, job_file):
128 '''Copy salomeTools to the remote machine in self.sat_path
132 # open a sftp connection
133 self.sftp = self.ssh.open_sftp()
134 # Create the sat directory on remote machine if it is not existing
135 self.mkdir(self.sat_path, ignore_existing=True)
# Recursive upload of the whole SAT tree, skipping the .git directory.
137 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
138 # put the job configuration file in order to make it reachable
139 # on the remote machine
# The remote copy is hidden (leading dot) to keep the sat_path tidy.
140 remote_job_file_name = ".%s" % os.path.basename(job_file)
141 self.sftp.put(job_file, os.path.join(self.sat_path,
142 remote_job_file_name))
143 except Exception as e:
# Any SFTP error marks the machine as not usable for this run.
145 self._connection_successful = False
# Recursive SFTP upload: mirrors `source` under `target`, recreating symlinks
# and copying file permission bits; `filters` lists entry names to skip
# (e.g. '.git' — see copy_sat).
149 def put_dir(self, source, target, filters = []):
150 ''' Uploads the contents of the source directory to the target path. The
151 target directory needs to exists. All sub-directories in source are
152 created under target.
154 for item in os.listdir(source):
157 source_path = os.path.join(source, item)
158 destination_path = os.path.join(target, item)
# Symlinks are re-created remotely rather than followed/copied.
159 if os.path.islink(source_path):
160 linkto = os.readlink(source_path)
162 self.sftp.symlink(linkto, destination_path)
# Preserve the local mode bits on the remote copy.
163 self.sftp.chmod(destination_path,
164 os.stat(source_path).st_mode)
168 if os.path.isfile(source_path):
169 self.sftp.put(source_path, destination_path)
170 self.sftp.chmod(destination_path,
171 os.stat(source_path).st_mode)
# Directory: create it remotely (tolerating pre-existing) and recurse.
173 self.mkdir(destination_path, ignore_existing=True)
174 self.put_dir(source_path, destination_path)
# Wrapper around sftp.mkdir that can tolerate an already-existing directory
# (the ignore_existing flag).  mode=511 is decimal for octal 0777.
176 def mkdir(self, path, mode=511, ignore_existing=False):
177 ''' Augments mkdir by adding an option to not fail
181 self.sftp.mkdir(path, mode)
# Launch a command on the remote machine without waiting for it to finish.
# Returns the three paramiko channel files, or (None, None, None) when the
# server refused to execute the command (callers must check for None).
188 def exec_command(self, command, logger):
189 '''Execute the command on the remote machine
191 :param command str: The command to be run
192 :param logger src.logger.Logger: The logger instance
193 :return: the stdin, stdout, and stderr of the executing command,
195 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
196 paramiko.channel.ChannelFile)
199 # Does not wait the end of the command
200 (stdin, stdout, stderr) = self.ssh.exec_command(command)
201 except paramiko.SSHException:
202 message = src.KO_STATUS + _(
203 ": the server failed to execute the command\n")
204 logger.write( src.printcolors.printcError(message))
205 return (None, None, None)
# Fallback error branch (broader exception — the except line is elided here).
207 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
208 return (None, None, None)
210 return (stdin, stdout, stderr)
213 '''Close the ssh connection
# Dump the machine's connection parameters and current connection status
# (OK/KO) to the logger, for terminal traces and the log file.
219 def write_info(self, logger):
220 '''Prints the informations relative to the machine in the logger
221 (terminal traces and log file)
223 :param logger src.logger.Logger: The logger instance
227 logger.write("host : " + self.host + "\n")
228 logger.write("port : " + str(self.port) + "\n")
229 logger.write("user : " + str(self.user) + "\n")
230 if self.successfully_connected(logger):
231 status = src.OK_STATUS
233 status = src.KO_STATUS
234 logger.write("Connection : " + status + "\n\n")
238 '''Class to manage one job
254 self.machine = machine
256 self.timeout = timeout
257 self.application = application
261 # The list of log files to download from the remote machine
262 self.remote_log_files = []
264 # The remote command status
265 # -1 means that it has not been launched,
266 # 0 means success and 1 means fail
268 self.cancelled = False
272 self._has_begun = False
273 self._has_finished = False
274 self._has_timouted = False
275 self._stdin = None # Store the command inputs field
276 self._stdout = None # Store the command outputs field
277 self._stderr = None # Store the command errors field
282 self.name_remote_jobs_pyconf = ".%s" % os.path.basename(job_file_path)
283 self.commands = commands
284 self.command = (os.path.join(self.machine.sat_path, "sat") +
286 os.path.join(self.machine.sat_path,
287 "list_log_files.txt") +
288 " job --jobs_config " +
289 os.path.join(self.machine.sat_path,
290 self.name_remote_jobs_pyconf) +
294 self.command = prefix + ' "' + self.command +'"'
297 """ Get the pid(s) corresponding to the command that have been launched
298 On the remote machine
300 :return: The list of integers corresponding to the found pids
304 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
305 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
306 pids_cmd = out_pid.readlines()
307 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
# Send SIGINT (kill -2) to every remote pid whose command line matches the
# job command (pids come from get_pids).  Returns the kill command's
# (stdout, stderr) channels.
311 def kill_remote_process(self, wait=1):
312 '''Kills the process on the remote machine.
314 :return: (the output of the kill, the error of the kill)
318 pids = self.get_pids()
# One compound shell command so a single ssh exec kills all matches.
319 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
320 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
323 return (out_kill, err_kill)
326 '''Returns True if the job has already begun
328 :return: True if the job has already begun
331 return self._has_begun
# Poll whether the remote command is done.  Finishing is detected by the
# stdout channel being closed; at that moment stdout/stderr are drained into
# self.out / self.err and the end timestamp (_Tf) is recorded.  The result is
# cached in _has_finished so the expensive drain happens only once.
333 def has_finished(self):
334 '''Returns True if the job has already finished
335 (i.e. all the commands have been executed)
336 If it is finished, the outputs are stored in the fields out and err.
338 :return: True if the job has already finished
342 # If the method has already been called and returned True
343 if self._has_finished:
346 # If the job has not begun yet
347 if not self.has_begun():
# A closed stdout channel means the remote command has terminated.
350 if self._stdout.channel.closed:
351 self._has_finished = True
352 # Store the result outputs
353 self.out += self._stdout.read().decode()
354 self.err += self._stderr.read().decode()
# Record the end time of the job.
356 self._Tf = time.time()
357 # And get the remote command status and log files
360 return self._has_finished
# Retrieve the job's log files from the remote machine into the local log
# directory.  The remote side writes a "list_log_files.txt" manifest whose
# first line is the command result (0 success / 1 fail) and whose remaining
# lines are remote paths of log files to fetch.
362 def get_log_files(self):
363 """Get the log files produced by the command launched
364 on the remote machine, and put it in the log directory of the user,
365 so they can be accessible from
367 # Do not get the files if the command is not finished
368 if not self.has_finished():
369 msg = _("Trying to get log files whereas the job is not finished.")
370 self.logger.write(src.printcolors.printcWarning(msg))
373 # First get the file that contains the list of log files to get
374 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
375 remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
376 self.machine.sftp.get(
380 # Read the file and get the result of the command and all the log files
382 fstream_tmp = open(tmp_file_path, "r")
383 file_lines = fstream_tmp.readlines()
384 file_lines = [line.replace("\n", "") for line in file_lines]
# The temporary local manifest is removed once parsed.
386 os.remove(tmp_file_path)
389 # The first line is the result of the command (0 success or 1 fail)
390 self.res_job = file_lines[0]
391 except Exception as e:
# Best-effort: a missing/unreadable manifest is reported in self.err
# rather than raised, so the run can continue.
392 self.err += _("Unable to get status from remote file %s: %s" %
393 (remote_path, str(e)))
395 for i, job_path_remote in enumerate(file_lines[1:]):
397 # For each command, there is two files to get :
398 # 1- The xml file describing the command and giving the
400 # 2- The txt file containing the system command traces (like
401 # traces produced by the "make" command)
402 # 3- In case of the test command, there is another file to get :
403 # the xml board that contain the test results
# The parent directory name (OUT/TEST/other) decides where the file
# lands locally.
404 dirname = os.path.basename(os.path.dirname(job_path_remote))
405 if dirname != 'OUT' and dirname != 'TEST':
407 local_path = os.path.join(os.path.dirname(
408 self.logger.logFilePath),
409 os.path.basename(job_path_remote))
410 if i==0: # The first is the job command
411 self.logger.add_link(os.path.basename(job_path_remote),
415 elif dirname == 'OUT':
417 local_path = os.path.join(os.path.dirname(
418 self.logger.logFilePath),
420 os.path.basename(job_path_remote))
421 elif dirname == 'TEST':
423 local_path = os.path.join(os.path.dirname(
424 self.logger.logFilePath),
426 os.path.basename(job_path_remote))
# Skip downloads for files already fetched in a previous pass.
429 if not os.path.exists(local_path):
430 self.machine.sftp.get(job_path_remote, local_path)
431 self.remote_log_files.append(local_path)
432 except Exception as e:
433 self.err += _("Unable to get %s log file from remote: %s" %
434 (str(job_path_remote),
# A job is failed when: the machine was unreachable, the run timed out, or
# the remote command reported status "1" in the log-file manifest
# (see get_log_files).  A job that has not finished cannot be failed yet.
437 def has_failed(self):
438 '''Returns True if the job has failed.
439 A job is considered as failed if the machine could not be reached,
440 if the remote command failed,
441 or if the job finished with a time out.
443 :return: True if the job has failed
446 if not self.has_finished():
448 if not self.machine.successfully_connected(self.logger):
450 if self.is_timeout():
# res_job is the string status read from the remote manifest.
452 if self.res_job == "1":
457 """In case of a failing job, one has to cancel every job that depend
458 on it. This method put the job as failed and will not be executed.
462 self._has_begun = True
463 self._has_finished = True
464 self.cancelled = True
465 self.out += _("This job was not launched because its father has failed.")
466 self.err += _("This job was not launched because its father has failed.")
# Running means launched but not yet finished.
468 def is_running(self):
469 '''Returns True if the job commands are running
471 :return: True if the job is running
474 return self.has_begun() and not self.has_finished()
# Accessor for the timeout flag set by check_time().
476 def is_timeout(self):
477 '''Returns True if the job commands has finished with timeout
479 :return: True if the job has finished with timeout
482 return self._has_timouted
# Seconds since the job was launched (_T0 is set in run()); the not-begun
# branch return value is elided here.
484 def time_elapsed(self):
485 """Get the time elapsed since the job launching
487 :return: The number of seconds
490 if not self.has_begun():
493 return T_now - self._T0
# Enforce the job's timeout: when exceeded, mark the job finished+timed-out,
# stamp the end time, kill the remote process and record a TIMEOUT banner in
# the job's out/err buffers.
495 def check_time(self):
496 """Verify that the job has not exceeded its timeout.
497 If it has, kill the remote command and consider the job as finished.
499 if not self.has_begun():
501 if self.time_elapsed() > self.timeout:
502 self._has_finished = True
503 self._has_timouted = True
504 self._Tf = time.time()
# Kill the remote command and keep its output as part of the trace.
506 (out_kill, _) = self.kill_remote_process()
507 self.out += "TIMEOUT \n" + out_kill.read().decode()
508 self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
511 except Exception as e:
512 self.err += _("Unable to get remote log files: %s" % e)
# Wall-clock duration between launch (_T0) and finish (_Tf) timestamps.
514 def total_duration(self):
515 """Give the total duration of the job
517 :return: the total duration of the job in seconds
520 return self._Tf - self._T0
523 """Launch the job by executing the remote command.
526 # Prevent multiple run
528 msg = _("Warning: A job can only be launched one time")
529 msg2 = _("Trying to launch the job \"%s\" whereas it has "
530 "already been launched." % self.name)
531 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
535 # Do not execute the command if the machine could not be reached
536 if not self.machine.successfully_connected(self.logger):
537 self._has_finished = True
539 self.err += ("Connection to machine (name : %s, host: %s, port:"
540 " %s, user: %s) has failed\nUse the log command "
541 "to get more information."
542 % (self.machine.name,
547 # Usual case : Launch the command on remote machine
548 self._T0 = time.time()
549 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
552 # If the results are not initialized, finish the job
553 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
554 self._has_finished = True
555 self._Tf = time.time()
557 self.err += "The server failed to execute the command"
559 # Put the beginning flag to true.
560 self._has_begun = True
# Pretty-print the full job report on the terminal/log: identity, timing,
# machine connection info, and the captured stdout/stderr of the remote run.
562 def write_results(self):
563 """Display on the terminal all the job's information
565 self.logger.write("name : " + self.name + "\n")
567 self.logger.write("after : %s\n" % self.after)
568 self.logger.write("Time elapsed : %4imin %2is \n" %
569 (self.total_duration()//60 , self.total_duration()%60))
571 self.logger.write("Begin time : %s\n" %
572 time.strftime('%Y-%m-%d %H:%M:%S',
573 time.localtime(self._T0)) )
575 self.logger.write("End time : %s\n\n" %
576 time.strftime('%Y-%m-%d %H:%M:%S',
577 time.localtime(self._Tf)) )
# Section header for the machine information, underlined to match its width.
579 machine_head = "Informations about connection :\n"
580 underline = (len(machine_head) - 2) * "-"
581 self.logger.write(src.printcolors.printcInfo(
582 machine_head+underline+"\n"))
583 self.machine.write_info(self.logger)
585 self.logger.write(src.printcolors.printcInfo("out : \n"))
587 self.logger.write("Unable to get output\n")
589 self.logger.write(self.out + "\n")
590 self.logger.write(src.printcolors.printcInfo("err : \n"))
591 self.logger.write(self.err + "\n")
# Map the job's internal flags to a human-readable status string consumed by
# the Gui for the xml display.  Checks are ordered from "never started" to
# "finished", with timestamps formatted from _T0/_Tf.
593 def get_status(self):
594 """Get the status of the job (used by the Gui for xml display)
596 :return: The current status of the job
599 if not self.machine.successfully_connected(self.logger):
600 return "SSH connection KO"
601 if not self.has_begun():
602 return "Not launched"
605 if self.is_running():
606 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
607 time.localtime(self._T0))
608 if self.has_finished():
609 if self.is_timeout():
610 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
611 time.localtime(self._Tf))
612 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
613 time.localtime(self._Tf))
616 '''Class to manage the jobs to be run
623 lenght_columns = 20):
624 # The jobs configuration
625 self.cfg_jobs = config_jobs
626 self.job_file_path = job_file_path
627 # The machine that will be used today
629 # The list of machine (hosts, port) that will be used today
630 # (a same host can have several machine instances since there
631 # can be several ssh parameters)
633 # The jobs to be launched today
635 # The jobs that will not be launched today
636 self.ljobs_not_today = []
639 self.len_columns = lenght_columns
641 # the list of jobs that have not been run yet
642 self._l_jobs_not_started = []
643 # the list of jobs that have already ran
644 self._l_jobs_finished = []
645 # the list of jobs that are running
646 self._l_jobs_running = []
648 self.determine_jobs_and_machines()
# Build a Job instance from a pyconf job definition plus its target machine.
# Optional keys (timeout, after, application, board, prefix) get defaults
# when absent; the default timeout is 4 hours.
650 def define_job(self, job_def, machine):
651 '''Takes a pyconf job definition and a machine (from class machine)
652 and returns the job instance corresponding to the definition.
654 :param job_def src.config.Mapping: a job definition
655 :param machine machine: the machine on which the job will run
656 :return: The corresponding job in a job class instance
660 cmmnds = job_def.commands
661 if not "timeout" in job_def:
662 timeout = 4*60*60 # default timeout = 4h
664 timeout = job_def.timeout
666 if 'after' in job_def:
667 after = job_def.after
669 if 'application' in job_def:
670 application = job_def.application
672 if 'board' in job_def:
673 board = job_def.board
675 if "prefix" in job_def:
676 prefix = job_def.prefix
# Read the pyconf jobs configuration and instantiate today's machines and
# jobs.  Machines are shared between jobs by name; the (host, port) pairs of
# used machines are collected in self.lhosts.  Jobs not scheduled for the
# current weekday go to self.ljobs_not_today.
690 def determine_jobs_and_machines(self):
691 '''Function that reads the pyconf jobs definition and instantiates all
692 the machines and jobs to be done today.
# Weekday index (0=Monday) used to decide which jobs run today.
697 today = datetime.date.weekday(datetime.date.today())
700 for job_def in self.cfg_jobs.jobs :
# A job without a "machine" key is skipped with a warning.
702 if not "machine" in job_def:
703 msg = _('WARNING: The job "%s" do not have the key '
704 '"machine", this job is ignored.\n\n' % job_def.name)
705 self.logger.write(src.printcolors.printcWarning(msg))
707 name_machine = job_def.machine
# Reuse an already-instantiated machine with the same name, if any.
710 for mach in self.lmachines:
711 if mach.name == name_machine:
# Otherwise look the machine up in the configuration, filling in
# defaults for the missing connection parameters.
715 if a_machine == None:
716 for machine_def in self.cfg_jobs.machines:
717 if machine_def.name == name_machine:
718 if 'host' not in machine_def:
719 host = self.runner.cfg.VARS.hostname
721 host = machine_def.host
723 if 'user' not in machine_def:
724 user = self.runner.cfg.VARS.user
726 user = machine_def.user
728 if 'port' not in machine_def:
731 port = machine_def.port
733 if 'password' not in machine_def:
736 passwd = machine_def.password
738 if 'sat_path' not in machine_def:
739 sat_path = "salomeTools"
741 sat_path = machine_def.sat_path
752 self.lmachines.append(a_machine)
753 if (host, port) not in host_list:
754 host_list.append((host, port))
# The machine name referenced by the job was not found at all.
756 if a_machine == None:
757 msg = _("WARNING: The job \"%(job_name)s\" requires the "
758 "machine \"%(machine_name)s\" but this machine "
759 "is not defined in the configuration file.\n"
760 "The job will not be launched")
761 self.logger.write(src.printcolors.printcWarning(msg))
763 a_job = self.define_job(job_def, a_machine)
765 if today in job_def.when:
766 self.ljobs.append(a_job)
767 else: # today in job_def.when
768 self.ljobs_not_today.append(a_job)
770 self.lhosts = host_list
# Connect to every machine in self.lmachines, and for each successful
# connection: wipe any previous remote SAT installation, copy a fresh SAT
# tree (machine.copy_sat), and probe the remote distribution with a sat
# command.  `pad` controls the dotted alignment of the progress traces.
772 def ssh_connection_all_machines(self, pad=50):
773 '''Function that do the ssh connection to every machine
779 self.logger.write(src.printcolors.printcInfo((
780 "Establishing connection with all the machines :\n")))
781 for machine in self.lmachines:
782 # little algorithm in order to display traces
783 begin_line = (_("Connection to %s: " % machine.name))
784 if pad - len(begin_line) < 0:
787 endline = (pad - len(begin_line)) * "." + " "
789 step = "SSH connection"
790 self.logger.write( begin_line + endline + step)
792 # the call to the method that initiate the ssh connection
793 msg = machine.connect(self.logger)
795 # Copy salomeTools to the remote machine
796 if machine.successfully_connected(self.logger):
797 step = _("Remove SAT")
# '\r' rewrites the same terminal line to animate the step label.
798 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
799 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
# Remove any stale SAT tree before re-copying.
800 (__, out_dist, __) = machine.exec_command(
801 "rm -rf %s" % machine.sat_path,
807 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
808 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
810 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
812 # get the remote machine distribution using a sat command
813 (__, out_dist, __) = machine.exec_command(
814 os.path.join(machine.sat_path,
815 "sat config --value VARS.dist --no_label"),
817 machine.distribution = out_dist.read().decode().replace("\n",
819 # Print the status of the copy
821 self.logger.write('\r%s' %
822 ((len(begin_line)+len(endline)+20) * " "), 3)
823 self.logger.write('\r%s%s%s' %
826 src.printcolors.printc(src.OK_STATUS)), 3)
# Copy failed: show KO plus the copy error.
828 self.logger.write('\r%s' %
829 ((len(begin_line)+len(endline)+20) * " "), 3)
830 self.logger.write('\r%s%s%s %s' %
833 src.printcolors.printc(src.KO_STATUS),
834 _("Copy of SAT failed: %s" % res_copy)), 3)
# Connection itself failed: show KO with the connect() message.
836 self.logger.write('\r%s' %
837 ((len(begin_line)+len(endline)+20) * " "), 3)
838 self.logger.write('\r%s%s%s %s' %
841 src.printcolors.printc(src.KO_STATUS),
843 self.logger.write("\n", 3)
845 self.logger.write("\n")
# Return the job currently running on the given (host, port), or a falsy
# value when that host is free (used by run_jobs to schedule one job per
# host at a time).
848 def is_occupied(self, hostname):
849 '''Function that returns True if a job is running on
850 the machine defined by its host and its port.
852 :param hostname (str, int): the pair (host, port)
853 :return: the job that is running on the host,
854 or false if there is no job running on the host.
859 for jb in self.ljobs:
860 if jb.machine.host == host and jb.machine.port == port:
# Refresh the running/finished job lists and report whether at least one
# more job has finished since the previous call (used to decide whether the
# xml files need regenerating).
865 def update_jobs_states_list(self):
866 '''Function that updates the lists that store the currently
867 running jobs and the jobs that have already finished.
872 jobs_finished_list = []
873 jobs_running_list = []
874 for jb in self.ljobs:
876 jobs_running_list.append(jb)
878 if jb.has_finished():
879 jobs_finished_list.append(jb)
# Compare finished counts before/after the refresh.
881 nb_job_finished_before = len(self._l_jobs_finished)
882 self._l_jobs_finished = jobs_finished_list
883 self._l_jobs_running = jobs_running_list
885 nb_job_finished_now = len(self._l_jobs_finished)
887 return nb_job_finished_now > nb_job_finished_before
# Propagate failures: any job whose "after" parent has failed is cancelled
# (the cancel itself is elided here; see Job.cancel).
889 def cancel_dependencies_of_failing_jobs(self):
890 '''Function that cancels all the jobs that depend on a failing one.
896 for job in self.ljobs:
897 if job.after is None:
899 father_job = self.find_job_that_has_name(job.after)
900 if father_job is not None and father_job.has_failed():
# Linear lookup of a today-job by name; falls through when not found.
903 def find_job_that_has_name(self, name):
904 '''Returns the job by its name.
906 :param name str: a job name
907 :return: the job that has the name.
910 for jb in self.ljobs:
913 # the following is executed only if the job was not found
# Fit `text` into exactly `length` characters: truncate with "..." when too
# long, otherwise center it with padding spaces (extra space goes to the
# right when the difference is odd).
916 def str_of_length(self, text, length):
917 '''Takes a string text of any length and returns
918 the most close string of length "length".
920 :param text str: any string
921 :param length int: a length for the returned string
922 :return: the most close string of length "length"
925 if len(text) > length:
926 text_out = text[:length-3] + '...'
928 diff = length - len(text)
929 before = " " * (diff//2)
930 after = " " * (diff//2 + diff%2)
931 text_out = before + text + after
# Redraw (via '\r') the one-line status table: one fixed-width column per
# host showing the name of the job running there, or "empty".
935 def display_status(self, len_col):
936 '''Takes a lenght and construct the display of the current status
937 of the jobs in an array that has a column for each host.
938 It displays the job that is currently running on the host
941 :param len_col int: the size of the column
947 for host_port in self.lhosts:
948 jb = self.is_occupied(host_port)
949 if not jb: # nothing running on the host
950 empty = self.str_of_length("empty", len_col)
951 display_line += "|" + empty
953 display_line += "|" + src.printcolors.printcInfo(
954 self.str_of_length(jb.name, len_col))
956 self.logger.write("\r" + display_line + "|")
961 '''The main method. Runs all the jobs on every host.
962 For each host, at a given time, only one job can be running.
963 The jobs that have the field after (that contain the job that has
964 to be run before it) are run after the previous job.
965 This method stops when all the jobs are finished.
972 self.logger.write(src.printcolors.printcInfo(
973 _('Executing the jobs :\n')))
975 for host_port in self.lhosts:
978 if port == 22: # default value
979 text_line += "|" + self.str_of_length(host, self.len_columns)
981 text_line += "|" + self.str_of_length(
982 "("+host+", "+str(port)+")", self.len_columns)
984 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
985 self.logger.write(tiret_line)
986 self.logger.write(text_line + "|\n")
987 self.logger.write(tiret_line)
990 # The infinite loop that runs the jobs
991 l_jobs_not_started = src.deepcopy_list(self.ljobs)
992 while len(self._l_jobs_finished) != len(self.ljobs):
993 new_job_start = False
994 for host_port in self.lhosts:
996 if self.is_occupied(host_port):
999 for jb in l_jobs_not_started:
1000 if (jb.machine.host, jb.machine.port) != host_port:
1002 if jb.after == None:
1004 l_jobs_not_started.remove(jb)
1005 new_job_start = True
1008 jb_before = self.find_job_that_has_name(jb.after)
1009 if jb_before is None:
1011 msg = _("This job was not launched because its "
1012 "father is not in the jobs list.")
1016 if jb_before.has_finished():
1018 l_jobs_not_started.remove(jb)
1019 new_job_start = True
1021 self.cancel_dependencies_of_failing_jobs()
1022 new_job_finished = self.update_jobs_states_list()
1024 if new_job_start or new_job_finished:
1026 self.gui.update_xml_files(self.ljobs)
1027 # Display the current status
1028 self.display_status(self.len_columns)
1030 # Make sure that the proc is not entirely busy
1033 self.logger.write("\n")
1034 self.logger.write(tiret_line)
1035 self.logger.write("\n\n")
1038 self.gui.update_xml_files(self.ljobs)
1039 self.gui.last_update()
# Print each job's full report (Job.write_results) under a banner header.
1041 def write_all_results(self):
1042 '''Display all the jobs outputs.
1048 for jb in self.ljobs:
1049 self.logger.write(src.printcolors.printcLabel(
1050 "#------- Results for job %s -------#\n" % jb.name))
1052 self.logger.write("\n\n")
1055 '''Class to manage the the xml data that can be displayed in a browser to
1068 :param xml_dir_path str: The path to the directory where to put
1069 the xml resulting files
1070 :param l_jobs List: the list of jobs that run today
1071 :param l_jobs_not_today List: the list of jobs that do not run today
1072 :param file_boards str: the file path from which to read the
1075 # The logging instance
1076 self.logger = logger
1078 # The prefix to add to the xml files : date_hour
1079 self.prefix = prefix
1081 # The path of the csv files to read to fill the expected boards
1082 self.file_boards = file_boards
1084 if file_boards != "":
1085 today = datetime.date.weekday(datetime.date.today())
1086 self.parse_csv_boards(today)
1088 self.d_input_boards = {}
1090 # The path of the global xml file
1091 self.xml_dir_path = xml_dir_path
1092 # Initialize the xml files
1093 self.global_name = "global_report"
1094 xml_global_path = os.path.join(self.xml_dir_path,
1095 self.global_name + ".xml")
1096 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1099 # Find history for each job
1101 self.find_history(l_jobs, l_jobs_not_today)
1103 # The xml files that corresponds to the boards.
1104 # {name_board : xml_object}}
1105 self.d_xml_board_files = {}
1107 # Create the lines and columns
1108 self.initialize_boards(l_jobs, l_jobs_not_today)
1110 # Write the xml file
1111 self.update_xml_files(l_jobs)
# Register a new board: create its xml log file in xml_dir_path and seed it
# with the "distributions", "applications" and "board" nodes.
1113 def add_xml_board(self, name):
1114 '''Add a board to the board list
1115 :param name str: the board name
1117 xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
1118 self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
1121 self.d_xml_board_files[name].add_simple_node("distributions")
1122 self.d_xml_board_files[name].add_simple_node("applications")
1123 self.d_xml_board_files[name].add_simple_node("board", text=name)
# Build the initial structure of the global xml file and of every board xml
# file: board registration, distribution rows, application columns,
# hosts_ports list, per-file "jobs"/"infos"/"history" nodes, and the
# missing-jobs sections derived from the input csv boards.
1125 def initialize_boards(self, l_jobs, l_jobs_not_today):
1126 '''Get all the first information needed for each file and write the
1127 first version of the files
1128 :param l_jobs List: the list of jobs that run today
1129 :param l_jobs_not_today List: the list of jobs that do not run today
1131 # Get the boards to fill and put it in a dictionary
1132 # {board_name : xml instance corresponding to the board}
1133 for job in l_jobs + l_jobs_not_today:
1135 if (board is not None and
1136 board not in self.d_xml_board_files.keys()):
1137 self.add_xml_board(board)
1139 # Verify that the boards given as input are done
1140 for board in list(self.d_input_boards.keys()):
1141 if board not in self.d_xml_board_files:
1142 self.add_xml_board(board)
# Record the originating csv file on the board's root node.
1143 root_node = self.d_xml_board_files[board].xmlroot
1144 src.xmlManager.append_node_attrib(root_node,
1145 {"input_file" : self.file_boards})
1147 # Loop over all jobs in order to get the lines and columns for each
1151 for board in self.d_xml_board_files:
1153 d_application[board] = []
1157 for job in l_jobs + l_jobs_not_today:
1159 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1160 l_hosts_ports.append((job.machine.host, job.machine.port))
1162 distrib = job.machine.distribution
1163 application = job.application
1165 board_job = job.board
# Register the job's distribution as a row of its board (once).
1168 for board in self.d_xml_board_files:
1169 if board_job == board:
1170 if distrib is not None and distrib not in d_dist[board]:
1171 d_dist[board].append(distrib)
1172 src.xmlManager.add_simple_node(
1173 self.d_xml_board_files[board].xmlroot.find(
1176 attrib={"name" : distrib})
# Register the job's application as a column of its board (once).
1178 if board_job == board:
1179 if (application is not None and
1180 application not in d_application[board]):
1181 d_application[board].append(application)
1182 src.xmlManager.add_simple_node(
1183 self.d_xml_board_files[board].xmlroot.find(
1187 "name" : application})
1189 # Verify that there are no missing application or distribution in the
1190 # xml board files (regarding the input boards)
1191 for board in self.d_xml_board_files:
1192 l_dist = d_dist[board]
1193 if board not in self.d_input_boards.keys():
1195 for dist in self.d_input_boards[board]["rows"]:
1196 if dist not in l_dist:
1197 src.xmlManager.add_simple_node(
1198 self.d_xml_board_files[board].xmlroot.find(
1201 attrib={"name" : dist})
1202 l_appli = d_application[board]
1203 for appli in self.d_input_boards[board]["columns"]:
1204 if appli not in l_appli:
1205 src.xmlManager.add_simple_node(
1206 self.d_xml_board_files[board].xmlroot.find(
1209 attrib={"name" : appli})
1211 # Initialize the hosts_ports node for the global file
1212 self.xmlhosts_ports = self.xml_global_file.add_simple_node(
1214 for host, port in l_hosts_ports:
1215 host_port = "%s:%i" % (host, port)
1216 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1218 attrib={"name" : host_port})
1220 # Initialize the jobs node in all files
1221 for xml_file in [self.xml_global_file] + list(
1222 self.d_xml_board_files.values()):
1223 xml_jobs = xml_file.add_simple_node("jobs")
1224 # Get the jobs present in the config file but
1225 # that will not be launched today
1226 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1228 # add also the infos node
1229 xml_file.add_simple_node("infos",
1230 attrib={"name" : "last update",
1231 "JobsCommandStatus" : "running"})
1233 # and put the history node
1234 history_node = xml_file.add_simple_node("history")
1235 name_board = os.path.basename(xml_file.logFile)[:-len(".xml")]
1236 # serach for board files
# Historical boards are recognized by a date_time_ prefix pattern.
1237 expression = "^[0-9]{8}_+[0-9]{6}_" + name_board + ".xml$"
1238 oExpr = re.compile(expression)
1239 # Get the list of xml borad files that are in the log directory
1240 for file_name in os.listdir(self.xml_dir_path):
1241 if oExpr.search(file_name):
1242 date = os.path.basename(file_name).split("_")[0]
1243 file_path = os.path.join(self.xml_dir_path, file_name)
1244 src.xmlManager.add_simple_node(history_node,
1247 attrib={"date" : date})
1250 # Find in each board the squares that needs to be filled regarding the
1251 # input csv files but that are not covered by a today job
1252 for board in self.d_input_boards.keys():
1253 xml_root_board = self.d_xml_board_files[board].xmlroot
1254 # Find the missing jobs for today
1255 xml_missing = src.xmlManager.add_simple_node(xml_root_board,
1257 for row, column in self.d_input_boards[board]["jobs"]:
1260 if (job.application == column and
1261 job.machine.distribution == row):
1265 src.xmlManager.add_simple_node(xml_missing,
1267 attrib={"distribution" : row,
1268 "application" : column })
1269 # Find the missing jobs not today
1270 xml_missing_not_today = src.xmlManager.add_simple_node(
1272 "missing_jobs_not_today")
1273 for row, column in self.d_input_boards[board]["jobs_not_today"]:
1275 for job in l_jobs_not_today:
1276 if (job.application == column and
1277 job.machine.distribution == row):
1281 src.xmlManager.add_simple_node(xml_missing_not_today,
1283 attrib={"distribution" : row,
1284 "application" : column })
    def find_history(self, l_jobs, l_jobs_not_today):
        """find, for each job, in the existent xml boards the results for the
           job. Store the results in the dictionnary self.history = {name_job :
           list of (date, status, list links)}

        :param l_jobs List: the list of jobs to run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        """
        # load the all the history
        # Global report files are named <8-digit date>_<6-digit time>_<name>.xml
        expression = "^[0-9]{8}_+[0-9]{6}_" + self.global_name + ".xml$"
        oExpr = re.compile(expression)
        # Get the list of global xml that are in the log directory
        # NOTE(review): l_globalxml is appended to below but its
        # initialization (l_globalxml = []) is not visible in this view, and
        # the "try:" matching the "except" below is missing -- confirm
        # against the full file.
        for file_name in os.listdir(self.xml_dir_path):
            if oExpr.search(file_name):
                file_path = os.path.join(self.xml_dir_path, file_name)
                    # Best effort: a file that cannot be parsed is only
                    # reported as a warning and then ignored.
                    global_xml = src.xmlManager.ReadXmlFile(file_path)
                    l_globalxml.append(global_xml)
                except Exception as e:
                    msg = _("\nWARNING: the file %s can not be read, it will be "
                            "ignored\n%s" % (file_path, e))
                    self.logger.write("%s\n" % src.printcolors.printcWarning(

        # Construct the dictionnary self.history
        for job in l_jobs + l_jobs_not_today:
            for global_xml in l_globalxml:
                # The date is the leading 8-digit part of the file name
                date = os.path.basename(global_xml.filePath).split("_")[0]
                global_root_node = global_xml.xmlroot.find("jobs")
                job_node = src.xmlManager.find_node_by_attrib(
                if job_node.find("remote_log_file_path") is not None:
                    link = job_node.find("remote_log_file_path").text
                    res_job = job_node.find("res").text
                    # "nothing" is the placeholder for a job without a log
                    if link != "nothing":
                        l_links.append((date, res_job, link))
            # Sort descending on the date string, i.e. most recent run first
            l_links = sorted(l_links, reverse=True)
            self.history[job.name] = l_links
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Get all the first information needed for each file and write the
           first version of the files

        :param xml_node_jobs etree.Element: the node corresponding to a job
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        for job in l_jobs_not_today:
            # One <job name="..."> element per job that is not planned today
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            # Attach the previous runs of this job (filled by find_history)
            xml_history = src.xmlManager.add_simple_node(xmlj, "history")
            for i, (date, res_job, link) in enumerate(self.history[job.name]):
                # NOTE(review): the branch distinguishing the first entry
                # (i == 0) from the others is not visible in this view --
                # confirm against the full file.
                # tag the first one (the last one)
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
    def parse_csv_boards(self, today):
        """ Parse the csv file that describes the boards to produce and fill
            the dict d_input_boards that contain the csv file contain

        :param today int: the current day of the week
        """
        # open the csv file and read its content
        with open(self.file_boards, 'r') as f:
            reader = csv.reader(f,delimiter=CSV_DELIMITER)
            # NOTE(review): the materialization of the rows into l_read is
            # not visible in this view -- confirm against the full file.
        # get the delimiter for the boards (empty line)
        boards_delimiter = [''] * len(l_read[0])
        # Make the list of boards, by splitting with the delimiter
        l_boards = [list(y) for x, y in itertools.groupby(l_read,
                                    lambda z: z == boards_delimiter) if not x]

        # loop over the csv lists of lines and get the rows, columns and jobs
        for input_board in l_boards:
            # The first cell of the first line gives the board name
            board_name = input_board[0][0]
            # The remaining cells of the first line are the column headers
            columns = input_board[0][1:]
            for line in input_board[1:]:
                for i, square in enumerate(line[1:]):
                    # A square lists the week days (DAYS_SEPARATOR separated)
                    # on which the (row, column) job must run
                    days = square.split(DAYS_SEPARATOR)
                    days = [int(day) for day in days]
                    job = (row, columns[i])
                    jobs_not_today.append(job)
            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns,
                                    "jobs_not_today" : jobs_not_today}
        self.d_input_boards = d_boards
1426 def update_xml_files(self, l_jobs):
1427 '''Write all the xml files with updated information about the jobs
1429 :param l_jobs List: the list of jobs that run today
1431 for xml_file in [self.xml_global_file] + list(
1432 self.d_xml_board_files.values()):
1433 self.update_xml_file(l_jobs, xml_file)
1436 self.write_xml_files()
    def update_xml_file(self, l_jobs, xml_file):
        '''update information about the jobs for the file xml_file

        :param l_jobs List: the list of jobs that run today
        :param xml_file xmlManager.XmlLogFile: the xml instance to update
        '''

        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
        # NOTE(review): the statements below all reference "job", but the
        # enclosing "for job in l_jobs:" loop is not visible in this view --
        # confirm against the full file.
        # Find the node corresponding to the job and delete it
        # in order to recreate it
        for xmljob in xml_node_jobs.findall('job'):
            if xmljob.attrib['name'] == job.name:
                xml_node_jobs.remove(xmljob)

        # Human readable begin/end timestamps of the job
        T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._T0))
        Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._Tf))

        # recreate the job node
        xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                              attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        # Attach the previous runs of this job (filled by find_history)
        xml_history = src.xmlManager.add_simple_node(xmlj, "history")
        for date, res_job, link in self.history[job.name]:
            src.xmlManager.add_simple_node(xml_history,
                                           attrib={"date" : date,
        src.xmlManager.add_simple_node(xmlj, "sat_path",
                                       job.machine.sat_path)
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj, "distribution",
                                       job.machine.distribution)
        src.xmlManager.add_simple_node(xmlj, "board", job.board)
        src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
        src.xmlManager.add_simple_node(xmlj, "commands",
                                       " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
        src.xmlManager.add_simple_node(xmlj, "begin", T0)
        src.xmlManager.add_simple_node(xmlj, "end", Tf)
        # Strip terminal color escape sequences before storing the output
        src.xmlManager.add_simple_node(xmlj, "out",
                                       src.printcolors.cleancolor(job.out))
        src.xmlManager.add_simple_node(xmlj, "err",
                                       src.printcolors.cleancolor(job.err))
        src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
        if len(job.remote_log_files) > 0:
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
                                           job.remote_log_files[0])
            # NOTE(review): an "else:" branch providing a placeholder value
            # appears to be expected here -- confirm against the full file.
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
        # Search for the test log if there is any
        l_test_log_files = self.find_test_log(job.remote_log_files)
        xml_test = src.xmlManager.add_simple_node(xmlj,
                                                  "test_log_file_path")
        for test_log_path in l_test_log_files:
            src.xmlManager.add_simple_node(xml_test, "path", test_log_path)

        xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
        # get the job father
        if job.after is not None:
            # NOTE(review): "jb" and "job_father" suggest a lookup loop over
            # the jobs that is not visible in this view -- confirm against
            # the full file.
            if jb.name == job.after:
            # Link the "after" attribute to the father's remote log, if any
            if (job_father is not None and
                    len(job_father.remote_log_files) > 0):
                link = job_father.remote_log_files[0]
                src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

        # Verify that the job is to be done today regarding the input csv
        # files
        if job.board and job.board in self.d_input_boards.keys():
            for dist, appli in self.d_input_boards[job.board]["jobs"]:
                if (job.machine.distribution == dist
                        and job.application == appli):
                    src.xmlManager.add_simple_node(xmlj,
                    src.xmlManager.add_simple_node(xmlj,

        # Refresh the "infos" node with the current timestamp
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                   datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1551 def find_test_log(self, l_remote_log_files):
1552 '''Find if there is a test log (board) in the remote log files and
1553 the path to it. There can be several test command, so the result is
1556 :param l_remote_log_files List: the list of all remote log files
1557 :return: the list of test log files path
1561 for file_path in l_remote_log_files:
1562 dirname = os.path.basename(os.path.dirname(file_path))
1563 if dirname == "TEST":
1564 res.append(file_path)
1567 def last_update(self, finish_status = "finished"):
1568 '''update information about the jobs for the file xml_file
1570 :param l_jobs List: the list of jobs that run today
1571 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1573 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1574 xml_node_infos = xml_file.xmlroot.find('infos')
1575 src.xmlManager.append_node_attrib(xml_node_infos,
1576 attrib={"JobsCommandStatus" : finish_status})
1578 self.write_xml_files()
1580 def write_xml_file(self, xml_file, stylesheet):
1581 ''' Write one xml file and the same file with prefix
1583 xml_file.write_tree(stylesheet)
1584 file_path = xml_file.logFile
1585 file_dir = os.path.dirname(file_path)
1586 file_name = os.path.basename(file_path)
1587 file_name_with_prefix = self.prefix + "_" + file_name
1588 xml_file.write_tree(stylesheet, os.path.join(file_dir,
1589 file_name_with_prefix))
1591 def write_xml_files(self):
1592 ''' Write the xml files
1594 self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
1595 for xml_file in self.d_xml_board_files.values():
1596 self.write_xml_file(xml_file, STYLESHEET_BOARD)
# Describes the command
# NOTE(review): the "def description():" header expected above this return
# statement (the standard sat command help hook) is not visible here --
# confirm against the full file.
    return _("The jobs command launches maintenances that are described"
            " in the dedicated jobs configuration file.\n\nexample:\nsat "
            "jobs --name my_jobs --publish")
def run(args, runner, logger):
    '''Entry point called when sat is launched with the jobs command.

    :param args List: the command line arguments passed to the command
    :param runner: the sat runner, giving access to the configuration (cfg)
    :param logger Logger: the logger instance used to output messages
    '''
    # NOTE(review): several statements of this function (else branches,
    # try/except openings, continue statements, some initializations such as
    # "info" and "files_to_copy") are not visible in this view; the
    # indentation below is best-effort -- confirm the control flow against
    # the full file.
    (options, args) = parser.parse_args(args)

    # Directories where the jobs configuration files are searched
    l_cfg_dir = runner.cfg.PATHS.JOBPATH

    # list option : display all the available config files
    for cfg_dir in l_cfg_dir:
        if not options.no_label:
            logger.write("------ %s\n" %
                         src.printcolors.printcHeader(cfg_dir))
        if not os.path.exists(cfg_dir):
        for f in sorted(os.listdir(cfg_dir)):
            # Only .pyconf files are jobs configuration files
            if not f.endswith('.pyconf'):
            logger.write("%s\n" % cfilename)

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        src.printcolors.printcError(message)

    # Find the file in the directories, unless it is a full path
    if os.path.exists(options.jobs_cfg):
        file_jobs_cfg = options.jobs_cfg
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        # The .pyconf extension may be omitted on the command line
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'
        if not os.path.exists(file_jobs_cfg):
            msg = _("The file configuration %(name_file)s was not found."
                    "\nUse the --list option to get the possible files.")
            src.printcolors.printcError(msg)

    # Header pairs displayed by src.print_info below
    (_("Platform"), runner.cfg.VARS.dist),
    (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)

    # Keep only the jobs requested through --only_jobs, if the option is set
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    # Initialization of the jobs manager
    today_jobs = Jobs(runner,

    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:

    logger.write(src.printcolors.printcInfo(
                                    _("Initialize the xml boards : ")), 5)

    # Copy the stylesheets in the log directory
    log_dir = runner.cfg.USER.log_dir
    xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
    files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
    for file_path in files_to_copy:
        shutil.copy2(file_path, log_dir)

    # Instanciate the Gui in order to produce the xml files that contain all
    # the boards
    gui = Gui(runner.cfg.USER.log_dir,
              today_jobs.ljobs_not_today,
              runner.cfg.VARS.datehour,
              file_boards = options.input_boards)

    logger.write(src.printcolors.printcSuccess("OK"), 5)
    logger.write("\n\n", 5)

    # Display the list of the xml files
    logger.write(src.printcolors.printcInfo(("Here is the list of published"
    logger.write("%s\n" % gui.xml_global_file.logFile, 4)
    for board in gui.d_xml_board_files.keys():
        file_path = gui.d_xml_board_files[board].logFile
        file_name = os.path.basename(file_path)
        logger.write("%s\n" % file_path, 4)
        # Register a link so the board is reachable from the sat log page
        logger.add_link(file_name, "board", 0, board)
    logger.write("\n", 4)

    today_jobs.gui = gui

    # Run all the jobs contained in config_jobs
    today_jobs.run_jobs()
    except KeyboardInterrupt:
        # The user hit Ctrl+C: clean up the remote processes before leaving
        logger.write("\n\n%s\n\n" %
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        msg = _("Killing the running jobs and trying"
                " to get the corresponding logs\n")
        logger.write(src.printcolors.printcWarning(msg))

        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                    # Best effort: a job that cannot be killed is only warned
                    jb.kill_remote_process()
                except Exception as e:
                    msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
                    logger.write(src.printcolors.printcWarning(msg))
            if jb.res_job != "0":
        today_jobs.gui.last_update(_("Forced interruption"))
        today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()