Salome HOME
style: black format
[tools/sat.git] / src / test_module.py
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 #  Copyright (C) 2010-2013  CEA/DEN
4 #
5 #  This library is free software; you can redistribute it and/or
6 #  modify it under the terms of the GNU Lesser General Public
7 #  License as published by the Free Software Foundation; either
8 #  version 2.1 of the License.
9 #
10 #  This library is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #  Lesser General Public License for more details.
14 #
15 #  You should have received a copy of the GNU Lesser General Public
16 #  License along with this library; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18
19 # Python 2/3 compatibility for execfile function
try:
    execfile
except NameError:
    # Python 3 has no execfile builtin: provide an equivalent one.
    def execfile(somefile, global_vars, local_vars):
        """Compile the content of *somefile* and execute it.

        :param somefile: path of the python file to execute
        :param global_vars: dictionary used as globals by the executed code
        :param local_vars: dictionary used as locals by the executed code
        """
        with open(somefile) as f:
            code = compile(f.read(), somefile, "exec")
        # the file is closed before the code is run
        exec(code, global_vars, local_vars)
28
29
30 import os
31 import sys
32 import datetime
33 import shutil
34 import string
35 import imp
36 import subprocess
37 import glob
38 import pprint as PP
39
40 verbose = False
41
42 from . import fork
43 import src
44 from src.versionMinorMajorPatch import MinorMajorPatch as MMP
45
46 # directories not considered as test grids
47 C_IGNORE_GRIDS = [".git", ".svn", "RESSOURCES"]
48
49 DEFAULT_TIMEOUT = 150
50
51 # Get directory to be used for the temporary files.
52 #
def getTmpDirDEFAULT():
    """Return the default directory used for the temporary files.

    On Windows this is the value of the TEMP environment variable.
    Otherwise it is /tmp/logs/<user>, where <user> is read from the USER
    environment variable ("unknown" when it is not set).
    """
    if not src.architecture.is_windows():
        # for Linux: use /tmp/logs/{user} folder
        user_name = os.getenv("USER", "unknown")
        return os.path.join("/tmp", "logs", user_name)
    return os.getenv("TEMP")
