code = compile(f.read(), somefile, 'exec')
exec(code, global_vars, local_vars)
-
import os
import sys
import datetime
import string
import imp
import subprocess
+import glob
+import pprint as PP
+
+verbose = False
from . import fork
import src
# directories not considered as test grids
C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
+DEFAULT_TIMEOUT = 150
+
# Get directory to be used for the temporary files.
#
def getTmpDirDEFAULT():
cmd = cmd % { 'user': user,
'base': testbase_base,
'dir': testbase_name }
-
+
+ # Get the application environment
+ self.logger.write(_("Set the application environment\n"), 5)
+ env_appli = src.environment.SalomeEnviron(self.config,
+ src.environment.Environ(dict(os.environ)))
+ env_appli.set_application_env(self.logger)
+
self.logger.write("> %s\n" % cmd, 5)
if src.architecture.is_windows():
# preexec_fn not supported on windows platform
shell=True,
preexec_fn=set_signal,
stdout=self.logger.logTxtFile,
- stderr=subprocess.PIPE)
+ stderr=subprocess.PIPE,
+ env=env_appli.environ.environ,)
if res != 0:
raise src.SatException(_("Error: unable to get test base '%(nam"
test_base_info.info.base,
self.config.APPLICATION.test_base.tag)
elif test_base_info.get_sources == "svn":
- svn_user = src.get_cfg_param(test_base_info.svn_info,
+ svn_user = src.get_cfg_param(test_base_info.info,
"svn_user",
self.config.USER.svn_user)
self.prepare_testbase_from_svn(svn_user,
results[test] = ["?", -1, "", []]
else:
gdic, ldic = {}, {}
- execfile(resfile, gdic, ldic)
-
- status = src.TIMEOUT_STATUS
- if not has_timed_out:
- status = src.KO_STATUS
-
- if ldic.has_key('status'):
- status = ldic['status']
-
- expected = []
- if status == src.KO_STATUS or status == src.OK_STATUS:
- status, expected = self.search_known_errors(status,
- self.currentgrid,
- self.currentsession,
- test)
-
- callback = ""
- if ldic.has_key('callback'):
- callback = ldic['callback']
- elif status == src.KO_STATUS:
- callback = "CRASH"
-
- exec_time = -1
- if ldic.has_key('time'):
- try:
- exec_time = float(ldic['time'])
- except:
+ if verbose:
+ print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
+
+ try:
+ execfile(resfile, gdic, ldic)
+
+ status = src.TIMEOUT_STATUS
+ if not has_timed_out:
+ status = src.KO_STATUS
+
+ if 'status' in ldic:
+ status = ldic['status']
+
+ expected = []
+ if status == src.KO_STATUS or status == src.OK_STATUS:
+ status, expected = self.search_known_errors(status,
+ self.currentgrid,
+ self.currentsession,
+ test)
+
+ callback = ""
+ if 'callback' in ldic:
+ callback = ldic['callback']
+ elif status == src.KO_STATUS:
+ callback = "CRASH"
+ if verbose:
+ print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
+ print("--- CRASH gdic\n%s" % PP.pformat(gdic))
pass
- results[test] = [status, exec_time, callback, expected]
+ exec_time = -1
+ if 'time' in ldic:
+ try:
+ exec_time = float(ldic['time'])
+ except:
+ pass
+
+ results[test] = [status, exec_time, callback, expected]
+
+ except:
+ results[test] = ["?", -1, "", []]
+ # results[test] = [src.O_STATUS, -1, open(resfile, 'r').read(), []]
# check if <test>.py file exists
testfile = os.path.join(self.currentDir,
# calling all the scripts of a single directory.
def generate_script(self, listTest, script_path, ignoreList):
# open template file
- template_file = open(os.path.join(self.config.VARS.srcDir,
- "test",
- "scriptTemplate.py"), 'r')
- template = string.Template(template_file.read())
+ tFile = os.path.join(self.config.VARS.srcDir, "test", "scriptTemplate.py")
+ with open(tFile, 'r') as f:
+ template = string.Template(f.read())
# create substitution dictionary
d = dict()
d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
- d['sessionDir'] = os.path.join(self.currentDir,
- self.currentgrid,
- self.currentsession)
- d['resultFile'] = os.path.join(self.tmp_working_dir,
- 'WORK',
- 'exec_result')
+ d['sessionDir'] = os.path.join(self.currentDir, self.currentgrid, self.currentsession)
+ d['resultFile'] = os.path.join(self.tmp_working_dir, 'WORK', 'exec_result')
d['listTest'] = listTest
d['sessionName'] = self.currentsession
d['ignore'] = ignoreList
# create script with template
- script = open(script_path, 'w')
- script.write(template.safe_substitute(d))
- script.close()
+ contents = template.safe_substitute(d)
+ if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
+ with open(script_path, 'w') as f:
+ f.write(contents)
- # Find the getTmpDir function that gives access to *pidict file directory.
- # (the *pidict file exists when SALOME is launched)
+
+ # Find the getTmpDir function that gives access to *_pidict file directory.
+ # (the *_pidict file exists when SALOME is launched)
def get_tmp_dir(self):
# Rare case where there is no KERNEL in grid list
# (for example MED_STANDALONE)
if 'KERNEL_ROOT_DIR' in os.environ:
root_dir = os.environ['KERNEL_ROOT_DIR']
- if ('APPLICATION' in self.config
- and 'KERNEL' in self.config.APPLICATION.products):
- root_dir = src.product.get_product_config(self.config,
- "KERNEL").install_dir
+ if ('APPLICATION' in self.config and
+ 'KERNEL' in self.config.APPLICATION.products):
+ root_dir = src.product.get_product_config(self.config, "KERNEL").install_dir
# Case where there the appli option is called (with path to launcher)
if len(self.launcher) > 0:
launcherDir = os.path.dirname(self.launcher)
if launcherName == 'runAppli':
# Old application
- cmd = "for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
- " done ; echo $KERNEL_ROOT_DIR"
+ cmd = """
+for i in %s/env.d/*.sh;
+ do source ${i};
+done
+echo $KERNEL_ROOT_DIR
+""" % launcherDir
else:
# New application
- cmd = ("echo -e 'import os\nprint os.environ[\"KERNEL_" +
- "ROOT_DIR\"]' > tmpscript.py; %s shell" +
- " tmpscript.py") % self.launcher
- root_dir = subprocess.Popen(cmd,
+ cmd = """
+echo -e 'import os\nprint(os.environ[\"KERNEL_ROOT_DIR\"])' > tmpscript.py
+%s shell tmpscript.py
+""" % self.launcher
+
+ if src.architecture.is_windows():
+ subproc_res = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ shell=True).communicate()
+ pass
+ else:
+ subproc_res = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
shell=True,
- executable='/bin/bash').communicate()[0].split()[-1]
+ executable='/bin/bash').communicate()
+ pass
+
+ root_dir = subproc_res[0].split()[-1]
# import grid salome_utils from KERNEL that gives
# the right getTmpDir function
-
- (file_, pathname, description) = imp.find_module("salome_utils",
- [os.path.join(root_dir,
- 'bin',
- 'salome')])
+ root_dir = root_dir.decode('utf-8')
+ aPath = [os.path.join(root_dir, 'bin', 'salome')]
+ sal_uts = "salome_utils"
try:
- grid = imp.load_module("salome_utils",
- file_,
- pathname,
- description)
+ (file_, pathname, description) = imp.find_module(sal_uts, aPath )
+ except Exception:
+ msg = "inexisting %s.py in %s" % (sal_uts, aPath)
+ raise Exception(msg)
+
+ try:
+ grid = imp.load_module(sal_uts, file_, pathname, description)
return grid.getLogDir
except:
- grid = imp.load_module("salome_utils",
- file_,
- pathname,
- description)
+ grid = imp.load_module(sal_uts, file_, pathname, description)
return grid.getTmpDir
finally:
if file_:
# Case where SALOME has NOT the launcher that uses the SalomeContext API
if VersionSalome < 730:
- binSalome = os.path.join(self.config.APPLI.grid_appli_install_dir,
+ binSalome = os.path.join(self.config.APPLICATION.workdir,
appdir,
"runAppli")
binPython = "python"
return binSalome, binPython, killSalome
# Case where SALOME has the launcher that uses the SalomeContext API
- if VersionSalome >= 730:
- if 'profile' not in self.config.APPLICATION:
- # Before revision of application concept
- launcher_name = self.config.APPLI.launch_alias_name
- binSalome = os.path.join(self.config.APPLICATION.workdir,
- appdir,
- launcher_name)
- else:
- # After revision of application concept
- launcher_name = self.config.APPLICATION.profile.launcher_name
- binSalome = os.path.join(self.config.APPLICATION.workdir,
- launcher_name)
+ else:
+ launcher_name = src.get_launcher_name(self.config)
+ binSalome = os.path.join(self.config.APPLICATION.workdir,
+ launcher_name)
- if src.architecture.is_windows():
- binSalome += '.bat'
-
binPython = binSalome + ' shell'
killSalome = binSalome + ' killall'
return binSalome, binPython, killSalome
out_path = os.path.join(self.currentDir,
self.currentgrid,
self.currentsession)
+ if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
+ (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
time_out = self.get_test_timeout(sessionname,
- self.config.SITE.test.timeout)
+ DEFAULT_TIMEOUT)
- time_out_salome = src.get_cfg_param(self.config.SITE.test,
- "timeout_app",
- self.config.SITE.test.timeout)
+ time_out_salome = DEFAULT_TIMEOUT
# generate wrapper script
script_path = os.path.join(out_path, 'wrapperScript.py')
tmpDir = self.get_tmp_dir()
binSalome, binPython, killSalome = self.generate_launching_commands()
- if self.settings.has_key("run_with_grids") \
- and self.settings["run_with_grids"].has_key(sessionname):
- binSalome = (binSalome +
- " -m %s" % self.settings["run_with_grids"][sessionname])
+ if "run_with_grids" in self.settings and \
+ sessionname in self.settings["run_with_grids"]:
+ binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
elapsed = -1
if self.currentsession.startswith("NOGUI_"):
# runSalome -t (bash)
- status, elapsed = fork.batch(binSalome, self.logger,
- os.path.join(self.tmp_working_dir,
- "WORK"),
- [ "-t",
- "--shutdown-server=1",
- script_path ],
- delai=time_out,
- log=logWay)
+ status, elapsed = fork.batch(
+ binSalome,
+ self.logger,
+ os.path.join(self.tmp_working_dir, "WORK"),
+ [ "-t", "--shutdown-server=1", script_path ],
+ delai=time_out,
+ log=logWay)
elif self.currentsession.startswith("PY_"):
# python script.py
- status, elapsed = fork.batch(binPython, self.logger,
- os.path.join(self.tmp_working_dir,
- "WORK"),
- [script_path],
- delai=time_out, log=logWay)
+ status, elapsed = fork.batch(
+ binPython,
+ self.logger,
+ os.path.join(self.tmp_working_dir, "WORK"),
+ [script_path],
+ delai=time_out,
+ log=logWay)
else:
opt = "-z 0"
if self.show_desktop: opt = "--show-desktop=0"
- status, elapsed = fork.batch_salome(binSalome,
- self.logger,
- os.path.join(
- self.tmp_working_dir,
- "WORK"),
- [ opt,
- "--shutdown-server=1",
- script_path ],
- getTmpDir=tmpDir,
- fin=killSalome,
- delai=time_out,
- log=logWay,
- delaiapp=time_out_salome)
-
- self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
- 5)
+ status, elapsed = fork.batch_salome(
+ binSalome,
+ self.logger,
+ os.path.join( self.tmp_working_dir, "WORK"),
+ [ opt, "--shutdown-server=1", script_path ],
+ getTmpDir=tmpDir,
+ fin=killSalome,
+ delai=time_out,
+ log=logWay,
+ delaiapp=time_out_salome)
+
+ self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed), 5)
# create the test result to add in the config object
test_info = src.pyconf.Mapping(self.config)
self.nb_run -= 1
elif script_info.res == "?":
self.nb_not_run += 1
+
self.config.TESTS.append(test_info, '')
tests = os.listdir(os.path.join(self.currentDir,
self.currentgrid,
self.currentsession))
- tests = filter(lambda l: l.endswith(".py"), tests)
+ # avoid result files of previous tests, if presents
+ # tests = filter(lambda l: l.endswith(".py"), tests)
+ tests = [t for t in tests if t.endswith(".py") \
+ and not ( t.endswith(".out.py") or \
+ t.endswith(".result.py") or \
+ t.endswith("wrapperScript.py") \
+ ) ]
tests = sorted(tests, key=str.lower)
# build list of known failures
l)), sessions)
sessions = sorted(sessions, key=str.lower)
+ existingSessions = self.getSubDirectories(grid_path)
for session_ in sessions:
if not os.path.exists(os.path.join(grid_path, session_)):
self.logger.write(self.write_test_margin(2), 3)
- self.logger.write(src.printcolors.printcWarning("Session %s not"
- " found" % session_) + "\n", 3, False)
+ msg = """\
+Session '%s' not found
+Existing sessions are:
+%s
+""" % (session_, PP.pformat(sorted(existingSessions)))
+ self.logger.write(src.printcolors.printcWarning(msg), 3, False)
else:
self.currentsession = session_
self.run_session_tests()
+ def getSubDirectories(self, aDir):
+ """
+ get names of first level of sub directories in aDir
+ excluding '.git' etc as beginning with '.'
+ """
+ res = os.listdir(aDir)
+ res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
+ # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
+ return res
+
##
# Runs test testbase.
def run_testbase_tests(self):
if os.path.exists(settings_file):
gdic, ldic = {}, {}
execfile(settings_file, gdic, ldic)
- self.logger.write(_("Load test settings\n"), 3)
+ self.logger.write("Load test settings '%s'\n" % settings_file, 5)
self.settings = ldic['settings_dic']
self.ignore_tests = ldic['known_failures_list']
if isinstance(self.ignore_tests, list):
self.ignore_tests = {}
- self.logger.write(src.printcolors.printcWarning("known_failur"
- "es_list must be a dictionary (not a list)") + "\n", 1, False)
+ self.logger.write(src.printcolors.printcWarning(
+ "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
else:
self.ignore_tests = {}
self.settings.clear()
# read known failures pyconf
- if "testerror" in self.config.SITE:
+ if "testerror" in self.config.LOCAL:
#import testerror
#self.known_errors = testerror.read_test_failures(
# self.config.TOOLS.testerror.file_path,
grids)
grids = sorted(grids, key=str.lower)
+ existingGrids = self.getSubDirectories(self.currentDir)
for grid in grids:
if not os.path.exists(os.path.join(self.currentDir, grid)):
self.logger.write(self.write_test_margin(1), 3)
- self.logger.write(src.printcolors.printcWarning(
- "grid %s does not exist\n" % grid), 3, False)
+ msg = """\
+Grid '%s' does not exist
+Existing grids are:
+%s
+""" % (grid, PP.pformat(sorted(existingGrids)))
+ self.logger.write(src.printcolors.printcWarning(msg), 3, False)
else:
self.currentgrid = grid
self.run_grid_tests()
def run_script(self, script_name):
- if ('APPLICATION' in self.config and
+ if ('APPLICATION' in self.config and
script_name in self.config.APPLICATION):
script = self.config.APPLICATION[script_name]
if len(script) == 0:
self.logger.write("\n", 2, False)
# evaluate results
- res_count = "%d / %d" % (self.nb_succeed,
- self.nb_run - self.nb_acknoledge)
res_out = _("Tests Results: %(succeed)d / %(total)d\n") % \
{ 'succeed': self.nb_succeed, 'total': self.nb_run }
if self.nb_timeout > 0:
self.logger.write(_("%d tests TIMEOUT\n") % self.nb_timeout, 1)
- res_count += " TO: %d" % self.nb_timeout
if self.nb_not_run > 0:
self.logger.write(_("%d tests not executed\n") % self.nb_not_run, 1)
- res_count += " NR: %d" % self.nb_not_run
+ if self.nb_acknoledge > 0:
+ self.logger.write(_("%d tests known failures\n") % self.nb_acknoledge, 1)
status = src.OK_STATUS
if self.nb_run - self.nb_succeed - self.nb_acknoledge > 0: