]> SALOME platform Git repositories - tools/sat.git/blob - commands/jobs.py
Salome HOME
e9b8f50e09d5fe15c7f9f2f0dde57308812a47b4
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26
27 parser = src.options.Options()
28
29 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
30                   _('The name of the config file that contains'
31                   ' the jobs configuration'))
32 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
33                   _('The list of jobs to launch, by their name. '))
34 parser.add_option('l', 'list', 'boolean', 'list', 
35                   _('list all available config files.'))
36 parser.add_option('n', 'no_label', 'boolean', 'no_label',
37                   _("do not print labels, Works only with --list."), False)
38 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
39                   _("Try to connect to the machines. Not executing the jobs."),
40                   False)
41 parser.add_option('p', 'publish', 'boolean', 'publish',
42                   _("Generate an xml file that can be read in a browser to "
43                     "display the jobs status."),
44                   False)
45
46 class machine(object):
47     '''Class to manage a ssh connection on a machine
48     '''
49     def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
50         self.name = name
51         self.host = host
52         self.port = port
53         self.user = user
54         self.password = passwd
55         self.sat_path = sat_path
56         self.ssh = paramiko.SSHClient()
57         self._connection_successful = None
58     
59     def connect(self, logger):
60         '''Initiate the ssh connection to the remote machine
61         
62         :param logger src.logger.Logger: The logger instance 
63         :return: Nothing
64         :rtype: N\A
65         '''
66
67         self._connection_successful = False
68         self.ssh.load_system_host_keys()
69         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
70         try:
71             self.ssh.connect(self.host,
72                              port=self.port,
73                              username=self.user,
74                              password = self.password)
75         except paramiko.AuthenticationException:
76             message = src.KO_STATUS + _("Authentication failed")
77         except paramiko.BadHostKeyException:
78             message = (src.KO_STATUS + 
79                        _("The server's host key could not be verified"))
80         except paramiko.SSHException:
81             message = ( _("SSHException error connecting or "
82                           "establishing an SSH session"))            
83         except:
84             message = ( _("Error connecting or establishing an SSH session"))
85         else:
86             self._connection_successful = True
87             message = ""
88         return message
89     
90     def successfully_connected(self, logger):
91         '''Verify if the connection to the remote machine has succeed
92         
93         :param logger src.logger.Logger: The logger instance 
94         :return: True if the connection has succeed, False if not
95         :rtype: bool
96         '''
97         if self._connection_successful == None:
98             message = "Warning : trying to ask if the connection to "
99             "(host: %s, port: %s, user: %s) is OK whereas there were"
100             " no connection request" % \
101             (machine.host, machine.port, machine.user)
102             logger.write( src.printcolors.printcWarning(message))
103         return self._connection_successful
104
105     def copy_sat(self, sat_local_path, job_file):
106         '''Copy salomeTools to the remote machine in self.sat_path
107         '''
108         res = 0
109         try:
110             self.sftp = self.ssh.open_sftp()
111             self.mkdir(self.sat_path, ignore_existing=True)
112             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
113             job_file_name = os.path.basename(job_file)
114             self.sftp.put(job_file, os.path.join(self.sat_path,
115                                                  "data",
116                                                  "jobs",
117                                                  job_file_name))
118         except Exception as e:
119             res = str(e)
120             self._connection_successful = False
121         
122         return res
123         
124     def put_dir(self, source, target, filters = []):
125         ''' Uploads the contents of the source directory to the target path. The
126             target directory needs to exists. All subdirectories in source are 
127             created under target.
128         '''
129         for item in os.listdir(source):
130             if item in filters:
131                 continue
132             source_path = os.path.join(source, item)
133             destination_path = os.path.join(target, item)
134             if os.path.islink(source_path):
135                 linkto = os.readlink(source_path)
136                 try:
137                     self.sftp.symlink(linkto, destination_path)
138                     self.sftp.chmod(destination_path,
139                                     os.stat(source_path).st_mode)
140                 except IOError:
141                     pass
142             else:
143                 if os.path.isfile(source_path):
144                     self.sftp.put(source_path, destination_path)
145                     self.sftp.chmod(destination_path,
146                                     os.stat(source_path).st_mode)
147                 else:
148                     self.mkdir(destination_path, ignore_existing=True)
149                     self.put_dir(source_path, destination_path)
150
151     def mkdir(self, path, mode=511, ignore_existing=False):
152         ''' Augments mkdir by adding an option to not fail 
153             if the folder exists 
154         '''
155         try:
156             self.sftp.mkdir(path, mode)
157         except IOError:
158             if ignore_existing:
159                 pass
160             else:
161                 raise       
162     
163     def exec_command(self, command, logger):
164         '''Execute the command on the remote machine
165         
166         :param command str: The command to be run
167         :param logger src.logger.Logger: The logger instance 
168         :return: the stdin, stdout, and stderr of the executing command,
169                  as a 3-tuple
170         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
171                 paramiko.channel.ChannelFile)
172         '''
173         try:        
174             # Does not wait the end of the command
175             (stdin, stdout, stderr) = self.ssh.exec_command(command)
176         except paramiko.SSHException:
177             message = src.KO_STATUS + _(
178                             ": the server failed to execute the command\n")
179             logger.write( src.printcolors.printcError(message))
180             return (None, None, None)
181         except:
182             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
183             return (None, None, None)
184         else:
185             return (stdin, stdout, stderr)
186
187     def close(self):
188         '''Close the ssh connection
189         
190         :rtype: N\A
191         '''
192         self.ssh.close()
193      
194     def write_info(self, logger):
195         '''Prints the informations relative to the machine in the logger 
196            (terminal traces and log file)
197         
198         :param logger src.logger.Logger: The logger instance
199         :return: Nothing
200         :rtype: N\A
201         '''
202         logger.write("host : " + self.host + "\n")
203         logger.write("port : " + str(self.port) + "\n")
204         logger.write("user : " + str(self.user) + "\n")
205         if self.successfully_connected(logger):
206             status = src.OK_STATUS
207         else:
208             status = src.KO_STATUS
209         logger.write("Connection : " + status + "\n\n") 
210
211
212 class job(object):
213     '''Class to manage one job
214     '''
215     def __init__(self, name, machine, application, distribution,
216                  commands, timeout, logger, job_file, after=None):
217
218         self.name = name
219         self.machine = machine
220         self.after = after
221         self.timeout = timeout
222         self.application = application
223         self.distribution = distribution
224         self.logger = logger
225         # The list of log files to download from the remote machine 
226         self.remote_log_files = []
227         # The remote command status
228         # -1 means that it has not been launched, 
229         # 0 means success and 1 means fail
230         self.res_job = "-1"
231         self.cancelled = False
232         
233         self._T0 = -1
234         self._Tf = -1
235         self._has_begun = False
236         self._has_finished = False
237         self._has_timouted = False
238         self._stdin = None # Store the command inputs field
239         self._stdout = None # Store the command outputs field
240         self._stderr = None # Store the command errors field
241
242         self.out = None # Contains something only if the job is finished
243         self.err = None # Contains something only if the job is finished    
244                
245         self.commands = commands
246         self.command = (os.path.join(self.machine.sat_path, "sat") +
247                         " -v1 job --jobs_config " +
248                         job_file +
249                         " --job " +
250                         self.name)
251     
252     def get_pids(self):
253         pids = []
254         cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
255         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
256         pids_cmd = out_pid.readlines()
257         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
258         pids+=pids_cmd
259         return pids
260     
261     def kill_remote_process(self):
262         '''Kills the process on the remote machine.
263         
264         :return: (the output of the kill, the error of the kill)
265         :rtype: (str, str)
266         '''
267         
268         pids = self.get_pids()
269         cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
270         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
271                                                             self.logger)
272         return (out_kill, err_kill)
273             
274     def has_begun(self):
275         '''Returns True if the job has already begun
276         
277         :return: True if the job has already begun
278         :rtype: bool
279         '''
280         return self._has_begun
281     
282     def has_finished(self):
283         '''Returns True if the job has already finished 
284            (i.e. all the commands have been executed)
285            If it is finished, the outputs are stored in the fields out and err.
286         
287         :return: True if the job has already finished
288         :rtype: bool
289         '''
290         
291         # If the method has already been called and returned True
292         if self._has_finished:
293             return True
294         
295         # If the job has not begun yet
296         if not self.has_begun():
297             return False
298         
299         if self._stdout.channel.closed:
300             self._has_finished = True
301             # Store the result outputs
302             self.out = self._stdout.read()
303             self.err = self._stderr.read()
304             # Put end time
305             self._Tf = time.time()
306             # And get the remote command status and log files
307             self.get_log_files()
308         
309         return self._has_finished
310     
311     def has_failed(self):
312         '''Returns True if the job has failed. 
313            A job is considered as failed if the machine could not be reached,
314            if the remote command failed, 
315            or if the job finished with a time out.
316         
317         :return: True if the job has failed
318         :rtype: bool
319         '''
320         if not self.has_finished():
321             return False
322         if not self.machine.successfully_connected(self.logger):
323             return True
324         if self.is_timeout():
325             return True
326         if self.res_job == "1":
327             return True
328         return False
329     
330     def cancel(self):
331         """In case of a failing job, one has to cancel every job that depend 
332            on it. This method put the job as failed and will not be executed.
333         """
334         self._has_begun = True
335         self._has_finished = True
336         self.cancelled = True
337         self.out = _("This job was not launched because its father has failed.")
338         self.err = _("This job was not launched because its father has failed.")
339     
340     def get_log_files(self):
341         if not self.has_finished():
342             msg = _("Trying to get log files whereas the job is not finished.")
343             self.logger.write(src.printcolors.printcWarning(msg))
344             return
345         out_lines = self.out.split("\n")
346         out_lines = [line for line in out_lines if line != '']
347         self.res_job = out_lines[0]
348         for job_path_remote in out_lines[1:]:
349             if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
350                 local_path = os.path.join(os.path.dirname(
351                                                     self.logger.logFilePath),
352                                           os.path.basename(job_path_remote))
353                 if not os.path.exists(local_path):
354                     self.machine.sftp.get(job_path_remote, local_path)
355             else:
356                 local_path = os.path.join(os.path.dirname(
357                                                     self.logger.logFilePath),
358                                           'OUT',
359                                           os.path.basename(job_path_remote))
360                 if not os.path.exists(local_path):
361                     self.machine.sftp.get(job_path_remote, local_path)
362             self.remote_log_files.append(local_path)
363     
364     def is_running(self):
365         '''Returns True if the job commands are running 
366         
367         :return: True if the job is running
368         :rtype: bool
369         '''
370         return self.has_begun() and not self.has_finished()
371
372     def is_timeout(self):
373         '''Returns True if the job commands has finished with timeout 
374         
375         :return: True if the job has finished with timeout
376         :rtype: bool
377         '''
378         return self._has_timouted
379
380     def time_elapsed(self):
381         if not self.has_begun():
382             return -1
383         T_now = time.time()
384         return T_now - self._T0
385     
386     def check_time(self):
387         if not self.has_begun():
388             return
389         if self.time_elapsed() > self.timeout:
390             self._has_finished = True
391             self._has_timouted = True
392             self._Tf = time.time()
393             self.get_pids()
394             (out_kill, _) = self.kill_remote_process()
395             self.out = "TIMEOUT \n" + out_kill.read()
396             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
397     
398     def total_duration(self):
399         return self._Tf - self._T0
400         
401     def run(self, logger):
402         if self.has_begun():
403             print("Warn the user that a job can only be launched one time")
404             return
405         
406         if not self.machine.successfully_connected(logger):
407             self._has_finished = True
408             self.out = "N\A"
409             self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information." 
410                         % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
411         else:
412             self._T0 = time.time()
413             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
414                                                         self.command, logger)
415             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
416                 self._has_finished = True
417                 self._Tf = time.time()
418                 self.out = "N\A"
419                 self.err = "The server failed to execute the command"
420         
421         self._has_begun = True
422     
423     def write_results(self, logger):
424         logger.write("name : " + self.name + "\n")
425         if self.after:
426             logger.write("after : %s\n" % self.after)
427         logger.write("Time elapsed : %4imin %2is \n" % 
428                      (self.total_duration()/60 , self.total_duration()%60))
429         if self._T0 != -1:
430             logger.write("Begin time : %s\n" % 
431                          time.strftime('%Y-%m-%d %H:%M:%S', 
432                                        time.localtime(self._T0)) )
433         if self._Tf != -1:
434             logger.write("End time   : %s\n\n" % 
435                          time.strftime('%Y-%m-%d %H:%M:%S', 
436                                        time.localtime(self._Tf)) )
437         
438         machine_head = "Informations about connection :\n"
439         underline = (len(machine_head) - 2) * "-"
440         logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
441         self.machine.write_info(logger)
442         
443         logger.write(src.printcolors.printcInfo("out : \n"))
444         if self.out is None:
445             logger.write("Unable to get output\n")
446         else:
447             logger.write(self.out + "\n")
448         logger.write(src.printcolors.printcInfo("err : \n"))
449         if self.err is None:
450             logger.write("Unable to get error\n")
451         else:
452             logger.write(self.err + "\n")
453         
454     def get_status(self):
455         if not self.machine.successfully_connected(self.logger):
456             return "SSH connection KO"
457         if not self.has_begun():
458             return "Not launched"
459         if self.cancelled:
460             return "Cancelled"
461         if self.is_running():
462             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
463                                                     time.localtime(self._T0))        
464         if self.has_finished():
465             if self.is_timeout():
466                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
467                                                     time.localtime(self._Tf))
468             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
469                                                      time.localtime(self._Tf))
470     
471 class Jobs(object):
472     '''Class to manage the jobs to be run
473     '''
474     def __init__(self,
475                  runner,
476                  logger,
477                  job_file,
478                  job_file_path,
479                  config_jobs,
480                  lenght_columns = 20):
481         # The jobs configuration
482         self.cfg_jobs = config_jobs
483         self.job_file = job_file
484         self.job_file_path = job_file_path
485         # The machine that will be used today
486         self.lmachines = []
487         # The list of machine (hosts, port) that will be used today 
488         # (a same host can have several machine instances since there 
489         # can be several ssh parameters) 
490         self.lhosts = []
491         # The jobs to be launched today 
492         self.ljobs = []     
493         self.runner = runner
494         self.logger = logger
495         # The correlation dictionary between jobs and machines
496         self.dic_job_machine = {} 
497         self.len_columns = lenght_columns
498         
499         # the list of jobs that have not been run yet
500         self._l_jobs_not_started = []
501         # the list of jobs that have already ran 
502         self._l_jobs_finished = []
503         # the list of jobs that are running 
504         self._l_jobs_running = [] 
505                 
506         self.determine_products_and_machines()
507     
508     def define_job(self, job_def, machine):
509         '''Takes a pyconf job definition and a machine (from class machine)
510            and returns the job instance corresponding to the definition.
511         
512         :param job_def src.config.Mapping: a job definition 
513         :param machine machine: the machine on which the job will run
514         :return: The corresponding job in a job class instance
515         :rtype: job
516         '''
517         name = job_def.name
518         cmmnds = job_def.commands
519         timeout = job_def.timeout
520         after = None
521         if 'after' in job_def:
522             after = job_def.after
523         application = None
524         if 'application' in job_def:
525             application = job_def.application
526         distribution = None
527         if 'distribution' in job_def:
528             distribution = job_def.distribution
529             
530         return job(name,
531                    machine,
532                    application,
533                    distribution,
534                    cmmnds,
535                    timeout,
536                    self.logger,
537                    self.job_file,
538                    after = after)
539     
540     def determine_products_and_machines(self):
541         '''Function that reads the pyconf jobs definition and instantiates all
542            the machines and jobs to be done today.
543
544         :return: Nothing
545         :rtype: N\A
546         '''
547         today = datetime.date.weekday(datetime.date.today())
548         host_list = []
549                
550         for job_def in self.cfg_jobs.jobs :
551             if today in job_def.when:
552                 
553                 name_machine = job_def.machine
554                 
555                 a_machine = None
556                 for mach in self.lmachines:
557                     if mach.name == name_machine:
558                         a_machine = mach
559                         break
560                 
561                 if a_machine == None:
562                     for machine_def in self.cfg_jobs.machines:
563                         if machine_def.name == name_machine:
564                             if 'host' not in machine_def:
565                                 host = self.runner.cfg.VARS.hostname
566                             else:
567                                 host = machine_def.host
568
569                             if 'user' not in machine_def:
570                                 user = self.runner.cfg.VARS.user
571                             else:
572                                 user = machine_def.user
573
574                             if 'port' not in machine_def:
575                                 port = 22
576                             else:
577                                 port = machine_def.port
578                 
579                             if 'password' not in machine_def:
580                                 passwd = None
581                             else:
582                                 passwd = machine_def.password    
583                                 
584                             if 'sat_path' not in machine_def:
585                                 sat_path = "salomeTools"
586                             else:
587                                 sat_path = machine_def.sat_path
588                             
589                             a_machine = machine(
590                                                 machine_def.name,
591                                                 host,
592                                                 user,
593                                                 port=port,
594                                                 passwd=passwd,
595                                                 sat_path=sat_path
596                                                 )
597                             
598                             if (host, port) not in host_list:
599                                 host_list.append((host, port))
600                                              
601                             self.lmachines.append(a_machine)
602                 
603                 if a_machine == None:
604                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
605                             "machine \"%(machine_name)s\" but this machine "
606                             "is not defined in the configuration file.\n"
607                             "The job will not be launched")
608                     self.logger.write(src.printcolors.printcWarning(msg))
609                                   
610                 a_job = self.define_job(job_def, a_machine)
611                 self.dic_job_machine[a_job] = a_machine
612                 
613                 self.ljobs.append(a_job)
614                                      
615         self.lhosts = host_list
616         
617     def ssh_connection_all_machines(self, pad=50):
618         '''Function that do the ssh connection to every machine 
619            to be used today.
620
621         :return: Nothing
622         :rtype: N\A
623         '''
624         self.logger.write(src.printcolors.printcInfo((
625                         "Establishing connection with all the machines :\n")))
626         for machine in self.lmachines:
627             # little algorithm in order to display traces
628             begin_line = (_("Connection to %s: " % machine.name))
629             if pad - len(begin_line) < 0:
630                 endline = " "
631             else:
632                 endline = (pad - len(begin_line)) * "." + " "
633             
634             step = "SSH connection"
635             self.logger.write( begin_line + endline + step)
636             self.logger.flush()
637             # the call to the method that initiate the ssh connection
638             msg = machine.connect(self.logger)
639             
640             # Copy salomeTools to the remote machine
641             if machine.successfully_connected(self.logger):
642                 step = _("Copy SAT")
643                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
644                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
645                 self.logger.flush()
646                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
647                                             self.job_file_path)
648                 # Print the status of the copy
649                 if res_copy == 0:
650                     self.logger.write('\r%s' % 
651                                 ((len(begin_line)+len(endline)+20) * " "), 3)
652                     self.logger.write('\r%s%s%s' % 
653                         (begin_line, 
654                          endline, 
655                          src.printcolors.printc(src.OK_STATUS)), 3)
656                 else:
657                     self.logger.write('\r%s' % 
658                             ((len(begin_line)+len(endline)+20) * " "), 3)
659                     self.logger.write('\r%s%s%s %s' % 
660                         (begin_line,
661                          endline,
662                          src.printcolors.printc(src.OK_STATUS),
663                          _("Copy of SAT failed")), 3)
664             else:
665                 self.logger.write('\r%s' % 
666                                   ((len(begin_line)+len(endline)+20) * " "), 3)
667                 self.logger.write('\r%s%s%s %s' % 
668                     (begin_line,
669                      endline,
670                      src.printcolors.printc(src.KO_STATUS),
671                      msg), 3)
672             self.logger.write("\n", 3)
673                 
674         self.logger.write("\n")
675         
676
677     def is_occupied(self, hostname):
678         '''Function that returns True if a job is running on 
679            the machine defined by its host and its port.
680         
681         :param hostname (str, int): the pair (host, port)
682         :return: the job that is running on the host, 
683                 or false if there is no job running on the host. 
684         :rtype: job / bool
685         '''
686         host = hostname[0]
687         port = hostname[1]
688         for jb in self.dic_job_machine:
689             if jb.machine.host == host and jb.machine.port == port:
690                 if jb.is_running():
691                     return jb
692         return False
693     
694     def update_jobs_states_list(self):
695         '''Function that updates the lists that store the currently
696            running jobs and the jobs that have already finished.
697         
698         :return: Nothing. 
699         :rtype: N\A
700         '''
701         jobs_finished_list = []
702         jobs_running_list = []
703         for jb in self.dic_job_machine:
704             if jb.is_running():
705                 jobs_running_list.append(jb)
706                 jb.check_time()
707             if jb.has_finished():
708                 jobs_finished_list.append(jb)
709         
710         nb_job_finished_before = len(self._l_jobs_finished)
711         self._l_jobs_finished = jobs_finished_list
712         self._l_jobs_running = jobs_running_list
713         
714         nb_job_finished_now = len(self._l_jobs_finished)
715         
716         return nb_job_finished_now > nb_job_finished_before
717     
718     def cancel_dependencies_of_failing_jobs(self):
719         '''Function that cancels all the jobs that depend on a failing one.
720         
721         :return: Nothing. 
722         :rtype: N\A
723         '''
724         
725         for job in self.ljobs:
726             if job.after is None:
727                 continue
728             father_job = self.find_job_that_has_name(job.after)
729             if father_job.has_failed():
730                 job.cancel()
731     
732     def find_job_that_has_name(self, name):
733         '''Returns the job by its name.
734         
735         :param name str: a job name
736         :return: the job that has the name. 
737         :rtype: job
738         '''
739         for jb in self.ljobs:
740             if jb.name == name:
741                 return jb
742
743         # the following is executed only if the job was not found
744         msg = _('The job "%s" seems to be nonexistent') % name
745         raise src.SatException(msg)
746     
747     def str_of_length(self, text, length):
748         '''Takes a string text of any length and returns 
749            the most close string of length "length".
750         
751         :param text str: any string
752         :param length int: a length for the returned string
753         :return: the most close string of length "length"
754         :rtype: str
755         '''
756         if len(text) > length:
757             text_out = text[:length-3] + '...'
758         else:
759             diff = length - len(text)
760             before = " " * (diff/2)
761             after = " " * (diff/2 + diff%2)
762             text_out = before + text + after
763             
764         return text_out
765     
766     def display_status(self, len_col):
767         '''Takes a lenght and construct the display of the current status 
768            of the jobs in an array that has a column for each host.
769            It displays the job that is currently running on the host 
770            of the column.
771         
772         :param len_col int: the size of the column 
773         :return: Nothing
774         :rtype: N\A
775         '''
776         
777         display_line = ""
778         for host_port in self.lhosts:
779             jb = self.is_occupied(host_port)
780             if not jb: # nothing running on the host
781                 empty = self.str_of_length("empty", len_col)
782                 display_line += "|" + empty 
783             else:
784                 display_line += "|" + src.printcolors.printcInfo(
785                                         self.str_of_length(jb.name, len_col))
786         
787         self.logger.write("\r" + display_line + "|")
788         self.logger.flush()
789     
790
791     def run_jobs(self):
792         '''The main method. Runs all the jobs on every host. 
793            For each host, at a given time, only one job can be running.
794            The jobs that have the field after (that contain the job that has
795            to be run before it) are run after the previous job.
796            This method stops when all the jobs are finished.
797         
798         :return: Nothing
799         :rtype: N\A
800         '''
801
802         # Print header
803         self.logger.write(src.printcolors.printcInfo(
804                                                 _('Executing the jobs :\n')))
805         text_line = ""
806         for host_port in self.lhosts:
807             host = host_port[0]
808             port = host_port[1]
809             if port == 22: # default value
810                 text_line += "|" + self.str_of_length(host, self.len_columns)
811             else:
812                 text_line += "|" + self.str_of_length(
813                                 "("+host+", "+str(port)+")", self.len_columns)
814         
815         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
816         self.logger.write(tiret_line)
817         self.logger.write(text_line + "|\n")
818         self.logger.write(tiret_line)
819         self.logger.flush()
820         
821         # The infinite loop that runs the jobs
822         l_jobs_not_started = self.dic_job_machine.keys()
823         while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
824             new_job_start = False
825             for host_port in self.lhosts:
826                 
827                 if self.is_occupied(host_port):
828                     continue
829              
830                 for jb in l_jobs_not_started:
831                     if (jb.machine.host, jb.machine.port) != host_port:
832                         continue 
833                     if jb.after == None:
834                         jb.run(self.logger)
835                         l_jobs_not_started.remove(jb)
836                         new_job_start = True
837                         break
838                     else:
839                         jb_before = self.find_job_that_has_name(jb.after) 
840                         if jb_before.has_finished():
841                             jb.run(self.logger)
842                             l_jobs_not_started.remove(jb)
843                             new_job_start = True
844                             break
845             self.cancel_dependencies_of_failing_jobs()
846             new_job_finished = self.update_jobs_states_list()
847             
848             if new_job_start or new_job_finished:
849                 self.gui.update_xml_file(self.ljobs)            
850                 # Display the current status     
851                 self.display_status(self.len_columns)
852             
853             # Make sure that the proc is not entirely busy
854             time.sleep(0.001)
855         
856         self.logger.write("\n")    
857         self.logger.write(tiret_line)                   
858         self.logger.write("\n\n")
859         
860         self.gui.update_xml_file(self.ljobs)
861         self.gui.last_update()
862
863     def write_all_results(self):
864         '''Display all the jobs outputs.
865         
866         :return: Nothing
867         :rtype: N\A
868         '''
869         
870         for jb in self.dic_job_machine.keys():
871             self.logger.write(src.printcolors.printcLabel(
872                         "#------- Results for job %s -------#\n" % jb.name))
873             jb.write_results(self.logger)
874             self.logger.write("\n\n")
875
876 class Gui(object):
877     '''Class to manage the the xml data that can be displayed in a browser to
878        see the jobs states
879     '''
880     
881     """
882     <?xml version='1.0' encoding='utf-8'?>
883     <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
884     <JobsReport>
885       <infos>
886         <info name="generated" value="2016-06-02 07:06:45"/>
887       </infos>
888       <hosts>
889           <host name=is221553 port=22 distribution=UB12.04/>
890           <host name=is221560 port=22/>
891           <host name=is221553 port=22 distribution=FD20/>
892       </hosts>
893       <applications>
894           <application name=SALOME-7.8.0/>
895           <application name=SALOME-master/>
896           <application name=MED-STANDALONE-master/>
897           <application name=CORPUS/>
898       </applications>
899       
900       <jobs>
901           <job name="7.8.0 FD22">
902                 <host>is228809</host>
903                 <port>2200</port>
904                 <application>SALOME-7.8.0</application>
905                 <user>adminuser</user>
906                 <timeout>240</timeout>
907                 <commands>
908                     export DISPLAY=is221560
909                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
910                     tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
911                 </commands>
912                 <state>Not launched</state>
913           </job>
914
915           <job name="master MG05">
916                 <host>is221560</host>
917                 <port>22</port>
918                 <application>SALOME-master</application>
919                 <user>salome</user>
920                 <timeout>240</timeout>
921                 <commands>
922                     export DISPLAY=is221560
923                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
924                     sat prepare SALOME-master
925                     sat compile SALOME-master
926                     sat check SALOME-master
927                     sat launcher SALOME-master
928                     sat test SALOME-master
929                 </commands>
930                 <state>Running since 23 min</state>
931                 <!-- <state>time out</state> -->
932                 <!-- <state>OK</state> -->
933                 <!-- <state>KO</state> -->
934                 <begin>10/05/2016 20h32</begin>
935                 <end>10/05/2016 22h59</end>
936           </job>
937
938       </jobs>
939     </JobsReport>
940     
941     """
942     
943     def __init__(self, xml_file_path, l_jobs, stylesheet):
944         # The path of the xml file
945         self.xml_file_path = xml_file_path
946         # The stylesheet
947         self.stylesheet = stylesheet
948         # Open the file in a writing stream
949         self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
950         # Create the lines and columns
951         self.initialize_array(l_jobs)
952         # Write the wml file
953         self.update_xml_file(l_jobs)
954     
955     def initialize_array(self, l_jobs):
956         l_dist = []
957         l_applications = []
958         for job in l_jobs:
959             distrib = job.distribution
960             if distrib not in l_dist:
961                 l_dist.append(distrib)
962             
963             application = job.application
964             if application not in l_applications:
965                 l_applications.append(application)
966                     
967         self.l_dist = l_dist
968         self.l_applications = l_applications
969         
970         # Update the hosts node
971         self.xmldists = self.xml_file.add_simple_node("distributions")
972         for dist_name in self.l_dist:
973             src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})
974             
975         # Update the applications node
976         self.xmlapplications = self.xml_file.add_simple_node("applications")
977         for application in self.l_applications:
978             src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})
979         
980         # Initialize the jobs node
981         self.xmljobs = self.xml_file.add_simple_node("jobs")
982         
983         # Initialize the info node (when generated)
984         self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
985         
986     def update_xml_file(self, l_jobs):      
987         
988         # Update the job names and status node
989         for job in l_jobs:
990             # Find the node corresponding to the job and delete it
991             # in order to recreate it
992             for xmljob in self.xmljobs.findall('job'):
993                 if xmljob.attrib['name'] == job.name:
994                     self.xmljobs.remove(xmljob)
995             
996             T0 = str(job._T0)
997             if T0 != "-1":
998                 T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
999                                        time.localtime(job._T0))
1000             Tf = str(job._Tf)
1001             if Tf != "-1":
1002                 Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
1003                                        time.localtime(job._Tf))
1004             
1005             # recreate the job node
1006             xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
1007             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1008             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1009             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1010             src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
1011             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1012             src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
1013             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1014             src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
1015             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1016             src.xmlManager.add_simple_node(xmlj, "begin", T0)
1017             src.xmlManager.add_simple_node(xmlj, "end", Tf)
1018             src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
1019             src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
1020             src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1021             if len(job.remote_log_files) > 0:
1022                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
1023             else:
1024                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")           
1025             
1026             xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1027             # get the job father
1028             if job.after is not None:
1029                 job_father = None
1030                 for jb in l_jobs:
1031                     if jb.name == job.after:
1032                         job_father = jb
1033                 if job_father is None:
1034                     msg = _("The job %(father_name)s that is parent of "
1035                             "%(son_name)s is not in the job list." %
1036                              {"father_name" : job.after , "son_name" : job.name})
1037                     raise src.SatException(msg)
1038                 
1039                 if len(job_father.remote_log_files) > 0:
1040                     link = job_father.remote_log_files[0]
1041                 else:
1042                     link = "nothing"
1043                 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1044             
1045         
1046         # Update the date
1047         src.xmlManager.append_node_attrib(self.xmlinfos,
1048                     attrib={"value" : 
1049                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1050                
1051         # Write the file
1052         self.write_xml_file()
1053     
1054     def last_update(self, finish_status = "finished"):
1055         src.xmlManager.append_node_attrib(self.xmlinfos,
1056                     attrib={"JobsCommandStatus" : finish_status})
1057         # Write the file
1058         self.write_xml_file()
1059     
1060     def write_xml_file(self):
1061         self.xml_file.write_tree(self.stylesheet)
1062         
1063 ##
1064 # Describes the command
1065 def description():
1066     return _("The jobs command launches maintenances that are described"
1067              " in the dedicated jobs configuration file.")
1068
1069 ##
1070 # Runs the command.
1071 def run(args, runner, logger):
1072        
1073     (options, args) = parser.parse_args(args)
1074        
1075     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1076     
1077     l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
1078     
1079     # Make sure the path to the jobs config files directory exists 
1080     src.ensure_path_exists(jobs_cfg_files_dir)   
1081     
1082     # list option : display all the available config files
1083     if options.list:
1084         for cfg_dir in l_cfg_dir:
1085             if not options.no_label:
1086                 logger.write("------ %s\n" % 
1087                                  src.printcolors.printcHeader(cfg_dir))
1088     
1089             for f in sorted(os.listdir(cfg_dir)):
1090                 if not f.endswith('.pyconf'):
1091                     continue
1092                 cfilename = f[:-7]
1093                 logger.write("%s\n" % cfilename)
1094         return 0
1095
1096     # Make sure the jobs_config option has been called
1097     if not options.jobs_cfg:
1098         message = _("The option --jobs_config is required\n")      
1099         raise src.SatException( message )
1100     
1101     # Find the file in the directories
1102     found = False
1103     for cfg_dir in l_cfg_dir:
1104         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1105         if not file_jobs_cfg.endswith('.pyconf'):
1106             file_jobs_cfg += '.pyconf'
1107         
1108         if not os.path.exists(file_jobs_cfg):
1109             continue
1110         else:
1111             found = True
1112             break
1113     
1114     if not found:
1115         msg = _("The file configuration %(name_file)s was not found."
1116                 "\nUse the --list option to get the possible files.")
1117         src.printcolors.printcError(msg)
1118         return 1
1119     
1120     info = [
1121         (_("Platform"), runner.cfg.VARS.dist),
1122         (_("File containing the jobs configuration"), file_jobs_cfg)
1123     ]    
1124     src.print_info(logger, info)
1125     
1126     # Read the config that is in the file
1127     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1128     if options.only_jobs:
1129         l_jb = src.pyconf.Sequence()
1130         for jb in config_jobs.jobs:
1131             if jb.name in options.only_jobs:
1132                 l_jb.append(jb,
1133                 "Adding a job that was given in only_jobs option parameters")
1134         config_jobs.jobs = l_jb
1135               
1136     # Initialization
1137     today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
1138     # SSH connection to all machines
1139     today_jobs.ssh_connection_all_machines()
1140     if options.test_connection:
1141         return 0
1142     
1143     gui = None
1144     if options.publish:
1145         gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, "job_report.xsl")
1146     
1147     today_jobs.gui = gui
1148     
1149     interruped = False
1150     try:
1151         # Run all the jobs contained in config_jobs
1152         today_jobs.run_jobs()
1153     except KeyboardInterrupt:
1154         interruped = True
1155         logger.write("\n\n%s\n\n" % 
1156                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1157         
1158     finally:
1159         # find the potential not finished jobs and kill them
1160         for jb in today_jobs.ljobs:
1161             if not jb.has_finished():
1162                 jb.kill_remote_process()
1163         if interruped:
1164             today_jobs.gui.last_update(_("Forced interruption"))
1165         else:
1166             today_jobs.gui.last_update()
1167         # Output the results
1168         today_jobs.write_all_results()