60
61
62 class Test:
63     def __init__(
64         self,
65         config,
66         logger,
67         tmp_working_dir,
68         testbase="",
69         grids=None,
70         sessions=None,
71         launcher="",
72         show_desktop=True,
73     ):
74         self.grids = grids
75         self.config = config
76         self.logger = logger
77         self.tmp_working_dir = tmp_working_dir
78         self.sessions = sessions
79         self.launcher = launcher
80         self.show_desktop = show_desktop
81
82         res = self.prepare_testbase(testbase)
83         self.test_base_found = True
84         if res == 1:
85             # Fail
86             self.test_base_found = False
87
88         self.settings = {}
89         self.known_errors = None
90
91         # create section for results
92         self.config.TESTS = src.pyconf.Sequence(self.config)
93
94         self.nb_run = 0
95         self.nb_succeed = 0
96         self.nb_timeout = 0
97         self.nb_not_run = 0
98         self.nb_acknoledge = 0
99
100     def _copy_dir(self, source, target):
101         if self.config.VARS.python >= "2.6":
102             shutil.copytree(
103                 source,
104                 target,
105                 symlinks=True,
106                 ignore=shutil.ignore_patterns(".git*", ".svn*"),
107             )
108         else:
109             shutil.copytree(source, target, symlinks=True)
110
111     def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
112         self.logger.write(
113             _("get test base from dir: %s\n")
114             % src.printcolors.printcLabel(testbase_dir),
115             3,
116         )
117         if not os.access(testbase_dir, os.X_OK):
118             raise src.SatException(
119                 _("testbase %(name)s (%(dir)s) does not " "exist ...\n")
120                 % {"name": testbase_name, "dir": testbase_dir}
121             )
122
123         self._copy_dir(
124             testbase_dir, os.path.join(self.tmp_working_dir, "BASES", testbase_name)
125         )
126
127     def prepare_testbase_from_git(self, testbase_name, testbase_base, testbase_tag):
128         self.logger.write(
129             _("get test base '%(testbase)s' with '%(tag)s' tag from git\n")
130             % {
131                 "testbase": src.printcolors.printcLabel(testbase_name),
132                 "tag": src.printcolors.printcLabel(testbase_tag),
133             },
134             3,
135         )
136         try:
137
138             def set_signal():  # pragma: no cover
139                 """see http://bugs.python.org/issue1652"""
140                 import signal
141
142                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
143
144             cmd = "git clone --depth 1 %(base)s %(dir)s"
145             cmd += " && cd %(dir)s"
146             if testbase_tag == "master":
147                 cmd += " && git fetch origin %(branch)s"
148             else:
149                 cmd += " && git fetch origin %(branch)s:%(branch)s"
150             cmd += " && git checkout %(branch)s"
151             cmd = cmd % {
152                 "branch": testbase_tag,
153                 "base": testbase_base,
154                 "dir": testbase_name,
155             }
156
157             self.logger.write("> %s\n" % cmd, 5)
158             if src.architecture.is_windows():
159                 # preexec_fn not supported on windows platform
160                 res = subprocess.call(
161                     cmd,
162                     cwd=os.path.join(self.tmp_working_dir, "BASES"),
163                     shell=True,
164                     stdout=self.logger.logTxtFile,
165                     stderr=subprocess.PIPE,
166                 )
167             else:
168                 res = subprocess.call(
169                     cmd,
170                     cwd=os.path.join(self.tmp_working_dir, "BASES"),
171                     shell=True,
172                     preexec_fn=set_signal,
173                     stdout=self.logger.logTxtFile,
174                     stderr=subprocess.PIPE,
175                 )
176             if res != 0:
177                 raise src.SatException(
178                     _(
179                         "Error: unable to get test base "
180                         "'%(name)s' from git '%(repo)s'."
181                     )
182                     % {"name": testbase_name, "repo": testbase_base}
183                 )
184
185         except OSError:
186             self.logger.error(_("git is not installed. exiting...\n"))
187             sys.exit(0)
188
189     def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
190         self.logger.write(
191             _("get test base '%s' from svn\n")
192             % src.printcolors.printcLabel(testbase_name),
193             3,
194         )
195         try:
196
197             def set_signal():  # pragma: no cover
198                 """see http://bugs.python.org/issue1652"""
199                 import signal
200
201                 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
202
203             cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
204             cmd = cmd % {"user": user, "base": testbase_base, "dir": testbase_name}
205
206             # Get the application environment
207             self.logger.write(_("Set the application environment\n"), 5)
208             env_appli = src.environment.SalomeEnviron(
209                 self.config, src.environment.Environ(dict(os.environ))
210             )
211             env_appli.set_application_env(self.logger)
212
213             self.logger.write("> %s\n" % cmd, 5)
214             if src.architecture.is_windows():
215                 # preexec_fn not supported on windows platform
216                 res = subprocess.call(
217                     cmd,
218                     cwd=os.path.join(self.tmp_working_dir, "BASES"),
219                     shell=True,
220                     stdout=self.logger.logTxtFile,
221                     stderr=subprocess.PIPE,
222                 )
223             else:
224                 res = subprocess.call(
225                     cmd,
226                     cwd=os.path.join(self.tmp_working_dir, "BASES"),
227                     shell=True,
228                     preexec_fn=set_signal,
229                     stdout=self.logger.logTxtFile,
230                     stderr=subprocess.PIPE,
231                     env=env_appli.environ.environ,
232                 )
233
234             if res != 0:
235                 raise src.SatException(
236                     _(
237                         "Error: unable to get test base '%(nam"
238                         "e)s' from svn '%(repo)s'."
239                     )
240                     % {"name": testbase_name, "repo": testbase_base}
241                 )
242
243         except OSError:
244             self.logger.error(_("svn is not installed. exiting...\n"))
245             sys.exit(0)
246
247     ##
248     # Configure tests base.
249     def prepare_testbase(self, test_base_name):
250         src.printcolors.print_value(self.logger, _("Test base"), test_base_name, 3)
251         self.logger.write("\n", 3, False)
252
253         # search for the test base
254         test_base_info = None
255         for project_name in self.config.PROJECTS.projects:
256             project_info = self.config.PROJECTS.projects[project_name]
257             if "test_bases" not in project_info:
258                 continue
259             for t_b_info in project_info.test_bases:
260                 if t_b_info.name == test_base_name:
261                     test_base_info = t_b_info
262
263         if not test_base_info:
264             if os.path.exists(test_base_name):
265                 self.prepare_testbase_from_dir("DIR", test_base_name)
266                 self.currentTestBase = "DIR"
267                 return 0
268
269         if not test_base_info:
270             message = _("########## ERROR: test base '%s' not found\n") % test_base_name
271             self.logger.write("%s\n" % src.printcolors.printcError(message))
272             return 1
273
274         if test_base_info.get_sources == "dir":
275             self.prepare_testbase_from_dir(test_base_name, test_base_info.info.dir)
276         elif test_base_info.get_sources == "git":
277             self.prepare_testbase_from_git(
278                 test_base_name,
279                 test_base_info.info.base,
280                 self.config.APPLICATION.test_base.tag,
281             )
282         elif test_base_info.get_sources == "svn":
283             svn_user = src.get_cfg_param(
284                 test_base_info.info, "svn_user", self.config.USER.svn_user
285             )
286             self.prepare_testbase_from_svn(
287                 svn_user, test_base_name, test_base_info.info.base
288             )
289         else:
290             raise src.SatException(
291                 _("unknown source type '%(type)s' for test b" "ase '%(base)s' ...\n")
292                 % {"type": test_base_info.get_sources, "base": test_base_name}
293             )
294
295         self.currentTestBase = test_base_name
296
297     ##
298     # Searches if the script is declared in known errors pyconf.
299     # Update the status if needed.
300     def search_known_errors(self, status, test_grid, test_session, test):
301         test_path = os.path.join(test_grid, test_session, test)
302         if not src.config_has_application(self.config):
303             return status, []
304
305         if self.known_errors is None:
306             return status, []
307
308         platform = self.config.VARS.arch
309         application = self.config.VARS.application
310         error = self.known_errors.get_error(test_path, application, platform)
311         if error is None:
312             return status, []
313
314         if status == src.OK_STATUS:
315             if not error.fixed:
316                 # the error is fixed
317                 self.known_errors.fix_error(error)
318                 # import testerror
319                 # testerror.write_test_failures(
320                 #                        self.config.TOOLS.testerror.file_path,
321                 #                        self.known_errors.errors)
322             return status, [error.date, error.expected, error.comment, error.fixed]
323
324         if error.fixed:
325             self.known_errors.unfix_error(error)
326             # import testerror
327             # testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
328             #                              self.known_errors.errors)
329
330         delta = self.known_errors.get_expecting_days(error)
331         kfres = [error.date, error.expected, error.comment, error.fixed]
332         if delta < 0:
333             return src.KO_STATUS, kfres
334         return src.KNOWNFAILURE_STATUS, kfres
335
336     ##
337     # Read the *.result.py files.
338     def read_results(self, listTest, has_timed_out):
339         results = {}
340         for test in listTest:
341             resfile = os.path.join(
342                 self.currentDir,
343                 self.currentgrid,
344                 self.currentsession,
345                 test[:-3] + ".result.py",
346             )
347
348             # check if <test>.result.py file exists
349             if not os.path.exists(resfile):
350                 results[test] = ["?", -1, "", []]
351             else:
352                 gdic, ldic = {}, {}
353                 if verbose:
354                     print(
355                         "test script: '%s':\n'%s'\n"
356                         % (resfile, open(resfile, "r").read())
357                     )
358
359                 try:
360                     execfile(resfile, gdic, ldic)
361
362                     status = src.TIMEOUT_STATUS
363                     if not has_timed_out:
364                         status = src.KO_STATUS
365
366                     if "status" in ldic:
367                         status = ldic["status"]
368
369                     expected = []
370                     if status == src.KO_STATUS or status == src.OK_STATUS:
371                         status, expected = self.search_known_errors(
372                             status, self.currentgrid, self.currentsession, test
373                         )
374
375                     callback = ""
376                     if "callback" in ldic:
377                         callback = ldic["callback"]
378                     elif status == src.KO_STATUS:
379                         callback = "CRASH"
380                         if verbose:
381                             print("--- CRASH ldic\n%s" % PP.pformat(ldic))  # cvw TODO
382                             print("--- CRASH gdic\n%s" % PP.pformat(gdic))
383                             pass
384
385                     exec_time = -1
386                     if "time" in ldic:
387                         try:
388                             exec_time = float(ldic["time"])
389                         except:
390                             pass
391
392                     results[test] = [status, exec_time, callback, expected]
393
394                 except:
395                     results[test] = ["?", -1, "", []]
396                     # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
397
398             # check if <test>.py file exists
399             testfile = os.path.join(
400                 self.currentDir, self.currentgrid, self.currentsession, test
401             )
402
403             if not os.path.exists(testfile):
404                 results[test].append("")
405             else:
406                 text = open(testfile, "r").read()
407                 results[test].append(text)
408
409             # check if <test>.out.py file exists
410             outfile = os.path.join(
411                 self.currentDir,
412                 self.currentgrid,
413                 self.currentsession,
414                 test[:-3] + ".out.py",
415             )
416
417             if not os.path.exists(outfile):
418                 results[test].append("")
419             else:
420                 text = open(outfile, "r").read()
421                 results[test].append(text)
422
423         return results
424
425     ##
426     # Generates the script to be run by Salome.
427     # This python script includes init and close statements and a loop
428     # calling all the scripts of a single directory.
429     def generate_script(self, listTest, script_path, ignoreList):
430         # open template file
431         tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
432         with open(tFile, "r") as f:
433             template = string.Template(f.read())
434
435         # create substitution dictionary
436         d = dict()
437         d["resourcesWay"] = os.path.join(self.currentDir, "RESSOURCES")
438         d["tmpDir"] = os.path.join(self.tmp_working_dir, "WORK")
439         d["toolsWay"] = os.path.join(self.config.VARS.srcDir, "test")
440         d["sessionDir"] = os.path.join(
441             self.currentDir, self.currentgrid, self.currentsession
442         )
443         d["resultFile"] = os.path.join(self.tmp_working_dir, "WORK", "exec_result")
444         d["listTest"] = listTest
445         d["sessionName"] = self.currentsession
446         d["ignore"] = ignoreList
447
448         # create script with template
449         contents = template.safe_substitute(d)
450         if verbose:
451             print("generate_script '%s':\n%s" % (script_path, contents))  # cvw TODO
452         with open(script_path, "w") as f:
453             f.write(contents)
454
455     # Find the getTmpDir function that gives access to *_pidict file directory.
456     # (the *_pidict file exists when SALOME is launched)
457     def get_tmp_dir(self):
458         # Rare case where there is no KERNEL in grid list
459         # (for example MED_STANDALONE)
460         if (
461             "APPLICATION" in self.config
462             and "KERNEL" not in self.config.APPLICATION.products
463             and "KERNEL_ROOT_DIR" not in os.environ
464         ):
465             return getTmpDirDEFAULT
466
467         # Case where "sat test" is launched in an existing SALOME environment
468         if "KERNEL_ROOT_DIR" in os.environ:
469             root_dir = os.environ["KERNEL_ROOT_DIR"]
470
471         if (
472             "APPLICATION" in self.config
473             and "KERNEL" in self.config.APPLICATION.products
474         ):
475             root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
476
477         # Case where there the appli option is called (with path to launcher)
478         if len(self.launcher) > 0:
479             # There are two cases : The old application (runAppli)
480             # and the new one
481             launcherName = os.path.basename(self.launcher)
482             launcherDir = os.path.dirname(self.launcher)
483             if launcherName == "runAppli":
484                 # Old application
485                 cmd = (
486                     """
487 for i in %s/env.d/*.sh;
488   do source ${i};
489 done
490 echo $KERNEL_ROOT_DIR
491 """
492                     % launcherDir
493                 )
494             else:
495                 # New application
496                 cmd = (
497                     """
498 echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
499 %s shell tmpscript.py
500 """
501                     % self.launcher
502                 )
503
504             if src.architecture.is_windows():
505                 subproc_res = subprocess.Popen(
506                     cmd, stdout=subprocess.PIPE, shell=True
507                 ).communicate()
508                 pass
509             else:
510                 subproc_res = subprocess.Popen(
511                     cmd, stdout=subprocess.PIPE, shell=True, executable="/bin/bash"
512                 ).communicate()
513                 pass
514
515             root_dir = subproc_res[0].split()[-1]
516
517         # import grid salome_utils from KERNEL that gives
518         # the right getTmpDir function
519         root_dir = root_dir.decode("utf-8")
520         aPath = [os.path.join(root_dir, "bin", "salome")]
521         sal_uts = "salome_utils"
522         try:
523             (file_, pathname, description) = imp.find_module(sal_uts, aPath)
524         except Exception:
525             msg = "inexisting %s.py in %s" % (sal_uts, aPath)
526             raise Exception(msg)
527
528         try:
529             grid = imp.load_module(sal_uts, file_, pathname, description)
530             return grid.getLogDir
531         except:
532             grid = imp.load_module(sal_uts, file_, pathname, description)
533             return grid.getTmpDir
534         finally:
535             if file_:
536                 file_.close()
537
538     def get_test_timeout(self, test_name, default_value):
539         if "timeout" in self.settings and test_name in self.settings["timeout"]:
540             return self.settings["timeout"][test_name]
541
542         return default_value
543
544     def generate_launching_commands(self):
545
546         # Case where there the appli option is called (with path to launcher)
547         if len(self.launcher) > 0:
548             # There are two cases : The old application (runAppli)
549             # and the new one
550             launcherName = os.path.basename(self.launcher)
551             launcherDir = os.path.dirname(self.launcher)
552             if os.path.basename(launcherDir) == "APPLI":
553                 # Old application
554                 binSalome = self.launcher
555                 binPython = (
556                     "for i in "
557                     + launcherDir
558                     + "/env.d/*.sh; do source ${i}; done ; python"
559                 )
560                 killSalome = (
561                     "for i in "
562                     + launcherDir
563                     + "/env.d/*.sh; do source ${i}; done ; killSalome.py'"
564                 )
565                 return binSalome, binPython, killSalome
566             else:
567                 # New application
568                 binSalome = self.launcher
569                 binPython = self.launcher + " shell"
570                 killSalome = self.launcher + " killall"
571                 return binSalome, binPython, killSalome
572
573         # SALOME version detection and APPLI repository detection
574         VersionSalome = src.get_salome_version(self.config)
575         appdir = "APPLI"
576         if "APPLI" in self.config and "application_name" in self.config.APPLI:
577             appdir = self.config.APPLI.application_name
578
579         # Case where SALOME has NOT the launcher that uses the SalomeContext API
580         if VersionSalome < MMP([7, 3, 0]):
581             binSalome = os.path.join(
582                 self.config.APPLICATION.workdir, appdir, "runAppli"
583             )
584             binPython = "python"
585             killSalome = "killSalome.py"
586             src.environment.load_environment(self.config, False, self.logger)
587             return binSalome, binPython, killSalome
588
589         # Case where SALOME has the launcher that uses the SalomeContext API
590         else:
591             launcher_name = src.get_launcher_name(self.config)
592             binSalome = os.path.join(self.config.APPLICATION.workdir, launcher_name)
593
594             binPython = binSalome + " shell"
595             killSalome = binSalome + " killall"
596             return binSalome, binPython, killSalome
597
598         return binSalome, binPython, killSalome
599
600     ##
601     # Runs tests of a session (using a single instance of Salome).
    def run_tests(self, listTest, ignoreList):
        """Run the tests of the current session and store their results.

        A wrapper script (wrapperScript.py) is generated in the directory
        of the session, then it is executed through the fork module. The
        way it is executed depends on the beginning of the session name
        ("NOGUI_", "PY_" or anything else). The results are read with
        read_results, written with the logger, counted in the nb_*
        attributes and appended to self.config.TESTS.

        :param listTest: names of the test scripts of the session
        :param ignoreList: given to generate_script as the tests to ignore
        """
        out_path = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
        if verbose:
            print(
                "run_tests '%s'\nlistTest: %s\nignoreList: %s"
                % (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))
            )  # cvw TODO
        # the settings are indexed by "<grid>/<session>"
        sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
        time_out = self.get_test_timeout(sessionname, DEFAULT_TIMEOUT)

        time_out_salome = DEFAULT_TIMEOUT

        # generate wrapper script
        script_path = os.path.join(out_path, "wrapperScript.py")
        self.generate_script(listTest, script_path, ignoreList)

        # function giving the temporary directory (only used by batch_salome)
        tmpDir = self.get_tmp_dir()

        binSalome, binPython, killSalome = self.generate_launching_commands()
        # add the "-m" option when "run_with_grids" is set for this session
        if (
            "run_with_grids" in self.settings
            and sessionname in self.settings["run_with_grids"]
        ):
            binSalome = (
                binSalome + " -m %s" % self.settings["run_with_grids"][sessionname]
            )

        logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")

        status = False
        elapsed = -1
        if self.currentsession.startswith("NOGUI_"):
            # runSalome -t (bash)
            status, elapsed = fork.batch(
                binSalome,
                self.logger,
                os.path.join(self.tmp_working_dir, "WORK"),
                ["-t", "--shutdown-server=1", script_path],
                delai=time_out,
                log=logWay,
            )

        elif self.currentsession.startswith("PY_"):
            # python script.py
            status, elapsed = fork.batch(
                binPython,
                self.logger,
                os.path.join(self.tmp_working_dir, "WORK"),
                [script_path],
                delai=time_out,
                log=logWay,
            )

        else:
            # NOTE(review): a true show_desktop selects "--show-desktop=0";
            # confirm that this is the intended option
            opt = "-z 0"
            if self.show_desktop:
                opt = "--show-desktop=0"
            status, elapsed = fork.batch_salome(
                binSalome,
                self.logger,
                os.path.join(self.tmp_working_dir, "WORK"),
                [opt, "--shutdown-server=1", script_path],
                getTmpDir=tmpDir,
                fin=killSalome,
                delai=time_out,
                log=logWay,
                delaiapp=time_out_salome,
            )

        self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)

        # create the test result to add in the config object
        test_info = src.pyconf.Mapping(self.config)
        test_info.testbase = self.currentTestBase
        test_info.grid = self.currentgrid
        test_info.session = self.currentsession
        test_info.script = src.pyconf.Sequence(self.config)

        # the session is considered as timed out when the elapsed time is
        # equal to the timeout
        script_results = self.read_results(listTest, elapsed == time_out)
        for sr in sorted(script_results.keys()):
            self.nb_run += 1

            # create script result
            # script_results[sr] is [status, time, callback, known error
            # info, content of the script, content of the .out.py file]
            script_info = src.pyconf.Mapping(self.config)
            script_info.name = sr
            script_info.res = script_results[sr][0]
            script_info.time = script_results[sr][1]
            if script_info.res == src.TIMEOUT_STATUS:
                script_info.time = time_out
            # a time below 1e-3 (including the -1 default) is stored as 0
            if script_info.time < 1e-3:
                script_info.time = 0

            callback = script_results[sr][2]
            if script_info.res != src.OK_STATUS and len(callback) > 0:
                script_info.callback = callback

            kfres = script_results[sr][3]
            if len(kfres) > 0:
                script_info.known_error = src.pyconf.Mapping(self.config)
                script_info.known_error.date = kfres[0]
                script_info.known_error.expected = kfres[1]
                script_info.known_error.comment = kfres[2]
                script_info.known_error.fixed = kfres[3]

            script_info.content = script_results[sr][4]
            script_info.out = script_results[sr][5]

            # add it to the list of results
            test_info.script.append(script_info, "")

            # display the results
            if script_info.time > 0:
                exectime = "(%7.3f s)" % script_info.time
            else:
                exectime = ""

            # dots between the name of the script and its result
            sp = "." * (35 - len(script_info.name))
            self.logger.write(self.write_test_margin(3), 3)
            self.logger.write(
                "script %s %s %s %s\n"
                % (
                    src.printcolors.printcLabel(script_info.name),
                    sp,
                    src.printcolors.printc(script_info.res),
                    exectime,
                ),
                3,
                False,
            )
            if script_info and len(callback) > 0:
                self.logger.write(
                    "Exception in %s\n%s\n"
                    % (script_info.name, src.printcolors.printcWarning(callback)),
                    2,
                    False,
                )

            # update the counters
            if script_info.res == src.OK_STATUS:
                self.nb_succeed += 1
            elif script_info.res == src.KNOWNFAILURE_STATUS:
                self.nb_acknoledge += 1
            elif script_info.res == src.TIMEOUT_STATUS:
                self.nb_timeout += 1
            elif script_info.res == src.NA_STATUS:
                # cancels the increment done at the beginning of the loop
                self.nb_run -= 1
            elif script_info.res == "?":
                self.nb_not_run += 1

        self.config.TESTS.append(test_info, "")
