Salome HOME
a7bafd149796714d797a1bbf50a06c70b742844c
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
try:
    execfile
except NameError:
    # Python 3 has no execfile builtin: only that specific lookup failure
    # should trigger the definition of the replacement below.
    def execfile(somefile, global_vars, local_vars):
        """Compile and execute the Python file *somefile*.

        :param somefile: path of the file to execute.
        :param global_vars: dictionary used as globals by the executed code.
        :param local_vars: dictionary used as locals by the executed code;
                           names assigned by the file end up in it.
        """
        with open(somefile) as f:
            code = compile(f.read(), somefile, 'exec')
        # the file is closed before its code is run
        exec(code, global_vars, local_vars)
27
28
29 import os
30 import sys
31 import datetime
32 import shutil
33 import string
34 import imp
35 import subprocess
36
37 from . import fork
38 import src
39
40 # directories not considered as test modules
41 C_IGNORE_MODULES = ['.git', '.svn', 'RESSOURCES']
42
43 C_TESTS_LIGHT_FILE = "TestsLight.txt"
44
45 # Get directory to be used for the temporary files.
46 #
def getTmpDirDEFAULT():
    """Return the default directory used for the temporary files.

    On Windows this is the value of the TEMP environment variable (None
    when it is not set); otherwise it is /tmp/logs/<USER>, where <USER>
    falls back to "unknown" when the USER variable is not set.
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        user_name = os.getenv("USER", "unknown")
        return os.path.join('/tmp', 'logs', user_name)
    return os.getenv("TEMP")
54
55 class Test:
56     def __init__(self,
57                  config,
58                  logger,
59                  tmp_working_dir,
60                  testbase="",
61                  modules=None,
62                  types=None,
63                  launcher="",
64                  show_desktop=True):
65         self.modules = modules
66         self.config = config
67         self.logger = logger
68         self.tmp_working_dir = tmp_working_dir
69         self.types = types
70         self.launcher = launcher
71         self.show_desktop = show_desktop
72
73         res = self.prepare_testbase(testbase)
74         self.test_base_found = True
75         if res == 1:
76             # Fail
77             self.test_base_found = False
78         
79         self.settings = {}
80         self.known_errors = None
81
82         # create section for results
83         self.config.TESTS = src.pyconf.Sequence(self.config)
84
85         self.nb_run = 0
86         self.nb_succeed = 0
87         self.nb_timeout = 0
88         self.nb_not_run = 0
89         self.nb_acknoledge = 0
90
91     def _copy_dir(self, source, target):
92         if self.config.VARS.python >= "2.6":
93             shutil.copytree(source, target,
94                             symlinks=True,
95                             ignore=shutil.ignore_patterns('.git*','.svn*'))
96         else:
97             shutil.copytree(source, target,
98                             symlinks=True)
99
100     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
101         self.logger.write(_("get test base from dir: %s\n") % \
102                           src.printcolors.printcLabel(testbase_dir), 3)
103         if not os.access(testbase_dir, os.X_OK):
104             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
105                                      "exist ...\n") % { 'name': testbase_name,
106                                                        'dir': testbase_dir })
107
108         self._copy_dir(testbase_dir,
109                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
110
111     def prepare_testbase_from_git(self,
112                                   testbase_name,
113                                   testbase_base,
114                                   testbase_tag):
115         self.logger.write(
116             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
117                         "testbase" : src.printcolors.printcLabel(testbase_name),
118                         "tag" : src.printcolors.printcLabel(testbase_tag)},
119                           3)
120         try:
121             def set_signal(): # pragma: no cover
122                 """see http://bugs.python.org/issue1652"""
123                 import signal
124                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
125
126             cmd = "git clone --depth 1 %(base)s %(dir)s"
127             cmd += " && cd %(dir)s"
128             if testbase_tag=='master':
129                 cmd += " && git fetch origin %(branch)s"
130             else:
131                 cmd += " && git fetch origin %(branch)s:%(branch)s"
132             cmd += " && git checkout %(branch)s"
133             cmd = cmd % { 'branch': testbase_tag,
134                          'base': testbase_base,
135                          'dir': testbase_name }
136
137             self.logger.write("> %s\n" % cmd, 5)
138             if src.architecture.is_windows():
139                 # preexec_fn not supported on windows platform
140                 res = subprocess.call(cmd,
141                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
142                                 shell=True,
143                                 stdout=self.logger.logTxtFile,
144                                 stderr=subprocess.PIPE)
145             else:
146                 res = subprocess.call(cmd,
147                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
148                                 shell=True,
149                                 preexec_fn=set_signal,
150                                 stdout=self.logger.logTxtFile,
151                                 stderr=subprocess.PIPE)
152             if res != 0:
153                 raise src.SatException(_("Error: unable to get test base "
154                                          "'%(name)s' from git '%(repo)s'.") % \
155                                        { 'name': testbase_name,
156                                         'repo': testbase_base })
157
158         except OSError:
159             self.logger.error(_("git is not installed. exiting...\n"))
160             sys.exit(0)
161
162     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
163         self.logger.write(_("get test base '%s' from svn\n") % \
164                           src.printcolors.printcLabel(testbase_name), 3)
165         try:
166             def set_signal(): # pragma: no cover
167                 """see http://bugs.python.org/issue1652"""
168                 import signal
169                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
170
171             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
172             cmd = cmd % { 'user': user,
173                          'base': testbase_base,
174                          'dir': testbase_name }
175
176             self.logger.write("> %s\n" % cmd, 5)
177             if src.architecture.is_windows():
178                 # preexec_fn not supported on windows platform
179                 res = subprocess.call(cmd,
180                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
181                                 shell=True,
182                                 stdout=self.logger.logTxtFile,
183                                 stderr=subprocess.PIPE)
184             else:
185                 res = subprocess.call(cmd,
186                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
187                                 shell=True,
188                                 preexec_fn=set_signal,
189                                 stdout=self.logger.logTxtFile,
190                                 stderr=subprocess.PIPE)
191
192             if res != 0:
193                 raise src.SatException(_("Error: unable to get test base '%(nam"
194                                          "e)s' from svn '%(repo)s'.") % \
195                                        { 'name': testbase_name,
196                                         'repo': testbase_base })
197
198         except OSError:
199             self.logger.error(_("svn is not installed. exiting...\n"))
200             sys.exit(0)
201
202     ##
203     # Configure tests base.
204     def prepare_testbase(self, test_base_name):
205         src.printcolors.print_value(self.logger,
206                                     _("Test base"),
207                                     test_base_name,
208                                     3)
209         self.logger.write("\n", 3, False)
210
211         # search for the test base
212         test_base_info = None
213         for project_name in self.config.PROJECTS.projects:
214             project_info = self.config.PROJECTS.projects[project_name]
215             for t_b_info in project_info.test_bases:
216                 if t_b_info.name == test_base_name:
217                     test_base_info = t_b_info
218         
219         if not test_base_info:
220             if os.path.exists(test_base_name):
221                 self.prepare_testbase_from_dir("DIR", test_base_name)
222                 self.currentTestBase = "DIR"
223                 return 0
224         
225         if not test_base_info:
226             message = (_("########## ERROR: test base '%s' not found\n") % 
227                        test_base_name)
228             self.logger.write("%s\n" % src.printcolors.printcError(message))
229             return 1
230
231         if test_base_info.get_sources == "dir":
232             self.prepare_testbase_from_dir(test_base_name,
233                                            test_base_info.info.dir)
234         elif test_base_info.get_sources == "git":
235             self.prepare_testbase_from_git(test_base_name,
236                                        test_base_info.info.base,
237                                        self.config.APPLICATION.test_base.tag)
238         elif test_base_info.get_sources == "svn":
239             svn_user = src.get_cfg_param(test_base_info.svn_info,
240                                          "svn_user",
241                                          self.config.USER.svn_user)
242             self.prepare_testbase_from_svn(svn_user,
243                                        test_base_name,
244                                        test_base_info.info.base)
245         else:
246             raise src.SatException(_("unknown source type '%(type)s' for test b"
247                                      "ase '%(base)s' ...\n") % {
248                                         'type': test_base_info.get_sources,
249                                         'base': test_base_name })
250
251         self.currentTestBase = test_base_name
252
253     ##
254     # Searches if the script is declared in known errors pyconf.
255     # Update the status if needed.
256     def search_known_errors(self, status, test_module, test_type, test):
257         test_path = os.path.join(test_module, test_type, test)
258         if not src.config_has_application(self.config):
259             return status, []
260
261         if self.known_errors is None:
262             return status, []
263
264         platform = self.config.VARS.arch
265         application = self.config.VARS.application
266         error = self.known_errors.get_error(test_path, application, platform)
267         if error is None:
268             return status, []
269         
270         if status == src.OK_STATUS:
271             if not error.fixed:
272                 # the error is fixed
273                 self.known_errors.fix_error(error)
274                 #import testerror
275                 #testerror.write_test_failures(
276                 #                        self.config.TOOLS.testerror.file_path,
277                 #                        self.known_errors.errors)
278             return status, [ error.date,
279                             error.expected,
280                             error.comment,
281                             error.fixed ]
282
283         if error.fixed:
284             self.known_errors.unfix_error(error)
285             #import testerror
286             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
287             #                              self.known_errors.errors)
288
289         delta = self.known_errors.get_expecting_days(error)
290         kfres = [ error.date, error.expected, error.comment, error.fixed ]
291         if delta < 0:
292             return src.KO_STATUS, kfres
293         return src.KNOWNFAILURE_STATUS, kfres
294
295     ##
296     # Read the *.result.py files.
297     def read_results(self, listTest, has_timed_out):
298         results = {}
299         for test in listTest:
300             resfile = os.path.join(self.currentDir,
301                                    self.currentModule,
302                                    self.currentType,
303                                    test[:-3] + ".result.py")
304
305             # check if <test>.result.py file exists
306             if not os.path.exists(resfile):
307                 results[test] = ["?", -1, "", []]
308             else:
309                 gdic, ldic = {}, {}
310                 execfile(resfile, gdic, ldic)
311
312                 status = src.TIMEOUT_STATUS
313                 if not has_timed_out:
314                     status = src.KO_STATUS
315
316                 if ldic.has_key('status'):
317                     status = ldic['status']
318
319                 expected = []
320                 if status == src.KO_STATUS or status == src.OK_STATUS:
321                     status, expected = self.search_known_errors(status,
322                                                             self.currentModule,
323                                                             self.currentType,
324                                                             test)
325
326                 callback = ""
327                 if ldic.has_key('callback'):
328                     callback = ldic['callback']
329                 elif status == src.KO_STATUS:
330                     callback = "CRASH"
331
332                 exec_time = -1
333                 if ldic.has_key('time'):
334                     try:
335                         exec_time = float(ldic['time'])
336                     except:
337                         pass
338
339                 results[test] = [status, exec_time, callback, expected]
340             
341             # check if <test>.py file exists
342             testfile = os.path.join(self.currentDir,
343                                    self.currentModule,
344                                    self.currentType,
345                                    test)
346             
347             if not os.path.exists(testfile):
348                 results[test].append('')
349             else:
350                 text = open(testfile, "r").read()
351                 results[test].append(text)
352
353             # check if <test>.out.py file exists
354             outfile = os.path.join(self.currentDir,
355                                    self.currentModule,
356                                    self.currentType,
357                                    test[:-3] + ".out.py")
358             
359             if not os.path.exists(outfile):
360                 results[test].append('')
361             else:
362                 text = open(outfile, "r").read()
363                 results[test].append(text)
364
365         return results
366
367     ##
368     # Generates the script to be run by Salome.
369     # This python script includes init and close statements and a loop
370     # calling all the scripts of a single directory.
371     def generate_script(self, listTest, script_path, ignoreList):
372         # open template file
373         template_file = open(os.path.join(self.config.VARS.srcDir,
374                                           "test",
375                                           "scriptTemplate.py"), 'r')
376         template = string.Template(template_file.read())
377
378         # create substitution dictionary
379         d = dict()
380         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
381         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
382         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
383         d['typeDir'] = os.path.join(self.currentDir,
384                                     self.currentModule,
385                                     self.currentType)
386         d['resultFile'] = os.path.join(self.tmp_working_dir,
387                                        'WORK',
388                                        'exec_result')
389         d['listTest'] = listTest
390         d['typeName'] = self.currentType
391         d['ignore'] = ignoreList
392
393         # create script with template
394         script = open(script_path, 'w')
395         script.write(template.safe_substitute(d))
396         script.close()
397
398     # Find the getTmpDir function that gives access to *pidict file directory.
399     # (the *pidict file exists when SALOME is launched) 
400     def get_tmp_dir(self):
401         # Rare case where there is no KERNEL in module list 
402         # (for example MED_STANDALONE)
403         if ('APPLICATION' in self.config 
404                 and 'KERNEL' not in self.config.APPLICATION.products 
405                 and 'KERNEL_ROOT_DIR' not in os.environ):
406             return getTmpDirDEFAULT
407         
408         # Case where "sat test" is launched in an existing SALOME environment
409         if 'KERNEL_ROOT_DIR' in os.environ:
410             root_dir =  os.environ['KERNEL_ROOT_DIR']
411         
412         if ('APPLICATION' in self.config 
413                 and 'KERNEL' in self.config.APPLICATION.products):
414             root_dir = src.product.get_product_config(self.config,
415                                                       "KERNEL").install_dir
416
417         # Case where there the appli option is called (with path to launcher)
418         if len(self.launcher) > 0:
419             # There are two cases : The old application (runAppli) 
420             # and the new one
421             launcherName = os.path.basename(self.launcher)
422             launcherDir = os.path.dirname(self.launcher)
423             if launcherName == 'runAppli':
424                 # Old application
425                 cmd = "for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
426                 " done ; echo $KERNEL_ROOT_DIR"
427             else:
428                 # New application
429                 cmd = "echo -e 'import os\nprint os.environ[\"KERNEL_ROOT_DIR\""
430                 "]' > tmpscript.py; %s shell tmpscript.py" % self.launcher
431             root_dir = subprocess.Popen(cmd,
432                             stdout=subprocess.PIPE,
433                             shell=True,
434                             executable='/bin/bash').communicate()[0].split()[-1]
435         
436         # import module salome_utils from KERNEL that gives 
437         # the right getTmpDir function
438         
439         (file_, pathname, description) = imp.find_module("salome_utils",
440                                                          [os.path.join(root_dir,
441                                                                     'bin',
442                                                                     'salome')])
443         try:
444             module = imp.load_module("salome_utils",
445                                      file_,
446                                      pathname,
447                                      description)
448             return module.getLogDir
449         except:
450             module = imp.load_module("salome_utils",
451                                      file_,
452                                      pathname,
453                                      description)
454             return module.getTmpDir
455         finally:
456             if file_:
457                 file_.close()
458
459
460     def get_test_timeout(self, test_name, default_value):
461         if ("timeout" in self.settings and 
462                 test_name in self.settings["timeout"]):
463             return self.settings["timeout"][test_name]
464
465         return default_value
466
467     def generate_launching_commands(self, typename):
468         # Case where "sat test" is launched in an existing SALOME environment
469         if 'KERNEL_ROOT_DIR' in os.environ:
470             binSalome = "runSalome"
471             binPython = "python"
472             killSalome = "killSalome.py"
473         
474         # Rare case where there is no KERNEL in module list 
475         # (for example MED_STANDALONE)
476         if ('APPLICATION' in self.config and 
477                 'KERNEL' not in self.config.APPLICATION.products):
478             binSalome = "runSalome"
479             binPython = "python" 
480             killSalome = "killSalome.py"   
481             src.environment.load_environment(self.config, False, self.logger)         
482             return binSalome, binPython, killSalome
483         
484         # Case where there the appli option is called (with path to launcher)
485         if len(self.launcher) > 0:
486             # There are two cases : The old application (runAppli) 
487             # and the new one
488             launcherName = os.path.basename(self.launcher)
489             launcherDir = os.path.dirname(self.launcher)
490             if launcherName == 'runAppli':
491                 # Old application
492                 binSalome = self.launcher
493                 binPython = ("for i in " +
494                              launcherDir +
495                              "/env.d/*.sh; do source ${i}; done ; python")
496                 killSalome = ("for i in " +
497                         launcherDir +
498                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
499                 return binSalome, binPython, killSalome
500             else:
501                 # New application
502                 binSalome = self.launcher
503                 binPython = self.launcher + ' shell'
504                 killSalome = self.launcher + ' killall'
505                 return binSalome, binPython, killSalome
506
507         # SALOME version detection and APPLI repository detection
508         VersionSalome = src.get_salome_version(self.config)
509         appdir = 'APPLI'
510         if "APPLI" in self.config and "application_name" in self.config.APPLI:
511             appdir = self.config.APPLI.application_name
512         
513         # Case where SALOME has NOT the launcher that uses the SalomeContext API
514         if VersionSalome < 730:
515             binSalome = os.path.join(self.config.APPLI.module_appli_install_dir,
516                                      appdir,
517                                      "runAppli")
518             binPython = "python"
519             killSalome = "killSalome.py"
520             src.environment.load_environment(self.config, False, self.logger)           
521             return binSalome, binPython, killSalome
522         
523         # Case where SALOME has the launcher that uses the SalomeContext API
524         if VersionSalome >= 730:            
525             if 'profile' not in self.config.APPLICATION:
526                 # Before revision of application concept
527                 launcher_name = self.config.APPLI.launch_alias_name
528                 binSalome = os.path.join(self.config.APPLICATION.workdir,
529                                          appdir,
530                                          launcher_name)
531             else:
532                 # After revision of application concept
533                 launcher_name = self.config.APPLICATION.profile.launcher_name
534                 binSalome = os.path.join(self.config.APPLICATION.workdir,
535                                          launcher_name)
536             
537             if src.architecture.is_windows():
538                 binSalome += '.bat'
539
540             binPython = binSalome + ' shell'
541             killSalome = binSalome + ' killall'
542             return binSalome, binPython, killSalome
543                 
544         return binSalome, binPython, killSalome
545         
546
547     ##
548     # Runs tests of a type (using a single instance of Salome).
549     def run_tests(self, listTest, ignoreList):
550         out_path = os.path.join(self.currentDir,
551                                 self.currentModule,
552                                 self.currentType)
553         typename = "%s/%s" % (self.currentModule, self.currentType)
554         time_out = self.get_test_timeout(typename,
555                                          self.config.SITE.test.timeout)
556
557         time_out_salome = src.get_cfg_param(self.config.SITE.test,
558                                             "timeout_app",
559                                             self.config.SITE.test.timeout)
560
561         # generate wrapper script
562         script_path = os.path.join(out_path, 'wrapperScript.py')
563         self.generate_script(listTest, script_path, ignoreList)
564
565         tmpDir = self.get_tmp_dir()
566
567         binSalome, binPython, killSalome = self.generate_launching_commands(
568                                                                     typename)
569         if self.settings.has_key("run_with_modules") \
570            and self.settings["run_with_modules"].has_key(typename):
571             binSalome = (binSalome +
572                          " -m %s" % self.settings["run_with_modules"][typename])
573
574         logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
575
576         status = False
577         elapsed = -1
578         if self.currentType.startswith("NOGUI_"):
579             # runSalome -t (bash)
580             status, elapsed = fork.batch(binSalome, self.logger,
581                                         os.path.join(self.tmp_working_dir,
582                                                      "WORK"),
583                                         [ "-t",
584                                          "--shutdown-server=1",
585                                          script_path ],
586                                         delai=time_out,
587                                         log=logWay)
588
589         elif self.currentType.startswith("PY_"):
590             # python script.py
591             status, elapsed = fork.batch(binPython, self.logger,
592                                           os.path.join(self.tmp_working_dir,
593                                                        "WORK"),
594                                           [script_path],
595                                           delai=time_out, log=logWay)
596
597         else:
598             opt = "-z 0"
599             if self.show_desktop: opt = "--show-desktop=0"
600             status, elapsed = fork.batch_salome(binSalome,
601                                                  self.logger,
602                                                  os.path.join(
603                                                         self.tmp_working_dir,
604                                                         "WORK"),
605                                                  [ opt,
606                                                   "--shutdown-server=1",
607                                                   script_path ],
608                                                  getTmpDir=tmpDir,
609                                                  fin=killSalome,
610                                                  delai=time_out,
611                                                  log=logWay,
612                                                  delaiapp=time_out_salome)
613
614         self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
615                           5)
616
617         # create the test result to add in the config object
618         test_info = src.pyconf.Mapping(self.config)
619         test_info.testbase = self.currentTestBase
620         test_info.module = self.currentModule
621         test_info.type = self.currentType
622         test_info.script = src.pyconf.Sequence(self.config)
623
624         script_results = self.read_results(listTest, elapsed == time_out)
625         for sr in sorted(script_results.keys()):
626             self.nb_run += 1
627
628             # create script result
629             script_info = src.pyconf.Mapping(self.config)
630             script_info.name = sr
631             script_info.res = script_results[sr][0]
632             script_info.time = script_results[sr][1]
633             if script_info.res == src.TIMEOUT_STATUS:
634                 script_info.time = time_out
635             if script_info.time < 1e-3: script_info.time = 0
636
637             callback = script_results[sr][2]
638             if script_info.res != src.OK_STATUS and len(callback) > 0:
639                 script_info.callback = callback
640
641             kfres = script_results[sr][3]
642             if len(kfres) > 0:
643                 script_info.known_error = src.pyconf.Mapping(self.config)
644                 script_info.known_error.date = kfres[0]
645                 script_info.known_error.expected = kfres[1]
646                 script_info.known_error.comment = kfres[2]
647                 script_info.known_error.fixed = kfres[3]
648             
649             script_info.content = script_results[sr][4]
650             script_info.out = script_results[sr][5]
651             
652             # add it to the list of results
653             test_info.script.append(script_info, '')
654
655             # display the results
656             if script_info.time > 0:
657                 exectime = "(%7.3f s)" % script_info.time
658             else:
659                 exectime = ""
660
661             sp = "." * (35 - len(script_info.name))
662             self.logger.write(self.write_test_margin(3), 3)
663             self.logger.write("script %s %s %s %s\n" % (
664                                 src.printcolors.printcLabel(script_info.name),
665                                 sp,
666                                 src.printcolors.printc(script_info.res),
667                                 exectime), 3, False)
668             if script_info and len(callback) > 0:
669                 self.logger.write("Exception in %s\n%s\n" % \
670                     (script_info.name,
671                      src.printcolors.printcWarning(callback)), 2, False)
672
673             if script_info.res == src.OK_STATUS:
674                 self.nb_succeed += 1
675             elif script_info.res == src.KNOWNFAILURE_STATUS:
676                 self.nb_acknoledge += 1
677             elif script_info.res == src.TIMEOUT_STATUS:
678                 self.nb_timeout += 1
679             elif script_info.res == src.NA_STATUS:
680                 self.nb_run -= 1
681             elif script_info.res == "?":
682                 self.nb_not_run += 1
683
684         self.config.TESTS.append(test_info, '')
685
686     ##
687     # Runs all tests of a type.
688     def run_type_tests(self):
689        
690         self.logger.write(self.write_test_margin(2), 3)
691         self.logger.write("Type = %s\n" % src.printcolors.printcLabel(
692                                                     self.currentType), 3, False)
693
694         # prepare list of tests to run
695         tests = os.listdir(os.path.join(self.currentDir,
696                                         self.currentModule,
697                                         self.currentType))
698         tests = filter(lambda l: l.endswith(".py"), tests)
699         tests = sorted(tests, key=str.lower)
700
701         # build list of known failures
702         cat = "%s/%s/" % (self.currentModule, self.currentType)
703         ignoreDict = {}
704         for k in self.ignore_tests.keys():
705             if k.startswith(cat):
706                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
707
708         self.run_tests(tests, ignoreDict)
709
710     ##
711     # Runs all tests of a module.
712     def run_module_tests(self):
713         self.logger.write(self.write_test_margin(1), 3)
714         self.logger.write("Module = %s\n" % src.printcolors.printcLabel(
715                                                 self.currentModule), 3, False)
716
717         module_path = os.path.join(self.currentDir, self.currentModule)
718
719         types = []
720         if self.types is not None:
721             types = self.types # user choice
722         else:
723             # use all scripts in module
724             types = filter(lambda l: l not in C_IGNORE_MODULES,
725                            os.listdir(module_path))
726             types = filter(lambda l: os.path.isdir(os.path.join(module_path,
727                                                                 l)), types)
728
729         types = sorted(types, key=str.lower)
730         for type_ in types:
731             if not os.path.exists(os.path.join(module_path, type_)):
732                 self.logger.write(self.write_test_margin(2), 3)
733                 self.logger.write(src.printcolors.printcWarning("Type %s not "
734                                             "found" % type_) + "\n", 3, False)
735             else:
736                 self.currentType = type_
737                 self.run_type_tests()
738
739     ##
740     # Runs test testbase.
741     def run_testbase_tests(self):
742         res_dir = os.path.join(self.currentDir, "RESSOURCES")
743         os.environ['PYTHONPATH'] =  (res_dir + 
744                                      os.pathsep + 
745                                      os.environ['PYTHONPATH'])
746         os.environ['TT_BASE_RESSOURCES'] = res_dir
747         src.printcolors.print_value(self.logger,
748                                     "TT_BASE_RESSOURCES",
749                                     res_dir,
750                                     4)
751         self.logger.write("\n", 4, False)
752
753         self.logger.write(self.write_test_margin(0), 3)
754         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
755                                                         self.currentTestBase)
756         self.logger.write(testbase_label, 3, False)
757         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
758                           3)
759         self.logger.write("\n", 3, False)
760
761         # load settings
762         settings_file = os.path.join(res_dir, "test_settings.py")
763         if os.path.exists(settings_file):
764             gdic, ldic = {}, {}
765             execfile(settings_file, gdic, ldic)
766             self.logger.write(_("Load test settings\n"), 3)
767             self.settings = ldic['settings_dic']
768             self.ignore_tests = ldic['known_failures_list']
769             if isinstance(self.ignore_tests, list):
770                 self.ignore_tests = {}
771                 self.logger.write(src.printcolors.printcWarning("known_failur"
772                   "es_list must be a dictionary (not a list)") + "\n", 1, False)
773         else:
774             self.ignore_tests = {}
775             self.settings.clear()
776
777         # read known failures pyconf
778         if "testerror" in self.config.SITE:
779             #import testerror
780             #self.known_errors = testerror.read_test_failures(
781             #                            self.config.TOOLS.testerror.file_path,
782             #                            do_error=False)
783             pass
784         else:
785             self.known_errors = None
786
787         if self.modules is not None:
788             modules = self.modules # given by user
789         else:
790             # select all the modules (i.e. directories) in the directory
791             modules = filter(lambda l: l not in C_IGNORE_MODULES,
792                              os.listdir(self.currentDir))
793             modules = filter(lambda l: os.path.isdir(
794                                         os.path.join(self.currentDir, l)),
795                              modules)
796
797         modules = sorted(modules, key=str.lower)
798         for module in modules:
799             if not os.path.exists(os.path.join(self.currentDir, module)):
800                 self.logger.write(self.write_test_margin(1), 3)
801                 self.logger.write(src.printcolors.printcWarning(
802                             "Module %s does not exist\n" % module), 3, False)
803             else:
804                 self.currentModule = module
805                 self.run_module_tests()
806
807     def run_script(self, script_name):
808         if ('APPLICATION' in self.config and 
809                 script_name in self.config.APPLICATION):
810             script = self.config.APPLICATION[script_name]
811             if len(script) == 0:
812                 return
813
814             self.logger.write("\n", 2, False)
815             if not os.path.exists(script):
816                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
817                                         "t not found: %s" % script) + "\n", 2)
818             else:
819                 self.logger.write(src.printcolors.printcHeader("----------- sta"
820                                             "rt %s" % script_name) + "\n", 2)
821                 self.logger.write("Run script: %s\n" % script, 2)
822                 subprocess.Popen(script, shell=True).wait()
823                 self.logger.write(src.printcolors.printcHeader("----------- end"
824                                                 " %s" % script_name) + "\n", 2)
825
826     def run_all_tests(self):
827         initTime = datetime.datetime.now()
828
829         self.run_script('test_setup')
830         self.logger.write("\n", 2, False)
831
832         self.logger.write(src.printcolors.printcHeader(
833                                             _("=== STARTING TESTS")) + "\n", 2)
834         self.logger.write("\n", 2, False)
835         self.currentDir = os.path.join(self.tmp_working_dir,
836                                        'BASES',
837                                        self.currentTestBase)
838         self.run_testbase_tests()
839
840         # calculate total execution time
841         totalTime = datetime.datetime.now() - initTime
842         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
843         self.logger.write("\n", 2, False)
844         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
845         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
846                           2,
847                           False)
848
849         #
850         # Start the tests
851         #
852         self.run_script('test_cleanup')
853         self.logger.write("\n", 2, False)
854
855         # evaluate results
856         res_count = "%d / %d" % (self.nb_succeed,
857                                  self.nb_run - self.nb_acknoledge)
858
859         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
860             { 'succeed': self.nb_succeed, 'total': self.nb_run }
861         if self.nb_succeed == self.nb_run:
862             res_out = src.printcolors.printcSuccess(res_out)
863         else:
864             res_out = src.printcolors.printcError(res_out)
865         self.logger.write(res_out, 1)
866
867         if self.nb_timeout > 0:
868             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
869             res_count += " TO: %d" % self.nb_timeout
870         if self.nb_not_run > 0:
871             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
872             res_count += " NR: %d" % self.nb_not_run
873
874         status = src.OK_STATUS
875         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
876             status = src.KO_STATUS
877         elif self.nb_acknoledge:
878             status = src.KNOWNFAILURE_STATUS
879         
880         self.logger.write(_("Status: %s\n" % status), 3)
881
882         return self.nb_run - self.nb_succeed - self.nb_acknoledge
883
884     ##
885     # Write margin to show test results.
886     def write_test_margin(self, tab):
887         if tab == 0:
888             return ""
889         return "|   " * (tab - 1) + "+ "
890