Salome HOME
spns #40790: on Debian distributions, use dpkg-query instead of apt to check system...
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
try:
    execfile
except NameError:
    # The builtin only exists on Python 2; define an equivalent otherwise.
    def execfile(somefile, global_vars, local_vars):
        """Compile and execute the Python source file *somefile*.

        :param somefile: path of the file to execute
        :param global_vars: dict used as globals of the executed code
        :param local_vars: dict used as locals of the executed code
                           (names assigned by the file end up here)
        """
        with open(somefile) as f:
            code = compile(f.read(), somefile, 'exec')
        # the file is closed before the code runs
        exec(code, global_vars, local_vars)
27
28 import os
29 import sys
30 import datetime
31 import shutil
32 import string
33 import imp
34 import subprocess
35 import glob
36 import pprint as PP
37
# Module-level debug switch: when True, several methods of this module
# print extra information with print().
verbose = False
39
40 from . import fork
41 import src
42 from  src.versionMinorMajorPatch import MinorMajorPatch as MMP
43
# directories not considered as test grids
C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']

# Timeout used when a session has no entry in Test.settings["timeout"]
# (presumably seconds - TODO confirm against the fork module).
DEFAULT_TIMEOUT = 150
48
49 # Get directory to be used for the temporary files.
50 #
def getTmpDirDEFAULT():
    """Return the default directory used for temporary files.

    On Windows this is the value of the TEMP environment variable (None when
    it is not set); otherwise it is /tmp/logs/<USER>, where <USER> falls back
    to "unknown" when the USER environment variable is not set.
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        return os.path.join('/tmp', 'logs', os.getenv("USER", "unknown"))
    return os.getenv("TEMP")
58
59 class Test:
60     def __init__(self,
61                  config,
62                  logger,
63                  tmp_working_dir,
64                  testbase="",
65                  grids=None,
66                  sessions=None,
67                  launcher="",
68                  show_desktop=True):
69         self.grids = grids
70         self.config = config
71         self.logger = logger
72         self.tmp_working_dir = tmp_working_dir
73         self.sessions = sessions
74         self.launcher = launcher
75         self.show_desktop = show_desktop
76
77         res = self.prepare_testbase(testbase)
78         self.test_base_found = True
79         if res == 1:
80             # Fail
81             self.test_base_found = False
82
83         self.settings = {}
84         self.known_errors = None
85
86         # create section for results
87         self.config.TESTS = src.pyconf.Sequence(self.config)
88
89         self.nb_run = 0
90         self.nb_succeed = 0
91         self.nb_timeout = 0
92         self.nb_not_run = 0
93         self.nb_acknoledge = 0
94
95     def _copy_dir(self, source, target):
96         if self.config.VARS.python >= "2.6":
97             shutil.copytree(source, target,
98                             symlinks=True,
99                             ignore=shutil.ignore_patterns('.git*','.svn*'))
100         else:
101             shutil.copytree(source, target,
102                             symlinks=True)
103
104     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
105         self.logger.write(_("get test base from dir: %s\n") % \
106                           src.printcolors.printcLabel(testbase_dir), 3)
107         if not os.access(testbase_dir, os.X_OK):
108             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
109                                      "exist ...\n") % { 'name': testbase_name,
110                                                        'dir': testbase_dir })
111
112         self._copy_dir(testbase_dir,
113                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
114
115     def prepare_testbase_from_git(self,
116                                   testbase_name,
117                                   testbase_base,
118                                   testbase_tag):
119         self.logger.write(
120             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
121                         "testbase" : src.printcolors.printcLabel(testbase_name),
122                         "tag" : src.printcolors.printcLabel(testbase_tag)},
123                           3)
124         try:
125             def set_signal(): # pragma: no cover
126                 """see http://bugs.python.org/issue1652"""
127                 import signal
128                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
129
130             cmd = "git clone --depth 1 %(base)s %(dir)s"
131             cmd += " && cd %(dir)s"
132             if testbase_tag=='master':
133                 cmd += " && git fetch origin %(branch)s"
134             else:
135                 cmd += " && git fetch origin %(branch)s:%(branch)s"
136             cmd += " && git checkout %(branch)s"
137             cmd = cmd % { 'branch': testbase_tag,
138                          'base': testbase_base,
139                          'dir': testbase_name }
140
141             self.logger.write("> %s\n" % cmd, 5)
142             if src.architecture.is_windows():
143                 # preexec_fn not supported on windows platform
144                 res = subprocess.call(cmd,
145                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
146                                 shell=True,
147                                 stdout=self.logger.logTxtFile,
148                                 stderr=subprocess.PIPE)
149             else:
150                 res = subprocess.call(cmd,
151                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
152                                 shell=True,
153                                 preexec_fn=set_signal,
154                                 stdout=self.logger.logTxtFile,
155                                 stderr=subprocess.PIPE)
156             if res != 0:
157                 raise src.SatException(_("Error: unable to get test base "
158                                          "'%(name)s' from git '%(repo)s'.") % \
159                                        { 'name': testbase_name,
160                                         'repo': testbase_base })
161
162         except OSError:
163             self.logger.error(_("git is not installed. exiting...\n"))
164             sys.exit(0)
165
166     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
167         self.logger.write(_("get test base '%s' from svn\n") % \
168                           src.printcolors.printcLabel(testbase_name), 3)
169         try:
170             def set_signal(): # pragma: no cover
171                 """see http://bugs.python.org/issue1652"""
172                 import signal
173                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
174
175             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
176             cmd = cmd % { 'user': user,
177                          'base': testbase_base,
178                          'dir': testbase_name }
179
180             # Get the application environment
181             self.logger.write(_("Set the application environment\n"), 5)
182             env_appli = src.environment.SalomeEnviron(self.config,
183                                       src.environment.Environ(dict(os.environ)))
184             env_appli.set_application_env(self.logger)
185
186             self.logger.write("> %s\n" % cmd, 5)
187             if src.architecture.is_windows():
188                 # preexec_fn not supported on windows platform
189                 res = subprocess.call(cmd,
190                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
191                                 shell=True,
192                                 stdout=self.logger.logTxtFile,
193                                 stderr=subprocess.PIPE)
194             else:
195                 res = subprocess.call(cmd,
196                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
197                                 shell=True,
198                                 preexec_fn=set_signal,
199                                 stdout=self.logger.logTxtFile,
200                                 stderr=subprocess.PIPE,
201                                 env=env_appli.environ.environ,)
202
203             if res != 0:
204                 raise src.SatException(_("Error: unable to get test base '%(nam"
205                                          "e)s' from svn '%(repo)s'.") % \
206                                        { 'name': testbase_name,
207                                         'repo': testbase_base })
208
209         except OSError:
210             self.logger.error(_("svn is not installed. exiting...\n"))
211             sys.exit(0)
212
213     ##
214     # Configure tests base.
215     def prepare_testbase(self, test_base_name):
216         src.printcolors.print_value(self.logger,
217                                     _("Test base"),
218                                     test_base_name,
219                                     3)
220         self.logger.write("\n", 3, False)
221
222         # search for the test base
223         test_base_info = None
224         for project_name in self.config.PROJECTS.projects:
225             project_info = self.config.PROJECTS.projects[project_name]
226             if "test_bases" not in project_info:
227                 continue
228             for t_b_info in project_info.test_bases:
229                 if t_b_info.name == test_base_name:
230                     test_base_info = t_b_info
231
232         if not test_base_info:
233             if os.path.exists(test_base_name):
234                 self.prepare_testbase_from_dir("DIR", test_base_name)
235                 self.currentTestBase = "DIR"
236                 return 0
237
238         if not test_base_info:
239             message = (_("########## ERROR: test base '%s' not found\n") %
240                        test_base_name)
241             self.logger.write("%s\n" % src.printcolors.printcError(message))
242             return 1
243
244         if test_base_info.get_sources == "dir":
245             self.prepare_testbase_from_dir(test_base_name,
246                                            test_base_info.info.dir)
247         elif test_base_info.get_sources == "git":
248             self.prepare_testbase_from_git(test_base_name,
249                                        test_base_info.info.base,
250                                        self.config.APPLICATION.test_base.tag)
251         elif test_base_info.get_sources == "svn":
252             svn_user = src.get_cfg_param(test_base_info.info,
253                                          "svn_user",
254                                          self.config.USER.svn_user)
255             self.prepare_testbase_from_svn(svn_user,
256                                        test_base_name,
257                                        test_base_info.info.base)
258         else:
259             raise src.SatException(_("unknown source type '%(type)s' for test b"
260                                      "ase '%(base)s' ...\n") % {
261                                         'type': test_base_info.get_sources,
262                                         'base': test_base_name })
263
264         self.currentTestBase = test_base_name
265
266     ##
267     # Searches if the script is declared in known errors pyconf.
268     # Update the status if needed.
269     def search_known_errors(self, status, test_grid, test_session, test):
270         test_path = os.path.join(test_grid, test_session, test)
271         if not src.config_has_application(self.config):
272             return status, []
273
274         if self.known_errors is None:
275             return status, []
276
277         platform = self.config.VARS.arch
278         application = self.config.VARS.application
279         error = self.known_errors.get_error(test_path, application, platform)
280         if error is None:
281             return status, []
282
283         if status == src.OK_STATUS:
284             if not error.fixed:
285                 # the error is fixed
286                 self.known_errors.fix_error(error)
287                 #import testerror
288                 #testerror.write_test_failures(
289                 #                        self.config.TOOLS.testerror.file_path,
290                 #                        self.known_errors.errors)
291             return status, [ error.date,
292                             error.expected,
293                             error.comment,
294                             error.fixed ]
295
296         if error.fixed:
297             self.known_errors.unfix_error(error)
298             #import testerror
299             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
300             #                              self.known_errors.errors)
301
302         delta = self.known_errors.get_expecting_days(error)
303         kfres = [ error.date, error.expected, error.comment, error.fixed ]
304         if delta < 0:
305             return src.KO_STATUS, kfres
306         return src.KNOWNFAILURE_STATUS, kfres
307
308     ##
309     # Read the *.result.py files.
310     def read_results(self, listTest, has_timed_out):
311         results = {}
312         for test in listTest:
313             resfile = os.path.join(self.currentDir,
314                                    self.currentgrid,
315                                    self.currentsession,
316                                    test[:-3] + ".result.py")
317
318             # check if <test>.result.py file exists
319             if not os.path.exists(resfile):
320                 results[test] = ["?", -1, "", []]
321             else:
322                 gdic, ldic = {}, {}
323                 if verbose:
324                   print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
325
326                 try:
327                   execfile(resfile, gdic, ldic)
328
329                   status = src.TIMEOUT_STATUS
330                   if not has_timed_out:
331                       status = src.KO_STATUS
332
333                   if 'status' in ldic:
334                       status = ldic['status']
335
336                   expected = []
337                   if status == src.KO_STATUS or status == src.OK_STATUS:
338                       status, expected = self.search_known_errors(status,
339                                                               self.currentgrid,
340                                                               self.currentsession,
341                                                               test)
342
343                   callback = ""
344                   if 'callback' in ldic:
345                       callback = ldic['callback']
346                   elif status == src.KO_STATUS:
347                       callback = "CRASH"
348                       if verbose:
349                         print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
350                         print("--- CRASH gdic\n%s" %  PP.pformat(gdic))
351                         pass
352
353                   exec_time = -1
354                   if 'time' in ldic:
355                       try:
356                           exec_time = float(ldic['time'])
357                       except:
358                           pass
359
360                   results[test] = [status, exec_time, callback, expected]
361
362                 except:
363                   results[test] = ["?", -1, "", []]
364                   # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
365
366             # check if <test>.py file exists
367             testfile = os.path.join(self.currentDir,
368                                    self.currentgrid,
369                                    self.currentsession,
370                                    test)
371
372             if not os.path.exists(testfile):
373                 results[test].append('')
374             else:
375                 text = open(testfile, "r").read()
376                 results[test].append(text)
377
378             # check if <test>.out.py file exists
379             outfile = os.path.join(self.currentDir,
380                                    self.currentgrid,
381                                    self.currentsession,
382                                    test[:-3] + ".out.py")
383
384             if not os.path.exists(outfile):
385                 results[test].append('')
386             else:
387                 text = open(outfile, "r").read()
388                 results[test].append(text)
389
390         return results
391
392     ##
393     # Generates the script to be run by Salome.
394     # This python script includes init and close statements and a loop
395     # calling all the scripts of a single directory.
396     def generate_script(self, listTest, script_path, ignoreList):
397         # open template file
398         tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
399         with open(tFile, 'r') as f:
400           template = string.Template(f.read())
401
402         # create substitution dictionary
403         d = dict()
404         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
405         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
406         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
407         d['sessionDir'] = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
408         d['resultFile'] = os.path.join(self.tmp_working_dir, 'WORK', 'exec_result')
409         d['listTest'] = listTest
410         d['sessionName'] = self.currentsession
411         d['ignore'] = ignoreList
412
413         # create script with template
414         contents = template.safe_substitute(d)
415         if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
416         with open(script_path, 'w') as f:
417           f.write(contents)
418
419
420     # Find the getTmpDir function that gives access to *_pidict file directory.
421     # (the *_pidict file exists when SALOME is launched)
422     def get_tmp_dir(self):
423         # Rare case where there is no KERNEL in grid list
424         # (for example MED_STANDALONE)
425         if ('APPLICATION' in self.config
426                 and 'KERNEL' not in self.config.APPLICATION.products
427                 and 'KERNEL_ROOT_DIR' not in os.environ):
428             return getTmpDirDEFAULT
429
430         # Case where "sat test" is launched in an existing SALOME environment
431         if 'KERNEL_ROOT_DIR' in os.environ:
432             root_dir =  os.environ['KERNEL_ROOT_DIR']
433
434         if ('APPLICATION' in self.config and
435             'KERNEL' in self.config.APPLICATION.products):
436             root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
437
438         # Case where there the appli option is called (with path to launcher)
439         if len(self.launcher) > 0:
440             # There are two cases : The old application (runAppli)
441             # and the new one
442             launcherName = os.path.basename(self.launcher)
443             launcherDir = os.path.dirname(self.launcher)
444             if launcherName == 'runAppli':
445                 # Old application
446                 cmd = """
447 for i in %s/env.d/*.sh;
448   do source ${i};
449 done
450 echo $KERNEL_ROOT_DIR
451 """ % launcherDir
452             else:
453                 # New application
454                 cmd = """
455 echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
456 %s shell tmpscript.py
457 """ % self.launcher
458
459             if src.architecture.is_windows():
460                 subproc_res = subprocess.Popen(cmd,
461                             stdout=subprocess.PIPE,
462                             shell=True).communicate()
463                 pass
464             else:
465                 subproc_res = subprocess.Popen(cmd,
466                             stdout=subprocess.PIPE,
467                             shell=True,
468                             executable='/bin/bash').communicate()
469                 pass
470
471             root_dir = subproc_res[0].split()[-1]
472
473         # import grid salome_utils from KERNEL that gives
474         # the right getTmpDir function
475         root_dir = root_dir.decode('utf-8')
476         aPath = [os.path.join(root_dir, 'bin', 'salome')]
477         sal_uts = "salome_utils"
478         try:
479             (file_, pathname, description) = imp.find_module(sal_uts, aPath )
480         except Exception:
481             msg = "inexisting %s.py in %s" % (sal_uts, aPath)
482             raise Exception(msg)
483
484         try:
485             grid = imp.load_module(sal_uts, file_, pathname, description)
486             return grid.getLogDir
487         except:
488             grid = imp.load_module(sal_uts, file_, pathname, description)
489             return grid.getTmpDir
490         finally:
491             if file_:
492                 file_.close()
493
494
495     def get_test_timeout(self, test_name, default_value):
496         if ("timeout" in self.settings and
497                 test_name in self.settings["timeout"]):
498             return self.settings["timeout"][test_name]
499
500         return default_value
501
502     def generate_launching_commands(self):
503
504         # Case where there the appli option is called (with path to launcher)
505         if len(self.launcher) > 0:
506             # There are two cases : The old application (runAppli)
507             # and the new one
508             launcherName = os.path.basename(self.launcher)
509             launcherDir = os.path.dirname(self.launcher)
510             if os.path.basename(launcherDir) == 'APPLI':
511                 # Old application
512                 binSalome = self.launcher
513                 binPython = ("for i in " +
514                              launcherDir +
515                              "/env.d/*.sh; do source ${i}; done ; python")
516                 killSalome = ("for i in " +
517                         launcherDir +
518                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
519                 return binSalome, binPython, killSalome
520             else:
521                 # New application
522                 binSalome = self.launcher
523                 binPython = self.launcher + ' shell'
524                 killSalome = self.launcher + ' killall'
525                 return binSalome, binPython, killSalome
526
527         # SALOME version detection and APPLI repository detection
528         VersionSalome = src.get_salome_version(self.config)
529         appdir = 'APPLI'
530         if "APPLI" in self.config and "application_name" in self.config.APPLI:
531             appdir = self.config.APPLI.application_name
532
533         # Case where SALOME has NOT the launcher that uses the SalomeContext API
534         if VersionSalome < MMP([7,3,0]):
535             binSalome = os.path.join(self.config.APPLICATION.workdir,
536                                      appdir,
537                                      "runAppli")
538             binPython = "python"
539             killSalome = "killSalome.py"
540             src.environment.load_environment(self.config, False, self.logger)
541             return binSalome, binPython, killSalome
542
543         # Case where SALOME has the launcher that uses the SalomeContext API
544         else:
545             launcher_name = src.get_launcher_name(self.config)
546             binSalome = os.path.join(self.config.APPLICATION.workdir,
547                                      launcher_name)
548
549             binPython = binSalome + ' shell'
550             killSalome = binSalome + ' killall'
551             return binSalome, binPython, killSalome
552
553         return binSalome, binPython, killSalome
554
555
556     ##
557     # Runs tests of a session (using a single instance of Salome).
    def run_tests(self, listTest, ignoreList):
        """Run the scripts of the current session and record their results.

        A wrapper script is generated in the session directory, then run
        according to the prefix of the session name:
          - "NOGUI_": fork.batch with the SALOME command and the -t option
          - "PY_": fork.batch with the python command
          - otherwise: fork.batch_salome with the SALOME command
        The results read by read_results() are appended to config.TESTS,
        written to the logger and counted in the nb_* attributes.

        :param listTest: file names of the scripts to run
        :param ignoreList: given to generate_script() as the scripts to ignore
        """
        out_path = os.path.join(self.currentDir,
                                self.currentgrid,
                                self.currentsession)
        if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
                   (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
        # the timeout is looked for in the settings with the key <grid>/<session>
        sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
        time_out = self.get_test_timeout(sessionname,
                                         DEFAULT_TIMEOUT)

        # only given to fork.batch_salome (delaiapp)
        time_out_salome = DEFAULT_TIMEOUT

        # generate wrapper script
        script_path = os.path.join(out_path, 'wrapperScript.py')
        self.generate_script(listTest, script_path, ignoreList)

        # function (not a path), given to fork.batch_salome as getTmpDir
        tmpDir = self.get_tmp_dir()

        binSalome, binPython, killSalome = self.generate_launching_commands()
        # the settings may give a "-m" option for this session
        if "run_with_grids" in self.settings and \
           sessionname in self.settings["run_with_grids"]:
            binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])

        logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")

        status = False
        elapsed = -1
        if self.currentsession.startswith("NOGUI_"):
            # runSalome -t (bash)
            status, elapsed = fork.batch(
                                binSalome,
                                self.logger,
                                os.path.join(self.tmp_working_dir, "WORK"),
                                [ "-t", "--shutdown-server=1", script_path ],
                                delai=time_out,
                                log=logWay)

        elif self.currentsession.startswith("PY_"):
            # python script.py
            status, elapsed = fork.batch(
                                binPython,
                                self.logger,
                                os.path.join(self.tmp_working_dir, "WORK"),
                                [script_path],
                                delai=time_out,
                                log=logWay)

        else:
            # SALOME session; the option depends on show_desktop
            opt = "-z 0"
            if self.show_desktop: opt = "--show-desktop=0"
            status, elapsed = fork.batch_salome(
                                binSalome,
                                self.logger,
                                os.path.join( self.tmp_working_dir, "WORK"),
                                [ opt, "--shutdown-server=1", script_path ],
                                getTmpDir=tmpDir,
                                fin=killSalome,
                                delai=time_out,
                                log=logWay,
                                delaiapp=time_out_salome)

        self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)

        # create the test result to add in the config object
        test_info = src.pyconf.Mapping(self.config)
        test_info.testbase = self.currentTestBase
        test_info.grid = self.currentgrid
        test_info.session = self.currentsession
        test_info.script = src.pyconf.Sequence(self.config)

        # the run is considered as timed out when elapsed equals the timeout
        script_results = self.read_results(listTest, elapsed == time_out)
        for sr in sorted(script_results.keys()):
            self.nb_run += 1

            # create script result
            # (entries of script_results[sr]: status, time, callback,
            #  known error info, script text, out text)
            script_info = src.pyconf.Mapping(self.config)
            script_info.name = sr
            script_info.res = script_results[sr][0]
            script_info.time = script_results[sr][1]
            if script_info.res == src.TIMEOUT_STATUS:
                script_info.time = time_out
            # also turns the -1 of an unknown time into 0
            if script_info.time < 1e-3: script_info.time = 0

            callback = script_results[sr][2]
            if script_info.res != src.OK_STATUS and len(callback) > 0:
                script_info.callback = callback

            kfres = script_results[sr][3]
            if len(kfres) > 0:
                script_info.known_error = src.pyconf.Mapping(self.config)
                script_info.known_error.date = kfres[0]
                script_info.known_error.expected = kfres[1]
                script_info.known_error.comment = kfres[2]
                script_info.known_error.fixed = kfres[3]

            script_info.content = script_results[sr][4]
            script_info.out = script_results[sr][5]

            # add it to the list of results
            test_info.script.append(script_info, '')

            # display the results
            if script_info.time > 0:
                exectime = "(%7.3f s)" % script_info.time
            else:
                exectime = ""

            # dots filling the gap after the script name
            sp = "." * (35 - len(script_info.name))
            self.logger.write(self.write_test_margin(3), 3)
            self.logger.write("script %s %s %s %s\n" % (
                                src.printcolors.printcLabel(script_info.name),
                                sp,
                                src.printcolors.printc(script_info.res),
                                exectime), 3, False)
            # NOTE(review): the callback is written whatever the status of
            # the script is - confirm this is intended.
            if script_info and len(callback) > 0:
                self.logger.write("Exception in %s\n%s\n" % \
                    (script_info.name,
                     src.printcolors.printcWarning(callback)), 2, False)

            # update the counters
            if script_info.res == src.OK_STATUS:
                self.nb_succeed += 1
            elif script_info.res == src.KNOWNFAILURE_STATUS:
                self.nb_acknoledge += 1
            elif script_info.res == src.TIMEOUT_STATUS:
                self.nb_timeout += 1
            elif script_info.res == src.NA_STATUS:
                # cancels the increment done at the top of the loop
                self.nb_run -= 1
            elif script_info.res == "?":
                self.nb_not_run += 1


        self.config.TESTS.append(test_info, '')
