Salome HOME
port jobs command to python3. Get the remote distributions using the ssh in jobs...
[tools/sat.git] / commands / jobs.py
index a62e410f82a3ae6203e07ec866e9c8e2f1834d1f..edbdffc265a83e10abafc8867f81714b9ab32834 100644 (file)
@@ -58,6 +58,7 @@ class Machine(object):
         self.name = name
         self.host = host
         self.port = port
+        self.distribution = None # Will be filled after copying SAT on the machine
         self.user = user
         self.password = passwd
         self.sat_path = sat_path
@@ -115,9 +116,14 @@ class Machine(object):
         '''
         res = 0
         try:
+            # open a sftp connection
             self.sftp = self.ssh.open_sftp()
+            # Create the sat directory on remote machine if it is not existing
             self.mkdir(self.sat_path, ignore_existing=True)
+            # Put sat
             self.put_dir(sat_local_path, self.sat_path, filters = ['.git'])
+            # put the job configuration file in order to make it reachable 
+            # on the remote machine
             job_file_name = os.path.basename(job_file)
             self.sftp.put(job_file, os.path.join(self.sat_path,
                                                  "data",
@@ -220,7 +226,7 @@ class Machine(object):
 class Job(object):
     '''Class to manage one job
     '''
-    def __init__(self, name, machine, application, distribution, table, 
+    def __init__(self, name, machine, application, table, 
                  commands, timeout, config, logger, job_file, after=None):
 
         self.name = name
@@ -228,7 +234,6 @@ class Job(object):
         self.after = after
         self.timeout = timeout
         self.application = application
-        self.distribution = distribution
         self.table = table
         self.config = config
         self.logger = logger
@@ -260,7 +265,7 @@ class Job(object):
                                      "list_log_files.txt") +
                         " job --jobs_config " +
                         job_file +
-                        " --job " +
+                        " --name " +
                         self.name)
     
     def get_pids(self):
@@ -320,8 +325,8 @@ class Job(object):
         if self._stdout.channel.closed:
             self._has_finished = True
             # Store the result outputs
-            self.out = self._stdout.read()
-            self.err = self._stderr.read()
+            self.out = self._stdout.read().decode()
+            self.err = self._stderr.read().decode()
             # Put end time
             self._Tf = time.time()
             # And get the remote command status and log files
@@ -354,7 +359,8 @@ class Job(object):
         os.remove(tmp_file_path)
         # The first line is the result of the command (0 success or 1 fail)
         self.res_job = file_lines[0]
-        for job_path_remote in file_lines[1:]:
+
+        for i, job_path_remote in enumerate(file_lines[1:]):
             try:
                 # For each command, there is two files to get :
                 # 1- The xml file describing the command and giving the 
@@ -366,6 +372,11 @@ class Job(object):
                     local_path = os.path.join(os.path.dirname(
                                                         self.logger.logFilePath),
                                               os.path.basename(job_path_remote))
+                    if i==0: # The first is the job command
+                        self.logger.add_link(os.path.basename(job_path_remote),
+                                             "job",
+                                             self.res_job,
+                                             self.command) 
                 else:
                     # Case 2-
                     local_path = os.path.join(os.path.dirname(
@@ -448,7 +459,7 @@ class Job(object):
             self._Tf = time.time()
             self.get_pids()
             (out_kill, _) = self.kill_remote_process()
-            self.out = "TIMEOUT \n" + out_kill.read()
+            self.out = "TIMEOUT \n" + out_kill.read().decode()
             self.err = "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
             try:
                 self.get_log_files()
@@ -472,7 +483,8 @@ class Job(object):
             msg = _("Warning: A job can only be launched one time")
             msg2 = _("Trying to launch the job \"%s\" whereas it has "
                      "already been launched." % self.name)
-            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg, msg2)))
+            self.logger.write(src.printcolors.printcWarning("%s\n%s\n" % (msg,
+                                                                        msg2)))
             return
         
         # Do not execute the command if the machine could not be reached
@@ -508,7 +520,7 @@ class Job(object):
         if self.after:
             self.logger.write("after : %s\n" % self.after)
         self.logger.write("Time elapsed : %4imin %2is \n" % 
-                     (self.total_duration()/60 , self.total_duration()%60))
+                     (self.total_duration()//60 , self.total_duration()%60))
         if self._T0 != -1:
             self.logger.write("Begin time : %s\n" % 
                          time.strftime('%Y-%m-%d %H:%M:%S', 
@@ -583,8 +595,6 @@ class Jobs(object):
         self.ljobs_not_today = []
         self.runner = runner
         self.logger = logger
-        # The correlation dictionary between jobs and machines
-        self.dic_job_machine = {} 
         self.len_columns = lenght_columns
         
         # the list of jobs that have not been run yet
@@ -614,9 +624,6 @@ class Jobs(object):
         application = None
         if 'application' in job_def:
             application = job_def.application
-        distribution = None
-        if 'distribution' in job_def:
-            distribution = job_def.distribution
         table = None
         if 'table' in job_def:
             table = job_def.table
@@ -624,7 +631,6 @@ class Jobs(object):
         return Job(name,
                    machine,
                    application,
-                   distribution,
                    table,
                    cmmnds,
                    timeout,
@@ -707,13 +713,12 @@ class Jobs(object):
                     self.logger.write(src.printcolors.printcWarning(msg))
                                   
             a_job = self.define_job(job_def, a_machine)
-            self.dic_job_machine[a_job] = a_machine
                 
             if today in job_def.when:    
                 self.ljobs.append(a_job)
             else: # today in job_def.when
                 self.ljobs_not_today.append(a_job)
-                                     
+               
         self.lhosts = host_list
         
     def ssh_connection_all_machines(self, pad=50):
@@ -747,6 +752,13 @@ class Jobs(object):
                 self.logger.flush()
                 res_copy = machine.copy_sat(self.runner.cfg.VARS.salometoolsway,
                                             self.job_file_path)
+                # get the remote machine distribution using a sat command
+                (__, out_dist, __) = machine.exec_command(
+                                os.path.join(machine.sat_path,
+                                    "sat config --value VARS.dist --no_label"),
+                                self.logger)
+                machine.distribution = out_dist.read().decode().replace("\n",
+                                                                        "")
                 # Print the status of the copy
                 if res_copy == 0:
                     self.logger.write('\r%s' % 
@@ -787,7 +799,7 @@ class Jobs(object):
         '''
         host = hostname[0]
         port = hostname[1]
