Salome HOME
fix sat test for python2-3 compatibility
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
# Python 2/3 compatibility for execfile function
try:
    execfile
except NameError:
    # Only a missing builtin (Python 3) should trigger the fallback; any other
    # exception must not be silently swallowed.
    def execfile(somefile, global_vars, local_vars):
        """Compile and execute the Python source file *somefile*.

        The code is executed with *global_vars* as globals and *local_vars*
        as locals, like the Python 2 builtin of the same name.
        """
        with open(somefile) as f:
            code = compile(f.read(), somefile, 'exec')
        # the file is closed before the code runs
        exec(code, global_vars, local_vars)
27
28 import os
29 import sys
30 import datetime
31 import shutil
32 import string
33 import imp
34 import subprocess
35 import glob
36 import pprint as PP
37
38 verbose = False # cvw TODO
39
40 from . import fork
41 import src
42
43 # directories not considered as test grids
44 C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
45
46 DEFAULT_TIMEOUT = 150
47
48 # Get directory to be used for the temporary files.
49 #
def getTmpDirDEFAULT():
    """Return the default directory used for temporary files.

    On Windows this is the value of the TEMP environment variable; otherwise
    it is /tmp/logs/<USER> ("unknown" when USER is not set).
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        user_name = os.getenv("USER", "unknown")
        return os.path.join('/tmp', 'logs', user_name)
    return os.getenv("TEMP")
57
58 class Test:
59     def __init__(self,
60                  config,
61                  logger,
62                  tmp_working_dir,
63                  testbase="",
64                  grids=None,
65                  sessions=None,
66                  launcher="",
67                  show_desktop=True):
68         self.grids = grids
69         self.config = config
70         self.logger = logger
71         self.tmp_working_dir = tmp_working_dir
72         self.sessions = sessions
73         self.launcher = launcher
74         self.show_desktop = show_desktop
75
76         res = self.prepare_testbase(testbase)
77         self.test_base_found = True
78         if res == 1:
79             # Fail
80             self.test_base_found = False
81         
82         self.settings = {}
83         self.known_errors = None
84
85         # create section for results
86         self.config.TESTS = src.pyconf.Sequence(self.config)
87
88         self.nb_run = 0
89         self.nb_succeed = 0
90         self.nb_timeout = 0
91         self.nb_not_run = 0
92         self.nb_acknoledge = 0
93
94     def _copy_dir(self, source, target):
95         if self.config.VARS.python >= "2.6":
96             shutil.copytree(source, target,
97                             symlinks=True,
98                             ignore=shutil.ignore_patterns('.git*','.svn*'))
99         else:
100             shutil.copytree(source, target,
101                             symlinks=True)
102
103     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
104         self.logger.write(_("get test base from dir: %s\n") % \
105                           src.printcolors.printcLabel(testbase_dir), 3)
106         if not os.access(testbase_dir, os.X_OK):
107             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
108                                      "exist ...\n") % { 'name': testbase_name,
109                                                        'dir': testbase_dir })
110
111         self._copy_dir(testbase_dir,
112                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
113
114     def prepare_testbase_from_git(self,
115                                   testbase_name,
116                                   testbase_base,
117                                   testbase_tag):
118         self.logger.write(
119             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
120                         "testbase" : src.printcolors.printcLabel(testbase_name),
121                         "tag" : src.printcolors.printcLabel(testbase_tag)},
122                           3)
123         try:
124             def set_signal(): # pragma: no cover
125                 """see http://bugs.python.org/issue1652"""
126                 import signal
127                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
128
129             cmd = "git clone --depth 1 %(base)s %(dir)s"
130             cmd += " && cd %(dir)s"
131             if testbase_tag=='master':
132                 cmd += " && git fetch origin %(branch)s"
133             else:
134                 cmd += " && git fetch origin %(branch)s:%(branch)s"
135             cmd += " && git checkout %(branch)s"
136             cmd = cmd % { 'branch': testbase_tag,
137                          'base': testbase_base,
138                          'dir': testbase_name }
139
140             self.logger.write("> %s\n" % cmd, 5)
141             if src.architecture.is_windows():
142                 # preexec_fn not supported on windows platform
143                 res = subprocess.call(cmd,
144                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
145                                 shell=True,
146                                 stdout=self.logger.logTxtFile,
147                                 stderr=subprocess.PIPE)
148             else:
149                 res = subprocess.call(cmd,
150                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
151                                 shell=True,
152                                 preexec_fn=set_signal,
153                                 stdout=self.logger.logTxtFile,
154                                 stderr=subprocess.PIPE)
155             if res != 0:
156                 raise src.SatException(_("Error: unable to get test base "
157                                          "'%(name)s' from git '%(repo)s'.") % \
158                                        { 'name': testbase_name,
159                                         'repo': testbase_base })
160
161         except OSError:
162             self.logger.error(_("git is not installed. exiting...\n"))
163             sys.exit(0)
164
165     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
166         self.logger.write(_("get test base '%s' from svn\n") % \
167                           src.printcolors.printcLabel(testbase_name), 3)
168         try:
169             def set_signal(): # pragma: no cover
170                 """see http://bugs.python.org/issue1652"""
171                 import signal
172                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
173
174             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
175             cmd = cmd % { 'user': user,
176                          'base': testbase_base,
177                          'dir': testbase_name }
178             
179             # Get the application environment
180             self.logger.write(_("Set the application environment\n"), 5)
181             env_appli = src.environment.SalomeEnviron(self.config,
182                                       src.environment.Environ(dict(os.environ)))
183             env_appli.set_application_env(self.logger)
184             
185             self.logger.write("> %s\n" % cmd, 5)
186             if src.architecture.is_windows():
187                 # preexec_fn not supported on windows platform
188                 res = subprocess.call(cmd,
189                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
190                                 shell=True,
191                                 stdout=self.logger.logTxtFile,
192                                 stderr=subprocess.PIPE)
193             else:
194                 res = subprocess.call(cmd,
195                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
196                                 shell=True,
197                                 preexec_fn=set_signal,
198                                 stdout=self.logger.logTxtFile,
199                                 stderr=subprocess.PIPE,
200                                 env=env_appli.environ.environ,)
201
202             if res != 0:
203                 raise src.SatException(_("Error: unable to get test base '%(nam"
204                                          "e)s' from svn '%(repo)s'.") % \
205                                        { 'name': testbase_name,
206                                         'repo': testbase_base })
207
208         except OSError:
209             self.logger.error(_("svn is not installed. exiting...\n"))
210             sys.exit(0)
211
212     ##
213     # Configure tests base.
214     def prepare_testbase(self, test_base_name):
215         src.printcolors.print_value(self.logger,
216                                     _("Test base"),
217                                     test_base_name,
218                                     3)
219         self.logger.write("\n", 3, False)
220
221         # search for the test base
222         test_base_info = None
223         for project_name in self.config.PROJECTS.projects:
224             project_info = self.config.PROJECTS.projects[project_name]
225             if "test_bases" not in project_info:
226                 continue
227             for t_b_info in project_info.test_bases:
228                 if t_b_info.name == test_base_name:
229                     test_base_info = t_b_info
230         
231         if not test_base_info:
232             if os.path.exists(test_base_name):
233                 self.prepare_testbase_from_dir("DIR", test_base_name)
234                 self.currentTestBase = "DIR"
235                 return 0
236         
237         if not test_base_info:
238             message = (_("########## ERROR: test base '%s' not found\n") % 
239                        test_base_name)
240             self.logger.write("%s\n" % src.printcolors.printcError(message))
241             return 1
242
243         if test_base_info.get_sources == "dir":
244             self.prepare_testbase_from_dir(test_base_name,
245                                            test_base_info.info.dir)
246         elif test_base_info.get_sources == "git":
247             self.prepare_testbase_from_git(test_base_name,
248                                        test_base_info.info.base,
249                                        self.config.APPLICATION.test_base.tag)
250         elif test_base_info.get_sources == "svn":
251             svn_user = src.get_cfg_param(test_base_info.info,
252                                          "svn_user",
253                                          self.config.USER.svn_user)
254             self.prepare_testbase_from_svn(svn_user,
255                                        test_base_name,
256                                        test_base_info.info.base)
257         else:
258             raise src.SatException(_("unknown source type '%(type)s' for test b"
259                                      "ase '%(base)s' ...\n") % {
260                                         'type': test_base_info.get_sources,
261                                         'base': test_base_name })
262
263         self.currentTestBase = test_base_name
264
265     ##
266     # Searches if the script is declared in known errors pyconf.
267     # Update the status if needed.
268     def search_known_errors(self, status, test_grid, test_session, test):
269         test_path = os.path.join(test_grid, test_session, test)
270         if not src.config_has_application(self.config):
271             return status, []
272
273         if self.known_errors is None:
274             return status, []
275
276         platform = self.config.VARS.arch
277         application = self.config.VARS.application
278         error = self.known_errors.get_error(test_path, application, platform)
279         if error is None:
280             return status, []
281         
282         if status == src.OK_STATUS:
283             if not error.fixed:
284                 # the error is fixed
285                 self.known_errors.fix_error(error)
286                 #import testerror
287                 #testerror.write_test_failures(
288                 #                        self.config.TOOLS.testerror.file_path,
289                 #                        self.known_errors.errors)
290             return status, [ error.date,
291                             error.expected,
292                             error.comment,
293                             error.fixed ]
294
295         if error.fixed:
296             self.known_errors.unfix_error(error)
297             #import testerror
298             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
299             #                              self.known_errors.errors)
300
301         delta = self.known_errors.get_expecting_days(error)
302         kfres = [ error.date, error.expected, error.comment, error.fixed ]
303         if delta < 0:
304             return src.KO_STATUS, kfres
305         return src.KNOWNFAILURE_STATUS, kfres
306
307     ##
308     # Read the *.result.py files.
309     def read_results(self, listTest, has_timed_out):
310         results = {}
311         for test in listTest:
312             resfile = os.path.join(self.currentDir,
313                                    self.currentgrid,
314                                    self.currentsession,
315                                    test[:-3] + ".result.py")
316
317             # check if <test>.result.py file exists
318             if not os.path.exists(resfile):
319                 results[test] = ["?", -1, "", []]
320             else:
321                 gdic, ldic = {}, {}
322                 if verbose:
323                   print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
324
325                 execfile(resfile, gdic, ldic)
326
327                 status = src.TIMEOUT_STATUS
328                 if not has_timed_out:
329                     status = src.KO_STATUS
330
331                 if ldic.has_key('status'):
332                     status = ldic['status']
333
334                 expected = []
335                 if status == src.KO_STATUS or status == src.OK_STATUS:
336                     status, expected = self.search_known_errors(status,
337                                                             self.currentgrid,
338                                                             self.currentsession,
339                                                             test)
340
341                 callback = ""
342                 if ldic.has_key('callback'):
343                     callback = ldic['callback']
344                 elif status == src.KO_STATUS:
345                     callback = "CRASH"
346                     if verbose:
347                       print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
348                       print("--- CRASH gdic\n%s" %  PP.pformat(gdic))
349                       pass
350
351                 exec_time = -1
352                 if ldic.has_key('time'):
353                     try:
354                         exec_time = float(ldic['time'])
355                     except:
356                         pass
357
358                 results[test] = [status, exec_time, callback, expected]
359             
360             # check if <test>.py file exists
361             testfile = os.path.join(self.currentDir,
362                                    self.currentgrid,
363                                    self.currentsession,
364                                    test)
365             
366             if not os.path.exists(testfile):
367                 results[test].append('')
368             else:
369                 text = open(testfile, "r").read()
370                 results[test].append(text)
371
372             # check if <test>.out.py file exists
373             outfile = os.path.join(self.currentDir,
374                                    self.currentgrid,
375                                    self.currentsession,
376                                    test[:-3] + ".out.py")
377             
378             if not os.path.exists(outfile):
379                 results[test].append('')
380             else:
381                 text = open(outfile, "r").read()
382                 results[test].append(text)
383
384         return results
385
386     ##
387     # Generates the script to be run by Salome.
388     # This python script includes init and close statements and a loop
389     # calling all the scripts of a single directory.
390     def generate_script(self, listTest, script_path, ignoreList):
391         # open template file
392         template_file = open(os.path.join(self.config.VARS.srcDir,
393                                           "test",
394                                           "scriptTemplate.py"), 'r')
395         template = string.Template(template_file.read())
396         
397         # create substitution dictionary
398         d = dict()
399         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
400         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
401         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
402         d['sessionDir'] = os.path.join(self.currentDir,
403                                     self.currentgrid,
404                                     self.currentsession)
405         d['resultFile'] = os.path.join(self.tmp_working_dir,
406                                        'WORK',
407                                        'exec_result')
408         d['listTest'] = listTest
409         d['sessionName'] = self.currentsession
410         d['ignore'] = ignoreList
411
412         # create script with template
413         contents = template.safe_substitute(d)
414         if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
415         with open(script_path, 'w') as f:
416           f.write(contents)
417
418
419     # Find the getTmpDir function that gives access to *pidict file directory.
420     # (the *pidict file exists when SALOME is launched) 
421     def get_tmp_dir(self):
422         # Rare case where there is no KERNEL in grid list 
423         # (for example MED_STANDALONE)
424         if ('APPLICATION' in self.config 
425                 and 'KERNEL' not in self.config.APPLICATION.products 
426                 and 'KERNEL_ROOT_DIR' not in os.environ):
427             return getTmpDirDEFAULT
428         
429         # Case where "sat test" is launched in an existing SALOME environment
430         if 'KERNEL_ROOT_DIR' in os.environ:
431             root_dir =  os.environ['KERNEL_ROOT_DIR']
432         
433         if ('APPLICATION' in self.config 
434                 and 'KERNEL' in self.config.APPLICATION.products):
435             root_dir = src.product.get_product_config(self.config,
436                                                       "KERNEL").install_dir
437
438         # Case where there the appli option is called (with path to launcher)
439         if len(self.launcher) > 0:
440             # There are two cases : The old application (runAppli) 
441             # and the new one
442             launcherName = os.path.basename(self.launcher)
443             launcherDir = os.path.dirname(self.launcher)
444             if launcherName == 'runAppli':
445                 # Old application
446                 cmd = ("for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
447                        " done ; echo $KERNEL_ROOT_DIR")
448             else:
449                 # New application
450                 cmd = ("echo -e 'import os\nprint os.environ[\"KERNEL_" + 
451                        "ROOT_DIR\"]' > tmpscript.py; %s shell" + 
452                        " tmpscript.py") % self.launcher
453
454             # OP 14/11/2017 Ajout de traces pour essayer de decouvrir le pb
455             #               de remontee de log des tests
456             #root_dir = subprocess.Popen(cmd,
457             #                stdout=subprocess.PIPE,
458             #                shell=True,
459             #                executable='/bin/bash').communicate()[0].split()[-1]
460             # OP Add Windows case
461             if src.architecture.is_windows():
462                 subproc_res = subprocess.Popen(cmd,
463                             stdout=subprocess.PIPE,
464                             shell=True).communicate()
465                 pass
466             else:
467                 subproc_res = subprocess.Popen(cmd,
468                             stdout=subprocess.PIPE,
469                             shell=True,
470                             executable='/bin/bash').communicate()
471                 pass
472             #print "TRACES OP - test_module.py/Test.get_tmp_dir() subproc_res = "
473             #for resLine in subproc_res:
474             #    print "- '#%s#'" %resLine
475             
476             root_dir = subproc_res[0].split()[-1]
477
478         # OP 14/11/2017 Ajout de traces pour essayer de decouvrir le pb
479         #               de remontee de log des tests
480         #print "TRACES OP - test_module.py/Test.get_tmp_dir() root_dir = '#%s#'" %root_dir
481         
482         # import grid salome_utils from KERNEL that gives 
483         # the right getTmpDir function
484         (file_, pathname, description) = imp.find_module("salome_utils",
485                                                          [os.path.join(root_dir,
486                                                                     'bin',
487                                                                     'salome')])
488         try:
489             grid = imp.load_module("salome_utils",
490                                      file_,
491                                      pathname,
492                                      description)
493             return grid.getLogDir
494         except:
495             grid = imp.load_module("salome_utils",
496                                      file_,
497                                      pathname,
498                                      description)
499             return grid.getTmpDir
500         finally:
501             if file_:
502                 file_.close()
503
504
505     def get_test_timeout(self, test_name, default_value):
506         if ("timeout" in self.settings and 
507                 test_name in self.settings["timeout"]):
508             return self.settings["timeout"][test_name]
509
510         return default_value
511
512     def generate_launching_commands(self):
513         # Case where "sat test" is launched in an existing SALOME environment
514         if 'KERNEL_ROOT_DIR' in os.environ:
515             binSalome = "runSalome"
516             binPython = "python"
517             killSalome = "killSalome.py"
518         
519         # Rare case where there is no KERNEL in grid list 
520         # (for example MED_STANDALONE)
521         if ('APPLICATION' in self.config and 
522                 'KERNEL' not in self.config.APPLICATION.products):
523             binSalome = "runSalome"
524             binPython = "python" 
525             killSalome = "killSalome.py"   
526             src.environment.load_environment(self.config, False, self.logger)         
527             return binSalome, binPython, killSalome
528         
529         # Case where there the appli option is called (with path to launcher)
530         if len(self.launcher) > 0:
531             # There are two cases : The old application (runAppli) 
532             # and the new one
533             launcherName = os.path.basename(self.launcher)
534             launcherDir = os.path.dirname(self.launcher)
535             if launcherName == 'runAppli':
536                 # Old application
537                 binSalome = self.launcher
538                 binPython = ("for i in " +
539                              launcherDir +
540                              "/env.d/*.sh; do source ${i}; done ; python")
541                 killSalome = ("for i in " +
542                         launcherDir +
543                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
544                 return binSalome, binPython, killSalome
545             else:
546                 # New application
547                 binSalome = self.launcher
548                 binPython = self.launcher + ' shell'
549                 killSalome = self.launcher + ' killall'
550                 return binSalome, binPython, killSalome
551
552         # SALOME version detection and APPLI repository detection
553         VersionSalome = src.get_salome_version(self.config)
554         appdir = 'APPLI'
555         if "APPLI" in self.config and "application_name" in self.config.APPLI:
556             appdir = self.config.APPLI.application_name
557         
558         # Case where SALOME has NOT the launcher that uses the SalomeContext API
559         if VersionSalome < 730:
560             binSalome = os.path.join(self.config.APPLICATION.workdir,
561                                      appdir,
562                                      "runAppli")
563             binPython = "python"
564             killSalome = "killSalome.py"
565             src.environment.load_environment(self.config, False, self.logger)           
566             return binSalome, binPython, killSalome
567         
568         # Case where SALOME has the launcher that uses the SalomeContext API
569         else:            
570             launcher_name = src.get_launcher_name(self.config)
571             binSalome = os.path.join(self.config.APPLICATION.workdir,
572                                      launcher_name)
573             
574             binPython = binSalome + ' shell'
575             killSalome = binSalome + ' killall'
576             return binSalome, binPython, killSalome
577                 
578         return binSalome, binPython, killSalome
579         
580
581     ##
582     # Runs tests of a session (using a single instance of Salome).
583     def run_tests(self, listTest, ignoreList):
584         out_path = os.path.join(self.currentDir,
585                                 self.currentgrid,
586                                 self.currentsession)
587         if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
588                    (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
589         sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
590         time_out = self.get_test_timeout(sessionname,
591                                          DEFAULT_TIMEOUT)
592
593         time_out_salome = DEFAULT_TIMEOUT
594
595         # generate wrapper script
596         script_path = os.path.join(out_path, 'wrapperScript.py')
597         self.generate_script(listTest, script_path, ignoreList)
598
599         tmpDir = self.get_tmp_dir()
600
601         binSalome, binPython, killSalome = self.generate_launching_commands()
602         if self.settings.has_key("run_with_grids") and \
603            self.settings["run_with_grids"].has_key(sessionname):
604             binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
605
606         logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
607
608         status = False
609         elapsed = -1
610         if self.currentsession.startswith("NOGUI_"):
611             # runSalome -t (bash)
612             status, elapsed = fork.batch(
613                                 binSalome,
614                                 self.logger,
615                                 os.path.join(self.tmp_working_dir, "WORK"),
616                                 [ "-t", "--shutdown-server=1", script_path ],
617                                 delai=time_out,
618                                 log=logWay)
619
620         elif self.currentsession.startswith("PY_"):
621             # python script.py
622             status, elapsed = fork.batch(
623                                 binPython,
624                                 self.logger,
625                                 os.path.join(self.tmp_working_dir, "WORK"),
626                                 [script_path],
627                                 delai=time_out,
628                                 log=logWay)
629
630         else:
631             opt = "-z 0"
632             if self.show_desktop: opt = "--show-desktop=0"
633             status, elapsed = fork.batch_salome(
634                                 binSalome,
635                                 self.logger,
636                                 os.path.join( self.tmp_working_dir, "WORK"),
637                                 [ opt, "--shutdown-server=1", script_path ],
638                                 getTmpDir=tmpDir,
639                                 fin=killSalome,
640                                 delai=time_out,
641                                 log=logWay,
642                                 delaiapp=time_out_salome)
643
644         self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
645                           5)
646
647         # create the test result to add in the config object
648         test_info = src.pyconf.Mapping(self.config)
649         test_info.testbase = self.currentTestBase
650         test_info.grid = self.currentgrid
651         test_info.session = self.currentsession
652         test_info.script = src.pyconf.Sequence(self.config)
653
654         script_results = self.read_results(listTest, elapsed == time_out)
655         for sr in sorted(script_results.keys()):
656             self.nb_run += 1
657
658             # create script result
659             script_info = src.pyconf.Mapping(self.config)
660             script_info.name = sr
661             script_info.res = script_results[sr][0]
662             script_info.time = script_results[sr][1]
663             if script_info.res == src.TIMEOUT_STATUS:
664                 script_info.time = time_out
665             if script_info.time < 1e-3: script_info.time = 0
666
667             callback = script_results[sr][2]
668             if script_info.res != src.OK_STATUS and len(callback) > 0:
669                 script_info.callback = callback
670
671             kfres = script_results[sr][3]
672             if len(kfres) > 0:
673                 script_info.known_error = src.pyconf.Mapping(self.config)
674                 script_info.known_error.date = kfres[0]
675                 script_info.known_error.expected = kfres[1]
676                 script_info.known_error.comment = kfres[2]
677                 script_info.known_error.fixed = kfres[3]
678             
679             script_info.content = script_results[sr][4]
680             script_info.out = script_results[sr][5]
681             
682             # add it to the list of results
683             test_info.script.append(script_info, '')
684
685             # display the results
686             if script_info.time > 0:
687                 exectime = "(%7.3f s)" % script_info.time
688             else:
689                 exectime = ""
690
691             sp = "." * (35 - len(script_info.name))
692             self.logger.write(self.write_test_margin(3), 3)
693             self.logger.write("script %s %s %s %s\n" % (
694                                 src.printcolors.printcLabel(script_info.name),
695                                 sp,
696                                 src.printcolors.printc(script_info.res),
697                                 exectime), 3, False)
698             if script_info and len(callback) > 0:
699                 self.logger.write("Exception in %s\n%s\n" % \
700                     (script_info.name,
701                      src.printcolors.printcWarning(callback)), 2, False)
702
703             if script_info.res == src.OK_STATUS:
704                 self.nb_succeed += 1
705             elif script_info.res == src.KNOWNFAILURE_STATUS:
706                 self.nb_acknoledge += 1
707             elif script_info.res == src.TIMEOUT_STATUS:
708                 self.nb_timeout += 1
709             elif script_info.res == src.NA_STATUS:
710                 self.nb_run -= 1
711             elif script_info.res == "?":
712                 self.nb_not_run += 1
713                 
714
715         self.config.TESTS.append(test_info, '')
716
717     ##
718     # Runs all tests of a session.
719     def run_session_tests(self):
720        
721         self.logger.write(self.write_test_margin(2), 3)
722         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
723                                                     self.currentsession), 3, False)
724
725         # prepare list of tests to run
726         tests = os.listdir(os.path.join(self.currentDir,
727                                         self.currentgrid,
728                                         self.currentsession))
729         tests = filter(lambda l: l.endswith(".py"), tests)
730         tests = sorted(tests, key=str.lower)
731
732         # build list of known failures
733         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
734         ignoreDict = {}
735         for k in self.ignore_tests.keys():
736             if k.startswith(cat):
737                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
738
739         self.run_tests(tests, ignoreDict)
740
741     ##
742     # Runs all tests of a grid.
743     def run_grid_tests(self):
744         self.logger.write(self.write_test_margin(1), 3)
745         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
746                                                 self.currentgrid), 3, False)
747
748         grid_path = os.path.join(self.currentDir, self.currentgrid)
749
750         sessions = []
751         if self.sessions is not None:
752             sessions = self.sessions # user choice
753         else:
754             # use all scripts in grid
755             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
756                            os.listdir(grid_path))
757             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
758                                                                 l)), sessions)
759
760         sessions = sorted(sessions, key=str.lower)
761         existingSessions = self.getSubDirectories(grid_path)
762         for session_ in sessions:
763             if not os.path.exists(os.path.join(grid_path, session_)):
764                 self.logger.write(self.write_test_margin(2), 3)
765                 msg = """\
766 Session '%s' not found
767 Existing sessions are:
768 %s
769 """ % (session_, PP.pformat(sorted(existingSessions)))
770                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
771             else:
772                 self.currentsession = session_
773                 self.run_session_tests()
774
775     def getSubDirectories(self, aDir):
776         """
777         get names of first level of sub directories in aDir
778         excluding '.git' etc as beginning with '.'
779         """
780         res = os.listdir(aDir)
781         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
782         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
783         return res
784
785     ##
786     # Runs test testbase.
787     def run_testbase_tests(self):
788         res_dir = os.path.join(self.currentDir, "RESSOURCES")
789         os.environ['PYTHONPATH'] =  (res_dir + 
790                                      os.pathsep + 
791                                      os.environ['PYTHONPATH'])
792         os.environ['TT_BASE_RESSOURCES'] = res_dir
793         src.printcolors.print_value(self.logger,
794                                     "TT_BASE_RESSOURCES",
795                                     res_dir,
796                                     4)
797         self.logger.write("\n", 4, False)
798
799         self.logger.write(self.write_test_margin(0), 3)
800         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
801                                                         self.currentTestBase)
802         self.logger.write(testbase_label, 3, False)
803         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
804                           3)
805         self.logger.write("\n", 3, False)
806
807         # load settings
808         settings_file = os.path.join(res_dir, "test_settings.py")
809         if os.path.exists(settings_file):
810             gdic, ldic = {}, {}
811             execfile(settings_file, gdic, ldic)
812             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
813             self.settings = ldic['settings_dic']
814             self.ignore_tests = ldic['known_failures_list']
815             if isinstance(self.ignore_tests, list):
816                 self.ignore_tests = {}
817                 self.logger.write(src.printcolors.printcWarning(
818                   "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
819         else:
820             self.ignore_tests = {}
821             self.settings.clear()
822
823         # read known failures pyconf
824         if "testerror" in self.config.LOCAL:
825             #import testerror
826             #self.known_errors = testerror.read_test_failures(
827             #                            self.config.TOOLS.testerror.file_path,
828             #                            do_error=False)
829             pass
830         else:
831             self.known_errors = None
832
833         if self.grids is not None:
834             grids = self.grids # given by user
835         else:
836             # select all the grids (i.e. directories) in the directory
837             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
838                              os.listdir(self.currentDir))
839             grids = filter(lambda l: os.path.isdir(
840                                         os.path.join(self.currentDir, l)),
841                              grids)
842
843         grids = sorted(grids, key=str.lower)
844         existingGrids = self.getSubDirectories(self.currentDir)
845         for grid in grids:
846             if not os.path.exists(os.path.join(self.currentDir, grid)):
847                 self.logger.write(self.write_test_margin(1), 3)
848                 msg = """\
849 Grid '%s' does not exist
850 Existing grids are:
851 %s
852 """ % (grid, PP.pformat(sorted(existingGrids)))
853                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
854             else:
855                 self.currentgrid = grid
856                 self.run_grid_tests()
857
858     def run_script(self, script_name):
859         if ('APPLICATION' in self.config and 
860                 script_name in self.config.APPLICATION):
861             script = self.config.APPLICATION[script_name]
862             if len(script) == 0:
863                 return
864
865             self.logger.write("\n", 2, False)
866             if not os.path.exists(script):
867                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
868                                         "t not found: %s" % script) + "\n", 2)
869             else:
870                 self.logger.write(src.printcolors.printcHeader("----------- sta"
871                                             "rt %s" % script_name) + "\n", 2)
872                 self.logger.write("Run script: %s\n" % script, 2)
873                 subprocess.Popen(script, shell=True).wait()
874                 self.logger.write(src.printcolors.printcHeader("----------- end"
875                                                 " %s" % script_name) + "\n", 2)
876
877     def run_all_tests(self):
878         initTime = datetime.datetime.now()
879
880         self.run_script('test_setup')
881         self.logger.write("\n", 2, False)
882
883         self.logger.write(src.printcolors.printcHeader(
884                                             _("=== STARTING TESTS")) + "\n", 2)
885         self.logger.write("\n", 2, False)
886         self.currentDir = os.path.join(self.tmp_working_dir,
887                                        'BASES',
888                                        self.currentTestBase)
889         self.run_testbase_tests()
890
891         # calculate total execution time
892         totalTime = datetime.datetime.now() - initTime
893         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
894         self.logger.write("\n", 2, False)
895         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
896         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
897                           2,
898                           False)
899
900         #
901         # Start the tests
902         #
903         self.run_script('test_cleanup')
904         self.logger.write("\n", 2, False)
905
906         # evaluate results
907
908         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
909             { 'succeed': self.nb_succeed, 'total': self.nb_run }
910         if self.nb_succeed == self.nb_run:
911             res_out = src.printcolors.printcSuccess(res_out)
912         else:
913             res_out = src.printcolors.printcError(res_out)
914         self.logger.write(res_out, 1)
915
916         if self.nb_timeout > 0:
917             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
918         if self.nb_not_run > 0:
919             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
920         if self.nb_acknoledge > 0:
921             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
922
923         status = src.OK_STATUS
924         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
925             status = src.KO_STATUS
926         elif self.nb_acknoledge:
927             status = src.KNOWNFAILURE_STATUS
928         
929         self.logger.write(_("Status: %s\n" % status), 3)
930
931         return self.nb_run - self.nb_succeed - self.nb_acknoledge
932
933     ##
934     # Write margin to show test results.
935     def write_test_margin(self, tab):
936         if tab == 0:
937             return ""
938         return "|   " * (tab - 1) + "+ "
939