#!/usr/bin/env python
#-*- coding:utf-8 -*-
#  Copyright (C) 2010-2013  CEA/DEN
#
#  This library is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2.1 of the License.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA

import os
import datetime
import time
import paramiko

import src

STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_TABLE = "jobs_table_report.xsl"

parser = src.options.Options()

parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg',
                  _('The name of the config file that contains'
                  ' the jobs configuration'))
parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
                  _('The list of jobs to launch, by their name.'))
parser.add_option('l', 'list', 'boolean', 'list',
                  _('List all the available config files.'))
parser.add_option('n', 'no_label', 'boolean', 'no_label',
                  _("Do not print labels. Works only with --list."), False)
parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
                  _("Try to connect to the machines without executing the jobs."),
                  False)
parser.add_option('p', 'publish', 'boolean', 'publish',
                  _("Generate an XML file that can be read in a browser to "
                    "display the jobs status."),
                  False)

class Machine(object):
    '''Class to manage an SSH connection to a machine
    '''
    def __init__(self,
                 name,
                 host,
                 user,
                 port=22,
                 passwd=None,
                 sat_path="salomeTools"):
        self.name = name
        self.host = host
        self.port = port
        self.user = user
        self.password = passwd
        self.sat_path = sat_path
        self.ssh = paramiko.SSHClient()
        self._connection_successful = None
    
    def connect(self, logger):
        '''Initiate the ssh connection to the remote machine
        
        :param logger src.logger.Logger: The logger instance 
        :return: Nothing
        :rtype: N/A
        '''

        self._connection_successful = False
        self.ssh.load_system_host_keys()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(self.host,
                             port=self.port,
                             username=self.user,
                             password = self.password)
        except paramiko.AuthenticationException:
            message = src.KO_STATUS + _("Authentication failed")
        except paramiko.BadHostKeyException:
            message = (src.KO_STATUS + 
                       _("The server's host key could not be verified"))
        except paramiko.SSHException:
            message = ( _("SSHException error connecting or "
                          "establishing an SSH session"))
        except:
            message = ( _("Error connecting or establishing an SSH session"))
        else:
            self._connection_successful = True
            message = ""
        return message
    
    def successfully_connected(self, logger):
        '''Verify if the connection to the remote machine has succeeded
        
        :param logger src.logger.Logger: The logger instance 
        :return: True if the connection has succeeded, False otherwise
        :rtype: bool
        '''
        if self._connection_successful is None:
            message = _("Warning : asking if the connection to "
            "(name: %s, host: %s, port: %s, user: %s) is OK although no "
            "connection request was made" % 
                        (self.name, self.host, self.port, self.user))
            logger.write( src.printcolors.printcWarning(message))
        return self._connection_successful

    def copy_sat(self, sat_local_path, job_file):
        '''Copy salomeTools to the remote machine in self.sat_path
        '''
        res = 0
        try:
            self.sftp = self.ssh.open_sftp()
            self.mkdir(self.sat_path, ignore_existing=True)
            self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
            job_file_name = os.path.basename(job_file)
            self.sftp.put(job_file, os.path.join(self.sat_path,
                                                 "data",
                                                 "jobs",
                                                 job_file_name))
        except Exception as e:
            res = str(e)
            self._connection_successful = False
        
        return res
        
    def put_dir(self, source, target, filters = []):
        ''' Uploads the contents of the source directory to the target path.
            The target directory needs to exist. All subdirectories in source
            are created under target.
        '''
        for item in os.listdir(source):
            if item in filters:
                continue
            source_path = os.path.join(source, item)
            destination_path = os.path.join(target, item)
            if os.path.islink(source_path):
                linkto = os.readlink(source_path)
                try:
                    self.sftp.symlink(linkto, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                except IOError:
                    pass
            else:
                if os.path.isfile(source_path):
                    self.sftp.put(source_path, destination_path)
                    self.sftp.chmod(destination_path,
                                    os.stat(source_path).st_mode)
                else:
                    self.mkdir(destination_path, ignore_existing=True)
                    self.put_dir(source_path, destination_path)

    def mkdir(self, path, mode=511, ignore_existing=False):
        ''' Augments mkdir by adding an option to not fail 
            if the folder exists 
        '''
        try:
            self.sftp.mkdir(path, mode)
        except IOError:
            if ignore_existing:
                pass
            else:
                raise
    
    def exec_command(self, command, logger):
        '''Execute the command on the remote machine
        
        :param command str: The command to be run
        :param logger src.logger.Logger: The logger instance 
        :return: the stdin, stdout, and stderr of the executing command,
                 as a 3-tuple
        :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
                paramiko.channel.ChannelFile)
        '''
        try:
            # Does not wait for the end of the command
            (stdin, stdout, stderr) = self.ssh.exec_command(command)
        except paramiko.SSHException:
            message = src.KO_STATUS + _(
                            ": the server failed to execute the command\n")
            logger.write( src.printcolors.printcError(message))
            return (None, None, None)
        except:
            logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
            return (None, None, None)
        else:
            return (stdin, stdout, stderr)

    def close(self):
        '''Close the ssh connection
        
        :rtype: N/A
        '''
        self.ssh.close()
     
    def write_info(self, logger):
        '''Prints the information relative to the machine in the logger 
           (terminal traces and log file)
        
        :param logger src.logger.Logger: The logger instance
        :return: Nothing
        :rtype: N/A
        '''
        logger.write("host : " + self.host + "\n")
        logger.write("port : " + str(self.port) + "\n")
        logger.write("user : " + str(self.user) + "\n")
        if self.successfully_connected(logger):
            status = src.OK_STATUS
        else:
            status = src.KO_STATUS
        logger.write("Connection : " + status + "\n\n") 


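# Typical use of a Machine (sketch only; assumes a configured
# src.logger.Logger instance named "logger", and illustrative host and paths):
#
#   machine = Machine("node1", "node1.example.com", "jdoe", sat_path="/tmp/salomeTools")
#   error_message = machine.connect(logger)
#   if machine.successfully_connected(logger):
#       machine.copy_sat("/local/salomeTools", "/local/jobs/my_jobs.pyconf")
#       (stdin, stdout, stderr) = machine.exec_command("hostname", logger)
#       machine.close()
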
class Job(object):
    '''Class to manage one job
    '''
    def __init__(self, name, machine, application, distribution, table, 
                 commands, timeout, config, logger, job_file, after=None):

        self.name = name
        self.machine = machine
        self.after = after
        self.timeout = timeout
        self.application = application
        self.distribution = distribution
        self.table = table
        self.config = config
        self.logger = logger
        # The list of log files to download from the remote machine 
        self.remote_log_files = []
        
        # The remote command status
        # -1 means that it has not been launched, 
        # 0 means success and 1 means fail
        self.res_job = "-1"
        self.cancelled = False
        
        self._T0 = -1
        self._Tf = -1
        self._has_begun = False
        self._has_finished = False
        self._has_timouted = False
        self._stdin = None # Store the command inputs field
        self._stdout = None # Store the command outputs field
        self._stderr = None # Store the command errors field

        self.out = None # Contains something only if the job is finished
        self.err = None # Contains something only if the job is finished
               
        self.commands = commands
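        # Build the command that is executed remotely: the copied sat is run
        # with "-l <sat_path>/list_log_files.txt" (the file that
        # get_log_files() downloads once the job is finished) and with the
        # "job" command applied to this job's name.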
        self.command = (os.path.join(self.machine.sat_path, "sat") +
                        " -l " +
                        os.path.join(self.machine.sat_path,
                                     "list_log_files.txt") +
                        " job --jobs_config " +
                        job_file +
                        " --job " +
                        self.name)
    
    def get_pids(self):
        pids = []
        cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
        (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
        pids_cmd = out_pid.readlines()
        pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
        pids += pids_cmd
        return pids
    
    def kill_remote_process(self):
        '''Kills the process on the remote machine.
        
        :return: (the output of the kill, the error of the kill)
        :rtype: (str, str)
        '''
        
        pids = self.get_pids()
        cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
        (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
                                                            self.logger)
        return (out_kill, err_kill)
            
    def has_begun(self):
        '''Returns True if the job has already begun
        
        :return: True if the job has already begun
        :rtype: bool
        '''
        return self._has_begun
    
    def has_finished(self):
        '''Returns True if the job has already finished 
           (i.e. all the commands have been executed)
           If it is finished, the outputs are stored in the fields out and err.
        
        :return: True if the job has already finished
        :rtype: bool
        '''
        
        # If the method has already been called and returned True
        if self._has_finished:
            return True
        
        # If the job has not begun yet
        if not self.has_begun():
            return False
        
        if self._stdout.channel.closed:
            self._has_finished = True
            # Store the result outputs
            self.out = self._stdout.read()
            self.err = self._stderr.read()
            # Put end time
            self._Tf = time.time()
            # And get the remote command status and log files
            self.get_log_files()
        
        return self._has_finished
          
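    # Note on get_log_files(): it downloads the remote "list_log_files.txt"
    # file (presumably written by the remote sat command); its first line
    # holds the remote command status (res_job) and each following line holds
    # a remote log file path, which is then copied next to the local log file.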
    def get_log_files(self):
        if not self.has_finished():
            msg = _("Trying to get log files although the job is not finished.")
            self.logger.write(src.printcolors.printcWarning(msg))
            return
        
        tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
        self.machine.sftp.get(
                    os.path.join(self.machine.sat_path, "list_log_files.txt"),
                    tmp_file_path)
        
        fstream_tmp = open(tmp_file_path, "r")
        file_lines = fstream_tmp.readlines()
        file_lines = [line.replace("\n", "") for line in file_lines]
        fstream_tmp.close()
        os.remove(tmp_file_path)
        self.res_job = file_lines[0]
        for job_path_remote in file_lines[1:]:
            if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
            else:
                local_path = os.path.join(os.path.dirname(
                                                    self.logger.logFilePath),
                                          'OUT',
                                          os.path.basename(job_path_remote))
                if not os.path.exists(local_path):
                    self.machine.sftp.get(job_path_remote, local_path)
            self.remote_log_files.append(local_path)

    def has_failed(self):
        '''Returns True if the job has failed. 
           A job is considered as failed if the machine could not be reached,
           if the remote command failed, 
           or if the job finished with a time out.
        
        :return: True if the job has failed
        :rtype: bool
        '''
        if not self.has_finished():
            return False
        if not self.machine.successfully_connected(self.logger):
            return True
        if self.is_timeout():
            return True
        if self.res_job == "1":
            return True
        return False
    
    def cancel(self):
        """In case of a failing job, one has to cancel every job that depends 
           on it. This method marks the job as failed and it will not be
           executed.
        """
        self._has_begun = True
        self._has_finished = True
        self.cancelled = True
        self.out = _("This job was not launched because its father has failed.")
        self.err = _("This job was not launched because its father has failed.")

    def is_running(self):
        '''Returns True if the job commands are running 
        
        :return: True if the job is running
        :rtype: bool
        '''
        return self.has_begun() and not self.has_finished()

    def is_timeout(self):
        '''Returns True if the job commands have finished with a timeout 
        
        :return: True if the job has finished with a timeout
        :rtype: bool
        '''
        return self._has_timouted

    def time_elapsed(self):
        if not self.has_begun():
            return -1
        T_now = time.time()
        return T_now - self._T0
    
    def check_time(self):
        if not self.has_begun():
            return
        if self.time_elapsed() > self.timeout:
            self._has_finished = True
            self._has_timouted = True
            self._Tf = time.time()
            self.get_pids()
            (out_kill, _) = self.kill_remote_process()
            self.out = "TIMEOUT \n" + out_kill.read()
            self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
    
    def total_duration(self):
        return self._Tf - self._T0
        
    def run(self, logger):
        if self.has_begun():
            msg = _("Warning: a job can only be launched one time\n")
            logger.write(src.printcolors.printcWarning(msg))
            return
        
        if not self.machine.successfully_connected(logger):
            self._has_finished = True
            self.out = "N/A"
            self.err = ("Connection to machine (name : %s, host: %s, port:"
                        " %s, user: %s) has failed\nUse the log command "
                        "to get more information."
                        % (self.machine.name,
                           self.machine.host,
                           self.machine.port,
                           self.machine.user))
        else:
            self._T0 = time.time()
            self._stdin, self._stdout, self._stderr = self.machine.exec_command(
                                                        self.command, logger)
            if (self._stdin, self._stdout, self._stderr) == (None, None, None):
                self._has_finished = True
                self._Tf = time.time()
                self.out = "N/A"
                self.err = "The server failed to execute the command"
        
        self._has_begun = True
    
    def write_results(self, logger):
        logger.write("name : " + self.name + "\n")
        if self.after:
            logger.write("after : %s\n" % self.after)
        logger.write("Time elapsed : %4imin %2is \n" % 
                     (self.total_duration()/60 , self.total_duration()%60))
        if self._T0 != -1:
            logger.write("Begin time : %s\n" % 
                         time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(self._T0)) )
        if self._Tf != -1:
            logger.write("End time   : %s\n\n" % 
                         time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(self._Tf)) )
        
        machine_head = "Information about connection :\n"
        underline = (len(machine_head) - 2) * "-"
        logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
        self.machine.write_info(logger)
        
        logger.write(src.printcolors.printcInfo("out : \n"))
        if self.out is None:
            logger.write("Unable to get output\n")
        else:
            logger.write(self.out + "\n")
        logger.write(src.printcolors.printcInfo("err : \n"))
        if self.err is None:
            logger.write("Unable to get error\n")
        else:
            logger.write(self.err + "\n")
        
    def get_status(self):
        if not self.machine.successfully_connected(self.logger):
            return "SSH connection KO"
        if not self.has_begun():
            return "Not launched"
        if self.cancelled:
            return "Cancelled"
        if self.is_running():
            return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._T0))
        if self.has_finished():
            if self.is_timeout():
                return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                    time.localtime(self._Tf))
            return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
                                                     time.localtime(self._Tf))

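# Lifecycle of a Job (sketch, derived from the methods above): run() starts
# the remote command, is_running()/check_time() monitor it (check_time()
# kills the remote process on timeout), has_finished() collects stdout/stderr
# and the remote log files, and write_results()/get_status() report the
# outcome.
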
class Jobs(object):
    '''Class to manage the jobs to be run
    '''
    def __init__(self,
                 runner,
                 logger,
                 job_file,
                 job_file_path,
                 config_jobs,
                 lenght_columns = 20):
        # The jobs configuration
        self.cfg_jobs = config_jobs
        self.job_file = job_file
        self.job_file_path = job_file_path
        # The machines that will be used today
        self.lmachines = []
        # The list of machines (host, port) that will be used today 
        # (a same host can have several machine instances since there 
        # can be several ssh parameters) 
        self.lhosts = []
        # The jobs to be launched today 
        self.ljobs = []
        # The jobs that will not be launched today
        self.ljobs_not_today = []
        self.runner = runner
        self.logger = logger
        # The correlation dictionary between jobs and machines
        self.dic_job_machine = {} 
        self.len_columns = lenght_columns
        
        # the list of jobs that have not been run yet
        self._l_jobs_not_started = []
        # the list of jobs that have already run 
        self._l_jobs_finished = []
        # the list of jobs that are running 
        self._l_jobs_running = [] 
                
        self.determine_jobs_and_machines()
    
    def define_job(self, job_def, machine):
        '''Takes a pyconf job definition and a machine (a Machine instance)
           and returns the job instance corresponding to the definition.
        
        :param job_def src.config.Mapping: a job definition 
        :param machine Machine: the machine on which the job will run
        :return: The corresponding job in a Job class instance
        :rtype: Job
        '''
        name = job_def.name
        cmmnds = job_def.commands
        timeout = job_def.timeout
        after = None
        if 'after' in job_def:
            after = job_def.after
        application = None
        if 'application' in job_def:
            application = job_def.application
        distribution = None
        if 'distribution' in job_def:
            distribution = job_def.distribution
        table = None
        if 'table' in job_def:
            table = job_def.table
            
        return Job(name,
                   machine,
                   application,
                   distribution,
                   table,
                   cmmnds,
                   timeout,
                   self.runner.cfg,
                   self.logger,
                   self.job_file,
                   after = after)
    
    def determine_jobs_and_machines(self):
        '''Function that reads the pyconf jobs definition and instantiates all
           the machines and jobs to be done today.

        :return: Nothing
        :rtype: N/A
        '''
        today = datetime.date.weekday(datetime.date.today())
        host_list = []
               
        for job_def in self.cfg_jobs.jobs :
                
            if not "machine" in job_def:
                msg = _('WARNING: The job "%s" does not have the key '
                       '"machine"; this job is ignored.\n\n' % job_def.name)
                self.logger.write(src.printcolors.printcWarning(msg))
                continue
            name_machine = job_def.machine
            
            a_machine = None
            for mach in self.lmachines:
                if mach.name == name_machine:
                    a_machine = mach
                    break
            
            if a_machine is None:
                for machine_def in self.cfg_jobs.machines:
                    if machine_def.name == name_machine:
                        if 'host' not in machine_def:
                            host = self.runner.cfg.VARS.hostname
                        else:
                            host = machine_def.host

                        if 'user' not in machine_def:
                            user = self.runner.cfg.VARS.user
                        else:
                            user = machine_def.user

                        if 'port' not in machine_def:
                            port = 22
                        else:
                            port = machine_def.port
            
                        if 'password' not in machine_def:
                            passwd = None
                        else:
                            passwd = machine_def.password
                            
                        if 'sat_path' not in machine_def:
                            sat_path = "salomeTools"
                        else:
                            sat_path = machine_def.sat_path
                        
                        a_machine = Machine(
                                            machine_def.name,
                                            host,
                                            user,
                                            port=port,
                                            passwd=passwd,
                                            sat_path=sat_path
                                            )
                        
                        self.lmachines.append(a_machine)
                        if (host, port) not in host_list:
                            host_list.append((host, port))
                
                if a_machine is None:
                    msg = _("WARNING: The job \"%(job_name)s\" requires the "
                            "machine \"%(machine_name)s\" but this machine "
                            "is not defined in the configuration file.\n"
                            "The job will not be launched\n") % \
                        {"job_name" : job_def.name,
                         "machine_name" : name_machine}
                    self.logger.write(src.printcolors.printcWarning(msg))
                    continue
                                  
            a_job = self.define_job(job_def, a_machine)
            self.dic_job_machine[a_job] = a_machine
                
            if today in job_def.when:    
                self.ljobs.append(a_job)
            else: # today not in job_def.when
                self.ljobs_not_today.append(a_job)
                                     
        self.lhosts = host_list
        
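    # Sketch of the jobs configuration this method expects (the pyconf layout
    # below is illustrative only; the keys are the ones actually read above
    # and in define_job):
    #
    #   machines : [
    #     { name : "node1", host : "node1.example.com", user : "jdoe",
    #       port : 22, password : "...", sat_path : "/tmp/salomeTools" }
    #   ]
    #   jobs : [
    #     { name : "job_compile", machine : "node1",
    #       commands : ["..."], timeout : 7200,
    #       when : [0, 2, 4],          # week day numbers, Monday = 0
    #       after : "another_job",     # optional
    #       application : "...", distribution : "...", table : "..." }
    #   ]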
    def ssh_connection_all_machines(self, pad=50):
        '''Function that makes the SSH connection to every machine 
           to be used today.

        :return: Nothing
        :rtype: N/A
        '''
        self.logger.write(src.printcolors.printcInfo((
                        "Establishing connection with all the machines :\n")))
        for machine in self.lmachines:
            # little algorithm in order to display traces
            begin_line = (_("Connection to %s: " % machine.name))
            if pad - len(begin_line) < 0:
                endline = " "
            else:
                endline = (pad - len(begin_line)) * "." + " "
            
            step = "SSH connection"
            self.logger.write( begin_line + endline + step)
            self.logger.flush()
            # the call to the method that initiates the ssh connection
            msg = machine.connect(self.logger)
            
            # Copy salomeTools to the remote machine
            if machine.successfully_connected(self.logger):
                step = _("Copy SAT")
                self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
                self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
                self.logger.flush()
                res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                                            self.job_file_path)
                # Print the status of the copy
                if res_copy == 0:
                    self.logger.write('\r%s' % 
                                ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s' % 
                        (begin_line, 
                         endline, 
                         src.printcolors.printc(src.OK_STATUS)), 3)
                else:
                    self.logger.write('\r%s' % 
                            ((len(begin_line)+len(endline)+20) * " "), 3)
                    self.logger.write('\r%s%s%s %s' % 
                        (begin_line,
                         endline,
                         src.printcolors.printc(src.KO_STATUS),
                         _("Copy of SAT failed")), 3)
            else:
                self.logger.write('\r%s' % 
                                  ((len(begin_line)+len(endline)+20) * " "), 3)
                self.logger.write('\r%s%s%s %s' % 
                    (begin_line,
                     endline,
                     src.printcolors.printc(src.KO_STATUS),
                     msg), 3)
            self.logger.write("\n", 3)
                
        self.logger.write("\n")
        

    def is_occupied(self, hostname):
        '''Function that returns the running job, if any, on 
           the machine defined by its host and its port.
        
        :param hostname (str, int): the pair (host, port)
        :return: the job that is running on the host, 
                or False if there is no job running on the host. 
        :rtype: job / bool
        '''
        host = hostname[0]
        port = hostname[1]
        for jb in self.dic_job_machine:
            if jb.machine.host == host and jb.machine.port == port:
                if jb.is_running():
                    return jb
        return False
    
    def update_jobs_states_list(self):
        '''Function that updates the lists that store the currently
           running jobs and the jobs that have already finished.
        
        :return: True if at least one job finished since the last call. 
        :rtype: bool
        '''
        jobs_finished_list = []
        jobs_running_list = []
        for jb in self.dic_job_machine:
            if jb.is_running():
                jobs_running_list.append(jb)
                jb.check_time()
            if jb.has_finished():
                jobs_finished_list.append(jb)
        
        nb_job_finished_before = len(self._l_jobs_finished)
        self._l_jobs_finished = jobs_finished_list
        self._l_jobs_running = jobs_running_list
        
        nb_job_finished_now = len(self._l_jobs_finished)
        
        return nb_job_finished_now > nb_job_finished_before
    
    def cancel_dependencies_of_failing_jobs(self):
        '''Function that cancels all the jobs that depend on a failing one.
        
        :return: Nothing. 
        :rtype: N/A
        '''
        
        for job in self.ljobs:
            if job.after is None:
                continue
            father_job = self.find_job_that_has_name(job.after)
            if father_job.has_failed():
                job.cancel()
    
    def find_job_that_has_name(self, name):
        '''Returns the job by its name.
        
        :param name str: a job name
        :return: the job that has the name. 
        :rtype: job
        '''
        for jb in self.ljobs:
            if jb.name == name:
                return jb

        # the following is executed only if the job was not found
        msg = _('The job "%s" seems to be nonexistent') % name
        raise src.SatException(msg)
    
    def str_of_length(self, text, length):
        '''Takes a string text of any length and returns 
           the closest string of length "length".
        
        :param text str: any string
        :param length int: a length for the returned string
        :return: the closest string of length "length"
        :rtype: str
        '''
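        # For example (sketch): str_of_length("abcdefghij", 6) gives "abc..."
        # and str_of_length("ab", 6) gives "  ab  " (centred with spaces).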
        if len(text) > length:
            text_out = text[:length-3] + '...'
        else:
            diff = length - len(text)
            before = " " * (diff/2)
            after = " " * (diff/2 + diff%2)
            text_out = before + text + after
            
        return text_out
    
    def display_status(self, len_col):
        '''Takes a length and constructs the display of the current status 
           of the jobs in an array that has a column for each host.
           It displays the job that is currently running on the host 
           of the column.
        
        :param len_col int: the size of the column 
        :return: Nothing
        :rtype: N/A
        '''
        
        display_line = ""
        for host_port in self.lhosts:
            jb = self.is_occupied(host_port)
            if not jb: # nothing running on the host
                empty = self.str_of_length("empty", len_col)
                display_line += "|" + empty 
            else:
                display_line += "|" + src.printcolors.printcInfo(
                                        self.str_of_length(jb.name, len_col))
        
        self.logger.write("\r" + display_line + "|")
        self.logger.flush()
    

    def run_jobs(self):
        '''The main method. Runs all the jobs on every host. 
           For each host, at a given time, only one job can be running.
           The jobs that have the field after (which contains the name of the 
           job that has to be run before it) are run after that job finishes.
           This method stops when all the jobs are finished.
        
        :return: Nothing
        :rtype: N/A
        '''

        # Print header
        self.logger.write(src.printcolors.printcInfo(
                                                _('Executing the jobs :\n')))
        text_line = ""
        for host_port in self.lhosts:
            host = host_port[0]
            port = host_port[1]
            if port == 22: # default value
                text_line += "|" + self.str_of_length(host, self.len_columns)
            else:
                text_line += "|" + self.str_of_length(
                                "("+host+", "+str(port)+")", self.len_columns)
        
        tiret_line = " " + "-"*(len(text_line)-1) + "\n"
        self.logger.write(tiret_line)
        self.logger.write(text_line + "|\n")
        self.logger.write(tiret_line)
        self.logger.flush()
        
        # The infinite loop that runs the jobs
        l_jobs_not_started = self.dic_job_machine.keys()
        while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
            new_job_start = False
            for host_port in self.lhosts:
                
                if self.is_occupied(host_port):
                    continue
             
                for jb in l_jobs_not_started:
                    if (jb.machine.host, jb.machine.port) != host_port:
                        continue 
                    if jb.after is None:
                        jb.run(self.logger)
                        l_jobs_not_started.remove(jb)
                        new_job_start = True
                        break
                    else:
                        jb_before = self.find_job_that_has_name(jb.after) 
                        if jb_before.has_finished():
                            jb.run(self.logger)
                            l_jobs_not_started.remove(jb)
                            new_job_start = True
                            break
            self.cancel_dependencies_of_failing_jobs()
            new_job_finished = self.update_jobs_states_list()
            
            if new_job_start or new_job_finished:
                if self.gui is not None:
                    self.gui.update_xml_files(self.ljobs)
                # Display the current status     
                self.display_status(self.len_columns)
            
            # Make sure that the proc is not entirely busy
            time.sleep(0.001)
        
        self.logger.write("\n")    
        self.logger.write(tiret_line)                   
        self.logger.write("\n\n")
        
        if self.gui is not None:
            self.gui.update_xml_files(self.ljobs)
            self.gui.last_update()

    def write_all_results(self):
        '''Display all the jobs outputs.
        
        :return: Nothing
        :rtype: N/A
        '''
        
        for jb in self.dic_job_machine.keys():
            self.logger.write(src.printcolors.printcLabel(
                        "#------- Results for job %s -------#\n" % jb.name))
            jb.write_results(self.logger)
            self.logger.write("\n\n")

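# Typical sequence for Jobs (sketch, mirroring the run() function below):
#
#   today_jobs = Jobs(runner, logger, "my_jobs", "/path/to/my_jobs.pyconf",
#                     config_jobs)
#   today_jobs.ssh_connection_all_machines()
#   today_jobs.gui = Gui(...)   # optional, only when publishing is requested
#   today_jobs.run_jobs()
#   today_jobs.write_all_results()
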
class Gui(object):
    '''Class to manage the XML data that can be displayed in a browser to
       see the jobs states
    '''

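    # The Gui produces one global_report.xml file plus one <table>.xml file
    # per table found in the jobs; each file is written via write_tree() with
    # the XSL stylesheet names declared at the top of this module
    # (STYLESHEET_GLOBAL and STYLESHEET_TABLE) so it can be displayed in a
    # browser.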
    def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
        # The path of the global xml file
        self.xml_dir_path = xml_dir_path
        # Initialize the xml files
        xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
        self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
                                                         "JobsReport")
        # The xml files that correspond to the tables.
        # {name_table : xml_object}
        self.d_xml_table_files = {}
        # Create the lines and columns
        self.initialize_arrays(l_jobs, l_jobs_not_today)
        # Write the xml file
        self.update_xml_files(l_jobs)
    
    def initialize_arrays(self, l_jobs, l_jobs_not_today):
       
        # Get the tables to fill and put it in a dictionary
        # {table_name : xml instance corresponding to the table}
        for job in l_jobs + l_jobs_not_today:
            table = job.table
            if (table is not None and 
                    table not in self.d_xml_table_files.keys()):
                xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
                self.d_xml_table_files[table] = src.xmlManager.XmlLogFile(
                                                            xml_table_path,
                                                            "JobsReport")
                self.d_xml_table_files[table].add_simple_node("distributions")
                self.d_xml_table_files[table].add_simple_node("applications")
                self.d_xml_table_files[table].add_simple_node("table", text=table)
                 
        d_dist = {}
        d_application = {}
        for table in self.d_xml_table_files:
            d_dist[table] = []
            d_application[table] = []
            
        l_hosts_ports = []
            
        for job in l_jobs + l_jobs_not_today:
            
            if (job.machine.host, job.machine.port) not in l_hosts_ports:
                l_hosts_ports.append((job.machine.host, job.machine.port))
                
            distrib = job.distribution
            application = job.application
            
            table_job = job.table
            if table_job is None:
                continue
            for table in self.d_xml_table_files:
                if table_job == table:
                    if distrib is not None and distrib not in d_dist[table]:
                        d_dist[table].append(distrib)
                        src.xmlManager.add_simple_node(
                            self.d_xml_table_files[table].xmlroot.find('distributions'),
                                                   "dist",
                                                   attrib={"name" : distrib})
                    
                if table_job == table:
                    if application is not None and application not in d_application[table]:
                        d_application[table].append(application)
                        src.xmlManager.add_simple_node(
                            self.d_xml_table_files[table].xmlroot.find('applications'),
                                                   "application",
                                                   attrib={"name" : application})

        # Initialize the hosts_ports node for the global file
        self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
        for host, port in l_hosts_ports:
            host_port = "%s:%i" % (host, port)
            src.xmlManager.add_simple_node(self.xmlhosts_ports,
                                           "host_port",
                                           attrib={"name" : host_port})
        
        # Initialize the jobs node in all files
        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
            xml_jobs = xml_file.add_simple_node("jobs")
            # Get the jobs present in the config file but that will not be
            # launched today
            self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
            
            xml_file.add_simple_node("infos",
                                     attrib={"name" : "last update",
                                             "JobsCommandStatus" : "running"})

    
    def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
        for job in l_jobs_not_today:
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                 "job",
                                                 attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj,
                                           "distribution",
                                           job.distribution)
            src.xmlManager.add_simple_node(xmlj, "table", job.table)
            src.xmlManager.add_simple_node(xmlj,
                                       "commands", " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", "Not today")
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                                        job.machine.sat_path)
    
    def update_xml_files(self, l_jobs):
        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
            self.update_xml_file(l_jobs, xml_file)
            
    def update_xml_file(self, l_jobs, xml_file):
        
        xml_node_jobs = xml_file.xmlroot.find('jobs')
        # Update the job names and status node
        for job in l_jobs:
            # Find the node corresponding to the job and delete it
            # in order to recreate it
            for xmljob in xml_node_jobs.findall('job'):
                if xmljob.attrib['name'] == job.name:
                    xml_node_jobs.remove(xmljob)
            
            T0 = str(job._T0)
            if T0 != "-1":
                T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(job._T0))
            Tf = str(job._Tf)
            if Tf != "-1":
                Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
                                       time.localtime(job._Tf))
            
            # recreate the job node
            xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
                                                  "job",
                                                  attrib={"name" : job.name})
            src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
            src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
            src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
            src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
            src.xmlManager.add_simple_node(xmlj, "sat_path",
                                           job.machine.sat_path)
            src.xmlManager.add_simple_node(xmlj, "application", job.application)
            src.xmlManager.add_simple_node(xmlj, "distribution",
                                           job.distribution)
            src.xmlManager.add_simple_node(xmlj, "table", job.table)
            src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
            src.xmlManager.add_simple_node(xmlj, "commands",
                                           " ; ".join(job.commands))
            src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
            src.xmlManager.add_simple_node(xmlj, "begin", T0)
            src.xmlManager.add_simple_node(xmlj, "end", Tf)
            src.xmlManager.add_simple_node(xmlj, "out",
                                           src.printcolors.cleancolor(job.out))
            src.xmlManager.add_simple_node(xmlj, "err",
                                           src.printcolors.cleancolor(job.err))
            src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
            if len(job.remote_log_files) > 0:
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
                                               job.remote_log_files[0])
            else:
                src.xmlManager.add_simple_node(xmlj,
                                               "remote_log_file_path",
                                               "nothing")
            
            xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
            # get the job father
            if job.after is not None:
                job_father = None
                for jb in l_jobs:
                    if jb.name == job.after:
                        job_father = jb
                if job_father is None:
                    msg = _("The job %(father_name)s that is parent of "
                            "%(son_name)s is not in the job list." %
                            {"father_name" : job.after , "son_name" : job.name})
                    raise src.SatException(msg)
                
                if len(job_father.remote_log_files) > 0:
                    link = job_father.remote_log_files[0]
                else:
                    link = "nothing"
                src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
            
        
        # Update the date
        xml_node_infos = xml_file.xmlroot.find('infos')
        src.xmlManager.append_node_attrib(xml_node_infos,
                    attrib={"value" : 
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
               
        # Write the file
        self.write_xml_files()
    
    def last_update(self, finish_status = "finished"):
        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
            xml_node_infos = xml_file.xmlroot.find('infos')
            src.xmlManager.append_node_attrib(xml_node_infos,
                        attrib={"JobsCommandStatus" : finish_status})
        # Write the file
        self.write_xml_files()
    
    def write_xml_files(self):
        self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
        for xml_file in self.d_xml_table_files.values():
            xml_file.write_tree(STYLESHEET_TABLE)
        
##
# Describes the command
def description():
    return _("The jobs command launches the maintenance jobs that are "
             "described in the dedicated jobs configuration file.")

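# Example invocations (sketch; the option names are those declared at the top
# of this module, and the configuration file name is illustrative):
#   sat jobs --list
#   sat jobs --jobs_config my_jobs --test_connection
#   sat jobs --jobs_config my_jobs --publish
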
##
# Runs the command.
def run(args, runner, logger):
       
    (options, args) = parser.parse_args(args)
       
    jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
    
    l_cfg_dir = [jobs_cfg_files_dir,
                 os.path.join(runner.cfg.VARS.datadir, "jobs")]
    
    # Make sure the path to the jobs config files directory exists 
    src.ensure_path_exists(jobs_cfg_files_dir)
    
    # list option : display all the available config files
    if options.list:
        for cfg_dir in l_cfg_dir:
            if not options.no_label:
                logger.write("------ %s\n" % 
                                 src.printcolors.printcHeader(cfg_dir))
    
            for f in sorted(os.listdir(cfg_dir)):
                if not f.endswith('.pyconf'):
                    continue
                cfilename = f[:-7]
                logger.write("%s\n" % cfilename)
        return 0

    # Make sure the jobs_config option has been called
    if not options.jobs_cfg:
        message = _("The option --jobs_config is required\n")
        raise src.SatException( message )
    
    # Find the file in the directories
    found = False
    for cfg_dir in l_cfg_dir:
        file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
        if not file_jobs_cfg.endswith('.pyconf'):
            file_jobs_cfg += '.pyconf'
        
        if not os.path.exists(file_jobs_cfg):
            continue
        else:
            found = True
            break
    
    if not found:
        msg = _("The configuration file %(name_file)s was not found."
                "\nUse the --list option to get the possible files.") % \
            {"name_file" : options.jobs_cfg}
        logger.write(src.printcolors.printcError(msg))
        return 1
    
    info = [
        (_("Platform"), runner.cfg.VARS.dist),
        (_("File containing the jobs configuration"), file_jobs_cfg)
    ]
    src.print_info(logger, info)
    
    # Read the config that is in the file
    config_jobs = src.read_config_from_a_file(file_jobs_cfg)
    if options.only_jobs:
        l_jb = src.pyconf.Sequence()
        for jb in config_jobs.jobs:
            if jb.name in options.only_jobs:
                l_jb.append(jb,
                "Adding a job that was given in only_jobs option parameters")
        config_jobs.jobs = l_jb
              
    # Initialization
    today_jobs = Jobs(runner,
                      logger,
                      options.jobs_cfg,
                      file_jobs_cfg,
                      config_jobs)
    # SSH connection to all machines
    today_jobs.ssh_connection_all_machines()
    if options.test_connection:
        return 0
    
    gui = None
    if options.publish:
        gui = Gui("/export/home/serioja/LOGS",
                  today_jobs.ljobs,
                  today_jobs.ljobs_not_today,)
    
    today_jobs.gui = gui
    
    interrupted = False
    try:
        # Run all the jobs contained in config_jobs
        today_jobs.run_jobs()
    except KeyboardInterrupt:
        interrupted = True
        logger.write("\n\n%s\n\n" % 
                (src.printcolors.printcWarning(_("Forced interruption"))), 1)
        
    finally:
        # find the potentially unfinished jobs and kill them
        for jb in today_jobs.ljobs:
            if not jb.has_finished():
                jb.kill_remote_process()
        if today_jobs.gui is not None:
            if interrupted:
                today_jobs.gui.last_update(_("Forced interruption"))
            else:
                today_jobs.gui.last_update()
        # Output the results
        today_jobs.write_all_results()