Salome HOME
link to remote machine logs in the matrix of jobs command
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26
27 parser = src.options.Options()
28
29 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
30                   _('The name of the config file that contains'
31                   ' the jobs configuration'))
32 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
33                   _('The list of jobs to launch, by their name. '))
34 parser.add_option('l', 'list', 'boolean', 'list', 
35                   _('list all available config files.'))
36 parser.add_option('n', 'no_label', 'boolean', 'no_label',
37                   _("do not print labels, Works only with --list."), False)
38 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
39                   _("Try to connect to the machines. Not executing the jobs."),
40                   False)
41 parser.add_option('p', 'publish', 'boolean', 'publish',
42                   _("Generate an xml file that can be read in a browser to "
43                     "display the jobs status."),
44                   False)
45
46 class machine(object):
47     '''Class to manage a ssh connection on a machine
48     '''
49     def __init__(self, name, host, user, port=22, passwd=None, sat_path="./"):
50         self.name = name
51         self.host = host
52         self.port = port
53         self.user = user
54         self.password = passwd
55         self.sat_path = sat_path
56         self.ssh = paramiko.SSHClient()
57         self._connection_successful = None
58     
59     def connect(self, logger):
60         '''Initiate the ssh connection to the remote machine
61         
62         :param logger src.logger.Logger: The logger instance 
63         :return: Nothing
64         :rtype: N\A
65         '''
66
67         self._connection_successful = False
68         self.ssh.load_system_host_keys()
69         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
70         try:
71             self.ssh.connect(self.host,
72                              port=self.port,
73                              username=self.user,
74                              password = self.password)
75         except paramiko.AuthenticationException:
76             message = src.KO_STATUS + _("Authentication failed")
77         except paramiko.BadHostKeyException:
78             message = (src.KO_STATUS + 
79                        _("The server's host key could not be verified"))
80         except paramiko.SSHException:
81             message = ( _("SSHException error connecting or establishing an SSH session"))            
82         except:
83             message = ( _("Error connecting or establishing an SSH session"))
84         else:
85             self._connection_successful = True
86             message = ""
87         return message
88     
89     def successfully_connected(self, logger):
90         '''Verify if the connection to the remote machine has succeed
91         
92         :param logger src.logger.Logger: The logger instance 
93         :return: True if the connection has succeed, False if not
94         :rtype: bool
95         '''
96         if self._connection_successful == None:
97             message = "Warning : trying to ask if the connection to "
98             "(host: %s, port: %s, user: %s) is OK whereas there were"
99             " no connection request" % \
100             (machine.host, machine.port, machine.user)
101             logger.write( src.printcolors.printcWarning(message))
102         return self._connection_successful
103
104     def copy_sat(self, sat_local_path, job_file):
105         '''Copy salomeTools to the remote machine in self.sat_path
106         '''
107         res = 0
108         try:
109             self.sftp = self.ssh.open_sftp()
110             self.mkdir(self.sat_path, ignore_existing=True)
111             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
112             job_file_name = os.path.basename(job_file)
113             self.sftp.put(job_file, os.path.join(self.sat_path, "data", "jobs", job_file_name))
114         except Exception as e:
115             res = str(e)
116             self._connection_successful = False
117         
118         return res
119         
120     def put_dir(self, source, target, filters = []):
121         ''' Uploads the contents of the source directory to the target path. The
122             target directory needs to exists. All subdirectories in source are 
123             created under target.
124         '''
125         for item in os.listdir(source):
126             if item in filters:
127                 continue
128             source_path = os.path.join(source, item)
129             destination_path = os.path.join(target, item)
130             if os.path.islink(source_path):
131                 linkto = os.readlink(source_path)
132                 try:
133                     self.sftp.remove(destination_path)
134                     self.sftp.symlink(linkto, destination_path)
135                     self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
136                 except IOError:
137                     pass
138             else:
139                 if os.path.isfile(source_path):
140                     self.sftp.put(source_path, destination_path)
141                     self.sftp.chmod(destination_path, os.stat(source_path).st_mode)
142                 else:
143                     self.mkdir(destination_path, ignore_existing=True)
144                     self.put_dir(source_path, destination_path)
145
146     def mkdir(self, path, mode=511, ignore_existing=False):
147         ''' Augments mkdir by adding an option to not fail if the folder exists  '''
148         try:
149             self.sftp.mkdir(path, mode)
150         except IOError:
151             if ignore_existing:
152                 pass
153             else:
154                 raise       
155     
156     def exec_command(self, command, logger):
157         '''Execute the command on the remote machine
158         
159         :param command str: The command to be run
160         :param logger src.logger.Logger: The logger instance 
161         :return: the stdin, stdout, and stderr of the executing command,
162                  as a 3-tuple
163         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile, 
164                 paramiko.channel.ChannelFile)
165         '''
166         try:        
167             # Does not wait the end of the command
168             (stdin, stdout, stderr) = self.ssh.exec_command(command)
169         except paramiko.SSHException:
170             message = src.KO_STATUS + _(
171                             ": the server failed to execute the command\n")
172             logger.write( src.printcolors.printcError(message))
173             return (None, None, None)
174         except:
175             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
176             return (None, None, None)
177         else:
178             return (stdin, stdout, stderr)
179
180     def close(self):
181         '''Close the ssh connection
182         
183         :rtype: N\A
184         '''
185         self.ssh.close()
186      
187     def write_info(self, logger):
188         '''Prints the informations relative to the machine in the logger 
189            (terminal traces and log file)
190         
191         :param logger src.logger.Logger: The logger instance
192         :return: Nothing
193         :rtype: N\A
194         '''
195         logger.write("host : " + self.host + "\n")
196         logger.write("port : " + str(self.port) + "\n")
197         logger.write("user : " + str(self.user) + "\n")
198         if self.successfully_connected(logger):
199             status = src.OK_STATUS
200         else:
201             status = src.KO_STATUS
202         logger.write("Connection : " + status + "\n\n") 
203
204
205 class job(object):
206     '''Class to manage one job
207     '''
208     def __init__(self, name, machine, application, distribution,
209                  commands, timeout, logger, job_file, after=None):
210
211         self.name = name
212         self.machine = machine
213         self.after = after
214         self.timeout = timeout
215         self.application = application
216         self.distribution = distribution
217         self.logger = logger
218         self.remote_log_files = []
219         
220         self._T0 = -1
221         self._Tf = -1
222         self._has_begun = False
223         self._has_finished = False
224         self._has_timouted = False
225         self._stdin = None # Store the command inputs field
226         self._stdout = None # Store the command outputs field
227         self._stderr = None # Store the command errors field
228
229         self.out = None # Contains something only if the job is finished
230         self.err = None # Contains something only if the job is finished    
231                
232         self.commands = commands
233         self.command = os.path.join(self.machine.sat_path, "sat") + " -v1 job --jobs_config " + job_file + " --job " + self.name
234     
235     def get_pids(self):
236         pids = []
237         cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
238         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
239         pids_cmd = out_pid.readlines()
240         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
241         pids+=pids_cmd
242         return pids
243     
244     def kill_remote_process(self):
245         '''Kills the process on the remote machine.
246         
247         :return: (the output of the kill, the error of the kill)
248         :rtype: (str, str)
249         '''
250         
251         pids = self.get_pids()
252         cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
253         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
254                                                             self.logger)
255         return (out_kill, err_kill)
256             
257     def has_begun(self):
258         '''Returns True if the job has already begun
259         
260         :return: True if the job has already begun
261         :rtype: bool
262         '''
263         return self._has_begun
264     
265     def has_finished(self):
266         '''Returns True if the job has already finished 
267            (i.e. all the commands have been executed)
268            If it is finished, the outputs are stored in the fields out and err.  
269         
270         :return: True if the job has already finished
271         :rtype: bool
272         '''
273         
274         # If the method has already been called and returned True
275         if self._has_finished:
276             return True
277         
278         # If the job has not begun yet
279         if not self.has_begun():
280             return False
281         
282         if self._stdout.channel.closed:
283             self._has_finished = True
284             # Store the result outputs
285             self.out = self._stdout.read()
286             self.err = self._stderr.read()
287             # Put end time
288             self._Tf = time.time()
289             # And get the remote command status and log files
290             self.get_log_files()
291         
292         return self._has_finished
293     
294     def get_log_files(self):
295         if not self.has_finished():
296             msg = _("Trying to get log files whereas the job is not finished.")
297             self.logger.write(src.printcolors.printcWarning(msg))
298             return
299         out_lines = self.out.split("\n")
300         out_lines = [line for line in out_lines if line != '']
301         self.res_job = out_lines[0]
302         for job_path_remote in out_lines[1:]:
303             if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
304                 local_path = os.path.join(os.path.dirname(self.logger.logFilePath), os.path.basename(job_path_remote))
305                 self.machine.sftp.get(job_path_remote, local_path)
306             else:
307                 local_path = os.path.join(os.path.dirname(self.logger.logFilePath), 'OUT', os.path.basename(job_path_remote))
308                 self.machine.sftp.get(job_path_remote, local_path)
309             self.remote_log_files.append(local_path)
310     
311     def is_running(self):
312         '''Returns True if the job commands are running 
313         
314         :return: True if the job is running
315         :rtype: bool
316         '''
317         return self.has_begun() and not self.has_finished()
318
319     def is_timeout(self):
320         '''Returns True if the job commands has finished with timeout 
321         
322         :return: True if the job has finished with timeout
323         :rtype: bool
324         '''
325         return self._has_timouted
326
327     def time_elapsed(self):
328         if not self.has_begun():
329             return -1
330         T_now = time.time()
331         return T_now - self._T0
332     
333     def check_time(self):
334         if not self.has_begun():
335             return
336         if self.time_elapsed() > self.timeout:
337             self._has_finished = True
338             self._has_timouted = True
339             self._Tf = time.time()
340             self.get_pids()
341             (out_kill, _) = self.kill_remote_process()
342             self.out = "TIMEOUT \n" + out_kill.read()
343             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
344     
345     def total_duration(self):
346         return self._Tf - self._T0
347         
348     def run(self, logger):
349         if self.has_begun():
350             print("Warn the user that a job can only be launched one time")
351             return
352         
353         if not self.machine.successfully_connected(logger):
354             self._has_finished = True
355             self.out = "N\A"
356             self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information." 
357                         % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
358         else:
359             self._T0 = time.time()
360             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
361                                                         self.command, logger)
362             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
363                 self._has_finished = True
364                 self._Tf = time.time()
365                 self.out = "N\A"
366                 self.err = "The server failed to execute the command"
367         
368         self._has_begun = True
369     
370     def write_results(self, logger):
371         logger.write("name : " + self.name + "\n")
372         if self.after:
373             logger.write("after : %s\n" % self.after)
374         logger.write("Time elapsed : %4imin %2is \n" % 
375                      (self.total_duration()/60 , self.total_duration()%60))
376         if self._T0 != -1:
377             logger.write("Begin time : %s\n" % 
378                          time.strftime('%Y-%m-%d %H:%M:%S', 
379                                        time.localtime(self._T0)) )
380         if self._Tf != -1:
381             logger.write("End time   : %s\n\n" % 
382                          time.strftime('%Y-%m-%d %H:%M:%S', 
383                                        time.localtime(self._Tf)) )
384         
385         machine_head = "Informations about connection :\n"
386         underline = (len(machine_head) - 2) * "-"
387         logger.write(src.printcolors.printcInfo(machine_head + underline + "\n"))
388         self.machine.write_info(logger)
389         
390         logger.write(src.printcolors.printcInfo("out : \n"))
391         if self.out is None:
392             logger.write("Unable to get output\n")
393         else:
394             logger.write(self.out + "\n")
395         logger.write(src.printcolors.printcInfo("err : \n"))
396         if self.err is None:
397             logger.write("Unable to get error\n")
398         else:
399             logger.write(self.err + "\n")
400         
401     def get_status(self):
402         if not self.machine.successfully_connected(self.logger):
403             return "SSH connection KO"
404         if not self.has_begun():
405             return "Not launched"
406         if self.is_running():
407             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._T0))        
408         if self.has_finished():
409             if self.is_timeout():
410                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
411             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
412     
413 class Jobs(object):
414     '''Class to manage the jobs to be run
415     '''
416     def __init__(self, runner, logger, job_file, job_file_path, config_jobs, lenght_columns = 20):
417         # The jobs configuration
418         self.cfg_jobs = config_jobs
419         self.job_file = job_file
420         self.job_file_path = job_file_path
421         # The machine that will be used today
422         self.lmachines = []
423         # The list of machine (hosts, port) that will be used today 
424         # (a same host can have several machine instances since there 
425         # can be several ssh parameters) 
426         self.lhosts = []
427         # The jobs to be launched today 
428         self.ljobs = []     
429         self.runner = runner
430         self.logger = logger
431         # The correlation dictionary between jobs and machines
432         self.dic_job_machine = {} 
433         self.len_columns = lenght_columns
434         
435         # the list of jobs that have not been run yet
436         self._l_jobs_not_started = []
437         # the list of jobs that have already ran 
438         self._l_jobs_finished = []
439         # the list of jobs that are running 
440         self._l_jobs_running = [] 
441                 
442         self.determine_products_and_machines()
443     
444     def define_job(self, job_def, machine):
445         '''Takes a pyconf job definition and a machine (from class machine) 
446            and returns the job instance corresponding to the definition.
447         
448         :param job_def src.config.Mapping: a job definition 
449         :param machine machine: the machine on which the job will run
450         :return: The corresponding job in a job class instance
451         :rtype: job
452         '''
453         name = job_def.name
454         cmmnds = job_def.commands
455         timeout = job_def.timeout
456         after = None
457         if 'after' in job_def:
458             after = job_def.after
459         application = None
460         if 'application' in job_def:
461             application = job_def.application
462         distribution = None
463         if 'distribution' in job_def:
464             distribution = job_def.distribution
465             
466         return job(name, machine, application, distribution, cmmnds, timeout, self.logger, self.job_file , after = after)
467     
468     def determine_products_and_machines(self):
469         '''Function that reads the pyconf jobs definition and instantiates all
470            the machines and jobs to be done today.
471
472         :return: Nothing
473         :rtype: N\A
474         '''
475         today = datetime.date.weekday(datetime.date.today())
476         host_list = []
477                
478         for job_def in self.cfg_jobs.jobs :
479             if today in job_def.when:
480                 
481                 name_machine = job_def.machine
482                 
483                 a_machine = None
484                 for mach in self.lmachines:
485                     if mach.name == name_machine:
486                         a_machine = mach
487                         break
488                 
489                 if a_machine == None:
490                     for machine_def in self.cfg_jobs.machines:
491                         if machine_def.name == name_machine:
492                             if 'host' not in machine_def:
493                                 host = self.runner.cfg.VARS.hostname
494                             else:
495                                 host = machine_def.host
496
497                             if 'user' not in machine_def:
498                                 user = self.runner.cfg.VARS.user
499                             else:
500                                 user = machine_def.user
501
502                             if 'port' not in machine_def:
503                                 port = 22
504                             else:
505                                 port = machine_def.port
506                 
507                             if 'password' not in machine_def:
508                                 passwd = None
509                             else:
510                                 passwd = machine_def.password    
511                                 
512                             if 'sat_path' not in machine_def:
513                                 sat_path = "./"
514                             else:
515                                 sat_path = machine_def.sat_path
516                             
517                             a_machine = machine(
518                                                 machine_def.name,
519                                                 host,
520                                                 user,
521                                                 port=port,
522                                                 passwd=passwd,
523                                                 sat_path=sat_path
524                                                 )
525                             
526                             if (host, port) not in host_list:
527                                 host_list.append((host, port))
528                                              
529                             self.lmachines.append(a_machine)
530                 
531                 if a_machine == None:
532                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
533                             "machine \"%(machine_name)s\" but this machine "
534                             "is not defined in the configuration file.\n"
535                             "The job will not be launched")
536                     self.logger.write(src.printcolors.printcWarning(msg))
537                                   
538                 a_job = self.define_job(job_def, a_machine)
539                 self.dic_job_machine[a_job] = a_machine
540                 
541                 self.ljobs.append(a_job)
542                                      
543         self.lhosts = host_list
544         
545     def ssh_connection_all_machines(self, pad=50):
546         '''Function that do the ssh connection to every machine 
547            to be used today.
548
549         :return: Nothing
550         :rtype: N\A
551         '''
552         self.logger.write(src.printcolors.printcInfo((
553                         "Establishing connection with all the machines :\n")))
554         for machine in self.lmachines:
555             # little algorithm in order to display traces
556             begin_line = (_("Connection to %s: " % machine.name))
557             if pad - len(begin_line) < 0:
558                 endline = " "
559             else:
560                 endline = (pad - len(begin_line)) * "." + " "
561             
562             step = "SSH connection"
563             self.logger.write( begin_line + endline + step)
564             self.logger.flush()
565             # the call to the method that initiate the ssh connection
566             msg = machine.connect(self.logger)
567             
568             # Copy salomeTools to the remote machine
569             if machine.successfully_connected(self.logger):
570                 step = _("Copy SAT")
571                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "), 3)
572                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
573                 self.logger.flush()
574                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway, self.job_file_path)
575                 # Print the status of the copy
576                 if res_copy == 0:
577                     self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
578                     self.logger.write('\r%s%s%s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS)), 3)
579                 else:
580                     self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
581                     self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.OK_STATUS), _("Copy of SAT failed")), 3)
582             else:
583                 self.logger.write('\r%s' % ((len(begin_line)+len(endline)+20) * " "), 3)
584                 self.logger.write('\r%s%s%s %s' % (begin_line, endline, src.printcolors.printc(src.KO_STATUS), msg), 3)
585             self.logger.write("\n", 3)
586                 
587         self.logger.write("\n")
588         
589
590     def is_occupied(self, hostname):
591         '''Function that returns True if a job is running on 
592            the machine defined by its host and its port.
593         
594         :param hostname (str, int): the pair (host, port)
595         :return: the job that is running on the host, 
596                 or false if there is no job running on the host. 
597         :rtype: job / bool
598         '''
599         host = hostname[0]
600         port = hostname[1]
601         for jb in self.dic_job_machine:
602             if jb.machine.host == host and jb.machine.port == port:
603                 if jb.is_running():
604                     return jb
605         return False
606     
607     def update_jobs_states_list(self):
608         '''Function that updates the lists that store the currently
609            running jobs and the jobs that have already finished.
610         
611         :return: Nothing. 
612         :rtype: N\A
613         '''
614         jobs_finished_list = []
615         jobs_running_list = []
616         for jb in self.dic_job_machine:
617             if jb.is_running():
618                 jobs_running_list.append(jb)
619                 jb.check_time()
620             if jb.has_finished():
621                 jobs_finished_list.append(jb)
622         
623         nb_job_finished_before = len(self._l_jobs_finished)
624         self._l_jobs_finished = jobs_finished_list
625         self._l_jobs_running = jobs_running_list
626         
627         nb_job_finished_now = len(self._l_jobs_finished)
628         
629         return nb_job_finished_now > nb_job_finished_before
630             
631     
632     def findJobThatHasName(self, name):
633         '''Returns the job by its name.
634         
635         :param name str: a job name
636         :return: the job that has the name. 
637         :rtype: job
638         '''
639         for jb in self.ljobs:
640             if jb.name == name:
641                 return jb
642
643         # the following is executed only if the job was not found
644         msg = _('The job "%s" seems to be nonexistent') % name
645         raise src.SatException(msg)
646     
647     def str_of_length(self, text, length):
648         '''Takes a string text of any length and returns 
649            the most close string of length "length".
650         
651         :param text str: any string
652         :param length int: a length for the returned string
653         :return: the most close string of length "length"
654         :rtype: str
655         '''
656         if len(text) > length:
657             text_out = text[:length-3] + '...'
658         else:
659             diff = length - len(text)
660             before = " " * (diff/2)
661             after = " " * (diff/2 + diff%2)
662             text_out = before + text + after
663             
664         return text_out
665     
666     def display_status(self, len_col):
667         '''Takes a lenght and construct the display of the current status 
668            of the jobs in an array that has a column for each host.
669            It displays the job that is currently running on the host 
670            of the column.
671         
672         :param len_col int: the size of the column 
673         :return: Nothing
674         :rtype: N\A
675         '''
676         
677         display_line = ""
678         for host_port in self.lhosts:
679             jb = self.is_occupied(host_port)
680             if not jb: # nothing running on the host
681                 empty = self.str_of_length("empty", len_col)
682                 display_line += "|" + empty 
683             else:
684                 display_line += "|" + src.printcolors.printcInfo(
685                                         self.str_of_length(jb.name, len_col))
686         
687         self.logger.write("\r" + display_line + "|")
688         self.logger.flush()
689     
690
691     def run_jobs(self):
692         '''The main method. Runs all the jobs on every host. 
693            For each host, at a given time, only one job can be running.
694            The jobs that have the field after (that contain the job that has
695            to be run before it) are run after the previous job.
696            This method stops when all the jobs are finished.
697         
698         :return: Nothing
699         :rtype: N\A
700         '''
701
702         # Print header
703         self.logger.write(src.printcolors.printcInfo(
704                                                 _('Executing the jobs :\n')))
705         text_line = ""
706         for host_port in self.lhosts:
707             host = host_port[0]
708             port = host_port[1]
709             if port == 22: # default value
710                 text_line += "|" + self.str_of_length(host, self.len_columns)
711             else:
712                 text_line += "|" + self.str_of_length(
713                                 "("+host+", "+str(port)+")", self.len_columns)
714         
715         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
716         self.logger.write(tiret_line)
717         self.logger.write(text_line + "|\n")
718         self.logger.write(tiret_line)
719         self.logger.flush()
720         
721         # The infinite loop that runs the jobs
722         l_jobs_not_started = self.dic_job_machine.keys()
723         while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
724             new_job_start = False
725             for host_port in self.lhosts:
726                 
727                 if self.is_occupied(host_port):
728                     continue
729              
730                 for jb in l_jobs_not_started:
731                     if (jb.machine.host, jb.machine.port) != host_port:
732                         continue 
733                     if jb.after == None:
734                         jb.run(self.logger)
735                         l_jobs_not_started.remove(jb)
736                         new_job_start = True
737                         break
738                     else:
739                         jb_before = self.findJobThatHasName(jb.after) 
740                         if jb_before.has_finished():
741                             jb.run(self.logger)
742                             l_jobs_not_started.remove(jb)
743                             new_job_start = True
744                             break
745             
746             new_job_finished = self.update_jobs_states_list()
747             
748             if new_job_start or new_job_finished:
749                 self.gui.update_xml_file(self.ljobs)            
750                 # Display the current status     
751                 self.display_status(self.len_columns)
752             
753             # Make sure that the proc is not entirely busy
754             time.sleep(0.001)
755         
756         self.logger.write("\n")    
757         self.logger.write(tiret_line)                   
758         self.logger.write("\n\n")
759         
760         self.gui.update_xml_file(self.ljobs)
761         self.gui.last_update()
762
763     def write_all_results(self):
764         '''Display all the jobs outputs.
765         
766         :return: Nothing
767         :rtype: N\A
768         '''
769         
770         for jb in self.dic_job_machine.keys():
771             self.logger.write(src.printcolors.printcLabel(
772                         "#------- Results for job %s -------#\n" % jb.name))
773             jb.write_results(self.logger)
774             self.logger.write("\n\n")
775
776 class Gui(object):
777     '''Class to manage the the xml data that can be displayed in a browser to
778        see the jobs states
779     '''
780     
781     """
782     <?xml version='1.0' encoding='utf-8'?>
783     <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
784     <JobsReport>
785       <infos>
786         <info name="generated" value="2016-06-02 07:06:45"/>
787       </infos>
788       <hosts>
789           <host name=is221553 port=22 distribution=UB12.04/>
790           <host name=is221560 port=22/>
791           <host name=is221553 port=22 distribution=FD20/>
792       </hosts>
793       <applications>
794           <application name=SALOME-7.8.0/>
795           <application name=SALOME-master/>
796           <application name=MED-STANDALONE-master/>
797           <application name=CORPUS/>
798       </applications>
799       
800       <jobs>
801           <job name="7.8.0 FD22">
802                 <host>is228809</host>
803                 <port>2200</port>
804                 <application>SALOME-7.8.0</application>
805                 <user>adminuser</user>
806                 <timeout>240</timeout>
807                 <commands>
808                     export DISPLAY=is221560
809                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
810                     tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
811                 </commands>
812                 <state>Not launched</state>
813           </job>
814
815           <job name="master MG05">
816                 <host>is221560</host>
817                 <port>22</port>
818                 <application>SALOME-master</application>
819                 <user>salome</user>
820                 <timeout>240</timeout>
821                 <commands>
822                     export DISPLAY=is221560
823                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
824                     sat prepare SALOME-master
825                     sat compile SALOME-master
826                     sat check SALOME-master
827                     sat launcher SALOME-master
828                     sat test SALOME-master
829                 </commands>
830                 <state>Running since 23 min</state>
831                 <!-- <state>time out</state> -->
832                 <!-- <state>OK</state> -->
833                 <!-- <state>KO</state> -->
834                 <begin>10/05/2016 20h32</begin>
835                 <end>10/05/2016 22h59</end>
836           </job>
837
838       </jobs>
839     </JobsReport>
840     
841     """
842     
843     def __init__(self, xml_file_path, l_jobs, stylesheet):
844         # The path of the xml file
845         self.xml_file_path = xml_file_path
846         # The stylesheet
847         self.stylesheet = stylesheet
848         # Open the file in a writing stream
849         self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
850         # Create the lines and columns
851         self.initialize_array(l_jobs)
852         # Write the wml file
853         self.update_xml_file(l_jobs)
854     
855     def initialize_array(self, l_jobs):
856         l_dist = []
857         l_applications = []
858         for job in l_jobs:
859             distrib = job.distribution
860             if distrib not in l_dist:
861                 l_dist.append(distrib)
862             
863             application = job.application
864             if application not in l_applications:
865                 l_applications.append(application)
866                     
867         self.l_dist = l_dist
868         self.l_applications = l_applications
869         
870         # Update the hosts node
871         self.xmldists = self.xml_file.add_simple_node("distributions")
872         for dist_name in self.l_dist:
873             src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})
874             
875         # Update the applications node
876         self.xmlapplications = self.xml_file.add_simple_node("applications")
877         for application in self.l_applications:
878             src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})
879         
880         # Initialize the jobs node
881         self.xmljobs = self.xml_file.add_simple_node("jobs")
882         
883         # Initialize the info node (when generated)
884         self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
885         
886     def update_xml_file(self, l_jobs):      
887         
888         # Update the job names and status node
889         for job in l_jobs:
890             # Find the node corresponding to the job and delete it
891             # in order to recreate it
892             for xmljob in self.xmljobs.findall('job'):
893                 if xmljob.attrib['name'] == job.name:
894                     self.xmljobs.remove(xmljob)
895                 
896             # recreate the job node
897             xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
898             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
899             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
900             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
901             src.xmlManager.add_simple_node(xmlj, "application", job.application)
902             src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
903             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
904             src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
905             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
906             src.xmlManager.add_simple_node(xmlj, "begin", str(job._T0))
907             src.xmlManager.add_simple_node(xmlj, "end", str(job._Tf))
908             src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
909             src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
910             if len(job.remote_log_files) > 0:
911                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
912             else:
913                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")
914         # Update the date
915         src.xmlManager.append_node_attrib(self.xmlinfos,
916                     attrib={"value" : 
917                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
918                
919         # Write the file
920         self.write_xml_file()
921     
922     def last_update(self, finish_status = "finished"):
923         src.xmlManager.append_node_attrib(self.xmlinfos,
924                     attrib={"JobsCommandStatus" : finish_status})
925         # Write the file
926         self.write_xml_file()
927     
928     def write_xml_file(self):
929         self.xml_file.write_tree(self.stylesheet)
930         
931 ##
932 # Describes the command
933 def description():
934     return _("The jobs command launches maintenances that are described"
935              " in the dedicated jobs configuration file.")
936
937 ##
938 # Runs the command.
939 def run(args, runner, logger):
940        
941     (options, args) = parser.parse_args(args)
942        
943     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
944     
945     l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
946     
947     # Make sure the path to the jobs config files directory exists 
948     src.ensure_path_exists(jobs_cfg_files_dir)   
949     
950     # list option : display all the available config files
951     if options.list:
952         for cfg_dir in l_cfg_dir:
953             if not options.no_label:
954                 logger.write("------ %s\n" % 
955                                  src.printcolors.printcHeader(cfg_dir))
956     
957             for f in sorted(os.listdir(cfg_dir)):
958                 if not f.endswith('.pyconf'):
959                     continue
960                 cfilename = f[:-7]
961                 logger.write("%s\n" % cfilename)
962         return 0
963
964     # Make sure the jobs_config option has been called
965     if not options.jobs_cfg:
966         message = _("The option --jobs_config is required\n")      
967         raise src.SatException( message )
968     
969     # Find the file in the directories
970     found = False
971     for cfg_dir in l_cfg_dir:
972         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
973         if not file_jobs_cfg.endswith('.pyconf'):
974             file_jobs_cfg += '.pyconf'
975         
976         if not os.path.exists(file_jobs_cfg):
977             continue
978         else:
979             found = True
980             break
981     
982     if not found:
983         msg = _("The file configuration %(name_file)s was not found."
984                 "\nUse the --list option to get the possible files.")
985         src.printcolors.printcError(msg)
986         return 1
987     
988     info = [
989         (_("Platform"), runner.cfg.VARS.dist),
990         (_("File containing the jobs configuration"), file_jobs_cfg)
991     ]    
992     src.print_info(logger, info)
993     
994     # Read the config that is in the file
995     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
996     if options.only_jobs:
997         l_jb = src.pyconf.Sequence()
998         for jb in config_jobs.jobs:
999             if jb.name in options.only_jobs:
1000                 l_jb.append(jb,
1001                 "Adding a job that was given in only_jobs option parameters")
1002         config_jobs.jobs = l_jb
1003               
1004     # Initialization
1005     today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
1006     # SSH connection to all machines
1007     today_jobs.ssh_connection_all_machines()
1008     if options.test_connection:
1009         return 0
1010     
1011     gui = None
1012     if options.publish:
1013         gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, "job_report.xsl")
1014     
1015     today_jobs.gui = gui
1016     
1017     interruped = False
1018     try:
1019         # Run all the jobs contained in config_jobs
1020         today_jobs.run_jobs()
1021     except KeyboardInterrupt:
1022         interruped = True
1023         logger.write("\n\n%s\n\n" % 
1024                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1025         
1026     finally:
1027         # find the potential not finished jobs and kill them
1028         for jb in today_jobs.ljobs:
1029             if not jb.has_finished():
1030                 jb.kill_remote_process()
1031         if interruped:
1032             today_jobs.gui.last_update(_("Forced interruption"))
1033         else:
1034             today_jobs.gui.last_update()
1035         # Output the results
1036         today_jobs.write_all_results()