3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Command-line options of the "jobs" command
parser = src.options.Options()

parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name. '))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('list all available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("do not print labels, Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines. Not executing the jobs."),
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an xml file that can be read in a browser to "
                    "display the jobs status."),
class machine(object):
    '''Class to manage a ssh connection on a machine
    '''
    def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
        # Identity of the machine; these attributes are read by connect(),
        # write_info() and the Job class (self.host, self.port, self.user,
        # self.name), so they must be stored here.
        self.name = name
        self.host = host
        self.port = port
        self.user = user
        self.password = passwd
        # Remote directory where salomeTools is copied by copy_sat()
        self.sat_path = sat_path
        # The paramiko client that carries the ssh session
        self.ssh = paramiko.SSHClient()
        # None until connect() is called, then True or False
        self._connection_successful = None
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine

        :param logger src.logger.Logger: The logger instance
        # Pessimistic until the connection attempt succeeds
        self._connection_successful = False
        self.ssh.load_system_host_keys()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(self.host,
                         password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS +
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
            message = ( _("Error connecting or establishing an SSH session"))
            # Only reached when no exception occurred
            self._connection_successful = True
90 def successfully_connected(self, logger):
91 '''Verify if the connection to the remote machine has succeed
93 :param logger src.logger.Logger: The logger instance
94 :return: True if the connection has succeed, False if not
97 if self._connection_successful == None:
98 message = "Warning : trying to ask if the connection to "
99 "(host: %s, port: %s, user: %s) is OK whereas there were"
100 " no connection request" % \
101 (machine.host, machine.port, machine.user)
102 logger.write( src.printcolors.printcWarning(message))
103 return self._connection_successful
    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        # Open an sftp session on the existing ssh connection
        self.sftp = self.ssh.open_sftp()
        # Make sure the destination directory exists, then upload the tree
        self.mkdir(self.sat_path, ignore_existing=True)
        self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
        # Also upload the jobs configuration file next to salomeTools
        job_file_name = os.path.basename(job_file)
        self.sftp.put(job_file, os.path.join(self.sat_path,
        except Exception as e:
            # The copy failed: consider the machine unusable
            self._connection_successful = False
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path. The
            target directory needs to exists. All subdirectories in source are
            created under target.
        # NOTE(review): mutable default argument 'filters' — verify it is
        # never mutated by callers
        for item in os.listdir(source):
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                # Recreate the symbolic link on the remote side and keep
                # the local permission bits
                linkto = os.readlink(source_path)
                self.sftp.symlink(linkto, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
            if os.path.isfile(source_path):
                # Plain file: upload it and replicate its permissions
                self.sftp.put(source_path, destination_path)
                self.sftp.chmod(destination_path,
                                os.stat(source_path).st_mode)
                # Directory case: create remotely and recurse
                self.mkdir(destination_path, ignore_existing=True)
                self.put_dir(source_path, destination_path)
    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail
        # mode 511 == 0o777
        self.sftp.mkdir(path, mode)
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine

        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance
        :return: the stdin, stdout, and stderr of the executing command,
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        # Does not wait the end of the command
        (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            # On failure the triple (None, None, None) is returned
            return (None, None, None)
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
            return (stdin, stdout, stderr)
    # NOTE(review): the 'def' line of this method is truncated in this fragment
    '''Close the ssh connection
    def write_info(self, logger):
        '''Prints the informations relative to the machine in the logger
        (terminal traces and log file)

        :param logger src.logger.Logger: The logger instance
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        # Report the connection state computed by connect()
        if self.successfully_connected(logger):
            status = src.OK_STATUS
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n")
    '''Class to manage one job
    def __init__(self, name, machine, application, distribution,
                 commands, timeout, logger, job_file, after=None):
        self.machine = machine
        self.timeout = timeout
        self.application = application
        self.distribution = distribution
        # The list of log files to download from the remote machine
        self.remote_log_files = []
        # The remote command status
        # -1 means that it has not been launched,
        # 0 means success and 1 means fail
        self.cancelled = False
        # Execution state flags, updated by run()/has_finished()/check_time()
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field
        # Filled once the job is finished
        self.out = None # Contains something only if the job is finished
        self.err = None # Contains something only if the job is finished
        # The sat command line executed on the remote machine
        self.commands = commands
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        " -v1 job --jobs_config " +
        # Body of get_pids (its 'def' line is truncated in this fragment):
        # list the pids of the remote "sat ... job" processes
        cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        # Keep only the numeric part of each line
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
    def kill_remote_process(self):
        '''Kills the process on the remote machine.

        :return: (the output of the kill, the error of the kill)
        pids = self.get_pids()
        # kill -9 every pid found by get_pids, in one remote command
        cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
        return (out_kill, err_kill)
        # NOTE(review): the 'def has_begun' line is truncated in this fragment
        '''Returns True if the job has already begun

        :return: True if the job has already begun
        return self._has_begun
    def has_finished(self):
        '''Returns True if the job has already finished
        (i.e. all the commands have been executed)
        If it is finished, the outputs are stored in the fields out and err.

        :return: True if the job has already finished
        # If the method has already been called and returned True
        if self._has_finished:
        # If the job has not begun yet
        if not self.has_begun():
        # A closed stdout channel means the remote command ended
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read()
            self.err = self._stderr.read()
            # Record the end time
            self._Tf = time.time()
            # And get the remote command status and log files
        return self._has_finished
    def has_failed(self):
        '''Returns True if the job has failed.
        A job is considered as failed if the machine could not be reached,
        if the remote command failed,
        or if the job finished with a time out.

        :return: True if the job has failed
        if not self.has_finished():
        if not self.machine.successfully_connected(self.logger):
        if self.is_timeout():
        # res_job "1" is the remote command's failure status
        if self.res_job == "1":
        """In case of a failing job, one has to cancel every job that depend
        on it. This method put the job as failed and will not be executed.
        # Mark as begun + finished so the scheduler never launches it
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out = _("This job was not launched because its father has failed.")
        self.err = _("This job was not launched because its father has failed.")
    def get_log_files(self):
        # Download the remote job log files next to the local log file.
        if not self.has_finished():
            msg = _("Trying to get log files whereas the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
        # First non-empty output line is the remote command status; the
        # following ones are remote log file paths
        out_lines = self.out.split("\n")
        out_lines = [line for line in out_lines if line != '']
        self.res_job = out_lines[0]
        for job_path_remote in out_lines[1:]:
            if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                # Do not overwrite an already downloaded file
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
                self.remote_log_files.append(local_path)
    def is_running(self):
        '''Returns True if the job commands are running

        :return: True if the job is running
        # Running means launched but not yet finished
        return self.has_begun() and not self.has_finished()
    def is_timeout(self):
        '''Returns True if the job commands has finished with timeout

        :return: True if the job has finished with timeout
        # Flag set by check_time() when the timeout is exceeded
        return self._has_timouted
    def time_elapsed(self):
        # Seconds since the job was launched (_T0 is set in run())
        if not self.has_begun():
        return T_now - self._T0
    def check_time(self):
        # Stop the job with a TIMEOUT status when it runs for too long
        if not self.has_begun():
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            # Kill the remote process and keep the output of the kill
            (out_kill, _) = self.kill_remote_process()
            self.out = "TIMEOUT \n" + out_kill.read()
            self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
398 def total_duration(self):
399 return self._Tf - self._T0
    def run(self, logger):
        # A job must not be launched twice
        print("Warn the user that a job can only be launched one time")
        # Check the machine availability before launching
        if not self.machine.successfully_connected(logger):
            self._has_finished = True
            self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information."
                        % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
            # Record the launch time and run the command remotely
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
                                                        self.command, logger)
            # exec_command returns (None, None, None) on failure
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.err = "The server failed to execute the command"
        self._has_begun = True
    def write_results(self, logger):
        # Write identity, timing, connection info, and outputs of the job
        logger.write("name : " + self.name + "\n")
        logger.write("after : %s\n" % self.after)
        logger.write("Time elapsed : %4imin %2is \n" %
                     (self.total_duration()/60 , self.total_duration()%60))
        logger.write("Begin time : %s\n" %
                     time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(self._T0)) )
        logger.write("End time : %s\n\n" %
                     time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.localtime(self._Tf)) )

        # Connection information of the machine the job ran on
        machine_head = "Informations about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
        self.machine.write_info(logger)

        logger.write(src.printcolors.printcInfo("out : \n"))
        logger.write("Unable to get output\n")
        logger.write(self.out + "\n")
        logger.write(src.printcolors.printcInfo("err : \n"))
        logger.write("Unable to get error\n")
        logger.write(self.err + "\n")
    def get_status(self):
        # Human readable state of the job, used in the xml report
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))
    '''Class to manage the jobs to be run
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file = job_file
        self.job_file_path = job_file_path
        # The machine that will be used today
        # The list of machine (hosts, port) that will be used today
        # (a same host can have several machine instances since there
        # can be several ssh parameters)
        # The jobs to be launched today
        # The correlation dictionary between jobs and machines
        self.dic_job_machine = {}
        self.len_columns = lenght_columns

        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already ran
        self._l_jobs_finished = []
        # the list of jobs that are running
        self._l_jobs_running = []

        # Build the machines and jobs from the configuration
        self.determine_products_and_machines()
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (from class machine)
        and returns the job instance corresponding to the definition.

        :param job_def src.config.Mapping: a job definition
        :param machine machine: the machine on which the job will run
        :return: The corresponding job in a job class instance
        cmmnds = job_def.commands
        timeout = job_def.timeout
        # Optional fields of the job definition
        if 'after' in job_def:
            after = job_def.after
        if 'application' in job_def:
            application = job_def.application
        if 'distribution' in job_def:
            distribution = job_def.distribution
    def determine_products_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
           the machines and jobs to be done today.
        # weekday(): 0 = Monday ... 6 = Sunday
        today = datetime.date.weekday(datetime.date.today())

        for job_def in self.cfg_jobs.jobs :
            # Only keep the jobs scheduled for today
            if today in job_def.when:
                name_machine = job_def.machine
                # Reuse an already instantiated machine when possible
                for mach in self.lmachines:
                    if mach.name == name_machine:
                if a_machine == None:
                    for machine_def in self.cfg_jobs.machines:
                        if machine_def.name == name_machine:
                            # Missing fields fall back on local defaults
                            if 'host' not in machine_def:
                                host = self.runner.cfg.VARS.hostname
                                host = machine_def.host
                            if 'user' not in machine_def:
                                user = self.runner.cfg.VARS.user
                                user = machine_def.user
                            if 'port' not in machine_def:
                                port = machine_def.port
                            if 'password' not in machine_def:
                                passwd = machine_def.password
                            if 'sat_path' not in machine_def:
                                sat_path = "salomeTools"
                                sat_path = machine_def.sat_path
                            # Keep one entry per distinct (host, port) pair
                            if (host, port) not in host_list:
                                host_list.append((host, port))
                            self.lmachines.append(a_machine)

                if a_machine == None:
                    msg = _("WARNING: The job \"%(job_name)s\" requires the "
                            "machine \"%(machine_name)s\" but this machine "
                            "is not defined in the configuration file.\n"
                            "The job will not be launched")
                    self.logger.write(src.printcolors.printcWarning(msg))
                a_job = self.define_job(job_def, a_machine)
                self.dic_job_machine[a_job] = a_machine
                self.ljobs.append(a_job)

        self.lhosts = host_list
    def ssh_connection_all_machines(self, pad=50):
        '''Function that do the ssh connection to every machine
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
            # Dotted filler aligning the status column at 'pad' characters
            endline = (pad - len(begin_line)) * "." + " "

            step = "SSH connection"
            self.logger.write( begin_line + endline + step)

            # the call to the method that initiate the ssh connection
            msg = machine.connect(self.logger)

            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                # Print the status of the copy
                self.logger.write('\r%s' %
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s' %
                                  src.printcolors.printc(src.OK_STATUS)), 3)
                self.logger.write('\r%s' %
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                  src.printcolors.printc(src.OK_STATUS),
                                  _("Copy of SAT failed")), 3)
                self.logger.write('\r%s' %
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' %
                                  src.printcolors.printc(src.KO_STATUS),
            self.logger.write("\n", 3)
        self.logger.write("\n")
    def is_occupied(self, hostname):
        '''Function that returns True if a job is running on
           the machine defined by its host and its port.

        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host,
                 or false if there is no job running on the host.
        for jb in self.dic_job_machine:
            # Match the job's machine against the requested (host, port)
            if jb.machine.host == host and jb.machine.port == port:
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
           running jobs and the jobs that have already finished.
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.dic_job_machine:
            jobs_running_list.append(jb)
            if jb.has_finished():
                jobs_finished_list.append(jb)

        # Compare to the previous call to detect newly finished jobs
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list

        nb_job_finished_now = len(self._l_jobs_finished)

        # True when at least one job finished since the last call
        return nb_job_finished_now > nb_job_finished_before
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        for job in self.ljobs:
            # Jobs without a father cannot be cancelled this way
            if job.after is None:
            father_job = self.find_job_that_has_name(job.after)
            if father_job.has_failed():
732 def find_job_that_has_name(self, name):
733 '''Returns the job by its name.
735 :param name str: a job name
736 :return: the job that has the name.
739 for jb in self.ljobs:
743 # the following is executed only if the job was not found
744 msg = _('The job "%s" seems to be nonexistent') % name
745 raise src.SatException(msg)
747 def str_of_length(self, text, length):
748 '''Takes a string text of any length and returns
749 the most close string of length "length".
751 :param text str: any string
752 :param length int: a length for the returned string
753 :return: the most close string of length "length"
756 if len(text) > length:
757 text_out = text[:length-3] + '...'
759 diff = length - len(text)
760 before = " " * (diff/2)
761 after = " " * (diff/2 + diff%2)
762 text_out = before + text + after
    def display_status(self, len_col):
        '''Takes a lenght and construct the display of the current status
        of the jobs in an array that has a column for each host.
        It displays the job that is currently running on the host

        :param len_col int: the size of the column
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        # '\r' overwrites the current terminal line with the fresh status
        self.logger.write("\r" + display_line + "|")
        '''The main method. Runs all the jobs on every host.
        For each host, at a given time, only one job can be running.
        The jobs that have the field after (that contain the job that has
        to be run before it) are run after the previous job.
        This method stops when all the jobs are finished.
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        # Build the header line of the status array, one column per host
        for host_port in self.lhosts:
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)

        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)

        # The infinite loop that runs the jobs
        l_jobs_not_started = self.dic_job_machine.keys()
        while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
            new_job_start = False
            for host_port in self.lhosts:
                # One job at a time per host
                if self.is_occupied(host_port):
                # NOTE(review): l_jobs_not_started is mutated while being
                # iterated; under Python 3, dict.keys() is a view without
                # remove() — verify if this module is ported
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        l_jobs_not_started.remove(jb)
                        # Jobs with "after" wait for their father to finish
                        jb_before = self.find_job_that_has_name(jb.after)
                        if jb_before.has_finished():
                            l_jobs_not_started.remove(jb)
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()

            if new_job_start or new_job_finished:
                # Something changed: refresh the xml report
                self.gui.update_xml_file(self.ljobs)
            # Display the current status
            self.display_status(self.len_columns)

            # Make sure that the proc is not entirely busy
        self.logger.write("\n")
        self.logger.write(tiret_line)
        self.logger.write("\n\n")

        # Final refresh of the xml report
        self.gui.update_xml_file(self.ljobs)
        self.gui.last_update()
    def write_all_results(self):
        '''Display all the jobs outputs.
        for jb in self.dic_job_machine.keys():
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            jb.write_results(self.logger)
            self.logger.write("\n\n")
    '''Class to manage the xml data that can be displayed in a browser to
    <?xml version='1.0' encoding='utf-8'?>
    <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
        <info name="generated" value="2016-06-02 07:06:45"/>
        <host name=is221553 port=22 distribution=UB12.04/>
        <host name=is221560 port=22/>
        <host name=is221553 port=22 distribution=FD20/>
        <application name=SALOME-7.8.0/>
        <application name=SALOME-master/>
        <application name=MED-STANDALONE-master/>
        <application name=CORPUS/>
        <job name="7.8.0 FD22">
            <host>is228809</host>
            <application>SALOME-7.8.0</application>
            <user>adminuser</user>
            <timeout>240</timeout>
                export DISPLAY=is221560
                scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
                tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
            <state>Not launched</state>
        <job name="master MG05">
            <host>is221560</host>
            <application>SALOME-master</application>
            <timeout>240</timeout>
                export DISPLAY=is221560
                scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
                sat prepare SALOME-master
                sat compile SALOME-master
                sat check SALOME-master
                sat launcher SALOME-master
                sat test SALOME-master
            <state>Running since 23 min</state>
            <!-- <state>time out</state> -->
            <!-- <state>OK</state> -->
            <!-- <state>KO</state> -->
            <begin>10/05/2016 20h32</begin>
            <end>10/05/2016 22h59</end>
    def __init__(self, xml_file_path, l_jobs, stylesheet):
        # The path of the xml file
        self.xml_file_path = xml_file_path
        # The xsl stylesheet applied when writing the report
        self.stylesheet = stylesheet
        # Open the file in a writing stream
        self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
        # Create the lines and columns
        self.initialize_array(l_jobs)
        # Write a first version of the report
        self.update_xml_file(l_jobs)
    def initialize_array(self, l_jobs):
        # Collect the distinct distributions and applications of the jobs
        distrib = job.distribution
        if distrib not in l_dist:
            l_dist.append(distrib)

        application = job.application
        if application not in l_applications:
            l_applications.append(application)

        self.l_applications = l_applications

        # Update the hosts node
        self.xmldists = self.xml_file.add_simple_node("distributions")
        for dist_name in self.l_dist:
            src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})

        # Update the applications node
        self.xmlapplications = self.xml_file.add_simple_node("applications")
        for application in self.l_applications:
            src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})

        # Initialize the jobs node
        self.xmljobs = self.xml_file.add_simple_node("jobs")

        # Initialize the info node (when generated)
        self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
    def update_xml_file(self, l_jobs):
        # Refresh every job node of the xml report with the current state
        # Update the job names and status node
        # Find the node corresponding to the job and delete it
        # in order to recreate it
        for xmljob in self.xmljobs.findall('job'):
            if xmljob.attrib['name'] == job.name:
                self.xmljobs.remove(xmljob)

        # Format the begin/end times for display
        T0 = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._T0))
        Tf = time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(job._Tf))

        # recreate the job node
        xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
        src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
        src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
        src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
        src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
        src.xmlManager.add_simple_node(xmlj, "application", job.application)
        src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
        src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
        src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
        src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
        src.xmlManager.add_simple_node(xmlj, "begin", T0)
        src.xmlManager.add_simple_node(xmlj, "end", Tf)
        src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
        src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
        src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
        if len(job.remote_log_files) > 0:
            src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
            src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")

        xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
        # get the job father
        if job.after is not None:
            if jb.name == job.after:
            if job_father is None:
                msg = _("The job %(father_name)s that is parent of "
                        "%(son_name)s is not in the job list." %
                        {"father_name" : job.after , "son_name" : job.name})
                raise src.SatException(msg)
            # Link the "after" node to the father's downloaded log file
            if len(job_father.remote_log_files) > 0:
                link = job_father.remote_log_files[0]
                src.xmlManager.append_node_attrib(xmlafter, {"link" : link})

        # Stamp the report with the current date
        src.xmlManager.append_node_attrib(self.xmlinfos,
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})

        self.write_xml_file()
1054 def last_update(self, finish_status = "finished"):
1055 src.xmlManager.append_node_attrib(self.xmlinfos,
1056 attrib={"JobsCommandStatus" : finish_status})
1058 self.write_xml_file()
1060 def write_xml_file(self):
1061 self.xml_file.write_tree(self.stylesheet)
# Describes the command
    # Short help text of the jobs command (body of description())
    return _("The jobs command launches maintenances that are described"
             " in the dedicated jobs configuration file.")
def run(args, runner, logger):
    '''Entry point of the jobs command.'''
    (options, args) = parser.parse_args(args)

    # Directories where the jobs configuration files can be found
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
    l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]

    # Make sure the path to the jobs config files directory exists
    src.ensure_path_exists(jobs_cfg_files_dir)

    # list option : display all the available config files
    for cfg_dir in l_cfg_dir:
        if not options.no_label:
            logger.write("------ %s\n" %
                         src.printcolors.printcHeader(cfg_dir))
        for f in sorted(os.listdir(cfg_dir)):
            # Only .pyconf files are jobs configurations
            if not f.endswith('.pyconf'):
            logger.write("%s\n" % cfilename)

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        raise src.SatException( message )

    # Find the file in the directories
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'

        if not os.path.exists(file_jobs_cfg):
        msg = _("The file configuration %(name_file)s was not found."
                "\nUse the --list option to get the possible files.")
        src.printcolors.printcError(msg)

    # Display some context information before running
    (_("Platform"), runner.cfg.VARS.dist),
    (_("File containing the jobs configuration"), file_jobs_cfg)
    src.print_info(logger, info)

    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        # Keep only the jobs requested with --only_jobs
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                   "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb

    today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:

    # NOTE(review): hard-coded developer-specific output path — should
    # come from the configuration
    gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, "job_report.xsl")
    today_jobs.gui = gui

    # Run all the jobs contained in config_jobs
    today_jobs.run_jobs()
    except KeyboardInterrupt:
        # The user stopped the execution: warn, then kill remote leftovers
        logger.write("\n\n%s\n\n" %
                     (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        # find the potential not finished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
        today_jobs.gui.last_update(_("Forced interruption"))
        today_jobs.gui.last_update()
    # Output the results
    today_jobs.write_all_results()