Salome HOME
spns #8471 : add Windows case to launch tests with the command 'sat test'
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
20 try:
21     execfile
22 except:
23     def execfile(somefile, global_vars, local_vars):
24         with open(somefile) as f:
25             code = compile(f.read(), somefile, 'exec')
26             exec(code, global_vars, local_vars)
27
28
29 import os
30 import sys
31 import datetime
32 import shutil
33 import string
34 import imp
35 import subprocess
36
37 from . import fork
38 import src
39
40 # directories not considered as test grids
41 C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
42
43 DEFAULT_TIMEOUT = 150
44
45 # Get directory to be used for the temporary files.
46 #
47 def getTmpDirDEFAULT():
48     if src.architecture.is_windows():
49         directory = os.getenv("TEMP")
50     else:
51         # for Linux: use /tmp/logs/{user} folder
52         directory = os.path.join( '/tmp', 'logs', os.getenv("USER", "unknown"))
53     return directory
54
55 class Test:
56     def __init__(self,
57                  config,
58                  logger,
59                  tmp_working_dir,
60                  testbase="",
61                  grids=None,
62                  sessions=None,
63                  launcher="",
64                  show_desktop=True):
65         self.grids = grids
66         self.config = config
67         self.logger = logger
68         self.tmp_working_dir = tmp_working_dir
69         self.sessions = sessions
70         self.launcher = launcher
71         self.show_desktop = show_desktop
72
73         res = self.prepare_testbase(testbase)
74         self.test_base_found = True
75         if res == 1:
76             # Fail
77             self.test_base_found = False
78         
79         self.settings = {}
80         self.known_errors = None
81
82         # create section for results
83         self.config.TESTS = src.pyconf.Sequence(self.config)
84
85         self.nb_run = 0
86         self.nb_succeed = 0
87         self.nb_timeout = 0
88         self.nb_not_run = 0
89         self.nb_acknoledge = 0
90
91     def _copy_dir(self, source, target):
92         if self.config.VARS.python >= "2.6":
93             shutil.copytree(source, target,
94                             symlinks=True,
95                             ignore=shutil.ignore_patterns('.git*','.svn*'))
96         else:
97             shutil.copytree(source, target,
98                             symlinks=True)
99
100     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
101         self.logger.write(_("get test base from dir: %s\n") % \
102                           src.printcolors.printcLabel(testbase_dir), 3)
103         if not os.access(testbase_dir, os.X_OK):
104             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
105                                      "exist ...\n") % { 'name': testbase_name,
106                                                        'dir': testbase_dir })
107
108         self._copy_dir(testbase_dir,
109                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
110
111     def prepare_testbase_from_git(self,
112                                   testbase_name,
113                                   testbase_base,
114                                   testbase_tag):
115         self.logger.write(
116             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
117                         "testbase" : src.printcolors.printcLabel(testbase_name),
118                         "tag" : src.printcolors.printcLabel(testbase_tag)},
119                           3)
120         try:
121             def set_signal(): # pragma: no cover
122                 """see http://bugs.python.org/issue1652"""
123                 import signal
124                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
125
126             cmd = "git clone --depth 1 %(base)s %(dir)s"
127             cmd += " && cd %(dir)s"
128             if testbase_tag=='master':
129                 cmd += " && git fetch origin %(branch)s"
130             else:
131                 cmd += " && git fetch origin %(branch)s:%(branch)s"
132             cmd += " && git checkout %(branch)s"
133             cmd = cmd % { 'branch': testbase_tag,
134                          'base': testbase_base,
135                          'dir': testbase_name }
136
137             self.logger.write("> %s\n" % cmd, 5)
138             if src.architecture.is_windows():
139                 # preexec_fn not supported on windows platform
140                 res = subprocess.call(cmd,
141                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
142                                 shell=True,
143                                 stdout=self.logger.logTxtFile,
144                                 stderr=subprocess.PIPE)
145             else:
146                 res = subprocess.call(cmd,
147                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
148                                 shell=True,
149                                 preexec_fn=set_signal,
150                                 stdout=self.logger.logTxtFile,
151                                 stderr=subprocess.PIPE)
152             if res != 0:
153                 raise src.SatException(_("Error: unable to get test base "
154                                          "'%(name)s' from git '%(repo)s'.") % \
155                                        { 'name': testbase_name,
156                                         'repo': testbase_base })
157
158         except OSError:
159             self.logger.error(_("git is not installed. exiting...\n"))
160             sys.exit(0)
161
162     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
163         self.logger.write(_("get test base '%s' from svn\n") % \
164                           src.printcolors.printcLabel(testbase_name), 3)
165         try:
166             def set_signal(): # pragma: no cover
167                 """see http://bugs.python.org/issue1652"""
168                 import signal
169                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
170
171             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
172             cmd = cmd % { 'user': user,
173                          'base': testbase_base,
174                          'dir': testbase_name }
175             
176             # Get the application environment
177             self.logger.write(_("Set the application environment\n"), 5)
178             env_appli = src.environment.SalomeEnviron(self.config,
179                                       src.environment.Environ(dict(os.environ)))
180             env_appli.set_application_env(self.logger)
181             
182             self.logger.write("> %s\n" % cmd, 5)
183             if src.architecture.is_windows():
184                 # preexec_fn not supported on windows platform
185                 res = subprocess.call(cmd,
186                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
187                                 shell=True,
188                                 stdout=self.logger.logTxtFile,
189                                 stderr=subprocess.PIPE)
190             else:
191                 res = subprocess.call(cmd,
192                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
193                                 shell=True,
194                                 preexec_fn=set_signal,
195                                 stdout=self.logger.logTxtFile,
196                                 stderr=subprocess.PIPE,
197                                 env=env_appli.environ.environ,)
198
199             if res != 0:
200                 raise src.SatException(_("Error: unable to get test base '%(nam"
201                                          "e)s' from svn '%(repo)s'.") % \
202                                        { 'name': testbase_name,
203                                         'repo': testbase_base })
204
205         except OSError:
206             self.logger.error(_("svn is not installed. exiting...\n"))
207             sys.exit(0)
208
209     ##
210     # Configure tests base.
211     def prepare_testbase(self, test_base_name):
212         src.printcolors.print_value(self.logger,
213                                     _("Test base"),
214                                     test_base_name,
215                                     3)
216         self.logger.write("\n", 3, False)
217
218         # search for the test base
219         test_base_info = None
220         for project_name in self.config.PROJECTS.projects:
221             project_info = self.config.PROJECTS.projects[project_name]
222             if "test_bases" not in project_info:
223                 continue
224             for t_b_info in project_info.test_bases:
225                 if t_b_info.name == test_base_name:
226                     test_base_info = t_b_info
227         
228         if not test_base_info:
229             if os.path.exists(test_base_name):
230                 self.prepare_testbase_from_dir("DIR", test_base_name)
231                 self.currentTestBase = "DIR"
232                 return 0
233         
234         if not test_base_info:
235             message = (_("########## ERROR: test base '%s' not found\n") % 
236                        test_base_name)
237             self.logger.write("%s\n" % src.printcolors.printcError(message))
238             return 1
239
240         if test_base_info.get_sources == "dir":
241             self.prepare_testbase_from_dir(test_base_name,
242                                            test_base_info.info.dir)
243         elif test_base_info.get_sources == "git":
244             self.prepare_testbase_from_git(test_base_name,
245                                        test_base_info.info.base,
246                                        self.config.APPLICATION.test_base.tag)
247         elif test_base_info.get_sources == "svn":
248             svn_user = src.get_cfg_param(test_base_info.info,
249                                          "svn_user",
250                                          self.config.USER.svn_user)
251             self.prepare_testbase_from_svn(svn_user,
252                                        test_base_name,
253                                        test_base_info.info.base)
254         else:
255             raise src.SatException(_("unknown source type '%(type)s' for test b"
256                                      "ase '%(base)s' ...\n") % {
257                                         'type': test_base_info.get_sources,
258                                         'base': test_base_name })
259
260         self.currentTestBase = test_base_name
261
262     ##
263     # Searches if the script is declared in known errors pyconf.
264     # Update the status if needed.
265     def search_known_errors(self, status, test_grid, test_session, test):
266         test_path = os.path.join(test_grid, test_session, test)
267         if not src.config_has_application(self.config):
268             return status, []
269
270         if self.known_errors is None:
271             return status, []
272
273         platform = self.config.VARS.arch
274         application = self.config.VARS.application
275         error = self.known_errors.get_error(test_path, application, platform)
276         if error is None:
277             return status, []
278         
279         if status == src.OK_STATUS:
280             if not error.fixed:
281                 # the error is fixed
282                 self.known_errors.fix_error(error)
283                 #import testerror
284                 #testerror.write_test_failures(
285                 #                        self.config.TOOLS.testerror.file_path,
286                 #                        self.known_errors.errors)
287             return status, [ error.date,
288                             error.expected,
289                             error.comment,
290                             error.fixed ]
291
292         if error.fixed:
293             self.known_errors.unfix_error(error)
294             #import testerror
295             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
296             #                              self.known_errors.errors)
297
298         delta = self.known_errors.get_expecting_days(error)
299         kfres = [ error.date, error.expected, error.comment, error.fixed ]
300         if delta < 0:
301             return src.KO_STATUS, kfres
302         return src.KNOWNFAILURE_STATUS, kfres
303
304     ##
305     # Read the *.result.py files.
306     def read_results(self, listTest, has_timed_out):
307         results = {}
308         for test in listTest:
309             resfile = os.path.join(self.currentDir,
310                                    self.currentgrid,
311                                    self.currentsession,
312                                    test[:-3] + ".result.py")
313
314             # check if <test>.result.py file exists
315             if not os.path.exists(resfile):
316                 results[test] = ["?", -1, "", []]
317             else:
318                 gdic, ldic = {}, {}
319                 execfile(resfile, gdic, ldic)
320
321                 status = src.TIMEOUT_STATUS
322                 if not has_timed_out:
323                     status = src.KO_STATUS
324
325                 if ldic.has_key('status'):
326                     status = ldic['status']
327
328                 expected = []
329                 if status == src.KO_STATUS or status == src.OK_STATUS:
330                     status, expected = self.search_known_errors(status,
331                                                             self.currentgrid,
332                                                             self.currentsession,
333                                                             test)
334
335                 callback = ""
336                 if ldic.has_key('callback'):
337                     callback = ldic['callback']
338                 elif status == src.KO_STATUS:
339                     callback = "CRASH"
340
341                 exec_time = -1
342                 if ldic.has_key('time'):
343                     try:
344                         exec_time = float(ldic['time'])
345                     except:
346                         pass
347
348                 results[test] = [status, exec_time, callback, expected]
349             
350             # check if <test>.py file exists
351             testfile = os.path.join(self.currentDir,
352                                    self.currentgrid,
353                                    self.currentsession,
354                                    test)
355             
356             if not os.path.exists(testfile):
357                 results[test].append('')
358             else:
359                 text = open(testfile, "r").read()
360                 results[test].append(text)
361
362             # check if <test>.out.py file exists
363             outfile = os.path.join(self.currentDir,
364                                    self.currentgrid,
365                                    self.currentsession,
366                                    test[:-3] + ".out.py")
367             
368             if not os.path.exists(outfile):
369                 results[test].append('')
370             else:
371                 text = open(outfile, "r").read()
372                 results[test].append(text)
373
374         return results
375
376     ##
377     # Generates the script to be run by Salome.
378     # This python script includes init and close statements and a loop
379     # calling all the scripts of a single directory.
380     def generate_script(self, listTest, script_path, ignoreList):
381         # open template file
382         template_file = open(os.path.join(self.config.VARS.srcDir,
383                                           "test",
384                                           "scriptTemplate.py"), 'r')
385         template = string.Template(template_file.read())
386         
387         # create substitution dictionary
388         d = dict()
389         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
390         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
391         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
392         d['sessionDir'] = os.path.join(self.currentDir,
393                                     self.currentgrid,
394                                     self.currentsession)
395         d['resultFile'] = os.path.join(self.tmp_working_dir,
396                                        'WORK',
397                                        'exec_result')
398         d['listTest'] = listTest
399         d['sessionName'] = self.currentsession
400         d['ignore'] = ignoreList
401
402         # create script with template
403         script = open(script_path, 'w')
404         script.write(template.safe_substitute(d))
405         script.close()
406
407     # Find the getTmpDir function that gives access to *pidict file directory.
408     # (the *pidict file exists when SALOME is launched) 
409     def get_tmp_dir(self):
410         # Rare case where there is no KERNEL in grid list 
411         # (for example MED_STANDALONE)
412         if ('APPLICATION' in self.config 
413                 and 'KERNEL' not in self.config.APPLICATION.products 
414                 and 'KERNEL_ROOT_DIR' not in os.environ):
415             return getTmpDirDEFAULT
416         
417         # Case where "sat test" is launched in an existing SALOME environment
418         if 'KERNEL_ROOT_DIR' in os.environ:
419             root_dir =  os.environ['KERNEL_ROOT_DIR']
420         
421         if ('APPLICATION' in self.config 
422                 and 'KERNEL' in self.config.APPLICATION.products):
423             root_dir = src.product.get_product_config(self.config,
424                                                       "KERNEL").install_dir
425
426         # Case where there the appli option is called (with path to launcher)
427         if len(self.launcher) > 0:
428             # There are two cases : The old application (runAppli) 
429             # and the new one
430             launcherName = os.path.basename(self.launcher)
431             launcherDir = os.path.dirname(self.launcher)
432             if launcherName == 'runAppli':
433                 # Old application
434                 cmd = ("for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
435                        " done ; echo $KERNEL_ROOT_DIR")
436             else:
437                 # New application
438                 cmd = ("echo -e 'import os\nprint os.environ[\"KERNEL_" + 
439                        "ROOT_DIR\"]' > tmpscript.py; %s shell" + 
440                        " tmpscript.py") % self.launcher
441
442             # OP 14/11/2017 Ajout de traces pour essayer de decouvrir le pb
443             #               de remontee de log des tests
444             #root_dir = subprocess.Popen(cmd,
445             #                stdout=subprocess.PIPE,
446             #                shell=True,
447             #                executable='/bin/bash').communicate()[0].split()[-1]
448             # OP Add Windows case
449             if src.architecture.is_windows():
450                 subproc_res = subprocess.Popen(cmd,
451                             stdout=subprocess.PIPE,
452                             shell=True).communicate()
453                 pass
454             else:
455                 subproc_res = subprocess.Popen(cmd,
456                             stdout=subprocess.PIPE,
457                             shell=True,
458                             executable='/bin/bash').communicate()
459                 pass
460             #print "TRACES OP - test_module.py/Test.get_tmp_dir() subproc_res = "
461             #for resLine in subproc_res:
462             #    print "- '#%s#'" %resLine
463             
464             root_dir = subproc_res[0].split()[-1]
465
466         # OP 14/11/2017 Ajout de traces pour essayer de decouvrir le pb
467         #               de remontee de log des tests
468         #print "TRACES OP - test_module.py/Test.get_tmp_dir() root_dir = '#%s#'" %root_dir
469         
470         # import grid salome_utils from KERNEL that gives 
471         # the right getTmpDir function
472         (file_, pathname, description) = imp.find_module("salome_utils",
473                                                          [os.path.join(root_dir,
474                                                                     'bin',
475                                                                     'salome')])
476         try:
477             grid = imp.load_module("salome_utils",
478                                      file_,
479                                      pathname,
480                                      description)
481             return grid.getLogDir
482         except:
483             grid = imp.load_module("salome_utils",
484                                      file_,
485                                      pathname,
486                                      description)
487             return grid.getTmpDir
488         finally:
489             if file_:
490                 file_.close()
491
492
493     def get_test_timeout(self, test_name, default_value):
494         if ("timeout" in self.settings and 
495                 test_name in self.settings["timeout"]):
496             return self.settings["timeout"][test_name]
497
498         return default_value
499
500     def generate_launching_commands(self):
501         # Case where "sat test" is launched in an existing SALOME environment
502         if 'KERNEL_ROOT_DIR' in os.environ:
503             binSalome = "runSalome"
504             binPython = "python"
505             killSalome = "killSalome.py"
506         
507         # Rare case where there is no KERNEL in grid list 
508         # (for example MED_STANDALONE)
509         if ('APPLICATION' in self.config and 
510                 'KERNEL' not in self.config.APPLICATION.products):
511             binSalome = "runSalome"
512             binPython = "python" 
513             killSalome = "killSalome.py"   
514             src.environment.load_environment(self.config, False, self.logger)         
515             return binSalome, binPython, killSalome
516         
517         # Case where there the appli option is called (with path to launcher)
518         if len(self.launcher) > 0:
519             # There are two cases : The old application (runAppli) 
520             # and the new one
521             launcherName = os.path.basename(self.launcher)
522             launcherDir = os.path.dirname(self.launcher)
523             if launcherName == 'runAppli':
524                 # Old application
525                 binSalome = self.launcher
526                 binPython = ("for i in " +
527                              launcherDir +
528                              "/env.d/*.sh; do source ${i}; done ; python")
529                 killSalome = ("for i in " +
530                         launcherDir +
531                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
532                 return binSalome, binPython, killSalome
533             else:
534                 # New application
535                 binSalome = self.launcher
536                 binPython = self.launcher + ' shell'
537                 killSalome = self.launcher + ' killall'
538                 return binSalome, binPython, killSalome
539
540         # SALOME version detection and APPLI repository detection
541         VersionSalome = src.get_salome_version(self.config)
542         appdir = 'APPLI'
543         if "APPLI" in self.config and "application_name" in self.config.APPLI:
544             appdir = self.config.APPLI.application_name
545         
546         # Case where SALOME has NOT the launcher that uses the SalomeContext API
547         if VersionSalome < 730:
548             binSalome = os.path.join(self.config.APPLICATION.workdir,
549                                      appdir,
550                                      "runAppli")
551             binPython = "python"
552             killSalome = "killSalome.py"
553             src.environment.load_environment(self.config, False, self.logger)           
554             return binSalome, binPython, killSalome
555         
556         # Case where SALOME has the launcher that uses the SalomeContext API
557         else:            
558             launcher_name = src.get_launcher_name(self.config)
559             binSalome = os.path.join(self.config.APPLICATION.workdir,
560                                      launcher_name)
561             
562             binPython = binSalome + ' shell'
563             killSalome = binSalome + ' killall'
564             return binSalome, binPython, killSalome
565                 
566         return binSalome, binPython, killSalome
567         
568
569     ##
570     # Runs tests of a session (using a single instance of Salome).
571     def run_tests(self, listTest, ignoreList):
572         out_path = os.path.join(self.currentDir,
573                                 self.currentgrid,
574                                 self.currentsession)
575         sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
576         time_out = self.get_test_timeout(sessionname,
577                                          DEFAULT_TIMEOUT)
578
579         time_out_salome = DEFAULT_TIMEOUT
580
581         # generate wrapper script
582         script_path = os.path.join(out_path, 'wrapperScript.py')
583         self.generate_script(listTest, script_path, ignoreList)
584
585         tmpDir = self.get_tmp_dir()
586
587         binSalome, binPython, killSalome = self.generate_launching_commands()
588         if self.settings.has_key("run_with_grids") \
589            and self.settings["run_with_grids"].has_key(sessionname):
590             binSalome = (binSalome +
591                          " -m %s" % self.settings["run_with_grids"][sessionname])
592
593         logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
594
595         status = False
596         elapsed = -1
597         if self.currentsession.startswith("NOGUI_"):
598             # runSalome -t (bash)
599             status, elapsed = fork.batch(binSalome, self.logger,
600                                         os.path.join(self.tmp_working_dir,
601                                                      "WORK"),
602                                         [ "-t",
603                                          "--shutdown-server=1",
604                                          script_path ],
605                                         delai=time_out,
606                                         log=logWay)
607
608         elif self.currentsession.startswith("PY_"):
609             # python script.py
610             status, elapsed = fork.batch(binPython, self.logger,
611                                           os.path.join(self.tmp_working_dir,
612                                                        "WORK"),
613                                           [script_path],
614                                           delai=time_out, log=logWay)
615
616         else:
617             opt = "-z 0"
618             if self.show_desktop: opt = "--show-desktop=0"
619             status, elapsed = fork.batch_salome(binSalome,
620                                                  self.logger,
621                                                  os.path.join(
622                                                         self.tmp_working_dir,
623                                                         "WORK"),
624                                                  [ opt,
625                                                   "--shutdown-server=1",
626                                                   script_path ],
627                                                  getTmpDir=tmpDir,
628                                                  fin=killSalome,
629                                                  delai=time_out,
630                                                  log=logWay,
631                                                  delaiapp=time_out_salome)
632
633         self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
634                           5)
635
636         # create the test result to add in the config object
637         test_info = src.pyconf.Mapping(self.config)
638         test_info.testbase = self.currentTestBase
639         test_info.grid = self.currentgrid
640         test_info.session = self.currentsession
641         test_info.script = src.pyconf.Sequence(self.config)
642
643         script_results = self.read_results(listTest, elapsed == time_out)
644         for sr in sorted(script_results.keys()):
645             self.nb_run += 1
646
647             # create script result
648             script_info = src.pyconf.Mapping(self.config)
649             script_info.name = sr
650             script_info.res = script_results[sr][0]
651             script_info.time = script_results[sr][1]
652             if script_info.res == src.TIMEOUT_STATUS:
653                 script_info.time = time_out
654             if script_info.time < 1e-3: script_info.time = 0
655
656             callback = script_results[sr][2]
657             if script_info.res != src.OK_STATUS and len(callback) > 0:
658                 script_info.callback = callback
659
660             kfres = script_results[sr][3]
661             if len(kfres) > 0:
662                 script_info.known_error = src.pyconf.Mapping(self.config)
663                 script_info.known_error.date = kfres[0]
664                 script_info.known_error.expected = kfres[1]
665                 script_info.known_error.comment = kfres[2]
666                 script_info.known_error.fixed = kfres[3]
667             
668             script_info.content = script_results[sr][4]
669             script_info.out = script_results[sr][5]
670             
671             # add it to the list of results
672             test_info.script.append(script_info, '')
673
674             # display the results
675             if script_info.time > 0:
676                 exectime = "(%7.3f s)" % script_info.time
677             else:
678                 exectime = ""
679
680             sp = "." * (35 - len(script_info.name))
681             self.logger.write(self.write_test_margin(3), 3)
682             self.logger.write("script %s %s %s %s\n" % (
683                                 src.printcolors.printcLabel(script_info.name),
684                                 sp,
685                                 src.printcolors.printc(script_info.res),
686                                 exectime), 3, False)
687             if script_info and len(callback) > 0:
688                 self.logger.write("Exception in %s\n%s\n" % \
689                     (script_info.name,
690                      src.printcolors.printcWarning(callback)), 2, False)
691
692             if script_info.res == src.OK_STATUS:
693                 self.nb_succeed += 1
694             elif script_info.res == src.KNOWNFAILURE_STATUS:
695                 self.nb_acknoledge += 1
696             elif script_info.res == src.TIMEOUT_STATUS:
697                 self.nb_timeout += 1
698             elif script_info.res == src.NA_STATUS:
699                 self.nb_run -= 1
700             elif script_info.res == "?":
701                 self.nb_not_run += 1
702                 
703
704         self.config.TESTS.append(test_info, '')
705
706     ##
707     # Runs all tests of a session.
708     def run_session_tests(self):
709        
710         self.logger.write(self.write_test_margin(2), 3)
711         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
712                                                     self.currentsession), 3, False)
713
714         # prepare list of tests to run
715         tests = os.listdir(os.path.join(self.currentDir,
716                                         self.currentgrid,
717                                         self.currentsession))
718         tests = filter(lambda l: l.endswith(".py"), tests)
719         tests = sorted(tests, key=str.lower)
720
721         # build list of known failures
722         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
723         ignoreDict = {}
724         for k in self.ignore_tests.keys():
725             if k.startswith(cat):
726                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
727
728         self.run_tests(tests, ignoreDict)
729
730     ##
731     # Runs all tests of a grid.
732     def run_grid_tests(self):
733         self.logger.write(self.write_test_margin(1), 3)
734         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
735                                                 self.currentgrid), 3, False)
736
737         grid_path = os.path.join(self.currentDir, self.currentgrid)
738
739         sessions = []
740         if self.sessions is not None:
741             sessions = self.sessions # user choice
742         else:
743             # use all scripts in grid
744             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
745                            os.listdir(grid_path))
746             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
747                                                                 l)), sessions)
748
749         sessions = sorted(sessions, key=str.lower)
750         for session_ in sessions:
751             if not os.path.exists(os.path.join(grid_path, session_)):
752                 self.logger.write(self.write_test_margin(2), 3)
753                 self.logger.write(src.printcolors.printcWarning("Session %s not"
754                                         " found" % session_) + "\n", 3, False)
755             else:
756                 self.currentsession = session_
757                 self.run_session_tests()
758
759     ##
760     # Runs test testbase.
761     def run_testbase_tests(self):
762         res_dir = os.path.join(self.currentDir, "RESSOURCES")
763         os.environ['PYTHONPATH'] =  (res_dir + 
764                                      os.pathsep + 
765                                      os.environ['PYTHONPATH'])
766         os.environ['TT_BASE_RESSOURCES'] = res_dir
767         src.printcolors.print_value(self.logger,
768                                     "TT_BASE_RESSOURCES",
769                                     res_dir,
770                                     4)
771         self.logger.write("\n", 4, False)
772
773         self.logger.write(self.write_test_margin(0), 3)
774         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
775                                                         self.currentTestBase)
776         self.logger.write(testbase_label, 3, False)
777         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
778                           3)
779         self.logger.write("\n", 3, False)
780
781         # load settings
782         settings_file = os.path.join(res_dir, "test_settings.py")
783         if os.path.exists(settings_file):
784             gdic, ldic = {}, {}
785             execfile(settings_file, gdic, ldic)
786             self.logger.write(_("Load test settings\n"), 3)
787             self.settings = ldic['settings_dic']
788             self.ignore_tests = ldic['known_failures_list']
789             if isinstance(self.ignore_tests, list):
790                 self.ignore_tests = {}
791                 self.logger.write(src.printcolors.printcWarning("known_failur"
792                   "es_list must be a dictionary (not a list)") + "\n", 1, False)
793         else:
794             self.ignore_tests = {}
795             self.settings.clear()
796
797         # read known failures pyconf
798         if "testerror" in self.config.LOCAL:
799             #import testerror
800             #self.known_errors = testerror.read_test_failures(
801             #                            self.config.TOOLS.testerror.file_path,
802             #                            do_error=False)
803             pass
804         else:
805             self.known_errors = None
806
807         if self.grids is not None:
808             grids = self.grids # given by user
809         else:
810             # select all the grids (i.e. directories) in the directory
811             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
812                              os.listdir(self.currentDir))
813             grids = filter(lambda l: os.path.isdir(
814                                         os.path.join(self.currentDir, l)),
815                              grids)
816
817         grids = sorted(grids, key=str.lower)
818         for grid in grids:
819             if not os.path.exists(os.path.join(self.currentDir, grid)):
820                 self.logger.write(self.write_test_margin(1), 3)
821                 self.logger.write(src.printcolors.printcWarning(
822                             "grid %s does not exist\n" % grid), 3, False)
823             else:
824                 self.currentgrid = grid
825                 self.run_grid_tests()
826
827     def run_script(self, script_name):
828         if ('APPLICATION' in self.config and 
829                 script_name in self.config.APPLICATION):
830             script = self.config.APPLICATION[script_name]
831             if len(script) == 0:
832                 return
833
834             self.logger.write("\n", 2, False)
835             if not os.path.exists(script):
836                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
837                                         "t not found: %s" % script) + "\n", 2)
838             else:
839                 self.logger.write(src.printcolors.printcHeader("----------- sta"
840                                             "rt %s" % script_name) + "\n", 2)
841                 self.logger.write("Run script: %s\n" % script, 2)
842                 subprocess.Popen(script, shell=True).wait()
843                 self.logger.write(src.printcolors.printcHeader("----------- end"
844                                                 " %s" % script_name) + "\n", 2)
845
846     def run_all_tests(self):
847         initTime = datetime.datetime.now()
848
849         self.run_script('test_setup')
850         self.logger.write("\n", 2, False)
851
852         self.logger.write(src.printcolors.printcHeader(
853                                             _("=== STARTING TESTS")) + "\n", 2)
854         self.logger.write("\n", 2, False)
855         self.currentDir = os.path.join(self.tmp_working_dir,
856                                        'BASES',
857                                        self.currentTestBase)
858         self.run_testbase_tests()
859
860         # calculate total execution time
861         totalTime = datetime.datetime.now() - initTime
862         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
863         self.logger.write("\n", 2, False)
864         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
865         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
866                           2,
867                           False)
868
869         #
870         # Start the tests
871         #
872         self.run_script('test_cleanup')
873         self.logger.write("\n", 2, False)
874
875         # evaluate results
876
877         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
878             { 'succeed': self.nb_succeed, 'total': self.nb_run }
879         if self.nb_succeed == self.nb_run:
880             res_out = src.printcolors.printcSuccess(res_out)
881         else:
882             res_out = src.printcolors.printcError(res_out)
883         self.logger.write(res_out, 1)
884
885         if self.nb_timeout > 0:
886             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
887         if self.nb_not_run > 0:
888             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
889         if self.nb_acknoledge > 0:
890             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
891
892         status = src.OK_STATUS
893         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
894             status = src.KO_STATUS
895         elif self.nb_acknoledge:
896             status = src.KNOWNFAILURE_STATUS
897         
898         self.logger.write(_("Status: %s\n" % status), 3)
899
900         return self.nb_run - self.nb_succeed - self.nb_acknoledge
901
902     ##
903     # Write margin to show test results.
904     def write_test_margin(self, tab):
905         if tab == 0:
906             return ""
907         return "|   " * (tab - 1) + "+ "
908