Salome HOME
Add missing commentaries
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26 STYLESHEET_GLOBAL = "jobs_global_report.xsl"
27 STYLESHEET_TABLE = "jobs_table_report.xsl"
28
29 parser = src.options.Options()
30
31 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
32                   _('The name of the config file that contains'
33                   ' the jobs configuration'))
34 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
35                   _('The list of jobs to launch, by their name. '))
36 parser.add_option('l', 'list', 'boolean', 'list', 
37                   _('list all available config files.'))
38 parser.add_option('n', 'no_label', 'boolean', 'no_label',
39                   _("do not print labels, Works only with --list."), False)
40 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
41                   _("Try to connect to the machines. Not executing the jobs."),
42                   False)
43 parser.add_option('p', 'publish', 'boolean', 'publish',
44                   _("Generate an xml file that can be read in a browser to "
45                     "display the jobs status."),
46                   False)
47
48 class Machine(object):
49     '''Class to manage a ssh connection on a machine
50     '''
51     def __init__(self,
52                  name,
53                  host,
54                  user,
55                  port=22,
56                  passwd=None,
57                  sat_path="salomeTools"):
58         self.name = name
59         self.host = host
60         self.port = port
61         self.user = user
62         self.password = passwd
63         self.sat_path = sat_path
64         self.ssh = paramiko.SSHClient()
65         self._connection_successful = None
66     
67     def connect(self, logger):
68         '''Initiate the ssh connection to the remote machine
69         
70         :param logger src.logger.Logger: The logger instance 
71         :return: Nothing
72         :rtype: N\A
73         '''
74
75         self._connection_successful = False
76         self.ssh.load_system_host_keys()
77         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
78         try:
79             self.ssh.connect(self.host,
80                              port=self.port,
81                              username=self.user,
82                              password = self.password)
83         except paramiko.AuthenticationException:
84             message = src.KO_STATUS + _("Authentication failed")
85         except paramiko.BadHostKeyException:
86             message = (src.KO_STATUS + 
87                        _("The server's host key could not be verified"))
88         except paramiko.SSHException:
89             message = ( _("SSHException error connecting or "
90                           "establishing an SSH session"))            
91         except:
92             message = ( _("Error connecting or establishing an SSH session"))
93         else:
94             self._connection_successful = True
95             message = ""
96         return message
97     
98     def successfully_connected(self, logger):
99         '''Verify if the connection to the remote machine has succeed
100         
101         :param logger src.logger.Logger: The logger instance 
102         :return: True if the connection has succeed, False if not
103         :rtype: bool
104         '''
105         if self._connection_successful == None:
106             message = _("Warning : trying to ask if the connection to "
107             "(name: %s host: %s, port: %s, user: %s) is OK whereas there were"
108             " no connection request" % 
109                         (self.name, self.host, self.port, self.user))
110             logger.write( src.printcolors.printcWarning(message))
111         return self._connection_successful
112
113     def copy_sat(self, sat_local_path, job_file):
114         '''Copy salomeTools to the remote machine in self.sat_path
115         '''
116         res = 0
117         try:
118             self.sftp = self.ssh.open_sftp()
119             self.mkdir(self.sat_path, ignore_existing=True)
120             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
121             job_file_name = os.path.basename(job_file)
122             self.sftp.put(job_file, os.path.join(self.sat_path,
123                                                  "data",
124                                                  "jobs",
125                                                  job_file_name))
126         except Exception as e:
127             res = str(e)
128             self._connection_successful = False
129         
130         return res
131         
132     def put_dir(self, source, target, filters = []):
133         ''' Uploads the contents of the source directory to the target path. The
134             target directory needs to exists. All subdirectories in source are 
135             created under target.
136         '''
137         for item in os.listdir(source):
138             if item in filters:
139                 continue
140             source_path = os.path.join(source, item)
141             destination_path = os.path.join(target, item)
142             if os.path.islink(source_path):
143                 linkto = os.readlink(source_path)
144                 try:
145                     self.sftp.symlink(linkto, destination_path)
146                     self.sftp.chmod(destination_path,
147                                     os.stat(source_path).st_mode)
148                 except IOError:
149                     pass
150             else:
151                 if os.path.isfile(source_path):
152                     self.sftp.put(source_path, destination_path)
153                     self.sftp.chmod(destination_path,
154                                     os.stat(source_path).st_mode)
155                 else:
156                     self.mkdir(destination_path, ignore_existing=True)
157                     self.put_dir(source_path, destination_path)
158
159     def mkdir(self, path, mode=511, ignore_existing=False):
160         ''' Augments mkdir by adding an option to not fail 
161             if the folder exists 
162         '''
163         try:
164             self.sftp.mkdir(path, mode)
165         except IOError:
166             if ignore_existing:
167                 pass
168             else:
169                 raise       
170     
171     def exec_command(self, command, logger):
172         '''Execute the command on the remote machine
173         
174         :param command str: The command to be run
175         :param logger src.logger.Logger: The logger instance 
176         :return: the stdin, stdout, and stderr of the executing command,
177                  as a 3-tuple
178         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
179                 paramiko.channel.ChannelFile)
180         '''
181         try:        
182             # Does not wait the end of the command
183             (stdin, stdout, stderr) = self.ssh.exec_command(command)
184         except paramiko.SSHException:
185             message = src.KO_STATUS + _(
186                             ": the server failed to execute the command\n")
187             logger.write( src.printcolors.printcError(message))
188             return (None, None, None)
189         except:
190             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
191             return (None, None, None)
192         else:
193             return (stdin, stdout, stderr)
194
195     def close(self):
196         '''Close the ssh connection
197         
198         :rtype: N\A
199         '''
200         self.ssh.close()
201      
202     def write_info(self, logger):
203         '''Prints the informations relative to the machine in the logger 
204            (terminal traces and log file)
205         
206         :param logger src.logger.Logger: The logger instance
207         :return: Nothing
208         :rtype: N\A
209         '''
210         logger.write("host : " + self.host + "\n")
211         logger.write("port : " + str(self.port) + "\n")
212         logger.write("user : " + str(self.user) + "\n")
213         if self.successfully_connected(logger):
214             status = src.OK_STATUS
215         else:
216             status = src.KO_STATUS
217         logger.write("Connection : " + status + "\n\n") 
218
219
220 class Job(object):
221     '''Class to manage one job
222     '''
223     def __init__(self, name, machine, application, distribution, table, 
224                  commands, timeout, config, logger, job_file, after=None):
225
226         self.name = name
227         self.machine = machine
228         self.after = after
229         self.timeout = timeout
230         self.application = application
231         self.distribution = distribution
232         self.table = table
233         self.config = config
234         self.logger = logger
235         # The list of log files to download from the remote machine 
236         self.remote_log_files = []
237         
238         # The remote command status
239         # -1 means that it has not been launched, 
240         # 0 means success and 1 means fail
241         self.res_job = "-1"
242         self.cancelled = False
243         
244         self._T0 = -1
245         self._Tf = -1
246         self._has_begun = False
247         self._has_finished = False
248         self._has_timouted = False
249         self._stdin = None # Store the command inputs field
250         self._stdout = None # Store the command outputs field
251         self._stderr = None # Store the command errors field
252
253         self.out = None # Contains something only if the job is finished
254         self.err = None # Contains something only if the job is finished    
255                
256         self.commands = commands
257         self.command = (os.path.join(self.machine.sat_path, "sat") +
258                         " -l " +
259                         os.path.join(self.machine.sat_path,
260                                      "list_log_files.txt") +
261                         " job --jobs_config " +
262                         job_file +
263                         " --job " +
264                         self.name)
265     
266     def get_pids(self):
267         """ Get the pid(s) corresponding to the command that have been launched
268             On the remote machine
269         
270         :return: The list of integers corresponding to the found pids
271         :rtype: List
272         """
273         pids = []
274         cmd_pid = 'ps aux | grep "' + self.command + '" | awk \'{print $2}\''
275         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
276         pids_cmd = out_pid.readlines()
277         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
278         pids+=pids_cmd
279         return pids
280     
281     def kill_remote_process(self, wait=1):
282         '''Kills the process on the remote machine.
283         
284         :return: (the output of the kill, the error of the kill)
285         :rtype: (str, str)
286         '''
287         
288         pids = self.get_pids()
289         cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
290         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
291                                                             self.logger)
292         time.sleep(wait)
293         return (out_kill, err_kill)
294             
295     def has_begun(self):
296         '''Returns True if the job has already begun
297         
298         :return: True if the job has already begun
299         :rtype: bool
300         '''
301         return self._has_begun
302     
303     def has_finished(self):
304         '''Returns True if the job has already finished 
305            (i.e. all the commands have been executed)
306            If it is finished, the outputs are stored in the fields out and err.
307         
308         :return: True if the job has already finished
309         :rtype: bool
310         '''
311         
312         # If the method has already been called and returned True
313         if self._has_finished:
314             return True
315         
316         # If the job has not begun yet
317         if not self.has_begun():
318             return False
319         
320         if self._stdout.channel.closed:
321             self._has_finished = True
322             # Store the result outputs
323             self.out = self._stdout.read()
324             self.err = self._stderr.read()
325             # Put end time
326             self._Tf = time.time()
327             # And get the remote command status and log files
328             self.get_log_files()
329         
330         return self._has_finished
331           
332     def get_log_files(self):
333         """Get the log files produced by the command launched 
334            on the remote machine.
335         """
336         # Do not get the files if the command is not finished
337         if not self.has_finished():
338             msg = _("Trying to get log files whereas the job is not finished.")
339             self.logger.write(src.printcolors.printcWarning(msg))
340             return
341         
342         # First get the file that contains the list of log files to get
343         tmp_file_path = src.get_tmp_filename(self.config, "list_log_files.txt")
344         self.machine.sftp.get(
345                     os.path.join(self.machine.sat_path, "list_log_files.txt"),
346                     tmp_file_path)
347         
348         # Read the file and get the result of the command and all the log files
349         # to get
350         fstream_tmp = open(tmp_file_path, "r")
351         file_lines = fstream_tmp.readlines()
352         file_lines = [line.replace("\n", "") for line in file_lines]
353         fstream_tmp.close()
354         os.remove(tmp_file_path)
355         # The first line is the result of the command (0 success or 1 fail)
356         self.res_job = file_lines[0]
357         for job_path_remote in file_lines[1:]:
358             try:
359                 # For each command, there is two files to get :
360                 # 1- The xml file describing the command and giving the 
361                 # internal traces.
362                 # 2- The txt file containing the system command traces (like 
363                 # traces produced by the "make" command)
364                 if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
365                     # Case 1-
366                     local_path = os.path.join(os.path.dirname(
367                                                         self.logger.logFilePath),
368                                               os.path.basename(job_path_remote))
369                 else:
370                     # Case 2-
371                     local_path = os.path.join(os.path.dirname(
372                                                         self.logger.logFilePath),
373                                               'OUT',
374                                               os.path.basename(job_path_remote))
375                 # Get the file
376                 if not os.path.exists(local_path):
377                     self.machine.sftp.get(job_path_remote, local_path)
378                 self.remote_log_files.append(local_path)
379             except Exception as e:
380                 self.err += _("Unable to get %s log file from remote: %s" % 
381                                                     (job_path_remote, str(e)))
382
383     def has_failed(self):
384         '''Returns True if the job has failed. 
385            A job is considered as failed if the machine could not be reached,
386            if the remote command failed, 
387            or if the job finished with a time out.
388         
389         :return: True if the job has failed
390         :rtype: bool
391         '''
392         if not self.has_finished():
393             return False
394         if not self.machine.successfully_connected(self.logger):
395             return True
396         if self.is_timeout():
397             return True
398         if self.res_job == "1":
399             return True
400         return False
401     
402     def cancel(self):
403         """In case of a failing job, one has to cancel every job that depend 
404            on it. This method put the job as failed and will not be executed.
405         """
406         self._has_begun = True
407         self._has_finished = True
408         self.cancelled = True
409         self.out = _("This job was not launched because its father has failed.")
410         self.err = _("This job was not launched because its father has failed.")
411
412     def is_running(self):
413         '''Returns True if the job commands are running 
414         
415         :return: True if the job is running
416         :rtype: bool
417         '''
418         return self.has_begun() and not self.has_finished()
419
420     def is_timeout(self):
421         '''Returns True if the job commands has finished with timeout 
422         
423         :return: True if the job has finished with timeout
424         :rtype: bool
425         '''
426         return self._has_timouted
427
428     def time_elapsed(self):
429         """Get the time elapsed since the job launching
430         
431         :return: The number of seconds
432         :rtype: int
433         """
434         if not self.has_begun():
435             return -1
436         T_now = time.time()
437         return T_now - self._T0
438     
439     def check_time(self):
440         """Verify that the job has not exceeded its timeout.
441            If it has, kill the remote command and consider the job as finished.
442         """
443         if not self.has_begun():
444             return
445         if self.time_elapsed() > self.timeout:
446             self._has_finished = True
447             self._has_timouted = True
448             self._Tf = time.time()
449             self.get_pids()
450             (out_kill, _) = self.kill_remote_process()
451             self.out = "TIMEOUT \n" + out_kill.read()
452             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
453             try:
454                 self.get_log_files()
455             except Exception as e:
456                 self.err += _("Unable to get remote log files: %s" % e)
457             
458     def total_duration(self):
459         """Give the total duration of the job
460         
461         :return: the total duration of the job in seconds
462         :rtype: int
463         """
464         return self._Tf - self._T0
465         
466     def run(self):
467         """Launch the job by executing the remote command.
468         """
469         
470         # Prevent multiple run
471         if self.has_begun():
472             msg = _("Warning: A job can only be launched one time")
473             msg2 = _("Trying to launch the job \"%s\" whereas it has "
474                      "already been launched." % self.name)
475             self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, msg2)))
476             return
477         
478         # Do not execute the command if the machine could not be reached
479         if not self.machine.successfully_connected(self.logger):
480             self._has_finished = True
481             self.out = "N\A"
482             self.err = ("Connection to machine (name : %s, host: %s, port:"
483                         " %s, user: %s) has failed\nUse the log command "
484                         "to get more information."
485                         % (self.machine.name,
486                            self.machine.host,
487                            self.machine.port,
488                            self.machine.user))
489         else:
490             # Usual case : Launch the command on remote machine
491             self._T0 = time.time()
492             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
493                                                         self.command, self.logger)
494             # If the results are not initialized, finish the job
495             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
496                 self._has_finished = True
497                 self._Tf = time.time()
498                 self.out = "N\A"
499                 self.err = "The server failed to execute the command"
500         
501         # Put the beginning flag to true.
502         self._has_begun = True
503     
504     def write_results(self):
505         """Display on the terminal all the job's information
506         """
507         self.logger.write("name : " + self.name + "\n")
508         if self.after:
509             self.logger.write("after : %s\n" % self.after)
510         self.logger.write("Time elapsed : %4imin %2is \n" % 
511                      (self.total_duration()/60 , self.total_duration()%60))
512         if self._T0 != -1:
513             self.logger.write("Begin time : %s\n" % 
514                          time.strftime('%Y-%m-%d %H:%M:%S', 
515                                        time.localtime(self._T0)) )
516         if self._Tf != -1:
517             self.logger.write("End time   : %s\n\n" % 
518                          time.strftime('%Y-%m-%d %H:%M:%S', 
519                                        time.localtime(self._Tf)) )
520         
521         machine_head = "Informations about connection :\n"
522         underline = (len(machine_head) - 2) * "-"
523         self.logger.write(src.printcolors.printcInfo(
524                                                 machine_head+underline+"\n"))
525         self.machine.write_info(self.logger)
526         
527         self.logger.write(src.printcolors.printcInfo("out : \n"))
528         if self.out is None:
529             self.logger.write("Unable to get output\n")
530         else:
531             self.logger.write(self.out + "\n")
532         self.logger.write(src.printcolors.printcInfo("err : \n"))
533         if self.err is None:
534             self.logger.write("Unable to get error\n")
535         else:
536             self.logger.write(self.err + "\n")
537         
538     def get_status(self):
539         """Get the status of the job (used by the Gui for xml display)
540         
541         :return: The current status of the job
542         :rtype: String
543         """
544         if not self.machine.successfully_connected(self.logger):
545             return "SSH connection KO"
546         if not self.has_begun():
547             return "Not launched"
548         if self.cancelled:
549             return "Cancelled"
550         if self.is_running():
551             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
552                                                     time.localtime(self._T0))        
553         if self.has_finished():
554             if self.is_timeout():
555                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
556                                                     time.localtime(self._Tf))
557             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
558                                                      time.localtime(self._Tf))
559     
560 class Jobs(object):
561     '''Class to manage the jobs to be run
562     '''
563     def __init__(self,
564                  runner,
565                  logger,
566                  job_file,
567                  job_file_path,
568                  config_jobs,
569                  lenght_columns = 20):
570         # The jobs configuration
571         self.cfg_jobs = config_jobs
572         self.job_file = job_file
573         self.job_file_path = job_file_path
574         # The machine that will be used today
575         self.lmachines = []
576         # The list of machine (hosts, port) that will be used today 
577         # (a same host can have several machine instances since there 
578         # can be several ssh parameters) 
579         self.lhosts = []
580         # The jobs to be launched today 
581         self.ljobs = []
582         # The jobs that will not be launched today
583         self.ljobs_not_today = []
584         self.runner = runner
585         self.logger = logger
586         # The correlation dictionary between jobs and machines
587         self.dic_job_machine = {} 
588         self.len_columns = lenght_columns
589         
590         # the list of jobs that have not been run yet
591         self._l_jobs_not_started = []
592         # the list of jobs that have already ran 
593         self._l_jobs_finished = []
594         # the list of jobs that are running 
595         self._l_jobs_running = [] 
596                 
597         self.determine_jobs_and_machines()
598     
599     def define_job(self, job_def, machine):
600         '''Takes a pyconf job definition and a machine (from class machine)
601            and returns the job instance corresponding to the definition.
602         
603         :param job_def src.config.Mapping: a job definition 
604         :param machine machine: the machine on which the job will run
605         :return: The corresponding job in a job class instance
606         :rtype: job
607         '''
608         name = job_def.name
609         cmmnds = job_def.commands
610         timeout = job_def.timeout
611         after = None
612         if 'after' in job_def:
613             after = job_def.after
614         application = None
615         if 'application' in job_def:
616             application = job_def.application
617         distribution = None
618         if 'distribution' in job_def:
619             distribution = job_def.distribution
620         table = None
621         if 'table' in job_def:
622             table = job_def.table
623             
624         return Job(name,
625                    machine,
626                    application,
627                    distribution,
628                    table,
629                    cmmnds,
630                    timeout,
631                    self.runner.cfg,
632                    self.logger,
633                    self.job_file,
634                    after = after)
635     
636     def determine_jobs_and_machines(self):
637         '''Function that reads the pyconf jobs definition and instantiates all
638            the machines and jobs to be done today.
639
640         :return: Nothing
641         :rtype: N\A
642         '''
643         today = datetime.date.weekday(datetime.date.today())
644         host_list = []
645                
646         for job_def in self.cfg_jobs.jobs :
647                 
648             if not "machine" in job_def:
649                 msg = _('WARNING: The job "%s" do not have the key '
650                        '"machine", this job is ignored.\n\n' % job_def.name)
651                 self.logger.write(src.printcolors.printcWarning(msg))
652                 continue
653             name_machine = job_def.machine
654             
655             a_machine = None
656             for mach in self.lmachines:
657                 if mach.name == name_machine:
658                     a_machine = mach
659                     break
660             
661             if a_machine == None:
662                 for machine_def in self.cfg_jobs.machines:
663                     if machine_def.name == name_machine:
664                         if 'host' not in machine_def:
665                             host = self.runner.cfg.VARS.hostname
666                         else:
667                             host = machine_def.host
668
669                         if 'user' not in machine_def:
670                             user = self.runner.cfg.VARS.user
671                         else:
672                             user = machine_def.user
673
674                         if 'port' not in machine_def:
675                             port = 22
676                         else:
677                             port = machine_def.port
678             
679                         if 'password' not in machine_def:
680                             passwd = None
681                         else:
682                             passwd = machine_def.password    
683                             
684                         if 'sat_path' not in machine_def:
685                             sat_path = "salomeTools"
686                         else:
687                             sat_path = machine_def.sat_path
688                         
689                         a_machine = Machine(
690                                             machine_def.name,
691                                             host,
692                                             user,
693                                             port=port,
694                                             passwd=passwd,
695                                             sat_path=sat_path
696                                             )
697                         
698                         self.lmachines.append(a_machine)
699                         if (host, port) not in host_list:
700                             host_list.append((host, port))
701                 
702                 if a_machine == None:
703                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
704                             "machine \"%(machine_name)s\" but this machine "
705                             "is not defined in the configuration file.\n"
706                             "The job will not be launched")
707                     self.logger.write(src.printcolors.printcWarning(msg))
708                                   
709             a_job = self.define_job(job_def, a_machine)
710             self.dic_job_machine[a_job] = a_machine
711                 
712             if today in job_def.when:    
713                 self.ljobs.append(a_job)
714             else: # today in job_def.when
715                 self.ljobs_not_today.append(a_job)
716                                      
717         self.lhosts = host_list
718         
719     def ssh_connection_all_machines(self, pad=50):
720         '''Function that do the ssh connection to every machine 
721            to be used today.
722
723         :return: Nothing
724         :rtype: N\A
725         '''
726         self.logger.write(src.printcolors.printcInfo((
727                         "Establishing connection with all the machines :\n")))
728         for machine in self.lmachines:
729             # little algorithm in order to display traces
730             begin_line = (_("Connection to %s: " % machine.name))
731             if pad - len(begin_line) < 0:
732                 endline = " "
733             else:
734                 endline = (pad - len(begin_line)) * "." + " "
735             
736             step = "SSH connection"
737             self.logger.write( begin_line + endline + step)
738             self.logger.flush()
739             # the call to the method that initiate the ssh connection
740             msg = machine.connect(self.logger)
741             
742             # Copy salomeTools to the remote machine
743             if machine.successfully_connected(self.logger):
744                 step = _("Copy SAT")
745                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
746                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
747                 self.logger.flush()
748                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
749                                             self.job_file_path)
750                 # Print the status of the copy
751                 if res_copy == 0:
752                     self.logger.write('\r%s' % 
753                                 ((len(begin_line)+len(endline)+20) * " "), 3)
754                     self.logger.write('\r%s%s%s' % 
755                         (begin_line, 
756                          endline, 
757                          src.printcolors.printc(src.OK_STATUS)), 3)
758                 else:
759                     self.logger.write('\r%s' % 
760                             ((len(begin_line)+len(endline)+20) * " "), 3)
761                     self.logger.write('\r%s%s%s %s' % 
762                         (begin_line,
763                          endline,
764                          src.printcolors.printc(src.OK_STATUS),
765                          _("Copy of SAT failed")), 3)
766             else:
767                 self.logger.write('\r%s' % 
768                                   ((len(begin_line)+len(endline)+20) * " "), 3)
769                 self.logger.write('\r%s%s%s %s' % 
770                     (begin_line,
771                      endline,
772                      src.printcolors.printc(src.KO_STATUS),
773                      msg), 3)
774             self.logger.write("\n", 3)
775                 
776         self.logger.write("\n")
777         
778
779     def is_occupied(self, hostname):
780         '''Function that returns True if a job is running on 
781            the machine defined by its host and its port.
782         
783         :param hostname (str, int): the pair (host, port)
784         :return: the job that is running on the host, 
785                 or false if there is no job running on the host. 
786         :rtype: job / bool
787         '''
788         host = hostname[0]
789         port = hostname[1]
790         for jb in self.dic_job_machine:
791             if jb.machine.host == host and jb.machine.port == port:
792                 if jb.is_running():
793                     return jb
794         return False
795     
796     def update_jobs_states_list(self):
797         '''Function that updates the lists that store the currently
798            running jobs and the jobs that have already finished.
799         
800         :return: Nothing. 
801         :rtype: N\A
802         '''
803         jobs_finished_list = []
804         jobs_running_list = []
805         for jb in self.dic_job_machine:
806             if jb.is_running():
807                 jobs_running_list.append(jb)
808                 jb.check_time()
809             if jb.has_finished():
810                 jobs_finished_list.append(jb)
811         
812         nb_job_finished_before = len(self._l_jobs_finished)
813         self._l_jobs_finished = jobs_finished_list
814         self._l_jobs_running = jobs_running_list
815         
816         nb_job_finished_now = len(self._l_jobs_finished)
817         
818         return nb_job_finished_now > nb_job_finished_before
819     
820     def cancel_dependencies_of_failing_jobs(self):
821         '''Function that cancels all the jobs that depend on a failing one.
822         
823         :return: Nothing. 
824         :rtype: N\A
825         '''
826         
827         for job in self.ljobs:
828             if job.after is None:
829                 continue
830             father_job = self.find_job_that_has_name(job.after)
831             if father_job.has_failed():
832                 job.cancel()
833     
834     def find_job_that_has_name(self, name):
835         '''Returns the job by its name.
836         
837         :param name str: a job name
838         :return: the job that has the name. 
839         :rtype: job
840         '''
841         for jb in self.ljobs:
842             if jb.name == name:
843                 return jb
844
845         # the following is executed only if the job was not found
846         msg = _('The job "%s" seems to be nonexistent') % name
847         raise src.SatException(msg)
848     
849     def str_of_length(self, text, length):
850         '''Takes a string text of any length and returns 
851            the most close string of length "length".
852         
853         :param text str: any string
854         :param length int: a length for the returned string
855         :return: the most close string of length "length"
856         :rtype: str
857         '''
858         if len(text) > length:
859             text_out = text[:length-3] + '...'
860         else:
861             diff = length - len(text)
862             before = " " * (diff/2)
863             after = " " * (diff/2 + diff%2)
864             text_out = before + text + after
865             
866         return text_out
867     
868     def display_status(self, len_col):
869         '''Takes a lenght and construct the display of the current status 
870            of the jobs in an array that has a column for each host.
871            It displays the job that is currently running on the host 
872            of the column.
873         
874         :param len_col int: the size of the column 
875         :return: Nothing
876         :rtype: N\A
877         '''
878         
879         display_line = ""
880         for host_port in self.lhosts:
881             jb = self.is_occupied(host_port)
882             if not jb: # nothing running on the host
883                 empty = self.str_of_length("empty", len_col)
884                 display_line += "|" + empty 
885             else:
886                 display_line += "|" + src.printcolors.printcInfo(
887                                         self.str_of_length(jb.name, len_col))
888         
889         self.logger.write("\r" + display_line + "|")
890         self.logger.flush()
891     
892
893     def run_jobs(self):
894         '''The main method. Runs all the jobs on every host. 
895            For each host, at a given time, only one job can be running.
896            The jobs that have the field after (that contain the job that has
897            to be run before it) are run after the previous job.
898            This method stops when all the jobs are finished.
899         
900         :return: Nothing
901         :rtype: N\A
902         '''
903
904         # Print header
905         self.logger.write(src.printcolors.printcInfo(
906                                                 _('Executing the jobs :\n')))
907         text_line = ""
908         for host_port in self.lhosts:
909             host = host_port[0]
910             port = host_port[1]
911             if port == 22: # default value
912                 text_line += "|" + self.str_of_length(host, self.len_columns)
913             else:
914                 text_line += "|" + self.str_of_length(
915                                 "("+host+", "+str(port)+")", self.len_columns)
916         
917         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
918         self.logger.write(tiret_line)
919         self.logger.write(text_line + "|\n")
920         self.logger.write(tiret_line)
921         self.logger.flush()
922         
923         # The infinite loop that runs the jobs
924         l_jobs_not_started = self.dic_job_machine.keys()
925         while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
926             new_job_start = False
927             for host_port in self.lhosts:
928                 
929                 if self.is_occupied(host_port):
930                     continue
931              
932                 for jb in l_jobs_not_started:
933                     if (jb.machine.host, jb.machine.port) != host_port:
934                         continue 
935                     if jb.after == None:
936                         jb.run()
937                         l_jobs_not_started.remove(jb)
938                         new_job_start = True
939                         break
940                     else:
941                         jb_before = self.find_job_that_has_name(jb.after) 
942                         if jb_before.has_finished():
943                             jb.run()
944                             l_jobs_not_started.remove(jb)
945                             new_job_start = True
946                             break
947             self.cancel_dependencies_of_failing_jobs()
948             new_job_finished = self.update_jobs_states_list()
949             
950             if new_job_start or new_job_finished:
951                 self.gui.update_xml_files(self.ljobs)            
952                 # Display the current status     
953                 self.display_status(self.len_columns)
954             
955             # Make sure that the proc is not entirely busy
956             time.sleep(0.001)
957         
958         self.logger.write("\n")    
959         self.logger.write(tiret_line)                   
960         self.logger.write("\n\n")
961         
962         self.gui.update_xml_files(self.ljobs)
963         self.gui.last_update()
964
965     def write_all_results(self):
966         '''Display all the jobs outputs.
967         
968         :return: Nothing
969         :rtype: N\A
970         '''
971         
972         for jb in self.dic_job_machine.keys():
973             self.logger.write(src.printcolors.printcLabel(
974                         "#------- Results for job %s -------#\n" % jb.name))
975             jb.write_results()
976             self.logger.write("\n\n")
977
978 class Gui(object):
979     '''Class to manage the the xml data that can be displayed in a browser to
980        see the jobs states
981     '''
982    
983     def __init__(self, xml_dir_path, l_jobs, l_jobs_not_today):
984         '''Initialization
985         
986         :param xml_dir_path str: The path to the directory where to put 
987                                  the xml resulting files
988         :param l_jobs List: the list of jobs that run today
989         :param l_jobs_not_today List: the list of jobs that do not run today
990         '''
991         # The path of the global xml file
992         self.xml_dir_path = xml_dir_path
993         # Initialize the xml files
994         xml_global_path = os.path.join(self.xml_dir_path, "global_report.xml")
995         self.xml_global_file = src.xmlManager.XmlLogFile(xml_global_path,
996                                                          "JobsReport")
997         # The xml files that corresponds to the tables.
998         # {name_table : xml_object}}
999         self.d_xml_table_files = {}
1000         # Create the lines and columns
1001         self.initialize_arrays(l_jobs, l_jobs_not_today)
1002         # Write the xml file
1003         self.update_xml_files(l_jobs)
1004     
1005     def initialize_arrays(self, l_jobs, l_jobs_not_today):
1006         '''Get all the first information needed for each file and write the 
1007            first version of the files   
1008         :param l_jobs List: the list of jobs that run today
1009         :param l_jobs_not_today List: the list of jobs that do not run today
1010         '''
1011         # Get the tables to fill and put it in a dictionary
1012         # {table_name : xml instance corresponding to the table}
1013         for job in l_jobs + l_jobs_not_today:
1014             table = job.table
1015             if (table is not None and 
1016                     table not in self.d_xml_table_files.keys()):
1017                 xml_table_path = os.path.join(self.xml_dir_path, table + ".xml")
1018                 self.d_xml_table_files[table] =  src.xmlManager.XmlLogFile(
1019                                                             xml_table_path,
1020                                                             "JobsReport")
1021                 self.d_xml_table_files[table].add_simple_node("distributions")
1022                 self.d_xml_table_files[table].add_simple_node("applications")
1023                 self.d_xml_table_files[table].add_simple_node("table", text=table)
1024         
1025         # Loop over all jobs in order to get the lines and columns for each 
1026         # xml file
1027         d_dist = {}
1028         d_application = {}
1029         for table in self.d_xml_table_files:
1030             d_dist[table] = []
1031             d_application[table] = []
1032             
1033         l_hosts_ports = []
1034             
1035         for job in l_jobs + l_jobs_not_today:
1036             
1037             if (job.machine.host, job.machine.port) not in l_hosts_ports:
1038                 l_hosts_ports.append((job.machine.host, job.machine.port))
1039                 
1040             distrib = job.distribution
1041             application = job.application
1042             
1043             table_job = job.table
1044             if table is None:
1045                 continue
1046             for table in self.d_xml_table_files:
1047                 if table_job == table:
1048                     if distrib is not None and distrib not in d_dist[table]:
1049                         d_dist[table].append(distrib)
1050                         src.xmlManager.add_simple_node(
1051                             self.d_xml_table_files[table].xmlroot.find('distributions'),
1052                                                    "dist",
1053                                                    attrib={"name" : distrib})
1054                     
1055                 if table_job == table:
1056                     if application is not None and application not in d_application[table]:
1057                         d_application[table].append(application)
1058                         src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
1059                                                    "application",
1060                                                    attrib={"name" : application})
1061
1062         # Initialize the hosts_ports node for the global file
1063         self.xmlhosts_ports = self.xml_global_file.add_simple_node("hosts_ports")
1064         for host, port in l_hosts_ports:
1065             host_port = "%s:%i" % (host, port)
1066             src.xmlManager.add_simple_node(self.xmlhosts_ports,
1067                                            "host_port",
1068                                            attrib={"name" : host_port})
1069         
1070         # Initialize the jobs node in all files
1071         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1072             xml_jobs = xml_file.add_simple_node("jobs")      
1073             # Get the jobs present in the config file but that will not be launched
1074             # today
1075             self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
1076             
1077             xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
1078
1079     
1080     def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
1081         '''Get all the first information needed for each file and write the 
1082            first version of the files   
1083
1084         :param xml_node_jobs etree.Element: the node corresponding to a job
1085         :param l_jobs_not_today List: the list of jobs that do not run today
1086         '''
1087         for job in l_jobs_not_today:
1088             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1089                                                  "job",
1090                                                  attrib={"name" : job.name})
1091             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1092             src.xmlManager.add_simple_node(xmlj,
1093                                            "distribution",
1094                                            job.distribution)
1095             src.xmlManager.add_simple_node(xmlj, "table", job.table)
1096             src.xmlManager.add_simple_node(xmlj,
1097                                        "commands", " ; ".join(job.commands))
1098             src.xmlManager.add_simple_node(xmlj, "state", "Not today")
1099             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1100             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1101             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1102             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1103             src.xmlManager.add_simple_node(xmlj, "sat_path",
1104                                                         job.machine.sat_path)
1105     
1106     def update_xml_files(self, l_jobs):
1107         '''Write all the xml files with updated information about the jobs   
1108
1109         :param l_jobs List: the list of jobs that run today
1110         '''
1111         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1112             self.update_xml_file(l_jobs, xml_file)
1113             
1114         # Write the file
1115         self.write_xml_files()
1116             
1117     def update_xml_file(self, l_jobs, xml_file):      
1118         '''update information about the jobs for the file xml_file   
1119
1120         :param l_jobs List: the list of jobs that run today
1121         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1122         '''
1123         
1124         xml_node_jobs = xml_file.xmlroot.find('jobs')
1125         # Update the job names and status node
1126         for job in l_jobs:
1127             # Find the node corresponding to the job and delete it
1128             # in order to recreate it
1129             for xmljob in xml_node_jobs.findall('job'):
1130                 if xmljob.attrib['name'] == job.name:
1131                     xml_node_jobs.remove(xmljob)
1132             
1133             T0 = str(job._T0)
1134             if T0 != "-1":
1135                 T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
1136                                        time.localtime(job._T0))
1137             Tf = str(job._Tf)
1138             if Tf != "-1":
1139                 Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
1140                                        time.localtime(job._Tf))
1141             
1142             # recreate the job node
1143             xmlj = src.xmlManager.add_simple_node(xml_node_jobs,
1144                                                   "job",
1145                                                   attrib={"name" : job.name})
1146             src.xmlManager.add_simple_node(xmlj, "machine", job.machine.name)
1147             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1148             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1149             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1150             src.xmlManager.add_simple_node(xmlj, "sat_path",
1151                                            job.machine.sat_path)
1152             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1153             src.xmlManager.add_simple_node(xmlj, "distribution",
1154                                            job.distribution)
1155             src.xmlManager.add_simple_node(xmlj, "table", job.table)
1156             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1157             src.xmlManager.add_simple_node(xmlj, "commands",
1158                                            " ; ".join(job.commands))
1159             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1160             src.xmlManager.add_simple_node(xmlj, "begin", T0)
1161             src.xmlManager.add_simple_node(xmlj, "end", Tf)
1162             src.xmlManager.add_simple_node(xmlj, "out",
1163                                            src.printcolors.cleancolor(job.out))
1164             src.xmlManager.add_simple_node(xmlj, "err",
1165                                            src.printcolors.cleancolor(job.err))
1166             src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1167             if len(job.remote_log_files) > 0:
1168                 src.xmlManager.add_simple_node(xmlj,
1169                                                "remote_log_file_path",
1170                                                job.remote_log_files[0])
1171             else:
1172                 src.xmlManager.add_simple_node(xmlj,
1173                                                "remote_log_file_path",
1174                                                "nothing")           
1175             
1176             xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1177             # get the job father
1178             if job.after is not None:
1179                 job_father = None
1180                 for jb in l_jobs:
1181                     if jb.name == job.after:
1182                         job_father = jb
1183                 if job_father is None:
1184                     msg = _("The job %(father_name)s that is parent of "
1185                             "%(son_name)s is not in the job list." %
1186                             {"father_name" : job.after , "son_name" : job.name})
1187                     raise src.SatException(msg)
1188                 
1189                 if len(job_father.remote_log_files) > 0:
1190                     link = job_father.remote_log_files[0]
1191                 else:
1192                     link = "nothing"
1193                 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1194             
1195         
1196         # Update the date
1197         xml_node_infos = xml_file.xmlroot.find('infos')
1198         src.xmlManager.append_node_attrib(xml_node_infos,
1199                     attrib={"value" : 
1200                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1201                
1202
1203     
1204     def last_update(self, finish_status = "finished"):
1205         '''update information about the jobs for the file xml_file   
1206
1207         :param l_jobs List: the list of jobs that run today
1208         :param xml_file xmlManager.XmlLogFile: the xml instance to update
1209         '''
1210         for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
1211             xml_node_infos = xml_file.xmlroot.find('infos')
1212             src.xmlManager.append_node_attrib(xml_node_infos,
1213                         attrib={"JobsCommandStatus" : finish_status})
1214         # Write the file
1215         self.write_xml_files()
1216     
1217     def write_xml_files(self):
1218         ''' Write the xml files   
1219         '''
1220         self.xml_global_file.write_tree(STYLESHEET_GLOBAL)
1221         for xml_file in self.d_xml_table_files.values():
1222             xml_file.write_tree(STYLESHEET_TABLE)
1223         
1224 ##
1225 # Describes the command
1226 def description():
1227     return _("The jobs command launches maintenances that are described"
1228              " in the dedicated jobs configuration file.")
1229
1230 ##
1231 # Runs the command.
1232 def run(args, runner, logger):
1233        
1234     (options, args) = parser.parse_args(args)
1235        
1236     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1237     
1238     l_cfg_dir = [jobs_cfg_files_dir,
1239                  os.path.join(runner.cfg.VARS.datadir, "jobs")]
1240     
1241     # Make sure the path to the jobs config files directory exists 
1242     src.ensure_path_exists(jobs_cfg_files_dir)   
1243     
1244     # list option : display all the available config files
1245     if options.list:
1246         for cfg_dir in l_cfg_dir:
1247             if not options.no_label:
1248                 logger.write("------ %s\n" % 
1249                                  src.printcolors.printcHeader(cfg_dir))
1250     
1251             for f in sorted(os.listdir(cfg_dir)):
1252                 if not f.endswith('.pyconf'):
1253                     continue
1254                 cfilename = f[:-7]
1255                 logger.write("%s\n" % cfilename)
1256         return 0
1257
1258     # Make sure the jobs_config option has been called
1259     if not options.jobs_cfg:
1260         message = _("The option --jobs_config is required\n")      
1261         src.printcolors.printcError(message)
1262         return 1
1263     
1264     # Find the file in the directories
1265     found = False
1266     for cfg_dir in l_cfg_dir:
1267         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1268         if not file_jobs_cfg.endswith('.pyconf'):
1269             file_jobs_cfg += '.pyconf'
1270         
1271         if not os.path.exists(file_jobs_cfg):
1272             continue
1273         else:
1274             found = True
1275             break
1276     
1277     if not found:
1278         msg = _("The file configuration %(name_file)s was not found."
1279                 "\nUse the --list option to get the possible files.")
1280         src.printcolors.printcError(msg)
1281         return 1
1282     
1283     info = [
1284         (_("Platform"), runner.cfg.VARS.dist),
1285         (_("File containing the jobs configuration"), file_jobs_cfg)
1286     ]    
1287     src.print_info(logger, info)
1288     
1289     # Read the config that is in the file
1290     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1291     if options.only_jobs:
1292         l_jb = src.pyconf.Sequence()
1293         for jb in config_jobs.jobs:
1294             if jb.name in options.only_jobs:
1295                 l_jb.append(jb,
1296                 "Adding a job that was given in only_jobs option parameters")
1297         config_jobs.jobs = l_jb
1298               
1299     # Initialization
1300     today_jobs = Jobs(runner,
1301                       logger,
1302                       options.jobs_cfg,
1303                       file_jobs_cfg,
1304                       config_jobs)
1305     # SSH connection to all machines
1306     today_jobs.ssh_connection_all_machines()
1307     if options.test_connection:
1308         return 0
1309     
1310     gui = None
1311     if options.publish:
1312         gui = Gui("/export/home/serioja/LOGS",
1313                   today_jobs.ljobs,
1314                   today_jobs.ljobs_not_today,)
1315     
1316     today_jobs.gui = gui
1317     
1318     interruped = False
1319     try:
1320         # Run all the jobs contained in config_jobs
1321         today_jobs.run_jobs()
1322     except KeyboardInterrupt:
1323         interruped = True
1324         logger.write("\n\n%s\n\n" % 
1325                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1326         
1327     finally:
1328         # find the potential not finished jobs and kill them
1329         for jb in today_jobs.ljobs:
1330             if not jb.has_finished():
1331                 jb.kill_remote_process()
1332         if interruped:
1333             today_jobs.gui.last_update(_("Forced interruption"))
1334         else:
1335             today_jobs.gui.last_update()
1336         # Output the results
1337         today_jobs.write_all_results()