751
752     ##
753     # Runs all tests of a session.
754     def run_session_tests(self):
755
756         self.logger.write(self.write_test_margin(2), 3)
757         self.logger.write(
758             "Session = %s\n" % src.printcolors.printcLabel(self.currentsession),
759             3,
760             False,
761         )
762
763         # prepare list of tests to run
764         tests = os.listdir(
765             os.path.join(self.currentDir, self.currentgrid, self.currentsession)
766         )
767         # avoid result files of previous tests, if presents
768         # tests = filter(lambda l: l.endswith(".py"), tests)
769         tests = [
770             t
771             for t in tests
772             if t.endswith(".py")
773             and not (
774                 t.endswith(".out.py")
775                 or t.endswith(".result.py")
776                 or t.endswith("wrapperScript.py")
777             )
778         ]
779         tests = sorted(tests, key=str.lower)
780
781         # build list of known failures
782         cat = "%s/%s/" % (self.currentgrid, self.currentsession)
783         ignoreDict = {}
784         for k in self.ignore_tests.keys():
785             if k.startswith(cat):
786                 ignoreDict[k[len(cat) :]] = self.ignore_tests[k]
787
788         self.run_tests(tests, ignoreDict)
789
790     ##
791     # Runs all tests of a grid.
792     def run_grid_tests(self):
793         self.logger.write(self.write_test_margin(1), 3)
794         self.logger.write(
795             "grid = %s\n" % src.printcolors.printcLabel(self.currentgrid), 3, False
796         )
797
798         grid_path = os.path.join(self.currentDir, self.currentgrid)
799
800         sessions = []
801         if self.sessions is not None:
802             sessions = self.sessions  # user choice
803         else:
804             # use all scripts in grid
805             sessions = filter(lambda l: l not in C_IGNORE_GRIDS, os.listdir(grid_path))
806             sessions = filter(
807                 lambda l: os.path.isdir(os.path.join(grid_path, l)), sessions
808             )
809
810         sessions = sorted(sessions, key=str.lower)
811         existingSessions = self.getSubDirectories(grid_path)
812         for session_ in sessions:
813             if not os.path.exists(os.path.join(grid_path, session_)):
814                 self.logger.write(self.write_test_margin(2), 3)
815                 msg = """\
816 Session '%s' not found
817 Existing sessions are:
818 %s
819 """ % (
820                     session_,
821                     PP.pformat(sorted(existingSessions)),
822                 )
823                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
824             else:
825                 self.currentsession = session_
826                 self.run_session_tests()
827
828     def getSubDirectories(self, aDir):
829         """
830         get names of first level of sub directories in aDir
831         excluding '.git' etc as beginning with '.'
832         """
833         res = os.listdir(aDir)
834         res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != "."]
835         # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
836         return res
837
838     ##
839     # Runs test testbase.
840     def run_testbase_tests(self):
841         res_dir = os.path.join(self.currentDir, "RESSOURCES")
842         os.environ["PYTHONPATH"] = res_dir + os.pathsep + os.environ["PYTHONPATH"]
843         os.environ["TT_BASE_RESSOURCES"] = res_dir
844         src.printcolors.print_value(self.logger, "TT_BASE_RESSOURCES", res_dir, 4)
845         self.logger.write("\n", 4, False)
846
847         self.logger.write(self.write_test_margin(0), 3)
848         testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
849             self.currentTestBase
850         )
851         self.logger.write(testbase_label, 3, False)
852         self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)), 3)
853         self.logger.write("\n", 3, False)
854
855         # load settings
856         settings_file = os.path.join(res_dir, "test_settings.py")
857         if os.path.exists(settings_file):
858             gdic, ldic = {}, {}
859             execfile(settings_file, gdic, ldic)
860             self.logger.write("Load test settings '%s'\n" % settings_file, 5)
861             self.settings = ldic["settings_dic"]
862             self.ignore_tests = ldic["known_failures_list"]
863             if isinstance(self.ignore_tests, list):
864                 self.ignore_tests = {}
865                 self.logger.write(
866                     src.printcolors.printcWarning(
867                         "known_failures_list must be a dictionary (not a list)"
868                     )
869                     + "\n",
870                     1,
871                     False,
872                 )
873         else:
874             self.ignore_tests = {}
875             self.settings.clear()
876
877         # read known failures pyconf
878         if "testerror" in self.config.LOCAL:
879             # import testerror
880             # self.known_errors = testerror.read_test_failures(
881             #                            self.config.TOOLS.testerror.file_path,
882             #                            do_error=False)
883             pass
884         else:
885             self.known_errors = None
886
887         if self.grids is not None:
888             grids = self.grids  # given by user
889         else:
890             # select all the grids (i.e. directories) in the directory
891             grids = filter(
892                 lambda l: l not in C_IGNORE_GRIDS, os.listdir(self.currentDir)
893             )
894             grids = filter(
895                 lambda l: os.path.isdir(os.path.join(self.currentDir, l)), grids
896             )
897
898         grids = sorted(grids, key=str.lower)
899         existingGrids = self.getSubDirectories(self.currentDir)
900         for grid in grids:
901             if not os.path.exists(os.path.join(self.currentDir, grid)):
902                 self.logger.write(self.write_test_margin(1), 3)
903                 msg = """\
904 Grid '%s' does not exist
905 Existing grids are:
906 %s
907 """ % (
908                     grid,
909                     PP.pformat(sorted(existingGrids)),
910                 )
911                 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
912             else:
913                 self.currentgrid = grid
914                 self.run_grid_tests()
915
916     def run_script(self, script_name):
917         if "APPLICATION" in self.config and script_name in self.config.APPLICATION:
918             script = self.config.APPLICATION[script_name]
919             if len(script) == 0:
920                 return
921
922             self.logger.write("\n", 2, False)
923             if not os.path.exists(script):
924                 self.logger.write(
925                     src.printcolors.printcWarning(
926                         "WARNING: scrip" "t not found: %s" % script
927                     )
928                     + "\n",
929                     2,
930                 )
931             else:
932                 self.logger.write(
933                     src.printcolors.printcHeader(
934                         "----------- sta" "rt %s" % script_name
935                     )
936                     + "\n",
937                     2,
938                 )
939                 self.logger.write("Run script: %s\n" % script, 2)
940                 subprocess.Popen(script, shell=True).wait()
941                 self.logger.write(
942                     src.printcolors.printcHeader("----------- end" " %s" % script_name)
943                     + "\n",
944                     2,
945                 )
946
947     def run_all_tests(self):
948         initTime = datetime.datetime.now()
949
950         self.run_script("test_setup")
951         self.logger.write("\n", 2, False)
952
953         self.logger.write(
954             src.printcolors.printcHeader(_("=== STARTING TESTS")) + "\n", 2
955         )
956         self.logger.write("\n", 2, False)
957         self.currentDir = os.path.join(
958             self.tmp_working_dir, "BASES", self.currentTestBase
959         )
960         self.run_testbase_tests()
961
962         # calculate total execution time
963         totalTime = datetime.datetime.now() - initTime
964         totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
965         self.logger.write("\n", 2, False)
966         self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
967         self.logger.write(
968             " %s\n" % src.printcolors.printcInfo(str(totalTime)), 2, False
969         )
970
971         #
972         # Start the tests
973         #
974         self.run_script("test_cleanup")
975         self.logger.write("\n", 2, False)
976
977         # evaluate results
978
979         res_out = _("Tests Results: %(succeed)d / %(total)d\n") % {
980             "succeed": self.nb_succeed,
981             "total": self.nb_run,
982         }
983         if self.nb_succeed == self.nb_run:
984             res_out = src.printcolors.printcSuccess(res_out)
985         else:
986             res_out = src.printcolors.printcError(res_out)
987         self.logger.write(res_out, 1)
988
989         if self.nb_timeout > 0:
990             self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
991         if self.nb_not_run > 0:
992             self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
993         if self.nb_acknoledge > 0:
994             self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
995
996         status = src.OK_STATUS
997         if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
998             status = src.KO_STATUS
999         elif self.nb_acknoledge:
1000             status = src.KNOWNFAILURE_STATUS
1001
1002         self.logger.write(_("Status: %s\n" % status), 3)
1003
1004         return self.nb_run - self.nb_succeed - self.nb_acknoledge
1005
1006     ##
1007     # Write margin to show test results.
1008     def write_test_margin(self, tab):
1009         if tab == 0:
1010             return ""
1011         return "|   " * (tab - 1) + "+ "