Salome HOME
db4cb851e869c577f87a076121ad41483256aa86
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 import os, sys, datetime, shutil, string
20 import subprocess
21 import fork
22 import src
23
24 # directories not considered as test modules
25 C_IGNORE_MODULES = ['.git', '.svn', 'RESSOURCES']
26
27 C_TESTS_SOURCE_DIR = "Tests"
28 C_TESTS_LIGHT_FILE = "TestsLight.txt"
29
30 # Get directory to be used for the temporary files.
31 #
32 def getTmpDirDEFAULT():
33     if src.architecture.is_windows():
34         directory = os.getenv("TEMP")
35     else:
36         # for Linux: use /tmp/logs/{user} folder
37         directory = os.path.join( '/tmp', 'logs', os.getenv("USER", "unknown"))
38     return directory
39
40 class Test:
41     def __init__(self,
42                  config,
43                  logger,
44                  sessionDir,
45                  grid="",
46                  modules=None,
47                  types=None,
48                  appli="",
49                  mode="normal",
50                  dir_="",
51                  show_desktop=True,
52                  light=False):
53         self.modules = modules
54         self.config = config
55         self.logger = logger
56         self.sessionDir = sessionDir
57         self.dir = dir_
58         self.types = types
59         self.appli = appli
60         self.mode = mode
61         self.show_desktop = show_desktop
62         self.light = light
63
64         if len(self.dir) > 0:
65             self.logger.write("\n", 3, False)
66             self.prepare_grid_from_dir("DIR", self.dir)
67             self.currentGrid = "DIR"
68         else:
69             self.prepare_grid(grid)
70
71         self.settings = {}
72         self.known_errors = None
73
74         # create section for results
75         self.config.TESTS = src.pyconf.Sequence(self.config)
76
77         self.nb_run = 0
78         self.nb_succeed = 0
79         self.nb_timeout = 0
80         self.nb_not_run = 0
81         self.nb_acknoledge = 0
82
83     def _copy_dir(self, source, target):
84         if self.config.VARS.python >= "2.6":
85             shutil.copytree(source, target,
86                             symlinks=True,
87                             ignore=shutil.ignore_patterns('.git*','.svn*'))
88         else:
89             shutil.copytree(source, target,
90                             symlinks=True)
91
92     def prepare_grid_from_dir(self, grid_name, grid_dir):
93         self.logger.write(_("get grid from dir: %s\n") % src.printcolors.printcLabel(grid_dir), 3)
94         if not os.access(grid_dir, os.X_OK):
95             raise src.SatException(_("testbase %(name)s (%(dir)s) does not exist ...\n") % \
96                 { 'name': grid_name, 'dir': grid_dir })
97
98         self._copy_dir(grid_dir, os.path.join(self.sessionDir, 'BASES', grid_name))
99
100     def prepare_grid_from_git(self, grid_name, grid_base, grid_tag):
101         self.logger.write(
102             _("get grid '%(grid)s' with '%(tag)s' tag from git\n") % {
103                                "grid" : src.printcolors.printcLabel(grid_name),
104                                "tag" : src.printcolors.printcLabel(grid_tag)},
105                           3)
106         try:
107             def set_signal(): # pragma: no cover
108                 """see http://bugs.python.org/issue1652"""
109                 import signal
110                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
111
112             cmd = "git clone --depth 1 %(base)s %(dir)s"
113             cmd += " && cd %(dir)s"
114             if grid_tag=='master':
115                 cmd += " && git fetch origin %(branch)s"
116             else:
117                 cmd += " && git fetch origin %(branch)s:%(branch)s"
118             cmd += " && git checkout %(branch)s"
119             cmd = cmd % { 'branch': grid_tag, 'base': grid_base, 'dir': grid_name }
120
121             self.logger.write("> %s\n" % cmd, 5)
122             if src.architecture.is_windows():
123                 # preexec_fn not supported on windows platform
124                 res = subprocess.call(cmd,
125                                 cwd=os.path.join(self.sessionDir, 'BASES'),
126                                 shell=True,
127                                 stdout=self.logger.logTxtFile,
128                                 stderr=subprocess.PIPE)
129             else:
130                 res = subprocess.call(cmd,
131                                 cwd=os.path.join(self.sessionDir, 'BASES'),
132                                 shell=True,
133                                 preexec_fn=set_signal,
134                                 stdout=self.logger.logTxtFile,
135                                 stderr=subprocess.PIPE)
136             if res != 0:
137                 raise src.SatException(_("Error: unable to get test base '%(name)s' from git '%(repo)s'.") % \
138                     { 'name': grid_name, 'repo': grid_base })
139
140         except OSError:
141             self.logger.error(_("git is not installed. exiting...\n"))
142             sys.exit(0)
143
144     def prepare_grid_from_svn(self, user, grid_name, grid_base):
145         self.logger.write(_("get grid '%s' from svn\n") % src.printcolors.printcLabel(grid_name), 3)
146         try:
147             def set_signal(): # pragma: no cover
148                 """see http://bugs.python.org/issue1652"""
149                 import signal
150                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
151
152             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
153             cmd = cmd % { 'user': user, 'base': grid_base, 'dir': grid_name }
154
155             self.logger.write("> %s\n" % cmd, 5)
156             if src.architecture.is_windows():
157                 # preexec_fn not supported on windows platform
158                 res = subprocess.call(cmd,
159                                 cwd=os.path.join(self.sessionDir, 'BASES'),
160                                 shell=True,
161                                 stdout=self.logger.logTxtFile,
162                                 stderr=subprocess.PIPE)
163             else:
164                 res = subprocess.call(cmd,
165                                 cwd=os.path.join(self.sessionDir, 'BASES'),
166                                 shell=True,
167                                 preexec_fn=set_signal,
168                                 stdout=self.logger.logTxtFile,
169                                 stderr=subprocess.PIPE)
170
171             if res != 0:
172                 raise src.SatException(_("Error: unable to get test base '%(nam"
173                                          "e)s' from svn '%(repo)s'.") % \
174                                        { 'name': grid_name, 'repo': grid_base })
175
176         except OSError:
177             self.logger.error(_("svn is not installed. exiting...\n"))
178             sys.exit(0)
179
180     ##
181     # Configure tests base.
182     def prepare_grid(self, grid_name):
183         src.printcolors.print_value(self.logger,
184                                     _("Testing grid"),
185                                     grid_name,
186                                     3)
187         self.logger.write("\n", 3, False)
188
189         # search for the grid
190         test_base_info = None
191         for project_name in self.config.PROJECTS.projects:
192             project_info = self.config.PROJECTS.projects[project_name]
193             for t_b_info in project_info.test_bases:
194                 if t_b_info.name == grid_name:
195                     test_base_info = t_b_info
196         
197         if not test_base_info:
198             message = _("########## WARNING: grid '%s' not found\n") % grid_name
199             raise src.SatException(message)
200
201         if test_base_info.get_sources == "dir":
202             self.prepare_grid_from_dir(grid_name, test_base_info.info.dir)
203         elif test_base_info.get_sources == "git":
204             self.prepare_grid_from_git(grid_name,
205                                        test_base_info.info.base,
206                                        self.config.APPLICATION.test_base.tag)
207         elif test_base_info.get_sources == "svn":
208             svn_user = src.get_cfg_param(test_base_info.svn_info,
209                                          "svn_user",
210                                          self.config.USER.svn_user)
211             self.prepare_grid_from_svn(svn_user,
212                                        grid_name,
213                                        test_base_info.info.base)
214         else:
215             raise src.SatException(_("unknown source type '%(type)s' for testb"
216                                      "ase '%(grid)s' ...\n") % {
217                                         'type': test_base_info.get_sources,
218                                         'grid': grid_name })
219
220         self.currentGrid = grid_name
221
222     ##
223     # Searches if the script is declared in known errors pyconf.
224     # Update the status if needed.
225     def search_known_errors(self, status, test_module, test_type, test):
226         test_path = os.path.join(test_module, test_type, test)
227         if not src.config_has_application(self.config):
228             return status, []
229
230         if self.known_errors is None:
231             return status, []
232
233         platform = self.config.VARS.arch
234         application = self.config.VARS.application
235         error = self.known_errors.get_error(test_path, application, platform)
236         if error is None:
237             return status, []
238         
239         if status == src.OK_STATUS:
240             if not error.fixed:
241                 # the error is fixed
242                 self.known_errors.fix_error(error)
243                 #import testerror
244                 #testerror.write_test_failures(
245                 #                        self.config.TOOLS.testerror.file_path,
246                 #                        self.known_errors.errors)
247             return status, [ error.date,
248                             error.expected,
249                             error.comment,
250                             error.fixed ]
251
252         if error.fixed:
253             self.known_errors.unfix_error(error)
254             #import testerror
255             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
256             #                              self.known_errors.errors)
257
258         delta = self.known_errors.get_expecting_days(error)
259         kfres = [ error.date, error.expected, error.comment, error.fixed ]
260         if delta < 0:
261             return src.KO_STATUS, kfres
262         return src.KNOWNFAILURE_STATUS, kfres
263
264     ##
265     # Read the *.result.py files.
266     def read_results(self, listTest, has_timed_out):
267         results = {}
268         for test in listTest:
269             resfile = os.path.join(self.currentDir,
270                                    self.currentModule,
271                                    self.currentType,
272                                    test[:-3] + ".result.py")
273
274             # check if <test>.result.py file exists
275             if not os.path.exists(resfile):
276                 results[test] = ["?", -1, "", []]
277             else:
278                 gdic, ldic = {}, {}
279                 execfile(resfile, gdic, ldic)
280
281                 status = src.TIMEOUT_STATUS
282                 if not has_timed_out:
283                     status = src.KO_STATUS
284
285                 if ldic.has_key('status'):
286                     status = ldic['status']
287
288                 expected = []
289                 if status == src.KO_STATUS or status == src.OK_STATUS:
290                     status, expected = self.search_known_errors(status,
291                                                             self.currentModule,
292                                                             self.currentType,
293                                                             test)
294
295                 callback = ""
296                 if ldic.has_key('callback'):
297                     callback = ldic['callback']
298                 elif status == src.KO_STATUS:
299                     callback = "CRASH"
300
301                 exec_time = -1
302                 if ldic.has_key('time'):
303                     try:
304                         exec_time = float(ldic['time'])
305                     except:
306                         pass
307
308                 results[test] = [status, exec_time, callback, expected]
309
310         return results
311
312     ##
313     # Generates the script to be run by Salome.
314     # This python script includes init and close statements and a loop
315     # calling all the scripts of a single directory.
316     def generate_script(self, listTest, script_path, ignoreList):
317         # open template file
318         template_file = open(os.path.join(self.config.VARS.srcDir,
319                                           "test",
320                                           "scriptTemplate.py"), 'r')
321         template = string.Template(template_file.read())
322
323         # create substitution dictionary
324         d = dict()
325         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
326         d['tmpDir'] = os.path.join(self.sessionDir, 'WORK')
327         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
328         d['typeDir'] = os.path.join(self.currentDir,
329                                     self.currentModule,
330                                     self.currentType)
331         d['resultFile'] = os.path.join(self.sessionDir, 'WORK', 'exec_result')
332         d['listTest'] = listTest
333         d['typeName'] = self.currentType
334         d['ignore'] = ignoreList
335
336         # create script with template
337         script = open(script_path, 'w')
338         script.write(template.safe_substitute(d))
339         script.close()
340
341     # Find the getTmpDir function that gives access to *pidict file directory.
342     # (the *pidict file exists when SALOME is launched) 
343     def get_tmp_dir(self):
344         # Rare case where there is no KERNEL in module list 
345         # (for example MED_STANDALONE)
346         if ('APPLICATION' in self.config 
347                 and 'KERNEL' not in self.config.APPLICATION.products 
348                 and 'KERNEL_ROOT_DIR' not in os.environ):
349             return getTmpDirDEFAULT
350         
351         # Case where "sat test" is launched in an existing SALOME environment
352         if 'KERNEL_ROOT_DIR' in os.environ:
353             root_dir =  os.environ['KERNEL_ROOT_DIR']
354         
355         if ('APPLICATION' in self.config 
356                 and 'KERNEL' in self.config.APPLICATION.products):
357             root_dir = src.product.get_product_config(self.config,
358                                                       "KERNEL").install_dir
359
360         # Case where there the appli option is called (with path to launcher)
361         if len(self.appli) > 0:
362             # There are two cases : The old application (runAppli) 
363             # and the new one
364             launcherName = os.path.basename(self.appli)
365             launcherDir = os.path.dirname(self.appli)
366             if launcherName == 'runAppli':
367                 # Old application
368                 cmd = "for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
369                 " done ; echo $KERNEL_ROOT_DIR"
370             else:
371                 # New application
372                 cmd = "echo -e 'import os\nprint os.environ[\"KERNEL_ROOT_DIR\""
373                 "]' > tmpscript.py; %s shell tmpscript.py" % self.appli
374             root_dir = subprocess.Popen(cmd,
375                             stdout=subprocess.PIPE,
376                             shell=True,
377                             executable='/bin/bash').communicate()[0].split()[-1]
378         
379         # import module salome_utils from KERNEL that gives 
380         # the right getTmpDir function
381         import imp
382         (file_, pathname, description) = imp.find_module("salome_utils",
383                                                          [os.path.join(root_dir,
384                                                                     'bin',
385                                                                     'salome')])
386         try:
387             module = imp.load_module("salome_utils",
388                                      file_,
389                                      pathname,
390                                      description)
391             return module.getLogDir
392         except:
393             module = imp.load_module("salome_utils",
394                                      file_,
395                                      pathname,
396                                      description)
397             return module.getTmpDir
398         finally:
399             if file_:
400                 file_.close()
401
402
403     def get_test_timeout(self, test_name, default_value):
404         if (self.settings.has_key("timeout") and 
405                 self.settings["timeout"].has_key(test_name)):
406             return self.settings["timeout"][test_name]
407
408         return default_value
409
410     def generate_launching_commands(self, typename):
411         # Case where "sat test" is launched in an existing SALOME environment
412         if 'KERNEL_ROOT_DIR' in os.environ:
413             binSalome = "runSalome"
414             binPython = "python"
415             killSalome = "killSalome.py"
416         
417         # Rare case where there is no KERNEL in module list 
418         # (for example MED_STANDALONE)
419         if ('APPLICATION' in self.config and 
420                 'KERNEL' not in self.config.APPLICATION.products):
421             binSalome = "runSalome"
422             binPython = "python" 
423             killSalome = "killSalome.py"   
424             src.environment.load_environment(self.config, False, self.logger)         
425             return binSalome, binPython, killSalome
426         
427         # Case where there the appli option is called (with path to launcher)
428         if len(self.appli) > 0:
429             # There are two cases : The old application (runAppli) 
430             # and the new one
431             launcherName = os.path.basename(self.appli)
432             launcherDir = os.path.dirname(self.appli)
433             if launcherName == 'runAppli':
434                 # Old application
435                 binSalome = self.appli
436                 binPython = ("for i in " +
437                              launcherDir +
438                              "/env.d/*.sh; do source ${i}; done ; python")
439                 killSalome = ("for i in " +
440                         launcherDir +
441                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
442                 return binSalome, binPython, killSalome
443             else:
444                 # New application
445                 binSalome = self.appli
446                 binPython = self.appli + ' shell'
447                 killSalome = self.appli + ' killall'
448                 return binSalome, binPython, killSalome
449
450         # SALOME version detection and APPLI repository detection
451         VersionSalome = src.get_salome_version(self.config)
452         appdir = 'APPLI'
453         if "APPLI" in self.config and "application_name" in self.config.APPLI:
454             appdir = self.config.APPLI.application_name
455         
456         # Case where SALOME has NOT the launcher that uses the SalomeContext API
457         if VersionSalome < 730:
458             binSalome = os.path.join(self.config.APPLI.module_appli_install_dir,
459                                      appdir,
460                                      "runAppli")
461             binPython = "python"
462             killSalome = "killSalome.py"
463             src.environment.load_environment(self.config, False, self.logger)           
464             return binSalome, binPython, killSalome
465         
466         # Case where SALOME has the launcher that uses the SalomeContext API
467         if VersionSalome >= 730:            
468             if 'profile' not in self.config.APPLICATION:
469                 # Before revision of application concept
470                 launcher_name = self.config.APPLI.launch_alias_name
471                 binSalome = os.path.join(self.config.APPLICATION.workdir,
472                                          appdir,
473                                          launcher_name)
474             else:
475                 # After revision of application concept
476                 launcher_name = self.config.APPLICATION.profile.launcher_name
477                 binSalome = os.path.join(self.config.APPLICATION.workdir,
478                                          launcher_name)
479             
480             if src.architecture.is_windows():
481                 binSalome += '.bat'
482
483             binPython = binSalome + ' shell'
484             killSalome = binSalome + ' killall'
485             return binSalome, binPython, killSalome
486                 
487         return binSalome, binPython, killSalome
488         
489
490     ##
491     # Runs tests of a type (using a single instance of Salome).
492     def run_tests(self, listTest, ignoreList):
493         out_path = os.path.join(self.currentDir,
494                                 self.currentModule,
495                                 self.currentType)
496         typename = "%s/%s" % (self.currentModule, self.currentType)
497         time_out = self.get_test_timeout(typename,
498                                          self.config.SITE.test.timeout)
499
500         time_out_salome = src.get_cfg_param(self.config.SITE.test,
501                                             "timeout_app",
502                                             self.config.SITE.test.timeout)
503
504         # generate wrapper script
505         script_path = os.path.join(out_path, 'wrapperScript.py')
506         self.generate_script(listTest, script_path, ignoreList)
507
508         tmpDir = self.get_tmp_dir()
509
510         binSalome, binPython, killSalome = self.generate_launching_commands(
511                                                                     typename)
512         if self.settings.has_key("run_with_modules") \
513            and self.settings["run_with_modules"].has_key(typename):
514             binSalome = (binSalome +
515                          " -m %s" % self.settings["run_with_modules"][typename])
516
517         logWay = os.path.join(self.sessionDir, "WORK", "log_cxx")
518
519         status = False
520         ellapsed = -1
521         if self.currentType.startswith("NOGUI_"):
522             # runSalome -t (bash)
523             status, ellapsed = fork.batch(binSalome, self.logger,
524                                         os.path.join(self.sessionDir, "WORK"),
525                                         [ "-t",
526                                          "--shutdown-server=1",
527                                          script_path ],
528                                         delai=time_out,
529                                         log=logWay)
530
531         elif self.currentType.startswith("PY_"):
532             # python script.py
533             status, ellapsed = fork.batch(binPython, self.logger,
534                                           os.path.join(self.sessionDir, "WORK"),
535                                           [script_path],
536                                           delai=time_out, log=logWay)
537
538         else:
539             opt = "-z 0"
540             if self.show_desktop: opt = "--show-desktop=0"
541             status, ellapsed = fork.batch_salome(binSalome,
542                                                  self.logger,
543                                                  os.path.join(self.sessionDir,
544                                                               "WORK"),
545                                                  [ opt,
546                                                   "--shutdown-server=1",
547                                                   script_path ],
548                                                  getTmpDir=tmpDir,
549                                                  fin=killSalome,
550                                                  delai=time_out,
551                                                  log=logWay,
552                                                  delaiapp=time_out_salome)
553
554         self.logger.write("status = %s, ellapsed = %s\n" % (status, ellapsed),
555                           5)
556
557         # create the test result to add in the config object
558         test_info = src.pyconf.Mapping(self.config)
559         test_info.grid = self.currentGrid
560         test_info.module = self.currentModule
561         test_info.type = self.currentType
562         test_info.script = src.pyconf.Sequence(self.config)
563
564         script_results = self.read_results(listTest, ellapsed == time_out)
565         for sr in sorted(script_results.keys()):
566             self.nb_run += 1
567
568             # create script result
569             script_info = src.pyconf.Mapping(self.config)
570             script_info.name = sr
571             script_info.res = script_results[sr][0]
572             script_info.time = script_results[sr][1]
573             if script_info.res == src.TIMEOUT_STATUS:
574                 script_info.time = time_out
575             if script_info.time < 1e-3: script_info.time = 0
576
577             callback = script_results[sr][2]
578             if script_info.res != src.OK_STATUS and len(callback) > 0:
579                 script_info.callback = callback
580
581             kfres = script_results[sr][3]
582             if len(kfres) > 0:
583                 script_info.known_error = src.pyconf.Mapping(self.config)
584                 script_info.known_error.date = kfres[0]
585                 script_info.known_error.expected = kfres[1]
586                 script_info.known_error.comment = kfres[2]
587                 script_info.known_error.fixed = kfres[3]
588
589             # add it to the list of results
590             test_info.script.append(script_info, '')
591
592             # display the results
593             if script_info.time > 0:
594                 exectime = "(%7.3f s)" % script_info.time
595             else:
596                 exectime = ""
597
598             sp = "." * (35 - len(script_info.name))
599             self.logger.write(self.write_test_margin(3), 3)
600             self.logger.write("script %s %s %s %s\n" % (
601                                 src.printcolors.printcLabel(script_info.name),
602                                 sp,
603                                 src.printcolors.printc(script_info.res),
604                                 exectime), 3, False)
605             if script_info and len(callback) > 0:
606                 self.logger.write("Exception in %s\n%s\n" % \
607                     (script_info.name,
608                      src.printcolors.printcWarning(callback)), 2, False)
609
610             if script_info.res == src.OK_STATUS:
611                 self.nb_succeed += 1
612             elif script_info.res == src.KNOWNFAILURE_STATUS:
613                 self.nb_acknoledge += 1
614             elif script_info.res == src.TIMEOUT_STATUS:
615                 self.nb_timeout += 1
616             elif script_info.res == src.NA_STATUS:
617                 self.nb_run -= 1
618             elif script_info.res == "?":
619                 self.nb_not_run += 1
620
621         self.config.TESTS.append(test_info, '')
622
623     ##
624     # Runs all tests of a type.
625     def run_type_tests(self, light_test):
626         if self.light:
627             if not any(map(lambda l: l.startswith(self.currentType),
628                            light_test)):
629                 # no test to run => skip
630                 return
631         
632         self.logger.write(self.write_test_margin(2), 3)
633         self.logger.write("Type = %s\n" % src.printcolors.printcLabel(
634                                                     self.currentType), 3, False)
635
636         # prepare list of tests to run
637         tests = os.listdir(os.path.join(self.currentDir,
638                                         self.currentModule,
639                                         self.currentType))
640         tests = filter(lambda l: l.endswith(".py"), tests)
641         tests = sorted(tests, key=str.lower)
642
643         if self.light:
644             tests = filter(lambda l: os.path.join(self.currentType,
645                                                   l) in light_test, tests)
646
647         # build list of known failures
648         cat = "%s/%s/" % (self.currentModule, self.currentType)
649         ignoreDict = {}
650         for k in self.ignore_tests.keys():
651             if k.startswith(cat):
652                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
653
654         self.run_tests(tests, ignoreDict)
655
656     ##
657     # Runs all tests of a module.
658     def run_module_tests(self):
659         self.logger.write(self.write_test_margin(1), 3)
660         self.logger.write("Module = %s\n" % src.printcolors.printcLabel(
661                                                 self.currentModule), 3, False)
662
663         module_path = os.path.join(self.currentDir, self.currentModule)
664
665         types = []
666         if self.types is not None:
667             types = self.types # user choice
668         else:
669             # use all scripts in module
670             types = filter(lambda l: l not in C_IGNORE_MODULES,
671                            os.listdir(module_path))
672             types = filter(lambda l: os.path.isdir(os.path.join(module_path,
673                                                                 l)), types)
674
675         # in batch mode keep only modules with NOGUI or PY
676         if self.mode == "batch":
677             types = filter(lambda l: ("NOGUI" in l or "PY" in l), types)
678
679         light_test = []
680         if self.light:
681             light_path = os.path.join(module_path, C_TESTS_LIGHT_FILE)
682             if not os.path.exists(light_path):
683                 types = []
684                 msg = src.printcolors.printcWarning(_("List of light tests not"
685                                                     " found: %s") % light_path)
686                 self.logger.write(msg + "\n")
687             else:
688                 # read the file
689                 light_file = open(light_path, "r")
690                 light_test = map(lambda l: l.strip(), light_file.readlines())
691
692         types = sorted(types, key=str.lower)
693         for type_ in types:
694             if not os.path.exists(os.path.join(module_path, type_)):
695                 self.logger.write(self.write_test_margin(2), 3)
696                 self.logger.write(src.printcolors.printcWarning("Type %s not "
697                                             "found" % type_) + "\n", 3, False)
698             else:
699                 self.currentType = type_
700                 self.run_type_tests(light_test)
701
702     ##
703     # Runs test grid.
704     def run_grid_tests(self):
705         res_dir = os.path.join(self.currentDir, "RESSOURCES")
706         os.environ['PYTHONPATH'] =  (res_dir + 
707                                      os.pathsep + 
708                                      os.environ['PYTHONPATH'])
709         os.environ['TT_BASE_RESSOURCES'] = res_dir
710         src.printcolors.print_value(self.logger,
711                                     "TT_BASE_RESSOURCES",
712                                     res_dir,
713                                     4)
714         self.logger.write("\n", 4, False)
715
716         self.logger.write(self.write_test_margin(0), 3)
717         grid_label = "Grid = %s\n" % src.printcolors.printcLabel(
718                                                             self.currentGrid)
719         self.logger.write(grid_label, 3, False)
720         self.logger.write("-" * len(src.printcolors.cleancolor(grid_label)), 3)
721         self.logger.write("\n", 3, False)
722
723         # load settings
724         settings_file = os.path.join(res_dir, "test_settings.py")
725         if os.path.exists(settings_file):
726             gdic, ldic = {}, {}
727             execfile(settings_file, gdic, ldic)
728             self.logger.write(_("Load test settings\n"), 3)
729             self.settings = ldic['settings_dic']
730             self.ignore_tests = ldic['known_failures_list']
731             if isinstance(self.ignore_tests, list):
732                 self.ignore_tests = {}
733                 self.logger.write(src.printcolors.printcWarning("known_failur"
734                   "es_list must be a dictionary (not a list)") + "\n", 1, False)
735         else:
736             self.ignore_tests = {}
737             self.settings.clear()
738
739         # read known failures pyconf
740         if "testerror" in self.config.SITE:
741             #import testerror
742             #self.known_errors = testerror.read_test_failures(
743             #                            self.config.TOOLS.testerror.file_path,
744             #                            do_error=False)
745             pass
746         else:
747             self.known_errors = None
748
749         if self.modules is not None:
750             modules = self.modules # given by user
751         else:
752             # select all the modules (i.e. directories) in the directory
753             modules = filter(lambda l: l not in C_IGNORE_MODULES,
754                              os.listdir(self.currentDir))
755             modules = filter(lambda l: os.path.isdir(
756                                         os.path.join(self.currentDir, l)),
757                              modules)
758
759         modules = sorted(modules, key=str.lower)
760         for module in modules:
761             if not os.path.exists(os.path.join(self.currentDir, module)):
762                 self.logger.write(self.write_test_margin(1), 3)
763                 self.logger.write(src.printcolors.printcWarning(
764                             "Module %s does not exist\n" % module), 3, False)
765             else:
766                 self.currentModule = module
767                 self.run_module_tests()
768
769     def run_script(self, script_name):
770         if 'APPLICATION' in self.config and script_name in self.config.APPLICATION:
771             script = self.config.APPLICATION[script_name]
772             if len(script) == 0:
773                 return
774
775             self.logger.write("\n", 2, False)
776             if not os.path.exists(script):
777                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
778                                         "t not found: %s" % script) + "\n", 2)
779             else:
780                 self.logger.write(src.printcolors.printcHeader("----------- sta"
781                                             "rt %s" % script_name) + "\n", 2)
782                 self.logger.write("Run script: %s\n" % script, 2)
783                 subprocess.Popen(script, shell=True).wait()
784                 self.logger.write(src.printcolors.printcHeader("----------- end"
785                                                 " %s" % script_name) + "\n", 2)
786
787     def run_all_tests(self, session_name=""):
788         initTime = datetime.datetime.now()
789
790         self.run_script('test_setup')
791         self.logger.write("\n", 2, False)
792
793         self.logger.write(src.printcolors.printcHeader(
794                                             _("=== STARTING TESTS")) + "\n", 2)
795         self.logger.write("\n", 2, False)
796         self.currentDir = os.path.join(self.sessionDir,
797                                        'BASES',
798                                        self.currentGrid)
799         self.run_grid_tests()
800
801         # calculate total execution time
802         totalTime = datetime.datetime.now() - initTime
803         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
804         self.logger.write("\n", 2, False)
805         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
806         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
807                           2,
808                           False)
809
810         #
811         # Start the tests
812         #
813         self.run_script('test_cleanup')
814         self.logger.write("\n", 2, False)
815
816         # evaluate results
817         res_count = "%d / %d" % (self.nb_succeed,
818                                  self.nb_run - self.nb_acknoledge)
819
820         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
821             { 'succeed': self.nb_succeed, 'total': self.nb_run }
822         if self.nb_succeed == self.nb_run:
823             res_out = src.printcolors.printcSuccess(res_out)
824         else:
825             res_out = src.printcolors.printcError(res_out)
826         self.logger.write(res_out, 1)
827
828         if self.nb_timeout > 0:
829             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
830             res_count += " TO: %d" % self.nb_timeout
831         if self.nb_not_run > 0:
832             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
833             res_count += " NR: %d" % self.nb_not_run
834
835         status = src.OK_STATUS
836         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
837             status = src.KO_STATUS
838         elif self.nb_acknoledge:
839             status = src.KNOWNFAILURE_STATUS
840         
841         self.logger.write(_("Status: %s\n" % status), 3)
842         
843         if session_name is not None and len(session_name) > 0:
844             self.config.RESULTS.test["session"] = session_name
845
846         return self.nb_run - self.nb_succeed - self.nb_acknoledge
847
848     ##
849     # Write margin to show test results.
850     def write_test_margin(self, tab):
851         if tab == 0:
852             return ""
853         return "|   " * (tab - 1) + "+ "
854