Salome HOME
sat jobs: add the jobs that will not be ran today in the xml matrix
[tools/sat.git] / commands / jobs.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os
20 import datetime
21 import time
22 import paramiko
23
24 import src
25
26
27 parser = src.options.Options()
28
29 parser.add_option('j', 'jobs_config', 'string', 'jobs_cfg', 
30                   _('The name of the config file that contains'
31                   ' the jobs configuration'))
32 parser.add_option('o', 'only_jobs', 'list2', 'only_jobs',
33                   _('The list of jobs to launch, by their name. '))
34 parser.add_option('l', 'list', 'boolean', 'list', 
35                   _('list all available config files.'))
36 parser.add_option('n', 'no_label', 'boolean', 'no_label',
37                   _("do not print labels, Works only with --list."), False)
38 parser.add_option('t', 'test_connection', 'boolean', 'test_connection',
39                   _("Try to connect to the machines. Not executing the jobs."),
40                   False)
41 parser.add_option('p', 'publish', 'boolean', 'publish',
42                   _("Generate an xml file that can be read in a browser to "
43                     "display the jobs status."),
44                   False)
45
46 class machine(object):
47     '''Class to manage a ssh connection on a machine
48     '''
49     def __init__(self, name, host, user, port=22, passwd=None, sat_path="salomeTools"):
50         self.name = name
51         self.host = host
52         self.port = port
53         self.user = user
54         self.password = passwd
55         self.sat_path = sat_path
56         self.ssh = paramiko.SSHClient()
57         self._connection_successful = None
58     
59     def connect(self, logger):
60         '''Initiate the ssh connection to the remote machine
61         
62         :param logger src.logger.Logger: The logger instance 
63         :return: Nothing
64         :rtype: N\A
65         '''
66
67         self._connection_successful = False
68         self.ssh.load_system_host_keys()
69         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
70         try:
71             self.ssh.connect(self.host,
72                              port=self.port,
73                              username=self.user,
74                              password = self.password)
75         except paramiko.AuthenticationException:
76             message = src.KO_STATUS + _("Authentication failed")
77         except paramiko.BadHostKeyException:
78             message = (src.KO_STATUS + 
79                        _("The server's host key could not be verified"))
80         except paramiko.SSHException:
81             message = ( _("SSHException error connecting or "
82                           "establishing an SSH session"))            
83         except:
84             message = ( _("Error connecting or establishing an SSH session"))
85         else:
86             self._connection_successful = True
87             message = ""
88         return message
89     
90     def successfully_connected(self, logger):
91         '''Verify if the connection to the remote machine has succeed
92         
93         :param logger src.logger.Logger: The logger instance 
94         :return: True if the connection has succeed, False if not
95         :rtype: bool
96         '''
97         if self._connection_successful == None:
98             message = "Warning : trying to ask if the connection to "
99             "(host: %s, port: %s, user: %s) is OK whereas there were"
100             " no connection request" % \
101             (machine.host, machine.port, machine.user)
102             logger.write( src.printcolors.printcWarning(message))
103         return self._connection_successful
104
105     def copy_sat(self, sat_local_path, job_file):
106         '''Copy salomeTools to the remote machine in self.sat_path
107         '''
108         res = 0
109         try:
110             self.sftp = self.ssh.open_sftp()
111             self.mkdir(self.sat_path, ignore_existing=True)
112             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
113             job_file_name = os.path.basename(job_file)
114             self.sftp.put(job_file, os.path.join(self.sat_path,
115                                                  "data",
116                                                  "jobs",
117                                                  job_file_name))
118         except Exception as e:
119             res = str(e)
120             self._connection_successful = False
121         
122         return res
123         
124     def put_dir(self, source, target, filters = []):
125         ''' Uploads the contents of the source directory to the target path. The
126             target directory needs to exists. All subdirectories in source are 
127             created under target.
128         '''
129         for item in os.listdir(source):
130             if item in filters:
131                 continue
132             source_path = os.path.join(source, item)
133             destination_path = os.path.join(target, item)
134             if os.path.islink(source_path):
135                 linkto = os.readlink(source_path)
136                 try:
137                     self.sftp.symlink(linkto, destination_path)
138                     self.sftp.chmod(destination_path,
139                                     os.stat(source_path).st_mode)
140                 except IOError:
141                     pass
142             else:
143                 if os.path.isfile(source_path):
144                     self.sftp.put(source_path, destination_path)
145                     self.sftp.chmod(destination_path,
146                                     os.stat(source_path).st_mode)
147                 else:
148                     self.mkdir(destination_path, ignore_existing=True)
149                     self.put_dir(source_path, destination_path)
150
151     def mkdir(self, path, mode=511, ignore_existing=False):
152         ''' Augments mkdir by adding an option to not fail 
153             if the folder exists 
154         '''
155         try:
156             self.sftp.mkdir(path, mode)
157         except IOError:
158             if ignore_existing:
159                 pass
160             else:
161                 raise       
162     
163     def exec_command(self, command, logger):
164         '''Execute the command on the remote machine
165         
166         :param command str: The command to be run
167         :param logger src.logger.Logger: The logger instance 
168         :return: the stdin, stdout, and stderr of the executing command,
169                  as a 3-tuple
170         :rtype: (paramiko.channel.ChannelFile, paramiko.channel.ChannelFile,
171                 paramiko.channel.ChannelFile)
172         '''
173         try:        
174             # Does not wait the end of the command
175             (stdin, stdout, stderr) = self.ssh.exec_command(command)
176         except paramiko.SSHException:
177             message = src.KO_STATUS + _(
178                             ": the server failed to execute the command\n")
179             logger.write( src.printcolors.printcError(message))
180             return (None, None, None)
181         except:
182             logger.write( src.printcolors.printcError(src.KO_STATUS + '\n'))
183             return (None, None, None)
184         else:
185             return (stdin, stdout, stderr)
186
187     def close(self):
188         '''Close the ssh connection
189         
190         :rtype: N\A
191         '''
192         self.ssh.close()
193      
194     def write_info(self, logger):
195         '''Prints the informations relative to the machine in the logger 
196            (terminal traces and log file)
197         
198         :param logger src.logger.Logger: The logger instance
199         :return: Nothing
200         :rtype: N\A
201         '''
202         logger.write("host : " + self.host + "\n")
203         logger.write("port : " + str(self.port) + "\n")
204         logger.write("user : " + str(self.user) + "\n")
205         if self.successfully_connected(logger):
206             status = src.OK_STATUS
207         else:
208             status = src.KO_STATUS
209         logger.write("Connection : " + status + "\n\n") 
210
211
212 class job(object):
213     '''Class to manage one job
214     '''
215     def __init__(self, name, machine, application, distribution,
216                  commands, timeout, logger, job_file, after=None):
217
218         self.name = name
219         self.machine = machine
220         self.after = after
221         self.timeout = timeout
222         self.application = application
223         self.distribution = distribution
224         self.logger = logger
225         # The list of log files to download from the remote machine 
226         self.remote_log_files = []
227         # The remote command status
228         # -1 means that it has not been launched, 
229         # 0 means success and 1 means fail
230         self.res_job = "-1"
231         self.cancelled = False
232         
233         self._T0 = -1
234         self._Tf = -1
235         self._has_begun = False
236         self._has_finished = False
237         self._has_timouted = False
238         self._stdin = None # Store the command inputs field
239         self._stdout = None # Store the command outputs field
240         self._stderr = None # Store the command errors field
241
242         self.out = None # Contains something only if the job is finished
243         self.err = None # Contains something only if the job is finished    
244                
245         self.commands = commands
246         self.command = (os.path.join(self.machine.sat_path, "sat") +
247                         " -v1 job --jobs_config " +
248                         job_file +
249                         " --job " +
250                         self.name)
251     
252     def get_pids(self):
253         pids = []
254         cmd_pid = 'ps aux | grep "sat -v1 job --jobs_config" | awk \'{print $2}\''
255         (_, out_pid, _) = self.machine.exec_command(cmd_pid, self.logger)
256         pids_cmd = out_pid.readlines()
257         pids_cmd = [str(src.only_numbers(pid)) for pid in pids_cmd]
258         pids+=pids_cmd
259         return pids
260     
261     def kill_remote_process(self):
262         '''Kills the process on the remote machine.
263         
264         :return: (the output of the kill, the error of the kill)
265         :rtype: (str, str)
266         '''
267         
268         pids = self.get_pids()
269         cmd_kill = " ; ".join([("kill -9 " + pid) for pid in pids])
270         (_, out_kill, err_kill) = self.machine.exec_command(cmd_kill, 
271                                                             self.logger)
272         return (out_kill, err_kill)
273             
274     def has_begun(self):
275         '''Returns True if the job has already begun
276         
277         :return: True if the job has already begun
278         :rtype: bool
279         '''
280         return self._has_begun
281     
282     def has_finished(self):
283         '''Returns True if the job has already finished 
284            (i.e. all the commands have been executed)
285            If it is finished, the outputs are stored in the fields out and err.
286         
287         :return: True if the job has already finished
288         :rtype: bool
289         '''
290         
291         # If the method has already been called and returned True
292         if self._has_finished:
293             return True
294         
295         # If the job has not begun yet
296         if not self.has_begun():
297             return False
298         
299         if self._stdout.channel.closed:
300             self._has_finished = True
301             # Store the result outputs
302             self.out = self._stdout.read()
303             self.err = self._stderr.read()
304             # Put end time
305             self._Tf = time.time()
306             # And get the remote command status and log files
307             self.get_log_files()
308         
309         return self._has_finished
310     
311     def has_failed(self):
312         '''Returns True if the job has failed. 
313            A job is considered as failed if the machine could not be reached,
314            if the remote command failed, 
315            or if the job finished with a time out.
316         
317         :return: True if the job has failed
318         :rtype: bool
319         '''
320         if not self.has_finished():
321             return False
322         if not self.machine.successfully_connected(self.logger):
323             return True
324         if self.is_timeout():
325             return True
326         if self.res_job == "1":
327             return True
328         return False
329     
330     def cancel(self):
331         """In case of a failing job, one has to cancel every job that depend 
332            on it. This method put the job as failed and will not be executed.
333         """
334         self._has_begun = True
335         self._has_finished = True
336         self.cancelled = True
337         self.out = _("This job was not launched because its father has failed.")
338         self.err = _("This job was not launched because its father has failed.")
339     
340     def get_log_files(self):
341         if not self.has_finished():
342             msg = _("Trying to get log files whereas the job is not finished.")
343             self.logger.write(src.printcolors.printcWarning(msg))
344             return
345         out_lines = self.out.split("\n")
346         out_lines = [line for line in out_lines if line != '']
347         self.res_job = out_lines[0]
348         for job_path_remote in out_lines[1:]:
349             if os.path.basename(os.path.dirname(job_path_remote)) != 'OUT':
350                 local_path = os.path.join(os.path.dirname(
351                                                     self.logger.logFilePath),
352                                           os.path.basename(job_path_remote))
353                 if not os.path.exists(local_path):
354                     self.machine.sftp.get(job_path_remote, local_path)
355             else:
356                 local_path = os.path.join(os.path.dirname(
357                                                     self.logger.logFilePath),
358                                           'OUT',
359                                           os.path.basename(job_path_remote))
360                 if not os.path.exists(local_path):
361                     self.machine.sftp.get(job_path_remote, local_path)
362             self.remote_log_files.append(local_path)
363     
364     def is_running(self):
365         '''Returns True if the job commands are running 
366         
367         :return: True if the job is running
368         :rtype: bool
369         '''
370         return self.has_begun() and not self.has_finished()
371
372     def is_timeout(self):
373         '''Returns True if the job commands has finished with timeout 
374         
375         :return: True if the job has finished with timeout
376         :rtype: bool
377         '''
378         return self._has_timouted
379
380     def time_elapsed(self):
381         if not self.has_begun():
382             return -1
383         T_now = time.time()
384         return T_now - self._T0
385     
386     def check_time(self):
387         if not self.has_begun():
388             return
389         if self.time_elapsed() > self.timeout:
390             self._has_finished = True
391             self._has_timouted = True
392             self._Tf = time.time()
393             self.get_pids()
394             (out_kill, _) = self.kill_remote_process()
395             self.out = "TIMEOUT \n" + out_kill.read()
396             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
397     
398     def total_duration(self):
399         return self._Tf - self._T0
400         
401     def run(self, logger):
402         if self.has_begun():
403             print("Warn the user that a job can only be launched one time")
404             return
405         
406         if not self.machine.successfully_connected(logger):
407             self._has_finished = True
408             self.out = "N\A"
409             self.err = ("Connection to machine (name : %s, host: %s, port: %s, user: %s) has failed\nUse the log command to get more information." 
410                         % (self.machine.name, self.machine.host, self.machine.port, self.machine.user))
411         else:
412             self._T0 = time.time()
413             self._stdin, self._stdout, self._stderr = self.machine.exec_command(
414                                                         self.command, logger)
415             if (self._stdin, self._stdout, self._stderr) == (None, None, None):
416                 self._has_finished = True
417                 self._Tf = time.time()
418                 self.out = "N\A"
419                 self.err = "The server failed to execute the command"
420         
421         self._has_begun = True
422     
423     def write_results(self, logger):
424         logger.write("name : " + self.name + "\n")
425         if self.after:
426             logger.write("after : %s\n" % self.after)
427         logger.write("Time elapsed : %4imin %2is \n" % 
428                      (self.total_duration()/60 , self.total_duration()%60))
429         if self._T0 != -1:
430             logger.write("Begin time : %s\n" % 
431                          time.strftime('%Y-%m-%d %H:%M:%S', 
432                                        time.localtime(self._T0)) )
433         if self._Tf != -1:
434             logger.write("End time   : %s\n\n" % 
435                          time.strftime('%Y-%m-%d %H:%M:%S', 
436                                        time.localtime(self._Tf)) )
437         
438         machine_head = "Informations about connection :\n"
439         underline = (len(machine_head) - 2) * "-"
440         logger.write(src.printcolors.printcInfo(machine_head+underline+"\n"))
441         self.machine.write_info(logger)
442         
443         logger.write(src.printcolors.printcInfo("out : \n"))
444         if self.out is None:
445             logger.write("Unable to get output\n")
446         else:
447             logger.write(self.out + "\n")
448         logger.write(src.printcolors.printcInfo("err : \n"))
449         if self.err is None:
450             logger.write("Unable to get error\n")
451         else:
452             logger.write(self.err + "\n")
453         
454     def get_status(self):
455         if not self.machine.successfully_connected(self.logger):
456             return "SSH connection KO"
457         if not self.has_begun():
458             return "Not launched"
459         if self.cancelled:
460             return "Cancelled"
461         if self.is_running():
462             return "running since " + time.strftime('%Y-%m-%d %H:%M:%S',
463                                                     time.localtime(self._T0))        
464         if self.has_finished():
465             if self.is_timeout():
466                 return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S',
467                                                     time.localtime(self._Tf))
468             return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S',
469                                                      time.localtime(self._Tf))
470     
471 class Jobs(object):
472     '''Class to manage the jobs to be run
473     '''
474     def __init__(self,
475                  runner,
476                  logger,
477                  job_file,
478                  job_file_path,
479                  config_jobs,
480                  lenght_columns = 20):
481         # The jobs configuration
482         self.cfg_jobs = config_jobs
483         self.job_file = job_file
484         self.job_file_path = job_file_path
485         # The machine that will be used today
486         self.lmachines = []
487         # The list of machine (hosts, port) that will be used today 
488         # (a same host can have several machine instances since there 
489         # can be several ssh parameters) 
490         self.lhosts = []
491         # The jobs to be launched today 
492         self.ljobs = []
493         # The jobs that will not be launched today
494         self.ljobsdef_not_today = []
495         self.runner = runner
496         self.logger = logger
497         # The correlation dictionary between jobs and machines
498         self.dic_job_machine = {} 
499         self.len_columns = lenght_columns
500         
501         # the list of jobs that have not been run yet
502         self._l_jobs_not_started = []
503         # the list of jobs that have already ran 
504         self._l_jobs_finished = []
505         # the list of jobs that are running 
506         self._l_jobs_running = [] 
507                 
508         self.determine_products_and_machines()
509     
510     def define_job(self, job_def, machine):
511         '''Takes a pyconf job definition and a machine (from class machine)
512            and returns the job instance corresponding to the definition.
513         
514         :param job_def src.config.Mapping: a job definition 
515         :param machine machine: the machine on which the job will run
516         :return: The corresponding job in a job class instance
517         :rtype: job
518         '''
519         name = job_def.name
520         cmmnds = job_def.commands
521         timeout = job_def.timeout
522         after = None
523         if 'after' in job_def:
524             after = job_def.after
525         application = None
526         if 'application' in job_def:
527             application = job_def.application
528         distribution = None
529         if 'distribution' in job_def:
530             distribution = job_def.distribution
531             
532         return job(name,
533                    machine,
534                    application,
535                    distribution,
536                    cmmnds,
537                    timeout,
538                    self.logger,
539                    self.job_file,
540                    after = after)
541     
542     def determine_products_and_machines(self):
543         '''Function that reads the pyconf jobs definition and instantiates all
544            the machines and jobs to be done today.
545
546         :return: Nothing
547         :rtype: N\A
548         '''
549         today = datetime.date.weekday(datetime.date.today())
550         host_list = []
551                
552         for job_def in self.cfg_jobs.jobs :
553             if today in job_def.when:
554                 
555                 name_machine = job_def.machine
556                 
557                 a_machine = None
558                 for mach in self.lmachines:
559                     if mach.name == name_machine:
560                         a_machine = mach
561                         break
562                 
563                 if a_machine == None:
564                     for machine_def in self.cfg_jobs.machines:
565                         if machine_def.name == name_machine:
566                             if 'host' not in machine_def:
567                                 host = self.runner.cfg.VARS.hostname
568                             else:
569                                 host = machine_def.host
570
571                             if 'user' not in machine_def:
572                                 user = self.runner.cfg.VARS.user
573                             else:
574                                 user = machine_def.user
575
576                             if 'port' not in machine_def:
577                                 port = 22
578                             else:
579                                 port = machine_def.port
580                 
581                             if 'password' not in machine_def:
582                                 passwd = None
583                             else:
584                                 passwd = machine_def.password    
585                                 
586                             if 'sat_path' not in machine_def:
587                                 sat_path = "salomeTools"
588                             else:
589                                 sat_path = machine_def.sat_path
590                             
591                             a_machine = machine(
592                                                 machine_def.name,
593                                                 host,
594                                                 user,
595                                                 port=port,
596                                                 passwd=passwd,
597                                                 sat_path=sat_path
598                                                 )
599                             
600                             if (host, port) not in host_list:
601                                 host_list.append((host, port))
602                                              
603                             self.lmachines.append(a_machine)
604                 
605                 if a_machine == None:
606                     msg = _("WARNING: The job \"%(job_name)s\" requires the "
607                             "machine \"%(machine_name)s\" but this machine "
608                             "is not defined in the configuration file.\n"
609                             "The job will not be launched")
610                     self.logger.write(src.printcolors.printcWarning(msg))
611                                   
612                 a_job = self.define_job(job_def, a_machine)
613                 self.dic_job_machine[a_job] = a_machine
614                 
615                 self.ljobs.append(a_job)
616             else: # today in job_def.when
617                 self.ljobsdef_not_today.append(job_def)
618                                      
619         self.lhosts = host_list
620         
621     def ssh_connection_all_machines(self, pad=50):
622         '''Function that do the ssh connection to every machine 
623            to be used today.
624
625         :return: Nothing
626         :rtype: N\A
627         '''
628         self.logger.write(src.printcolors.printcInfo((
629                         "Establishing connection with all the machines :\n")))
630         for machine in self.lmachines:
631             # little algorithm in order to display traces
632             begin_line = (_("Connection to %s: " % machine.name))
633             if pad - len(begin_line) < 0:
634                 endline = " "
635             else:
636                 endline = (pad - len(begin_line)) * "." + " "
637             
638             step = "SSH connection"
639             self.logger.write( begin_line + endline + step)
640             self.logger.flush()
641             # the call to the method that initiate the ssh connection
642             msg = machine.connect(self.logger)
643             
644             # Copy salomeTools to the remote machine
645             if machine.successfully_connected(self.logger):
646                 step = _("Copy SAT")
647                 self.logger.write('\r%s%s%s' % (begin_line, endline, 20 * " "),3)
648                 self.logger.write('\r%s%s%s' % (begin_line, endline, step), 3)
649                 self.logger.flush()
650                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
651                                             self.job_file_path)
652                 # Print the status of the copy
653                 if res_copy == 0:
654                     self.logger.write('\r%s' % 
655                                 ((len(begin_line)+len(endline)+20) * " "), 3)
656                     self.logger.write('\r%s%s%s' % 
657                         (begin_line, 
658                          endline, 
659                          src.printcolors.printc(src.OK_STATUS)), 3)
660                 else:
661                     self.logger.write('\r%s' % 
662                             ((len(begin_line)+len(endline)+20) * " "), 3)
663                     self.logger.write('\r%s%s%s %s' % 
664                         (begin_line,
665                          endline,
666                          src.printcolors.printc(src.OK_STATUS),
667                          _("Copy of SAT failed")), 3)
668             else:
669                 self.logger.write('\r%s' % 
670                                   ((len(begin_line)+len(endline)+20) * " "), 3)
671                 self.logger.write('\r%s%s%s %s' % 
672                     (begin_line,
673                      endline,
674                      src.printcolors.printc(src.KO_STATUS),
675                      msg), 3)
676             self.logger.write("\n", 3)
677                 
678         self.logger.write("\n")
679         
680
681     def is_occupied(self, hostname):
682         '''Function that returns True if a job is running on 
683            the machine defined by its host and its port.
684         
685         :param hostname (str, int): the pair (host, port)
686         :return: the job that is running on the host, 
687                 or false if there is no job running on the host. 
688         :rtype: job / bool
689         '''
690         host = hostname[0]
691         port = hostname[1]
692         for jb in self.dic_job_machine:
693             if jb.machine.host == host and jb.machine.port == port:
694                 if jb.is_running():
695                     return jb
696         return False
697     
698     def update_jobs_states_list(self):
699         '''Function that updates the lists that store the currently
700            running jobs and the jobs that have already finished.
701         
702         :return: Nothing. 
703         :rtype: N\A
704         '''
705         jobs_finished_list = []
706         jobs_running_list = []
707         for jb in self.dic_job_machine:
708             if jb.is_running():
709                 jobs_running_list.append(jb)
710                 jb.check_time()
711             if jb.has_finished():
712                 jobs_finished_list.append(jb)
713         
714         nb_job_finished_before = len(self._l_jobs_finished)
715         self._l_jobs_finished = jobs_finished_list
716         self._l_jobs_running = jobs_running_list
717         
718         nb_job_finished_now = len(self._l_jobs_finished)
719         
720         return nb_job_finished_now > nb_job_finished_before
721     
722     def cancel_dependencies_of_failing_jobs(self):
723         '''Function that cancels all the jobs that depend on a failing one.
724         
725         :return: Nothing. 
726         :rtype: N\A
727         '''
728         
729         for job in self.ljobs:
730             if job.after is None:
731                 continue
732             father_job = self.find_job_that_has_name(job.after)
733             if father_job.has_failed():
734                 job.cancel()
735     
736     def find_job_that_has_name(self, name):
737         '''Returns the job by its name.
738         
739         :param name str: a job name
740         :return: the job that has the name. 
741         :rtype: job
742         '''
743         for jb in self.ljobs:
744             if jb.name == name:
745                 return jb
746
747         # the following is executed only if the job was not found
748         msg = _('The job "%s" seems to be nonexistent') % name
749         raise src.SatException(msg)
750     
751     def str_of_length(self, text, length):
752         '''Takes a string text of any length and returns 
753            the most close string of length "length".
754         
755         :param text str: any string
756         :param length int: a length for the returned string
757         :return: the most close string of length "length"
758         :rtype: str
759         '''
760         if len(text) > length:
761             text_out = text[:length-3] + '...'
762         else:
763             diff = length - len(text)
764             before = " " * (diff/2)
765             after = " " * (diff/2 + diff%2)
766             text_out = before + text + after
767             
768         return text_out
769     
770     def display_status(self, len_col):
771         '''Takes a lenght and construct the display of the current status 
772            of the jobs in an array that has a column for each host.
773            It displays the job that is currently running on the host 
774            of the column.
775         
776         :param len_col int: the size of the column 
777         :return: Nothing
778         :rtype: N\A
779         '''
780         
781         display_line = ""
782         for host_port in self.lhosts:
783             jb = self.is_occupied(host_port)
784             if not jb: # nothing running on the host
785                 empty = self.str_of_length("empty", len_col)
786                 display_line += "|" + empty 
787             else:
788                 display_line += "|" + src.printcolors.printcInfo(
789                                         self.str_of_length(jb.name, len_col))
790         
791         self.logger.write("\r" + display_line + "|")
792         self.logger.flush()
793     
794
795     def run_jobs(self):
796         '''The main method. Runs all the jobs on every host. 
797            For each host, at a given time, only one job can be running.
798            The jobs that have the field after (that contain the job that has
799            to be run before it) are run after the previous job.
800            This method stops when all the jobs are finished.
801         
802         :return: Nothing
803         :rtype: N\A
804         '''
805
806         # Print header
807         self.logger.write(src.printcolors.printcInfo(
808                                                 _('Executing the jobs :\n')))
809         text_line = ""
810         for host_port in self.lhosts:
811             host = host_port[0]
812             port = host_port[1]
813             if port == 22: # default value
814                 text_line += "|" + self.str_of_length(host, self.len_columns)
815             else:
816                 text_line += "|" + self.str_of_length(
817                                 "("+host+", "+str(port)+")", self.len_columns)
818         
819         tiret_line = " " + "-"*(len(text_line)-1) + "\n"
820         self.logger.write(tiret_line)
821         self.logger.write(text_line + "|\n")
822         self.logger.write(tiret_line)
823         self.logger.flush()
824         
825         # The infinite loop that runs the jobs
826         l_jobs_not_started = self.dic_job_machine.keys()
827         while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
828             new_job_start = False
829             for host_port in self.lhosts:
830                 
831                 if self.is_occupied(host_port):
832                     continue
833              
834                 for jb in l_jobs_not_started:
835                     if (jb.machine.host, jb.machine.port) != host_port:
836                         continue 
837                     if jb.after == None:
838                         jb.run(self.logger)
839                         l_jobs_not_started.remove(jb)
840                         new_job_start = True
841                         break
842                     else:
843                         jb_before = self.find_job_that_has_name(jb.after) 
844                         if jb_before.has_finished():
845                             jb.run(self.logger)
846                             l_jobs_not_started.remove(jb)
847                             new_job_start = True
848                             break
849             self.cancel_dependencies_of_failing_jobs()
850             new_job_finished = self.update_jobs_states_list()
851             
852             if new_job_start or new_job_finished:
853                 self.gui.update_xml_file(self.ljobs)            
854                 # Display the current status     
855                 self.display_status(self.len_columns)
856             
857             # Make sure that the proc is not entirely busy
858             time.sleep(0.001)
859         
860         self.logger.write("\n")    
861         self.logger.write(tiret_line)                   
862         self.logger.write("\n\n")
863         
864         self.gui.update_xml_file(self.ljobs)
865         self.gui.last_update()
866
867     def write_all_results(self):
868         '''Display all the jobs outputs.
869         
870         :return: Nothing
871         :rtype: N\A
872         '''
873         
874         for jb in self.dic_job_machine.keys():
875             self.logger.write(src.printcolors.printcLabel(
876                         "#------- Results for job %s -------#\n" % jb.name))
877             jb.write_results(self.logger)
878             self.logger.write("\n\n")
879
880 class Gui(object):
881     '''Class to manage the the xml data that can be displayed in a browser to
882        see the jobs states
883     '''
884     
885     """
886     <?xml version='1.0' encoding='utf-8'?>
887     <?xml-stylesheet type='text/xsl' href='job_report.xsl'?>
888     <JobsReport>
889       <infos>
890         <info name="generated" value="2016-06-02 07:06:45"/>
891       </infos>
892       <hosts>
893           <host name=is221553 port=22 distribution=UB12.04/>
894           <host name=is221560 port=22/>
895           <host name=is221553 port=22 distribution=FD20/>
896       </hosts>
897       <applications>
898           <application name=SALOME-7.8.0/>
899           <application name=SALOME-master/>
900           <application name=MED-STANDALONE-master/>
901           <application name=CORPUS/>
902       </applications>
903       
904       <jobs>
905           <job name="7.8.0 FD22">
906                 <host>is228809</host>
907                 <port>2200</port>
908                 <application>SALOME-7.8.0</application>
909                 <user>adminuser</user>
910                 <timeout>240</timeout>
911                 <commands>
912                     export DISPLAY=is221560
913                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
914                     tar xf /local/adminuser/SALOME-7.7.1p1-src.tgz -C /local/adminuser
915                 </commands>
916                 <state>Not launched</state>
917           </job>
918
919           <job name="master MG05">
920                 <host>is221560</host>
921                 <port>22</port>
922                 <application>SALOME-master</application>
923                 <user>salome</user>
924                 <timeout>240</timeout>
925                 <commands>
926                     export DISPLAY=is221560
927                     scp -p salome@is221560.intra.cea.fr:/export/home/salome/SALOME-7.7.1p1-src.tgz /local/adminuser         
928                     sat prepare SALOME-master
929                     sat compile SALOME-master
930                     sat check SALOME-master
931                     sat launcher SALOME-master
932                     sat test SALOME-master
933                 </commands>
934                 <state>Running since 23 min</state>
935                 <!-- <state>time out</state> -->
936                 <!-- <state>OK</state> -->
937                 <!-- <state>KO</state> -->
938                 <begin>10/05/2016 20h32</begin>
939                 <end>10/05/2016 22h59</end>
940           </job>
941
942       </jobs>
943     </JobsReport>
944     
945     """
946     
947     def __init__(self, xml_file_path, l_jobs, l_jobs_not_today, stylesheet):
948         # The path of the xml file
949         self.xml_file_path = xml_file_path
950         # The stylesheet
951         self.stylesheet = stylesheet
952         # Open the file in a writing stream
953         self.xml_file = src.xmlManager.XmlLogFile(xml_file_path, "JobsReport")
954         # Create the lines and columns
955         self.initialize_array(l_jobs, l_jobs_not_today)
956         # Write the wml file
957         self.update_xml_file(l_jobs)
958     
959     def initialize_array(self, l_jobs, l_jobs_not_today):
960         l_dist = []
961         l_applications = []
962         for job in l_jobs:
963             distrib = job.distribution
964             if distrib is not None and distrib not in l_dist:
965                 l_dist.append(distrib)
966             
967             application = job.application
968             if application is not None and application not in l_applications:
969                 l_applications.append(application)
970         
971         for job_def in l_jobs_not_today:
972             distrib = src.get_cfg_param(job_def, "distribution", "nothing")
973             if distrib is not "nothing" and distrib not in l_dist:
974                 l_dist.append(distrib)
975                 
976             application = src.get_cfg_param(job_def, "application", "nothing")
977             if application is not "nothing" and application not in l_applications:
978                 l_applications.append(application)
979         
980         self.l_dist = l_dist
981         self.l_applications = l_applications
982         
983         # Update the hosts node
984         self.xmldists = self.xml_file.add_simple_node("distributions")
985         for dist_name in self.l_dist:
986             src.xmlManager.add_simple_node(self.xmldists, "dist", attrib={"name" : dist_name})
987             
988         # Update the applications node
989         self.xmlapplications = self.xml_file.add_simple_node("applications")
990         for application in self.l_applications:
991             src.xmlManager.add_simple_node(self.xmlapplications, "application", attrib={"name" : application})
992         
993         # Initialize the jobs node
994         self.xmljobs = self.xml_file.add_simple_node("jobs")
995         
996         # 
997         self.put_jobs_not_today(l_jobs_not_today)
998         
999         # Initialize the info node (when generated)
1000         self.xmlinfos = self.xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
1001     
1002     def put_jobs_not_today(self, l_jobs_not_today):
1003         for job_def in l_jobs_not_today:
1004             xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job_def.name})
1005             src.xmlManager.add_simple_node(xmlj, "application", src.get_cfg_param(job_def, "application", "nothing"))
1006             src.xmlManager.add_simple_node(xmlj, "distribution", src.get_cfg_param(job_def, "distribution", "nothing"))
1007             src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job_def.commands))
1008             src.xmlManager.add_simple_node(xmlj, "state", "Not today")        
1009         
1010     def update_xml_file(self, l_jobs):      
1011         
1012         # Update the job names and status node
1013         for job in l_jobs:
1014             # Find the node corresponding to the job and delete it
1015             # in order to recreate it
1016             for xmljob in self.xmljobs.findall('job'):
1017                 if xmljob.attrib['name'] == job.name:
1018                     self.xmljobs.remove(xmljob)
1019             
1020             T0 = str(job._T0)
1021             if T0 != "-1":
1022                 T0 = time.strftime('%Y-%m-%d %H:%M:%S', 
1023                                        time.localtime(job._T0))
1024             Tf = str(job._Tf)
1025             if Tf != "-1":
1026                 Tf = time.strftime('%Y-%m-%d %H:%M:%S', 
1027                                        time.localtime(job._Tf))
1028             
1029             # recreate the job node
1030             xmlj = src.xmlManager.add_simple_node(self.xmljobs, "job", attrib={"name" : job.name})
1031             src.xmlManager.add_simple_node(xmlj, "host", job.machine.host)
1032             src.xmlManager.add_simple_node(xmlj, "port", str(job.machine.port))
1033             src.xmlManager.add_simple_node(xmlj, "user", job.machine.user)
1034             src.xmlManager.add_simple_node(xmlj, "sat_path", job.machine.sat_path)
1035             src.xmlManager.add_simple_node(xmlj, "application", job.application)
1036             src.xmlManager.add_simple_node(xmlj, "distribution", job.distribution)
1037             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
1038             src.xmlManager.add_simple_node(xmlj, "commands", " ; ".join(job.commands))
1039             src.xmlManager.add_simple_node(xmlj, "state", job.get_status())
1040             src.xmlManager.add_simple_node(xmlj, "begin", T0)
1041             src.xmlManager.add_simple_node(xmlj, "end", Tf)
1042             src.xmlManager.add_simple_node(xmlj, "out", src.printcolors.cleancolor(job.out))
1043             src.xmlManager.add_simple_node(xmlj, "err", src.printcolors.cleancolor(job.err))
1044             src.xmlManager.add_simple_node(xmlj, "res", str(job.res_job))
1045             if len(job.remote_log_files) > 0:
1046                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", job.remote_log_files[0])
1047             else:
1048                 src.xmlManager.add_simple_node(xmlj, "remote_log_file_path", "nothing")           
1049             
1050             xmlafter = src.xmlManager.add_simple_node(xmlj, "after", job.after)
1051             # get the job father
1052             if job.after is not None:
1053                 job_father = None
1054                 for jb in l_jobs:
1055                     if jb.name == job.after:
1056                         job_father = jb
1057                 if job_father is None:
1058                     msg = _("The job %(father_name)s that is parent of "
1059                             "%(son_name)s is not in the job list." %
1060                              {"father_name" : job.after , "son_name" : job.name})
1061                     raise src.SatException(msg)
1062                 
1063                 if len(job_father.remote_log_files) > 0:
1064                     link = job_father.remote_log_files[0]
1065                 else:
1066                     link = "nothing"
1067                 src.xmlManager.append_node_attrib(xmlafter, {"link" : link})
1068             
1069         
1070         # Update the date
1071         src.xmlManager.append_node_attrib(self.xmlinfos,
1072                     attrib={"value" : 
1073                     datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
1074                
1075         # Write the file
1076         self.write_xml_file()
1077     
1078     def last_update(self, finish_status = "finished"):
1079         src.xmlManager.append_node_attrib(self.xmlinfos,
1080                     attrib={"JobsCommandStatus" : finish_status})
1081         # Write the file
1082         self.write_xml_file()
1083     
1084     def write_xml_file(self):
1085         self.xml_file.write_tree(self.stylesheet)
1086         
1087 ##
1088 # Describes the command
1089 def description():
1090     return _("The jobs command launches maintenances that are described"
1091              " in the dedicated jobs configuration file.")
1092
1093 ##
1094 # Runs the command.
1095 def run(args, runner, logger):
1096        
1097     (options, args) = parser.parse_args(args)
1098        
1099     jobs_cfg_files_dir = runner.cfg.SITE.jobs.config_path
1100     
1101     l_cfg_dir = [jobs_cfg_files_dir, os.path.join(runner.cfg.VARS.datadir, "jobs")]
1102     
1103     # Make sure the path to the jobs config files directory exists 
1104     src.ensure_path_exists(jobs_cfg_files_dir)   
1105     
1106     # list option : display all the available config files
1107     if options.list:
1108         for cfg_dir in l_cfg_dir:
1109             if not options.no_label:
1110                 logger.write("------ %s\n" % 
1111                                  src.printcolors.printcHeader(cfg_dir))
1112     
1113             for f in sorted(os.listdir(cfg_dir)):
1114                 if not f.endswith('.pyconf'):
1115                     continue
1116                 cfilename = f[:-7]
1117                 logger.write("%s\n" % cfilename)
1118         return 0
1119
1120     # Make sure the jobs_config option has been called
1121     if not options.jobs_cfg:
1122         message = _("The option --jobs_config is required\n")      
1123         raise src.SatException( message )
1124     
1125     # Find the file in the directories
1126     found = False
1127     for cfg_dir in l_cfg_dir:
1128         file_jobs_cfg = os.path.join(cfg_dir, options.jobs_cfg)
1129         if not file_jobs_cfg.endswith('.pyconf'):
1130             file_jobs_cfg += '.pyconf'
1131         
1132         if not os.path.exists(file_jobs_cfg):
1133             continue
1134         else:
1135             found = True
1136             break
1137     
1138     if not found:
1139         msg = _("The file configuration %(name_file)s was not found."
1140                 "\nUse the --list option to get the possible files.")
1141         src.printcolors.printcError(msg)
1142         return 1
1143     
1144     info = [
1145         (_("Platform"), runner.cfg.VARS.dist),
1146         (_("File containing the jobs configuration"), file_jobs_cfg)
1147     ]    
1148     src.print_info(logger, info)
1149     
1150     # Read the config that is in the file
1151     config_jobs = src.read_config_from_a_file(file_jobs_cfg)
1152     if options.only_jobs:
1153         l_jb = src.pyconf.Sequence()
1154         for jb in config_jobs.jobs:
1155             if jb.name in options.only_jobs:
1156                 l_jb.append(jb,
1157                 "Adding a job that was given in only_jobs option parameters")
1158         config_jobs.jobs = l_jb
1159               
1160     # Initialization
1161     today_jobs = Jobs(runner, logger, options.jobs_cfg, file_jobs_cfg, config_jobs)
1162     # SSH connection to all machines
1163     today_jobs.ssh_connection_all_machines()
1164     if options.test_connection:
1165         return 0
1166     
1167     gui = None
1168     if options.publish:
1169         gui = Gui("/export/home/serioja/LOGS/test.xml", today_jobs.ljobs, today_jobs.ljobsdef_not_today, "job_report.xsl")
1170     
1171     today_jobs.gui = gui
1172     
1173     interruped = False
1174     try:
1175         # Run all the jobs contained in config_jobs
1176         today_jobs.run_jobs()
1177     except KeyboardInterrupt:
1178         interruped = True
1179         logger.write("\n\n%s\n\n" % 
1180                 (src.printcolors.printcWarning(_("Forced interruption"))), 1)
1181         
1182     finally:
1183         # find the potential not finished jobs and kill them
1184         for jb in today_jobs.ljobs:
1185             if not jb.has_finished():
1186                 jb.kill_remote_process()
1187         if interruped:
1188             today_jobs.gui.last_update(_("Forced interruption"))
1189         else:
1190             today_jobs.gui.last_update()
1191         # Output the results
1192         today_jobs.write_all_results()