Salome HOME
Try to get remote log files when a job finishes with a timeout
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
27 STYLESHEET_TABLE = "jobs_table_report.xsl"
28
29 parser = src.options.Options()
30
31 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
32                   _('The name of the config file that contains'
33                   ' the jobs configuration'))
34 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
35                   _('The list of jobs to launch, by their name. '))
36 parser.add_option('l', 'list', 'boolean', 'list', 
37                   _('list all available config files.'))
38 parser.add_option('n', 'no_label', 'boolean', 'no_label',
39                   _("do not print labels, Works only with --list."), False)
40 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
41                   _("Try to connect to the machines. Not executing the jobs."),
42                   False)
43 parser.add_option('p', 'publish', 'boolean', 'publish',
44                   _("Generate an xml file that can be read in a browser to "
45                     "display the jobs status."),
46                   False)
47
48 class Machine(object):
49     '''Class to manage a ssh connection on a machine
50     '''
51     def __init__(self,
52                  name,
53                  host,
54                  user,
55                  port=22,
56                  passwd=None,
57                  sat_path="salomeTools"):
58         self.name = name
59         self.host = host
60         self.port = port
61         self.user = user
62         self.password = passwd
63         self.sat_path = sat_path
64         self.ssh = paramiko.SSHClient()
65         self._connection_successful = None
66     
67     def connect(self, logger):
68         '''Initiate the ssh connection to the remote machine
69         
70         :param logger src.logger.Logger: The logger instance 
71         :return: Nothing
72         :rtype: N\A
73         '''
74
75         self._connection_successful = False
76         self.ssh.load_system_host_keys()
77         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
78         try:
79             self.ssh.connect(self.host,
80                              port=self.port,
81                              username=self.user,
82                              password = self.password)
83         except paramiko.AuthenticationException:
84             message = src.KO_STATUS + _("Authentication failed")
85         except paramiko.BadHostKeyException:
86             message = (src.KO_STATUS + 
87                        _("The server's host key could not be verified"))
88         except paramiko.SSHException:
89             message = ( _("SSHException error connecting or "
90                           "establishing an SSH session"))            
91         except:
92             message = ( _("Error connecting or establishing an SSH session"))
93         else:
94             self._connection_successful = True
95             message = ""
96         return message
97     
98     def successfully_connected(self, logger):
99         '''Verify if the connection to the remote machine has succeed
100         
101         :param logger src.logger.Logger: The logger instance 
102         :return: True if the connection has succeed, False if not
103         :rtype: bool
104         '''
105         if self._connection_successful == None:
106             message = _("Warning : trying to ask if the connection to "
107             "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
108             " no connection request" % 
109                         (self.name, self.host, self.port, self.user))
110             logger.write( src.printcolors.printcWarning(message))
111         return self._connection_successful
112
113     def copy_sat(self, sat_local_path, job_file):
114         '''Copy salomeTools to the remote machine in self.sat_path
115         '''
116         res = 0
117         try:
118             self.sftp = self.ssh.open_sftp()
119             self.mkdir(self.sat_path, ignore_existing=True)
120             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
121             job_file_name = os.path.basename(job_file)
122             self.sftp.put(job_file, os.path.join(self.sat_path,
123                                                  "data",
124                                                  "jobs",
125                                                  job_file_name))
126         except Exception as e:
127             res = str(e)
128             self._connection_successful = False
129         
130         return res
131         
132     def put_dir(self, source, target, filters = []):
133         ''' Uploads the contents of the source directory to the target path. The
134             target directory needs to exists. All subdirectories in source are 
135             created under target.
136         '''
137         for item in os.listdir(source):
138             if item in filters:
139                 continue
140             source_path = os.path.join(source, item)
141             destination_path = os.path.join(target, item)
142             if os.path.islink(source_path):
143                 linkto = os.readlink(source_path)
144                 try:
145                     self.sftp.symlink(linkto, destination_path)
146                     self.sftp.chmod(destination_path,
147                                     os.stat(source_path).st_mode)
148                 except IOError:
149                     pass
150             else:
151                 if os.path.isfile(source_path):
152                     self.sftp.put(source_path, destination_path)
153                     self.sftp.chmod(destination_path,
154                                     os.stat(source_path).st_mode)
155                 else:
156                     self.mkdir(destination_path, ignore_existing=True)
157                     self.put_dir(source_path, destination_path)
158
159     def mkdir(self, path, mode=511, ignore_existing=False):
160         ''' Augments mkdir by adding an option to not fail 
161             if the folder exists 
162         '''
163         try:
164             self.sftp.mkdir(path, mode)
165         except IOError:
166             if ignore_existing:
167                 pass
168             else:
169                 raise       
170     
171     def exec_command(self, command, logger):
172         '''Execute the command on the remote machine
173         
174         :param command str: The command to be run
175         :param logger src.logger.Logger: The logger instance 
176         :return: the stdin, stdout, and stderr of the executing command,
177                  as a 3-tuple
178         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
179                 paramiko.channel.ChannelFile)
180         '''
181         try:        
182             # Does not wait the end of the command
183             (stdin, stdout, stderr) = self.ssh.exec_command(command)
184         except paramiko.SSHException:
185             message = src.KO_STATUS + _(
186                             ": the server failed to execute the command\n")
187             logger.write( src.printcolors.printcError(message))
188             return (None, None, None)
189         except:
190             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
191             return (None, None, None)
192         else:
193             return (stdin, stdout, stderr)
194
195     def close(self):
196         '''Close the ssh connection
197         
198         :rtype: N\A
199         '''
200         self.ssh.close()
201      
202     def write_info(self, logger):
203         '''Prints the informations relative to the machine in the logger 
204            (terminal traces and log file)
205         
206         :param logger src.logger.Logger: The logger instance
207         :return: Nothing
208         :rtype: N\A
209         '''
210         logger.write("host : " + self.host + "\n")
211         logger.write("port : " + str(self.port) + "\n")
212         logger.write("user : " + str(self.user) + "\n")
213         if self.successfully_connected(logger):
214             status = src.OK_STATUS
215         else:
216             status = src.KO_STATUS
217         logger.write("Connection : " + status + "\n\n") 
218
219
220 class Job(object):
221     '''Class to manage one job
222     '''
223     def __init__(self, name, machine, application, distribution, table, 
224                  commands, timeout, config, logger, job_file, after=None):
225
226         self.name = name
227         self.machine = machine
228         self.after = after
229         self.timeout = timeout
230         self.application = application
231         self.distribution = distribution
232         self.table = table
233         self.config = config
234         self.logger = logger
235         # The list of log files to download from the remote machine 
236         self.remote_log_files = []
237         
238         # The remote command status
239         # -1 means that it has not been launched, 
240         # 0 means success and 1 means fail
241         self.res_job = "-1"
242         self.cancelled = False
243         
244         self._T0 = -1
245         self._Tf = -1
246         self._has_begun = False
247         self._has_finished = False
248         self._has_timouted = False
249         self._stdin = None # Store the command inputs field
250         self._stdout = None # Store the command outputs field
251         self._stderr = None # Store the command errors field
252
253         self.out = None # Contains something only if the job is finished
254         self.err = None # Contains something only if the job is finished    
255                
256         self.commands = commands
257         self.command = (os.path.join(self.machine.sat_path, "sat") +
258                         " -l " +
259                         os.path.join(self.machine.sat_path,
260                                      "list_log_files.txt") +
261                         " job --jobs_config " +
262                         job_file +
263                         " --job " +
264                         self.name)
265     
266     def get_pids(self):
267         pids = []
268         cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
269         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
270         pids_cmd = out_pid.readlines()
271         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
272         pids+=pids_cmd
273         return pids
274     
275     def kill_remote_process(self, wait=1):
276         '''Kills the process on the remote machine.
277         
278         :return: (the output of the kill, the error of the kill)
279         :rtype: (str, str)
280         '''
281         
282         pids = self.get_pids()
283         cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
284         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
285                                                             self.logger)
286         time.sleep(wait)
287         return (out_kill, err_kill)
288             
289     def has_begun(self):
290         '''Returns True if the job has already begun
291         
292         :return: True if the job has already begun
293         :rtype: bool
294         '''
295         return self._has_begun
296     
297     def has_finished(self):
298         '''Returns True if the job has already finished 
299            (i.e. all the commands have been executed)
300            If it is finished, the outputs are stored in the fields out and err.
301         
302         :return: True if the job has already finished
303         :rtype: bool
304         '''
305         
306         # If the method has already been called and returned True
307         if self._has_finished:
308             return True
309         
310         # If the job has not begun yet
311         if not self.has_begun():
312             return False
313         
314         if self._stdout.channel.closed:
315             self._has_finished = True
316             # Store the result outputs
317             self.out = self._stdout.read()
318             self.err = self._stderr.read()
319             # Put end time
320             self._Tf = time.time()
321             # And get the remote command status and log files
322             self.get_log_files()
323         
324         return self._has_finished
325           
326     def get_log_files(self):
327         if not self.has_finished():
328             msg = _("Trying to get log files whereas the job is not finished.")
329             self.logger.write(src.printcolors.printcWarning(msg))
330             return
331         
332         tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
333         self.machine.sftp.get(
334                     os.path.join(self.machine.sat_path, "list_log_files.txt"),
335                     tmp_file_path)
336         
337         fstream_tmp = open(tmp_file_path, "r")
338         file_lines = fstream_tmp.readlines()
339         file_lines = [line.replace("\n", "") for line in file_lines]
340         fstream_tmp.close()
341         os.remove(tmp_file_path)
342         self.res_job = file_lines[0]
343         for job_path_remote in file_lines[1:]:
344             try:
345                 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
346                     local_path = os.path.join(os.path.dirname(
347                                                         self.logger.logFilePath),
348                                               os.path.basename(job_path_remote))
349                     if not os.path.exists(local_path):
350                         self.machine.sftp.get(job_path_remote, local_path)
351                 else:
352                     local_path = os.path.join(os.path.dirname(
353                                                         self.logger.logFilePath),
354                                               'OUT',
355                                               os.path.basename(job_path_remote))
356                     if not os.path.exists(local_path):
357                         self.machine.sftp.get(job_path_remote, local_path)
358                 self.remote_log_files.append(local_path)
359             except:
360                 self.err += _("Unable to get %s log file from remote.") % job_path_remote
361
362     def has_failed(self):
363         '''Returns True if the job has failed. 
364            A job is considered as failed if the machine could not be reached,
365            if the remote command failed, 
366            or if the job finished with a time out.
367         
368         :return: True if the job has failed
369         :rtype: bool
370         '''
371         if not self.has_finished():
372             return False
373         if not self.machine.successfully_connected(self.logger):
374             return True
375         if self.is_timeout():
376             return True
377         if self.res_job == "1":
378             return True
379         return False
380     
381     def cancel(self):
382         """In case of a failing job, one has to cancel every job that depend 
383            on it. This method put the job as failed and will not be executed.
384         """
385         self._has_begun = True
386         self._has_finished = True
387         self.cancelled = True
388         self.out = _("This job was not launched because its father has failed.")
389         self.err = _("This job was not launched because its father has failed.")
390
391     def is_running(self):
392         '''Returns True if the job commands are running 
393         
394         :return: True if the job is running
395         :rtype: bool
396         '''
397         return self.has_begun() and not self.has_finished()
398
399     def is_timeout(self):
400         '''Returns True if the job commands has finished with timeout 
401         
402         :return: True if the job has finished with timeout
403         :rtype: bool
404         '''
405         return self._has_timouted
406
407     def time_elapsed(self):
408         if not self.has_begun():
409             return -1
410         T_now = time.time()
411         return T_now - self._T0
412     
413     def check_time(self):
414         if not self.has_begun():
415             return
416         if self.time_elapsed() > self.timeout:
417             self._has_finished = True
418             self._has_timouted = True
419             self._Tf = time.time()
420             self.get_pids()
421             (out_kill, _) = self.kill_remote_process()
422             self.out = "TIMEOUT \n" + out_kill.read()
423             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
424             try:
425                 self.get_log_files()
426             except:
427                 self.err += _("Unable to get remote log files")
428             
429     def total_duration(self):
430         return self._Tf - self._T0
431         
432     def run(self, logger):
433         if self.has_begun():
434             print("Warn the user that a job can only be launched one time")
435             return
436         
437         if not self.machine.successfully_connected(logger):
438             self._has_finished = True
439             self.out = "N\A"
440             self.err = ("Connection to machine (name : %s, host: %s, port:"
441                         " %s, user: %s) has failed\nUse the log command "
442                         "to get more information."
443                         % (self.machine.name,
444                            self.machine.host,
445                            self.machine.port,
446                            self.machine.user))
447         else:
448             self._T0 = time.time()
449             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
450                                                         self.command, logger)
451             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
452                 self._has_finished = True
453                 self._Tf = time.time()
454                 self.out = "N\A"
455                 self.err = "The server failed to execute the command"
456         
457         self._has_begun = True
458     
459     def write_results(self, logger):
460         logger.write("name : " + self.name + "\n")
461         if self.after:
462             logger.write("after : %s\n" % self.after)
463         logger.write("Time elapsed : %4imin %2is \n" % 
464                      (self.total_duration()/60 , self.total_duration()%60))
465         if self._T0 != -1:
466             logger.write("Begin time : %s\n" % 
467                          time.strftime('%Y-%m-%d %H:%M:%S', 
468                                        time.localtime(self._T0)) )
469         if self._Tf != -1:
470             logger.write("End time   : %s\n\n" % 
471                          time.strftime('%Y-%m-%d %H:%M:%S', 
472                                        time.localtime(self._Tf)) )
473         
474         machine_head = "Informations about connection :\n"
475         underline = (len(machine_head) - 2) * "-"
476         logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
477         self.machine.write_info(logger)
478         
479         logger.write(src.printcolors.printcInfo("out : \n"))
480         if self.out is None:
481             logger.write("Unable to get output\n")
482         else:
483             logger.write(self.out + "\n")
484         logger.write(src.printcolors.printcInfo("err : \n"))
485         if self.err is None:
486             logger.write("Unable to get error\n")
487         else:
488             logger.write(self.err + "\n")
489         
490     def get_status(self):
491         if not self.machine.successfully_connected(self.logger):
492             return "SSH connection KO"
493         if not self.has_begun():
494             return "Not launched"
495         if self.cancelled:
496             return "Cancelled"
497         if self.is_running():
498             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
499                                                     time.localtime(self._T0))        
500         if self.has_finished():
501             if self.is_timeout():
502                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
503                                                     time.localtime(self._Tf))
504             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
505                                                      time.localtime(self._Tf))
506     
507 class Jobs(object):
508     '''Class to manage the jobs to be run
509     '''
510     def __init__(self,
511                  runner,
512                  logger,
513                  job_file,
514                  job_file_path,
515                  config_jobs,
516                  lenght_columns = 20):
517         # The jobs configuration
518         self.cfg_jobs = config_jobs
519         self.job_file = job_file
520         self.job_file_path = job_file_path
521         # The machine that will be used today
522         self.lmachines = []
523         # The list of machine (hosts, port) that will be used today 
524         # (a same host can have several machine instances since there 
525         # can be several ssh parameters) 
526         self.lhosts = []
527         # The jobs to be launched today 
528         self.ljobs = []
529         # The jobs that will not be launched today
530         self.ljobs_not_today = []
531         self.runner = runner
532         self.logger = logger
533         # The correlation dictionary between jobs and machines
534         self.dic_job_machine = {} 
535         self.len_columns = lenght_columns
536         
537         # the list of jobs that have not been run yet
538         self._l_jobs_not_started = []
539         # the list of jobs that have already ran 
540         self._l_jobs_finished = []
541         # the list of jobs that are running 
542         self._l_jobs_running = [] 
543                 
544         self.determine_jobs_and_machines()
545     
546     def define_job(self, job_def, machine):
547         '''Takes a pyconf job definition and a machine (from class machine)
548            and returns the job instance corresponding to the definition.
549         
550         :param job_def src.config.Mapping: a job definition 
551         :param machine machine: the machine on which the job will run
552         :return: The corresponding job in a job class instance
553         :rtype: job
554         '''
555         name = job_def.name
556         cmmnds = job_def.commands
557         timeout = job_def.timeout
558         after = None
559         if 'after' in job_def:
560             after = job_def.after
561         application = None
562         if 'application' in job_def:
563             application = job_def.application
564         distribution = None
565         if 'distribution' in job_def:
566             distribution = job_def.distribution
567         table = None
568         if 'table' in job_def:
569             table = job_def.table
570             
571         return Job(name,
572                    machine,
573                    application,
574                    distribution,
575                    table,
576                    cmmnds,
577                    timeout,
578                    self.runner.cfg,
579                    self.logger,
580                    self.job_file,
581                    after = after)
582     
583     def determine_jobs_and_machines(self):
584         '''Function that reads the pyconf jobs definition and instantiates all
585            the machines and jobs to be done today.
586
587         :return: Nothing
588         :rtype: N\A
589         '''
590         today = datetime.date.weekday(datetime.date.today())
591         host_list = []
592                
593         for job_def in self.cfg_jobs.jobs :
594                 
595             if not "machine" in job_def:
596                 msg = _('WARNING: The job "%s" do not have the key '
597                        '"machine", this job is ignored.\n\n' % job_def.name)
598                 self.logger.write(src.printcolors.printcWarning(msg))
599                 continue
600             name_machine = job_def.machine
601             
602             a_machine = None
603             for mach in self.lmachines:
604                 if mach.name == name_machine:
605                     a_machine = mach
606                     break
607             
608             if a_machine == None:
609                 for machine_def in self.cfg_jobs.machines:
610                     if machine_def.name == name_machine:
611                         if 'host' not in machine_def:
612                             host = self.runner.cfg.VARS.hostname
613                         else:
614                             host = machine_def.host
615
616                         if 'user' not in machine_def:
617                             user = self.runner.cfg.VARS.user
618                         else:
619                             user = machine_def.user
620
621                         if 'port' not in machine_def:
622                             port = 22
623                         else:
624                             port = machine_def.port
625             
626                         if 'password' not in machine_def:
627                             passwd = None
628                         else:
629                             passwd = machine_def.password    
630                             
631                         if 'sat_path' not in machine_def:
632                             sat_path = "salomeTools"
633                         else:
634                             sat_path = machine_def.sat_path
635                         
636                         a_machine = Machine(
637                                             machine_def.name,
638                                             host,
639                                             user,
640                                             port=port,
641                                             passwd=passwd,
642                                             sat_path=sat_path
643                                             )
644                         
645                         self.lmachines.append(a_machine)
646                         if (host, port) not in host_list:
647                             host_list.append((host, port))
648                 
649                 if a_machine == None:
650                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
651                             "machine \"%(machine_name)s\" but this machine "
652                             "is not defined in the configuration file.\n"
653                             "The job will not be launched")
654                     self.logger.write(src.printcolors.printcWarning(msg))
655                                   
656             a_job = self.define_job(job_def, a_machine)
657             self.dic_job_machine[a_job] = a_machine
658                 
659             if today in job_def.when:    
660                 self.ljobs.append(a_job)
661             else: # today in job_def.when
662                 self.ljobs_not_today.append(a_job)
663                                      
664         self.lhosts = host_list
665         
666     def ssh_connection_all_machines(self, pad=50):
667         '''Function that do the ssh connection to every machine 
668            to be used today.
669
670         :return: Nothing
671         :rtype: N\A
672         '''
673         self.logger.write(src.printcolors.printcInfo((
674                         "Establishing connection with all the machines :\n")))
675         for machine in self.lmachines:
676             # little algorithm in order to display traces
677             begin_line = (_("Connection to %s: " % machine.name))
678             if pad - len(begin_line) < 0:
679                 endline = " "
680             else:
681                 endline = (pad - len(begin_line)) * "." + " "
682             
683             step = "SSH connection"
684             self.logger.write( begin_line + endline + step)
685             self.logger.flush()
686             # the call to the method that initiate the ssh connection
687             msg = machine.connect(self.logger)
688             
689             # Copy salomeTools to the remote machine
690             if machine.successfully_connected(self.logger):
691                 step = _("Copy SAT")
692                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
693                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
694                 self.logger.flush()
695                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
696                                             self.job_file_path)
697                 # Print the status of the copy
698                 if res_copy == 0:
699                     self.logger.write('\r%s' % 
700                                 ((len(begin_line)+len(endline)+20) * " "), 3)
701                     self.logger.write('\r%s%s%s' % 
702                         (begin_line, 
703                          endline, 
704                          src.printcolors.printc(src.OK_STATUS)), 3)
705                 else:
706                     self.logger.write('\r%s' % 
707                             ((len(begin_line)+len(endline)+20) * " "), 3)
708                     self.logger.write('\r%s%s%s %s' % 
709                         (begin_line,
710                          endline,
711                          src.printcolors.printc(src.OK_STATUS),
712                          _("Copy of SAT failed")), 3)
713             else:
714                 self.logger.write('\r%s' % 
715                                   ((len(begin_line)+len(endline)+20) * " "), 3)
716                 self.logger.write('\r%s%s%s %s' % 
717                     (begin_line,
718                      endline,
719                      src.printcolors.printc(src.KO_STATUS),
720                      msg), 3)
721             self.logger.write("\n", 3)
722                 
723         self.logger.write("\n")
724         
725
726     def is_occupied(self, hostname):
727         '''Function that returns True if a job is running on 
728            the machine defined by its host and its port.
729         
730         :param hostname (str, int): the pair (host, port)
731         :return: the job that is running on the host, 
732                 or false if there is no job running on the host. 
733         :rtype: job / bool
734         '''
735         host = hostname[0]
736         port = hostname[1]
737         for jb in self.dic_job_machine:
738             if jb.machine.host == host and jb.machine.port == port:
739                 if jb.is_running():
740                     return jb
741         return False
742     
743     def update_jobs_states_list(self):
744         '''Function that updates the lists that store the currently
745            running jobs and the jobs that have already finished.
746         
747         :return: Nothing. 
748         :rtype: N\A
749         '''
750         jobs_finished_list = []
751         jobs_running_list = []
752         for jb in self.dic_job_machine:
753             if jb.is_running():
754                 jobs_running_list.append(jb)
755                 jb.check_time()
756             if jb.has_finished():
757                 jobs_finished_list.append(jb)
758         
759         nb_job_finished_before = len(self._l_jobs_finished)
760         self._l_jobs_finished = jobs_finished_list
761         self._l_jobs_running = jobs_running_list
762         
763         nb_job_finished_now = len(self._l_jobs_finished)
764         
765         return nb_job_finished_now > nb_job_finished_before
766     
767     def cancel_dependencies_of_failing_jobs(self):
768         '''Function that cancels all the jobs that depend on a failing one.
769         
770         :return: Nothing. 
771         :rtype: N\A
772         '''
773         
774         for job in self.ljobs:
775             if job.after is None:
776                 continue
777             father_job = self.find_job_that_has_name(job.after)
778             if father_job.has_failed():
779                 job.cancel()
780     
781     def find_job_that_has_name(self, name):
782         '''Returns the job by its name.
783         
784         :param name str: a job name
785         :return: the job that has the name. 
786         :rtype: job
787         '''
788         for jb in self.ljobs:
789             if jb.name == name:
790                 return jb
791
792         # the following is executed only if the job was not found
793         msg = _('The job "%s" seems to be nonexistent') % name
794         raise src.SatException(msg)
795     
796     def str_of_length(self, text, length):
797         '''Takes a string text of any length and returns 
798            the most close string of length "length".
799         
800         :param text str: any string
801         :param length int: a length for the returned string
802         :return: the most close string of length "length"
803         :rtype: str
804         '''
805         if len(text) > length:
806             text_out = text[:length-3] + '...'
807         else:
808             diff = length - len(text)
809             before = " " * (diff/2)
810             after = " " * (diff/2 + diff%2)
811             text_out = before + text + after
812             
813         return text_out
814     
815     def display_status(self, len_col):
816         '''Takes a lenght and construct the display of the current status 
817            of the jobs in an array that has a column for each host.
818            It displays the job that is currently running on the host 
819            of the column.
820         
821         :param len_col int: the size of the column 
822         :return: Nothing
823         :rtype: N\A
824         '''
825         
826         display_line = ""
827         for host_port in self.lhosts:
828             jb = self.is_occupied(host_port)
829             if not jb: # nothing running on the host
830                 empty = self.str_of_length("empty", len_col)
831                 display_line += "|" + empty 
832             else:
833                 display_line += "|" + src.printcolors.printcInfo(
834                                         self.str_of_length(jb.name, len_col))
835         
836         self.logger.write("\r" + display_line + "|")
837         self.logger.flush()
838     
839
840     def run_jobs(self):
841         '''The main method. Runs all the jobs on every host. 
842            For each host, at a given time, only one job can be running.
843            The jobs that have the field after (that contain the job that has
844            to be run before it) are run after the previous job.
845            This method stops when all the jobs are finished.
846         
847         :return: Nothing
848         :rtype: N\A
849         '''
850
851         # Print header
852         self.logger.write(src.printcolors.printcInfo(
853                                                 _('Executing the jobs :\n')))
854         text_line = ""
855         for host_port in self.lhosts:
856             host = host_port[0]
857             port = host_port[1]
858             if port == 22: # default value
859                 text_line += "|" + self.str_of_length(host, self.len_columns)
860             else:
861                 text_line += "|" + self.str_of_length(
862                                 "("+host+", "+str(port)+")", self.len_columns)
863         
864         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
865         self.logger.write(tiret_line)
866         self.logger.write(text_line + "|\n")
867         self.logger.write(tiret_line)
868         self.logger.flush()
869         
870         # The infinite loop that runs the jobs
871         l_jobs_not_started = self.dic_job_machine.keys()
872         while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
873             new_job_start = False
874             for host_port in self.lhosts:
875                 
876                 if self.is_occupied(host_port):
877                     continue
878              
879                 for jb in l_jobs_not_started:
880                     if (jb.machine.host, jb.machine.port) != host_port:
881                         continue 
882                     if jb.after == None:
883                         jb.run(self.logger)
884                         l_jobs_not_started.remove(jb)
885                         new_job_start = True
886                         break
887                     else:
888                         jb_before = self.find_job_that_has_name(jb.after) 
889                         if jb_before.has_finished():
890                             jb.run(self.logger)
891                             l_jobs_not_started.remove(jb)
892                             new_job_start = True
893                             break
894             self.cancel_dependencies_of_failing_jobs()
895             new_job_finished = self.update_jobs_states_list()
896             
897             if new_job_start or new_job_finished:
898                 self.gui.update_xml_files(self.ljobs)            
899                 # Display the current status     
900                 self.display_status(self.len_columns)
901             
902             # Make sure that the proc is not entirely busy
903             time.sleep(0.001)
904         
905         self.logger.write("\n")    
906         self.logger.write(tiret_line)                   
907         self.logger.write("\n\n")
908         
909         self.gui.update_xml_files(self.ljobs)
910         self.gui.last_update()
911
912     def write_all_results(self):
913         '''Display all the jobs outputs.
914         
915         :return: Nothing
916         :rtype: N\A
917         '''
918         
919         for jb in self.dic_job_machine.keys():
920             self.logger.write(src.printcolors.printcLabel(
921                         "#------- Results for job %s -------#\n" % jb.name))
922             jb.write_results(self.logger)
923             self.logger.write("\n\n")
924
925 class Gui(object):
926     '''Class to manage the the xml data that can be displayed in a browser to
927        see the jobs states
928     '''
929    
930     def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
931         '''Initialization
932         
933         :param xml_dir_path str: The path to the directory where to put 
934                                  the xml resulting files
935         :param l_jobs List: the list of jobs that run today
936         :param l_jobs_not_today List: the list of jobs that do not run today
937         '''
938         # The path of the global xml file
939         self.xml_dir_path = xml_dir_path
940         # Initialize the xml files
941         xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
942         self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
943                                                          "JobsReport")
944         # The xml files that corresponds to the tables.
945         # {name_table : xml_object}}
946         self.d_xml_table_files = {}
947         # Create the lines and columns
948         self.initialize_arrays(l_jobs, l_jobs_not_today)
949         # Write the xml file
950         self.update_xml_files(l_jobs)
951     
952     def initialize_arrays(self, l_jobs, l_jobs_not_today):
953         '''Get all the first information needed for each file and write the 
954            first version of the files   
955         :param l_jobs List: the list of jobs that run today
956         :param l_jobs_not_today List: the list of jobs that do not run today
957         '''
958         # Get the tables to fill and put it in a dictionary
959         # {table_name : xml instance corresponding to the table}
960         for job in l_jobs + l_jobs_not_today:
961             table = job.table
962             if (table is not None and 
963                     table not in self.d_xml_table_files.keys()):
964                 xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
965                 self.d_xml_table_files[table] =  src.xmlManager.XmlLogFile(
966                                                             xml_table_path,
967                                                             "JobsReport")
968                 self.d_xml_table_files[table].add_simple_node("distributions")
969                 self.d_xml_table_files[table].add_simple_node("applications")
970                 self.d_xml_table_files[table].add_simple_node("table", text=table)
971         
972         # Loop over all jobs in order to get the lines and columns for each 
973         # xml file
974         d_dist = {}
975         d_application = {}
976         for table in self.d_xml_table_files:
977             d_dist[table] = []
978             d_application[table] = []
979             
980         l_hosts_ports = []
981             
982         for job in l_jobs + l_jobs_not_today:
983             
984             if (job.machine.host, job.machine.port) not in l_hosts_ports:
985                 l_hosts_ports.append((job.machine.host, job.machine.port))
986                 
987             distrib = job.distribution
988             application = job.application
989             
990             table_job = job.table
991             if table is None:
992                 continue
993             for table in self.d_xml_table_files:
994                 if table_job == table:
995                     if distrib is not None and distrib not in d_dist[table]:
996                         d_dist[table].append(distrib)
997                         src.xmlManager.add_simple_node(
998                             self.d_xml_table_files[table].xmlroot.find('distributions'),
999                                                    "dist",
1000                                                    attrib={"name" : distrib})
1001                     
1002                 if table_job == table:
1003                     if application is not None and application not in d_application[table]:
1004                         d_application[table].append(application)
1005                         src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
1006                                                    "application",
1007                                                    attrib={"name" : application})
1008
1009         # Initialize the hosts_ports node for the global file
1010         self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1011         for host, port in l_hosts_ports:
1012             host_port = "%s:%i" % (host, port)
1013             src.xmlManager.add_simple_node(self.xmlhosts_ports,
1014                                            "host_port",
1015                                            attrib={"name" : host_port})
1016         
1017         # Initialize the jobs node in all files
1018         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1019             xml_jobs = xml_file.add_simple_node("jobs")      
1020             # Get the jobs present in the config file but that will not be launched
1021             # today
1022             self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1023             
1024             xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
1025
1026     
1027     def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1028         '''Get all the first information needed for each file and write the 
1029            first version of the files   
1030
1031         :param xml_node_jobs etree.Element: the node corresponding to a job
1032         :param l_jobs_not_today List: the list of jobs that do not run today
1033         '''
1034         for job in l_jobs_not_today:
1035             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1036                                                  "job",
1037                                                  attrib={"name" : job.name})
1038             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1039             src.xmlManager.add_simple_node(xmlj,
1040                                            "distribution",
1041                                            job.distribution)
1042             src.xmlManager.add_simple_node(xmlj, "table", job.table)
1043             src.xmlManager.add_simple_node(xmlj,
1044                                        "commands", " ; ".join(job.commands))
1045             src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1046             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1047             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1048             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1049             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1050             src.xmlManager.add_simple_node(xmlj, "sat_path",
1051                                                         job.machine.sat_path)
1052     
1053     def update_xml_files(self, l_jobs):
1054         '''Write all the xml files with updated information about the jobs   
1055
1056         :param l_jobs List: the list of jobs that run today
1057         '''
1058         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1059             self.update_xml_file(l_jobs, xml_file)
1060             
1061         # Write the file
1062         self.write_xml_files()
1063             
1064     def update_xml_file(self, l_jobs, xml_file):      
1065         '''update information about the jobs for the file xml_file   
1066
1067         :param l_jobs List: the list of jobs that run today
1068         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1069         '''
1070         
1071         xml_node_jobs = xml_file.xmlroot.find('jobs')
1072         # Update the job names and status node
1073         for job in l_jobs:
1074             # Find the node corresponding to the job and delete it
1075             # in order to recreate it
1076             for xmljob in xml_node_jobs.findall('job'):
1077                 if xmljob.attrib['name'] == job.name:
1078                     xml_node_jobs.remove(xmljob)
1079             
1080             T0 = str(job._T0)
1081             if T0 != "-1":
1082                 T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
1083                                        time.localtime(job._T0))
1084             Tf = str(job._Tf)
1085             if Tf != "-1":
1086                 Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
1087                                        time.localtime(job._Tf))
1088             
1089             # recreate the job node
1090             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1091                                                   "job",
1092                                                   attrib={"name" : job.name})
1093             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1094             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1095             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1096             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1097             src.xmlManager.add_simple_node(xmlj, "sat_path",
1098                                            job.machine.sat_path)
1099             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1100             src.xmlManager.add_simple_node(xmlj, "distribution",
1101                                            job.distribution)
1102             src.xmlManager.add_simple_node(xmlj, "table", job.table)
1103             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1104             src.xmlManager.add_simple_node(xmlj, "commands",
1105                                            " ; ".join(job.commands))
1106             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1107             src.xmlManager.add_simple_node(xmlj, "begin", T0)
1108             src.xmlManager.add_simple_node(xmlj, "end", Tf)
1109             src.xmlManager.add_simple_node(xmlj, "out",
1110                                            src.printcolors.cleancolor(job.out))
1111             src.xmlManager.add_simple_node(xmlj, "err",
1112                                            src.printcolors.cleancolor(job.err))
1113             src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1114             if len(job.remote_log_files) > 0:
1115                 src.xmlManager.add_simple_node(xmlj,
1116                                                "remote_log_file_path",
1117                                                job.remote_log_files[0])
1118             else:
1119                 src.xmlManager.add_simple_node(xmlj,
1120                                                "remote_log_file_path",
1121                                                "nothing")           
1122             
1123             xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1124             # get the job father
1125             if job.after is not None:
1126                 job_father = None
1127                 for jb in l_jobs:
1128                     if jb.name == job.after:
1129                         job_father = jb
1130                 if job_father is None:
1131                     msg = _("The job %(father_name)s that is parent of "
1132                             "%(son_name)s is not in the job list." %
1133                             {"father_name" : job.after , "son_name" : job.name})
1134                     raise src.SatException(msg)
1135                 
1136                 if len(job_father.remote_log_files) > 0:
1137                     link = job_father.remote_log_files[0]
1138                 else:
1139                     link = "nothing"
1140                 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1141             
1142         
1143         # Update the date
1144         xml_node_infos = xml_file.xmlroot.find('infos')
1145         src.xmlManager.append_node_attrib(xml_node_infos,
1146                     attrib={"value" : 
1147                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1148                
1149
1150     
1151     def last_update(self, finish_status = "finished"):
1152         '''update information about the jobs for the file xml_file   
1153
1154         :param l_jobs List: the list of jobs that run today
1155         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1156         '''
1157         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1158             xml_node_infos = xml_file.xmlroot.find('infos')
1159             src.xmlManager.append_node_attrib(xml_node_infos,
1160                         attrib={"JobsCommandStatus" : finish_status})
1161         # Write the file
1162         self.write_xml_files()
1163     
1164     def write_xml_files(self):
1165         ''' Write the xml files   
1166         '''
1167         self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1168         for xml_file in self.d_xml_table_files.values():
1169             xml_file.write_tree(STYLESHEET_TABLE)
1170         
1171 ##
1172 # Describes the command
1173 def description():
1174     return _("The jobs command launches maintenances that are described"
1175              " in the dedicated jobs configuration file.")
1176
1177 ##
1178 # Runs the command.
1179 def run(args, runner, logger):
1180        
1181     (options, args) = parser.parse_args(args)
1182        
1183     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1184     
1185     l_cfg_dir = [jobs_cfg_files_dir,
1186                  os.path.join(runner.cfg.VARS.datadir, "jobs")]
1187     
1188     # Make sure the path to the jobs config files directory exists 
1189     src.ensure_path_exists(jobs_cfg_files_dir)   
1190     
1191     # list option : display all the available config files
1192     if options.list:
1193         for cfg_dir in l_cfg_dir:
1194             if not options.no_label:
1195                 logger.write("------ %s\n" % 
1196                                  src.printcolors.printcHeader(cfg_dir))
1197     
1198             for f in sorted(os.listdir(cfg_dir)):
1199                 if not f.endswith('.pyconf'):
1200                     continue
1201                 cfilename = f[:-7]
1202                 logger.write("%s\n" % cfilename)
1203         return 0
1204
1205     # Make sure the jobs_config option has been called
1206     if not options.jobs_cfg:
1207         message = _("The option --jobs_config is required\n")      
1208         raise src.SatException( message )
1209     
1210     # Find the file in the directories
1211     found = False
1212     for cfg_dir in l_cfg_dir:
1213         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1214         if not file_jobs_cfg.endswith('.pyconf'):
1215             file_jobs_cfg += '.pyconf'
1216         
1217         if not os.path.exists(file_jobs_cfg):
1218             continue
1219         else:
1220             found = True
1221             break
1222     
1223     if not found:
1224         msg = _("The file configuration %(name_file)s was not found."
1225                 "\nUse the --list option to get the possible files.")
1226         src.printcolors.printcError(msg)
1227         return 1
1228     
1229     info = [
1230         (_("Platform"), runner.cfg.VARS.dist),
1231         (_("File containing the jobs configuration"), file_jobs_cfg)
1232     ]    
1233     src.print_info(logger, info)
1234     
1235     # Read the config that is in the file
1236     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1237     if options.only_jobs:
1238         l_jb = src.pyconf.Sequence()
1239         for jb in config_jobs.jobs:
1240             if jb.name in options.only_jobs:
1241                 l_jb.append(jb,
1242                 "Adding a job that was given in only_jobs option parameters")
1243         config_jobs.jobs = l_jb
1244               
1245     # Initialization
1246     today_jobs = Jobs(runner,
1247                       logger,
1248                       options.jobs_cfg,
1249                       file_jobs_cfg,
1250                       config_jobs)
1251     # SSH connection to all machines
1252     today_jobs.ssh_connection_all_machines()
1253     if options.test_connection:
1254         return 0
1255     
1256     gui = None
1257     if options.publish:
1258         gui = Gui("/export/home/serioja/LOGS",
1259                   today_jobs.ljobs,
1260                   today_jobs.ljobs_not_today,)
1261     
1262     today_jobs.gui = gui
1263     
1264     interruped = False
1265     try:
1266         # Run all the jobs contained in config_jobs
1267         today_jobs.run_jobs()
1268     except KeyboardInterrupt:
1269         interruped = True
1270         logger.write("\n\n%s\n\n" % 
1271                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1272         
1273     finally:
1274         # find the potential not finished jobs and kill them
1275         for jb in today_jobs.ljobs:
1276             if not jb.has_finished():
1277                 jb.kill_remote_process()
1278         if interruped:
1279             today_jobs.gui.last_update(_("Forced interruption"))
1280         else:
1281             today_jobs.gui.last_update()
1282         # Output the results
1283         today_jobs.write_all_results()