-        for jb in self.dic_job_machine:
+        for jb in self.ljobs:
             if jb.machine.host == host and jb.machine.port == port:
                 if jb.is_running():
                     return jb
@@ -802,7 +814,7 @@ class Jobs(object):
         '''
         jobs_finished_list = []
         jobs_running_list = []
-        for jb in self.dic_job_machine:
+        for jb in self.ljobs:
             if jb.is_running():
                 jobs_running_list.append(jb)
                 jb.check_time()
@@ -841,7 +853,6 @@ class Jobs(object):
         for jb in self.ljobs:
             if jb.name == name:
                 return jb
-
         # the following is executed only if the job was not found
         msg = _('The job "%s" seems to be nonexistent') % name
         raise src.SatException(msg)
@@ -859,8 +870,8 @@ class Jobs(object):
             text_out = text[:length-3] + '...'
         else:
             diff = length - len(text)
-            before = " " * (diff/2)
-            after = " " * (diff/2 + diff%2)
+            before = " " * (diff//2)
+            after = " " * (diff//2 + diff%2)
             text_out = before + text + after
             
         return text_out
@@ -921,8 +932,8 @@ class Jobs(object):
         self.logger.flush()
         
         # The infinite loop that runs the jobs
-        l_jobs_not_started = self.dic_job_machine.keys()
-        while len(self._l_jobs_finished) != len(self.dic_job_machine.keys()):
+        l_jobs_not_started = src.deepcopy_list(self.ljobs)
+        while len(self._l_jobs_finished) != len(self.ljobs):
             new_job_start = False
             for host_port in self.lhosts:
                 
@@ -969,7 +980,7 @@ class Jobs(object):
         :rtype: N\A
         '''
         
