Salome HOME
sat #20679 : maj doc pdf
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
try:
    execfile
except NameError:
    # Python 3 has no execfile builtin: provide an equivalent.
    def execfile(somefile, global_vars, local_vars):
        """Compile and execute the Python file `somefile`.

        :param somefile: path of the Python file to run.
        :param global_vars: dictionary used as globals for the execution.
        :param local_vars: dictionary used as locals for the execution;
                           names assigned by the file end up here.
        """
        with open(somefile) as f:
            code = compile(f.read(), somefile, 'exec')
        # the file is closed before its code is executed
        exec(code, global_vars, local_vars)
27
28 import os
29 import sys
30 import datetime
31 import shutil
32 import string
33 import imp
34 import subprocess
35 import glob
36 import pprint as PP
37
# When True, read_results, generate_script and run_tests print debug traces
# to stdout.
verbose = False
39
40 from . import fork
41 import src
42
# directories not considered as test grids
# (run_grid_tests also uses this list to skip entries when it lists the
# sessions of a grid)
C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']

# Default time-out given to the fork helpers, both for a test session and for
# the SALOME application itself (presumably seconds -- TODO confirm in fork).
DEFAULT_TIMEOUT = 150
47
48 # Get directory to be used for the temporary files.
49 #
def getTmpDirDEFAULT():
    """Return the default directory used for the temporary files.

    On Windows this is the value of the TEMP environment variable (None
    when it is not set). On other platforms it is /tmp/logs/<user>, where
    <user> is the USER environment variable, or "unknown" when unset.
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        return os.path.join('/tmp', 'logs', os.getenv("USER", "unknown"))
    return os.getenv("TEMP")
57
58 class Test:
59     def __init__(self,
60                  config,
61                  logger,
62                  tmp_working_dir,
63                  testbase="",
64                  grids=None,
65                  sessions=None,
66                  launcher="",
67                  show_desktop=True):
68         self.grids = grids
69         self.config = config
70         self.logger = logger
71         self.tmp_working_dir = tmp_working_dir
72         self.sessions = sessions
73         self.launcher = launcher
74         self.show_desktop = show_desktop
75
76         res = self.prepare_testbase(testbase)
77         self.test_base_found = True
78         if res == 1:
79             # Fail
80             self.test_base_found = False
81         
82         self.settings = {}
83         self.known_errors = None
84
85         # create section for results
86         self.config.TESTS = src.pyconf.Sequence(self.config)
87
88         self.nb_run = 0
89         self.nb_succeed = 0
90         self.nb_timeout = 0
91         self.nb_not_run = 0
92         self.nb_acknoledge = 0
93
94     def _copy_dir(self, source, target):
95         if self.config.VARS.python >= "2.6":
96             shutil.copytree(source, target,
97                             symlinks=True,
98                             ignore=shutil.ignore_patterns('.git*','.svn*'))
99         else:
100             shutil.copytree(source, target,
101                             symlinks=True)
102
103     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
104         self.logger.write(_("get test base from dir: %s\n") % \
105                           src.printcolors.printcLabel(testbase_dir), 3)
106         if not os.access(testbase_dir, os.X_OK):
107             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
108                                      "exist ...\n") % { 'name': testbase_name,
109                                                        'dir': testbase_dir })
110
111         self._copy_dir(testbase_dir,
112                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
113
114     def prepare_testbase_from_git(self,
115                                   testbase_name,
116                                   testbase_base,
117                                   testbase_tag):
118         self.logger.write(
119             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
120                         "testbase" : src.printcolors.printcLabel(testbase_name),
121                         "tag" : src.printcolors.printcLabel(testbase_tag)},
122                           3)
123         try:
124             def set_signal(): # pragma: no cover
125                 """see http://bugs.python.org/issue1652"""
126                 import signal
127                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
128
129             cmd = "git clone --depth 1 %(base)s %(dir)s"
130             cmd += " && cd %(dir)s"
131             if testbase_tag=='master':
132                 cmd += " && git fetch origin %(branch)s"
133             else:
134                 cmd += " && git fetch origin %(branch)s:%(branch)s"
135             cmd += " && git checkout %(branch)s"
136             cmd = cmd % { 'branch': testbase_tag,
137                          'base': testbase_base,
138                          'dir': testbase_name }
139
140             self.logger.write("> %s\n" % cmd, 5)
141             if src.architecture.is_windows():
142                 # preexec_fn not supported on windows platform
143                 res = subprocess.call(cmd,
144                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
145                                 shell=True,
146                                 stdout=self.logger.logTxtFile,
147                                 stderr=subprocess.PIPE)
148             else:
149                 res = subprocess.call(cmd,
150                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
151                                 shell=True,
152                                 preexec_fn=set_signal,
153                                 stdout=self.logger.logTxtFile,
154                                 stderr=subprocess.PIPE)
155             if res != 0:
156                 raise src.SatException(_("Error: unable to get test base "
157                                          "'%(name)s' from git '%(repo)s'.") % \
158                                        { 'name': testbase_name,
159                                         'repo': testbase_base })
160
161         except OSError:
162             self.logger.error(_("git is not installed. exiting...\n"))
163             sys.exit(0)
164
165     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
166         self.logger.write(_("get test base '%s' from svn\n") % \
167                           src.printcolors.printcLabel(testbase_name), 3)
168         try:
169             def set_signal(): # pragma: no cover
170                 """see http://bugs.python.org/issue1652"""
171                 import signal
172                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
173
174             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
175             cmd = cmd % { 'user': user,
176                          'base': testbase_base,
177                          'dir': testbase_name }
178             
179             # Get the application environment
180             self.logger.write(_("Set the application environment\n"), 5)
181             env_appli = src.environment.SalomeEnviron(self.config,
182                                       src.environment.Environ(dict(os.environ)))
183             env_appli.set_application_env(self.logger)
184             
185             self.logger.write("> %s\n" % cmd, 5)
186             if src.architecture.is_windows():
187                 # preexec_fn not supported on windows platform
188                 res = subprocess.call(cmd,
189                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
190                                 shell=True,
191                                 stdout=self.logger.logTxtFile,
192                                 stderr=subprocess.PIPE)
193             else:
194                 res = subprocess.call(cmd,
195                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
196                                 shell=True,
197                                 preexec_fn=set_signal,
198                                 stdout=self.logger.logTxtFile,
199                                 stderr=subprocess.PIPE,
200                                 env=env_appli.environ.environ,)
201
202             if res != 0:
203                 raise src.SatException(_("Error: unable to get test base '%(nam"
204                                          "e)s' from svn '%(repo)s'.") % \
205                                        { 'name': testbase_name,
206                                         'repo': testbase_base })
207
208         except OSError:
209             self.logger.error(_("svn is not installed. exiting...\n"))
210             sys.exit(0)
211
212     ##
213     # Configure tests base.
214     def prepare_testbase(self, test_base_name):
215         src.printcolors.print_value(self.logger,
216                                     _("Test base"),
217                                     test_base_name,
218                                     3)
219         self.logger.write("\n", 3, False)
220
221         # search for the test base
222         test_base_info = None
223         for project_name in self.config.PROJECTS.projects:
224             project_info = self.config.PROJECTS.projects[project_name]
225             if "test_bases" not in project_info:
226                 continue
227             for t_b_info in project_info.test_bases:
228                 if t_b_info.name == test_base_name:
229                     test_base_info = t_b_info
230         
231         if not test_base_info:
232             if os.path.exists(test_base_name):
233                 self.prepare_testbase_from_dir("DIR", test_base_name)
234                 self.currentTestBase = "DIR"
235                 return 0
236         
237         if not test_base_info:
238             message = (_("########## ERROR: test base '%s' not found\n") % 
239                        test_base_name)
240             self.logger.write("%s\n" % src.printcolors.printcError(message))
241             return 1
242
243         if test_base_info.get_sources == "dir":
244             self.prepare_testbase_from_dir(test_base_name,
245                                            test_base_info.info.dir)
246         elif test_base_info.get_sources == "git":
247             self.prepare_testbase_from_git(test_base_name,
248                                        test_base_info.info.base,
249                                        self.config.APPLICATION.test_base.tag)
250         elif test_base_info.get_sources == "svn":
251             svn_user = src.get_cfg_param(test_base_info.info,
252                                          "svn_user",
253                                          self.config.USER.svn_user)
254             self.prepare_testbase_from_svn(svn_user,
255                                        test_base_name,
256                                        test_base_info.info.base)
257         else:
258             raise src.SatException(_("unknown source type '%(type)s' for test b"
259                                      "ase '%(base)s' ...\n") % {
260                                         'type': test_base_info.get_sources,
261                                         'base': test_base_name })
262
263         self.currentTestBase = test_base_name
264
265     ##
266     # Searches if the script is declared in known errors pyconf.
267     # Update the status if needed.
268     def search_known_errors(self, status, test_grid, test_session, test):
269         test_path = os.path.join(test_grid, test_session, test)
270         if not src.config_has_application(self.config):
271             return status, []
272
273         if self.known_errors is None:
274             return status, []
275
276         platform = self.config.VARS.arch
277         application = self.config.VARS.application
278         error = self.known_errors.get_error(test_path, application, platform)
279         if error is None:
280             return status, []
281         
282         if status == src.OK_STATUS:
283             if not error.fixed:
284                 # the error is fixed
285                 self.known_errors.fix_error(error)
286                 #import testerror
287                 #testerror.write_test_failures(
288                 #                        self.config.TOOLS.testerror.file_path,
289                 #                        self.known_errors.errors)
290             return status, [ error.date,
291                             error.expected,
292                             error.comment,
293                             error.fixed ]
294
295         if error.fixed:
296             self.known_errors.unfix_error(error)
297             #import testerror
298             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
299             #                              self.known_errors.errors)
300
301         delta = self.known_errors.get_expecting_days(error)
302         kfres = [ error.date, error.expected, error.comment, error.fixed ]
303         if delta < 0:
304             return src.KO_STATUS, kfres
305         return src.KNOWNFAILURE_STATUS, kfres
306
307     ##
308     # Read the *.result.py files.
309     def read_results(self, listTest, has_timed_out):
310         results = {}
311         for test in listTest:
312             resfile = os.path.join(self.currentDir,
313                                    self.currentgrid,
314                                    self.currentsession,
315                                    test[:-3] + ".result.py")
316
317             # check if <test>.result.py file exists
318             if not os.path.exists(resfile):
319                 results[test] = ["?", -1, "", []]
320             else:
321                 gdic, ldic = {}, {}
322                 if verbose:
323                   print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
324
325                 try:
326                   execfile(resfile, gdic, ldic)
327
328                   status = src.TIMEOUT_STATUS
329                   if not has_timed_out:
330                       status = src.KO_STATUS
331
332                   if 'status' in ldic:
333                       status = ldic['status']
334
335                   expected = []
336                   if status == src.KO_STATUS or status == src.OK_STATUS:
337                       status, expected = self.search_known_errors(status,
338                                                               self.currentgrid,
339                                                               self.currentsession,
340                                                               test)
341
342                   callback = ""
343                   if 'callback' in ldic:
344                       callback = ldic['callback']
345                   elif status == src.KO_STATUS:
346                       callback = "CRASH"
347                       if verbose:
348                         print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
349                         print("--- CRASH gdic\n%s" %  PP.pformat(gdic))
350                         pass
351
352                   exec_time = -1
353                   if 'time' in ldic:
354                       try:
355                           exec_time = float(ldic['time'])
356                       except:
357                           pass
358
359                   results[test] = [status, exec_time, callback, expected]
360
361                 except:
362                   results[test] = ["?", -1, "", []]
363                   # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
364             
365             # check if <test>.py file exists
366             testfile = os.path.join(self.currentDir,
367                                    self.currentgrid,
368                                    self.currentsession,
369                                    test)
370             
371             if not os.path.exists(testfile):
372                 results[test].append('')
373             else:
374                 text = open(testfile, "r").read()
375                 results[test].append(text)
376
377             # check if <test>.out.py file exists
378             outfile = os.path.join(self.currentDir,
379                                    self.currentgrid,
380                                    self.currentsession,
381                                    test[:-3] + ".out.py")
382             
383             if not os.path.exists(outfile):
384                 results[test].append('')
385             else:
386                 text = open(outfile, "r").read()
387                 results[test].append(text)
388
389         return results
390
391     ##
392     # Generates the script to be run by Salome.
393     # This python script includes init and close statements and a loop
394     # calling all the scripts of a single directory.
395     def generate_script(self, listTest, script_path, ignoreList):
396         # open template file
397         tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
398         with open(tFile, 'r') as f:
399           template = string.Template(f.read())
400         
401         # create substitution dictionary
402         d = dict()
403         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
404         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
405         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
406         d['sessionDir'] = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
407         d['resultFile'] = os.path.join(self.tmp_working_dir, 'WORK', 'exec_result')
408         d['listTest'] = listTest
409         d['sessionName'] = self.currentsession
410         d['ignore'] = ignoreList
411
412         # create script with template
413         contents = template.safe_substitute(d)
414         if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
415         with open(script_path, 'w') as f:
416           f.write(contents)
417
418
419     # Find the getTmpDir function that gives access to *_pidict file directory.
420     # (the *_pidict file exists when SALOME is launched)
421     def get_tmp_dir(self):
422         # Rare case where there is no KERNEL in grid list 
423         # (for example MED_STANDALONE)
424         if ('APPLICATION' in self.config 
425                 and 'KERNEL' not in self.config.APPLICATION.products 
426                 and 'KERNEL_ROOT_DIR' not in os.environ):
427             return getTmpDirDEFAULT
428         
429         # Case where "sat test" is launched in an existing SALOME environment
430         if 'KERNEL_ROOT_DIR' in os.environ:
431             root_dir =  os.environ['KERNEL_ROOT_DIR']
432         
433         if ('APPLICATION' in self.config and
434             'KERNEL' in self.config.APPLICATION.products):
435             root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
436
437         # Case where there the appli option is called (with path to launcher)
438         if len(self.launcher) > 0:
439             # There are two cases : The old application (runAppli) 
440             # and the new one
441             launcherName = os.path.basename(self.launcher)
442             launcherDir = os.path.dirname(self.launcher)
443             if launcherName == 'runAppli':
444                 # Old application
445                 cmd = """
446 for i in %s/env.d/*.sh; 
447   do source ${i};
448 done
449 echo $KERNEL_ROOT_DIR
450 """ % launcherDir
451             else:
452                 # New application
453                 cmd = """
454 echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
455 %s shell tmpscript.py
456 """ % self.launcher
457
458             if src.architecture.is_windows():
459                 subproc_res = subprocess.Popen(cmd,
460                             stdout=subprocess.PIPE,
461                             shell=True).communicate()
462                 pass
463             else:
464                 subproc_res = subprocess.Popen(cmd,
465                             stdout=subprocess.PIPE,
466                             shell=True,
467                             executable='/bin/bash').communicate()
468                 pass
469             
470             root_dir = subproc_res[0].split()[-1]
471         
472         # import grid salome_utils from KERNEL that gives 
473         # the right getTmpDir function
474         root_dir = root_dir.decode('utf-8')
475         aPath = [os.path.join(root_dir, 'bin', 'salome')]
476         sal_uts = "salome_utils"
477         try:
478             (file_, pathname, description) = imp.find_module(sal_uts, aPath )
479         except Exception:
480             msg = "inexisting %s.py in %s" % (sal_uts, aPath)
481             raise Exception(msg)
482
483         try:
484             grid = imp.load_module(sal_uts, file_, pathname, description)
485             return grid.getLogDir
486         except:
487             grid = imp.load_module(sal_uts, file_, pathname, description)
488             return grid.getTmpDir
489         finally:
490             if file_:
491                 file_.close()
492
493
494     def get_test_timeout(self, test_name, default_value):
495         if ("timeout" in self.settings and 
496                 test_name in self.settings["timeout"]):
497             return self.settings["timeout"][test_name]
498
499         return default_value
500
501     def generate_launching_commands(self):
502         
503         # Case where there the appli option is called (with path to launcher)
504         if len(self.launcher) > 0:
505             # There are two cases : The old application (runAppli) 
506             # and the new one
507             launcherName = os.path.basename(self.launcher)
508             launcherDir = os.path.dirname(self.launcher)
509             if os.path.basename(launcherDir) == 'APPLI':
510                 # Old application
511                 binSalome = self.launcher
512                 binPython = ("for i in " +
513                              launcherDir +
514                              "/env.d/*.sh; do source ${i}; done ; python")
515                 killSalome = ("for i in " +
516                         launcherDir +
517                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
518                 return binSalome, binPython, killSalome
519             else:
520                 # New application
521                 binSalome = self.launcher
522                 binPython = self.launcher + ' shell'
523                 killSalome = self.launcher + ' killall'
524                 return binSalome, binPython, killSalome
525
526         # SALOME version detection and APPLI repository detection
527         VersionSalome = src.get_salome_version(self.config)
528         appdir = 'APPLI'
529         if "APPLI" in self.config and "application_name" in self.config.APPLI:
530             appdir = self.config.APPLI.application_name
531         
532         # Case where SALOME has NOT the launcher that uses the SalomeContext API
533         if VersionSalome < 730:
534             binSalome = os.path.join(self.config.APPLICATION.workdir,
535                                      appdir,
536                                      "runAppli")
537             binPython = "python"
538             killSalome = "killSalome.py"
539             src.environment.load_environment(self.config, False, self.logger)           
540             return binSalome, binPython, killSalome
541         
542         # Case where SALOME has the launcher that uses the SalomeContext API
543         else:            
544             launcher_name = src.get_launcher_name(self.config)
545             binSalome = os.path.join(self.config.APPLICATION.workdir,
546                                      launcher_name)
547             
548             binPython = binSalome + ' shell'
549             killSalome = binSalome + ' killall'
550             return binSalome, binPython, killSalome
551                 
552         return binSalome, binPython, killSalome
553         
554
555     ##
556     # Runs tests of a session (using a single instance of Salome).
    def run_tests(self, listTest, ignoreList):
        """Run the scripts of the current session and store their results.

        A wrapper script is generated in the session directory and run
        in <tmp_working_dir>/WORK with a command that depends on the
        prefix of the session name. The result files are then read, the
        counters of the instance are updated and the results are appended
        to config.TESTS.

        :param listTest: file names of the scripts to run.
        :param ignoreList: the tests to ignore, given to the wrapper script.
        """
        out_path = os.path.join(self.currentDir,
                                self.currentgrid,
                                self.currentsession)
        if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
                   (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
        # the settings are indexed by "<grid>/<session>"
        sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
        time_out = self.get_test_timeout(sessionname,
                                         DEFAULT_TIMEOUT)

        # time-out given as delaiapp to fork.batch_salome
        time_out_salome = DEFAULT_TIMEOUT

        # generate wrapper script
        script_path = os.path.join(out_path, 'wrapperScript.py')
        self.generate_script(listTest, script_path, ignoreList)

        # a function (not a path): it is given to fork.batch_salome
        tmpDir = self.get_tmp_dir()

        binSalome, binPython, killSalome = self.generate_launching_commands()
        # the "run_with_grids" setting of the session is added to the
        # SALOME command with the -m option
        if "run_with_grids" in self.settings and \
           sessionname in self.settings["run_with_grids"]:
            binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])

        logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")

        status = False
        elapsed = -1
        if self.currentsession.startswith("NOGUI_"):
            # runSalome -t (bash)
            status, elapsed = fork.batch(
                                binSalome,
                                self.logger,
                                os.path.join(self.tmp_working_dir, "WORK"),
                                [ "-t", "--shutdown-server=1", script_path ],
                                delai=time_out,
                                log=logWay)

        elif self.currentsession.startswith("PY_"):
            # python script.py
            status, elapsed = fork.batch(
                                binPython,
                                self.logger,
                                os.path.join(self.tmp_working_dir, "WORK"),
                                [script_path],
                                delai=time_out,
                                log=logWay)

        else:
            # any other session is run with fork.batch_salome
            opt = "-z 0"
            if self.show_desktop: opt = "--show-desktop=0"
            status, elapsed = fork.batch_salome(
                                binSalome,
                                self.logger,
                                os.path.join( self.tmp_working_dir, "WORK"),
                                [ opt, "--shutdown-server=1", script_path ],
                                getTmpDir=tmpDir,
                                fin=killSalome,
                                delai=time_out,
                                log=logWay,
                                delaiapp=time_out_salome)

        self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)

        # create the test result to add in the config object
        test_info = src.pyconf.Mapping(self.config)
        test_info.testbase = self.currentTestBase
        test_info.grid = self.currentgrid
        test_info.session = self.currentsession
        test_info.script = src.pyconf.Sequence(self.config)

        # the session is considered as timed out when the elapsed time
        # is equal to the time-out
        script_results = self.read_results(listTest, elapsed == time_out)
        for sr in sorted(script_results.keys()):
            self.nb_run += 1

            # create script result
            # (script_results[sr] is [status, time, callback, known error
            # info, script content, output content])
            script_info = src.pyconf.Mapping(self.config)
            script_info.name = sr
            script_info.res = script_results[sr][0]
            script_info.time = script_results[sr][1]
            if script_info.res == src.TIMEOUT_STATUS:
                script_info.time = time_out
            # times under 1e-3 (and the -1 of a missing time) are stored as 0
            if script_info.time < 1e-3: script_info.time = 0

            # the callback is only stored for the scripts that are not OK
            callback = script_results[sr][2]
            if script_info.res != src.OK_STATUS and len(callback) > 0:
                script_info.callback = callback

            kfres = script_results[sr][3]
            if len(kfres) > 0:
                script_info.known_error = src.pyconf.Mapping(self.config)
                script_info.known_error.date = kfres[0]
                script_info.known_error.expected = kfres[1]
                script_info.known_error.comment = kfres[2]
                script_info.known_error.fixed = kfres[3]

            script_info.content = script_results[sr][4]
            script_info.out = script_results[sr][5]

            # add it to the list of results
            test_info.script.append(script_info, '')

            # display the results
            if script_info.time > 0:
                exectime = "(%7.3f s)" % script_info.time
            else:
                exectime = ""

            # dots used to align the status column
            sp = "." * (35 - len(script_info.name))
            self.logger.write(self.write_test_margin(3), 3)
            self.logger.write("script %s %s %s %s\n" % (
                                src.printcolors.printcLabel(script_info.name),
                                sp,
                                src.printcolors.printc(script_info.res),
                                exectime), 3, False)
            # NOTE(review): unlike the callback stored above, this message
            # does not test the status of the script -- confirm that a
            # callback must also be displayed for an OK script.
            if script_info and len(callback) > 0:
                self.logger.write("Exception in %s\n%s\n" % \
                    (script_info.name,
                     src.printcolors.printcWarning(callback)), 2, False)

            # update the counters
            if script_info.res == src.OK_STATUS:
                self.nb_succeed += 1
            elif script_info.res == src.KNOWNFAILURE_STATUS:
                self.nb_acknoledge += 1
            elif script_info.res == src.TIMEOUT_STATUS:
                self.nb_timeout += 1
            elif script_info.res == src.NA_STATUS:
                # cancels the increment done at the beginning of the loop
                self.nb_run -= 1
            elif script_info.res == "?":
                self.nb_not_run += 1


        self.config.TESTS.append(test_info, '')
