3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# XSL stylesheets used by a browser to render the generated xml reports.
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
# Map of datetime.date.weekday() integers to day names (0 = Monday).
d_INT_DAY = {0 : "monday",
# Definition of the command-line options accepted by the "jobs" command.
parser = src.options.Options()
parser.add_option('n', 'name', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('Optional: the list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('Optional: list all available config files.'))
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Optional: try to connect to the machines. "
                    "Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Optional: generate an xml file that can be read in a "
                    "browser to display the jobs status."),
parser.add_option('i', 'input_boards', 'list2', 'input_boards', _("Optional: "
                                        "the list of path to csv files that contain "
                                        "the expected boards."),[])
# NOTE(review): short option 'n' is already used by the 'name' option
# above — confirm this duplication is intended.
parser.add_option('n', 'completion', 'boolean', 'no_label',
                  _("Optional (internal use): do not print labels, Works only "
class Machine(object):
    '''Class to manage a ssh connection on a machine
        self.distribution = None # Will be filled after copying SAT on the machine
        self.password = passwd
        self.sat_path = sat_path
        # One paramiko client per Machine instance; connect() opens it.
        self.ssh = paramiko.SSHClient()
        # Tri-state flag: None = no connection attempt yet,
        # True/False = result of the last connect() call.
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        '''
        # Start pessimistic: flag the connection as failed until the
        # paramiko connect call below succeeds.
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # Auto-accept unknown host keys (no interactive confirmation).
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host,
                             password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
            message = ( _("Error connecting or establishing an SSH session"))
            # No exception raised: the connection is usable.
            self._connection_successful = True
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeed

        :param logger src.logger.Logger: The logger instance
        :return: True if the connection has succeed, False if not
        '''
        # None means connect() was never called: warn the caller, then
        # fall through and return the (falsy) flag as-is.
        if self._connection_successful == None:
            message = _("Warning : trying to ask if the connection to "
            "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
            " no connection request" %
            (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path

        :param sat_local_path str: the path of the local salomeTools directory
        :param job_file str: the path of the job configuration file to upload
        '''
        # open a sftp connection
        self.sftp = self.ssh.open_sftp()
        # Create the sat directory on remote machine if it is not existing
        self.mkdir(self.sat_path, ignore_existing=True)
        # Upload the whole SAT tree, skipping the .git directory.
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        # put the job configuration file in order to make it reachable
        # on the remote machine
        self.sftp.put(job_file, os.path.join(".salomeTools",
                                             ".jobs_command_file.pyconf"))
        except Exception as e:
            # Any failure while copying makes the machine unusable.
            self._connection_successful = False
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All sub-directories in source are
            created under target.

        :param source str: the local directory to upload
        :param target str: the remote destination directory (must exist)
        :param filters list: entry names to skip
                             (NOTE(review): mutable default argument — left
                             as-is to keep the interface unchanged)
        '''
        for item in os.listdir(source):
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            # Symbolic links are recreated remotely, not dereferenced.
            if os.path.islink(source_path):
                linkto = os.readlink(source_path)
                    self.sftp.symlink(linkto, destination_path)
                    # Mirror the local file mode on the remote side.
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
            if os.path.isfile(source_path):
                self.sftp.put(source_path, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
                # Directory case: create it remotely and recurse into it.
                # NOTE(review): the recursive call does not forward
                # `filters` — confirm this is intended.
                self.mkdir(destination_path, ignore_existing=True)
                self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail

        :param path str: the remote directory to create
        :param mode int: the permissions to apply (511 == 0o777)
        :param ignore_existing bool: if True, do not fail when the
                                     directory already exists
        '''
            self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
                 or (None, None, None) when the execution could not start
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        '''
            # Does not wait the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        return (stdin, stdout, stderr)
215 '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
           (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        # Summarise the connection state with the standard OK/KO markers.
        if self.successfully_connected(logger):
            status = src.OK_STATUS
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
240 '''Class to manage one job
    def __init__(self, name, machine, application, board,
                 commands, timeout, config, logger, after=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        self.cancelled = False
        # Life-cycle flags of the remote command.
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        self.out = None # Contains something only if the job is finished
        self.err = None # Contains something only if the job is finished
        self.commands = commands
        # The single shell command launched remotely: it runs "sat job"
        # with the uploaded configuration; log paths are recorded in
        # list_log_files.txt on the remote side.
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config .jobs_command_file" +
        """ Get the pid(s) corresponding to the command that have been launched
            On the remote machine

        :return: The list of integers corresponding to the found pids
        """
        # Grep the full command line in ps output; awk keeps the pid column.
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        # Keep only the numeric part of each returned line.
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self, wait=1):
        '''Kills the process on the remote machine.

        :param wait int: presumably a delay in seconds around the kill —
                         not used in the visible code, TODO confirm
        :return: (the output of the kill, the error of the kill)
        '''
        pids = self.get_pids()
        # Send SIGINT (kill -2) to every pid found for the remote command.
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)
        '''Returns True if the job has already begun

        :return: True if the job has already begun
        '''
        return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
           (i.e. all the commands have been executed)
           If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        '''
        # If the method has already been called and returned True
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        # A closed stdout channel means the remote command terminated.
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read().decode()
            self.err = self._stderr.read().decode()
            # Record the end time of the job.
            self._Tf = time.time()
            # And get the remote command status and log files
        return self._has_finished
    def get_log_files(self):
        """Get the log files produced by the command launched
           on the remote machine.
        """
        # Do not get the files if the command is not finished
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        # First get the file that contains the list of log files to get
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        remote_path = os.path.join(self.machine.sat_path, "list_log_files.txt")
        self.machine.sftp.get(
        # Read the file and get the result of the command and all the log files
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        # The local temporary copy is no longer needed once parsed.
        os.remove(tmp_file_path)
        # The first line is the result of the command (0 success or 1 fail)
        self.res_job = file_lines[0]
        except Exception as e:
            self.err += _("Unable to get status from remote file %s: %s" %
                          (remote_path, str(e)))
        for i, job_path_remote in enumerate(file_lines[1:]):
            # For each command, there is two files to get :
            # 1- The xml file describing the command and giving the
            # 2- The txt file containing the system command traces (like
            # traces produced by the "make" command)
            if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
                local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                        os.path.basename(job_path_remote))
                if i==0: # The first is the job command
                    self.logger.add_link(os.path.basename(job_path_remote),
                local_path = os.path.join(os.path.dirname(
                                        self.logger.logFilePath),
                                        os.path.basename(job_path_remote))
            # Download each remote log only once; remember every local copy.
            if not os.path.exists(local_path):
                self.machine.sftp.get(job_path_remote, local_path)
            self.remote_log_files.append(local_path)
        except Exception as e:
            self.err += _("Unable to get %s log file from remote: %s" %
                          (job_path_remote, str(e)))
    def has_failed(self):
        '''Returns True if the job has failed.
           A job is considered as failed if the machine could not be reached,
           if the remote command failed,
           or if the job finished with a time out.

        :return: True if the job has failed
        '''
        # An unfinished job cannot be declared failed yet.
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        # "1" is the fail marker written by the remote command.
        if self.res_job == "1":
        """In case of a failing job, one has to cancel every job that depend
           on it. This method put the job as failed and will not be executed.
        """
        # Mark the job begun and finished so the scheduler treats it as done.
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out = _("This job was not launched because its father has failed.")
        self.err = _("This job was not launched because its father has failed.")
    def is_running(self):
        '''Returns True if the job commands are running

        :return: True if the job is running
        '''
        # Running = launched but not yet terminated.
        return self.has_begun() and not self.has_finished()
    def is_timeout(self):
        '''Returns True if the job commands has finished with timeout

        :return: True if the job has finished with timeout
        '''
        return self._has_timouted
    def time_elapsed(self):
        """Get the time elapsed since the job launching

        :return: The number of seconds
        """
        # A job that has not started has no meaningful elapsed time.
        if not self.has_begun():
        return T_now - self._T0
    def check_time(self):
        """Verify that the job has not exceeded its timeout.
           If it has, kill the remote command and consider the job as finished.
        """
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            # Kill the remote process and keep its output as the job output.
            (out_kill, _) = self.kill_remote_process()
            self.out = "TIMEOUT \n" + out_kill.read().decode()
            self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
        except Exception as e:
            self.err += _("Unable to get remote log files: %s" % e)
    def total_duration(self):
        """Give the total duration of the job

        :return: the total duration of the job in seconds
        """
        # End time minus start time, both recorded with time.time().
        return self._Tf - self._T0
        """ Launch the job by executing the remote command.
        """
        # Prevent multiple run
            msg = _("Warning: A job can only be launched one time")
            msg2 = _("Trying to launch the job \"%s\" whereas it has "
                     "already been launched." % self.name)
            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
        # Do not execute the command if the machine could not be reached
        if not self.machine.successfully_connected(self.logger):
            self._has_finished = True
            self.err = ("Connection to machine (name : %s, host: %s, port:"
                        " %s, user: %s) has failed\nUse the log command "
                        "to get more information."
                        % (self.machine.name,
            # Usual case : Launch the command on remote machine
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
            # If the results are not initialized, finish the job
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.err = "The server failed to execute the command"
        # Put the beginning flag to true.
        self._has_begun = True
    def write_results(self):
        """Display on the terminal all the job's information
        """
        self.logger.write("name : " + self.name + "\n")
        self.logger.write("after : %s\n" % self.after)
        self.logger.write("Time elapsed : %4imin %2is \n" %
                    (self.total_duration()//60 , self.total_duration()%60))
        self.logger.write("Begin time : %s\n" %
                    time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(self._T0)) )
        self.logger.write("End time : %s\n\n" %
                    time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(self._Tf)) )
        machine_head = "Informations about connection :\n"
        # Underline sized to the header text (minus the trailing ":\n").
        underline = (len(machine_head) - 2) * "-"
        self.logger.write(src.printcolors.printcInfo(
                                                machine_head+underline+"\n"))
        self.machine.write_info(self.logger)
        # Dump the command outputs and errors collected in has_finished().
        self.logger.write(src.printcolors.printcInfo("out : \n"))
            self.logger.write("Unable to get output\n")
            self.logger.write(self.out + "\n")
        self.logger.write(src.printcolors.printcInfo("err : \n"))
            self.logger.write("Unable to get error\n")
            self.logger.write(self.err + "\n")
    def get_status(self):
        """Get the status of the job (used by the Gui for xml display)

        :return: The current status of the job
        """
        # The checks are ordered from most to least blocking state.
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today
        # (a same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        # Column width (in characters) used by the status display.
        self.len_columns = lenght_columns
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []
        # Build the machine and job lists from the pyconf definition.
        self.determine_jobs_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
           and returns the job instance corresponding to the definition.

        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        '''
        cmmnds = job_def.commands
        timeout = job_def.timeout
        # Optional keys: only read them when present in the definition.
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'board' in job_def:
            board = job_def.board
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
           the machines and jobs to be done today.
        '''
        # weekday(): 0 = Monday ... 6 = Sunday
        today = datetime.date.weekday(datetime.date.today())
        for job_def in self.cfg_jobs.jobs :
            # A job without a target machine cannot be run: warn and ignore.
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" do not have the key '
                        '"machine", this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
            name_machine = job_def.machine
            # Reuse an already-instantiated machine with the same name.
            for mach in self.lmachines:
                if mach.name == name_machine:
            # Not found yet: build the Machine from its pyconf definition,
            # falling back to local defaults for each missing key.
            if a_machine == None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                            host = machine_def.host
                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                            user = machine_def.user
                        if 'port' not in machine_def:
                            port = machine_def.port
                        if 'password' not in machine_def:
                            passwd = machine_def.password
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                            sat_path = machine_def.sat_path
                        self.lmachines.append(a_machine)
                        # Track each distinct (host, port) pair only once.
                        if (host, port) not in host_list:
                            host_list.append((host, port))
            if a_machine == None:
                msg = _("WARNING: The job \"%(job_name)s\" requires the "
                        "machine \"%(machine_name)s\" but this machine "
                        "is not defined in the configuration file.\n"
                        "The job will not be launched")
                self.logger.write(src.printcolors.printcWarning(msg))
            a_job = self.define_job(job_def, a_machine)
            # Split jobs between today's schedule and the others.
            if today in job_def.when:
                self.ljobs.append(a_job)
            else: # today in job_def.when
                self.ljobs_not_today.append(a_job)
        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine

        :param pad int: target column used to align the per-machine traces
        '''
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            endline = (pad - len(begin_line)) * "." + " "
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                # '\r' rewrites the trace line in place on the terminal.
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # get the remote machine distribution using a sat command
                (__, out_dist, __) = machine.exec_command(
                                os.path.join(machine.sat_path,
                                    "sat config --value VARS.dist --no_label"),
                machine.distribution = out_dist.read().decode().replace("\n",
                # Print the status of the copy
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' %
                                src.printcolors.printc(src.OK_STATUS)), 3)
                    self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' %
                                src.printcolors.printc(src.OK_STATUS),
                                _("Copy of SAT failed")), 3)
                self.logger.write('\r%s' %
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
           the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
                 or false if there is no job running on the host.
        '''
        # Scan today's jobs for one whose machine matches (host, port).
        for jb in self.ljobs:
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
           running jobs and the jobs that have already finished.

        :return: True when at least one job finished since the last update
        '''
        jobs_finished_list = []
        jobs_running_list = []
        # Rebuild both state lists from scratch on every call.
        for jb in self.ljobs:
                jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        nb_job_finished_now = len(self._l_jobs_finished)
        # Compare the counts before/after to detect a freshly finished job.
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        '''
        for job in self.ljobs:
            # Jobs without an "after" dependency can never be cancelled here.
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job is not None and father_job.has_failed():
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.

        :param name str: a job name
        :return: the job that has the name.
        '''
        for jb in self.ljobs:
        # the following is executed only if the job was not found
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns
           the most close string of length "length".

        :param text str: any string
        :param length int: a length for the returned string
        :return: the most close string of length "length"
        '''
        if len(text) > length:
            # Too long: truncate and mark the cut with an ellipsis.
            text_out = text[:length-3] + '...'
            # Too short: pad with spaces on both sides, centering the text
            # (the extra space, if any, goes after the text).
            diff = length - len(text)
            before = " " * (diff//2)
            after = " " * (diff//2 + diff%2)
            text_out = before + text + after
    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status
           of the jobs in an array that has a column for each host.
           It displays the job that is currently running on the host

        :param len_col int: the size of the column
        '''
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        # '\r' rewrites the status line in place on the terminal.
        self.logger.write("\r" + display_line + "|")
        '''The main method. Runs all the jobs on every host.
           For each host, at a given time, only one job can be running.
           The jobs that have the field after (that contain the job that has
           to be run before it) are run after the previous job.
           This method stops when all the jobs are finished.
        '''
        # Print the table header: one column per (host, port) pair.
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                # Non-default port: show it explicitly in the header.
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        # The infinite loop that runs the jobs
        l_jobs_not_started = src.deepcopy_list(self.ljobs)
        while len(self._l_jobs_finished) != len(self.ljobs):
            new_job_start = False
            for host_port in self.lhosts:
                # Only one job per host at a time: skip busy hosts.
                if self.is_occupied(host_port):
                for jb in l_jobs_not_started:
                    # Only consider jobs targeted at this host.
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                    # A job with an "after" field waits for its father job.
                    jb_before = self.find_job_that_has_name(jb.after)
                    if jb_before is None:
                        msg = _("This job was not launched because its "
                                "father is not in the jobs list.")
                    if jb_before.has_finished():
                        l_jobs_not_started.remove(jb)
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            # Refresh the xml/gui output only when something changed.
            if new_job_start or new_job_finished:
                self.gui.update_xml_files(self.ljobs)
                # Display the current status
                self.display_status(self.len_columns)
            # Make sure that the proc is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")
        # Final refresh once every job is finished.
        self.gui.update_xml_files(self.ljobs)
        self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs outputs.
        '''
        # One labelled section per job.
        for jb in self.ljobs:
            self.logger.write(src.printcolors.printcLabel(
                "#------- Results for job %s -------#\n" % jb.name))
            self.logger.write("\n\n")
1014 '''Class to manage the the xml data that can be displayed in a browser to
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today, l_file_boards = []):
        '''Initialization

        :param xml_dir_path str: The path to the directory where to put
                                 the xml resulting files
        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        :param l_file_boards List: the list of file path from which to read the
                                   expected boards
        '''
        # The path of the csv files to read to fill the expected boards
        self.l_file_boards = l_file_boards
        # Current day name, used to select today's boards in the csv files.
        today = d_INT_DAY[datetime.date.weekday(datetime.date.today())]
        self.parse_csv_boards(today)
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
        # The xml files that corresponds to the boards.
        # {name_board : xml_object}}
        self.d_xml_board_files = {}
        # Create the lines and columns
        self.initialize_boards(l_jobs, l_jobs_not_today)
        # Write the xml file
        self.update_xml_files(l_jobs)
    def add_xml_board(self, name):
        '''Create the xml file for one board and register it in
           self.d_xml_board_files.

        :param name str: the board name
        '''
        xml_board_path = os.path.join(self.xml_dir_path, name + ".xml")
        self.d_xml_board_files[name] = src.xmlManager.XmlLogFile(
        # Skeleton nodes, filled later by initialize_boards().
        self.d_xml_board_files[name].add_simple_node("distributions")
        self.d_xml_board_files[name].add_simple_node("applications")
        self.d_xml_board_files[name].add_simple_node("board", text=name)
    def initialize_boards(self, l_jobs, l_jobs_not_today):
        '''Get all the first information needed for each file and write the
           first version of the files

        :param l_jobs List: the list of jobs that run today
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        # Get the boards to fill and put it in a dictionary
        # {board_name : xml instance corresponding to the board}
        for job in l_jobs + l_jobs_not_today:
            if (board is not None and
                            board not in self.d_xml_board_files.keys()):
                self.add_xml_board(board)
        # Verify that the boards given as input are done
        for board in list(self.d_input_boards.keys()):
            if board not in self.d_xml_board_files:
                self.add_xml_board(board)
        # Loop over all jobs in order to get the lines and columns for each
        for board in self.d_xml_board_files:
            d_application[board] = []
        for job in l_jobs + l_jobs_not_today:
            # Collect each distinct (host, port) pair once.
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
            distrib = job.machine.distribution
            application = job.application
            board_job = job.board
            # Register the job's distribution as a row of its board.
            for board in self.d_xml_board_files:
                if board_job == board:
                    if distrib is not None and distrib not in d_dist[board]:
                        d_dist[board].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                            attrib={"name" : distrib})
                # Register the job's application as a column of its board.
                if board_job == board:
                    if (application is not None and
                                    application not in d_application[board]):
                        d_application[board].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_board_files[board].xmlroot.find(
                                "name" : application})
        # Verify that there are no missing application or distribution in the
        # xml board files (regarding the input boards)
        for board in self.d_xml_board_files:
            l_dist = d_dist[board]
            if board not in self.d_input_boards.keys():
            for dist in self.d_input_boards[board]["rows"]:
                if dist not in l_dist:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : dist})
            l_appli = d_application[board]
            for appli in self.d_input_boards[board]["columns"]:
                if appli not in l_appli:
                    src.xmlManager.add_simple_node(
                        self.d_xml_board_files[board].xmlroot.find(
                        attrib={"name" : appli})
        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node(
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           attrib={"name" : host_port})
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + list(
                                            self.d_xml_board_files.values()):
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but
            # that will not be launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})
        # Find in each board the squares that needs to be filled regarding the
        # input csv files but that are not covered by a today job
        for board in self.d_input_boards.keys():
            xml_root_board = self.d_xml_board_files[board].xmlroot
            xml_missing = src.xmlManager.add_simple_node(xml_root_board,
            for row, column in self.d_input_boards[board]["jobs"]:
                if (job.application == column and
                    job.machine.distribution == row):
                    src.xmlManager.add_simple_node(xml_missing,
                                        attrib={"distribution" : row,
                                                "application" : column })
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        '''Get all the first information needed for each file and write the
           first version of the files

        :param xml_node_jobs etree.Element: the node corresponding to a job
        :param l_jobs_not_today List: the list of jobs that do not run today
        '''
        for job in l_jobs_not_today:
            # One <job name="..."> node per skipped job, holding its static
            # description and the fixed state "Not today".
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           job.machine.distribution)
            src.xmlManager.add_simple_node(xmlj, "board", job.board)
            src.xmlManager.add_simple_node(xmlj,
                                           "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
    def parse_csv_boards(self, today):
        """Parse the csv files that describe the boards to produce and fill
        the dict self.d_input_boards (one entry per board with its rows,
        columns and (row, column) job pairs).

        :param today str: the current day of the week
        """
        # NOTE(review): several statements of this method are not visible in
        # this excerpt (initialisation of l_boards, l_read, rows, columns_out,
        # row and d_boards, plus per-cell filtering). Names below that look
        # undefined are bound on those missing lines — TODO confirm against
        # the full file.
        # loop over each csv file and read its content
        for file_path in self.l_file_boards:
            with open(file_path, 'r') as f:
                reader = csv.reader(f)
                # l_read is presumably accumulated from `reader` on missing
                # lines before being appended here — TODO confirm
                l_boards.append(l_read)

        # loop over the csv lists of lines and get the rows, columns and jobs
        for input_board in l_boards:
            # first csv row: the board name followed by the column headers
            board_name = input_board[0][0]
            columns = input_board[0][1:]
            for line in input_board[1:]:
                # `row` is presumably the first cell of the line, bound on a
                # missing line — TODO confirm
                for i, square in enumerate(line[1:]):
                    # keep only columns that actually carry a job
                    if columns[i] not in columns_out:
                        columns_out.append(columns[i])
                    # a job is identified by its (row, column) pair
                    job = (row, columns[i])

            d_boards[board_name] = {"rows" : rows,
                                    "columns" : columns_out,
            # NOTE(review): the "jobs" entry and the closing brace of this
            # dict literal are on lines not visible in this excerpt.

        self.d_input_boards = d_boards
1252 def update_xml_files(self, l_jobs):
1253 '''Write all the xml files with updated information about the jobs
1255 :param l_jobs List: the list of jobs that run today
1257 for xml_file in [self.xml_global_file] + list(
1258 self.d_xml_board_files.values()):
1259 self.update_xml_file(l_jobs, xml_file)
1262 self.write_xml_files()
    def update_xml_file(self, l_jobs, xml_file):
        '''Update the information about the jobs inside the given xml report.

        :param l_jobs List: the list of jobs that run today
        :param xml_file xmlManager.XmlLogFile: the xml instance to update
        '''
        # NOTE(review): several lines of this method are not visible in this
        # excerpt — notably the `for job in l_jobs:` loop header enclosing
        # most of the body below, some add_simple_node node-name arguments,
        # and the initialisation of job_father and link. TODO confirm against
        # the full file.
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
        # Find the node corresponding to the job and delete it
        # in order to recreate it
        for xmljob in xml_node_jobs.findall('job'):
            if xmljob.attrib['name'] == job.name:
                xml_node_jobs.remove(xmljob)

        # format the job's start/end timestamps for display
        T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._T0))
        Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._Tf))

        # recreate the job node
        # NOTE(review): the node-name argument of this call ("job",
        # presumably) is on a missing line.
        xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                              attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        src.xmlManager.add_simple_node(xmlj, "sat_path",
                                       job.machine.sat_path)
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj, "distribution",
                                       job.machine.distribution)
        src.xmlManager.add_simple_node(xmlj, "board", job.board)
        src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
        # the command list is flattened into one display string
        src.xmlManager.add_simple_node(xmlj, "commands",
                                       " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
        src.xmlManager.add_simple_node(xmlj, "begin", T0)
        src.xmlManager.add_simple_node(xmlj, "end", Tf)
        # strip terminal colour escape codes before embedding text in xml
        src.xmlManager.add_simple_node(xmlj, "out",
                                       src.printcolors.cleancolor(job.out))
        src.xmlManager.add_simple_node(xmlj, "err",
                                       src.printcolors.cleancolor(job.err))
        src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
        if len(job.remote_log_files) > 0:
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",
                                           job.remote_log_files[0])
            # NOTE(review): this second call presumably belongs to an else:
            # branch on a missing line; its text argument is also missing.
            src.xmlManager.add_simple_node(xmlj,
                                           "remote_log_file_path",

        xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
        # get the job father
        if job.after is not None:
            # NOTE(review): the loop over today's jobs and the binding of
            # job_father appear on missing lines.
            if jb.name == job.after:

        if (job_father is not None and
            len(job_father.remote_log_files) > 0):
            link = job_father.remote_log_files[0]
        # expose the father's log as a "link" attribute on the <after> node
        src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

        # Verify that the job is to be done today regarding the input csv
        # files
        if job.board and job.board in self.d_input_boards.keys():
            for dist, appli in self.d_input_boards[job.board]["jobs"]:
                if (job.machine.distribution == dist
                    and job.application == appli):
                    # NOTE(review): the node-name/text arguments of these two
                    # calls are on missing lines.
                    src.xmlManager.add_simple_node(xmlj,
                    src.xmlManager.add_simple_node(xmlj,

        # record the time of this refresh in the <infos> node
        # NOTE(review): the attrib={"name"/"last update" opening of this
        # call is on a missing line.
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                   datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1364 def last_update(self, finish_status = "finished"):
1365 '''update information about the jobs for the file xml_file
1367 :param l_jobs List: the list of jobs that run today
1368 :param xml_file xmlManager.XmlLogFile: the xml instance to update
1370 for xml_file in [self.xml_global_file] + list(self.d_xml_board_files.values()):
1371 xml_node_infos = xml_file.xmlroot.find('infos')
1372 src.xmlManager.append_node_attrib(xml_node_infos,
1373 attrib={"JobsCommandStatus" : finish_status})
1375 self.write_xml_files()
1377 def write_xml_files(self):
1378 ''' Write the xml files
1380 self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1381 for xml_file in self.d_xml_board_files.values():
1382 xml_file.write_tree(STYLESHEET_BOARD)
# Describes the command
# NOTE(review): the `def description():` line is not visible in this
# excerpt; this return statement is presumably the body of the module-level
# description() hook used by the sat command framework — TODO confirm.
    return _("The jobs command launches maintenances that are described"
           " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the `sat jobs` command: parse options, locate the
    jobs configuration file, connect to the machines, launch today's jobs
    and publish the xml report files.

    :param args: the command-line arguments passed after the command name
    :param runner: the sat runner object (gives access to runner.cfg)
    :param logger: the sat logger used for console and file output
    '''
    # NOTE(review): many lines of this function are missing from this
    # excerpt (early returns, the `info` list opening, the Jobs(...)
    # constructor arguments, the try: matching the except below, ...).
    # Comments below flag each apparent gap — TODO confirm against the
    # full file.
    (options, args) = parser.parse_args(args)

    l_cfg_dir = runner.cfg.PATHS.JOBPATH

    # list option : display all the available config files
    for cfg_dir in l_cfg_dir:
        if not options.no_label:
            logger.write("------ %s\n" %
                         src.printcolors.printcHeader(cfg_dir))
        for f in sorted(os.listdir(cfg_dir)):
            if not f.endswith('.pyconf'):
                # NOTE(review): presumably a `continue` and the binding of
                # cfilename (f without its extension) are on missing lines.
                logger.write("%s\n" % cfilename)

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        src.printcolors.printcError(message)
        # NOTE(review): an early return presumably follows on a missing line.

    # Find the file in the directories
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        # normalise the name: the config file must carry a .pyconf extension
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'

        if not os.path.exists(file_jobs_cfg):
            msg = _("The file configuration %(name_file)s was not found."
                    "\nUse the --list option to get the possible files.")
            src.printcolors.printcError(msg)

    # NOTE(review): the opening of the `info` list is on a missing line.
        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)

    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    # keep only the jobs requested with --only_jobs, if the option was given
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                # NOTE(review): the l_jb.append(jb, ...) call that this
                # string argument belongs to is on a missing line.
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    # Initialization
    # NOTE(review): the remaining Jobs(...) constructor arguments are on
    # missing lines.
    today_jobs = Jobs(runner,

    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
        # NOTE(review): presumably an early return after the connection
        # test, on missing lines.

    # Copy the stylesheets in the log directory
    log_dir = runner.cfg.SITE.log.log_dir
    xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
    # NOTE(review): files_to_copy is initialised on a missing line.
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
    files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
    files_to_copy.append(os.path.join(xsl_dir, "running.gif"))
    for file_path in files_to_copy:
        # copy2 also preserves file metadata (timestamps)
        shutil.copy2(file_path, log_dir)

    # Instanciate the Gui in order to produce the xml files that contain all
    # the jobs information
    # NOTE(review): at least one constructor argument (presumably
    # today_jobs.ljobs) is on a missing line.
    gui = Gui(runner.cfg.SITE.log.log_dir,
              today_jobs.ljobs_not_today,
              l_file_boards = options.input_boards)

    # Display the list of the xml files
    logger.write(src.printcolors.printcInfo(("Here is the list of published"
    logger.write("%s\n" % gui.xml_global_file.logFile, 4)
    for board in gui.d_xml_board_files.keys():
        logger.write("%s\n" % gui.d_xml_board_files[board].logFile, 4)

    logger.write("\n", 4)

    today_jobs.gui = gui

    # Run all the jobs contained in config_jobs
    # NOTE(review): the try: matching the except below is on a missing line.
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        logger.write("\n\n%s\n\n" %
                     (src.printcolors.printcWarning(_("Forced interruption"))), 1)

        msg = _("Killing the running jobs and trying"
                " to get the corresponding logs\n")
        logger.write(src.printcolors.printcWarning(msg))

        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
        # record the interruption status in the xml reports
        today_jobs.gui.last_update(_("Forced interruption"))
    # NOTE(review): the else:/finally: structure around the two calls below
    # is on missing lines.
    today_jobs.gui.last_update()
    # Output the results
    today_jobs.write_all_results()