# Create or update the hat xml that gives access to all the commands log files
logger.write(_("Generating the hat log file (can be long) ... "), 3)
xmlHatFilePath = os.path.join(logDir, 'hat.xml')
- try:
+ if True: #try: # cvw TODO
src.logger.update_hat_xml(logDir,
application = runner.cfg.VARS.application,
notShownCommands = notShownCommands)
logger.write(src.printcolors.printc("OK"), 3)
- except:
+ else: #except: # cvw TODO
logger.write(src.printcolors.printc("KO"), 3)
logger.write(" problem update hat.xml", 3)
import datetime
import gzip
+verbose = False
+
try:
from hashlib import sha1
except ImportError:
parser.add_option('g', 'grid', 'list', 'grids',
_('Optional: Indicate which grid(s) to test (subdirectory of the test '
'base).'))
-parser.add_option('s', 'session', 'list', 'sessions',
+parser.add_option('s', 'session', 'list2', 'sessions',
_('Optional: indicate which session(s) to test (subdirectory of the '
'grid).'))
parser.add_option('', 'display', 'string', 'display',
first_time = False
if not os.path.exists(xml_history_path):
+ if verbose: print("first_time as NOT existing '%s'" % xml_history_path) # cvw TODO
first_time = True
root = etree.Element("salome")
prod_node = etree.Element("product", name=application_name, build=xmlname)
root.append(prod_node)
else:
+ if verbose: print("NOT first_time as existing '%s'" % xml_history_path) # cvw TODO
root = etree.parse(xml_history_path).getroot()
prod_node = root.find("product")
tt[test.testbase].append(test)
for testbase in tt.keys():
+ if verbose: print("---- create_test_report %s %s" % (testbase, first_time))
if first_time:
gn = add_simple_node(tests, "testbase")
else:
content = "\n".join(lines)
# create hash from context information
- dirname = sha1(content.encode()).hexdigest()
+ dirname = datetime.datetime.now().strftime("%y%m%d_%H%M%S_") + sha1(content.encode()).hexdigest()[0:6]
base_dir = os.path.join(tmp_dir, dirname)
os.makedirs(base_dir)
os.environ['TT_TMP_RESULT'] = base_dir
"board",
retcode,
"Click on the link to get the detailed test results")
-
+ logger.write("\nTests board is file %s\n" % xml_board_path, 1)
+
# Add the historic files into the log files list of the command
logger.l_logFiles.append(historic_xml_path)
log_macro_command_file_expression = "^[0-9]{8}_+[0-9]{6}_+.*\.xml$"
log_all_command_file_expression = "^.*[0-9]{8}_+[0-9]{6}_+.*\.xml$"
+verbose = True # cvw TODO
+
class Logger(object):
"""\
Class to handle log mechanism.
self.config.VARS.user})
# The time when command was launched
Y, m, dd, H, M, S = date_to_datetime(self.config.VARS.datehour)
- date_hour = "%2s/%2s/%4s %2sh%2sm%2ss" % (dd, m, Y, H, M, S)
+ date_hour = "%4s/%2s/%2s %2sh%2sm%2ss" % (Y, m, dd, H, M, S)
self.xmlFile.append_node_attrib("Site", attrib={"beginTime" :
date_hour})
# The application if any
sys.stdout.write(printcolors.printcWarning("%s\n%s\n" % (msg, e)))
return False, None, None
- if 'application' in logFileXml.xmlroot.keys():
- appliLog = logFileXml.xmlroot.get('application')
- launched_cmd = logFileXml.xmlroot.find('Site').attrib['launchedCommand']
- # if it corresponds, then the log has to be shown
- if appliLog == application:
- return True, appliLog, launched_cmd
- elif application != 'None':
- return False, appliLog, launched_cmd
-
- return True, appliLog, launched_cmd
-
+ try:
+ if 'application' in logFileXml.xmlroot.keys():
+ appliLog = logFileXml.xmlroot.get('application')
+ launched_cmd = logFileXml.xmlroot.find('Site').attrib['launchedCommand']
+ # if it corresponds, then the log has to be shown
+ if appliLog == application:
+ return True, appliLog, launched_cmd
+ elif application != 'None':
+ return False, appliLog, launched_cmd
+
+ return True, appliLog, launched_cmd
+ except Exception as e:
+ msg = _("WARNING: the log file %s cannot be parsed:" % logFilePath)
+ sys.stdout.write(printcolors.printcWarning("%s\n%s\n" % (msg, e)))
+ return False, None, None
+
if application == 'None':
return True, None, None
import string
import subprocess
-"""
-Exception class for test errors.
-"""
+
class SatTestError(Exception):
+ """
+ Exception class for test errors.
+ """
def __init__(self, value):
self.value = value
+
def __str__(self):
return repr(self.value)
class SatNotApplicableError(Exception):
+ """
+ Exception class for test errors.
+ """
def __init__(self, value):
self.value = value
+
def __str__(self):
return repr(self.value)
def ERROR(message):
- print message
+ print("ERROR", message)
raise SatTestError(message)
def NOT_APPLICABLE(message):
- print message
+ print("NOT_APPLICABLE", message)
raise SatNotApplicableError(message)
-##
-# Compares 2 numbers with tolerance tol.
def compFloat(f1, f2, tol=10e-10):
+ """Compares 2 numbers with tolerance tol."""
diff = abs(f1 - f2)
- print "|f1-f2| = %s (tol=%s)" % (str(diff), str(tol))
+ print("|f1-f2| = %s (tol=%s)" % (str(diff), str(tol)))
if diff <= tol:
comp = "OK"
else:
comp = "KO"
return comp
-##
-# Compares 2 files.
def compFiles(f1, f2, tol=0):
+ """Compares 2 files."""
assert os.path.exists(f1), "compFiles: file not found: %s" % f1
assert os.path.exists(f2), "compFiles: file not found: %s" % f2
diffLine = os.popen("diff -y --suppress-common-lines %s %s" % (f1, f2))
diff = len(string.split(diffLine.read(), "\n"))
diffLine.close()
- print "nb of diff lines = %s (tol=%s)" % (str(diff), str(tol))
+ print("nb of diff lines = %s (tol=%s)" % (str(diff), str(tol)))
if diff <= tol:
comp = "OK"
else:
comp = "KO"
return comp
-##
-# Uses mdump to dump a med file.
def mdump_med(med_file, dump_file, options):
+ """Uses mdump to dump a med file."""
assert isinstance(options, list), "Bad options for mdump: %s" % options
assert len(options) == 3, "Bad options for mdump: %s" % options
cmd = "mdump %s %s" % (med_file, " ".join(options))
- #print cmd
-
- df = open(dump_file, "w")
- pdump = subprocess.Popen(cmd, shell=True, stdout=df)
- st = pdump.wait()
- df.close()
+ #print(cmd)
+ with open(dump_file, "w") as df:
+ pdump = subprocess.Popen(cmd, shell=True, stdout=df)
+ st = pdump.wait()
return st
-##
-# Compares 2 med files by using mdump.
def compMED(file1, file2, tol=0, diff_flags=""):
- assert os.path.exists(file1), "compMED: file not found: %s" % file1
- assert os.path.exists(file2), "compMED: file not found: %s" % file2
-
- print
- print ">>>> compMED"
- print " file1:", file1
- print " file2:", file2
-
+ """Compares 2 med files by using mdump."""
+
+ # local utility method
def do_dump(med):
dump = os.path.join(os.environ['TT_TMP_RESULT'], os.path.basename(med) + ".mdump")
st = mdump_med(med, dump, ["1", "NODALE", "FULL_INTERLACE"])
raise Exception("Error mpdump %s" % med)
# replace file name with "filename"
- lines = open(dump, "r").readlines()
- dumpfile = open(dump, "w")
- for line in lines:
- try:
- line.index('Nom universel du maillage')
- continue
- except:
- dumpfile.write(line.replace(med, 'filename'))
+ with open(dump, "r") as ff:
+ lines = ff.readlines()
+ with open(dump, "w") as dumpfile:
+ for line in lines:
+ try:
+ line.index('Universal name of mesh')
+ continue
+ except:
+ dumpfile.write(line.replace(med, 'filename'))
return dump
+
+ # begin method
+ print(""">>>> compMED
+ file1: %s
+ file2: %s
+""" % (file1, file2))
+
+ if not os.path.exists(file1):
+ print("compMED: file not found: '%s'" % file1)
+ print("<<<< compMED\n")
+ return 1
+ if not os.path.exists(file2):
+ print("compMED: file not found: '%s'" % file2)
+ print("<<<< compMED\n")
+ return 1
+
dump1 = do_dump(file1)
dump2 = do_dump(file2)
diff_cmd = "diff %s %s %s" % (diff_flags, dump1, dump2)
- print " >" + diff_cmd
+ print(" >" + diff_cmd)
pdiff = subprocess.Popen(diff_cmd, shell=True, stdout=subprocess.PIPE)
status = pdiff.wait()
- print " Diff =", status
+ print(" Diff =", status)
if status != 0:
- print pdiff.stdout.read()
-
- print "<<<< compMED"
- print
+ print(pdiff.stdout.read())
+ print("<<<< compMED\n")
return status
-class TOOLS_class:
+class TOOLS_class(object):
def __init__(self, base_ressources_dir, tmp_dir, test_ressources_dir):
self.base_ressources_dir = base_ressources_dir
self.tmp_dir = tmp_dir
#!/usr/bin/env python
#-*- coding:utf-8 -*-
-import os, sys, traceback
+"""
+This script is automatically generated by 'command sat test etc...'
+from ...salomeTools/src/test/scriptTemplate.py
+"""
+
+import os
+import sys
+import traceback
import os.path
import time as THEBIGTIME
+
# set path
toolsWay = r'${toolsWay}'
resourcesWay = r'${resourcesWay}'
from TOOLS import SatNotApplicableError
-# on set les variables d'environement
+# set environement variables
os.environ['TT_BASE_RESSOURCES'] = resourcesWay
sys.path.append(resourcesWay)
-exec_result = open(r'${resultFile}', 'w')
-exec_result.write('Open\n')
-
__stdout__ = sys.stdout
__stderr__ = sys.stderr
-for test in listTest:
- pylog = open(os.path.join(outWay, test[:-3] + ".result.py"), "w")
- testout = open(os.path.join(outWay, test[:-3] + ".out.py"), "w")
- my_tools.init()
- sys.stdout = testout
- sys.stderr = testout
-
- pylog.write('#-*- coding:utf-8 -*-\n')
- exec_result.write("Run %s " % test)
- exec_result.flush()
-
- try:
- timeStart = THEBIGTIME.time()
- execfile(os.path.join(outWay, test), globals(), locals())
- timeTest = THEBIGTIME.time() - timeStart
- except SatNotApplicableError, ex:
- status = "NA"
- reason = str(ex)
- exec_result.write("NA\n")
- timeTest = THEBIGTIME.time() - timeStart
- pylog.write('status = "NA"\n')
- pylog.write('time = "' + timeTest.__str__() + '"\n')
- pylog.write('callback = "%s"\n' % reason)
- except Exception, ex:
- status = "KO"
- reason = ""
- if ignore.has_key(test):
- status = "KF"
- reason = "Known Failure = %s\n\n" % ignore[test]
- exec_result.write("%s\n" % status)
- timeTest = THEBIGTIME.time() - timeStart
- pylog.write('status = "%s" \n' % status)
- pylog.write('time = "' + timeTest.__str__() + '"\n')
- pylog.write('callback="""' + reason)
- exc_type, exc_value, exc_traceback = sys.exc_info()
- traceback.print_exception(exc_type,
- exc_value,
- exc_traceback,
- None,
- file=pylog)
- pylog.write('"""\n')
- else:
- exec_result.write("OK\n")
- pylog.write('status = "OK"\n')
- pylog.write('time = "' + timeTest.__str__() + '"\n')
-
- testout.close()
- sys.stdout = __stdout__
- sys.stderr = __stderr__
- my_tools.writeInFiles(pylog)
- pylog.close()
-
-exec_result.write('Close\n')
-exec_result.close()
+with open(r'${resultFile}', 'w') as exec_result:
+ exec_result.write('Open\n')
+
+ for test in listTest:
+ with open(os.path.join(outWay, test[:-3] + ".result.py"), "w") as pylog:
+ with open(os.path.join(outWay, test[:-3] + ".out.py"), "w") as testout:
+ my_tools.init()
+ sys.stdout = testout
+ sys.stderr = testout
+
+ pylog.write('#-*- coding:utf-8 -*-\n')
+ exec_result.write("Run %s " % test)
+ exec_result.flush()
+
+ try:
+ timeStart = THEBIGTIME.time()
+ execfile(os.path.join(outWay, test), globals(), locals())
+ timeTest = THEBIGTIME.time() - timeStart
+ except SatNotApplicableError as ex:
+ status = "NA"
+ reason = str(ex)
+ exec_result.write("NA\n")
+ timeTest = THEBIGTIME.time() - timeStart
+ pylog.write('status = "NA"\n')
+ pylog.write('time = "' + timeTest.__str__() + '"\n')
+ pylog.write('callback = "%s"\n' % reason)
+ except Exception as ex:
+ status = "KO"
+ reason = ""
+ if ignore.has_key(test):
+ status = "KF"
+ reason = "Known Failure = %s\n\n" % ignore[test]
+ exec_result.write("%s\n" % status)
+ timeTest = THEBIGTIME.time() - timeStart
+ pylog.write('status = "%s" \n' % status)
+ pylog.write('time = "' + timeTest.__str__() + '"\n')
+ pylog.write('callback="""' + reason)
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ traceback.print_exception(exc_type,
+ exc_value,
+ exc_traceback,
+ None,
+ file=pylog)
+ pylog.write('"""\n')
+ else:
+ exec_result.write("OK\n")
+ pylog.write('status = "OK"\n')
+ pylog.write('time = "' + timeTest.__str__() + '"\n')
+
+ pass
+ # testout.close()
+
+ sys.stdout = __stdout__
+ sys.stderr = __stderr__
+ my_tools.writeInFiles(pylog)
+ pass
+ # pylog.close()
+
+ exec_result.write('Close\n')
+ pass
+ # exec_result.close()
if 'PY' not in '${sessionName}':
import salome_utils
# 'salome',
# 'killSalome.py')
#cmd = '{python} {killScript} {port}'.format(python=os.environ['PYTHONBIN'],
- # killScript=killScript,
- # port=salome_utils.getPortNumber())
+ # killScript=killScript,
+ # port=salome_utils.getPortNumber())
cmd = 'killSalome.py {port}'.format( port=salome_utils.getPortNumber())
os.system(cmd)
+
code = compile(f.read(), somefile, 'exec')
exec(code, global_vars, local_vars)
-
import os
import sys
import datetime
import string
import imp
import subprocess
+import glob
+import pprint as PP
+
+verbose = False # cvw TODO
from . import fork
import src
results[test] = ["?", -1, "", []]
else:
gdic, ldic = {}, {}
+ if verbose:
+ print("test script: '%s':\n'%s'\n" % (resfile, open(resfile, 'r').read()))
+
execfile(resfile, gdic, ldic)
status = src.TIMEOUT_STATUS
callback = ldic['callback']
elif status == src.KO_STATUS:
callback = "CRASH"
+ if verbose:
+ print("--- CRASH ldic\n%s" % PP.pformat(ldic)) # cvw TODO
+ print("--- CRASH gdic\n%s" % PP.pformat(gdic))
+ pass
exec_time = -1
if ldic.has_key('time'):
d['ignore'] = ignoreList
# create script with template
- script = open(script_path, 'w')
- script.write(template.safe_substitute(d))
- script.close()
+ contents = template.safe_substitute(d)
+ if verbose: print("generate_script '%s':\n%s" % (script_path, contents)) # cvw TODO
+ with open(script_path, 'w') as f:
+ f.write(contents)
+
# Find the getTmpDir function that gives access to *pidict file directory.
# (the *pidict file exists when SALOME is launched)
out_path = os.path.join(self.currentDir,
self.currentgrid,
self.currentsession)
+ if verbose: print("run_tests '%s'\nlistTest: %s\nignoreList: %s" %
+ (self.currentDir, PP.pformat(listTest), PP.pformat(ignoreList))) # cvw TODO
sessionname = "%s/%s" % (self.currentgrid, self.currentsession)
time_out = self.get_test_timeout(sessionname,
DEFAULT_TIMEOUT)
tmpDir = self.get_tmp_dir()
binSalome, binPython, killSalome = self.generate_launching_commands()
- if self.settings.has_key("run_with_grids") \
- and self.settings["run_with_grids"].has_key(sessionname):
- binSalome = (binSalome +
- " -m %s" % self.settings["run_with_grids"][sessionname])
+ if self.settings.has_key("run_with_grids") and \
+ self.settings["run_with_grids"].has_key(sessionname):
+ binSalome = (binSalome + " -m %s" % self.settings["run_with_grids"][sessionname])
logWay = os.path.join(self.tmp_working_dir, "WORK", "log_cxx")
elapsed = -1
if self.currentsession.startswith("NOGUI_"):
# runSalome -t (bash)
- status, elapsed = fork.batch(binSalome, self.logger,
- os.path.join(self.tmp_working_dir,
- "WORK"),
- [ "-t",
- "--shutdown-server=1",
- script_path ],
- delai=time_out,
- log=logWay)
+ status, elapsed = fork.batch(
+ binSalome,
+ self.logger,
+ os.path.join(self.tmp_working_dir, "WORK"),
+ [ "-t", "--shutdown-server=1", script_path ],
+ delai=time_out,
+ log=logWay)
elif self.currentsession.startswith("PY_"):
# python script.py
- status, elapsed = fork.batch(binPython, self.logger,
- os.path.join(self.tmp_working_dir,
- "WORK"),
- [script_path],
- delai=time_out, log=logWay)
+ status, elapsed = fork.batch(
+ binPython,
+ self.logger,
+ os.path.join(self.tmp_working_dir, "WORK"),
+ [script_path],
+ delai=time_out,
+ log=logWay)
else:
opt = "-z 0"
if self.show_desktop: opt = "--show-desktop=0"
- status, elapsed = fork.batch_salome(binSalome,
- self.logger,
- os.path.join(
- self.tmp_working_dir,
- "WORK"),
- [ opt,
- "--shutdown-server=1",
- script_path ],
- getTmpDir=tmpDir,
- fin=killSalome,
- delai=time_out,
- log=logWay,
- delaiapp=time_out_salome)
+ status, elapsed = fork.batch_salome(
+ binSalome,
+ self.logger,
+ os.path.join( self.tmp_working_dir, "WORK"),
+ [ opt, "--shutdown-server=1", script_path ],
+ getTmpDir=tmpDir,
+ fin=killSalome,
+ delai=time_out,
+ log=logWay,
+ delaiapp=time_out_salome)
self.logger.write("status = %s, elapsed = %s\n" % (status, elapsed),
5)
l)), sessions)
sessions = sorted(sessions, key=str.lower)
+ existingSessions = self.getSubDirectories(grid_path)
for session_ in sessions:
if not os.path.exists(os.path.join(grid_path, session_)):
self.logger.write(self.write_test_margin(2), 3)
- self.logger.write(src.printcolors.printcWarning("Session %s not"
- " found" % session_) + "\n", 3, False)
+ msg = """\
+Session '%s' not found
+Existing sessions are:
+%s
+""" % (session_, PP.pformat(sorted(existingSessions)))
+ self.logger.write(src.printcolors.printcWarning(msg), 3, False)
else:
self.currentsession = session_
self.run_session_tests()
+ def getSubDirectories(self, aDir):
+ """
+ get names of first level of sub directories in aDir
+ excluding '.git' etc as beginning with '.'
+ """
+ res = os.listdir(aDir)
+ res = [d for d in res if os.path.isdir(os.path.join(aDir, d)) and d[0] != '.']
+ # print("getSubDirectories %s are:\n%s" % (aDir, PP.pformat(res)))
+ return res
+
##
# Runs test testbase.
def run_testbase_tests(self):
if os.path.exists(settings_file):
gdic, ldic = {}, {}
execfile(settings_file, gdic, ldic)
- self.logger.write(_("Load test settings\n"), 3)
+ self.logger.write("Load test settings '%s'\n" % settings_file, 5)
self.settings = ldic['settings_dic']
self.ignore_tests = ldic['known_failures_list']
if isinstance(self.ignore_tests, list):
self.ignore_tests = {}
- self.logger.write(src.printcolors.printcWarning("known_failur"
- "es_list must be a dictionary (not a list)") + "\n", 1, False)
+ self.logger.write(src.printcolors.printcWarning(
+ "known_failures_list must be a dictionary (not a list)") + "\n", 1, False)
else:
self.ignore_tests = {}
self.settings.clear()
grids)
grids = sorted(grids, key=str.lower)
+ existingGrids = self.getSubDirectories(self.currentDir)
for grid in grids:
if not os.path.exists(os.path.join(self.currentDir, grid)):
self.logger.write(self.write_test_margin(1), 3)
- self.logger.write(src.printcolors.printcWarning(
- "grid %s does not exist\n" % grid), 3, False)
+ msg = """\
+Grid '%s' does not exist
+Existing grids are:
+%s
+""" % (grid, PP.pformat(sorted(existingGrids)))
+ self.logger.write(src.printcolors.printcWarning(msg), 3, False)
else:
self.currentgrid = grid
self.run_grid_tests()