3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# --- Command-line options of the "jobs" command --------------------------
# NOTE(review): this listing is lossy (embedded original line numbers and
# dropped lines), so some add_option() calls below are visibly truncated
# (e.g. the trailing default arguments of --test_connection and --publish
# are on dropped lines) — confirm against the original file.
27 parser = src.options.Options()
29 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
30 _('The name of the config file that contains'
31 ' the jobs configuration'))
32 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
33 _('The list of jobs to launch, by their name. '))
34 parser.add_option('l', 'list', 'boolean', 'list',
35 _('list all available config files.'))
36 parser.add_option('n', 'no_label', 'boolean', 'no_label',
37 _("do not print labels, Works only with --list."), False)
38 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
39 _("Try to connect to the machines. Not executing the jobs."),
41 parser.add_option('p', 'publish', 'boolean', 'publish',
42 _("Generate an xml file that can be read in a browser to "
43 "display the jobs status."),
# machine: wraps one paramiko ssh connection to a remote host.
# NOTE(review): lines are missing from this listing — __init__ presumably
# also stores name/host/user/port on the dropped lines 50-53 (other methods
# read self.host, self.port, self.user), and connect() has lost its
# "message = ..." initialisation, the "try:" line, the generic "except"
# header before line 84 and the "else:"/"return message" tail — confirm
# against the original file before editing behavior here.
46 class machine(object):
47 '''Class to manage a ssh connection on a machine
49 def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
54 self.password = passwd
55 self.sat_path = sat_path
56 self.ssh = paramiko.SSHClient()
# _connection_successful is tri-state: None = connect() never called,
# False = attempt failed, True = attempt succeeded.
57 self._connection_successful = None
59 def connect(self, logger):
60 '''Initiate the ssh connection to the remote machine
62 :param logger src.logger.Logger: The logger instance
67 self._connection_successful = False
68 self.ssh.load_system_host_keys()
# AutoAddPolicy silently trusts unknown host keys — acceptable for an
# internal build farm, not for untrusted networks.
69 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
71 self.ssh.connect(self.host,
74 password = self.password)
75 except paramiko.AuthenticationException:
76 message = src.KO_STATUS + _("Authentication failed")
77 except paramiko.BadHostKeyException:
78 message = (src.KO_STATUS +
79 _("The server's host key could not be verified"))
80 except paramiko.SSHException:
81 message = ( _("SSHException error connecting or "
82 "establishing an SSH session"))
84 message = ( _("Error connecting or establishing an SSH session"))
86 self._connection_successful = True
90 def successfully_connected(self, logger):
91 '''Verify if the connection to the remote machine has succeed
93 :param logger src.logger.Logger: The logger instance
94 :return: True if the connection has succeed, False if not
97 if self._connection_successful == None:
98 message = "Warning : trying to ask if the connection to "
99 "(host: %s, port: %s, user: %s) is OK whereas there were"
100 " no connection request" % \
101 (machine.host, machine.port, machine.user)
102 logger.write( src.printcolors.printcWarning(message))
103 return self._connection_successful
# Remote-file helpers of the machine class (sftp copy of salomeTools,
# recursive directory upload, mkdir, remote command execution, traces).
# NOTE(review): lossy listing — copy_sat() has lost its "try:" line and
# the tail of its except/return handling; mkdir() has lost the
# ignore_existing error-swallowing logic; exec_command()'s "try:" and the
# generic except header before line 182 are on dropped lines; close()'s
# "def" line (before line 188) is also missing.  Confirm against the
# original file.
105 def copy_sat(self, sat_local_path, job_file):
106 '''Copy salomeTools to the remote machine in self.sat_path
110 self.sftp = self.ssh.open_sftp()
111 self.mkdir(self.sat_path, ignore_existing=True)
# '.git' is filtered out to avoid copying the repository history.
112 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
113 job_file_name = os.path.basename(job_file)
114 self.sftp.put(job_file, os.path.join(self.sat_path,
118 except Exception as e:
120 self._connection_successful = False
124 def put_dir(self, source, target, filters = []):
125 ''' Uploads the contents of the source directory to the target path. The
126 target directory needs to exists. All subdirectories in source are
127 created under target.
129 for item in os.listdir(source):
132 source_path = os.path.join(source, item)
133 destination_path = os.path.join(target, item)
# Symlinks are re-created remotely instead of following them.
134 if os.path.islink(source_path):
135 linkto = os.readlink(source_path)
137 self.sftp.symlink(linkto, destination_path)
138 self.sftp.chmod(destination_path,
139 os.stat(source_path).st_mode)
143 if os.path.isfile(source_path):
144 self.sftp.put(source_path, destination_path)
# Preserve the local permission bits on the uploaded file.
145 self.sftp.chmod(destination_path,
146 os.stat(source_path).st_mode)
148 self.mkdir(destination_path, ignore_existing=True)
149 self.put_dir(source_path, destination_path)
151 def mkdir(self, path, mode=511, ignore_existing=False):
152 ''' Augments mkdir by adding an option to not fail
156 self.sftp.mkdir(path, mode)
163 def exec_command(self, command, logger):
164 '''Execute the command on the remote machine
166 :param command str: The command to be run
167 :param logger src.logger.Logger: The logger instance
168 :return: the stdin, stdout, and stderr of the executing command,
170 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
171 paramiko.channel.ChannelFile)
174 # Does not wait the end of the command
175 (stdin, stdout, stderr) = self.ssh.exec_command(command)
176 except paramiko.SSHException:
177 message = src.KO_STATUS + _(
178 ": the server failed to execute the command\n")
179 logger.write( src.printcolors.printcError(message))
# (None, None, None) is the error sentinel checked by Job.run().
180 return (None, None, None)
182 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
183 return (None, None, None)
185 return (stdin, stdout, stderr)
188 '''Close the ssh connection
194 def write_info(self, logger):
195 '''Prints the informations relative to the machine in the logger
196 (terminal traces and log file)
198 :param logger src.logger.Logger: The logger instance
202 logger.write("host : " + self.host + "\n")
203 logger.write("port : " + str(self.port) + "\n")
204 logger.write("user : " + str(self.user) + "\n")
205 if self.successfully_connected(logger):
206 status = src.OK_STATUS
208 status = src.KO_STATUS
209 logger.write("Connection : " + status + "\n\n")
# Job class region: one remote "sat job" run on one machine.
# NOTE(review): lossy listing — the "class Job" line itself (before line
# 213) and many body lines are dropped (e.g. res_job initialisation,
# several "return True/False" lines in the has_* predicates, the "else:"
# branches).  Confirm against the original file before changing behavior.
# Known issues visible here:
#  * "_has_timouted" is a typo for "_has_timeouted" (renaming would touch
#    every reader, so only flagged).
#  * check_time(): "TIMEOUT \n" + out_kill.read() concatenates str with
#    what paramiko returns from read() — bytes under Python 3; TODO
#    confirm the targeted Python version.
213 '''Class to manage one job
215 def __init__(self, name, machine, application, distribution,
216 commands, timeout, logger, job_file, after=None):
219 self.machine = machine
221 self.timeout = timeout
222 self.application = application
223 self.distribution = distribution
225 # The list of log files to download from the remote machine
226 self.remote_log_files = []
227 # The remote command status
228 # -1 means that it has not been launched,
229 # 0 means success and 1 means fail
231 self.cancelled = False
235 self._has_begun = False
236 self._has_finished = False
237 self._has_timouted = False
238 self._stdin = None # Store the command inputs field
239 self._stdout = None # Store the command outputs field
240 self._stderr = None # Store the command errors field
242 self.out = None # Contains something only if the job is finished
243 self.err = None # Contains something only if the job is finished
245 self.commands = commands
246 self.command = (os.path.join(self.machine.sat_path, "sat") +
247 " -v1 job --jobs_config " +
# get_pids(): greps the remote process list for the launched sat command.
254 cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
255 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
256 pids_cmd = out_pid.readlines()
257 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
261 def kill_remote_process(self):
262 '''Kills the process on the remote machine.
264 :return: (the output of the kill, the error of the kill)
268 pids = self.get_pids()
269 cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
270 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
272 return (out_kill, err_kill)
275 '''Returns True if the job has already begun
277 :return: True if the job has already begun
280 return self._has_begun
282 def has_finished(self):
283 '''Returns True if the job has already finished
284 (i.e. all the commands have been executed)
285 If it is finished, the outputs are stored in the fields out and err.
287 :return: True if the job has already finished
291 # If the method has already been called and returned True
292 if self._has_finished:
295 # If the job has not begun yet
296 if not self.has_begun():
# A closed stdout channel means the remote command terminated.
299 if self._stdout.channel.closed:
300 self._has_finished = True
301 # Store the result outputs
302 self.out = self._stdout.read()
303 self.err = self._stderr.read()
305 self._Tf = time.time()
306 # And get the remote command status and log files
309 return self._has_finished
311 def has_failed(self):
312 '''Returns True if the job has failed.
313 A job is considered as failed if the machine could not be reached,
314 if the remote command failed,
315 or if the job finished with a time out.
317 :return: True if the job has failed
320 if not self.has_finished():
322 if not self.machine.successfully_connected(self.logger):
324 if self.is_timeout():
326 if self.res_job == "1":
331 """In case of a failing job, one has to cancel every job that depend
332 on it. This method put the job as failed and will not be executed.
334 self._has_begun = True
335 self._has_finished = True
336 self.cancelled = True
337 self.out = _("This job was not launched because its father has failed.")
338 self.err = _("This job was not launched because its father has failed.")
340 def get_log_files(self):
341 if not self.has_finished():
342 msg = _("Trying to get log files whereas the job is not finished.")
343 self.logger.write(src.printcolors.printcWarning(msg))
# First output line is the remote status, the rest are log file paths.
345 out_lines = self.out.split("\n")
346 out_lines = [line for line in out_lines if line != '']
347 self.res_job = out_lines[0]
348 for job_path_remote in out_lines[1:]:
349 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
350 local_path = os.path.join(os.path.dirname(
351 self.logger.logFilePath),
352 os.path.basename(job_path_remote))
353 if not os.path.exists(local_path):
354 self.machine.sftp.get(job_path_remote, local_path)
356 local_path = os.path.join(os.path.dirname(
357 self.logger.logFilePath),
359 os.path.basename(job_path_remote))
360 if not os.path.exists(local_path):
361 self.machine.sftp.get(job_path_remote, local_path)
362 self.remote_log_files.append(local_path)
364 def is_running(self):
365 '''Returns True if the job commands are running
367 :return: True if the job is running
370 return self.has_begun() and not self.has_finished()
372 def is_timeout(self):
373 '''Returns True if the job commands has finished with timeout
375 :return: True if the job has finished with timeout
378 return self._has_timouted
380 def time_elapsed(self):
381 if not self.has_begun():
384 return T_now - self._T0
386 def check_time(self):
387 if not self.has_begun():
389 if self.time_elapsed() > self.timeout:
390 self._has_finished = True
391 self._has_timouted = True
392 self._Tf = time.time()
394 (out_kill, _) = self.kill_remote_process()
395 self.out = "TIMEOUT \n" + out_kill.read()
396 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
398 def total_duration(self):
399 return self._Tf - self._T0
401 def run(self, logger):
403 print("Warn the user that a job can only be launched one time")
406 if not self.machine.successfully_connected(logger):
407 self._has_finished = True
409 self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information."
410 % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
412 self._T0 = time.time()
413 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
414 self.command, logger)
# exec_command() returns (None, None, None) when the server refused.
415 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
416 self._has_finished = True
417 self._Tf = time.time()
419 self.err = "The server failed to execute the command"
421 self._has_begun = True
423 def write_results(self, logger):
424 logger.write("name : " + self.name + "\n")
426 logger.write("after : %s\n" % self.after)
427 logger.write("Time elapsed : %4imin %2is \n" %
428 (self.total_duration()/60 , self.total_duration()%60))
430 logger.write("Begin time : %s\n" %
431 time.strftime('%Y-%m-%d %H:%M:%S',
432 time.localtime(self._T0)) )
434 logger.write("End time : %s\n\n" %
435 time.strftime('%Y-%m-%d %H:%M:%S',
436 time.localtime(self._Tf)) )
438 machine_head = "Informations about connection :\n"
439 underline = (len(machine_head) - 2) * "-"
440 logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
441 self.machine.write_info(logger)
443 logger.write(src.printcolors.printcInfo("out : \n"))
445 logger.write("Unable to get output\n")
447 logger.write(self.out + "\n")
448 logger.write(src.printcolors.printcInfo("err : \n"))
450 logger.write("Unable to get error\n")
452 logger.write(self.err + "\n")
454 def get_status(self):
455 if not self.machine.successfully_connected(self.logger):
456 return "SSH connection KO"
457 if not self.has_begun():
458 return "Not launched"
461 if self.is_running():
462 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
463 time.localtime(self._T0))
464 if self.has_finished():
465 if self.is_timeout():
466 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
467 time.localtime(self._Tf))
468 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
469 time.localtime(self._Tf))
# Jobs class region: today's schedule — machines, job instances, lookup
# helpers.
# NOTE(review): lossy listing — the "class Jobs" line (before line 472),
# the __init__ signature head (lines 473-479), the machine-instantiation
# call inside determine_products_and_machines and several else/return
# lines are dropped.  Confirm against the original file.
# Known issues visible here:
#  * "lenght_columns" is a typo for "length_columns" (part of the public
#    signature, so only flagged, not renamed).
#  * "a_machine == None" (lines 563/605) — "is None" is the correct test.
#  * The warning at lines 606-610 contains %(job_name)s placeholders but
#    is written without the "%" substitution — the raw template is what
#    gets printed (unless the substitution sits on a dropped line; TODO
#    confirm).
472 '''Class to manage the jobs to be run
480 lenght_columns = 20):
481 # The jobs configuration
482 self.cfg_jobs = config_jobs
483 self.job_file = job_file
484 self.job_file_path = job_file_path
485 # The machine that will be used today
487 # The list of machine (hosts, port) that will be used today
488 # (a same host can have several machine instances since there
489 # can be several ssh parameters)
491 # The jobs to be launched today
493 # The jobs that will not be launched today
494 self.ljobsdef_not_today = []
497 # The correlation dictionary between jobs and machines
498 self.dic_job_machine = {}
499 self.len_columns = lenght_columns
501 # the list of jobs that have not been run yet
502 self._l_jobs_not_started = []
503 # the list of jobs that have already ran
504 self._l_jobs_finished = []
505 # the list of jobs that are running
506 self._l_jobs_running = []
508 self.determine_products_and_machines()
510 def define_job(self, job_def, machine):
511 '''Takes a pyconf job definition and a machine (from class machine)
512 and returns the job instance corresponding to the definition.
514 :param job_def src.config.Mapping: a job definition
515 :param machine machine: the machine on which the job will run
516 :return: The corresponding job in a job class instance
520 cmmnds = job_def.commands
521 timeout = job_def.timeout
523 if 'after' in job_def:
524 after = job_def.after
526 if 'application' in job_def:
527 application = job_def.application
529 if 'distribution' in job_def:
530 distribution = job_def.distribution
542 def determine_products_and_machines(self):
543 '''Function that reads the pyconf jobs definition and instantiates all
544 the machines and jobs to be done today.
# weekday(): Monday == 0 ... Sunday == 6.
549 today = datetime.date.weekday(datetime.date.today())
552 for job_def in self.cfg_jobs.jobs :
553 if today in job_def.when:
555 name_machine = job_def.machine
# Reuse an already-instantiated machine when the name matches.
558 for mach in self.lmachines:
559 if mach.name == name_machine:
563 if a_machine == None:
564 for machine_def in self.cfg_jobs.machines:
565 if machine_def.name == name_machine:
# Each optional field falls back to the local configuration.
566 if 'host' not in machine_def:
567 host = self.runner.cfg.VARS.hostname
569 host = machine_def.host
571 if 'user' not in machine_def:
572 user = self.runner.cfg.VARS.user
574 user = machine_def.user
576 if 'port' not in machine_def:
579 port = machine_def.port
581 if 'password' not in machine_def:
584 passwd = machine_def.password
586 if 'sat_path' not in machine_def:
587 sat_path = "salomeTools"
589 sat_path = machine_def.sat_path
600 if (host, port) not in host_list:
601 host_list.append((host, port))
603 self.lmachines.append(a_machine)
605 if a_machine == None:
606 msg = _("WARNING: The job \"%(job_name)s\" requires the "
607 "machine \"%(machine_name)s\" but this machine "
608 "is not defined in the configuration file.\n"
609 "The job will not be launched")
610 self.logger.write(src.printcolors.printcWarning(msg))
612 a_job = self.define_job(job_def, a_machine)
613 self.dic_job_machine[a_job] = a_machine
615 self.ljobs.append(a_job)
616 else: # today in job_def.when
617 self.ljobsdef_not_today.append(job_def)
619 self.lhosts = host_list
621 def ssh_connection_all_machines(self, pad=50):
622 '''Function that do the ssh connection to every machine
628 self.logger.write(src.printcolors.printcInfo((
629 "Establishing connection with all the machines :\n")))
630 for machine in self.lmachines:
631 # little algorithm in order to display traces
632 begin_line = (_("Connection to %s: " % machine.name))
633 if pad - len(begin_line) < 0:
636 endline = (pad - len(begin_line)) * "." + " "
638 step = "SSH connection"
639 self.logger.write( begin_line + endline + step)
641 # the call to the method that initiate the ssh connection
642 msg = machine.connect(self.logger)
644 # Copy salomeTools to the remote machine
645 if machine.successfully_connected(self.logger):
647 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
648 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
650 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
652 # Print the status of the copy
654 self.logger.write('\r%s' %
655 ((len(begin_line)+len(endline)+20) * " "), 3)
656 self.logger.write('\r%s%s%s' %
659 src.printcolors.printc(src.OK_STATUS)), 3)
661 self.logger.write('\r%s' %
662 ((len(begin_line)+len(endline)+20) * " "), 3)
663 self.logger.write('\r%s%s%s %s' %
666 src.printcolors.printc(src.OK_STATUS),
667 _("Copy of SAT failed")), 3)
669 self.logger.write('\r%s' %
670 ((len(begin_line)+len(endline)+20) * " "), 3)
671 self.logger.write('\r%s%s%s %s' %
674 src.printcolors.printc(src.KO_STATUS),
676 self.logger.write("\n", 3)
678 self.logger.write("\n")
681 def is_occupied(self, hostname):
682 '''Function that returns True if a job is running on
683 the machine defined by its host and its port.
685 :param hostname (str, int): the pair (host, port)
686 :return: the job that is running on the host,
687 or false if there is no job running on the host.
692 for jb in self.dic_job_machine:
693 if jb.machine.host == host and jb.machine.port == port:
698 def update_jobs_states_list(self):
699 '''Function that updates the lists that store the currently
700 running jobs and the jobs that have already finished.
705 jobs_finished_list = []
706 jobs_running_list = []
707 for jb in self.dic_job_machine:
709 jobs_running_list.append(jb)
711 if jb.has_finished():
712 jobs_finished_list.append(jb)
714 nb_job_finished_before = len(self._l_jobs_finished)
715 self._l_jobs_finished = jobs_finished_list
716 self._l_jobs_running = jobs_running_list
718 nb_job_finished_now = len(self._l_jobs_finished)
# True exactly when at least one more job finished since the last call.
720 return nb_job_finished_now > nb_job_finished_before
722 def cancel_dependencies_of_failing_jobs(self):
723 '''Function that cancels all the jobs that depend on a failing one.
729 for job in self.ljobs:
730 if job.after is None:
732 father_job = self.find_job_that_has_name(job.after)
733 if father_job.has_failed():
736 def find_job_that_has_name(self, name):
737 '''Returns the job by its name.
739 :param name str: a job name
740 :return: the job that has the name.
743 for jb in self.ljobs:
747 # the following is executed only if the job was not found
748 msg = _('The job "%s" seems to be nonexistent') % name
749 raise src.SatException(msg)
def str_of_length(self, text, length):
    '''Takes a string text of any length and returns
    the most close string of length "length".

    :param text str: any string
    :param length int: a length for the returned string
    :return: the most close string of length "length"
    :rtype: str
    '''
    if len(text) > length:
        # Too long: truncate and mark the cut with an ellipsis.
        text_out = text[:length-3] + '...'
    else:
        # Too short: center the text by padding with spaces on both sides
        # (the extra space of an odd difference goes after the text).
        # BUG FIX: use integer division; under Python 3 "diff/2" yields a
        # float and " " * float raises TypeError (under Python 2 the
        # behavior is identical for non-negative ints).
        diff = length - len(text)
        before = " " * (diff // 2)
        after = " " * (diff // 2 + diff % 2)
        text_out = before + text + after
    return text_out
# Scheduling loop of the Jobs class: per-host status line and the main
# run_jobs() driver.
# NOTE(review): lossy listing — the "def run_jobs" line (around line 795)
# and many loop-body lines (continue/jb.run()/time.sleep) are dropped.
# Known issue: "l_jobs_not_started = self.dic_job_machine.keys()" is
# later mutated with .remove() — under Python 3 keys() returns a view
# with no .remove(); wrap in list() when porting (TODO confirm the
# targeted Python version).
770 def display_status(self, len_col):
771 '''Takes a lenght and construct the display of the current status
772 of the jobs in an array that has a column for each host.
773 It displays the job that is currently running on the host
776 :param len_col int: the size of the column
782 for host_port in self.lhosts:
783 jb = self.is_occupied(host_port)
784 if not jb: # nothing running on the host
785 empty = self.str_of_length("empty", len_col)
786 display_line += "|" + empty
788 display_line += "|" + src.printcolors.printcInfo(
789 self.str_of_length(jb.name, len_col))
# "\r" rewrites the same terminal line on every refresh.
791 self.logger.write("\r" + display_line + "|")
796 '''The main method. Runs all the jobs on every host.
797 For each host, at a given time, only one job can be running.
798 The jobs that have the field after (that contain the job that has
799 to be run before it) are run after the previous job.
800 This method stops when all the jobs are finished.
807 self.logger.write(src.printcolors.printcInfo(
808 _('Executing the jobs :\n')))
810 for host_port in self.lhosts:
813 if port == 22: # default value
814 text_line += "|" + self.str_of_length(host, self.len_columns)
816 text_line += "|" + self.str_of_length(
817 "("+host+", "+str(port)+")", self.len_columns)
819 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
820 self.logger.write(tiret_line)
821 self.logger.write(text_line + "|\n")
822 self.logger.write(tiret_line)
825 # The infinite loop that runs the jobs
826 l_jobs_not_started = self.dic_job_machine.keys()
827 while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
828 new_job_start = False
829 for host_port in self.lhosts:
831 if self.is_occupied(host_port):
834 for jb in l_jobs_not_started:
835 if (jb.machine.host, jb.machine.port) != host_port:
839 l_jobs_not_started.remove(jb)
843 jb_before = self.find_job_that_has_name(jb.after)
844 if jb_before.has_finished():
846 l_jobs_not_started.remove(jb)
849 self.cancel_dependencies_of_failing_jobs()
850 new_job_finished = self.update_jobs_states_list()
852 if new_job_start or new_job_finished:
# Refresh the published xml report only when something changed.
853 self.gui.update_xml_file(self.ljobs)
854 # Display the current status
855 self.display_status(self.len_columns)
857 # Make sure that the proc is not entirely busy
860 self.logger.write("\n")
861 self.logger.write(tiret_line)
862 self.logger.write("\n\n")
864 self.gui.update_xml_file(self.ljobs)
865 self.gui.last_update()
def write_all_results(self):
    '''Write, for every job of the day, a labelled header followed by
    the detailed results of that job (see Job.write_results).
    '''
    log = self.logger
    for current_job in self.dic_job_machine.keys():
        header = "#------- Results for job %s -------#\n" % current_job.name
        log.write(src.printcolors.printcLabel(header))
        current_job.write_results(log)
        log.write("\n\n")
# Gui class region: builds the browser-viewable xml report of the day's
# jobs.  The big docstring below shows an example of the generated xml.
# NOTE(review): lossy listing — the "class Gui" line (before line 881)
# and the list initialisations at the top of initialize_array (l_dist,
# l_applications, the "for job in l_jobs:" headers) are on dropped lines.
# Known issue: 'distrib is not "nothing"' and 'application is not
# "nothing"' (lines 973/977) compare identity with a string literal —
# this relies on CPython string interning and should be '!= "nothing"'.
881 '''Class to manage the the xml data that can be displayed in a browser to
886 <?xml version='1.0' encoding='utf-8'?>
887 <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
890 <info name="generated" value="2016-06-02 07:06:45"/>
893 <host name=is221553 port=22 distribution=UB12.04/>
894 <host name=is221560 port=22/>
895 <host name=is221553 port=22 distribution=FD20/>
898 <application name=SALOME-7.8.0/>
899 <application name=SALOME-master/>
900 <application name=MED-STANDALONE-master/>
901 <application name=CORPUS/>
905 <job name="7.8.0 FD22">
906 <host>is228809</host>
908 <application>SALOME-7.8.0</application>
909 <user>adminuser</user>
910 <timeout>240</timeout>
912 export DISPLAY=is221560
913 scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
914 tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
916 <state>Not launched</state>
919 <job name="master MG05">
920 <host>is221560</host>
922 <application>SALOME-master</application>
924 <timeout>240</timeout>
926 export DISPLAY=is221560
927 scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
928 sat prepare SALOME-master
929 sat compile SALOME-master
930 sat check SALOME-master
931 sat launcher SALOME-master
932 sat test SALOME-master
934 <state>Running since 23 min</state>
935 <!-- <state>time out</state> -->
936 <!-- <state>OK</state> -->
937 <!-- <state>KO</state> -->
938 <begin>10/05/2016 20h32</begin>
939 <end>10/05/2016 22h59</end>
947 def __init__(self, xml_file_path, l_jobs, l_jobs_not_today, stylesheet):
948 # The path of the xml file
949 self.xml_file_path = xml_file_path
951 self.stylesheet = stylesheet
952 # Open the file in a writing stream
953 self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
954 # Create the lines and columns
955 self.initialize_array(l_jobs, l_jobs_not_today)
957 self.update_xml_file(l_jobs)
959 def initialize_array(self, l_jobs, l_jobs_not_today):
# Collect the distinct distributions and applications of today's jobs...
963 distrib = job.distribution
964 if distrib is not None and distrib not in l_dist:
965 l_dist.append(distrib)
967 application = job.application
968 if application is not None and application not in l_applications:
969 l_applications.append(application)
# ... and of the jobs not scheduled today ("nothing" is the default).
971 for job_def in l_jobs_not_today:
972 distrib = src.get_cfg_param(job_def, "distribution", "nothing")
973 if distrib is not "nothing" and distrib not in l_dist:
974 l_dist.append(distrib)
976 application = src.get_cfg_param(job_def, "application", "nothing")
977 if application is not "nothing" and application not in l_applications:
978 l_applications.append(application)
981 self.l_applications = l_applications
983 # Update the hosts node
984 self.xmldists = self.xml_file.add_simple_node("distributions")
985 for dist_name in self.l_dist:
986 src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})
988 # Update the applications node
989 self.xmlapplications = self.xml_file.add_simple_node("applications")
990 for application in self.l_applications:
991 src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})
993 # Initialize the jobs node
994 self.xmljobs = self.xml_file.add_simple_node("jobs")
997 self.put_jobs_not_today(l_jobs_not_today)
999 # Initialize the info node (when generated)
1000 self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
def put_jobs_not_today(self, l_jobs_not_today):
    '''Add a <job> node, in state "Not today", for every job definition
    that is not scheduled for the current day.

    :param l_jobs_not_today list: the pyconf definitions of the jobs
                                  that will not be launched today
    '''
    add_node = src.xmlManager.add_simple_node
    for job_def in l_jobs_not_today:
        xml_job = add_node(self.xmljobs, "job", attrib={"name" : job_def.name})
        add_node(xml_job, "application",
                 src.get_cfg_param(job_def, "application", "nothing"))
        add_node(xml_job, "distribution",
                 src.get_cfg_param(job_def, "distribution", "nothing"))
        add_node(xml_job, "commands", " ; ".join(job_def.commands))
        add_node(xml_job, "state", "Not today")
# update_xml_file: refresh one <job> node per job with its current state.
# NOTE(review): lossy listing — the "for job in l_jobs:" header, the
# defaults of T0/Tf when the job has not run (lines 1019-1021 / 1024-1025
# presumably guard on job._T0/_Tf being set; TODO confirm), and the else
# branch that links the father's log file are on dropped lines.
1010 def update_xml_file(self, l_jobs):
1012 # Update the job names and status node
1014 # Find the node corresponding to the job and delete it
1015 # in order to recreate it
1016 for xmljob in self.xmljobs.findall('job'):
1017 if xmljob.attrib['name'] == job.name:
1018 self.xmljobs.remove(xmljob)
1022 T0 = time.strftime('%Y-%m-%d %H:%M:%S',
1023 time.localtime(job._T0))
1026 Tf = time.strftime('%Y-%m-%d %H:%M:%S',
1027 time.localtime(job._Tf))
1029 # recreate the job node
1030 xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
1031 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1032 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1033 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1034 src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
1035 src.xmlManager.add_simple_node(xmlj, "application", job.application)
1036 src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
1037 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1038 src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
1039 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1040 src.xmlManager.add_simple_node(xmlj, "begin", T0)
1041 src.xmlManager.add_simple_node(xmlj, "end", Tf)
1042 src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
1043 src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
1044 src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1045 if len(job.remote_log_files) > 0:
1046 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
1048 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")
1050 xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1051 # get the job father
1052 if job.after is not None:
1055 if jb.name == job.after:
1057 if job_father is None:
1058 msg = _("The job %(father_name)s that is parent of "
1059 "%(son_name)s is not in the job list." %
1060 {"father_name" : job.after , "son_name" : job.name})
1061 raise src.SatException(msg)
1063 if len(job_father.remote_log_files) > 0:
1064 link = job_father.remote_log_files[0]
1067 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
# Stamp the report with the time of this refresh.
1071 src.xmlManager.append_node_attrib(self.xmlinfos,
1073 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1076 self.write_xml_file()
def last_update(self, finish_status = "finished"):
    '''Mark the report as terminated and flush it to disk.

    :param finish_status str: the final value of the JobsCommandStatus
                              attribute of the <infos> node
    '''
    final_attribs = {"JobsCommandStatus" : finish_status}
    src.xmlManager.append_node_attrib(self.xmlinfos, attrib=final_attribs)
    # Persist the updated tree
    self.write_xml_file()
def write_xml_file(self):
    '''Write the xml tree to the report file, applying the configured
    xsl stylesheet.
    '''
    stylesheet = self.stylesheet
    self.xml_file.write_tree(stylesheet)
# description() fragment and the run() entry point of the "jobs" command.
# NOTE(review): lossy listing — the "def description():" line (before
# line 1090), the "if options.list:" guard, the try/finally structure
# around run_jobs() and several else/return lines are dropped.  Confirm
# against the original file.
# Known issue: the Gui output path "/export/home/serioja/LOGS/test.xml"
# (line 1169) is a hard-coded developer path — it should come from the
# configuration or the log directory.
1088 # Describes the command
1090 return _("The jobs command launches maintenances that are described"
1091 " in the dedicated jobs configuration file.")
1095 def run(args, runner, logger):
1097 (options, args) = parser.parse_args(args)
1099 jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
# Config files are looked up in the site directory, then the data dir.
1101 l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
1103 # Make sure the path to the jobs config files directory exists
1104 src.ensure_path_exists(jobs_cfg_files_dir)
1106 # list option : display all the available config files
1108 for cfg_dir in l_cfg_dir:
1109 if not options.no_label:
1110 logger.write("------ %s\n" %
1111 src.printcolors.printcHeader(cfg_dir))
1113 for f in sorted(os.listdir(cfg_dir)):
1114 if not f.endswith('.pyconf'):
1117 logger.write("%s\n" % cfilename)
1120 # Make sure the jobs_config option has been called
1121 if not options.jobs_cfg:
1122 message = _("The option --jobs_config is required\n")
1123 raise src.SatException( message )
1125 # Find the file in the directories
1127 for cfg_dir in l_cfg_dir:
1128 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1129 if not file_jobs_cfg.endswith('.pyconf'):
1130 file_jobs_cfg += '.pyconf'
1132 if not os.path.exists(file_jobs_cfg):
1139 msg = _("The file configuration %(name_file)s was not found."
1140 "\nUse the --list option to get the possible files.")
1141 src.printcolors.printcError(msg)
1145 (_("Platform"), runner.cfg.VARS.dist),
1146 (_("File containing the jobs configuration"), file_jobs_cfg)
1148 src.print_info(logger, info)
1150 # Read the config that is in the file
1151 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
# --only_jobs: keep only the explicitly requested jobs.
1152 if options.only_jobs:
1153 l_jb = src.pyconf.Sequence()
1154 for jb in config_jobs.jobs:
1155 if jb.name in options.only_jobs:
1157 "Adding a job that was given in only_jobs option parameters")
1158 config_jobs.jobs = l_jb
1161 today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
1162 # SSH connection to all machines
1163 today_jobs.ssh_connection_all_machines()
1164 if options.test_connection:
1169 gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, today_jobs.ljobsdef_not_today, "job_report.xsl")
1171 today_jobs.gui = gui
1175 # Run all the jobs contained in config_jobs
1176 today_jobs.run_jobs()
1177 except KeyboardInterrupt:
1179 logger.write("\n\n%s\n\n" %
1180 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1183 # find the potential not finished jobs and kill them
1184 for jb in today_jobs.ljobs:
1185 if not jb.has_finished():
1186 jb.kill_remote_process()
1188 today_jobs.gui.last_update(_("Forced interruption"))
1190 today_jobs.gui.last_update()
1191 # Output the results
1192 today_jobs.write_all_results()