3 # Copyright (C) 2010-2013 CEA/DEN
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # Python 2/3 compatibility for execfile function
def execfile(somefile, global_vars, local_vars):
    """Compile and execute the Python source file *somefile*.

    Python 2/3 compatibility replacement for the ``execfile`` builtin.

    :param somefile: path of the script to execute; it is also used as the
        file name attached to the compiled code (shown in tracebacks).
    :param global_vars: dictionary used as the globals of the executed code.
    :param local_vars: dictionary used as the locals of the executed code;
        names assigned at the top level of the script end up here.
    """
    with open(somefile) as f:
        code = compile(f.read(), somefile, 'exec')
    # The source is fully read and compiled at this point, so the script is
    # executed after the file handle has been released.
    exec(code, global_vars, local_vars)
# Module-wide switch: when True, extra debug traces are printed to stdout
# (cvw TODO).
verbose = False

# directories not considered as test grids
C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
48 # Get directory to be used for the temporary files.
def getTmpDirDEFAULT():
    """Return the default directory to be used for the temporary files.

    On Windows this is the value of the ``TEMP`` environment variable
    (``None`` when it is not set). On other platforms it is
    ``/tmp/logs/<user>``, where ``<user>`` comes from the ``USER``
    environment variable and falls back to ``"unknown"``.

    :return: the directory path; the directory is not created here.
    """
    if src.architecture.is_windows():
        directory = os.getenv("TEMP")
    else:
        # for Linux: use /tmp/logs/{user} folder
        directory = os.path.join('/tmp', 'logs', os.getenv("USER", "unknown"))
    return directory
