Salome HOME
sat #8581 : ajout pour chaque produit d'une information sur le tag du projet et de...
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
# Python 2/3 compatibility: Python 3 has no execfile builtin, so an
# equivalent is defined when the name is missing.
try:
    execfile
except NameError:
    def execfile(somefile, global_vars, local_vars):
        """Compile and execute the Python file `somefile`.

        The code runs with `global_vars` as globals and `local_vars` as
        locals, so names assigned by the file end up in `local_vars`.
        """
        with open(somefile) as f:
            source = f.read()
        # the file is closed before its code runs
        code = compile(source, somefile, 'exec')
        exec(code, global_vars, local_vars)
27
28 import os
29 import sys
30 import datetime
31 import shutil
32 import string
33 import imp
34 import subprocess
35 import glob
36 import pprint as PP
37
38 verbose = False
39
40 from . import fork
41 import src
42
# directories not considered as test grids
C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']

# Timeout used for a test session when the settings give no specific value,
# and for the application start-up (presumably seconds -- TODO confirm
# against the fork module).
DEFAULT_TIMEOUT = 150
47
48 # Get directory to be used for the temporary files.
49 #
def getTmpDirDEFAULT():
    """Return the default directory used for the temporary files.

    On Windows this is the value of the TEMP environment variable (None
    when it is not set). On other platforms it is /tmp/logs/<USER>, with
    "unknown" used when USER is not defined.
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        user_name = os.getenv("USER", "unknown")
        return os.path.join('/tmp', 'logs', user_name)
    return os.getenv("TEMP")
57
58 class Test:
59     def __init__(self,
60                  config,
61                  logger,
62                  tmp_working_dir,
63                  testbase="",
64                  grids=None,
65                  sessions=None,
66                  launcher="",
67                  show_desktop=True):
68         self.grids = grids
69         self.config = config
70         self.logger = logger
71         self.tmp_working_dir = tmp_working_dir
72         self.sessions = sessions
73         self.launcher = launcher
74         self.show_desktop = show_desktop
75
76         res = self.prepare_testbase(testbase)
77         self.test_base_found = True
78         if res == 1:
79             # Fail
80             self.test_base_found = False
81         
82         self.settings = {}
83         self.known_errors = None
84
85         # create section for results
86         self.config.TESTS = src.pyconf.Sequence(self.config)
87
88         self.nb_run = 0
89         self.nb_succeed = 0
90         self.nb_timeout = 0
91         self.nb_not_run = 0
92         self.nb_acknoledge = 0
93
94     def _copy_dir(self, source, target):
95         if self.config.VARS.python >= "2.6":
96             shutil.copytree(source, target,
97                             symlinks=True,
98                             ignore=shutil.ignore_patterns('.git*','.svn*'))
99         else:
100             shutil.copytree(source, target,
101                             symlinks=True)
102
103     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
104         self.logger.write(_("get test base from dir: %s\n") % \
105                           src.printcolors.printcLabel(testbase_dir), 3)
106         if not os.access(testbase_dir, os.X_OK):
107             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
108                                      "exist ...\n") % { 'name': testbase_name,
109                                                        'dir': testbase_dir })
110
111         self._copy_dir(testbase_dir,
112                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
113
114     def prepare_testbase_from_git(self,
115                                   testbase_name,
116                                   testbase_base,
117                                   testbase_tag):
118         self.logger.write(
119             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
120                         "testbase" : src.printcolors.printcLabel(testbase_name),
121                         "tag" : src.printcolors.printcLabel(testbase_tag)},
122                           3)
123         try:
124             def set_signal(): # pragma: no cover
125                 """see http://bugs.python.org/issue1652"""
126                 import signal
127                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
128
129             cmd = "git clone --depth 1 %(base)s %(dir)s"
130             cmd += " && cd %(dir)s"
131             if testbase_tag=='master':
132                 cmd += " && git fetch origin %(branch)s"
133             else:
134                 cmd += " && git fetch origin %(branch)s:%(branch)s"
135             cmd += " && git checkout %(branch)s"
136             cmd = cmd % { 'branch': testbase_tag,
137                          'base': testbase_base,
138                          'dir': testbase_name }
139
140             self.logger.write("> %s\n" % cmd, 5)
141             if src.architecture.is_windows():
142                 # preexec_fn not supported on windows platform
143                 res = subprocess.call(cmd,
144                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
145                                 shell=True,
146                                 stdout=self.logger.logTxtFile,
147                                 stderr=subprocess.PIPE)
148             else:
149                 res = subprocess.call(cmd,
150                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
151                                 shell=True,
152                                 preexec_fn=set_signal,
153                                 stdout=self.logger.logTxtFile,
154                                 stderr=subprocess.PIPE)
155             if res != 0:
156                 raise src.SatException(_("Error: unable to get test base "
157                                          "'%(name)s' from git '%(repo)s'.") % \
158                                        { 'name': testbase_name,
159                                         'repo': testbase_base })
160
161         except OSError:
162             self.logger.error(_("git is not installed. exiting...\n"))
163             sys.exit(0)
164
165     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
166         self.logger.write(_("get test base '%s' from svn\n") % \
167                           src.printcolors.printcLabel(testbase_name), 3)
168         try:
169             def set_signal(): # pragma: no cover
170                 """see http://bugs.python.org/issue1652"""
171                 import signal
172                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
173
174             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
175             cmd = cmd % { 'user': user,
176                          'base': testbase_base,
177                          'dir': testbase_name }
178             
179             # Get the application environment
180             self.logger.write(_("Set the application environment\n"), 5)
181             env_appli = src.environment.SalomeEnviron(self.config,
182                                       src.environment.Environ(dict(os.environ)))
183             env_appli.set_application_env(self.logger)
184             
185             self.logger.write("> %s\n" % cmd, 5)
186             if src.architecture.is_windows():
187                 # preexec_fn not supported on windows platform
188                 res = subprocess.call(cmd,
189                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
190                                 shell=True,
191                                 stdout=self.logger.logTxtFile,
192                                 stderr=subprocess.PIPE)
193             else:
194                 res = subprocess.call(cmd,
195                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
196                                 shell=True,
197                                 preexec_fn=set_signal,
198                                 stdout=self.logger.logTxtFile,
199                                 stderr=subprocess.PIPE,
200                                 env=env_appli.environ.environ,)
201
202             if res != 0:
203                 raise src.SatException(_("Error: unable to get test base '%(nam"
204                                          "e)s' from svn '%(repo)s'.") % \
205                                        { 'name': testbase_name,
206                                         'repo': testbase_base })
207
208         except OSError:
209             self.logger.error(_("svn is not installed. exiting...\n"))
210             sys.exit(0)
211
212     ##
213     # Configure tests base.
214     def prepare_testbase(self, test_base_name):
215         src.printcolors.print_value(self.logger,
216                                     _("Test base"),
217                                     test_base_name,
218                                     3)
219         self.logger.write("\n", 3, False)
220
221         # search for the test base
222         test_base_info = None
223         for project_name in self.config.PROJECTS.projects:
224             project_info = self.config.PROJECTS.projects[project_name]
225             if "test_bases" not in project_info:
226                 continue
227             for t_b_info in project_info.test_bases:
228                 if t_b_info.name == test_base_name:
229                     test_base_info = t_b_info
230         
231         if not test_base_info:
232             if os.path.exists(test_base_name):
233                 self.prepare_testbase_from_dir("DIR", test_base_name)
234                 self.currentTestBase = "DIR"
235                 return 0
236         
237         if not test_base_info:
238             message = (_("########## ERROR: test base '%s' not found\n") % 
239                        test_base_name)
240             self.logger.write("%s\n" % src.printcolors.printcError(message))
241             return 1
242
243         if test_base_info.get_sources == "dir":
244             self.prepare_testbase_from_dir(test_base_name,
245                                            test_base_info.info.dir)
246         elif test_base_info.get_sources == "git":
247             self.prepare_testbase_from_git(test_base_name,
248                                        test_base_info.info.base,
249                                        self.config.APPLICATION.test_base.tag)
250         elif test_base_info.get_sources == "svn":
251             svn_user = src.get_cfg_param(test_base_info.info,
252                                          "svn_user",
253                                          self.config.USER.svn_user)
254             self.prepare_testbase_from_svn(svn_user,
255                                        test_base_name,
256                                        test_base_info.info.base)
257         else:
258             raise src.SatException(_("unknown source type '%(type)s' for test b"
259                                      "ase '%(base)s' ...\n") % {
260                                         'type': test_base_info.get_sources,
261                                         'base': test_base_name })
262
263         self.currentTestBase = test_base_name
264
265     ##
266     # Searches if the script is declared in known errors pyconf.
267     # Update the status if needed.
268     def search_known_errors(self, status, test_grid, test_session, test):
269         test_path = os.path.join(test_grid, test_session, test)
270         if not src.config_has_application(self.config):
271             return status, []
272
273         if self.known_errors is None:
274             return status, []
275
276         platform = self.config.VARS.arch
277         application = self.config.VARS.application
278         error = self.known_errors.get_error(test_path, application, platform)
279         if error is None:
280             return status, []
281         
282         if status == src.OK_STATUS:
283             if not error.fixed:
284                 # the error is fixed
285                 self.known_errors.fix_error(error)
286                 #import testerror
287                 #testerror.write_test_failures(
288                 #                        self.config.TOOLS.testerror.file_path,
289                 #                        self.known_errors.errors)
290             return status, [ error.date,
291                             error.expected,
292                             error.comment,
293                             error.fixed ]
294
295         if error.fixed:
296             self.known_errors.unfix_error(error)
297             #import testerror
298             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
299             #                              self.known_errors.errors)
300
301         delta = self.known_errors.get_expecting_days(error)
302         kfres = [ error.date, error.expected, error.comment, error.fixed ]
303         if delta < 0:
304             return src.KO_STATUS, kfres
305         return src.KNOWNFAILURE_STATUS, kfres
306
307     ##
308     # Read the *.result.py files.
309     def read_results(self, listTest, has_timed_out):
310         results = {}
311         for test in listTest:
312             resfile = os.path.join(self.currentDir,
313                                    self.currentgrid,
314                                    self.currentsession,
315                                    test[:-3] + ".result.py")
316
317             # check if <test>.result.py file exists
318             if not os.path.exists(resfile):
319                 results[test] = ["?", -1, "", []]
320             else:
321                 gdic, ldic = {}, {}
322                 if verbose:
323                   print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
324
325                 try:
326                   execfile(resfile, gdic, ldic)
327
328                   status = src.TIMEOUT_STATUS
329                   if not has_timed_out:
330                       status = src.KO_STATUS
331
332                   if 'status' in ldic:
333                       status = ldic['status']
334
335                   expected = []
336                   if status == src.KO_STATUS or status == src.OK_STATUS:
337                       status, expected = self.search_known_errors(status,
338                                                               self.currentgrid,
339                                                               self.currentsession,
340                                                               test)
341
342                   callback = ""
343                   if 'callback' in ldic:
344                       callback = ldic['callback']
345                   elif status == src.KO_STATUS:
346                       callback = "CRASH"
347                       if verbose:
348                         print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
349                         print("--- CRASH gdic\n%s" %  PP.pformat(gdic))
350                         pass
351
352                   exec_time = -1
353                   if 'time' in ldic:
354                       try:
355                           exec_time = float(ldic['time'])
356                       except:
357                           pass
358
359                   results[test] = [status, exec_time, callback, expected]
360
361                 except:
362                   results[test] = ["?", -1, "", []]
363                   # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
364             
365             # check if <test>.py file exists
366             testfile = os.path.join(self.currentDir,
367                                    self.currentgrid,
368                                    self.currentsession,
369                                    test)
370             
371             if not os.path.exists(testfile):
372                 results[test].append('')
373             else:
374                 text = open(testfile, "r").read()
375                 results[test].append(text)
376
377             # check if <test>.out.py file exists
378             outfile = os.path.join(self.currentDir,
379                                    self.currentgrid,
380                                    self.currentsession,
381                                    test[:-3] + ".out.py")
382             
383             if not os.path.exists(outfile):
384                 results[test].append('')
385             else:
386                 text = open(outfile, "r").read()
387                 results[test].append(text)
388
389         return results
390
391     ##
392     # Generates the script to be run by Salome.
393     # This python script includes init and close statements and a loop
394     # calling all the scripts of a single directory.
395     def generate_script(self, listTest, script_path, ignoreList):
396         # open template file
397         tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
398         with open(tFile, 'r') as f:
399           template = string.Template(f.read())
400         
401         # create substitution dictionary
402         d = dict()
403         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
404         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
405         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
406         d['sessionDir'] = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
407         d['resultFile'] = os.path.join(self.tmp_working_dir, 'WORK', 'exec_result')
408         d['listTest'] = listTest
409         d['sessionName'] = self.currentsession
410         d['ignore'] = ignoreList
411
412         # create script with template
413         contents = template.safe_substitute(d)
414         if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
415         with open(script_path, 'w') as f:
416           f.write(contents)
417
418
419     # Find the getTmpDir function that gives access to *_pidict file directory.
420     # (the *_pidict file exists when SALOME is launched)
421     def get_tmp_dir(self):
422         # Rare case where there is no KERNEL in grid list 
423         # (for example MED_STANDALONE)
424         if ('APPLICATION' in self.config 
425                 and 'KERNEL' not in self.config.APPLICATION.products 
426                 and 'KERNEL_ROOT_DIR' not in os.environ):
427             return getTmpDirDEFAULT
428         
429         # Case where "sat test" is launched in an existing SALOME environment
430         if 'KERNEL_ROOT_DIR' in os.environ:
431             root_dir =  os.environ['KERNEL_ROOT_DIR']
432         
433         if ('APPLICATION' in self.config and
434             'KERNEL' in self.config.APPLICATION.products):
435             root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
436
437         # Case where there the appli option is called (with path to launcher)
438         if len(self.launcher) > 0:
439             # There are two cases : The old application (runAppli) 
440             # and the new one
441             launcherName = os.path.basename(self.launcher)
442             launcherDir = os.path.dirname(self.launcher)
443             if launcherName == 'runAppli':
444                 # Old application
445                 cmd = """
446 for i in %s/env.d/*.sh; 
447   do source ${i};
448 done
449 echo $KERNEL_ROOT_DIR
450 """ % launcherDir
451             else:
452                 # New application
453                 cmd = """
454 echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
455 %s shell tmpscript.py
456 """ % self.launcher
457
458             if src.architecture.is_windows():
459                 subproc_res = subprocess.Popen(cmd,
460                             stdout=subprocess.PIPE,
461                             shell=True).communicate()
462                 pass
463             else:
464                 subproc_res = subprocess.Popen(cmd,
465                             stdout=subprocess.PIPE,
466                             shell=True,
467                             executable='/bin/bash').communicate()
468                 pass
469             
470             root_dir = subproc_res[0].split()[-1]
471         
472         # import grid salome_utils from KERNEL that gives 
473         # the right getTmpDir function
474         root_dir = root_dir.decode('utf-8')
475         aPath = [os.path.join(root_dir, 'bin', 'salome')]
476         sal_uts = "salome_utils"
477         try:
478             (file_, pathname, description) = imp.find_module(sal_uts, aPath )
479         except Exception:
480             msg = "inexisting %s.py in %s" % (sal_uts, aPath)
481             raise Exception(msg)
482
483         try:
484             grid = imp.load_module(sal_uts, file_, pathname, description)
485             return grid.getLogDir
486         except:
487             grid = imp.load_module(sal_uts, file_, pathname, description)
488             return grid.getTmpDir
489         finally:
490             if file_:
491                 file_.close()
492
493
494     def get_test_timeout(self, test_name, default_value):
495         if ("timeout" in self.settings and 
496                 test_name in self.settings["timeout"]):
497             return self.settings["timeout"][test_name]
498
499         return default_value
500
501     def generate_launching_commands(self):
502         # Case where "sat test" is launched in an existing SALOME environment
503         if 'KERNEL_ROOT_DIR' in os.environ:
504             binSalome = "runSalome"
505             binPython = "python"
506             killSalome = "killSalome.py"
507         
508         # Rare case where there is no KERNEL in grid list 
509         # (for example MED_STANDALONE)
510         if ('APPLICATION' in self.config and 
511                 'KERNEL' not in self.config.APPLICATION.products):
512             binSalome = "runSalome"
513             binPython = "python" 
514             killSalome = "killSalome.py"   
515             src.environment.load_environment(self.config, False, self.logger)         
516             return binSalome, binPython, killSalome
517         
518         # Case where there the appli option is called (with path to launcher)
519         if len(self.launcher) > 0:
520             # There are two cases : The old application (runAppli) 
521             # and the new one
522             launcherName = os.path.basename(self.launcher)
523             launcherDir = os.path.dirname(self.launcher)
524             if launcherName == 'runAppli':
525                 # Old application
526                 binSalome = self.launcher
527                 binPython = ("for i in " +
528                              launcherDir +
529                              "/env.d/*.sh; do source ${i}; done ; python")
530                 killSalome = ("for i in " +
531                         launcherDir +
532                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
533                 return binSalome, binPython, killSalome
534             else:
535                 # New application
536                 binSalome = self.launcher
537                 binPython = self.launcher + ' shell'
538                 killSalome = self.launcher + ' killall'
539                 return binSalome, binPython, killSalome
540
541         # SALOME version detection and APPLI repository detection
542         VersionSalome = src.get_salome_version(self.config)
543         appdir = 'APPLI'
544         if "APPLI" in self.config and "application_name" in self.config.APPLI:
545             appdir = self.config.APPLI.application_name
546         
547         # Case where SALOME has NOT the launcher that uses the SalomeContext API
548         if VersionSalome < 730:
549             binSalome = os.path.join(self.config.APPLICATION.workdir,
550                                      appdir,
551                                      "runAppli")
552             binPython = "python"
553             killSalome = "killSalome.py"
554             src.environment.load_environment(self.config, False, self.logger)           
555             return binSalome, binPython, killSalome
556         
557         # Case where SALOME has the launcher that uses the SalomeContext API
558         else:            
559             launcher_name = src.get_launcher_name(self.config)
560             binSalome = os.path.join(self.config.APPLICATION.workdir,
561                                      launcher_name)
562             
563             binPython = binSalome + ' shell'
564             killSalome = binSalome + ' killall'
565             return binSalome, binPython, killSalome
566                 
567         return binSalome, binPython, killSalome
568         
569
570     ##
571     # Runs tests of a session (using a single instance of Salome).
572     def run_tests(self, listTest, ignoreList):
573         out_path = os.path.join(self.currentDir,
574                                 self.currentgrid,
575                                 self.currentsession)
576         if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
577                    (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
578         sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
579         time_out = self.get_test_timeout(sessionname,
580                                          DEFAULT_TIMEOUT)
581
582         time_out_salome = DEFAULT_TIMEOUT
583
584         # generate wrapper script
585         script_path = os.path.join(out_path, 'wrapperScript.py')
586         self.generate_script(listTest, script_path, ignoreList)
587
588         tmpDir = self.get_tmp_dir()
589
590         binSalome, binPython, killSalome = self.generate_launching_commands()
591         if "run_with_grids" in self.settings and \
592            sessionname in self.settings["run_with_grids"]:
593             binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
594
595         logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
596
597         status = False
598         elapsed = -1
599         if self.currentsession.startswith("NOGUI_"):
600             # runSalome -t (bash)
601             status, elapsed = fork.batch(
602                                 binSalome,
603                                 self.logger,
604                                 os.path.join(self.tmp_working_dir, "WORK"),
605                                 [ "-t", "--shutdown-server=1", script_path ],
606                                 delai=time_out,
607                                 log=logWay)
608
609         elif self.currentsession.startswith("PY_"):
610             # python script.py
611             status, elapsed = fork.batch(
612                                 binPython,
613                                 self.logger,
614                                 os.path.join(self.tmp_working_dir, "WORK"),
615                                 [script_path],
616                                 delai=time_out,
617                                 log=logWay)
618
619         else:
620             opt = "-z 0"
621             if self.show_desktop: opt = "--show-desktop=0"
622             status, elapsed = fork.batch_salome(
623                                 binSalome,
624                                 self.logger,
625                                 os.path.join( self.tmp_working_dir, "WORK"),
626                                 [ opt, "--shutdown-server=1", script_path ],
627                                 getTmpDir=tmpDir,
628                                 fin=killSalome,
629                                 delai=time_out,
630                                 log=logWay,
631                                 delaiapp=time_out_salome)
632
633         self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)
634
635         # create the test result to add in the config object
636         test_info = src.pyconf.Mapping(self.config)
637         test_info.testbase = self.currentTestBase
638         test_info.grid = self.currentgrid
639         test_info.session = self.currentsession
640         test_info.script = src.pyconf.Sequence(self.config)
641
642         script_results = self.read_results(listTest, elapsed == time_out)
643         for sr in sorted(script_results.keys()):
644             self.nb_run += 1
645
646             # create script result
647             script_info = src.pyconf.Mapping(self.config)
648             script_info.name = sr
649             script_info.res = script_results[sr][0]
650             script_info.time = script_results[sr][1]
651             if script_info.res == src.TIMEOUT_STATUS:
652                 script_info.time = time_out
653             if script_info.time < 1e-3: script_info.time = 0
654
655             callback = script_results[sr][2]
656             if script_info.res != src.OK_STATUS and len(callback) > 0:
657                 script_info.callback = callback
658
659             kfres = script_results[sr][3]
660             if len(kfres) > 0:
661                 script_info.known_error = src.pyconf.Mapping(self.config)
662                 script_info.known_error.date = kfres[0]
663                 script_info.known_error.expected = kfres[1]
664                 script_info.known_error.comment = kfres[2]
665                 script_info.known_error.fixed = kfres[3]
666             
667             script_info.content = script_results[sr][4]
668             script_info.out = script_results[sr][5]
669             
670             # add it to the list of results
671             test_info.script.append(script_info, '')
672
673             # display the results
674             if script_info.time > 0:
675                 exectime = "(%7.3f s)" % script_info.time
676             else:
677                 exectime = ""
678
679             sp = "." * (35 - len(script_info.name))
680             self.logger.write(self.write_test_margin(3), 3)
681             self.logger.write("script %s %s %s %s\n" % (
682                                 src.printcolors.printcLabel(script_info.name),
683                                 sp,
684                                 src.printcolors.printc(script_info.res),
685                                 exectime), 3, False)
686             if script_info and len(callback) > 0:
687                 self.logger.write("Exception in %s\n%s\n" % \
688                     (script_info.name,
689                      src.printcolors.printcWarning(callback)), 2, False)
690
691             if script_info.res == src.OK_STATUS:
692                 self.nb_succeed += 1
693             elif script_info.res == src.KNOWNFAILURE_STATUS:
694                 self.nb_acknoledge += 1
695             elif script_info.res == src.TIMEOUT_STATUS:
696                 self.nb_timeout += 1
697             elif script_info.res == src.NA_STATUS:
698                 self.nb_run -= 1
699             elif script_info.res == "?":
700                 self.nb_not_run += 1
701                 
702
703         self.config.TESTS.append(test_info, '')
704
705     ##
706     # Runs all tests of a session.
707     def run_session_tests(self):
708        
709         self.logger.write(self.write_test_margin(2), 3)
710         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
711                                                     self.currentsession), 3, False)
712
713         # prepare list of tests to run
714         tests = os.listdir(os.path.join(self.currentDir,
715                                         self.currentgrid,
716                                         self.currentsession))
717         # avoid result files of previous tests, if presents
718         # tests = filter(lambda l: l.endswith(".py"), tests)
719         tests = [t for t in tests if t.endswith(".py") \
720                    and not ( t.endswith(".out.py") or \
721                              t.endswith(".result.py") or \
722                              t.endswith("wrapperScript.py") \
723                            ) ]
724         tests = sorted(tests, key=str.lower)
725
726         # build list of known failures
727         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
728         ignoreDict = {}
729         for k in self.ignore_tests.keys():
730             if k.startswith(cat):
731                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
732
733         self.run_tests(tests, ignoreDict)
734
735     ##
736     # Runs all tests of a grid.
737     def run_grid_tests(self):
738         self.logger.write(self.write_test_margin(1), 3)
739         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
740                                                 self.currentgrid), 3, False)
741
742         grid_path = os.path.join(self.currentDir, self.currentgrid)
743
744         sessions = []
745         if self.sessions is not None:
746             sessions = self.sessions # user choice
747         else:
748             # use all scripts in grid
749             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
750                            os.listdir(grid_path))
751             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
752                                                                 l)), sessions)
753
754         sessions = sorted(sessions, key=str.lower)
755         existingSessions = self.getSubDirectories(grid_path)
756         for session_ in sessions:
757             if not os.path.exists(os.path.join(grid_path, session_)):
758                 self.logger.write(self.write_test_margin(2), 3)
759                 msg = """\
760 Session '%s' not found
761 Existing sessions are:
762 %s
763 """ % (session_, PP.pformat(sorted(existingSessions)))
764                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
765             else:
766                 self.currentsession = session_
767                 self.run_session_tests()
768
769     def getSubDirectories(self, aDir):
770         """
771         get names of first level of sub directories in aDir
772         excluding '.git' etc as beginning with '.'
773         """
774         res = os.listdir(aDir)
775         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
776         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
777         return res
778
779     ##
780     # Runs test testbase.
781     def run_testbase_tests(self):
782         res_dir = os.path.join(self.currentDir, "RESSOURCES")
783         os.environ['PYTHONPATH'] =  (res_dir + 
784                                      os.pathsep + 
785                                      os.environ['PYTHONPATH'])
786         os.environ['TT_BASE_RESSOURCES'] = res_dir
787         src.printcolors.print_value(self.logger,
788                                     "TT_BASE_RESSOURCES",
789                                     res_dir,
790                                     4)
791         self.logger.write("\n", 4, False)
792
793         self.logger.write(self.write_test_margin(0), 3)
794         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
795                                                         self.currentTestBase)
796         self.logger.write(testbase_label, 3, False)
797         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
798                           3)
799         self.logger.write("\n", 3, False)
800
801         # load settings
802         settings_file = os.path.join(res_dir, "test_settings.py")
803         if os.path.exists(settings_file):
804             gdic, ldic = {}, {}
805             execfile(settings_file, gdic, ldic)
806             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
807             self.settings = ldic['settings_dic']
808             self.ignore_tests = ldic['known_failures_list']
809             if isinstance(self.ignore_tests, list):
810                 self.ignore_tests = {}
811                 self.logger.write(src.printcolors.printcWarning(
812                   "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
813         else:
814             self.ignore_tests = {}
815             self.settings.clear()
816
817         # read known failures pyconf
818         if "testerror" in self.config.LOCAL:
819             #import testerror
820             #self.known_errors = testerror.read_test_failures(
821             #                            self.config.TOOLS.testerror.file_path,
822             #                            do_error=False)
823             pass
824         else:
825             self.known_errors = None
826
827         if self.grids is not None:
828             grids = self.grids # given by user
829         else:
830             # select all the grids (i.e. directories) in the directory
831             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
832                              os.listdir(self.currentDir))
833             grids = filter(lambda l: os.path.isdir(
834                                         os.path.join(self.currentDir, l)),
835                              grids)
836
837         grids = sorted(grids, key=str.lower)
838         existingGrids = self.getSubDirectories(self.currentDir)
839         for grid in grids:
840             if not os.path.exists(os.path.join(self.currentDir, grid)):
841                 self.logger.write(self.write_test_margin(1), 3)
842                 msg = """\
843 Grid '%s' does not exist
844 Existing grids are:
845 %s
846 """ % (grid, PP.pformat(sorted(existingGrids)))
847                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
848             else:
849                 self.currentgrid = grid
850                 self.run_grid_tests()
851
852     def run_script(self, script_name):
853         if ('APPLICATION' in self.config and
854                 script_name in self.config.APPLICATION):
855             script = self.config.APPLICATION[script_name]
856             if len(script) == 0:
857                 return
858
859             self.logger.write("\n", 2, False)
860             if not os.path.exists(script):
861                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
862                                         "t not found: %s" % script) + "\n", 2)
863             else:
864                 self.logger.write(src.printcolors.printcHeader("----------- sta"
865                                             "rt %s" % script_name) + "\n", 2)
866                 self.logger.write("Run script: %s\n" % script, 2)
867                 subprocess.Popen(script, shell=True).wait()
868                 self.logger.write(src.printcolors.printcHeader("----------- end"
869                                                 " %s" % script_name) + "\n", 2)
870
871     def run_all_tests(self):
872         initTime = datetime.datetime.now()
873
874         self.run_script('test_setup')
875         self.logger.write("\n", 2, False)
876
877         self.logger.write(src.printcolors.printcHeader(
878                                             _("=== STARTING TESTS")) + "\n", 2)
879         self.logger.write("\n", 2, False)
880         self.currentDir = os.path.join(self.tmp_working_dir,
881                                        'BASES',
882                                        self.currentTestBase)
883         self.run_testbase_tests()
884
885         # calculate total execution time
886         totalTime = datetime.datetime.now() - initTime
887         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
888         self.logger.write("\n", 2, False)
889         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
890         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
891                           2,
892                           False)
893
894         #
895         # Start the tests
896         #
897         self.run_script('test_cleanup')
898         self.logger.write("\n", 2, False)
899
900         # evaluate results
901
902         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
903             { 'succeed': self.nb_succeed, 'total': self.nb_run }
904         if self.nb_succeed == self.nb_run:
905             res_out = src.printcolors.printcSuccess(res_out)
906         else:
907             res_out = src.printcolors.printcError(res_out)
908         self.logger.write(res_out, 1)
909
910         if self.nb_timeout > 0:
911             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
912         if self.nb_not_run > 0:
913             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
914         if self.nb_acknoledge > 0:
915             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
916
917         status = src.OK_STATUS
918         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
919             status = src.KO_STATUS
920         elif self.nb_acknoledge:
921             status = src.KNOWNFAILURE_STATUS
922         
923         self.logger.write(_("Status: %s\n" % status), 3)
924
925         return self.nb_run - self.nb_succeed - self.nb_acknoledge
926
927     ##
928     # Write margin to show test results.
929     def write_test_margin(self, tab):
930         if tab == 0:
931             return ""
932         return "|   " * (tab - 1) + "+ "
933