import string
import imp
import subprocess
+
from . import fork
import src
-# directories not considered as test modules
-C_IGNORE_MODULES = ['.git', '.svn', 'RESSOURCES']
+# directories not considered as test grids
+C_IGNORE_GRIDS = ['.git', '.svn', 'RESSOURCES']
-C_TESTS_LIGHT_FILE = "TestsLight.txt"
+DEFAULT_TIMEOUT = 150
# Get directory to be used for the temporary files.
#
def __init__(self,
config,
logger,
- sessionDir,
+ tmp_working_dir,
testbase="",
- modules=None,
- types=None,
+ grids=None,
+ sessions=None,
launcher="",
show_desktop=True):
- self.modules = modules
+ self.grids = grids
self.config = config
self.logger = logger
- self.sessionDir = sessionDir
- self.types = types
+ self.tmp_working_dir = tmp_working_dir
+ self.sessions = sessions
self.launcher = launcher
self.show_desktop = show_desktop
- self.prepare_testbase(testbase)
-
+ res = self.prepare_testbase(testbase)
+ self.test_base_found = True
+ if res == 1:
+ # Fail
+ self.test_base_found = False
+
self.settings = {}
self.known_errors = None
'dir': testbase_dir })
self._copy_dir(testbase_dir,
- os.path.join(self.sessionDir, 'BASES', testbase_name))
+ os.path.join(self.tmp_working_dir, 'BASES', testbase_name))
- def prepare_testbase_from_git(self, testbase_name, testbase_base, testbase_tag):
+ def prepare_testbase_from_git(self,
+ testbase_name,
+ testbase_base,
+ testbase_tag):
self.logger.write(
_("get test base '%(testbase)s' with '%(tag)s' tag from git\n") % {
- "testbase" : src.printcolors.printcLabel(testbase_name),
- "tag" : src.printcolors.printcLabel(testbase_tag)},
+ "testbase" : src.printcolors.printcLabel(testbase_name),
+ "tag" : src.printcolors.printcLabel(testbase_tag)},
3)
try:
def set_signal(): # pragma: no cover
if src.architecture.is_windows():
# preexec_fn not supported on windows platform
res = subprocess.call(cmd,
- cwd=os.path.join(self.sessionDir, 'BASES'),
+ cwd=os.path.join(self.tmp_working_dir, 'BASES'),
shell=True,
stdout=self.logger.logTxtFile,
stderr=subprocess.PIPE)
else:
res = subprocess.call(cmd,
- cwd=os.path.join(self.sessionDir, 'BASES'),
+ cwd=os.path.join(self.tmp_working_dir, 'BASES'),
shell=True,
preexec_fn=set_signal,
stdout=self.logger.logTxtFile,
if res != 0:
raise src.SatException(_("Error: unable to get test base "
"'%(name)s' from git '%(repo)s'.") % \
- { 'name': testbase_name, 'repo': testbase_base })
+ { 'name': testbase_name,
+ 'repo': testbase_base })
except OSError:
self.logger.error(_("git is not installed. exiting...\n"))
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
cmd = "svn checkout --username %(user)s %(base)s %(dir)s"
- cmd = cmd % { 'user': user, 'base': testbase_base, 'dir': testbase_name }
-
+ cmd = cmd % { 'user': user,
+ 'base': testbase_base,
+ 'dir': testbase_name }
+
+ # Get the application environment
+ self.logger.write(_("Set the application environment\n"), 5)
+ env_appli = src.environment.SalomeEnviron(self.config,
+ src.environment.Environ(dict(os.environ)))
+ env_appli.set_application_env(self.logger)
+
self.logger.write("> %s\n" % cmd, 5)
if src.architecture.is_windows():
# preexec_fn not supported on windows platform
res = subprocess.call(cmd,
- cwd=os.path.join(self.sessionDir, 'BASES'),
+ cwd=os.path.join(self.tmp_working_dir, 'BASES'),
shell=True,
stdout=self.logger.logTxtFile,
stderr=subprocess.PIPE)
else:
res = subprocess.call(cmd,
- cwd=os.path.join(self.sessionDir, 'BASES'),
+ cwd=os.path.join(self.tmp_working_dir, 'BASES'),
shell=True,
preexec_fn=set_signal,
stdout=self.logger.logTxtFile,
- stderr=subprocess.PIPE)
+ stderr=subprocess.PIPE,
+ env=env_appli.environ.environ,)
if res != 0:
raise src.SatException(_("Error: unable to get test base '%(nam"
"e)s' from svn '%(repo)s'.") % \
- { 'name': testbase_name, 'repo': testbase_base })
+ { 'name': testbase_name,
+ 'repo': testbase_base })
except OSError:
self.logger.error(_("svn is not installed. exiting...\n"))
test_base_info = None
for project_name in self.config.PROJECTS.projects:
project_info = self.config.PROJECTS.projects[project_name]
+ if "test_bases" not in project_info:
+ continue
for t_b_info in project_info.test_bases:
if t_b_info.name == test_base_name:
test_base_info = t_b_info
return 0
if not test_base_info:
- message = _("########## WARNING: base '%s' not found\n") % test_base_name
- raise src.SatException(message)
+ message = (_("########## ERROR: test base '%s' not found\n") %
+ test_base_name)
+ self.logger.write("%s\n" % src.printcolors.printcError(message))
+ return 1
if test_base_info.get_sources == "dir":
- self.prepare_testbase_from_dir(test_base_name, test_base_info.info.dir)
+ self.prepare_testbase_from_dir(test_base_name,
+ test_base_info.info.dir)
elif test_base_info.get_sources == "git":
self.prepare_testbase_from_git(test_base_name,
test_base_info.info.base,
self.config.APPLICATION.test_base.tag)
elif test_base_info.get_sources == "svn":
- svn_user = src.get_cfg_param(test_base_info.svn_info,
+ svn_user = src.get_cfg_param(test_base_info.info,
"svn_user",
self.config.USER.svn_user)
self.prepare_testbase_from_svn(svn_user,
##
# Searches if the script is declared in known errors pyconf.
# Update the status if needed.
- def search_known_errors(self, status, test_module, test_type, test):
- test_path = os.path.join(test_module, test_type, test)
+ def search_known_errors(self, status, test_grid, test_session, test):
+ test_path = os.path.join(test_grid, test_session, test)
if not src.config_has_application(self.config):
return status, []
results = {}
for test in listTest:
resfile = os.path.join(self.currentDir,
- self.currentModule,
- self.currentType,
+ self.currentgrid,
+ self.currentsession,
test[:-3] + ".result.py")
# check if <test>.result.py file exists
expected = []
if status == src.KO_STATUS or status == src.OK_STATUS:
status, expected = self.search_known_errors(status,
- self.currentModule,
- self.currentType,
+ self.currentgrid,
+ self.currentsession,
test)
callback = ""
# check if <test>.py file exists
testfile = os.path.join(self.currentDir,
- self.currentModule,
- self.currentType,
+ self.currentgrid,
+ self.currentsession,
test)
if not os.path.exists(testfile):
# check if <test>.out.py file exists
outfile = os.path.join(self.currentDir,
- self.currentModule,
- self.currentType,
+ self.currentgrid,
+ self.currentsession,
test[:-3] + ".out.py")
if not os.path.exists(outfile):
"test",
"scriptTemplate.py"), 'r')
template = string.Template(template_file.read())
-
+
# create substitution dictionary
d = dict()
d['resourcesWay'] = os.path.join(self.currentDir, 'RESSOURCES')
- d['tmpDir'] = os.path.join(self.sessionDir, 'WORK')
+ d['tmpDir'] = os.path.join(self.tmp_working_dir, 'WORK')
d['toolsWay'] = os.path.join(self.config.VARS.srcDir, "test")
- d['typeDir'] = os.path.join(self.currentDir,
- self.currentModule,
- self.currentType)
- d['resultFile'] = os.path.join(self.sessionDir, 'WORK', 'exec_result')
+ d['sessionDir'] = os.path.join(self.currentDir,
+ self.currentgrid,
+ self.currentsession)
+ d['resultFile'] = os.path.join(self.tmp_working_dir,
+ 'WORK',
+ 'exec_result')
d['listTest'] = listTest
- d['typeName'] = self.currentType
+ d['sessionName'] = self.currentsession
d['ignore'] = ignoreList
# create script with template
# Find the getTmpDir function that gives access to *pidict file directory.
# (the *pidict file exists when SALOME is launched)
def get_tmp_dir(self):
- # Rare case where there is no KERNEL in module list
+ # Rare case where there is no KERNEL in grid list
# (for example MED_STANDALONE)
if ('APPLICATION' in self.config
and 'KERNEL' not in self.config.APPLICATION.products
launcherDir = os.path.dirname(self.launcher)
if launcherName == 'runAppli':
# Old application
- cmd = "for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
- " done ; echo $KERNEL_ROOT_DIR"
+ cmd = ("for i in " + launcherDir + "/env.d/*.sh; do source ${i};"
+ " done ; echo $KERNEL_ROOT_DIR")
else:
# New application
- cmd = "echo -e 'import os\nprint os.environ[\"KERNEL_ROOT_DIR\""
- "]' > tmpscript.py; %s shell tmpscript.py" % self.launcher
+ cmd = ("echo -e 'import os\nprint os.environ[\"KERNEL_" +
+ "ROOT_DIR\"]' > tmpscript.py; %s shell" +
+ " tmpscript.py") % self.launcher
root_dir = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
shell=True,
executable='/bin/bash').communicate()[0].split()[-1]
- # import module salome_utils from KERNEL that gives
+ # import grid salome_utils from KERNEL that gives
# the right getTmpDir function
(file_, pathname, description) = imp.find_module("salome_utils",
'bin',
'salome')])
try:
- module = imp.load_module("salome_utils",
+ grid = imp.load_module("salome_utils",
file_,
pathname,
description)
- return module.getLogDir
+ return grid.getLogDir
except:
- module = imp.load_module("salome_utils",
+ grid = imp.load_module("salome_utils",
file_,
pathname,
description)
- return module.getTmpDir
+ return grid.getTmpDir
finally:
if file_:
file_.close()
return default_value
- def generate_launching_commands(self, typename):
+ def generate_launching_commands(self):
# Case where "sat test" is launched in an existing SALOME environment
if 'KERNEL_ROOT_DIR' in os.environ:
binSalome = "runSalome"
binPython = "python"
killSalome = "killSalome.py"
- # Rare case where there is no KERNEL in module list
+ # Rare case where there is no KERNEL in grid list
# (for example MED_STANDALONE)
if ('APPLICATION' in self.config and
'KERNEL' not in self.config.APPLICATION.products):
# Case where SALOME has NOT the launcher that uses the SalomeContext API
if VersionSalome < 730:
- binSalome = os.path.join(self.config.APPLI.module_appli_install_dir,
+ binSalome = os.path.join(self.config.APPLICATION.workdir,
appdir,
"runAppli")
binPython = "python"
return binSalome, binPython, killSalome
# Case where SALOME has the launcher that uses the SalomeContext API
- if VersionSalome >= 730:
- if 'profile' not in self.config.APPLICATION:
- # Before revision of application concept
- launcher_name = self.config.APPLI.launch_alias_name
- binSalome = os.path.join(self.config.APPLICATION.workdir,
- appdir,
- launcher_name)
- else:
- # After revision of application concept
- launcher_name = self.config.APPLICATION.profile.launcher_name
- binSalome = os.path.join(self.config.APPLICATION.workdir,
- launcher_name)
+ else:
+ launcher_name = src.get_launcher_name(config)
+ binSalome = os.path.join(self.config.APPLICATION.workdir,
+ launcher_name)
- if src.architecture.is_windows():
- binSalome += '.bat'
-
binPython = binSalome + ' shell'
killSalome = binSalome + ' killall'
return binSalome, binPython, killSalome
##
- # Runs tests of a type (using a single instance of Salome).
+ # Runs tests of a session (using a single instance of Salome).
def run_tests(self, listTest, ignoreList):
out_path = os.path.join(self.currentDir,
- self.currentModule,
- self.currentType)
- typename = "%s/%s" % (self.currentModule, self.currentType)
- time_out = self.get_test_timeout(typename,
- self.config.SITE.test.timeout)
+ self.currentgrid,
+ self.currentsession)
+ sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
+ time_out = self.get_test_timeout(sessionname,
+ DEFAULT_TIMEOUT)
- time_out_salome = src.get_cfg_param(self.config.SITE.test,
- "timeout_app",
- self.config.SITE.test.timeout)
+ time_out_salome = DEFAULT_TIMEOUT
# generate wrapper script
script_path = os.path.join(out_path, 'wrapperScript.py')
tmpDir = self.get_tmp_dir()
- binSalome, binPython, killSalome = self.generate_launching_commands(
- typename)
- if self.settings.has_key("run_with_modules") \
- and self.settings["run_with_modules"].has_key(typename):
+ binSalome, binPython, killSalome = self.generate_launching_commands()
+ if self.settings.has_key("run_with_grids") \
+ and self.settings["run_with_grids"].has_key(sessionname):
binSalome = (binSalome +
- " -m %s" % self.settings["run_with_modules"][typename])
+ " -m %s" % self.settings["run_with_grids"][sessionname])
- logWay = os.path.join(self.sessionDir, "WORK", "log_cxx")
+ logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
status = False
elapsed = -1
- if self.currentType.startswith("NOGUI_"):
+ if self.currentsession.startswith("NOGUI_"):
# runSalome -t (bash)
status, elapsed = fork.batch(binSalome, self.logger,
- os.path.join(self.sessionDir, "WORK"),
+ os.path.join(self.tmp_working_dir,
+ "WORK"),
[ "-t",
"--shutdown-server=1",
script_path ],
delai=time_out,
log=logWay)
- elif self.currentType.startswith("PY_"):
+ elif self.currentsession.startswith("PY_"):
# python script.py
status, elapsed = fork.batch(binPython, self.logger,
- os.path.join(self.sessionDir, "WORK"),
+ os.path.join(self.tmp_working_dir,
+ "WORK"),
[script_path],
delai=time_out, log=logWay)
if self.show_desktop: opt = "--show-desktop=0"
status, elapsed = fork.batch_salome(binSalome,
self.logger,
- os.path.join(self.sessionDir,
- "WORK"),
+ os.path.join(
+ self.tmp_working_dir,
+ "WORK"),
[ opt,
"--shutdown-server=1",
script_path ],
# create the test result to add in the config object
test_info = src.pyconf.Mapping(self.config)
test_info.testbase = self.currentTestBase
- test_info.module = self.currentModule
- test_info.type = self.currentType
+ test_info.grid = self.currentgrid
+ test_info.session = self.currentsession
test_info.script = src.pyconf.Sequence(self.config)
script_results = self.read_results(listTest, elapsed == time_out)
self.config.TESTS.append(test_info, '')
##
- # Runs all tests of a type.
- def run_type_tests(self):
+ # Runs all tests of a session.
+ def run_session_tests(self):
self.logger.write(self.write_test_margin(2), 3)
- self.logger.write("Type = %s\n" % src.printcolors.printcLabel(
- self.currentType), 3, False)
+ self.logger.write("Session = %s\n" % src.printcolors.printcLabel(
+ self.currentsession), 3, False)
# prepare list of tests to run
tests = os.listdir(os.path.join(self.currentDir,
- self.currentModule,
- self.currentType))
+ self.currentgrid,
+ self.currentsession))
tests = filter(lambda l: l.endswith(".py"), tests)
tests = sorted(tests, key=str.lower)
# build list of known failures
- cat = "%s/%s/" % (self.currentModule, self.currentType)
+ cat = "%s/%s/" % (self.currentgrid, self.currentsession)
ignoreDict = {}
for k in self.ignore_tests.keys():
if k.startswith(cat):
self.run_tests(tests, ignoreDict)
##
- # Runs all tests of a module.
- def run_module_tests(self):
+ # Runs all tests of a grid.
+ def run_grid_tests(self):
self.logger.write(self.write_test_margin(1), 3)
- self.logger.write("Module = %s\n" % src.printcolors.printcLabel(
- self.currentModule), 3, False)
+ self.logger.write("grid = %s\n" % src.printcolors.printcLabel(
+ self.currentgrid), 3, False)
- module_path = os.path.join(self.currentDir, self.currentModule)
+ grid_path = os.path.join(self.currentDir, self.currentgrid)
- types = []
- if self.types is not None:
- types = self.types # user choice
+ sessions = []
+ if self.sessions is not None:
+ sessions = self.sessions # user choice
else:
- # use all scripts in module
- types = filter(lambda l: l not in C_IGNORE_MODULES,
- os.listdir(module_path))
- types = filter(lambda l: os.path.isdir(os.path.join(module_path,
- l)), types)
-
- types = sorted(types, key=str.lower)
- for type_ in types:
- if not os.path.exists(os.path.join(module_path, type_)):
+ # use all scripts in grid
+ sessions = filter(lambda l: l not in C_IGNORE_GRIDS,
+ os.listdir(grid_path))
+ sessions = filter(lambda l: os.path.isdir(os.path.join(grid_path,
+ l)), sessions)
+
+ sessions = sorted(sessions, key=str.lower)
+ for session_ in sessions:
+ if not os.path.exists(os.path.join(grid_path, session_)):
self.logger.write(self.write_test_margin(2), 3)
- self.logger.write(src.printcolors.printcWarning("Type %s not "
- "found" % type_) + "\n", 3, False)
+ self.logger.write(src.printcolors.printcWarning("Session %s not"
+ " found" % session_) + "\n", 3, False)
else:
- self.currentType = type_
- self.run_type_tests()
+ self.currentsession = session_
+ self.run_session_tests()
##
# Runs test testbase.
self.logger.write(self.write_test_margin(0), 3)
testbase_label = "Test base = %s\n" % src.printcolors.printcLabel(
- self.currentTestBase)
+ self.currentTestBase)
self.logger.write(testbase_label, 3, False)
- self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)), 3)
+ self.logger.write("-" * len(src.printcolors.cleancolor(testbase_label)),
+ 3)
self.logger.write("\n", 3, False)
# load settings
self.settings.clear()
# read known failures pyconf
- if "testerror" in self.config.SITE:
+ if "testerror" in self.config.LOCAL:
#import testerror
#self.known_errors = testerror.read_test_failures(
# self.config.TOOLS.testerror.file_path,
else:
self.known_errors = None
- if self.modules is not None:
- modules = self.modules # given by user
+ if self.grids is not None:
+ grids = self.grids # given by user
else:
- # select all the modules (i.e. directories) in the directory
- modules = filter(lambda l: l not in C_IGNORE_MODULES,
+ # select all the grids (i.e. directories) in the directory
+ grids = filter(lambda l: l not in C_IGNORE_GRIDS,
os.listdir(self.currentDir))
- modules = filter(lambda l: os.path.isdir(
+ grids = filter(lambda l: os.path.isdir(
os.path.join(self.currentDir, l)),
- modules)
+ grids)
- modules = sorted(modules, key=str.lower)
- for module in modules:
- if not os.path.exists(os.path.join(self.currentDir, module)):
+ grids = sorted(grids, key=str.lower)
+ for grid in grids:
+ if not os.path.exists(os.path.join(self.currentDir, grid)):
self.logger.write(self.write_test_margin(1), 3)
self.logger.write(src.printcolors.printcWarning(
- "Module %s does not exist\n" % module), 3, False)
+ "grid %s does not exist\n" % grid), 3, False)
else:
- self.currentModule = module
- self.run_module_tests()
+ self.currentgrid = grid
+ self.run_grid_tests()
def run_script(self, script_name):
- if 'APPLICATION' in self.config and script_name in self.config.APPLICATION:
+ if ('APPLICATION' in self.config and
+ script_name in self.config.APPLICATION):
script = self.config.APPLICATION[script_name]
if len(script) == 0:
return
self.logger.write(src.printcolors.printcHeader(
_("=== STARTING TESTS")) + "\n", 2)
self.logger.write("\n", 2, False)
- self.currentDir = os.path.join(self.sessionDir,
+ self.currentDir = os.path.join(self.tmp_working_dir,
'BASES',
self.currentTestBase)
self.run_testbase_tests()