689
690     ##
691     # Runs all tests of a session.
692     def run_session_tests(self):
693        
694         self.logger.write(self.write_test_margin(2), 3)
695         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
696                                                     self.currentsession), 3, False)
697
698         # prepare list of tests to run
699         tests = os.listdir(os.path.join(self.currentDir,
700                                         self.currentgrid,
701                                         self.currentsession))
702         # avoid result files of previous tests, if presents
703         # tests = filter(lambda l: l.endswith(".py"), tests)
704         tests = [t for t in tests if t.endswith(".py") \
705                    and not ( t.endswith(".out.py") or \
706                              t.endswith(".result.py") or \
707                              t.endswith("wrapperScript.py") \
708                            ) ]
709         tests = sorted(tests, key=str.lower)
710
711         # build list of known failures
712         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
713         ignoreDict = {}
714         for k in self.ignore_tests.keys():
715             if k.startswith(cat):
716                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
717
718         self.run_tests(tests, ignoreDict)
719
720     ##
721     # Runs all tests of a grid.
722     def run_grid_tests(self):
723         self.logger.write(self.write_test_margin(1), 3)
724         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
725                                                 self.currentgrid), 3, False)
726
727         grid_path = os.path.join(self.currentDir, self.currentgrid)
728
729         sessions = []
730         if self.sessions is not None:
731             sessions = self.sessions # user choice
732         else:
733             # use all scripts in grid
734             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
735                            os.listdir(grid_path))
736             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
737                                                                 l)), sessions)
738
739         sessions = sorted(sessions, key=str.lower)
740         existingSessions = self.getSubDirectories(grid_path)
741         for session_ in sessions:
742             if not os.path.exists(os.path.join(grid_path, session_)):
743                 self.logger.write(self.write_test_margin(2), 3)
744                 msg = """\
745 Session '%s' not found
746 Existing sessions are:
747 %s
748 """ % (session_, PP.pformat(sorted(existingSessions)))
749                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
750             else:
751                 self.currentsession = session_
752                 self.run_session_tests()
753
754     def getSubDirectories(self, aDir):
755         """
756         get names of first level of sub directories in aDir
757         excluding '.git' etc as beginning with '.'
758         """
759         res = os.listdir(aDir)
760         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
761         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
762         return res
763
764     ##
765     # Runs test testbase.
766     def run_testbase_tests(self):
767         res_dir = os.path.join(self.currentDir, "RESSOURCES")
768         os.environ['PYTHONPATH'] =  (res_dir + 
769                                      os.pathsep + 
770                                      os.environ['PYTHONPATH'])
771         os.environ['TT_BASE_RESSOURCES'] = res_dir
772         src.printcolors.print_value(self.logger,
773                                     "TT_BASE_RESSOURCES",
774                                     res_dir,
775                                     4)
776         self.logger.write("\n", 4, False)
777
778         self.logger.write(self.write_test_margin(0), 3)
779         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
780                                                         self.currentTestBase)
781         self.logger.write(testbase_label, 3, False)
782         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
783                           3)
784         self.logger.write("\n", 3, False)
785
786         # load settings
787         settings_file = os.path.join(res_dir, "test_settings.py")
788         if os.path.exists(settings_file):
789             gdic, ldic = {}, {}
790             execfile(settings_file, gdic, ldic)
791             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
792             self.settings = ldic['settings_dic']
793             self.ignore_tests = ldic['known_failures_list']
794             if isinstance(self.ignore_tests, list):
795                 self.ignore_tests = {}
796                 self.logger.write(src.printcolors.printcWarning(
797                   "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
798         else:
799             self.ignore_tests = {}
800             self.settings.clear()
801
802         # read known failures pyconf
803         if "testerror" in self.config.LOCAL:
804             #import testerror
805             #self.known_errors = testerror.read_test_failures(
806             #                            self.config.TOOLS.testerror.file_path,
807             #                            do_error=False)
808             pass
809         else:
810             self.known_errors = None
811
812         if self.grids is not None:
813             grids = self.grids # given by user
814         else:
815             # select all the grids (i.e. directories) in the directory
816             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
817                              os.listdir(self.currentDir))
818             grids = filter(lambda l: os.path.isdir(
819                                         os.path.join(self.currentDir, l)),
820                              grids)
821
822         grids = sorted(grids, key=str.lower)
823         existingGrids = self.getSubDirectories(self.currentDir)
824         for grid in grids:
825             if not os.path.exists(os.path.join(self.currentDir, grid)):
826                 self.logger.write(self.write_test_margin(1), 3)
827                 msg = """\
828 Grid '%s' does not exist
829 Existing grids are:
830 %s
831 """ % (grid, PP.pformat(sorted(existingGrids)))
832                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
833             else:
834                 self.currentgrid = grid
835                 self.run_grid_tests()
836
837     def run_script(self, script_name):
838         if ('APPLICATION' in self.config and
839                 script_name in self.config.APPLICATION):
840             script = self.config.APPLICATION[script_name]
841             if len(script) == 0:
842                 return
843
844             self.logger.write("\n", 2, False)
845             if not os.path.exists(script):
846                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
847                                         "t not found: %s" % script) + "\n", 2)
848             else:
849                 self.logger.write(src.printcolors.printcHeader("----------- sta"
850                                             "rt %s" % script_name) + "\n", 2)
851                 self.logger.write("Run script: %s\n" % script, 2)
852                 subprocess.Popen(script, shell=True).wait()
853                 self.logger.write(src.printcolors.printcHeader("----------- end"
854                                                 " %s" % script_name) + "\n", 2)
855
856     def run_all_tests(self):
857         initTime = datetime.datetime.now()
858
859         self.run_script('test_setup')
860         self.logger.write("\n", 2, False)
861
862         self.logger.write(src.printcolors.printcHeader(
863                                             _("=== STARTING TESTS")) + "\n", 2)
864         self.logger.write("\n", 2, False)
865         self.currentDir = os.path.join(self.tmp_working_dir,
866                                        'BASES',
867                                        self.currentTestBase)
868         self.run_testbase_tests()
869
870         # calculate total execution time
871         totalTime = datetime.datetime.now() - initTime
872         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
873         self.logger.write("\n", 2, False)
874         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
875         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
876                           2,
877                           False)
878
879         #
880         # Start the tests
881         #
882         self.run_script('test_cleanup')
883         self.logger.write("\n", 2, False)
884
885         # evaluate results
886
887         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
888             { 'succeed': self.nb_succeed, 'total': self.nb_run }
889         if self.nb_succeed == self.nb_run:
890             res_out = src.printcolors.printcSuccess(res_out)
891         else:
892             res_out = src.printcolors.printcError(res_out)
893         self.logger.write(res_out, 1)
894
895         if self.nb_timeout > 0:
896             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
897         if self.nb_not_run > 0:
898             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
899         if self.nb_acknoledge > 0:
900             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
901
902         status = src.OK_STATUS
903         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
904             status = src.KO_STATUS
905         elif self.nb_acknoledge:
906             status = src.KNOWNFAILURE_STATUS
907         
908         self.logger.write(_("Status: %s\n" % status), 3)
909
910         return self.nb_run - self.nb_succeed - self.nb_acknoledge
911
912     ##
913     # Write margin to show test results.
914     def write_test_margin(self, tab):
915         if tab == 0:
916             return ""
917         return "|   " * (tab - 1) + "+ "
918