3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# ---------------------------------------------------------------------------
# Command-line options of the "jobs" command, registered on the project's
# src.options.Options parser.  Each add_option call takes: short flag, long
# flag, value type, destination attribute name, help text, and optionally a
# default value.
# NOTE(review): this chunk is a partial numbered listing -- the closing
# argument lines (the default values) of the last two add_option calls are
# not visible here.
# ---------------------------------------------------------------------------
27 parser = src.options.Options()
29 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
30 _('The name of the config file that contains'
31 ' the jobs configuration'))
# 'list2' presumably means a comma-separated list of job names -- confirm
# against src.options.
32 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
33 _('The list of jobs to launch, by their name. '))
34 parser.add_option('l', 'list', 'boolean', 'list',
35 _('list all available config files.'))
# --no_label suppresses headers in the --list output; default False.
36 parser.add_option('n', 'no_label', 'boolean', 'no_label',
37 _("do not print labels, Works only with --list."), False)
# --test_connection only checks SSH reachability, the jobs are not run.
38 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
39 _("Try to connect to the machines. Not executing the jobs."),
# --publish enables generation of the browsable XML status report.
41 parser.add_option('p', 'publish', 'boolean', 'publish',
42 _("Generate an xml file that can be read in a browser to "
43 "display the jobs status."),
# machine: wraps one paramiko SSH connection (host/port/user/password) plus
# the remote path where salomeTools will be installed.
# NOTE(review): partial listing -- the assignments of self.name/self.host/
# self.user/self.port in __init__ (original lines 50-53) are absent here,
# as are several lines of connect() (the try:, the port=/username= keyword
# arguments of ssh.connect, the final else branch and the return of the
# status message).
46 class machine(object):
47 '''Class to manage a ssh connection on a machine
49 def __init__(self, name, host, user, port=22, passwd=None, sat_path="./"):
54 self.password = passwd
55 self.sat_path = sat_path
56 self.ssh = paramiko.SSHClient()
# None means "no connection attempted yet" (tri-state: None/False/True).
57 self._connection_successful = None
59 def connect(self, logger):
60 '''Initiate the ssh connection to the remote machine
62 :param logger src.logger.Logger: The logger instance
# Pessimistically mark the connection failed until connect() succeeds.
67 self._connection_successful = False
68 self.ssh.load_system_host_keys()
# AutoAddPolicy silently trusts unknown host keys -- convenient for a farm
# of build machines, but weakens MITM protection.
69 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
71 self.ssh.connect(self.host,
74 password = self.password)
# Each failure mode keeps a human-readable message (presumably returned to
# the caller on a line not visible in this chunk).
75 except paramiko.AuthenticationException:
76 message = src.KO_STATUS + _("Authentication failed")
77 except paramiko.BadHostKeyException:
78 message = (src.KO_STATUS +
79 _("The server's host key could not be verified"))
80 except paramiko.SSHException:
81 message = ( _("SSHException error connecting or establishing an SSH session"))
83 message = ( _("Error connecting or establishing an SSH session"))
85 self._connection_successful = True
def successfully_connected(self, logger):
    '''Verify if the connection to the remote machine has succeed

    :param logger src.logger.Logger: The logger instance
    :return: True if the connection has succeed, False if not
    :rtype: bool
    '''
    if self._connection_successful is None:
        # Bug fix: the original assigned only the first string literal to
        # `message` -- the following quoted lines were stand-alone
        # expression statements and were silently discarded -- and it read
        # host/port/user from the class object `machine` instead of the
        # instance.  Build the full message from instance attributes.
        message = ("Warning : trying to ask if the connection to "
                   "(host: %s, port: %s, user: %s) is OK whereas there were"
                   " no connection request" %
                   (self.host, self.port, self.user))
        logger.write( src.printcolors.printcWarning(message))
    return self._connection_successful
# copy_sat: pushes the local salomeTools tree plus the jobs config file to
# the remote machine over SFTP.  put_dir/mkdir are the SFTP helpers.
# NOTE(review): partial listing -- the try:, the else: branches of put_dir
# and the except handler of mkdir (original lines 150-155) are absent here.
104 def copy_sat(self, sat_local_path, job_file):
105 '''Copy salomeTools to the remote machine in self.sat_path
109 self.sftp = self.ssh.open_sftp()
110 self.mkdir(self.sat_path, ignore_existing=True)
# '.git' is filtered out so the repository metadata is not uploaded.
111 self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
112 job_file_name = os.path.basename(job_file)
113 self.sftp.put(job_file, os.path.join(self.sat_path, "data", "jobs", job_file_name))
# Any failure invalidates the connection flag so the job is not launched.
114 except Exception as e:
116 self._connection_successful = False
# WARNING(review): mutable default argument `filters=[]` -- shared between
# calls; harmless here only as long as it is never mutated.
120 def put_dir(self, source, target, filters = []):
121 ''' Uploads the contents of the source directory to the target path. The
122 target directory needs to exists. All subdirectories in source are
123 created under target.
125 for item in os.listdir(source):
128 source_path = os.path.join(source, item)
129 destination_path = os.path.join(target, item)
# Symbolic links are recreated remotely rather than copied as files.
130 if os.path.islink(source_path):
131 linkto = os.readlink(source_path)
133 self.sftp.remove(destination_path)
134 self.sftp.symlink(linkto, destination_path)
135 self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
# Regular files are uploaded with their local permission bits preserved.
139 if os.path.isfile(source_path):
140 self.sftp.put(source_path, destination_path)
141 self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
# Directories recurse (presumably in an else: branch not visible here).
143 self.mkdir(destination_path, ignore_existing=True)
144 self.put_dir(source_path, destination_path)
# mode=511 is decimal for 0o777 (full permissions).
146 def mkdir(self, path, mode=511, ignore_existing=False):
147 ''' Augments mkdir by adding an option to not fail if the folder exists '''
149 self.sftp.mkdir(path, mode)
# exec_command: non-blocking remote execution; returns the three paramiko
# channel files, or (None, None, None) on failure.
# NOTE(review): partial listing -- the try: line and a second except clause
# header before original line 175 are absent here.
156 def exec_command(self, command, logger):
157 '''Execute the command on the remote machine
159 :param command str: The command to be run
160 :param logger src.logger.Logger: The logger instance
161 :return: the stdin, stdout, and stderr of the executing command,
163 :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
164 paramiko.channel.ChannelFile)
167 # Does not wait the end of the command
168 (stdin, stdout, stderr) = self.ssh.exec_command(command)
169 except paramiko.SSHException:
170 message = src.KO_STATUS + _(
171 ": the server failed to execute the command\n")
172 logger.write( src.printcolors.printcError(message))
# Callers must check for the (None, None, None) sentinel before using the
# returned channels.
173 return (None, None, None)
175 logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
176 return (None, None, None)
178 return (stdin, stdout, stderr)
# close(): docstring fragment only -- the def line and body (original lines
# 180, 182-185) are not visible in this chunk.
181 '''Close the ssh connection
# write_info: dumps host/port/user and the connection status to the logger.
187 def write_info(self, logger):
188 '''Prints the informations relative to the machine in the logger
189 (terminal traces and log file)
191 :param logger src.logger.Logger: The logger instance
195 logger.write("host : " + self.host + "\n")
196 logger.write("port : " + str(self.port) + "\n")
197 logger.write("user : " + str(self.user) + "\n")
197 if self.successfully_connected(logger):
199 status = src.OK_STATUS
# presumably inside an else: branch (original line 200 absent).
201 status = src.KO_STATUS
202 logger.write("Connection : " + status + "\n\n")
# job: represents one remote job -- the command launched over SSH on its
# machine plus start/finish bookkeeping.  NOTE(review): the "class job"
# header line itself (around original line 205) is not visible here.
206 '''Class to manage one job
208 def __init__(self, name, machine, application, distribution,
209 commands, timeout, logger, job_file, after=None):
212 self.machine = machine
214 self.timeout = timeout
215 self.application = application
216 self.distribution = distribution
# Local copies of the remote log files, filled by get_log_files().
218 self.remote_log_files = []
# Lifecycle flags; _has_timouted keeps the original (misspelled) name.
222 self._has_begun = False
223 self._has_finished = False
224 self._has_timouted = False
225 self._stdin = None # Store the command inputs field
226 self._stdout = None # Store the command outputs field
227 self._stderr = None # Store the command errors field
229 self.out = None # Contains something only if the job is finished
230 self.err = None # Contains something only if the job is finished
232 self.commands = commands
# The remote command re-runs sat itself in "job" mode with the same config.
233 self.command = os.path.join(self.machine.sat_path, "sat") + " -v1 job --jobs_config " + job_file + " --job " + self.name
# get_pids() interior (its def line, ~original 235-236, is absent here):
# greps the remote process list for the sat job command and extracts pids.
237 cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
238 (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
239 pids_cmd = out_pid.readlines()
240 pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
# kill_remote_process: kill -9 every pid matching the job command on the
# remote machine (used on timeout or user interruption).
244 def kill_remote_process(self):
245 '''Kills the process on the remote machine.
247 :return: (the output of the kill, the error of the kill)
251 pids = self.get_pids()
252 cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
# original line 254 (presumably the "self.logger)" closing argument) is
# absent from this chunk.
253 (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
255 return (out_kill, err_kill)
# has_begun() fragment -- its def line (original ~257) is not visible here.
258 '''Returns True if the job has already begun
260 :return: True if the job has already begun
263 return self._has_begun
# has_finished: polls the stdout channel; on first detection of closure it
# caches out/err and stamps the finish time.
265 def has_finished(self):
266 '''Returns True if the job has already finished
267 (i.e. all the commands have been executed)
268 If it is finished, the outputs are stored in the fields out and err.
270 :return: True if the job has already finished
274 # If the method has already been called and returned True
275 if self._has_finished:
278 # If the job has not begun yet
279 if not self.has_begun():
# A closed stdout channel means the remote command terminated.
282 if self._stdout.channel.closed:
283 self._has_finished = True
284 # Store the result outputs
285 self.out = self._stdout.read()
286 self.err = self._stderr.read()
288 self._Tf = time.time()
289 # And get the remote command status and log files
292 return self._has_finished
# get_log_files: parses self.out -- first non-empty line is the job result,
# the rest are remote log file paths that are fetched locally over SFTP.
294 def get_log_files(self):
295 if not self.has_finished():
296 msg = _("Trying to get log files whereas the job is not finished.")
297 self.logger.write(src.printcolors.printcWarning(msg))
299 out_lines = self.out.split("\n")
300 out_lines = [line for line in out_lines if line != '']
301 self.res_job = out_lines[0]
302 for job_path_remote in out_lines[1:]:
# Files living in an 'OUT' directory are mirrored into a local 'OUT'
# subfolder next to the logger's log file; others go directly beside it.
303 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
304 local_path = os.path.join(os.path.dirname(self.logger.logFilePath), os.path.basename(job_path_remote))
305 self.machine.sftp.get(job_path_remote, local_path)
# presumably in an else: branch (original line 306 absent).
307 local_path = os.path.join(os.path.dirname(self.logger.logFilePath), 'OUT', os.path.basename(job_path_remote))
308 self.machine.sftp.get(job_path_remote, local_path)
309 self.remote_log_files.append(local_path)
# is_running: begun but not yet finished.
311 def is_running(self):
312 '''Returns True if the job commands are running
314 :return: True if the job is running
317 return self.has_begun() and not self.has_finished()
# is_timeout: set by check_time() when self.timeout is exceeded.
319 def is_timeout(self):
320 '''Returns True if the job commands has finished with timeout
322 :return: True if the job has finished with timeout
325 return self._has_timouted
# time_elapsed: seconds since the job started (_T0).
# NOTE(review): original lines 329-330 (the not-begun early return and the
# T_now = time.time() assignment) are absent from this chunk.
327 def time_elapsed(self):
328 if not self.has_begun():
331 return T_now - self._T0
# check_time: marks the job finished+timed-out when self.timeout elapses,
# kills the remote process and synthesizes TIMEOUT out/err texts.
333 def check_time(self):
334 if not self.has_begun():
336 if self.time_elapsed() > self.timeout:
337 self._has_finished = True
338 self._has_timouted = True
339 self._Tf = time.time()
341 (out_kill, _) = self.kill_remote_process()
342 self.out = "TIMEOUT \n" + out_kill.read()
343 self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
def total_duration(self):
    '''Return the total wall-clock duration of the job, in seconds.

    :return: the difference between the finish time and the start time
    :rtype: float
    '''
    # _T0 is stamped when the job is launched and _Tf when it is seen
    # finished (or timed out).
    duration = self._Tf - self._T0
    return duration
# run: launches the job once -- refuses a machine whose SSH connection
# failed, otherwise fires the remote command (non-blocking) and stamps _T0.
348 def run(self, logger):
350 print("Warn the user that a job can only be launched one time")
353 if not self.machine.successfully_connected(logger):
354 self._has_finished = True
356 self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information."
357 % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
# presumably an else: branch (original line 358 absent).
359 self._T0 = time.time()
360 self._stdin, self._stdout, self._stderr = self.machine.exec_command(
361 self.command, logger)
# (None, None, None) is exec_command's failure sentinel.
362 if (self._stdin, self._stdout, self._stderr) == (None, None, None):
363 self._has_finished = True
364 self._Tf = time.time()
366 self.err = "The server failed to execute the command"
368 self._has_begun = True
# write_results: human-readable report of one job (timings, machine info,
# captured out/err) written to the logger.
370 def write_results(self, logger):
371 logger.write("name : " + self.name + "\n")
373 logger.write("after : %s\n" % self.after)
374 logger.write("Time elapsed : %4imin %2is \n" %
375 (self.total_duration()/60 , self.total_duration()%60))
377 logger.write("Begin time : %s\n" %
378 time.strftime('%Y-%m-%d %H:%M:%S',
379 time.localtime(self._T0)) )
381 logger.write("End time : %s\n\n" %
382 time.strftime('%Y-%m-%d %H:%M:%S',
383 time.localtime(self._Tf)) )
385 machine_head = "Informations about connection :\n"
386 underline = (len(machine_head) - 2) * "-"
387 logger.write(src.printcolors.printcInfo(machine_head + underline + "\n"))
388 self.machine.write_info(logger)
# out/err may still be None when the job never produced output.
390 logger.write(src.printcolors.printcInfo("out : \n"))
392 logger.write("Unable to get output\n")
394 logger.write(self.out + "\n")
395 logger.write(src.printcolors.printcInfo("err : \n"))
397 logger.write("Unable to get error\n")
399 logger.write(self.err + "\n")
# get_status: one-line state string used by the XML report.
401 def get_status(self):
402 if not self.machine.successfully_connected(self.logger):
403 return "SSH connection KO"
404 if not self.has_begun():
405 return "Not launched"
406 if self.is_running():
407 return "running since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._T0))
408 if self.has_finished():
409 if self.is_timeout():
410 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
411 return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
# Jobs: orchestrates today's jobs -- builds the machine list, the job list
# and the job->machine mapping from the pyconf configuration.
# NOTE(review): the "class Jobs" header line (around original line 413) and
# the initializations of self.runner/self.logger/self.lmachines/self.ljobs
# are not visible in this chunk.
414 '''Class to manage the jobs to be run
416 def __init__(self, runner, logger, job_file, job_file_path, config_jobs, lenght_columns = 20):
417 # The jobs configuration
418 self.cfg_jobs = config_jobs
419 self.job_file = job_file
420 self.job_file_path = job_file_path
421 # The machine that will be used today
423 # The list of machine (hosts, port) that will be used today
424 # (a same host can have several machine instances since there
425 # can be several ssh parameters)
427 # The jobs to be launched today
431 # The correlation dictionary between jobs and machines
432 self.dic_job_machine = {}
# width of each host column in the textual status display (note: the
# parameter name keeps the original spelling "lenght_columns").
433 self.len_columns = lenght_columns
435 # the list of jobs that have not been run yet
436 self._l_jobs_not_started = []
437 # the list of jobs that have already ran
438 self._l_jobs_finished = []
439 # the list of jobs that are running
440 self._l_jobs_running = []
# Populate machines/jobs for today right away.
442 self.determine_products_and_machines()
# define_job: turns one pyconf job mapping + its machine into a job
# instance.  'after', 'application' and 'distribution' are optional fields
# (their defaulting lines, e.g. original 453/456/459/462/465, are not
# visible in this chunk).
444 def define_job(self, job_def, machine):
445 '''Takes a pyconf job definition and a machine (from class machine)
446 and returns the job instance corresponding to the definition.
448 :param job_def src.config.Mapping: a job definition
449 :param machine machine: the machine on which the job will run
450 :return: The corresponding job in a job class instance
454 cmmnds = job_def.commands
455 timeout = job_def.timeout
457 if 'after' in job_def:
458 after = job_def.after
460 if 'application' in job_def:
461 application = job_def.application
463 if 'distribution' in job_def:
464 distribution = job_def.distribution
466 return job(name, machine, application, distribution, cmmnds, timeout, self.logger, self.job_file , after = after)
# determine_products_and_machines: for every job scheduled today (weekday
# match on job_def.when), find or create its machine instance, then build
# the job and register it.  NOTE(review): heavily gapped listing -- the
# machine() construction call itself (original lines ~516-525), the default
# port/password/sat_path values and several else: lines are absent here.
468 def determine_products_and_machines(self):
469 '''Function that reads the pyconf jobs definition and instantiates all
470 the machines and jobs to be done today.
# weekday(): Monday == 0 ... Sunday == 6, matched against job_def.when.
475 today = datetime.date.weekday(datetime.date.today())
478 for job_def in self.cfg_jobs.jobs :
479 if today in job_def.when:
481 name_machine = job_def.machine
# Reuse an already-instantiated machine with the same name if any.
484 for mach in self.lmachines:
485 if mach.name == name_machine:
489 if a_machine == None:
490 for machine_def in self.cfg_jobs.machines:
491 if machine_def.name == name_machine:
# Each connection parameter falls back to the local configuration when the
# machine definition does not provide it.
492 if 'host' not in machine_def:
493 host = self.runner.cfg.VARS.hostname
495 host = machine_def.host
497 if 'user' not in machine_def:
498 user = self.runner.cfg.VARS.user
500 user = machine_def.user
502 if 'port' not in machine_def:
505 port = machine_def.port
507 if 'password' not in machine_def:
510 passwd = machine_def.password
512 if 'sat_path' not in machine_def:
515 sat_path = machine_def.sat_path
# Track distinct (host, port) pairs for the status display columns.
526 if (host, port) not in host_list:
527 host_list.append((host, port))
529 self.lmachines.append(a_machine)
# Job references an undefined machine: warn and skip the launch.
531 if a_machine == None:
532 msg = _("WARNING: The job \"%(job_name)s\" requires the "
533 "machine \"%(machine_name)s\" but this machine "
534 "is not defined in the configuration file.\n"
535 "The job will not be launched")
536 self.logger.write(src.printcolors.printcWarning(msg))
538 a_job = self.define_job(job_def, a_machine)
539 self.dic_job_machine[a_job] = a_machine
541 self.ljobs.append(a_job)
543 self.lhosts = host_list
# ssh_connection_all_machines: connects to every machine, copies
# salomeTools + the jobs config over, and prints one padded progress line
# per machine.  NOTE(review): several lines (the if/else branches around
# the padding and the copy status) are absent from this chunk; the trailing
# integer argument 3 of logger.write is presumably a verbosity level.
545 def ssh_connection_all_machines(self, pad=50):
546 '''Function that do the ssh connection to every machine
552 self.logger.write(src.printcolors.printcInfo((
553 "Establishing connection with all the machines :\n")))
554 for machine in self.lmachines:
555 # little algorithm in order to display traces
556 begin_line = (_("Connection to %s: " % machine.name))
557 if pad - len(begin_line) < 0:
# Pad with dots so the status column lines up across machines.
560 endline = (pad - len(begin_line)) * "." + " "
562 step = "SSH connection"
563 self.logger.write( begin_line + endline + step)
565 # the call to the method that initiate the ssh connection
566 msg = machine.connect(self.logger)
568 # Copy salomeTools to the remote machine
569 if machine.successfully_connected(self.logger):
# '\r' rewrites the same terminal line to update the step label in place.
571 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3)
572 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
574 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path)
575 # Print the status of the copy
577 self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
578 self.logger.write('\r%s%s%s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS)), 3)
580 self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
581 self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS), _("Copy of SAT failed")), 3)
# Connection itself failed: show KO plus the message returned by connect().
583 self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
584 self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.KO_STATUS), msg), 3)
585 self.logger.write("\n", 3)
587 self.logger.write("\n")
# is_occupied: returns the running job occupying a (host, port) pair, or a
# falsy value when the slot is free.  NOTE(review): the unpacking of
# hostname into host/port and the return statements (original lines
# 597-606) are absent from this chunk.
590 def is_occupied(self, hostname):
591 '''Function that returns True if a job is running on
592 the machine defined by its host and its port.
594 :param hostname (str, int): the pair (host, port)
595 :return: the job that is running on the host,
596 or false if there is no job running on the host.
601 for jb in self.dic_job_machine:
602 if jb.machine.host == host and jb.machine.port == port:
# update_jobs_states_list: refreshes the running/finished lists and reports
# whether at least one more job finished since the previous call.
607 def update_jobs_states_list(self):
608 '''Function that updates the lists that store the currently
609 running jobs and the jobs that have already finished.
614 jobs_finished_list = []
615 jobs_running_list = []
616 for jb in self.dic_job_machine:
618 jobs_running_list.append(jb)
620 if jb.has_finished():
621 jobs_finished_list.append(jb)
623 nb_job_finished_before = len(self._l_jobs_finished)
624 self._l_jobs_finished = jobs_finished_list
625 self._l_jobs_running = jobs_running_list
627 nb_job_finished_now = len(self._l_jobs_finished)
# True => the caller should refresh the XML report / status display.
629 return nb_job_finished_now > nb_job_finished_before
# findJobThatHasName: linear lookup of a job by name; raises when absent.
# NOTE(review): the name comparison / return inside the loop (original
# lines 640-642) is absent from this chunk.
632 def findJobThatHasName(self, name):
633 '''Returns the job by its name.
635 :param name str: a job name
636 :return: the job that has the name.
639 for jb in self.ljobs:
643 # the following is executed only if the job was not found
644 msg = _('The job "%s" seems to be nonexistent') % name
645 raise src.SatException(msg)
def str_of_length(self, text, length):
    '''Format text to exactly the given length, either truncating it with
    an ellipsis or centring it with spaces.

    :param text str: any string
    :param length int: a length for the returned string
    :return: the most close string of length "length"
    :rtype: str
    '''
    if len(text) > length:
        # Too long: keep the beginning and mark the truncation.
        text_out = text[:length-3] + '...'
    else:
        # Too short: centre the text (the extra space, if any, goes to
        # the right).
        # Fix: use floor division so the padding width stays an integer
        # under Python 3 ("/" yields a float there and `" " * float`
        # raises TypeError).
        diff = length - len(text)
        before = " " * (diff//2)
        after = " " * (diff//2 + diff%2)
        text_out = before + text + after
    return text_out
# display_status: rewrites (via '\r') one table row with the job currently
# running on each host column, or "empty".  NOTE(review): the
# initialization of display_line and the else: before original line 684 are
# absent from this chunk.
666 def display_status(self, len_col):
667 '''Takes a lenght and construct the display of the current status
668 of the jobs in an array that has a column for each host.
669 It displays the job that is currently running on the host
672 :param len_col int: the size of the column
678 for host_port in self.lhosts:
679 jb = self.is_occupied(host_port)
680 if not jb: # nothing running on the host
681 empty = self.str_of_length("empty", len_col)
682 display_line += "|" + empty
684 display_line += "|" + src.printcolors.printcInfo(
685 self.str_of_length(jb.name, len_col))
687 self.logger.write("\r" + display_line + "|")
# run_jobs: main scheduling loop -- one job at a time per (host, port),
# honouring 'after' dependencies, refreshing the XML report when the state
# changes.  NOTE(review): the "def run_jobs(self):" line itself (around
# original line 690) and several loop-body lines (the host/port unpacking,
# the jb.run(...) calls, the time.sleep) are absent from this chunk.
692 '''The main method. Runs all the jobs on every host.
693 For each host, at a given time, only one job can be running.
694 The jobs that have the field after (that contain the job that has
695 to be run before it) are run after the previous job.
696 This method stops when all the jobs are finished.
703 self.logger.write(src.printcolors.printcInfo(
704 _('Executing the jobs :\n')))
# Header row: one column per (host, port); port 22 shows the bare host.
706 for host_port in self.lhosts:
709 if port == 22: # default value
710 text_line += "|" + self.str_of_length(host, self.len_columns)
712 text_line += "|" + self.str_of_length(
713 "("+host+", "+str(port)+")", self.len_columns)
715 tiret_line = " " + "-"*(len(text_line)-1) + "\n"
716 self.logger.write(tiret_line)
717 self.logger.write(text_line + "|\n")
718 self.logger.write(tiret_line)
721 # The infinite loop that runs the jobs
# NOTE(review): under Python 3 .keys() is a view; calling .remove() on it
# (below) only works on the Python 2 list it originally was -- confirm the
# targeted interpreter before porting.
722 l_jobs_not_started = self.dic_job_machine.keys()
723 while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
724 new_job_start = False
725 for host_port in self.lhosts:
# Skip hosts that already run a job.
727 if self.is_occupied(host_port):
730 for jb in l_jobs_not_started:
731 if (jb.machine.host, jb.machine.port) != host_port:
735 l_jobs_not_started.remove(jb)
# Dependency handling: only start jb once its 'after' job has finished.
739 jb_before = self.findJobThatHasName(jb.after)
740 if jb_before.has_finished():
742 l_jobs_not_started.remove(jb)
746 new_job_finished = self.update_jobs_states_list()
748 if new_job_start or new_job_finished:
749 self.gui.update_xml_file(self.ljobs)
750 # Display the current status
751 self.display_status(self.len_columns)
753 # Make sure that the proc is not entirely busy
756 self.logger.write("\n")
757 self.logger.write(tiret_line)
758 self.logger.write("\n\n")
# Final refresh + closing status of the XML report.
760 self.gui.update_xml_file(self.ljobs)
761 self.gui.last_update()
# write_all_results: prints the detailed per-job report (job.write_results)
# for every job of the day.
763 def write_all_results(self):
764 '''Display all the jobs outputs.
770 for jb in self.dic_job_machine.keys():
771 self.logger.write(src.printcolors.printcLabel(
772 "#------- Results for job %s -------#\n" % jb.name))
773 jb.write_results(self.logger)
774 self.logger.write("\n\n")
777 '''Class to manage the xml data that can be displayed in a browser to
782 <?xml version='1.0' encoding='utf-8'?>
783 <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
786 <info name="generated" value="2016-06-02 07:06:45"/>
789 <host name=is221553 port=22 distribution=UB12.04/>
790 <host name=is221560 port=22/>
791 <host name=is221553 port=22 distribution=FD20/>
794 <application name=SALOME-7.8.0/>
795 <application name=SALOME-master/>
796 <application name=MED-STANDALONE-master/>
797 <application name=CORPUS/>
801 <job name="7.8.0 FD22">
802 <host>is228809</host>
804 <application>SALOME-7.8.0</application>
805 <user>adminuser</user>
806 <timeout>240</timeout>
808 export DISPLAY=is221560
809 scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
810 tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
812 <state>Not launched</state>
815 <job name="master MG05">
816 <host>is221560</host>
818 <application>SALOME-master</application>
820 <timeout>240</timeout>
822 export DISPLAY=is221560
823 scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser
824 sat prepare SALOME-master
825 sat compile SALOME-master
826 sat check SALOME-master
827 sat launcher SALOME-master
828 sat test SALOME-master
830 <state>Running since 23 min</state>
831 <!-- <state>time out</state> -->
832 <!-- <state>OK</state> -->
833 <!-- <state>KO</state> -->
834 <begin>10/05/2016 20h32</begin>
835 <end>10/05/2016 22h59</end>
# Gui.__init__: opens the XML report file, builds the static structure
# (distributions/applications/jobs/infos nodes) and writes a first
# snapshot of all jobs.  NOTE(review): the "class Gui" header line is not
# visible in this chunk.
843 def __init__(self, xml_file_path, l_jobs, stylesheet):
844 # The path of the xml file
845 self.xml_file_path = xml_file_path
# xsl stylesheet reference emitted into the file so a browser can render it.
847 self.stylesheet = stylesheet
848 # Open the file in a writing stream
849 self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
850 # Create the lines and columns
851 self.initialize_array(l_jobs)
853 self.update_xml_file(l_jobs)
# initialize_array: collects the distinct distributions and applications of
# the jobs and creates the corresponding XML nodes plus the empty "jobs"
# and "infos" nodes.  NOTE(review): the initialization of l_dist /
# l_applications and the job loop header (original lines ~856-858) are
# absent from this chunk.
855 def initialize_array(self, l_jobs):
859 distrib = job.distribution
860 if distrib not in l_dist:
861 l_dist.append(distrib)
863 application = job.application
864 if application not in l_applications:
865 l_applications.append(application)
868 self.l_applications = l_applications
870 # Update the hosts node
871 self.xmldists = self.xml_file.add_simple_node("distributions")
872 for dist_name in self.l_dist:
873 src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})
875 # Update the applications node
876 self.xmlapplications = self.xml_file.add_simple_node("applications")
877 for application in self.l_applications:
878 src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})
880 # Initialize the jobs node
881 self.xmljobs = self.xml_file.add_simple_node("jobs")
883 # Initialize the info node (when generated)
884 self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
# update_xml_file: rebuilds one <job> node per job (delete-then-recreate),
# stamps the "last update" time and rewrites the file.  NOTE(review): the
# "for job in l_jobs:" loop header (original ~889) and the attrib= opening
# of the final append_node_attrib call (original 916) are absent here.
886 def update_xml_file(self, l_jobs):
888 # Update the job names and status node
890 # Find the node corresponding to the job and delete it
891 # in order to recreate it
892 for xmljob in self.xmljobs.findall('job'):
893 if xmljob.attrib['name'] == job.name:
894 self.xmljobs.remove(xmljob)
896 # recreate the job node
897 xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
898 src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
899 src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
900 src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
901 src.xmlManager.add_simple_node(xmlj, "application", job.application)
902 src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
903 src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
904 src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
905 src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
906 src.xmlManager.add_simple_node(xmlj, "begin", str(job._T0))
907 src.xmlManager.add_simple_node(xmlj, "end", str(job._Tf))
# cleancolor strips the terminal colour escapes before embedding in XML.
908 src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
909 src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
910 if len(job.remote_log_files) > 0:
911 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
913 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")
915 src.xmlManager.append_node_attrib(self.xmlinfos,
917 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
920 self.write_xml_file()
# last_update: records the final status of the whole jobs command in the
# "infos" node (default "finished", or e.g. "Forced interruption") and
# rewrites the XML file one last time.
922 def last_update(self, finish_status = "finished"):
923 src.xmlManager.append_node_attrib(self.xmlinfos,
924 attrib={"JobsCommandStatus" : finish_status})
926 self.write_xml_file()
def write_xml_file(self):
    '''Dump the current in-memory XML tree to disk.

    The stylesheet reference kept on the instance is passed along so the
    generated report can be rendered in a browser.
    '''
    stylesheet = self.stylesheet
    self.xml_file.write_tree(stylesheet)
# description(): returns the one-line help text of the "jobs" command.
# NOTE(review): the "def description():" line itself (original line 933) is
# not visible in this chunk.
932 # Describes the command
934 return _("The jobs command launches maintenances that are described"
935 " in the dedicated jobs configuration file.")
# run(args, runner, logger): entry point of the "jobs" command.  Parses the
# options, resolves the jobs configuration file, builds the Jobs scheduler,
# connects to all machines, optionally publishes the XML report, then runs
# everything; a KeyboardInterrupt kills the still-running remote jobs.
# NOTE(review): many lines are absent from this numbered listing (the
# --list early return, the found/not-found handling of the config file, the
# try: before run_jobs, parts of the interrupt handler).
939 def run(args, runner, logger):
941 (options, args) = parser.parse_args(args)
943 jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
# Search order: the site config dir first, then the built-in data/jobs dir.
945 l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
947 # Make sure the path to the jobs config files directory exists
948 src.ensure_path_exists(jobs_cfg_files_dir)
950 # list option : display all the available config files
952 for cfg_dir in l_cfg_dir:
953 if not options.no_label:
954 logger.write("------ %s\n" %
955 src.printcolors.printcHeader(cfg_dir))
957 for f in sorted(os.listdir(cfg_dir)):
958 if not f.endswith('.pyconf'):
961 logger.write("%s\n" % cfilename)
964 # Make sure the jobs_config option has been called
965 if not options.jobs_cfg:
966 message = _("The option --jobs_config is required\n")
967 raise src.SatException( message )
969 # Find the file in the directories
971 for cfg_dir in l_cfg_dir:
972 file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
# The .pyconf extension may be omitted on the command line.
973 if not file_jobs_cfg.endswith('.pyconf'):
974 file_jobs_cfg += '.pyconf'
976 if not os.path.exists(file_jobs_cfg):
983 msg = _("The file configuration %(name_file)s was not found."
984 "\nUse the --list option to get the possible files.")
985 src.printcolors.printcError(msg)
989 (_("Platform"), runner.cfg.VARS.dist),
990 (_("File containing the jobs configuration"), file_jobs_cfg)
992 src.print_info(logger, info)
994 # Read the config that is in the file
995 config_jobs = src.read_config_from_a_file(file_jobs_cfg)
# --only_jobs: keep only the explicitly requested jobs in the config.
996 if options.only_jobs:
997 l_jb = src.pyconf.Sequence()
998 for jb in config_jobs.jobs:
999 if jb.name in options.only_jobs:
1001 "Adding a job that was given in only_jobs option parameters")
1002 config_jobs.jobs = l_jb
1005 today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
1006 # SSH connection to all machines
1007 today_jobs.ssh_connection_all_machines()
# --test_connection stops here (the jobs themselves are not run).
1008 if options.test_connection:
# WARNING(review): hard-coded user-specific output path for the XML report
# -- should come from the configuration.
1013 gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, "job_report.xsl")
1015 today_jobs.gui = gui
1019 # Run all the jobs contained in config_jobs
1020 today_jobs.run_jobs()
1021 except KeyboardInterrupt:
1023 logger.write("\n\n%s\n\n" %
1024 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1027 # find the potential not finished jobs and kill them
1028 for jb in today_jobs.ljobs:
1029 if not jb.has_finished():
1030 jb.kill_remote_process()
# Record the interruption (resp. normal completion) in the XML report.
1032 today_jobs.gui.last_update(_("Forced interruption"))
1034 today_jobs.gui.last_update()
1035 # Output the results
1036 today_jobs.write_all_results()