-        for jb in self.dic_job_machine.keys():
+        for jb in self.ljobs:
             self.logger.write(src.printcolors.printcLabel(
                         "#------- Results for job %s -------#\n" % jb.name))
             jb.write_results()
@@ -1037,7 +1048,7 @@ class Gui(object):
             if (job.machine.host, job.machine.port) not in l_hosts_ports:
                 l_hosts_ports.append((job.machine.host, job.machine.port))
                 
-            distrib = job.distribution
+            distrib = job.machine.distribution
             application = job.application
             
             table_job = job.table
@@ -1053,9 +1064,11 @@ class Gui(object):
                                                    attrib={"name" : distrib})
                     
                 if table_job == table:
-                    if application is not None and application not in d_application[table]:
+                    if (application is not None and 
+                                    application not in d_application[table]):
                         d_application[table].append(application)
-                        src.xmlManager.add_simple_node(self.d_xml_table_files[table].xmlroot.find('applications'),
+                        src.xmlManager.add_simple_node(
+                            self.d_xml_table_files[table].xmlroot.find('applications'),
                                                    "application",
                                                    attrib={"name" : application})
 
@@ -1068,13 +1081,15 @@ class Gui(object):
                                            attrib={"name" : host_port})
         
         # Initialize the jobs node in all files
-        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+        for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
             xml_jobs = xml_file.add_simple_node("jobs")      
-            # Get the jobs present in the config file but that will not be launched
-            # today
+            # Get the jobs present in the config file but 
+            # that will not be launched today
             self.put_jobs_not_today(l_jobs_not_today, xml_jobs)
             
-            xml_file.add_simple_node("infos", attrib={"name" : "last update", "JobsCommandStatus" : "running"})
+            xml_file.add_simple_node("infos",
+                                     attrib={"name" : "last update",
+                                             "JobsCommandStatus" : "running"})
 
     
     def put_jobs_not_today(self, l_jobs_not_today, xml_node_jobs):
@@ -1091,7 +1106,7 @@ class Gui(object):
             src.xmlManager.add_simple_node(xmlj, "application", job.application)
             src.xmlManager.add_simple_node(xmlj,
                                            "distribution",
-                                           job.distribution)
+                                           job.machine.distribution)
             src.xmlManager.add_simple_node(xmlj, "table", job.table)
             src.xmlManager.add_simple_node(xmlj,
                                        "commands", " ; ".join(job.commands))
@@ -1108,7 +1123,7 @@ class Gui(object):
 
         :param l_jobs List: the list of jobs that run today
         '''
-        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+        for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
             self.update_xml_file(l_jobs, xml_file)
             
         # Write the file
@@ -1151,7 +1166,7 @@ class Gui(object):
                                            job.machine.sat_path)
             src.xmlManager.add_simple_node(xmlj, "application", job.application)
             src.xmlManager.add_simple_node(xmlj, "distribution",
-                                           job.distribution)
+                                           job.machine.distribution)
             src.xmlManager.add_simple_node(xmlj, "table", job.table)
             src.xmlManager.add_simple_node(xmlj, "timeout", str(job.timeout))
             src.xmlManager.add_simple_node(xmlj, "commands",
@@ -1207,7 +1222,7 @@ class Gui(object):
         :param l_jobs List: the list of jobs that run today
         :param xml_file xmlManager.XmlLogFile: the xml instance to update
         '''
-        for xml_file in [self.xml_global_file] + self.d_xml_table_files.values():
+        for xml_file in [self.xml_global_file] + list(self.d_xml_table_files.values()):
             xml_node_infos = xml_file.xmlroot.find('infos')
             src.xmlManager.append_node_attrib(xml_node_infos,
                         attrib={"JobsCommandStatus" : finish_status})
@@ -1309,7 +1324,7 @@ def run(args, runner, logger):
     
     gui = None
     if options.publish:
-        gui = Gui("/export/home/serioja/LOGS",
+        gui = Gui(runner.cfg.SITE.log.log_dir,
                   today_jobs.ljobs,
                   today_jobs.ljobs_not_today,)