690
691     ##
692     # Runs all tests of a session.
693     def run_session_tests(self):
694
695         self.logger.write(self.write_test_margin(2), 3)
696         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
697                                                     self.currentsession), 3, False)
698
699         # prepare list of tests to run
700         tests = os.listdir(os.path.join(self.currentDir,
701                                         self.currentgrid,
702                                         self.currentsession))
703         # avoid result files of previous tests, if presents
704         # tests = filter(lambda l: l.endswith(".py"), tests)
705         tests = [t for t in tests if t.endswith(".py") \
706                    and not ( t.endswith(".out.py") or \
707                              t.endswith(".result.py") or \
708                              t.endswith("wrapperScript.py") \
709                            ) ]
710         tests = sorted(tests, key=str.lower)
711
712         # build list of known failures
713         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
714         ignoreDict = {}
715         for k in self.ignore_tests.keys():
716             if k.startswith(cat):
717                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
718
719         self.run_tests(tests, ignoreDict)
720
721     ##
722     # Runs all tests of a grid.
723     def run_grid_tests(self):
724         self.logger.write(self.write_test_margin(1), 3)
725         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
726                                                 self.currentgrid), 3, False)
727
728         grid_path = os.path.join(self.currentDir, self.currentgrid)
729
730         sessions = []
731         if self.sessions is not None:
732             sessions = self.sessions # user choice
733         else:
734             # use all scripts in grid
735             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
736                            os.listdir(grid_path))
737             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
738                                                                 l)), sessions)
739
740         sessions = sorted(sessions, key=str.lower)
741         existingSessions = self.getSubDirectories(grid_path)
742         for session_ in sessions:
743             if not os.path.exists(os.path.join(grid_path, session_)):
744                 self.logger.write(self.write_test_margin(2), 3)
745                 msg = """\
746 Session '%s' not found
747 Existing sessions are:
748 %s
749 """ % (session_, PP.pformat(sorted(existingSessions)))
750                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
751             else:
752                 self.currentsession = session_
753                 self.run_session_tests()
754
755     def getSubDirectories(self, aDir):
756         """
757         get names of first level of sub directories in aDir
758         excluding '.git' etc as beginning with '.'
759         """
760         res = os.listdir(aDir)
761         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
762         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
763         return res
764
765     ##
    # Runs all the tests of the current test base.
