Salome HOME
begin fix sat test
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
20 try:
21     execfile
22 except:
23     def execfile(somefile, global_vars, local_vars):
24         with open(somefile) as f:
25             code = compile(f.read(), somefile, 'exec')
26             exec(code, global_vars, local_vars)
27
28 import os
29 import sys
30 import datetime
31 import shutil
32 import string
33 import imp
34 import subprocess
35 import glob
36 import pprint as PP
37
38 verbose = False # cvw TODO
39
40 from . import fork
41 import src
42
43 # directories not considered as test grids
44 C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
45
46 DEFAULT_TIMEOUT = 150
47
48 # Get directory to be used for the temporary files.
49 #
50 def getTmpDirDEFAULT():
51     if src.architecture.is_windows():
52         directory = os.getenv("TEMP")
53     else:
54         # for Linux: use /tmp/logs/{user} folder
55         directory = os.path.join( '/tmp', 'logs', os.getenv("USER", "unknown"))
56     return directory
57
58 class Test:
59     def __init__(self,
60                  config,
61                  logger,
62                  tmp_working_dir,
63                  testbase="",
64                  grids=None,
65                  sessions=None,
66                  launcher="",
67                  show_desktop=True):
68         self.grids = grids
69         self.config = config
70         self.logger = logger
71         self.tmp_working_dir = tmp_working_dir
72         self.sessions = sessions
73         self.launcher = launcher
74         self.show_desktop = show_desktop
75
76         res = self.prepare_testbase(testbase)
77         self.test_base_found = True
78         if res == 1:
79             # Fail
80             self.test_base_found = False
81         
82         self.settings = {}
83         self.known_errors = None
84
85         # create section for results
86         self.config.TESTS = src.pyconf.Sequence(self.config)
87
88         self.nb_run = 0
89         self.nb_succeed = 0
90         self.nb_timeout = 0
91         self.nb_not_run = 0
92         self.nb_acknoledge = 0
93
94     def _copy_dir(self, source, target):
95         if self.config.VARS.python >= "2.6":
96             shutil.copytree(source, target,
97                             symlinks=True,
98                             ignore=shutil.ignore_patterns('.git*','.svn*'))
99         else:
100             shutil.copytree(source, target,
101                             symlinks=True)
102
103     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
104         self.logger.write(_("get test base from dir: %s\n") % \
105                           src.printcolors.printcLabel(testbase_dir), 3)
106         if not os.access(testbase_dir, os.X_OK):
107             raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
108                                      "exist ...\n") % { 'name': testbase_name,
109                                                        'dir': testbase_dir })
110
111         self._copy_dir(testbase_dir,
112                        os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
113
114     def prepare_testbase_from_git(self,
115                                   testbase_name,
116                                   testbase_base,
117                                   testbase_tag):
118         self.logger.write(
119             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
120                         "testbase" : src.printcolors.printcLabel(testbase_name),
121                         "tag" : src.printcolors.printcLabel(testbase_tag)},
122                           3)
123         try:
124             def set_signal(): # pragma: no cover
125                 """see http://bugs.python.org/issue1652"""
126                 import signal
127                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
128
129             cmd = "git clone --depth 1 %(base)s %(dir)s"
130             cmd += " && cd %(dir)s"
131             if testbase_tag=='master':
132                 cmd += " && git fetch origin %(branch)s"
133             else:
134                 cmd += " && git fetch origin %(branch)s:%(branch)s"
135             cmd += " && git checkout %(branch)s"
136             cmd = cmd % { 'branch': testbase_tag,
137                          'base': testbase_base,
138                          'dir': testbase_name }
139
140             self.logger.write("> %s\n" % cmd, 5)
141             if src.architecture.is_windows():
142                 # preexec_fn not supported on windows platform
143                 res = subprocess.call(cmd,
144                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
145                                 shell=True,
146                                 stdout=self.logger.logTxtFile,
147                                 stderr=subprocess.PIPE)
148             else:
149                 res = subprocess.call(cmd,
150                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
151                                 shell=True,
152                                 preexec_fn=set_signal,
153                                 stdout=self.logger.logTxtFile,
154                                 stderr=subprocess.PIPE)
155             if res != 0:
156                 raise src.SatException(_("Error: unable to get test base "
157                                          "'%(name)s' from git '%(repo)s'.") % \
158                                        { 'name': testbase_name,
159                                         'repo': testbase_base })
160
161         except OSError:
162             self.logger.error(_("git is not installed. exiting...\n"))
163             sys.exit(0)
164
165     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
166         self.logger.write(_("get test base '%s' from svn\n") % \
167                           src.printcolors.printcLabel(testbase_name), 3)
168         try:
169             def set_signal(): # pragma: no cover
170                 """see http://bugs.python.org/issue1652"""
171                 import signal
172                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
173
174             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
175             cmd = cmd % { 'user': user,
176                          'base': testbase_base,
177                          'dir': testbase_name }
178             
179             # Get the application environment
180             self.logger.write(_("Set the application environment\n"), 5)
181             env_appli = src.environment.SalomeEnviron(self.config,
182                                       src.environment.Environ(dict(os.environ)))
183             env_appli.set_application_env(self.logger)
184             
185             self.logger.write("> %s\n" % cmd, 5)
186             if src.architecture.is_windows():
187                 # preexec_fn not supported on windows platform
188                 res = subprocess.call(cmd,
189                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
190                                 shell=True,
191                                 stdout=self.logger.logTxtFile,
192                                 stderr=subprocess.PIPE)
193             else:
194                 res = subprocess.call(cmd,
195                                 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
196                                 shell=True,
197                                 preexec_fn=set_signal,
198                                 stdout=self.logger.logTxtFile,
199                                 stderr=subprocess.PIPE,
200                                 env=env_appli.environ.environ,)
201
202             if res != 0:
203                 raise src.SatException(_("Error: unable to get test base '%(nam"
204                                          "e)s' from svn '%(repo)s'.") % \
205                                        { 'name': testbase_name,
206                                         'repo': testbase_base })
207
208         except OSError:
209             self.logger.error(_("svn is not installed. exiting...\n"))
210             sys.exit(0)
211
212     ##
213     # Configure tests base.
214     def prepare_testbase(self, test_base_name):
215         src.printcolors.print_value(self.logger,
216                                     _("Test base"),
217                                     test_base_name,
218                                     3)
219         self.logger.write("\n", 3, False)
220
221         # search for the test base
222         test_base_info = None
223         for project_name in self.config.PROJECTS.projects:
224             project_info = self.config.PROJECTS.projects[project_name]
225             if "test_bases" not in project_info:
226                 continue
227             for t_b_info in project_info.test_bases:
228                 if t_b_info.name == test_base_name:
229                     test_base_info = t_b_info
230         
231         if not test_base_info:
232             if os.path.exists(test_base_name):
233                 self.prepare_testbase_from_dir("DIR", test_base_name)
234                 self.currentTestBase = "DIR"
235                 return 0
236         
237         if not test_base_info:
238             message = (_("########## ERROR: test base '%s' not found\n") % 
239                        test_base_name)
240             self.logger.write("%s\n" % src.printcolors.printcError(message))
241             return 1
242
243         if test_base_info.get_sources == "dir":
244             self.prepare_testbase_from_dir(test_base_name,
245                                            test_base_info.info.dir)
246         elif test_base_info.get_sources == "git":
247             self.prepare_testbase_from_git(test_base_name,
248                                        test_base_info.info.base,
249                                        self.config.APPLICATION.test_base.tag)
250         elif test_base_info.get_sources == "svn":
251             svn_user = src.get_cfg_param(test_base_info.info,
252                                          "svn_user",
253                                          self.config.USER.svn_user)
254             self.prepare_testbase_from_svn(svn_user,
255                                        test_base_name,
256                                        test_base_info.info.base)
257         else:
258             raise src.SatException(_("unknown source type '%(type)s' for test b"
259                                      "ase '%(base)s' ...\n") % {
260                                         'type': test_base_info.get_sources,
261                                         'base': test_base_name })
262
263         self.currentTestBase = test_base_name
264
265     ##
266     # Searches if the script is declared in known errors pyconf.
267     # Update the status if needed.
268     def search_known_errors(self, status, test_grid, test_session, test):
269         test_path = os.path.join(test_grid, test_session, test)
270         if not src.config_has_application(self.config):
271             return status, []
272
273         if self.known_errors is None:
274             return status, []
275
276         platform = self.config.VARS.arch
277         application = self.config.VARS.application
278         error = self.known_errors.get_error(test_path, application, platform)
279         if error is None:
280             return status, []
281         
282         if status == src.OK_STATUS:
283             if not error.fixed:
284                 # the error is fixed
285                 self.known_errors.fix_error(error)
286                 #import testerror
287                 #testerror.write_test_failures(
288                 #                        self.config.TOOLS.testerror.file_path,
289                 #                        self.known_errors.errors)
290             return status, [ error.date,
291                             error.expected,
292                             error.comment,
293                             error.fixed ]
294
295         if error.fixed:
296             self.known_errors.unfix_error(error)
297             #import testerror
298             #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
299             #                              self.known_errors.errors)
300
301         delta = self.known_errors.get_expecting_days(error)
302         kfres = [ error.date, error.expected, error.comment, error.fixed ]
303         if delta < 0:
304             return src.KO_STATUS, kfres
305         return src.KNOWNFAILURE_STATUS, kfres
306
307     ##
308     # Read the *.result.py files.
309     def read_results(self, listTest, has_timed_out):
310         results = {}
311         for test in listTest:
312             resfile = os.path.join(self.currentDir,
313                                    self.currentgrid,
314                                    self.currentsession,
315                                    test[:-3] + ".result.py")
316
317             # check if <test>.result.py file exists
318             if not os.path.exists(resfile):
319                 results[test] = ["?", -1, "", []]
320             else:
321                 gdic, ldic = {}, {}
322                 if verbose:
323                   print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
324
325                 try:
326                   execfile(resfile, gdic, ldic)
327
328                   status = src.TIMEOUT_STATUS
329                   if not has_timed_out:
330                       status = src.KO_STATUS
331
332                   if 'status' in ldic:
333                       status = ldic['status']
334
335                   expected = []
336                   if status == src.KO_STATUS or status == src.OK_STATUS:
337                       status, expected = self.search_known_errors(status,
338                                                               self.currentgrid,
339                                                               self.currentsession,
340                                                               test)
341
342                   callback = ""
343                   if ldic.has_key('callback'):
344                       callback = ldic['callback']
345                   elif status == src.KO_STATUS:
346                       callback = "CRASH"
347                       if verbose:
348                         print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
349                         print("--- CRASH gdic\n%s" %  PP.pformat(gdic))
350                         pass
351
352                   exec_time = -1
353                   if 'time' in ldic:
354                       try:
355                           exec_time = float(ldic['time'])
356                       except:
357                           pass
358
359                   results[test] = [status, exec_time, callback, expected]
360
361                 except:
362                   results[test] = ["?", -1, "", []]
363                   # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
364             
365             # check if <test>.py file exists
366             testfile = os.path.join(self.currentDir,
367                                    self.currentgrid,
368                                    self.currentsession,
369                                    test)
370             
371             if not os.path.exists(testfile):
372                 results[test].append('')
373             else:
374                 text = open(testfile, "r").read()
375                 results[test].append(text)
376
377             # check if <test>.out.py file exists
378             outfile = os.path.join(self.currentDir,
379                                    self.currentgrid,
380                                    self.currentsession,
381                                    test[:-3] + ".out.py")
382             
383             if not os.path.exists(outfile):
384                 results[test].append('')
385             else:
386                 text = open(outfile, "r").read()
387                 results[test].append(text)
388
389         return results
390
391     ##
392     # Generates the script to be run by Salome.
393     # This python script includes init and close statements and a loop
394     # calling all the scripts of a single directory.
395     def generate_script(self, listTest, script_path, ignoreList):
396         # open template file
397         tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
398         with open(tFile, 'r') as f:
399           template = string.Template(f.read())
400         
401         # create substitution dictionary
402         d = dict()
403         d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
404         d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
405         d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
406         d['sessionDir'] = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
407         d['resultFile'] = os.path.join(self.tmp_working_dir, 'WORK', 'exec_result')
408         d['listTest'] = listTest
409         d['sessionName'] = self.currentsession
410         d['ignore'] = ignoreList
411
412         # create script with template
413         contents = template.safe_substitute(d)
414         if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
415         with open(script_path, 'w') as f:
416           f.write(contents)
417
418
419     # Find the getTmpDir function that gives access to *_pidict file directory.
420     # (the *_pidict file exists when SALOME is launched)
421     def get_tmp_dir(self):
422         # Rare case where there is no KERNEL in grid list 
423         # (for example MED_STANDALONE)
424         if ('APPLICATION' in self.config 
425                 and 'KERNEL' not in self.config.APPLICATION.products 
426                 and 'KERNEL_ROOT_DIR' not in os.environ):
427             return getTmpDirDEFAULT
428         
429         # Case where "sat test" is launched in an existing SALOME environment
430         if 'KERNEL_ROOT_DIR' in os.environ:
431             root_dir =  os.environ['KERNEL_ROOT_DIR']
432         
433         if ('APPLICATION' in self.config and
434             'KERNEL' in self.config.APPLICATION.products):
435             root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
436
437         # Case where there the appli option is called (with path to launcher)
438         if len(self.launcher) > 0:
439             # There are two cases : The old application (runAppli) 
440             # and the new one
441             launcherName = os.path.basename(self.launcher)
442             launcherDir = os.path.dirname(self.launcher)
443             if launcherName == 'runAppli':
444                 # Old application
445                 cmd = """
446 for i in %s/env.d/*.sh; 
447   do source ${i};
448 done
449 echo $KERNEL_ROOT_DIR
450 """ % launcherDir
451             else:
452                 # New application
453                 cmd = """
454 echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
455 %s shell tmpscript.py
456 """ % self.launcher
457
458             if src.architecture.is_windows():
459                 subproc_res = subprocess.Popen(cmd,
460                             stdout=subprocess.PIPE,
461                             shell=True).communicate()
462                 pass
463             else:
464                 subproc_res = subprocess.Popen(cmd,
465                             stdout=subprocess.PIPE,
466                             shell=True,
467                             executable='/bin/bash').communicate()
468                 pass
469             
470             root_dir = subproc_res[0].split()[-1]
471         
472         # import grid salome_utils from KERNEL that gives 
473         # the right getTmpDir function
474         aPath = [os.path.join(root_dir, 'bin', 'salome')]
475         sal_uts = "salome_utils"
476         try:
477             (file_, pathname, description) = imp.find_module(sal_uts, aPath )
478         except Exception:
479             msg = "inexisting %s.py in %s" % (sal_uts, aPath)
480             raise Exception(msg)
481
482         try:
483             grid = imp.load_module(sal_uts, file_, pathname, description)
484             return grid.getLogDir
485         except:
486             grid = imp.load_module(sal_uts, file_, pathname, description)
487             return grid.getTmpDir
488         finally:
489             if file_:
490                 file_.close()
491
492
493     def get_test_timeout(self, test_name, default_value):
494         if ("timeout" in self.settings and 
495                 test_name in self.settings["timeout"]):
496             return self.settings["timeout"][test_name]
497
498         return default_value
499
500     def generate_launching_commands(self):
501         # Case where "sat test" is launched in an existing SALOME environment
502         if 'KERNEL_ROOT_DIR' in os.environ:
503             binSalome = "runSalome"
504             binPython = "python"
505             killSalome = "killSalome.py"
506         
507         # Rare case where there is no KERNEL in grid list 
508         # (for example MED_STANDALONE)
509         if ('APPLICATION' in self.config and 
510                 'KERNEL' not in self.config.APPLICATION.products):
511             binSalome = "runSalome"
512             binPython = "python" 
513             killSalome = "killSalome.py"   
514             src.environment.load_environment(self.config, False, self.logger)         
515             return binSalome, binPython, killSalome
516         
517         # Case where there the appli option is called (with path to launcher)
518         if len(self.launcher) > 0:
519             # There are two cases : The old application (runAppli) 
520             # and the new one
521             launcherName = os.path.basename(self.launcher)
522             launcherDir = os.path.dirname(self.launcher)
523             if launcherName == 'runAppli':
524                 # Old application
525                 binSalome = self.launcher
526                 binPython = ("for i in " +
527                              launcherDir +
528                              "/env.d/*.sh; do source ${i}; done ; python")
529                 killSalome = ("for i in " +
530                         launcherDir +
531                         "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
532                 return binSalome, binPython, killSalome
533             else:
534                 # New application
535                 binSalome = self.launcher
536                 binPython = self.launcher + ' shell'
537                 killSalome = self.launcher + ' killall'
538                 return binSalome, binPython, killSalome
539
540         # SALOME version detection and APPLI repository detection
541         VersionSalome = src.get_salome_version(self.config)
542         appdir = 'APPLI'
543         if "APPLI" in self.config and "application_name" in self.config.APPLI:
544             appdir = self.config.APPLI.application_name
545         
546         # Case where SALOME has NOT the launcher that uses the SalomeContext API
547         if VersionSalome < 730:
548             binSalome = os.path.join(self.config.APPLICATION.workdir,
549                                      appdir,
550                                      "runAppli")
551             binPython = "python"
552             killSalome = "killSalome.py"
553             src.environment.load_environment(self.config, False, self.logger)           
554             return binSalome, binPython, killSalome
555         
556         # Case where SALOME has the launcher that uses the SalomeContext API
557         else:            
558             launcher_name = src.get_launcher_name(self.config)
559             binSalome = os.path.join(self.config.APPLICATION.workdir,
560                                      launcher_name)
561             
562             binPython = binSalome + ' shell'
563             killSalome = binSalome + ' killall'
564             return binSalome, binPython, killSalome
565                 
566         return binSalome, binPython, killSalome
567         
568
569     ##
570     # Runs tests of a session (using a single instance of Salome).
571     def run_tests(self, listTest, ignoreList):
572         out_path = os.path.join(self.currentDir,
573                                 self.currentgrid,
574                                 self.currentsession)
575         if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
576                    (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
577         sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
578         time_out = self.get_test_timeout(sessionname,
579                                          DEFAULT_TIMEOUT)
580
581         time_out_salome = DEFAULT_TIMEOUT
582
583         # generate wrapper script
584         script_path = os.path.join(out_path, 'wrapperScript.py')
585         self.generate_script(listTest, script_path, ignoreList)
586
587         tmpDir = self.get_tmp_dir()
588
589         binSalome, binPython, killSalome = self.generate_launching_commands()
590         if self.settings.has_key("run_with_grids") and \
591            sessionname in self.settings["run_with_grids"]:
592             binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
593
594         logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
595
596         status = False
597         elapsed = -1
598         if self.currentsession.startswith("NOGUI_"):
599             # runSalome -t (bash)
600             status, elapsed = fork.batch(
601                                 binSalome,
602                                 self.logger,
603                                 os.path.join(self.tmp_working_dir, "WORK"),
604                                 [ "-t", "--shutdown-server=1", script_path ],
605                                 delai=time_out,
606                                 log=logWay)
607
608         elif self.currentsession.startswith("PY_"):
609             # python script.py
610             status, elapsed = fork.batch(
611                                 binPython,
612                                 self.logger,
613                                 os.path.join(self.tmp_working_dir, "WORK"),
614                                 [script_path],
615                                 delai=time_out,
616                                 log=logWay)
617
618         else:
619             opt = "-z 0"
620             if self.show_desktop: opt = "--show-desktop=0"
621             status, elapsed = fork.batch_salome(
622                                 binSalome,
623                                 self.logger,
624                                 os.path.join( self.tmp_working_dir, "WORK"),
625                                 [ opt, "--shutdown-server=1", script_path ],
626                                 getTmpDir=tmpDir,
627                                 fin=killSalome,
628                                 delai=time_out,
629                                 log=logWay,
630                                 delaiapp=time_out_salome)
631
632         self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)
633
634         # create the test result to add in the config object
635         test_info = src.pyconf.Mapping(self.config)
636         test_info.testbase = self.currentTestBase
637         test_info.grid = self.currentgrid
638         test_info.session = self.currentsession
639         test_info.script = src.pyconf.Sequence(self.config)
640
641         script_results = self.read_results(listTest, elapsed == time_out)
642         for sr in sorted(script_results.keys()):
643             self.nb_run += 1
644
645             # create script result
646             script_info = src.pyconf.Mapping(self.config)
647             script_info.name = sr
648             script_info.res = script_results[sr][0]
649             script_info.time = script_results[sr][1]
650             if script_info.res == src.TIMEOUT_STATUS:
651                 script_info.time = time_out
652             if script_info.time < 1e-3: script_info.time = 0
653
654             callback = script_results[sr][2]
655             if script_info.res != src.OK_STATUS and len(callback) > 0:
656                 script_info.callback = callback
657
658             kfres = script_results[sr][3]
659             if len(kfres) > 0:
660                 script_info.known_error = src.pyconf.Mapping(self.config)
661                 script_info.known_error.date = kfres[0]
662                 script_info.known_error.expected = kfres[1]
663                 script_info.known_error.comment = kfres[2]
664                 script_info.known_error.fixed = kfres[3]
665             
666             script_info.content = script_results[sr][4]
667             script_info.out = script_results[sr][5]
668             
669             # add it to the list of results
670             test_info.script.append(script_info, '')
671
672             # display the results
673             if script_info.time > 0:
674                 exectime = "(%7.3f s)" % script_info.time
675             else:
676                 exectime = ""
677
678             sp = "." * (35 - len(script_info.name))
679             self.logger.write(self.write_test_margin(3), 3)
680             self.logger.write("script %s %s %s %s\n" % (
681                                 src.printcolors.printcLabel(script_info.name),
682                                 sp,
683                                 src.printcolors.printc(script_info.res),
684                                 exectime), 3, False)
685             if script_info and len(callback) > 0:
686                 self.logger.write("Exception in %s\n%s\n" % \
687                     (script_info.name,
688                      src.printcolors.printcWarning(callback)), 2, False)
689
690             if script_info.res == src.OK_STATUS:
691                 self.nb_succeed += 1
692             elif script_info.res == src.KNOWNFAILURE_STATUS:
693                 self.nb_acknoledge += 1
694             elif script_info.res == src.TIMEOUT_STATUS:
695                 self.nb_timeout += 1
696             elif script_info.res == src.NA_STATUS:
697                 self.nb_run -= 1
698             elif script_info.res == "?":
699                 self.nb_not_run += 1
700                 
701
702         self.config.TESTS.append(test_info, '')
703
704     ##
705     # Runs all tests of a session.
706     def run_session_tests(self):
707        
708         self.logger.write(self.write_test_margin(2), 3)
709         self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
710                                                     self.currentsession), 3, False)
711
712         # prepare list of tests to run
713         tests = os.listdir(os.path.join(self.currentDir,
714                                         self.currentgrid,
715                                         self.currentsession))
716         # avoid result files of previous tests, if presents
717         # tests = filter(lambda l: l.endswith(".py"), tests)
718         tests = [t for t in tests if t.endswith(".py") \
719                    and not ( t.endswith(".out.py") or \
720                              t.endswith(".result.py") or \
721                              t.endswith("wrapperScript.py") \
722                            ) ]
723         tests = sorted(tests, key=str.lower)
724
725         # build list of known failures
726         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
727         ignoreDict = {}
728         for k in self.ignore_tests.keys():
729             if k.startswith(cat):
730                 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
731
732         self.run_tests(tests, ignoreDict)
733
734     ##
735     # Runs all tests of a grid.
736     def run_grid_tests(self):
737         self.logger.write(self.write_test_margin(1), 3)
738         self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
739                                                 self.currentgrid), 3, False)
740
741         grid_path = os.path.join(self.currentDir, self.currentgrid)
742
743         sessions = []
744         if self.sessions is not None:
745             sessions = self.sessions # user choice
746         else:
747             # use all scripts in grid
748             sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
749                            os.listdir(grid_path))
750             sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
751                                                                 l)), sessions)
752
753         sessions = sorted(sessions, key=str.lower)
754         existingSessions = self.getSubDirectories(grid_path)
755         for session_ in sessions:
756             if not os.path.exists(os.path.join(grid_path, session_)):
757                 self.logger.write(self.write_test_margin(2), 3)
758                 msg = """\
759 Session '%s' not found
760 Existing sessions are:
761 %s
762 """ % (session_, PP.pformat(sorted(existingSessions)))
763                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
764             else:
765                 self.currentsession = session_
766                 self.run_session_tests()
767
768     def getSubDirectories(self, aDir):
769         """
770         get names of first level of sub directories in aDir
771         excluding '.git' etc as beginning with '.'
772         """
773         res = os.listdir(aDir)
774         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
775         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
776         return res
777
778     ##
779     # Runs test testbase.
780     def run_testbase_tests(self):
781         res_dir = os.path.join(self.currentDir, "RESSOURCES")
782         os.environ['PYTHONPATH'] =  (res_dir + 
783                                      os.pathsep + 
784                                      os.environ['PYTHONPATH'])
785         os.environ['TT_BASE_RESSOURCES'] = res_dir
786         src.printcolors.print_value(self.logger,
787                                     "TT_BASE_RESSOURCES",
788                                     res_dir,
789                                     4)
790         self.logger.write("\n", 4, False)
791
792         self.logger.write(self.write_test_margin(0), 3)
793         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
794                                                         self.currentTestBase)
795         self.logger.write(testbase_label, 3, False)
796         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
797                           3)
798         self.logger.write("\n", 3, False)
799
800         # load settings
801         settings_file = os.path.join(res_dir, "test_settings.py")
802         if os.path.exists(settings_file):
803             gdic, ldic = {}, {}
804             execfile(settings_file, gdic, ldic)
805             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
806             self.settings = ldic['settings_dic']
807             self.ignore_tests = ldic['known_failures_list']
808             if isinstance(self.ignore_tests, list):
809                 self.ignore_tests = {}
810                 self.logger.write(src.printcolors.printcWarning(
811                   "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
812         else:
813             self.ignore_tests = {}
814             self.settings.clear()
815
816         # read known failures pyconf
817         if "testerror" in self.config.LOCAL:
818             #import testerror
819             #self.known_errors = testerror.read_test_failures(
820             #                            self.config.TOOLS.testerror.file_path,
821             #                            do_error=False)
822             pass
823         else:
824             self.known_errors = None
825
826         if self.grids is not None:
827             grids = self.grids # given by user
828         else:
829             # select all the grids (i.e. directories) in the directory
830             grids = filter(lambda l: l not in C_IGNORE_GRIDS,
831                              os.listdir(self.currentDir))
832             grids = filter(lambda l: os.path.isdir(
833                                         os.path.join(self.currentDir, l)),
834                              grids)
835
836         grids = sorted(grids, key=str.lower)
837         existingGrids = self.getSubDirectories(self.currentDir)
838         for grid in grids:
839             if not os.path.exists(os.path.join(self.currentDir, grid)):
840                 self.logger.write(self.write_test_margin(1), 3)
841                 msg = """\
842 Grid '%s' does not exist
843 Existing grids are:
844 %s
845 """ % (grid, PP.pformat(sorted(existingGrids)))
846                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
847             else:
848                 self.currentgrid = grid
849                 self.run_grid_tests()
850
851     def run_script(self, script_name):
852         if ('APPLICATION' in self.config and
853                 script_name in self.config.APPLICATION):
854             script = self.config.APPLICATION[script_name]
855             if len(script) == 0:
856                 return
857
858             self.logger.write("\n", 2, False)
859             if not os.path.exists(script):
860                 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
861                                         "t not found: %s" % script) + "\n", 2)
862             else:
863                 self.logger.write(src.printcolors.printcHeader("----------- sta"
864                                             "rt %s" % script_name) + "\n", 2)
865                 self.logger.write("Run script: %s\n" % script, 2)
866                 subprocess.Popen(script, shell=True).wait()
867                 self.logger.write(src.printcolors.printcHeader("----------- end"
868                                                 " %s" % script_name) + "\n", 2)
869
870     def run_all_tests(self):
871         initTime = datetime.datetime.now()
872
873         self.run_script('test_setup')
874         self.logger.write("\n", 2, False)
875
876         self.logger.write(src.printcolors.printcHeader(
877                                             _("=== STARTING TESTS")) + "\n", 2)
878         self.logger.write("\n", 2, False)
879         self.currentDir = os.path.join(self.tmp_working_dir,
880                                        'BASES',
881                                        self.currentTestBase)
882         self.run_testbase_tests()
883
884         # calculate total execution time
885         totalTime = datetime.datetime.now() - initTime
886         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
887         self.logger.write("\n", 2, False)
888         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
889         self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
890                           2,
891                           False)
892
893         #
894         # Start the tests
895         #
896         self.run_script('test_cleanup')
897         self.logger.write("\n", 2, False)
898
899         # evaluate results
900
901         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
902             { 'succeed': self.nb_succeed, 'total': self.nb_run }
903         if self.nb_succeed == self.nb_run:
904             res_out = src.printcolors.printcSuccess(res_out)
905         else:
906             res_out = src.printcolors.printcError(res_out)
907         self.logger.write(res_out, 1)
908
909         if self.nb_timeout > 0:
910             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
911         if self.nb_not_run > 0:
912             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
913         if self.nb_acknoledge > 0:
914             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
915
916         status = src.OK_STATUS
917         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
918             status = src.KO_STATUS
919         elif self.nb_acknoledge:
920             status = src.KNOWNFAILURE_STATUS
921         
922         self.logger.write(_("Status: %s\n" % status), 3)
923
924         return self.nb_run - self.nb_succeed - self.nb_acknoledge
925
926     ##
927     # Write margin to show test results.
928     def write_test_margin(self, tab):
929         if tab == 0:
930             return ""
931         return "|   " * (tab - 1) + "+ "
932