3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Module-level setup for the "sat jobs" command: the XSL stylesheets used to
# render the generated xml reports, and the command-line option definitions.
# NOTE(review): this excerpt is incomplete - the leading original line numbers
# jump (e.g. 47 -> 49), so the trailing default-value arguments of several
# parser.add_option(...) calls are missing here. Code left byte-identical.
30 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
31 STYLESHEET_BOARD = "jobs_board_report.xsl"
36 parser = src.options.Options()
38 parser.add_option('n', 'name', 'string', 'jobs_cfg',
39 _('Mandatory: The name of the config file that contains'
40 ' the jobs configuration'))
41 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
42 _('Optional: the list of jobs to launch, by their name. '))
43 parser.add_option('l', 'list', 'boolean', 'list',
44 _('Optional: list all available config files.'))
45 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
46 _("Optional: try to connect to the machines. "
47 "Not executing the jobs."),
49 parser.add_option('p', 'publish', 'boolean', 'publish',
50 _("Optional: generate an xml file that can be read in a "
51 "browser to display the jobs status."),
53 parser.add_option('i', 'input_boards', 'string', 'input_boards', _("Optional: "
54 "the path to csv file that contain "
55 "the expected boards."),"")
56 parser.add_option('', 'completion', 'boolean', 'no_label',
57 _("Optional (internal use): do not print labels, Works only "
# Machine: wraps one paramiko ssh connection (connect, sftp copy of the
# salomeTools sources, remote command execution, status reporting).
# NOTE(review): this excerpt is incomplete - the embedded original line numbers
# jump throughout (missing try:, else:, return and __init__ lines). All code
# is kept byte-identical; only comments are added.
61 class Machine(object):
62 '''Class to manage a ssh connection on a machine
70 sat_path="salomeTools"):
74 self.distribution = None # Will be filled after copying SAT on the machine
76 self.password = passwd
77 self.sat_path = sat_path
78 self.ssh = paramiko.SSHClient()
79 self._connection_successful = None
# connect: open the ssh session; on any paramiko error a message is built
# (the lines that return/log it are missing from this excerpt).
81 def connect(self, logger):
82 '''Initiate the ssh connection to the remote machine
84 :param logger src.logger.Logger: The logger instance
89 self._connection_successful = False
90 self.ssh.load_system_host_keys()
# AutoAddPolicy silently trusts unknown host keys - convenient for a farm of
# build machines, but weak against MITM; kept as-is.
91 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
93 self.ssh.connect(self.host,
96 password = self.password)
97 except paramiko.AuthenticationException:
98 message = src.KO_STATUS + _("Authentication failed")
99 except paramiko.BadHostKeyException:
100 message = (src.KO_STATUS +
101 _("The server's host key could not be verified"))
102 except paramiko.SSHException:
103 message = ( _("SSHException error connecting or "
104 "establishing an SSH session"))
106 message = ( _("Error connecting or establishing an SSH session"))
108 self._connection_successful = True
# successfully_connected: tri-state flag (None = connect() never called).
112 def successfully_connected(self, logger):
113 '''Verify if the connection to the remote machine has succeed
115 :param logger src.logger.Logger: The logger instance
116 :return: True if the connection has succeed, False if not
119 if self._connection_successful == None:
120 message = _("Warning : trying to ask if the connection to "
121 "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
122 " no connection request" %
123 (self.name, self.host, self.port, self.user))
124 logger.write( src.printcolors.printcWarning(message))
125 return self._connection_successful
# copy_sat: push the local salomeTools tree plus the job pyconf over sftp.
127 def copy_sat(self, sat_local_path, job_file):
128 '''Copy salomeTools to the remote machine in self.sat_path
132 # open a sftp connection
133 self.sftp = self.ssh.open_sftp()
134 # Create the sat directory on remote machine if it is not existing
135 self.mkdir(self.sat_path, ignore_existing=True)
137 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
138 # put the job configuration file in order to make it reachable
139 # on the remote machine
140 self.sftp.put(job_file, os.path.join(".salomeTools",
142 ".jobs_command_file.pyconf"))
143 except Exception as e:
145 self._connection_successful = False
# put_dir: recursive sftp upload preserving file modes; symlinks recreated
# as links on the remote side.
# NOTE(review): mutable default `filters=[]` - harmless only while the list
# is never mutated; confirm in full source before relying on it.
149 def put_dir(self, source, target, filters = []):
150 ''' Uploads the contents of the source directory to the target path. The
151 target directory needs to exists. All sub-directories in source are
152 created under target.
154 for item in os.listdir(source):
157 source_path = os.path.join(source, item)
158 destination_path = os.path.join(target, item)
159 if os.path.islink(source_path):
160 linkto = os.readlink(source_path)
162 self.sftp.symlink(linkto, destination_path)
163 self.sftp.chmod(destination_path,
164 os.stat(source_path).st_mode)
168 if os.path.isfile(source_path):
169 self.sftp.put(source_path, destination_path)
170 self.sftp.chmod(destination_path,
171 os.stat(source_path).st_mode)
173 self.mkdir(destination_path, ignore_existing=True)
174 self.put_dir(source_path, destination_path)
# mkdir: like sftp.mkdir but optionally tolerates an existing directory.
# 511 decimal == 0o777 octal.
176 def mkdir(self, path, mode=511, ignore_existing=False):
177 ''' Augments mkdir by adding an option to not fail
181 self.sftp.mkdir(path, mode)
# exec_command: fire-and-forget remote execution; returns (None, None, None)
# on failure so callers must check before reading the channels.
188 def exec_command(self, command, logger):
189 '''Execute the command on the remote machine
191 :param command str: The command to be run
192 :param logger src.logger.Logger: The logger instance
193 :return: the stdin, stdout, and stderr of the executing command,
195 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
196 paramiko.channel.ChannelFile)
199 # Does not wait the end of the command
200 (stdin, stdout, stderr) = self.ssh.exec_command(command)
201 except paramiko.SSHException:
202 message = src.KO_STATUS + _(
203 ": the server failed to execute the command\n")
204 logger.write( src.printcolors.printcError(message))
205 return (None, None, None)
207 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
208 return (None, None, None)
210 return (stdin, stdout, stderr)
# (close() method: only its docstring line survives in this excerpt.)
213 '''Close the ssh connection
# write_info: dump host/port/user and connection status to the logger.
219 def write_info(self, logger):
220 '''Prints the informations relative to the machine in the logger
221 (terminal traces and log file)
223 :param logger src.logger.Logger: The logger instance
227 logger.write("host : " + self.host + "\n")
228 logger.write("port : " + str(self.port) + "\n")
229 logger.write("user : " + str(self.user) + "\n")
230 if self.successfully_connected(logger):
231 status = src.OK_STATUS
233 status = src.KO_STATUS
234 logger.write("Connection : " + status + "\n\n")
# Job: one remote "sat job" execution on a Machine (state flags, timings,
# collected stdout/stderr, log-file retrieval).
# NOTE(review): excerpt incomplete - original line numbers jump; several
# assignments (name, after, logger, out/err init, command suffix) are missing.
# Code kept byte-identical.
238 '''Class to manage one job
240 def __init__(self, name, machine, application, board,
241 commands, timeout, config, logger, after=None, prefix=None):
244 self.machine = machine
246 self.timeout = timeout
247 self.application = application
251 # The list of log files to download from the remote machine
252 self.remote_log_files = []
254 # The remote command status
255 # -1 means that it has not been launched,
256 # 0 means success and 1 means fail
258 self.cancelled = False
# Internal state flags driving has_begun()/has_finished()/is_timeout().
262 self._has_begun = False
263 self._has_finished = False
264 self._has_timouted = False
265 self._stdin = None # Store the command inputs field
266 self._stdout = None # Store the command outputs field
267 self._stderr = None # Store the command errors field
# Build the remote command line: <sat_path>/sat ... job --jobs_config ...
272 self.commands = commands
273 self.command = (os.path.join(self.machine.sat_path, "sat") +
275 os.path.join(self.machine.sat_path,
276 "list_log_files.txt") +
277 " job --jobs_config .jobs_command_file" +
# Optional wrapper (e.g. an environment launcher) around the whole command.
281 self.command = prefix + ' "' + self.command +'"'
284 """ Get the pid(s) corresponding to the command that have been launched
285 On the remote machine
287 :return: The list of integers corresponding to the found pids
291 cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
292 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
293 pids_cmd = out_pid.readlines()
294 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
# kill_remote_process: send SIGINT (kill -2) to every pid of the remote job.
# NOTE(review): excerpt incomplete - the exec_command call's remaining
# arguments and the use of `wait` (orig lines 308-309) are missing.
298 def kill_remote_process(self, wait=1):
299 '''Kills the process on the remote machine.
301 :return: (the output of the kill, the error of the kill)
305 pids = self.get_pids()
306 cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
307 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
310 return (out_kill, err_kill)
# (has_begun: the `def` line, original 312, is missing from this excerpt.)
# Simple accessor for the _has_begun flag set by run().
313 '''Returns True if the job has already begun
315 :return: True if the job has already begun
318 return self._has_begun
# has_finished: polls the ssh channel; on first detection of completion it
# drains stdout/stderr into self.out/self.err and records the end time.
# NOTE(review): excerpt incomplete - the early `return True`/`return False`
# lines and the get_log_files() call (orig ~345) are missing.
320 def has_finished(self):
321 '''Returns True if the job has already finished
322 (i.e. all the commands have been executed)
323 If it is finished, the outputs are stored in the fields out and err.
325 :return: True if the job has already finished
329 # If the method has already been called and returned True
330 if self._has_finished:
333 # If the job has not begun yet
334 if not self.has_begun():
# A closed channel means the remote command terminated.
337 if self._stdout.channel.closed:
338 self._has_finished = True
339 # Store the result outputs
340 self.out += self._stdout.read().decode()
341 self.err += self._stderr.read().decode()
343 self._Tf = time.time()
344 # And get the remote command status and log files
347 return self._has_finished
# get_log_files: download the remote list_log_files.txt, read the job status
# from its first line, then fetch each listed log file (job xml, OUT txt
# traces, TEST boards) into the local log directory.
# NOTE(review): excerpt incomplete - several argument lines, the local-path
# suffixes for OUT/TEST files and some except/return lines are missing.
349 def get_log_files(self):
350 """Get the log files produced by the command launched
351 on the remote machine, and put it in the log directory of the user,
352 so they can be accessible from
354 # Do not get the files if the command is not finished
355 if not self.has_finished():
356 msg = _("Trying to get log files whereas the job is not finished.")
357 self.logger.write(src.printcolors.printcWarning(msg))
360 # First get the file that contains the list of log files to get
361 tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
362 remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
363 self.machine.sftp.get(
367 # Read the file and get the result of the command and all the log files
369 fstream_tmp = open(tmp_file_path, "r")
370 file_lines = fstream_tmp.readlines()
371 file_lines = [line.replace("\n", "") for line in file_lines]
373 os.remove(tmp_file_path)
376 # The first line is the result of the command (0 success or 1 fail)
377 self.res_job = file_lines[0]
378 except Exception as e:
379 self.err += _("Unable to get status from remote file %s: %s" %
380 (remote_path, str(e)))
382 for i, job_path_remote in enumerate(file_lines[1:]):
384 # For each command, there is two files to get :
385 # 1- The xml file describing the command and giving the
387 # 2- The txt file containing the system command traces (like
388 # traces produced by the "make" command)
389 # 3- In case of the test command, there is another file to get :
390 # the xml board that contain the test results
# The parent directory name tells which kind of log file this is.
391 dirname = os.path.basename(os.path.dirname(job_path_remote))
392 if dirname != 'OUT' and dirname != 'TEST':
394 local_path = os.path.join(os.path.dirname(
395 self.logger.logFilePath),
396 os.path.basename(job_path_remote))
397 if i==0: # The first is the job command
398 self.logger.add_link(os.path.basename(job_path_remote),
402 elif dirname == 'OUT':
404 local_path = os.path.join(os.path.dirname(
405 self.logger.logFilePath),
407 os.path.basename(job_path_remote))
408 elif dirname == 'TEST':
410 local_path = os.path.join(os.path.dirname(
411 self.logger.logFilePath),
413 os.path.basename(job_path_remote))
# Only download files we do not already have locally.
416 if not os.path.exists(local_path):
417 self.machine.sftp.get(job_path_remote, local_path)
418 self.remote_log_files.append(local_path)
419 except Exception as e:
420 self.err += _("Unable to get %s log file from remote: %s" %
421 (str(job_path_remote),
# has_failed: a job fails if the ssh connection failed, if it timed out, or
# if the remote status read from list_log_files.txt is "1".
# NOTE(review): excerpt incomplete - the `return True`/`return False` lines
# between and after the conditions are missing.
424 def has_failed(self):
425 '''Returns True if the job has failed.
426 A job is considered as failed if the machine could not be reached,
427 if the remote command failed,
428 or if the job finished with a time out.
430 :return: True if the job has failed
433 if not self.has_finished():
435 if not self.machine.successfully_connected(self.logger):
437 if self.is_timeout():
439 if self.res_job == "1":
444 """In case of a failing job, one has to cancel every job that depend
445 on it. This method put the job as failed and will not be executed.
449 self._has_begun = True
450 self._has_finished = True
451 self.cancelled = True
452 self.out += _("This job was not launched because its father has failed.")
453 self.err += _("This job was not launched because its father has failed.")
# is_running: begun and not yet finished.
# NOTE(review): the docstring closing quotes (orig ~460) are missing here.
455 def is_running(self):
456 '''Returns True if the job commands are running
458 :return: True if the job is running
461 return self.has_begun() and not self.has_finished()
# is_timeout: accessor for the _has_timouted flag set by check_time().
# NOTE(review): docstring closing quotes missing from this excerpt.
463 def is_timeout(self):
464 '''Returns True if the job commands has finished with timeout
466 :return: True if the job has finished with timeout
469 return self._has_timouted
# time_elapsed: seconds since run() recorded _T0.
# NOTE(review): excerpt incomplete - the early-return branch body and the
# `T_now = time.time()` assignment (orig 478-479) are missing.
471 def time_elapsed(self):
472 """Get the time elapsed since the job launching
474 :return: The number of seconds
477 if not self.has_begun():
480 return T_now - self._T0
# check_time: if the elapsed time exceeds self.timeout, mark the job as
# finished/timed-out, kill the remote process and collect what logs exist.
# NOTE(review): excerpt incomplete - an early return and the try/
# get_log_files lines around the final except are missing.
482 def check_time(self):
483 """Verify that the job has not exceeded its timeout.
484 If it has, kill the remote command and consider the job as finished.
486 if not self.has_begun():
488 if self.time_elapsed() > self.timeout:
489 self._has_finished = True
490 self._has_timouted = True
491 self._Tf = time.time()
493 (out_kill, _) = self.kill_remote_process()
494 self.out += "TIMEOUT \n" + out_kill.read().decode()
495 self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
498 except Exception as e:
499 self.err += _("Unable to get remote log files: %s" % e)
# total_duration: wall-clock span between run() start (_T0) and end (_Tf).
501 def total_duration(self):
502 """Give the total duration of the job
504 :return: the total duration of the job in seconds
507 return self._Tf - self._T0
510 """Launch the job by executing the remote command.
513 # Prevent multiple run
515 msg = _("Warning: A job can only be launched one time")
516 msg2 = _("Trying to launch the job \"%s\" whereas it has "
517 "already been launched." % self.name)
518 self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
522 # Do not execute the command if the machine could not be reached
523 if not self.machine.successfully_connected(self.logger):
524 self._has_finished = True
526 self.err += ("Connection to machine (name : %s, host: %s, port:"
527 " %s, user: %s) has failed\nUse the log command "
528 "to get more information."
529 % (self.machine.name,
534 # Usual case : Launch the command on remote machine
535 self._T0 = time.time()
536 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
539 # If the results are not initialized, finish the job
540 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
541 self._has_finished = True
542 self._Tf = time.time()
544 self.err += "The server failed to execute the command"
546 # Put the beginning flag to true.
547 self._has_begun = True
# write_results: human-readable summary of the job (timings, machine info,
# captured stdout/stderr) written to the logger.
# NOTE(review): excerpt incomplete - the condition guarding the
# "Unable to get output" branch (orig 573) and its else are missing.
549 def write_results(self):
550 """Display on the terminal all the job's information
552 self.logger.write("name : " + self.name + "\n")
554 self.logger.write("after : %s\n" % self.after)
555 self.logger.write("Time elapsed : %4imin %2is \n" %
556 (self.total_duration()//60 , self.total_duration()%60))
558 self.logger.write("Begin time : %s\n" %
559 time.strftime('%Y-%m-%d %H:%M:%S',
560 time.localtime(self._T0)) )
562 self.logger.write("End time : %s\n\n" %
563 time.strftime('%Y-%m-%d %H:%M:%S',
564 time.localtime(self._Tf)) )
566 machine_head = "Informations about connection :\n"
567 underline = (len(machine_head) - 2) * "-"
568 self.logger.write(src.printcolors.printcInfo(
569 machine_head+underline+"\n"))
570 self.machine.write_info(self.logger)
572 self.logger.write(src.printcolors.printcInfo("out : \n"))
574 self.logger.write("Unable to get output\n")
576 self.logger.write(self.out + "\n")
577 self.logger.write(src.printcolors.printcInfo("err : \n"))
578 self.logger.write(self.err + "\n")
# get_status: short status string for the GUI xml files, derived from the
# connection / begun / cancelled / running / finished / timeout state.
# NOTE(review): excerpt incomplete - the cancelled branch (orig ~590-591)
# is missing.
580 def get_status(self):
581 """Get the status of the job (used by the Gui for xml display)
583 :return: The current status of the job
586 if not self.machine.successfully_connected(self.logger):
587 return "SSH connection KO"
588 if not self.has_begun():
589 return "Not launched"
592 if self.is_running():
593 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
594 time.localtime(self._T0))
595 if self.has_finished():
596 if self.is_timeout():
597 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
598 time.localtime(self._Tf))
599 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
600 time.localtime(self._Tf))
# Jobs: orchestrates all Job instances for today (one at a time per host).
# (The `class Jobs` header and the `def __init__` line are missing from this
# excerpt; L351 is the tail of the __init__ signature.)
# NOTE(review): several attribute initializations (runner, logger, lmachines,
# lhosts, ljobs) are missing from this excerpt; code kept byte-identical.
603 '''Class to manage the jobs to be run
610 lenght_columns = 20):
611 # The jobs configuration
612 self.cfg_jobs = config_jobs
613 self.job_file_path = job_file_path
614 # The machine that will be used today
616 # The list of machine (hosts, port) that will be used today
617 # (a same host can have several machine instances since there
618 # can be several ssh parameters)
620 # The jobs to be launched today
622 # The jobs that will not be launched today
623 self.ljobs_not_today = []
626 self.len_columns = lenght_columns
628 # the list of jobs that have not been run yet
629 self._l_jobs_not_started = []
630 # the list of jobs that have already ran
631 self._l_jobs_finished = []
632 # the list of jobs that are running
633 self._l_jobs_running = []
635 self.determine_jobs_and_machines()
# define_job: translate a pyconf job definition into a Job instance, filling
# optional fields (timeout defaults to 4h; after/application/board/prefix
# default to None when absent).
# NOTE(review): excerpt incomplete - the `else:` lines between branches and
# the final `return Job(...)` call (orig 664+) are missing.
637 def define_job(self, job_def, machine):
638 '''Takes a pyconf job definition and a machine (from class machine)
639 and returns the job instance corresponding to the definition.
641 :param job_def src.config.Mapping: a job definition
642 :param machine machine: the machine on which the job will run
643 :return: The corresponding job in a job class instance
647 cmmnds = job_def.commands
648 if not "timeout" in job_def:
649 timeout = 4*60*60 # default timeout = 4h
651 timeout = job_def.timeout
653 if 'after' in job_def:
654 after = job_def.after
656 if 'application' in job_def:
657 application = job_def.application
659 if 'board' in job_def:
660 board = job_def.board
662 if "prefix" in job_def:
663 prefix = job_def.prefix
# determine_jobs_and_machines: walk the pyconf jobs list, reuse or create the
# Machine for each job (defaults: local hostname/user, port 22, sat_path
# "salomeTools"), and sort jobs into today's list vs. ljobs_not_today based
# on the job's "when" weekday list.
# NOTE(review): excerpt incomplete - host_list init, `continue` statements,
# else branches, default port/password lines and the Machine(...) constructor
# call (orig ~728-737) are missing. Code kept byte-identical.
676 def determine_jobs_and_machines(self):
677 '''Function that reads the pyconf jobs definition and instantiates all
678 the machines and jobs to be done today.
# weekday(): Monday == 0 ... Sunday == 6.
683 today = datetime.date.weekday(datetime.date.today())
686 for job_def in self.cfg_jobs.jobs :
688 if not "machine" in job_def:
689 msg = _('WARNING: The job "%s" do not have the key '
690 '"machine", this job is ignored.\n\n' % job_def.name)
691 self.logger.write(src.printcolors.printcWarning(msg))
693 name_machine = job_def.machine
# First look for an already instantiated machine with that name.
696 for mach in self.lmachines:
697 if mach.name == name_machine:
701 if a_machine == None:
702 for machine_def in self.cfg_jobs.machines:
703 if machine_def.name == name_machine:
704 if 'host' not in machine_def:
705 host = self.runner.cfg.VARS.hostname
707 host = machine_def.host
709 if 'user' not in machine_def:
710 user = self.runner.cfg.VARS.user
712 user = machine_def.user
714 if 'port' not in machine_def:
717 port = machine_def.port
719 if 'password' not in machine_def:
722 passwd = machine_def.password
724 if 'sat_path' not in machine_def:
725 sat_path = "salomeTools"
727 sat_path = machine_def.sat_path
738 self.lmachines.append(a_machine)
739 if (host, port) not in host_list:
740 host_list.append((host, port))
742 if a_machine == None:
743 msg = _("WARNING: The job \"%(job_name)s\" requires the "
744 "machine \"%(machine_name)s\" but this machine "
745 "is not defined in the configuration file.\n"
746 "The job will not be launched")
747 self.logger.write(src.printcolors.printcWarning(msg))
749 a_job = self.define_job(job_def, a_machine)
751 if today in job_def.when:
752 self.ljobs.append(a_job)
753 else: # today in job_def.when
754 self.ljobs_not_today.append(a_job)
756 self.lhosts = host_list
# ssh_connection_all_machines: connect to every Machine, wipe the previous
# remote sat_path, copy salomeTools over sftp, then query the remote
# distribution via "sat config --value VARS.dist"; progress is rendered as
# dotted, padded status lines.
# NOTE(review): excerpt incomplete - several logger arguments, the success/
# failure condition around the copy-status branches, and some write() call
# arguments are missing. Code kept byte-identical.
758 def ssh_connection_all_machines(self, pad=50):
759 '''Function that do the ssh connection to every machine
765 self.logger.write(src.printcolors.printcInfo((
766 "Establishing connection with all the machines :\n")))
767 for machine in self.lmachines:
768 # little algorithm in order to display traces
769 begin_line = (_("Connection to %s: " % machine.name))
770 if pad - len(begin_line) < 0:
773 endline = (pad - len(begin_line)) * "." + " "
775 step = "SSH connection"
776 self.logger.write( begin_line + endline + step)
778 # the call to the method that initiate the ssh connection
779 msg = machine.connect(self.logger)
781 # Copy salomeTools to the remote machine
782 if machine.successfully_connected(self.logger):
783 step = _("Remove SAT")
784 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
785 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
# Remove any stale copy before uploading a fresh one.
786 (__, out_dist, __) = machine.exec_command(
787 "rm -rf %s" % machine.sat_path,
793 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
794 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
796 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
798 # get the remote machine distribution using a sat command
799 (__, out_dist, __) = machine.exec_command(
800 os.path.join(machine.sat_path,
801 "sat config --value VARS.dist --no_label"),
803 machine.distribution = out_dist.read().decode().replace("\n",
805 # Print the status of the copy
807 self.logger.write('\r%s' %
808 ((len(begin_line)+len(endline)+20) * " "), 3)
809 self.logger.write('\r%s%s%s' %
812 src.printcolors.printc(src.OK_STATUS)), 3)
814 self.logger.write('\r%s' %
815 ((len(begin_line)+len(endline)+20) * " "), 3)
816 self.logger.write('\r%s%s%s %s' %
819 src.printcolors.printc(src.KO_STATUS),
820 _("Copy of SAT failed: %s" % res_copy)), 3)
822 self.logger.write('\r%s' %
823 ((len(begin_line)+len(endline)+20) * " "), 3)
824 self.logger.write('\r%s%s%s %s' %
827 src.printcolors.printc(src.KO_STATUS),
829 self.logger.write("\n", 3)
831 self.logger.write("\n")
# is_occupied: return the running Job occupying (host, port), else False.
# NOTE(review): excerpt incomplete - the (host, port) unpacking, the
# is_running() test and the return statements are missing.
834 def is_occupied(self, hostname):
835 '''Function that returns True if a job is running on
836 the machine defined by its host and its port.
838 :param hostname (str, int): the pair (host, port)
839 :return: the job that is running on the host,
840 or false if there is no job running on the host.
845 for jb in self.ljobs:
846 if jb.machine.host == host and jb.machine.port == port:
# update_jobs_states_list: rebuild the running/finished lists and report
# whether any job finished since the previous call (used to refresh the GUI).
# NOTE(review): excerpt incomplete - the is_running() test (orig 861) and the
# final return line prefix are missing.
851 def update_jobs_states_list(self):
852 '''Function that updates the lists that store the currently
853 running jobs and the jobs that have already finished.
858 jobs_finished_list = []
859 jobs_running_list = []
860 for jb in self.ljobs:
862 jobs_running_list.append(jb)
864 if jb.has_finished():
865 jobs_finished_list.append(jb)
867 nb_job_finished_before = len(self._l_jobs_finished)
868 self._l_jobs_finished = jobs_finished_list
869 self._l_jobs_running = jobs_running_list
871 nb_job_finished_now = len(self._l_jobs_finished)
873 return nb_job_finished_now > nb_job_finished_before
# cancel_dependencies_of_failing_jobs: cancel every job whose "after" parent
# failed, so it is never launched.
# NOTE(review): excerpt incomplete - the `continue` after the None check and
# the job.cancel() call (orig ~887) are missing.
875 def cancel_dependencies_of_failing_jobs(self):
876 '''Function that cancels all the jobs that depend on a failing one.
882 for job in self.ljobs:
883 if job.after is None:
885 father_job = self.find_job_that_has_name(job.after)
886 if father_job is not None and father_job.has_failed():
# find_job_that_has_name: linear search of today's jobs by name.
# NOTE(review): excerpt incomplete - the name comparison / return inside the
# loop (orig 897-898) and the fall-through return are missing.
889 def find_job_that_has_name(self, name):
890 '''Returns the job by its name.
892 :param name str: a job name
893 :return: the job that has the name.
896 for jb in self.ljobs:
899 # the following is executed only if the job was not found
def str_of_length(self, text, length):
    """Return *text* fitted to exactly *length* characters.

    A string longer than *length* is truncated and terminated with an
    ellipsis ("..."); a shorter one is centered with spaces, the extra
    space (for an odd difference) going on the right.

    :param text str: any string
    :param length int: the width of the returned string
    :return: a string of exactly *length* characters
    :rtype: str
    """
    if len(text) > length:
        # Keep length-3 characters and mark the truncation.
        text_out = text[:length - 3] + '...'
    else:
        # Center: diff//2 spaces on the left, the remainder on the right.
        diff = length - len(text)
        before = " " * (diff // 2)
        after = " " * (diff // 2 + diff % 2)
        text_out = before + text + after
    return text_out
# display_status: one-line table row ("|job|job|empty|") showing, per host,
# the job currently running there; rewritten in place with "\r".
# NOTE(review): excerpt incomplete - display_line initialization and the
# else keyword before the occupied branch are missing.
921 def display_status(self, len_col):
922 '''Takes a lenght and construct the display of the current status
923 of the jobs in an array that has a column for each host.
924 It displays the job that is currently running on the host
927 :param len_col int: the size of the column
933 for host_port in self.lhosts:
934 jb = self.is_occupied(host_port)
935 if not jb: # nothing running on the host
936 empty = self.str_of_length("empty", len_col)
937 display_line += "|" + empty
939 display_line += "|" + src.printcolors.printcInfo(
940 self.str_of_length(jb.name, len_col))
942 self.logger.write("\r" + display_line + "|")
# (run_jobs: the `def` line, original ~946, is missing from this excerpt.)
# Main scheduling loop: print the per-host table header, then repeatedly
# start the next eligible job on each free host (respecting "after"
# dependencies), cancel dependents of failed jobs, refresh the GUI xml and
# the status line, until every job has finished.
# NOTE(review): excerpt incomplete - text_line init, host/port unpacking,
# `continue`/`break` statements, jb.run() calls, the sleep guard and the
# final timeout checks are missing. Code kept byte-identical.
947 '''The main method. Runs all the jobs on every host.
948 For each host, at a given time, only one job can be running.
949 The jobs that have the field after (that contain the job that has
950 to be run before it) are run after the previous job.
951 This method stops when all the jobs are finished.
958 self.logger.write(src.printcolors.printcInfo(
959 _('Executing the jobs :\n')))
961 for host_port in self.lhosts:
964 if port == 22: # default value
965 text_line += "|" + self.str_of_length(host, self.len_columns)
967 text_line += "|" + self.str_of_length(
968 "("+host+", "+str(port)+")", self.len_columns)
970 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
971 self.logger.write(tiret_line)
972 self.logger.write(text_line + "|\n")
973 self.logger.write(tiret_line)
976 # The infinite loop that runs the jobs
977 l_jobs_not_started = src.deepcopy_list(self.ljobs)
978 while len(self._l_jobs_finished) != len(self.ljobs):
979 new_job_start = False
980 for host_port in self.lhosts:
982 if self.is_occupied(host_port):
985 for jb in l_jobs_not_started:
986 if (jb.machine.host, jb.machine.port) != host_port:
990 l_jobs_not_started.remove(jb)
994 jb_before = self.find_job_that_has_name(jb.after)
995 if jb_before is None:
997 msg = _("This job was not launched because its "
998 "father is not in the jobs list.")
1002 if jb_before.has_finished():
1004 l_jobs_not_started.remove(jb)
1005 new_job_start = True
1007 self.cancel_dependencies_of_failing_jobs()
1008 new_job_finished = self.update_jobs_states_list()
1010 if new_job_start or new_job_finished:
1012 self.gui.update_xml_files(self.ljobs)
1013 # Display the current status
1014 self.display_status(self.len_columns)
1016 # Make sure that the proc is not entirely busy
1019 self.logger.write("\n")
1020 self.logger.write(tiret_line)
1021 self.logger.write("\n\n")
1024 self.gui.update_xml_files(self.ljobs)
1025 self.gui.last_update()
# write_all_results: dump each job's summary (write_results, missing from
# this excerpt between the label and the trailing newline) to the logger.
1027 def write_all_results(self):
1028 '''Display all the jobs outputs.
1034 for jb in self.ljobs:
1035 self.logger.write(src.printcolors.printcLabel(
1036 "#------- Results for job %s -------#\n" % jb.name))
1038 self.logger.write("\n\n")
# Gui: builds the xml files (global report + one per board) displayed in a
# browser. (The `class Gui` header line is missing from this excerpt.)
# NOTE(review): excerpt incomplete - logger assignment, the history dict
# init and the XmlLogFile arguments (orig ~1074-1075) are missing.
1041 '''Class to manage the the xml data that can be displayed in a browser to
1045 def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, prefix, file_boards=""):
1048 :param xml_dir_path str: The path to the directory where to put
1049 the xml resulting files
1050 :param l_jobs List: the list of jobs that run today
1051 :param l_jobs_not_today List: the list of jobs that do not run today
1052 :param file_boards str: the file path from which to read the
1055 # The prefix to add to the xml files : date_hour
1056 self.prefix = prefix
1058 # The path of the csv files to read to fill the expected boards
1059 self.file_boards = file_boards
# Only parse expected boards when a csv file was actually given.
1061 if file_boards != "":
1062 today = datetime.date.weekday(datetime.date.today())
1063 self.parse_csv_boards(today)
1065 self.d_input_boards = {}
1067 # The path of the global xml file
1068 self.xml_dir_path = xml_dir_path
1069 # Initialize the xml files
1070 self.global_name = "global_report"
1071 xml_global_path = os.path.join(self.xml_dir_path,
1072 self.global_name + ".xml")
1073 self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
1076 # Find history for each job
1078 self.find_history(l_jobs, l_jobs_not_today)
1080 # The xml files that corresponds to the boards.
1081 # {name_board : xml_object}}
1082 self.d_xml_board_files = {}
1084 # Create the lines and columns
1085 self.initialize_boards(l_jobs, l_jobs_not_today)
1087 # Write the xml file
1088 self.update_xml_files(l_jobs)
# add_xml_board: register a new board xml file and seed its top-level nodes
# (distributions / applications / board name).
# NOTE(review): the XmlLogFile constructor's remaining arguments (orig
# ~1096-1097) are missing from this excerpt.
1090 def add_xml_board(self, name):
1091 '''Add a board to the board list
1092 :param name str: the board name
1094 xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
1095 self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
1098 self.d_xml_board_files[name].add_simple_node("distributions")
1099 self.d_xml_board_files[name].add_simple_node("applications")
1100 self.d_xml_board_files[name].add_simple_node("board", text=name)
# initialize_boards: first pass over all jobs to build every board's rows
# (distributions) and columns (applications), add boards required by the
# input csv, record hosts/ports, create the per-file "jobs"/"infos"/
# "history" nodes, and flag the board squares for which no job exists.
# NOTE(review): excerpt incomplete - many lines are missing (d_dist init,
# `continue` statements, several xpath arguments, loop bodies around
# found = True flags). Code kept byte-identical; only comments added.
1102 def initialize_boards(self, l_jobs, l_jobs_not_today):
1103 '''Get all the first information needed for each file and write the
1104 first version of the files
1105 :param l_jobs List: the list of jobs that run today
1106 :param l_jobs_not_today List: the list of jobs that do not run today
1108 # Get the boards to fill and put it in a dictionary
1109 # {board_name : xml instance corresponding to the board}
1110 for job in l_jobs + l_jobs_not_today:
1112 if (board is not None and
1113 board not in self.d_xml_board_files.keys()):
1114 self.add_xml_board(board)
1116 # Verify that the boards given as input are done
1117 for board in list(self.d_input_boards.keys()):
1118 if board not in self.d_xml_board_files:
1119 self.add_xml_board(board)
1120 root_node = self.d_xml_board_files[board].xmlroot
1121 src.xmlManager.append_node_attrib(root_node,
1122 {"input_file" : self.file_boards})
1124 # Loop over all jobs in order to get the lines and columns for each
1128 for board in self.d_xml_board_files:
1130 d_application[board] = []
1134 for job in l_jobs + l_jobs_not_today:
1136 if (job.machine.host, job.machine.port) not in l_hosts_ports:
1137 l_hosts_ports.append((job.machine.host, job.machine.port))
1139 distrib = job.machine.distribution
1140 application = job.application
1142 board_job = job.board
1145 for board in self.d_xml_board_files:
1146 if board_job == board:
# Register each new distribution as a row of the board.
1147 if distrib is not None and distrib not in d_dist[board]:
1148 d_dist[board].append(distrib)
1149 src.xmlManager.add_simple_node(
1150 self.d_xml_board_files[board].xmlroot.find(
1153 attrib={"name" : distrib})
1155 if board_job == board:
# Register each new application as a column of the board.
1156 if (application is not None and
1157 application not in d_application[board]):
1158 d_application[board].append(application)
1159 src.xmlManager.add_simple_node(
1160 self.d_xml_board_files[board].xmlroot.find(
1164 "name" : application})
1166 # Verify that there are no missing application or distribution in the
1167 # xml board files (regarding the input boards)
1168 for board in self.d_xml_board_files:
1169 l_dist = d_dist[board]
1170 if board not in self.d_input_boards.keys():
1172 for dist in self.d_input_boards[board]["rows"]:
1173 if dist not in l_dist:
1174 src.xmlManager.add_simple_node(
1175 self.d_xml_board_files[board].xmlroot.find(
1178 attrib={"name" : dist})
1179 l_appli = d_application[board]
1180 for appli in self.d_input_boards[board]["columns"]:
1181 if appli not in l_appli:
1182 src.xmlManager.add_simple_node(
1183 self.d_xml_board_files[board].xmlroot.find(
1186 attrib={"name" : appli})
1188 # Initialize the hosts_ports node for the global file
1189 self.xmlhosts_ports = self.xml_global_file.add_simple_node(
1191 for host, port in l_hosts_ports:
1192 host_port = "%s:%i" % (host, port)
1193 src.xmlManager.add_simple_node(self.xmlhosts_ports,
1195 attrib={"name" : host_port})
1197 # Initialize the jobs node in all files
1198 for xml_file in [self.xml_global_file] + list(
1199 self.d_xml_board_files.values()):
1200 xml_jobs = xml_file.add_simple_node("jobs")
1201 # Get the jobs present in the config file but
1202 # that will not be launched today
1203 self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1205 # add also the infos node
1206 xml_file.add_simple_node("infos",
1207 attrib={"name" : "last update",
1208 "JobsCommandStatus" : "running"})
1210 # and put the history node
1211 history_node = xml_file.add_simple_node("history")
1212 name_board = os.path.basename(xml_file.logFile)[:-len(".xml")]
1213 # serach for board files
# Board-history files are named YYYYMMDD_HHMMSS_<board>.xml.
1214 expression = "^[0-9]{8}_+[0-9]{6}_" + name_board + ".xml$"
1215 oExpr = re.compile(expression)
1216 # Get the list of xml borad files that are in the log directory
1217 for file_name in os.listdir(self.xml_dir_path):
1218 if oExpr.search(file_name):
1219 date = os.path.basename(file_name).split("_")[0]
1220 file_path = os.path.join(self.xml_dir_path, file_name)
1221 src.xmlManager.add_simple_node(history_node,
1224 attrib={"date" : date})
1227 # Find in each board the squares that needs to be filled regarding the
1228 # input csv files but that are not covered by a today job
1229 for board in self.d_input_boards.keys():
1230 xml_root_board = self.d_xml_board_files[board].xmlroot
1231 # Find the missing jobs for today
1232 xml_missing = src.xmlManager.add_simple_node(xml_root_board,
1234 for row, column in self.d_input_boards[board]["jobs"]:
1237 if (job.application == column and
1238 job.machine.distribution == row):
1242 src.xmlManager.add_simple_node(xml_missing,
1244 attrib={"distribution" : row,
1245 "application" : column })
1246 # Find the missing jobs not today
1247 xml_missing_not_today = src.xmlManager.add_simple_node(
1249 "missing_jobs_not_today")
1250 for row, column in self.d_input_boards[board]["jobs_not_today"]:
1252 for job in l_jobs_not_today:
1253 if (job.application == column and
1254 job.machine.distribution == row):
1258 src.xmlManager.add_simple_node(xml_missing_not_today,
1260 attrib={"distribution" : row,
1261 "application" : column })
    def find_history(self, l_jobs, l_jobs_not_today):
        """Find, for each job, in the existing xml boards the results for the
        job. Store the results in the dictionary self.history = {name_job :
        list of (date, status, list links)}

        :param l_jobs List: the list of jobs to run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        """
        # load the all the history
        # Global report files are named YYYYMMDD_HHMMSS_<global_name>.xml
        expression = "^[0-9]{8}_+[0-9]{6}_" + self.global_name + ".xml$"
        oExpr = re.compile(expression)
        # Get the list of global xml that are in the log directory
        for file_name in os.listdir(self.xml_dir_path):
            if oExpr.search(file_name):
                file_path = os.path.join(self.xml_dir_path, file_name)
                global_xml = src.xmlManager.ReadXmlFile(file_path)
                l_globalxml.append(global_xml)
        # Construct the dictionary self.history
        for job in l_jobs + l_jobs_not_today:
            for global_xml in l_globalxml:
                # the leading date stamp of the file name identifies the run
                date = os.path.basename(global_xml.filePath).split("_")[0]
                global_root_node = global_xml.xmlroot.find("jobs")
                job_node = src.xmlManager.find_node_by_attrib(
                    if job_node.find("remote_log_file_path") is not None:
                        link = job_node.find("remote_log_file_path").text
                        res_job = job_node.find("res").text
                        if link != "nothing":
                            # keep only runs that produced a real remote log
                            l_links.append((date, res_job, link))
            # sort by date string, most recent run first
            l_links = sorted(l_links, reverse=True)
            self.history[job.name] = l_links
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Get all the first information needed for each file and write the
        first version of the files

        :param xml_node_jobs etree.Element: the node corresponding to a job
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        for job in l_jobs_not_today:
            # one <job name="..."> child per job that is skipped today
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            # past results of this job, most recent first (filled by
            # find_history before this method is called)
            xml_history = src.xmlManager.add_simple_node(xmlj, "history")
            for i, (date, res_job, link) in enumerate(self.history[job.name]):
                # tag the first one (the last one)
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
    def parse_csv_boards(self, today):
        """ Parse the csv file that describes the boards to produce and fill
            the dict d_input_boards that contain the csv file content

        :param today int: the current day of the week
        """
        # open the csv file and read its content
        with open(self.file_boards, 'r') as f:
            reader = csv.reader(f,delimiter=CSV_DELIMITER)
        # get the delimiter for the boards (empty line)
        boards_delimiter = [''] * len(l_read[0])
        # Make the list of boards, by splitting with the delimiter
        l_boards = [list(y) for x, y in itertools.groupby(l_read,
                                    lambda z: z == boards_delimiter) if not x]

        # loop over the csv lists of lines and get the rows, columns and jobs
        for input_board in l_boards:
            # the first cell of the first line holds the board name
            board_name = input_board[0][0]
            # the rest of the first line gives the column headers
            columns = input_board[0][1:]
            for line in input_board[1:]:
                for i, square in enumerate(line[1:]):
                    # each square lists the week-day numbers the job runs on,
                    # separated by DAYS_SEPARATOR
                    days = square.split(DAYS_SEPARATOR)
                    days = [int(day) for day in days]
                    job = (row, columns[i])
                    jobs_not_today.append(job)
            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns,
                                    "jobs_not_today" : jobs_not_today}
        self.d_input_boards = d_boards
    def update_xml_files(self, l_jobs):
        '''Write all the xml files with updated information about the jobs

        :param l_jobs List: the list of jobs that run today
        '''
        # refresh the global file and every board file in memory
        for xml_file in [self.xml_global_file] + list(
                                               self.d_xml_board_files.values()):
            self.update_xml_file(l_jobs, xml_file)
        # then flush everything to disk
        self.write_xml_files()
    def update_xml_file(self, l_jobs, xml_file):
        '''update information about the jobs for the file xml_file

        :param l_jobs List: the list of jobs that run today
        :param xml_file xmlManager.XmlLogFile: the xml instance to update
        '''
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
            # Find the node corresponding to the job and delete it
            # in order to recreate it
            for xmljob in xml_node_jobs.findall('job'):
                if xmljob.attrib['name'] == job.name:
                    xml_node_jobs.remove(xmljob)
                # human-readable begin/end timestamps from the job's
                # internal epoch times
                T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._T0))
                Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(job._Tf))

            # recreate the job node
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            xml_history = src.xmlManager.add_simple_node(xmlj, "history")
            for date, res_job, link in self.history[job.name]:
                src.xmlManager.add_simple_node(xml_history,
                                               attrib={"date" : date,
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj, "distribution",
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
            src.xmlManager.add_simple_node(xmlj, "begin", T0)
            src.xmlManager.add_simple_node(xmlj, "end", Tf)
            # cleancolor presumably strips terminal color escapes from the
            # captured output — TODO confirm against src.printcolors
            src.xmlManager.add_simple_node(xmlj, "out",
                                           src.printcolors.cleancolor(job.out))
            src.xmlManager.add_simple_node(xmlj, "err",
                                           src.printcolors.cleancolor(job.err))
            src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
            if len(job.remote_log_files) > 0:
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
                                               job.remote_log_files[0])
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
            xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
            # get the job father
            if job.after is not None:
                    if jb.name == job.after:
                # if the father has a remote log, link it on the "after" node
                if (job_father is not None and
                    len(job_father.remote_log_files) > 0):
                    link = job_father.remote_log_files[0]
                    src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

            # Verify that the job is to be done today regarding the input csv
            if job.board and job.board in self.d_input_boards.keys():
                for dist, appli in self.d_input_boards[job.board]["jobs"]:
                    if (job.machine.distribution == dist
                        and job.application == appli):
                        src.xmlManager.add_simple_node(xmlj,
                        src.xmlManager.add_simple_node(xmlj,
        # refresh the "last update" timestamp in the infos node
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                       datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
    def last_update(self, finish_status = "finished"):
        '''Record the final status of the jobs command in every xml file
        (the global one and all the board files) and write them a last time.

        :param finish_status str: the value to store in the
                                  "JobsCommandStatus" attribute of the
                                  "infos" node of each file
        '''
        for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
            xml_node_infos = xml_file.xmlroot.find('infos')
            src.xmlManager.append_node_attrib(xml_node_infos,
                        attrib={"JobsCommandStatus" : finish_status})
        # flush the final state to disk
        self.write_xml_files()
1529 def write_xml_file(self, xml_file, stylesheet):
1530 ''' Write one xml file and the same file with prefix
1532 xml_file.write_tree(stylesheet)
1533 file_path = xml_file.logFile
1534 file_dir = os.path.dirname(file_path)
1535 file_name = os.path.basename(file_path)
1536 file_name_with_prefix = self.prefix + "_" + file_name
1537 xml_file.write_tree(stylesheet, os.path.join(file_dir,
1538 file_name_with_prefix))
1540 def write_xml_files(self):
1541 ''' Write the xml files
1543 self.write_xml_file(self.xml_global_file, STYLESHEET_GLOBAL)
1544 for xml_file in self.d_xml_board_files.values():
1545 self.write_xml_file(xml_file, STYLESHEET_BOARD)
# Describes the command
def description():
    '''Method called when salomeTools is invoked with --help: returns the
    short description of the jobs command.

    :return: the text to display for the jobs command description
    :rtype: str
    '''
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.\n\nexample:\nsat "
             "jobs --name my_jobs --publish")
1557 def run(args, runner, logger):
1559 (options, args) = parser.parse_args(args)
1561 l_cfg_dir = runner.cfg.PATHS.JOBPATH
1563 # list option : display all the available config files
1565 for cfg_dir in l_cfg_dir:
1566 if not options.no_label:
1567 logger.write("------ %s\n" %
1568 src.printcolors.printcHeader(cfg_dir))
1570 for f in sorted(os.listdir(cfg_dir)):
1571 if not f.endswith('.pyconf'):
1574 logger.write("%s\n" % cfilename)
1577 # Make sure the jobs_config option has been called
1578 if not options.jobs_cfg:
1579 message = _("The option --jobs_config is required\n")
1580 src.printcolors.printcError(message)
1583 # Find the file in the directories
1585 for cfg_dir in l_cfg_dir:
1586 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1587 if not file_jobs_cfg.endswith('.pyconf'):
1588 file_jobs_cfg += '.pyconf'
1590 if not os.path.exists(file_jobs_cfg):
1597 msg = _("The file configuration %(name_file)s was not found."
1598 "\nUse the --list option to get the possible files.")
1599 src.printcolors.printcError(msg)
1603 (_("Platform"), runner.cfg.VARS.dist),
1604 (_("File containing the jobs configuration"), file_jobs_cfg)
1606 src.print_info(logger, info)
1608 # Read the config that is in the file
1609 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1610 if options.only_jobs:
1611 l_jb = src.pyconf.Sequence()
1612 for jb in config_jobs.jobs:
1613 if jb.name in options.only_jobs:
1615 "Adding a job that was given in only_jobs option parameters")
1616 config_jobs.jobs = l_jb
1619 today_jobs = Jobs(runner,
1623 # SSH connection to all machines
1624 today_jobs.ssh_connection_all_machines()
1625 if options.test_connection:
1630 logger.write(src.printcolors.printcInfo(
1631 _("Initialize the xml boards : ")), 5)
1634 # Copy the stylesheets in the log directory
1635 log_dir = runner.cfg.USER.log_dir
1636 xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
1638 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
1639 files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
1640 files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
1641 for file_path in files_to_copy:
1642 shutil.copy2(file_path, log_dir)
1644 # Instanciate the Gui in order to produce the xml files that contain all
1646 gui = Gui(runner.cfg.USER.log_dir,
1648 today_jobs.ljobs_not_today,
1649 runner.cfg.VARS.datehour,
1650 file_boards = options.input_boards)
1652 logger.write(src.printcolors.printcSuccess("OK"), 5)
1653 logger.write("\n\n", 5)
1656 # Display the list of the xml files
1657 logger.write(src.printcolors.printcInfo(("Here is the list of published"
1659 logger.write("%s\n" % gui.xml_global_file.logFile, 4)
1660 for board in gui.d_xml_board_files.keys():
1661 file_path = gui.d_xml_board_files[board].logFile
1662 file_name = os.path.basename(file_path)
1663 logger.write("%s\n" % file_path, 4)
1664 logger.add_link(file_name, "board", 0, board)
1666 logger.write("\n", 4)
1668 today_jobs.gui = gui
1672 # Run all the jobs contained in config_jobs
1673 today_jobs.run_jobs()
1674 except KeyboardInterrupt:
1676 logger.write("\n\n%s\n\n" %
1677 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1682 msg = _("Killing the running jobs and trying"
1683 " to get the corresponding logs\n")
1684 logger.write(src.printcolors.printcWarning(msg))
1686 # find the potential not finished jobs and kill them
1687 for jb in today_jobs.ljobs:
1688 if not jb.has_finished():
1691 jb.kill_remote_process()
1692 except Exception as e:
1693 msg = _("Failed to kill job %s: %s\n" % (jb.name, e))
1694 logger.write(src.printcolors.printcWarning(msg))
1695 if jb.res_job != "0":
1699 today_jobs.gui.last_update(_("Forced interruption"))
1702 today_jobs.gui.last_update()
1703 # Output the results
1704 today_jobs.write_all_results()