767     def run_testbase_tests(self):
768         res_dir = os.path.join(self.currentDir, "RESSOURCES")
769         os.environ['PYTHONPATH'] =  (res_dir +
770                                      os.pathsep +
771                                      os.environ['PYTHONPATH'])
772         os.environ['TT_BASE_RESSOURCES'] = res_dir
773         src.printcolors.print_value(self.logger,
774                                     "TT_BASE_RESSOURCES",
775                                     res_dir,
776                                     4)
777         self.logger.write("\n", 4, False)
778
779         self.logger.write(self.write_test_margin(0), 3)
780         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
781                                                         self.currentTestBase)
782         self.logger.write(testbase_label, 3, False)
783         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
784                           3)
785         self.logger.write("\n", 3, False)
786
787         # load settings
788         settings_file = os.path.join(res_dir, "test_settings.py")
789         if os.path.exists(settings_file):
790             gdic, ldic = {}, {}
791             execfile(settings_file, gdic, ldic)
792             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
793             self.settings = ldic['settings_dic']
794             self.ignore_tests = ldic['known_failures_list']
795             if isinstance(self.ignore_tests, list):
796                 self.ignore_tests = {}
797                 self.logger.write(src.printcolors.printcWarning(
798                   "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
799         else:
800             self.ignore_tests = {}
801             self.settings.clear()
802
803         # read known failures pyconf
804         if "testerror" in self.config.LOCAL:
805             #import testerror
806             #self.known_errors = testerror.read_test_failures(
807             #                            self.config.TOOLS.testerror.file_path,
808             #                            do_error=False)
809             pass
810         else:
811             self.known_errors = None
812
813         if self.grids is not None:
814             grids = self.grids # given by user
815         else:
816             # select all the grids (i.e. directories) in the directory
817             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
818                              os.listdir(self.currentDir))
819             grids = filter(lambda l: os.path.isdir(
820                                         os.path.join(self.currentDir, l)),
821                              grids)
822
823         grids = sorted(grids, key=str.lower)
824         existingGrids = self.getSubDirectories(self.currentDir)
825         for grid in grids:
826             if not os.path.exists(os.path.join(self.currentDir, grid)):
827                 self.logger.write(self.write_test_margin(1), 3)
828                 msg = """\
829 Grid '%s' does not exist
830 Existing grids are:
831 %s
832 """ % (grid, PP.pformat(sorted(existingGrids)))
833                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
834             else:
835                 self.currentgrid = grid
836                 self.run_grid_tests()
837
838     def run_script(self, script_name):
839         if ('APPLICATION' in self.config and
840                 script_name in self.config.APPLICATION):
841             script = self.config.APPLICATION[script_name]
842             if len(script) == 0:
843                 return
844
845             self.logger.write("\n", 2, False)
846             if not os.path.exists(script):
847                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
848                                         "t not found: %s" % script) + "\n", 2)
849             else:
850                 self.logger.write(src.printcolors.printcHeader("----------- sta"
851                                             "rt %s" % script_name) + "\n", 2)
852                 self.logger.write("Run script: %s\n" % script, 2)
853                 subprocess.Popen(script, shell=True).wait()
854                 self.logger.write(src.printcolors.printcHeader("----------- end"
855                                                 " %s" % script_name) + "\n", 2)
856
857     def run_all_tests(self):
858         initTime = datetime.datetime.now()
859
860         self.run_script('test_setup')
861         self.logger.write("\n", 2, False)
862
863         self.logger.write(src.printcolors.printcHeader(
864                                             _("=== STARTING TESTS")) + "\n", 2)
865         self.logger.write("\n", 2, False)
866         self.currentDir = os.path.join(self.tmp_working_dir,
867                                        'BASES',
868                                        self.currentTestBase)
869         self.run_testbase_tests()
870
871         # calculate total execution time
872         totalTime = datetime.datetime.now() - initTime
873         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
874         self.logger.write("\n", 2, False)
875         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
876         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
877                           2,
878                           False)
879
880         #
881         # Start the tests
882         #
883         self.run_script('test_cleanup')
884         self.logger.write("\n", 2, False)
885
886         # evaluate results
887
888         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
889             { 'succeed': self.nb_succeed, 'total': self.nb_run }
890         if self.nb_succeed == self.nb_run:
891             res_out = src.printcolors.printcSuccess(res_out)
892         else:
893             res_out = src.printcolors.printcError(res_out)
894         self.logger.write(res_out, 1)
895
896         if self.nb_timeout > 0:
897             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
898         if self.nb_not_run > 0:
899             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
900         if self.nb_acknoledge > 0:
901             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
902
903         status = src.OK_STATUS
904         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
905             status = src.KO_STATUS
906         elif self.nb_acknoledge:
907             status = src.KNOWNFAILURE_STATUS
908
909         self.logger.write(_("Status: %s\n" % status), 3)
910
911         return self.nb_run - self.nb_succeed - self.nb_acknoledge
912
913     ##
914     # Write margin to show test results.
915     def write_test_margin(self, tab):
916         if tab == 0:
917             return ""
918         return "|   " * (tab - 1) + "+ "