71 self.tmp_working_dir = tmp_working_dir
72 self.sessions = sessions
73 self.launcher = launcher
74 self.show_desktop = show_desktop
76 res = self.prepare_testbase(testbase)
77 self.test_base_found = True
80 self.test_base_found = False
83 self.known_errors = None
85 # create section for results
86 self.config.TESTS = src.pyconf.Sequence(self.config)
92 self.nb_acknoledge = 0
94 def _copy_dir(self, source, target):
95 if self.config.VARS.python >= "2.6":
96 shutil.copytree(source, target,
98 ignore=shutil.ignore_patterns('.git*','.svn*'))
100 shutil.copytree(source, target,
def prepare_testbase_from_dir(self, testbase_name, testbase_dir):
    """Copy a test base stored in a local directory into the working area.

    The content of *testbase_dir* is copied with ``self._copy_dir`` to
    ``<tmp_working_dir>/BASES/<testbase_name>``.

    :param testbase_name: name of the test base; used as the name of the
        target directory and in the error message.
    :param testbase_dir: path of the directory holding the test base.
    :raises src.SatException: when ``os.access(testbase_dir, os.X_OK)``
        fails, i.e. the directory is missing or cannot be entered.
    """
    self.logger.write(_("get test base from dir: %s\n") %
                      src.printcolors.printcLabel(testbase_dir), 3)
    if not os.access(testbase_dir, os.X_OK):
        raise src.SatException(_("testbase %(name)s (%(dir)s) does not "
                                 "exist ...\n") % {'name': testbase_name,
                                                   'dir': testbase_dir})

    self._copy_dir(testbase_dir,
                   os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
114 def prepare_testbase_from_git(self,
119 _("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
120 "testbase" : src.printcolors.printcLabel(testbase_name),
121 "tag" : src.printcolors.printcLabel(testbase_tag)},
124 def set_signal(): # pragma: no cover
125 """see http://bugs.python.org/issue1652"""
127 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
129 cmd = "git clone --depth 1 %(base)s %(dir)s"
130 cmd += " && cd %(dir)s"
131 if testbase_tag=='master':
132 cmd += " && git fetch origin %(branch)s"
134 cmd += " && git fetch origin %(branch)s:%(branch)s"
135 cmd += " && git checkout %(branch)s"
136 cmd = cmd % { 'branch': testbase_tag,
137 'base': testbase_base,
138 'dir': testbase_name }
140 self.logger.write("> %s\n" % cmd, 5)
141 if src.architecture.is_windows():
142 # preexec_fn not supported on windows platform
143 res = subprocess.call(cmd,
144 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
146 stdout=self.logger.logTxtFile,
147 stderr=subprocess.PIPE)
149 res = subprocess.call(cmd,
150 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
152 preexec_fn=set_signal,
153 stdout=self.logger.logTxtFile,
154 stderr=subprocess.PIPE)
156 raise src.SatException(_("Error: unable to get test base "
157 "'%(name)s' from git '%(repo)s'.") % \
158 { 'name': testbase_name,
159 'repo': testbase_base })
162 self.logger.error(_("git is not installed. exiting...\n"))
165 def prepare_testbase_from_svn(self, user, testbase_name, testbase_base):
166 self.logger.write(_("get test base '%s' from svn\n") % \
167 src.printcolors.printcLabel(testbase_name), 3)
169 def set_signal(): # pragma: no cover
170 """see http://bugs.python.org/issue1652"""
172 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
174 cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
175 cmd = cmd % { 'user': user,
176 'base': testbase_base,
177 'dir': testbase_name }
179 # Get the application environment
180 self.logger.write(_("Set the application environment\n"), 5)
181 env_appli = src.environment.SalomeEnviron(self.config,
182 src.environment.Environ(dict(os.environ)))
183 env_appli.set_application_env(self.logger)
185 self.logger.write("> %s\n" % cmd, 5)
186 if src.architecture.is_windows():
187 # preexec_fn not supported on windows platform
188 res = subprocess.call(cmd,
189 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
191 stdout=self.logger.logTxtFile,
192 stderr=subprocess.PIPE)
194 res = subprocess.call(cmd,
195 cwd=os.path.join(self.tmp_working_dir, 'BASES'),
197 preexec_fn=set_signal,
198 stdout=self.logger.logTxtFile,
199 stderr=subprocess.PIPE,
200 env=env_appli.environ.environ,)
203 raise src.SatException(_("Error: unable to get test base '%(nam"
204 "e)s' from svn '%(repo)s'.") % \
205 { 'name': testbase_name,
206 'repo': testbase_base })
209 self.logger.error(_("svn is not installed. exiting...\n"))
# Configure the test base.
214 def prepare_testbase(self, test_base_name):
215 src.printcolors.print_value(self.logger,
219 self.logger.write("\n", 3, False)
221 # search for the test base
222 test_base_info = None
223 for project_name in self.config.PROJECTS.projects:
224 project_info = self.config.PROJECTS.projects[project_name]
225 if "test_bases" not in project_info:
227 for t_b_info in project_info.test_bases:
228 if t_b_info.name == test_base_name:
229 test_base_info = t_b_info
231 if not test_base_info:
232 if os.path.exists(test_base_name):
233 self.prepare_testbase_from_dir("DIR", test_base_name)
234 self.currentTestBase = "DIR"
237 if not test_base_info:
238 message = (_("########## ERROR: test base '%s' not found\n") %
240 self.logger.write("%s\n" % src.printcolors.printcError(message))
243 if test_base_info.get_sources == "dir":
244 self.prepare_testbase_from_dir(test_base_name,
245 test_base_info.info.dir)
246 elif test_base_info.get_sources == "git":
247 self.prepare_testbase_from_git(test_base_name,
248 test_base_info.info.base,
249 self.config.APPLICATION.test_base.tag)
250 elif test_base_info.get_sources == "svn":
251 svn_user = src.get_cfg_param(test_base_info.info,
253 self.config.USER.svn_user)
254 self.prepare_testbase_from_svn(svn_user,
256 test_base_info.info.base)
258 raise src.SatException(_("unknown source type '%(type)s' for test b"
259 "ase '%(base)s' ...\n") % {
260 'type': test_base_info.get_sources,
261 'base': test_base_name })
263 self.currentTestBase = test_base_name
# Check whether the script is declared in the known errors pyconf.
# Update the status if needed.
268 def search_known_errors(self, status, test_grid, test_session, test):
269 test_path = os.path.join(test_grid, test_session, test)
270 if not src.config_has_application(self.config):
273 if self.known_errors is None:
276 platform = self.config.VARS.arch
277 application = self.config.VARS.application
278 error = self.known_errors.get_error(test_path, application, platform)
282 if status == src.OK_STATUS:
285 self.known_errors.fix_error(error)
287 #testerror.write_test_failures(
288 # self.config.TOOLS.testerror.file_path,
289 # self.known_errors.errors)
290 return status, [ error.date,
296 self.known_errors.unfix_error(error)
298 #testerror.write_test_failures(self.config.TOOLS.testerror.file_path,
299 # self.known_errors.errors)
301 delta = self.known_errors.get_expecting_days(error)
302 kfres = [ error.date, error.expected, error.comment, error.fixed ]
304 return src.KO_STATUS, kfres
305 return src.KNOWNFAILURE_STATUS, kfres
308 # Read the *.result.py files.
309 def read_results(self, listTest, has_timed_out):
311 for test in listTest:
312 resfile = os.path.join(self.currentDir,
315 test[:-3] + ".result.py")
317 # check if <test>.result.py file exists
318 if not os.path.exists(resfile):
319 results[test] = ["?", -1, "", []]
323 print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
325 execfile(resfile, gdic, ldic)
327 status = src.TIMEOUT_STATUS
328 if not has_timed_out:
329 status = src.KO_STATUS
331 if ldic.has_key('status'):
332 status = ldic['status']
335 if status == src.KO_STATUS or status == src.OK_STATUS:
336 status, expected = self.search_known_errors(status,
342 if ldic.has_key('callback'):
343 callback = ldic['callback']
344 elif status == src.KO_STATUS:
347 print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
348 print("--- CRASH gdic\n%s" % PP.pformat(gdic))
352 if ldic.has_key('time'):
354 exec_time = float(ldic['time'])
358 results[test] = [status, exec_time, callback, expected]
360 # check if <test>.py file exists
361 testfile = os.path.join(self.currentDir,
366 if not os.path.exists(testfile):
367 results[test].append('')
369 text = open(testfile, "r").read()
370 results[test].append(text)
372 # check if <test>.out.py file exists
373 outfile = os.path.join(self.currentDir,
376 test[:-3] + ".out.py")
378 if not os.path.exists(outfile):
379 results[test].append('')
381 text = open(outfile, "r").read()
382 results[test].append(text)
387 # Generates the script to be run by Salome.
388 # This python script includes init and close statements and a loop
389 # calling all the scripts of a single directory.
390 def generate_script(self, listTest, script_path, ignoreList):
392 template_file = open(os.path.join(self.config.VARS.srcDir,
394 "scriptTemplate.py"), 'r')
395 template = string.Template(template_file.read())
397 # create substitution dictionary
399 d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
400 d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
401 d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
402 d['sessionDir'] = os.path.join(self.currentDir,
405 d['resultFile'] = os.path.join(self.tmp_working_dir,
408 d['listTest'] = listTest
409 d['sessionName'] = self.currentsession
410 d['ignore'] = ignoreList
412 # create script with template
413 contents = template.safe_substitute(d)
414 if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
415 with open(script_path, 'w') as f:
419 # Find the getTmpDir function that gives access to *pidict file directory.
420 # (the *pidict file exists when SALOME is launched)
421 def get_tmp_dir(self):
422 # Rare case where there is no KERNEL in grid list
423 # (for example MED_STANDALONE)
424 if ('APPLICATION' in self.config
425 and 'KERNEL' not in self.config.APPLICATION.products
426 and 'KERNEL_ROOT_DIR' not in os.environ):
427 return getTmpDirDEFAULT
429 # Case where "sat test" is launched in an existing SALOME environment
430 if 'KERNEL_ROOT_DIR' in os.environ:
431 root_dir = os.environ['KERNEL_ROOT_DIR']
433 if ('APPLICATION' in self.config
434 and 'KERNEL' in self.config.APPLICATION.products):
435 root_dir = src.product.get_product_config(self.config,
436 "KERNEL").install_dir
# Case where the appli option is used (with the path to the launcher)
439 if len(self.launcher) > 0:
440 # There are two cases : The old application (runAppli)
442 launcherName = os.path.basename(self.launcher)
443 launcherDir = os.path.dirname(self.launcher)
444 if launcherName == 'runAppli':
446 cmd = ("for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
447 " done ; echo $KERNEL_ROOT_DIR")
450 cmd = ("echo -e 'import os\nprint os.environ[\"KERNEL_" +
451 "ROOT_DIR\"]' > tmpscript.py; %s shell" +
452 " tmpscript.py") % self.launcher
# OP 14/11/2017 Added traces to try to track down the problem
# with the test logs not being reported
456 #root_dir = subprocess.Popen(cmd,
457 # stdout=subprocess.PIPE,
459 # executable='/bin/bash').communicate()[0].split()[-1]
460 # OP Add Windows case
461 if src.architecture.is_windows():
462 subproc_res = subprocess.Popen(cmd,
463 stdout=subprocess.PIPE,
464 shell=True).communicate()
467 subproc_res = subprocess.Popen(cmd,
468 stdout=subprocess.PIPE,
470 executable='/bin/bash').communicate()
472 #print "TRACES OP - test_module.py/Test.get_tmp_dir() subproc_res = "
473 #for resLine in subproc_res:
474 # print "- '#%s#'" %resLine
476 root_dir = subproc_res[0].split()[-1]
# OP 14/11/2017 Added traces to try to track down the problem
# with the test logs not being reported
480 #print "TRACES OP - test_module.py/Test.get_tmp_dir() root_dir = '#%s#'" %root_dir
482 # import grid salome_utils from KERNEL that gives
483 # the right getTmpDir function
484 (file_, pathname, description) = imp.find_module("salome_utils",
485 [os.path.join(root_dir,
489 grid = imp.load_module("salome_utils",
493 return grid.getLogDir
495 grid = imp.load_module("salome_utils",
499 return grid.getTmpDir
def get_test_timeout(self, test_name, default_value):
    """Return the timeout configured for *test_name*.

    The value is looked up in the ``"timeout"`` entry of ``self.settings``.

    :param test_name: key to look up; ``run_tests`` passes
        ``"<grid>/<session>"``.
    :param default_value: value returned when the settings have no
        ``"timeout"`` entry or no timeout for *test_name*.
    :return: the configured timeout, or *default_value*.
    """
    if ("timeout" in self.settings and
            test_name in self.settings["timeout"]):
        return self.settings["timeout"][test_name]
    return default_value
512 def generate_launching_commands(self):
513 # Case where "sat test" is launched in an existing SALOME environment
514 if 'KERNEL_ROOT_DIR' in os.environ:
515 binSalome = "runSalome"
517 killSalome = "killSalome.py"
519 # Rare case where there is no KERNEL in grid list
520 # (for example MED_STANDALONE)
521 if ('APPLICATION' in self.config and
522 'KERNEL' not in self.config.APPLICATION.products):
523 binSalome = "runSalome"
525 killSalome = "killSalome.py"
526 src.environment.load_environment(self.config, False, self.logger)
527 return binSalome, binPython, killSalome
# Case where the appli option is used (with the path to the launcher)
530 if len(self.launcher) > 0:
531 # There are two cases : The old application (runAppli)
533 launcherName = os.path.basename(self.launcher)
534 launcherDir = os.path.dirname(self.launcher)
535 if launcherName == 'runAppli':
537 binSalome = self.launcher
538 binPython = ("for i in " +
540 "/env.d/*.sh; do source ${i}; done ; python")
541 killSalome = ("for i in " +
543 "/env.d/*.sh; do source ${i}; done ; killSalome.py'")
544 return binSalome, binPython, killSalome
547 binSalome = self.launcher
548 binPython = self.launcher + ' shell'
549 killSalome = self.launcher + ' killall'
550 return binSalome, binPython, killSalome
552 # SALOME version detection and APPLI repository detection
553 VersionSalome = src.get_salome_version(self.config)
555 if "APPLI" in self.config and "application_name" in self.config.APPLI:
556 appdir = self.config.APPLI.application_name
558 # Case where SALOME has NOT the launcher that uses the SalomeContext API
559 if VersionSalome < 730:
560 binSalome = os.path.join(self.config.APPLICATION.workdir,
564 killSalome = "killSalome.py"
565 src.environment.load_environment(self.config, False, self.logger)
566 return binSalome, binPython, killSalome
568 # Case where SALOME has the launcher that uses the SalomeContext API
570 launcher_name = src.get_launcher_name(self.config)
571 binSalome = os.path.join(self.config.APPLICATION.workdir,
574 binPython = binSalome + ' shell'
575 killSalome = binSalome + ' killall'
576 return binSalome, binPython, killSalome
578 return binSalome, binPython, killSalome
582 # Runs tests of a session (using a single instance of Salome).
583 def run_tests(self, listTest, ignoreList):
584 out_path = os.path.join(self.currentDir,
587 if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
588 (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
589 sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
590 time_out = self.get_test_timeout(sessionname,
593 time_out_salome = DEFAULT_TIMEOUT
595 # generate wrapper script
596 script_path = os.path.join(out_path, 'wrapperScript.py')
597 self.generate_script(listTest, script_path, ignoreList)
599 tmpDir = self.get_tmp_dir()
601 binSalome, binPython, killSalome = self.generate_launching_commands()
602 if self.settings.has_key("run_with_grids") and \
603 self.settings["run_with_grids"].has_key(sessionname):
604 binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
606 logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
610 if self.currentsession.startswith("NOGUI_"):
611 # runSalome -t (bash)
612 status, elapsed = fork.batch(
615 os.path.join(self.tmp_working_dir, "WORK"),
616 [ "-t", "--shutdown-server=1", script_path ],
620 elif self.currentsession.startswith("PY_"):
622 status, elapsed = fork.batch(
625 os.path.join(self.tmp_working_dir, "WORK"),
632 if self.show_desktop: opt = "--show-desktop=0"
633 status, elapsed = fork.batch_salome(
636 os.path.join( self.tmp_working_dir, "WORK"),
637 [ opt, "--shutdown-server=1", script_path ],
642 delaiapp=time_out_salome)
644 self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
647 # create the test result to add in the config object
648 test_info = src.pyconf.Mapping(self.config)
649 test_info.testbase = self.currentTestBase
650 test_info.grid = self.currentgrid
651 test_info.session = self.currentsession
652 test_info.script = src.pyconf.Sequence(self.config)
654 script_results = self.read_results(listTest, elapsed == time_out)
655 for sr in sorted(script_results.keys()):
658 # create script result
659 script_info = src.pyconf.Mapping(self.config)
660 script_info.name = sr
661 script_info.res = script_results[sr][0]
662 script_info.time = script_results[sr][1]
663 if script_info.res == src.TIMEOUT_STATUS:
664 script_info.time = time_out
665 if script_info.time < 1e-3: script_info.time = 0
667 callback = script_results[sr][2]
668 if script_info.res != src.OK_STATUS and len(callback) > 0:
669 script_info.callback = callback
671 kfres = script_results[sr][3]
673 script_info.known_error = src.pyconf.Mapping(self.config)
674 script_info.known_error.date = kfres[0]
675 script_info.known_error.expected = kfres[1]
676 script_info.known_error.comment = kfres[2]
677 script_info.known_error.fixed = kfres[3]
679 script_info.content = script_results[sr][4]
680 script_info.out = script_results[sr][5]
682 # add it to the list of results
683 test_info.script.append(script_info, '')
685 # display the results
686 if script_info.time > 0:
687 exectime = "(%7.3f s)" % script_info.time
691 sp = "." * (35 - len(script_info.name))
692 self.logger.write(self.write_test_margin(3), 3)
693 self.logger.write("script %s %s %s %s\n" % (
694 src.printcolors.printcLabel(script_info.name),
696 src.printcolors.printc(script_info.res),
698 if script_info and len(callback) > 0:
699 self.logger.write("Exception in %s\n%s\n" % \
701 src.printcolors.printcWarning(callback)), 2, False)
703 if script_info.res == src.OK_STATUS:
705 elif script_info.res == src.KNOWNFAILURE_STATUS:
706 self.nb_acknoledge += 1
707 elif script_info.res == src.TIMEOUT_STATUS:
709 elif script_info.res == src.NA_STATUS:
711 elif script_info.res == "?":
715 self.config.TESTS.append(test_info, '')
718 # Runs all tests of a session.
719 def run_session_tests(self):
721 self.logger.write(self.write_test_margin(2), 3)
722 self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
723 self.currentsession), 3, False)
725 # prepare list of tests to run
726 tests = os.listdir(os.path.join(self.currentDir,
728 self.currentsession))
729 tests = filter(lambda l: l.endswith(".py"), tests)
730 tests = sorted(tests, key=str.lower)
732 # build list of known failures
733 cat = "%s/%s/" % (self.currentgrid, self.currentsession)
735 for k in self.ignore_tests.keys():
736 if k.startswith(cat):
737 ignoreDict[k[len(cat):]] = self.ignore_tests[k]
739 self.run_tests(tests, ignoreDict)
742 # Runs all tests of a grid.
743 def run_grid_tests(self):
744 self.logger.write(self.write_test_margin(1), 3)
745 self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
746 self.currentgrid), 3, False)
748 grid_path = os.path.join(self.currentDir, self.currentgrid)
751 if self.sessions is not None:
752 sessions = self.sessions # user choice
754 # use all scripts in grid
755 sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
756 os.listdir(grid_path))
757 sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
760 sessions = sorted(sessions, key=str.lower)
761 existingSessions = self.getSubDirectories(grid_path)
762 for session_ in sessions:
763 if not os.path.exists(os.path.join(grid_path, session_)):
764 self.logger.write(self.write_test_margin(2), 3)
766 Session '%s' not found
767 Existing sessions are:
769 """ % (session_, PP.pformat(sorted(existingSessions)))
770 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
772 self.currentsession = session_
773 self.run_session_tests()
def getSubDirectories(self, aDir):
    """
    Get the names of the first-level sub directories of aDir,
    excluding those beginning with '.' ('.git' etc.).

    :param aDir: path of the directory to scan.
    :return: list of directory names (not full paths), in the order given
        by os.listdir.
    """
    return [d for d in os.listdir(aDir)
            if os.path.isdir(os.path.join(aDir, d)) and not d.startswith('.')]
# Runs all tests of the test base.
787 def run_testbase_tests(self):
788 res_dir = os.path.join(self.currentDir, "RESSOURCES")
789 os.environ['PYTHONPATH'] = (res_dir +
791 os.environ['PYTHONPATH'])
792 os.environ['TT_BASE_RESSOURCES'] = res_dir
793 src.printcolors.print_value(self.logger,
794 "TT_BASE_RESSOURCES",
797 self.logger.write("\n", 4, False)
799 self.logger.write(self.write_test_margin(0), 3)
800 testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
801 self.currentTestBase)
802 self.logger.write(testbase_label, 3, False)
803 self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
805 self.logger.write("\n", 3, False)
808 settings_file = os.path.join(res_dir, "test_settings.py")
809 if os.path.exists(settings_file):
811 execfile(settings_file, gdic, ldic)
812 self.logger.write("Load test settings '%s'\n" % settings_file, 5)
813 self.settings = ldic['settings_dic']
814 self.ignore_tests = ldic['known_failures_list']
815 if isinstance(self.ignore_tests, list):
816 self.ignore_tests = {}
817 self.logger.write(src.printcolors.printcWarning(
818 "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
820 self.ignore_tests = {}
821 self.settings.clear()
823 # read known failures pyconf
824 if "testerror" in self.config.LOCAL:
826 #self.known_errors = testerror.read_test_failures(
827 # self.config.TOOLS.testerror.file_path,
831 self.known_errors = None
833 if self.grids is not None:
834 grids = self.grids # given by user
836 # select all the grids (i.e. directories) in the directory
837 grids = filter(lambda l: l not in C_IGNORE_GRIDS,
838 os.listdir(self.currentDir))
839 grids = filter(lambda l: os.path.isdir(
840 os.path.join(self.currentDir, l)),
843 grids = sorted(grids, key=str.lower)
844 existingGrids = self.getSubDirectories(self.currentDir)
846 if not os.path.exists(os.path.join(self.currentDir, grid)):
847 self.logger.write(self.write_test_margin(1), 3)
849 Grid '%s' does not exist
852 """ % (grid, PP.pformat(sorted(existingGrids)))
853 self.logger.write(src.printcolors.printcWarning(msg), 3, False)
855 self.currentgrid = grid
856 self.run_grid_tests()
858 def run_script(self, script_name):
859 if ('APPLICATION' in self.config and
860 script_name in self.config.APPLICATION):
861 script = self.config.APPLICATION[script_name]
865 self.logger.write("\n", 2, False)
866 if not os.path.exists(script):
867 self.logger.write(src.printcolors.printcWarning("WARNING: scrip"
868 "t not found: %s" % script) + "\n", 2)
870 self.logger.write(src.printcolors.printcHeader("----------- sta"
871 "rt %s" % script_name) + "\n", 2)
872 self.logger.write("Run script: %s\n" % script, 2)
873 subprocess.Popen(script, shell=True).wait()
874 self.logger.write(src.printcolors.printcHeader("----------- end"
875 " %s" % script_name) + "\n", 2)
877 def run_all_tests(self):
878 initTime = datetime.datetime.now()
880 self.run_script('test_setup')
881 self.logger.write("\n", 2, False)
883 self.logger.write(src.printcolors.printcHeader(
884 _("=== STARTING TESTS")) + "\n", 2)
885 self.logger.write("\n", 2, False)
886 self.currentDir = os.path.join(self.tmp_working_dir,
888 self.currentTestBase)
889 self.run_testbase_tests()
891 # calculate total execution time
892 totalTime = datetime.datetime.now() - initTime
893 totalTime -= datetime.timedelta(microseconds=totalTime.microseconds)
894 self.logger.write("\n", 2, False)
895 self.logger.write(src.printcolors.printcHeader(_("=== END TESTS")), 2)
896 self.logger.write(" %s\n" % src.printcolors.printcInfo(str(totalTime)),
903 self.run_script('test_cleanup')
904 self.logger.write("\n", 2, False)
908 res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
909 { 'succeed': self.nb_succeed, 'total': self.nb_run }
910 if self.nb_succeed == self.nb_run:
911 res_out = src.printcolors.printcSuccess(res_out)
913 res_out = src.printcolors.printcError(res_out)
914 self.logger.write(res_out, 1)
916 if self.nb_timeout > 0:
917 self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
918 if self.nb_not_run > 0:
919 self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
920 if self.nb_acknoledge > 0:
921 self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
923 status = src.OK_STATUS
924 if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0:
925 status = src.KO_STATUS
926 elif self.nb_acknoledge:
927 status = src.KNOWNFAILURE_STATUS
929 self.logger.write(_("Status: %s\n" % status), 3)
931 return self.nb_run - self.nb_succeed - self.nb_acknoledge
934 # Write margin to show test results.
935 def write_test_margin(self, tab):
938 return "| " * (tab - 1) + "+ "