options = self.getOptions()
# check for product
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
- application = runner.cfg.VARS.application
+ application = config.VARS.application
logger.info(_("Building application for <header>%s<reset>\n") % application)
# if section APPLICATION.virtual_app does not exists create one
- if "virtual_app" not in runner.cfg.APPLICATION:
+ if "virtual_app" not in config.APPLICATION:
msg = _("The section APPLICATION.virtual_app is not defined in the product.")
logger.error(UTS.red(msg))
return RCO.ReturnCode("KO", msg)
# get application dir
- target_dir = runner.cfg.APPLICATION.workdir
+ target_dir = config.APPLICATION.workdir
if options.target:
target_dir = options.target
# set list of modules
if options.modules:
- runner.cfg.APPLICATION.virtual_app['modules'] = options.modules
+ config.APPLICATION.virtual_app['modules'] = options.modules
# set name and application_name
if options.name:
- runner.cfg.APPLICATION.virtual_app['name'] = options.name
- runner.cfg.APPLICATION.virtual_app['application_name'] = options.name + "_appdir"
+ config.APPLICATION.virtual_app['name'] = options.name
+ config.APPLICATION.virtual_app['application_name'] = options.name + "_appdir"
- application_name = src.get_cfg_param(runner.cfg.APPLICATION.virtual_app,
+ application_name = src.get_cfg_param(config.APPLICATION.virtual_app,
"application_name",
- runner.cfg.APPLICATION.virtual_app.name + "_appdir")
+ config.APPLICATION.virtual_app.name + "_appdir")
appli_dir = os.path.join(target_dir, application_name)
fmt = " %s = %s\n" # as " label = value\n"
# generate catalog for given list of computers
catalog_src = options.gencat
catalog = generate_catalog(options.gencat.split(","),
- runner.cfg,logger)
- elif 'catalog' in runner.cfg.APPLICATION.virtual_app:
+ config,logger)
+ elif 'catalog' in config.APPLICATION.virtual_app:
# use catalog specified in the product
- if runner.cfg.APPLICATION.virtual_app.catalog.endswith(".xml"):
+ if config.APPLICATION.virtual_app.catalog.endswith(".xml"):
# catalog as a file
- catalog = runner.cfg.APPLICATION.virtual_app.catalog
+ catalog = config.APPLICATION.virtual_app.catalog
else:
# catalog as a list of computers
- catalog_src = runner.cfg.APPLICATION.virtual_app.catalog
+ catalog_src = config.APPLICATION.virtual_app.catalog
mlist = filter(lambda l: len(l.strip()) > 0,
- runner.cfg.APPLICATION.virtual_app.catalog.split(","))
+ config.APPLICATION.virtual_app.catalog.split(","))
if len(mlist) > 0:
- catalog = generate_catalog(runner.cfg.APPLICATION.virtual_app.catalog.split(","),
- runner.cfg, logger)
+ catalog = generate_catalog(config.APPLICATION.virtual_app.catalog.split(","),
+ config, logger)
# display which catalog is used
if len(catalog) > 0:
# generate the application
try:
try: # try/except/finally not supported in all version of python
- retcode = create_application(runner.cfg, appli_dir, catalog, logger)
+ retcode = create_application(config, appli_dir, catalog, logger)
except Exception as exc:
details.append(str(exc))
raise
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
# Print some informations
msg = _('Executing the check command in the build directories of the application')
- logger.info("%s %s\n" % (msg, UTS.label(runner.cfg.VARS.application)))
+ logger.info("%s %s\n" % (msg, UTS.label(config.VARS.application)))
info = [(_("BUILD directory"),
- os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD'))]
+ os.path.join(config.APPLICATION.workdir, 'BUILD'))]
UTS.logger_info_tuples(logger, info)
# Call the function that will loop over all the products and execute
# the right command(s)
- res = check_all_products(runner.cfg, products_infos, logger)
+ res = check_all_products(config, products_infos, logger)
# Print the final state
nb_products = len(products_infos)
return 0
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Print some informations
- nameApp = str(runner.cfg.VARS.application)
- srcDir = os.path.join(runner.cfg.APPLICATION.workdir, 'SOURCES')
- buildDir = os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD')
+ nameApp = str(config.VARS.application)
+ srcDir = os.path.join(config.APPLICATION.workdir, 'SOURCES')
+ buildDir = os.path.join(config.APPLICATION.workdir, 'BUILD')
msg = _("Application %s, executing compile commands in build directories of products.\n")
logger.info(msg % UTS.label(nameApp))
UTS.logger_info_tuples(logger, info)
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
if options.fathers:
# Extend the list with all recursive dependencies of the given products
- products_infos = extend_with_fathers(runner.cfg, products_infos)
+ products_infos = extend_with_fathers(config, products_infos)
if options.children:
# Extend the list with all products that use the given products
- products_infos = extend_with_children(runner.cfg, products_infos)
+ products_infos = extend_with_children(config, products_infos)
# Sort the list regarding the dependencies of the products
- products_infos = sort_products(runner.cfg, products_infos)
+ products_infos = sort_products(config, products_infos)
# Call the function that will loop over all the products and execute
# the right command(s)
- res = compile_all_products(runner, runner.cfg, options, products_infos, logger)
+ res = compile_all_products(runner, config, options, products_infos, logger)
# Print the final state
nb_products = len(products_infos)
import src.returnCode as RCO
from src.salomeTools import _BaseCommand
import src.configManager as CFGMGR
+import src.system as SYSS
########################################################################
'open_application' not in config): # edit user pyconf
usercfg = os.path.join(config.VARS.personalDir, 'SAT.pyconf')
logger.info(_("Opening %s\n") % usercfg)
- src.system.show_in_editor(editor, usercfg, logger)
+ SYSS.show_in_editor(editor, usercfg, logger)
else:
# search for file <application>.pyconf and open it
for path in config.PATHS.APPLICATIONPATH:
pyconf_path = os.path.join(path, config.VARS.application + ".pyconf")
if os.path.exists(pyconf_path):
logger.info(_("Opening %s\n") % pyconf_path)
- src.system.show_in_editor(editor, pyconf_path, logger)
+ SYSS.show_in_editor(editor, pyconf_path, logger)
break
# case : give information about the product in parameter
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
# Print some informations
logger.info(_('Configuring the sources of the application %s\n') %
- UTS.label(runner.cfg.VARS.application))
+ UTS.label(config.VARS.application))
info = [(_("BUILD directory"),
- os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD'))]
+ os.path.join(config.APPLICATION.workdir, 'BUILD'))]
UTS.logger_info_tuples(logger, info)
# Call the function that will loop over all the products and execute
# the right command(s)
if options.option is None:
options.option = ""
- res = configure_all_products(runner.cfg, products_infos, options.option, logger)
+ res = configure_all_products(config, products_infos, options.option, logger)
# Print the final state
nb_products = len(products_infos)
options = self.getOptions()
# check that the command was called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
if options.products is None:
environ_info = None
# add products specified by user (only products
# included in the application)
environ_info = filter(lambda l:
- l in runner.cfg.APPLICATION.products.keys(),
+ l in config.APPLICATION.products.keys(),
options.products)
if options.shell == []:
if out_dir:
out_dir = os.path.abspath(out_dir)
- write_all_source_files(runner.cfg, logger, out_dir=out_dir, shells=shell,
+ write_all_source_files(config, logger, out_dir=out_dir, shells=shell,
prefix=options.prefix, env_info=environ_info)
logger.info("\n")
#TODO return code
if options.path:
l_dir_path = options.path
else:
- src.check_config_has_application(runner.cfg)
+ src.check_config_has_application(config)
if options.sources:
- l_dir_path = [os.path.join(runner.cfg.APPLICATION.workdir,
- "SOURCES")]
+ l_dir_path = [os.path.join(config.APPLICATION.workdir, "SOURCES")]
else:
# find all installation paths
- all_products = runner.cfg.APPLICATION.products.keys()
- l_product_cfg = src.product.get_products_infos(all_products,
- runner.cfg)
+ all_products = config.APPLICATION.products.keys()
+ l_product_cfg = src.product.get_products_infos(all_products, config)
l_dir_path = [pi.install_dir for __, pi in l_product_cfg]
# Get the files to ignore during the searching
options = self.getOptions()
# Check that the command has been called with an application
- src.check_config_has_application(runner.cfg)
+ src.check_config_has_application(config)
logger.write(_('Generation of SALOME modules for application %s\n') % \
- UTS.label(runner.cfg.VARS.application), 1)
+ UTS.label(config.VARS.application), 1)
status = src.KO_STATUS
# verify that YACSGEN is available
- yacsgen_dir = check_yacsgen(runner.cfg, options.yacsgen, logger)
+ yacsgen_dir = check_yacsgen(config, options.yacsgen, logger)
if isinstance(yacsgen_dir, tuple):
# The check failed
logger.info(" insert directory PATH %s = %s\n" % \
            ("YACSGEN", UTS.blue(yacsgen_dir)))
- products = runner.cfg.APPLICATION.products
+ products = config.APPLICATION.products
if options.products:
products = options.products
details = []
nbgen = 0
- context = build_context(runner.cfg, logger)
+ context = build_context(config, logger)
for product in products:
header = _("Generating %s") % UTS.label(product)
header += " %s " % ("." * (20 - len(product)))
logger.write(header, 3)
logger.flush()
- if product not in runner.cfg.PRODUCTS:
+ if product not in config.PRODUCTS:
logger.write(_("Unknown product\n"), 3, False)
continue
- pi = src.product.get_product_config(runner.cfg, product)
+ pi = src.product.get_product_config(config, product)
if not src.product.product_is_generated(pi):
logger.write(_("not a generated product\n"), 3, False)
continue
nbgen += 1
try:
- result = generate_component_list(runner.cfg,
- pi,
- context,
- logger)
+ result = generate_component_list(config, pi, context, logger)
except Exception as exc:
result = str(exc)
import src.debug as DBG
import src.returnCode as RCO
+import src.utilsSat as UTS
from src.salomeTools import _BaseCommand
import src.pyconf as PYCONF
# Print some informations
logger.write(_('Local Settings of SAT %s\n\n') % \
- UTS.label(runner.cfg.VARS.salometoolsway), 1)
+ UTS.label(config.VARS.salometoolsway), 1)
res = 0
res_check = check_path(value, logger)
res += res_check
if res_check == 0:
- res_set = set_local_value(runner.cfg, key, value, logger)
+ res_set = set_local_value(config, key, value, logger)
res += res_set
# Set the options corresponding to an informative value
for opt in [("VCS", options.VCS), ("tag", options.tag)]:
key, value = opt
- res_set = set_local_value(runner.cfg, key, value, logger)
+ res_set = set_local_value(config, key, value, logger)
res += res_set
- display_local_values(runner.cfg, logger)
+ display_local_values(config, logger)
return res
# Try to create the given path
try:
- src.ensure_path_exists(str(path))
+ UTS.ensure_path_exists(str(path))
except Exception as e:
msg = _("""\
Unable to create the directory %s:
logger = self.getLogger()
options = self.getOptions()
- l_cfg_dir = runner.cfg.PATHS.JOBPATH
+ l_cfg_dir = config.PATHS.JOBPATH
# Make sure the jobs_config option has been called
if not options.jobs_cfg:
logger.error(msg)
return 1
- info = [ (_("Platform"), runner.cfg.VARS.dist),
+ info = [ (_("Platform"), config.VARS.dist),
(_("File containing the jobs configuration"), file_jobs_cfg) ]
UTS.logger_info_tuples(logger, info)
import src.ElementTree as etree
import src.debug as DBG
import src.returnCode as RCO
+import src.utilsSat as UTS
from src.salomeTools import _BaseCommand
import src.pyconf as PYCONF
logger = self.getLogger()
options = self.getOptions()
- l_cfg_dir = runner.cfg.PATHS.JOBPATH
+ l_cfg_dir = config.PATHS.JOBPATH
# list option : display all the available config files
if options.list:
one_config_jobs = src.read_config_from_a_file(file_jobs_cfg)
merger.merge(config_jobs, one_config_jobs)
- info = [(_("Platform"), runner.cfg.VARS.dist),
+ info = [(_("Platform"), config.VARS.dist),
(_("Files containing the jobs configuration"), l_conf_files_path)]
UTS.logger_info_tuples(logger, info)
# on every machine
name_pyconf = "_".join([os.path.basename(path)[:-len('.pyconf')]
for path in l_conf_files_path]) + ".pyconf"
- path_pyconf = src.get_tmp_filename(runner.cfg, name_pyconf)
+ path_pyconf = src.get_tmp_filename(config, name_pyconf)
#Save config
f = file( path_pyconf , 'w')
config_jobs.__save__(f)
# log the paramiko problems
- log_dir = src.get_log_path(runner.cfg)
+ log_dir = UTS.get_log_path(config)
paramiko_log_dir_path = os.path.join(log_dir, "JOBS")
- src.ensure_path_exists(paramiko_log_dir_path)
+ UTS.ensure_path_exists(paramiko_log_dir_path)
paramiko = getParamiko(logger)
paramiko.util.log_to_file(os.path.join(paramiko_log_dir_path,
logger.txtFileName))
# Copy the stylesheets in the log directory
log_dir = log_dir
- xsl_dir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
+ xsl_dir = os.path.join(config.VARS.srcDir, 'xsl')
files_to_copy = []
files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_GLOBAL))
files_to_copy.append(os.path.join(xsl_dir, STYLESHEET_BOARD))
gui = Gui(log_dir,
today_jobs.ljobs,
today_jobs.ljobs_not_today,
- runner.cfg.VARS.datehour,
+ config.VARS.datehour,
logger,
file_boards = options.input_boards)
self.err += _("Unable to get remote log files!\n%s\n" % str(e))
def total_duration(self):
- """Give the total duration of the job
+ """\
+ Give the total duration of the job
:return: the total duration of the job in seconds
:rtype: int
return self._Tf - self._T0
def run(self):
- """Launch the job by executing the remote command.
+ """\
+ Launch the job by executing the remote command.
"""
# Prevent multiple run
self._has_begun = True
def write_results(self):
- """Display on the terminal all the job's information
+ """\
+ Display on the terminal all the job's information
"""
self.logger.write("name : " + self.name + "\n")
if self.after:
self.logger.write(self.err + "\n")
def get_status(self):
- """Get the status of the job (used by the Gui for xml display)
+ """\
+ Get the status of the job (used by the Gui for xml display)
:return: The current status of the job
:rtype: String
time.localtime(self._Tf))
class Jobs(object):
- '''Class to manage the jobs to be run
- '''
+ """\
+ Class to manage the jobs to be run
+ """
def __init__(self,
runner,
logger,
self.determine_jobs_and_machines()
def define_job(self, job_def, machine):
- '''Takes a pyconf job definition and a machine (from class machine)
- and returns the job instance corresponding to the definition.
+ """\
+ Takes a pyconf job definition and a machine (from class machine)
+ and returns the job instance corresponding to the definition.
:param job_def src.config.Mapping: a job definition
:param machine machine: the machine on which the job will run
:return: The corresponding job in a job class instance
:rtype: job
- '''
+ """
name = job_def.name
cmmnds = job_def.commands
if not "timeout" in job_def:
prefix = prefix)
def determine_jobs_and_machines(self):
- '''Function that reads the pyconf jobs definition and instantiates all
- the machines and jobs to be done today.
+ """\
+ Reads the pyconf jobs definition and instantiates all
+ the machines and jobs to be done today.
- :return: Nothing
- :rtype: N\A
- '''
+ :return: None
+ """
today = datetime.date.weekday(datetime.date.today())
host_list = []
options = self.getOptions()
# Verify that the command was called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Determine the launcher name (from option, profile section or by default "salome")
if options.name:
launcher_name = options.name
else:
- launcher_name = src.get_launcher_name(runner.cfg)
+ launcher_name = src.get_launcher_name(config)
# set the launcher path
- launcher_path = runner.cfg.APPLICATION.workdir
+ launcher_path = config.APPLICATION.workdir
# Copy a catalog if the option is called
additional_environ = {}
if options.catalog:
- additional_environ = copy_catalog(runner.cfg, options.catalog)
+ additional_environ = copy_catalog(config, options.catalog)
# Generate a catalog of resources if the corresponding option was called
if options.gencat:
- catalog_path = generate_catalog(options.gencat.split(","),
- runner.cfg,
- logger)
- additional_environ = copy_catalog(runner.cfg, catalog_path)
+ catalog_path = generate_catalog(options.gencat.split(","), config, logger)
+ additional_environ = copy_catalog(config, catalog_path)
# Generate the launcher
- launcherPath = generate_launch_file( runner.cfg,
+ launcherPath = generate_launch_file( config,
logger,
launcher_name,
launcher_path,
import src.debug as DBG
import src.returnCode as RCO
+import src.utilsSat as UTS
from src.salomeTools import _BaseCommand
+import src.system as SYSS
# Compatibility python 2/3 for input function
# input stays input for python 3 and input = raw_input for python 2
# get the log directory.
- logDir = src.get_log_path(runner.cfg)
+ logDir = UTS.get_log_path(config)
# Print a header
nb_files_log_dir = len(glob.glob(os.path.join(logDir, "*")))
if options.clean:
nbClean = options.clean
# get the list of files to remove
- lLogs = src.logger.list_log_file(logDir,
- src.logger.log_all_command_file_expression)
+ lLogs = UTS.list_log_file(logDir, UTS.log_all_command_file_expression)
nbLogFiles = len(lLogs)
# Delete all if the invoked number is bigger than the number of log files
if nbClean > nbLogFiles:
os.path.basename(filePath)[:-len('.xml')] + '.pyconf')
remove_log_file(pyconfFilePath, logger)
-
- logger.write("<OK>\n%i logs deleted.\n" % nbClean)
- return 0
+ msg = "%i logs deleted" % nbClean
+ logger.info("<OK>\n%s\n" % msg)
+ return RCO.ReturnCode("OK", msg)
# determine the commands to show in the hat log
- notShownCommands = list(runner.cfg.INTERNAL.log.not_shown_commands)
+ notShownCommands = list(config.INTERNAL.log.not_shown_commands)
if options.full:
notShownCommands = []
# Find the stylesheets Directory and files
- xslDir = os.path.join(runner.cfg.VARS.srcDir, 'xsl')
+ xslDir = os.path.join(config.VARS.srcDir, 'xsl')
xslCommand = os.path.join(xslDir, "command.xsl")
xslHat = os.path.join(xslDir, "hat.xsl")
xsltest = os.path.join(xslDir, "test.xsl")
# So we can clean the LOGS directories easily
shutil.copy(xslCommand, logDir)
shutil.copy(xslHat, logDir)
- src.ensure_path_exists(os.path.join(logDir, "TEST"))
+ UTS.ensure_path_exists(os.path.join(logDir, "TEST"))
shutil.copy(xsltest, os.path.join(logDir, "TEST"))
shutil.copy(imgLogo, logDir)
# If the last option is invoked, just, show the last log file
if options.last_terminal:
- src.check_config_has_application(runner.cfg)
- rootLogDir = os.path.join(runner.cfg.APPLICATION.workdir, 'LOGS')
- src.ensure_path_exists(rootLogDir)
+ src.check_config_has_application(config)
+ rootLogDir = os.path.join(config.APPLICATION.workdir, 'LOGS')
+ UTS.ensure_path_exists(rootLogDir)
log_dirs = os.listdir(rootLogDir)
if log_dirs == []:
raise Exception("log directory empty")
log_dirs= sorted(log_dirs)
- show_last_logs(logger, runner.cfg, log_dirs)
- return 0
+ res = show_last_logs(logger, config, log_dirs)
+ return res
# If the last option is invoked, just, show the last log file
if options.last:
raise Exception("last log file not found in '%s'" % logDir)
if options.terminal:
# Show the log corresponding to the selected command call
- print_log_command_in_terminal(lastLogFilePath, logger)
+ res = print_log_command_in_terminal(lastLogFilePath, logger)
else:
# open the log xml file in the user editor
- src.system.show_in_editor(runner.cfg.USER.browser,
+ res = SYSS.show_in_editor(config.USER.browser,
lastLogFilePath, logger)
- return 0
+ return res
# If the user asks for a terminal display
if options.terminal:
# Parse the log directory in order to find
# all the files corresponding to the commands
- lLogs = src.logger.list_log_file(logDir,
- src.logger.log_macro_command_file_expression)
+ lLogs = UTS.list_log_file(logDir, UTS._log_macro_command_file_expression)
lLogsFiltered = []
for filePath, __, date, __, hour, cmd, __ in lLogs:
- showLog, cmdAppli, __ = src.logger.show_command_log(filePath, cmd,
- runner.cfg.VARS.application, notShownCommands)
- if showLog:
+ showLog = UTS.show_command_log(filePath, cmd, config.VARS.application, notShownCommands)
+ # showLog, cmdAppli, __ = UTS.show_command_log(filePath, cmd,
+ # config.VARS.application, notShownCommands)
+ cmdAppli = showLog.getValue()[0]
+ if showLog.isOk():
lLogsFiltered.append((filePath, date, hour, cmd, cmdAppli))
lLogsFiltered = sorted(lLogsFiltered)
print_log_command_in_terminal(lLogsFiltered[index][0], logger)
x = 0
- return 0
+ return RCO.ReturnCode("OK", "end from user")
# Create or update the hat xml that gives access to all the commands log files
- logger.write(_("Generating the hat log file (can be long) ... "), 3)
+ logger.info(_("Generating the hat log file (can be long) ... "))
xmlHatFilePath = os.path.join(logDir, 'hat.xml')
src.logger.update_hat_xml(logDir,
- application = runner.cfg.VARS.application,
+ application = config.VARS.application,
notShownCommands = notShownCommands)
- logger.info("<OK>\n"))
+ logger.info("<OK>\n")
# open the hat xml in the user editor
if not options.no_browser:
- logger.write(_("\nOpening the log file\n"), 3)
- src.system.show_in_editor(runner.cfg.USER.browser, xmlHatFilePath, logger)
- return 0
-
+ logger.info(_("\nOpening the log file\n"))
+ res = SYSS.show_in_editor(config.USER.browser, xmlHatFilePath, logger)
+ return res
+
+ return RCO.ReturnCode("OK", "option no browser")
def get_last_log_file(logDir, notShownCommands):
- '''Used in case of last option. Get the last log command file path.
+ """\
+ Used in case of last option.
+ Get the last log command file path.
:param logDir str: The directory where to search the log files
:param notShownCommands list: the list of commands to ignore
:return: the path to the last log file
:rtype: str
- '''
+ """
last = (_, 0)
for fileName in os.listdir(logDir):
# YYYYMMDD_HHMMSS_namecmd.xml
- sExpr = src.logger.log_macro_command_file_expression
+ sExpr = UTS._log_macro_command_file_expression
oExpr = re.compile(sExpr)
if oExpr.search(fileName):
# get date and hour and format it
if x > 0:
(__, file_name) = sorted(l_time_file)[x-1]
log_file_path = os.path.join(product_log_dir, file_name)
- src.system.show_in_editor(config.USER.editor, log_file_path, logger)
+ SYSS.show_in_editor(config.USER.editor, log_file_path, logger)
def ask_value(nb):
'''Ask for an int n. 0<n<nb
options = self.getOptions()
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
# Print some informations
logger.write(
_('Executing the make command in the build directories of the application %s\n') %
- UTS.label(runner.cfg.VARS.application), 1)
+ UTS.label(config.VARS.application), 1)
info = [(_("BUILD directory"),
- os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD'))]
+ os.path.join(config.APPLICATION.workdir, 'BUILD'))]
UTS.logger_info_tuples(logger, info)
# Call the function that will loop over all the products and execute
# the right command(s)
if options.option is None:
options.option = ""
- res = make_all_products(runner.cfg, products_infos, options.option, logger)
+ res = make_all_products(config, products_infos, options.option, logger)
# Print the final state
nb_products = len(products_infos)
options = self.getOptions()
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
# Print some informations
logger.write(_('Executing the make install command in the build directories of the application %s\n') %
- UTS.label(runner.cfg.VARS.application), 1)
+ UTS.label(config.VARS.application), 1)
info = [(_("BUILD directory"),
- os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD'))]
+ os.path.join(config.APPLICATION.workdir, 'BUILD'))]
UTS.logger_info_tuples(logger, info)
# Call the function that will loop over all the products and execute
# the right command(s)
- res = makeinstall_all_products(runner.cfg, products_infos, logger)
+ res = makeinstall_all_products(config, products_infos, logger)
# Print the final state
nb_products = len(products_infos)
return 1
# The repository where to put the package if not Binary or Source
- package_default_path = runner.cfg.LOCAL.workdir
+ package_default_path = config.LOCAL.workdir
# if the package contains binaries or sources:
if options.binaries or options.sources:
# Check that the command has been called with an application
- src.check_config_has_application(runner.cfg)
+ src.check_config_has_application(config)
# Display information
logger.write(_("Packaging application %s\n") % \
- UTS.label(runner.cfg.VARS.application), 1)
+ UTS.label(config.VARS.application), 1)
# Get the default directory where to put the packages
- package_default_path = os.path.join(runner.cfg.APPLICATION.workdir, "PACKAGE")
- src.ensure_path_exists(package_default_path)
+ package_default_path = os.path.join(config.APPLICATION.workdir, "PACKAGE")
+ UTS.ensure_path_exists(package_default_path)
# if the package contains a project:
if options.project:
# check that the project is visible by SAT
- if options.project not in runner.cfg.PROJECTS.project_file_paths:
+ if options.project not in config.PROJECTS.project_file_paths:
local_path = os.path.join(
- runner.cfg.VARS.salometoolsway, "data", "local.pyconf")
+ config.VARS.salometoolsway, "data", "local.pyconf")
msg = _("""\
The project %s is not visible by salomeTools.
Please add it in the %s file.\n""") % (options.project, local_path)
# Remove the products that are filtered by the --without_property option
if options.without_property:
[prop, value] = options.without_property.split(":")
- update_config(runner.cfg, prop, value)
+ update_config(config, prop, value)
# get the name of the archive or build it
if options.name:
archive_name=""
dir_name = package_default_path
if options.binaries or options.sources:
- archive_name = runner.cfg.APPLICATION.name
+ archive_name = config.APPLICATION.name
if options.binaries:
- archive_name += "-"+runner.cfg.VARS.dist
+ archive_name += "-"+config.VARS.dist
if options.sources:
archive_name += "-SRC"
archive_name += ("PROJECT-" + project_name)
if options.sat:
- archive_name += ("salomeTools_" + runner.cfg.INTERNAL.sat_version)
+ archive_name += ("salomeTools_" + config.INTERNAL.sat_version)
if len(archive_name)==0: # no option worked
msg = _("""\
Cannot name the archive.
# Create a working directory for all files that are produced during the
# package creation and that will be removed at the end of the command
- tmp_working_dir = os.path.join(runner.cfg.VARS.tmp_root,
- runner.cfg.VARS.datehour)
- src.ensure_path_exists(tmp_working_dir)
+ tmp_working_dir = os.path.join(config.VARS.tmp_root,
+ config.VARS.datehour)
+ UTS.ensure_path_exists(tmp_working_dir)
logger.write("\n", 5)
logger.write(_("The temporary working directory: %s\n") % tmp_working_dir, 5)
d_paths_to_substitute={}
if options.binaries:
- d_bin_files_to_add = binary_package(runner.cfg,
+ d_bin_files_to_add = binary_package(config,
logger,
options,
tmp_working_dir)
for key in d_bin_files_to_add:
if key.endswith("(bin)"):
source_dir = d_bin_files_to_add[key][0]
- path_in_archive = d_bin_files_to_add[key][1].replace("BINARIES-" + runner.cfg.VARS.dist,"INSTALL")
+ path_in_archive = d_bin_files_to_add[key][1].replace("BINARIES-" + config.VARS.dist,"INSTALL")
if os.path.basename(source_dir)==os.path.basename(path_in_archive):
# if basename is the same we will just substitute the dirname
d_paths_to_substitute[os.path.dirname(source_dir)]=\
if options.sources:
d_files_to_add.update(source_package(runner,
- runner.cfg,
+ config,
logger,
options,
tmp_working_dir))
if options.binaries:
# for archives with bin and sources we provide a shell script able to
# install binaries for compilation
- file_install_bin=produce_install_bin_file(runner.cfg,logger,
+ file_install_bin=produce_install_bin_file(config,logger,
tmp_working_dir,
d_paths_to_substitute,
"install_bin.sh")
# --salomeTool option is not considered when --sources is selected, as this option
# already brings salomeTool!
if options.sat:
- d_files_to_add.update({"salomeTools" : (runner.cfg.VARS.salometoolsway, "")})
+ d_files_to_add.update({"salomeTools" : (config.VARS.salometoolsway, "")})
if options.project:
return 1
# Add the README file in the package
- local_readme_tmp_path = add_readme(runner.cfg,
- options,
- tmp_working_dir)
+ local_readme_tmp_path = add_readme(config, options, tmp_working_dir)
d_files_to_add["README"] = (local_readme_tmp_path, "README")
# Add the additional files of option add_files
env_scripts_tmp_dir,
patches_tmp_dir,
application_tmp_dir]:
- src.ensure_path_exists(directory)
+ UTS.ensure_path_exists(directory)
# Create the pyconf that contains the information of the project
project_pyconf_name = "project.pyconf"
logger.info(' workdir = %s\n\n', UTS.blue(config.APPLICATION.workdir))
# Get the products list with products informations regarding the options
- products_infos = commands.prepare.get_products_list(options, runner.cfg, logger)
+ products_infos = commands.prepare.get_products_list(options, config, logger)
# Get the maximum name length in order to format the terminal display
max_product_name_len = 1
good_result = 0
for __, product_info in products_infos:
# Apply the patch
- return_code, patch_res = apply_patch(runner.cfg,
+ return_code, patch_res = apply_patch(config,
product_info,
max_product_name_len,
logger)
logger = self.getLogger()
options = self.getOptions()
- src.check_config_has_application(runner.cfg)
+ src.check_config_has_application(config)
if options.prefix is None:
msg = _("The --%s argument is required\n") % "prefix"
logger.write(UTS.red(msg), 1)
return 1
- retcode = generate_profile_sources( runner.cfg, options, logger )
+ retcode = generate_profile_sources(config, options, logger)
if not options.no_update :
- update_pyconf( runner.cfg, options )
+ update_pyconf(config, options)
return retcode
options = self.getOptions()
# check for product
- src.check_config_has_application(runner.cfg)
+ src.check_config_has_application(config)
# Determine launcher path
- launcher_name = src.get_launcher_name(runner.cfg)
- launcher_dir = runner.cfg.APPLICATION.workdir
+ launcher_name = src.get_launcher_name(config)
+ launcher_dir = config.APPLICATION.workdir
# Check the launcher existence
if launcher_name not in os.listdir(launcher_dir):
# Display information : how to get the logs
messageFirstPart = _("\nEnd of execution. To see the traces, "
"please tap the following command :\n")
- messageSecondPart = UTS.label(
- runner.cfg.VARS.salometoolsway +
- os.sep +
- "sat log " +
- runner.cfg.VARS.application + "\n")
+ messageSecondPart = UTS.label( config.VARS.salometoolsway + os.sep +
+ "sat log " + config.VARS.application + "\n")
logger.write(" %s\n" %(messageFirstPart + messageSecondPart), 2)
return 0
options = self.getOptions()
# check that the command has been called with an application
- src.check_config_has_application( runner.cfg )
+ src.check_config_has_application( config )
# Get the list of products to treat
- products_infos = get_products_list(options, runner.cfg, logger)
+ products_infos = get_products_list(options, config, logger)
# Print some informations
msg = ('Executing the script in the build directories of the application %s\n') % \
- UTS.label(runner.cfg.VARS.application)
+ UTS.label(config.VARS.application)
logger.write(msg, 1)
- info = [(_("BUILD directory"), os.path.join(runner.cfg.APPLICATION.workdir, 'BUILD'))]
+ info = [(_("BUILD directory"), os.path.join(config.APPLICATION.workdir, 'BUILD'))]
UTS.logger_info_tuples(logger, info)
# Call the function that will loop over all the products and execute
# the right command(s)
if options.nb_proc is None:
options.nb_proc = 0
- res = run_script_all_products(runner.cfg,
+ res = run_script_all_products(config,
products_infos,
options.nb_proc,
logger)
import src.debug as DBG
import src.returnCode as RCO
from src.salomeTools import _BaseCommand
+import src.system as SYSS
########################################################################
# Command class
logger.write("\n" + msg)
# Call the system function that do the extraction in git mode
- retcode = src.system.git_extract(repo_git,
+ retcode = SYSS.git_extract(repo_git,
product_info.git_info.tag,
source_dir, logger, environ)
return retcode
3, False)
logger.flush()
# Call the system function that do the extraction in archive mode
- retcode, NameExtractedDirectory = src.system.archive_extract(
+ retcode, NameExtractedDirectory = SYSS.archive_extract(
product_info.archive_info.archive_name,
source_dir.dir(), logger)
logger.write(msg)
# Call the system function that do the extraction in cvs mode
- retcode = src.system.cvs_extract(protocol, user,
+ retcode = SYSS.cvs_extract(protocol, user,
product_info.cvs_info.server,
product_info.cvs_info.product_base,
product_info.cvs_info.tag,
logger.write('%s:%s ... ' % (coflag, product_info.svn_info.repo)
# Call the system function that do the extraction in svn mode
- retcode = src.system.svn_extract(user,
+ retcode = SYSS.svn_extract(user,
product_info.svn_info.repo,
product_info.svn_info.tag,
source_dir,
import src.returnCode as RCO
import src.utilsSat as UTS
from src.salomeTools import _BaseCommand
+import src.system as SYSS
# Compatibility python 2/3 for input function
# input stays input for python 3 and input = raw_input for python 2
logger.error(msg_miss % "target")
return 1
- if "APPLICATION" in runner.cfg:
+ if "APPLICATION" in config:
msg = _("This command does not use a product.\n")
logger.error(msg)
return 1
if options.info:
- return get_template_info(runner.cfg, options.template, logger)
+ return get_template_info(config, options.template, logger)
if options.name is None:
logger.error(msg_miss % "name")
# CNC inutile
# Ask user confirmation if a module of the same name already exists
- #if options.name in runner.cfg.PRODUCTS and not runner.options.batch:
+ #if options.name in config.PRODUCTS and not runner.options.batch:
# logger.write(UTS.red(
# _("A module named '%s' already exists." % options.name)), 1)
# logger.write("\n", 1)
return 1
conf_values[param_def[0].strip()] = param_def[1].strip()
- retcode = prepare_from_template(runner.cfg, options.name, options.template,
+ retcode = prepare_from_template(config, options.name, options.template,
target_dir, conf_values, logger)
if retcode == 0:
logger.write(" " + _(
"Extract template %s\n") % UTS.info(
template), 4)
- src.system.archive_extract(template_src_dir, target_dir)
+ SYSS.archive_extract(template_src_dir, target_dir)
else:
logger.write(" " + _(
"Copy template %s\n") % UTS.info(
if os.path.isdir(sources):
shutil.copytree(sources, tmpdir)
else:
- src.system.archive_extract(sources, tmpdir)
+ SYSS.archive_extract(sources, tmpdir)
settings_file = os.path.join(tmpdir, "template.info")
if not os.path.exists(settings_file):
# the test base is specified either by the application, or by the --base option
with_application = False
- if runner.cfg.VARS.application != 'None':
+ if config.VARS.application != 'None':
logger.write(
_('Running tests on application %s\n') %
- UTS.label(runner.cfg.VARS.application), 1)
+ UTS.label(config.VARS.application), 1)
with_application = True
elif not options.base:
raise Exception(
os.environ['DISPLAY'] = options.display
elif 'DISPLAY' not in os.environ:
# if no display set
- if ('test' in runner.cfg.LOCAL and
- 'display' in runner.cfg.LOCAL.test and
- len(runner.cfg.LOCAL.test.display) > 0):
+ if ('test' in config.LOCAL and
+ 'display' in config.LOCAL.test and
+ len(config.LOCAL.test.display) > 0):
# use default value for test tool
- os.environ['DISPLAY'] = runner.cfg.LOCAL.test.display
+ os.environ['DISPLAY'] = config.LOCAL.test.display
else:
os.environ['DISPLAY'] = "localhost:0.0"
# initialization
#################
if with_application:
- tmp_dir = os.path.join(runner.cfg.VARS.tmp_root,
- runner.cfg.APPLICATION.name,
+ tmp_dir = os.path.join(config.VARS.tmp_root,
+ config.APPLICATION.name,
"test")
else:
- tmp_dir = os.path.join(runner.cfg.VARS.tmp_root,
+ tmp_dir = os.path.join(config.VARS.tmp_root,
"test")
# remove previous tmp dir
_("error removing TT_TMP_RESULT %s\n") % tmp_dir)
lines = []
- lines.append("date = '%s'" % runner.cfg.VARS.date)
- lines.append("hour = '%s'" % runner.cfg.VARS.hour)
- lines.append("node = '%s'" % runner.cfg.VARS.node)
- lines.append("arch = '%s'" % runner.cfg.VARS.dist)
+ lines.append("date = '%s'" % config.VARS.date)
+ lines.append("hour = '%s'" % config.VARS.hour)
+ lines.append("node = '%s'" % config.VARS.node)
+ lines.append("arch = '%s'" % config.VARS.dist)
- if 'APPLICATION' in runner.cfg:
+ if 'APPLICATION' in config:
lines.append("application_info = {}")
lines.append("application_info['name'] = '%s'" %
- runner.cfg.APPLICATION.name)
+ config.APPLICATION.name)
lines.append("application_info['tag'] = '%s'" %
- runner.cfg.APPLICATION.tag)
+ config.APPLICATION.tag)
lines.append("application_info['products'] = %s" %
- str(runner.cfg.APPLICATION.products))
+ str(config.APPLICATION.products))
content = "\n".join(lines)
test_base = ""
if options.base:
test_base = options.base
- elif with_application and "test_base" in runner.cfg.APPLICATION:
- test_base = runner.cfg.APPLICATION.test_base.name
+ elif with_application and "test_base" in config.APPLICATION:
+ test_base = config.APPLICATION.test_base.name
fmt = " %s = %s\n"
msg = fmt % (_('Display'), os.environ['DISPLAY'])
logger.info(msg)
# create the test object
- test_runner = src.test_module.Test(runner.cfg,
+ test_runner = src.test_module.Test(config,
logger,
base_dir,
testbase=test_base,
logger.write("\n", 2, False)
logger.write(_("\nGenerate the specific test log\n"), 5)
- log_dir = src.get_log_path(runner.cfg)
+ log_dir = UTS.get_log_path(config)
out_dir = os.path.join(log_dir, "TEST")
- src.ensure_path_exists(out_dir)
+ UTS.ensure_path_exists(out_dir)
name_xml_board = logger.logFileName.split(".")[0] + "board" + ".xml"
- historic_xml_path = generate_history_xml_path(runner.cfg, test_base)
+ historic_xml_path = generate_history_xml_path(config, test_base)
- create_test_report(runner.cfg,
+ create_test_report(config,
historic_xml_path,
out_dir,
retcode,
test_base_name = os.path.basename(test_base)
history_xml_name += test_base_name
history_xml_name += ".xml"
- log_dir = src.get_log_path(config)
+ log_dir = UTS.get_log_path(config)
return os.path.join(log_dir, "TEST", history_xml_name)
Basic requirements
==================
+.. warning :: THIS IS OBSOLETE FOR SAT 5.1
+
By adding a file *mycommand.py* in the ``commands`` directory, salomeTools will define a new command named ``mycommand``.
In *mycommand.py*, there must be the following method: ::
========================================================
The *runner* variable is an python instance of *Sat* class.
-It gives access to *runner.cfg* which is the data model defined from all
+It gives access to *runner.getConfig()* which is the data model defined from all
*configuration pyconf files* of salomeTools
For example, *runner.cfg.APPLICATION.workdir*
contains the root directory of the current application.
try:
returnCode = sat.execute_cli(args)
- logger.debug("sat exit code: %s" % returnCode)
+ if returnCode.isOk():
+ logger.debug("sat exit code: %s" % returnCode) # OK no trace
+ else:
+ logger.error("sat exit code: %s" % returnCode) # KO say why
sys.exit(returnCode.toSys())
except Exception as e:
--- /dev/null
+#!/usr/bin/env python
+#-*- coding:utf-8 -*-
+
+# Copyright (C) 2010-20xx CEA/DEN
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""\
+define class as a simple dictionary with keys
+with pretty print __str__ and __repr__ (indented as recursive)
+and jsonDumps()
+
+usage:
+ >> import catchAll as CAA
+ >> a = CAA.CatchAll()
+ >> a.tintin = "reporter"
+ >> a.milou = "dog"
+ >> print("a=%s" % a)
+ >> print("tintin: %s" % a.tintin)
+"""
+
+import pprint as PP
+import json
+
+########################################################################################
+# json utils
+########################################################################################
+def dumper(obj):
+ """to json explore subclass object as dict"""
+ return obj.__dict__
+
+def dumperType(obj):
+ """\
+ to get a "_type" to trace json subclass object,
+ but ignore all attributes beginning with '_'
+ """
+ typeatt = "_type"
+ aDict = dict((k,v) for k, v in obj.__dict__.items() if k[0] != "_" or k == typeatt)
+ if typeatt not in aDict: aDict[typeatt] = obj.__class__.__name__
+ return aDict
+
+def jsonDumps(obj):
+ """to get direct default jsonDumps method"""
+ return json.dumps(obj, default=dumperType, sort_keys=True, indent=2)
+
+
+########################################################################################
+class CatchAll(object):
+ """\
+ class as simple dynamic dictionary
+ with predefined keys as properties in
+ inherited classes through __init__ method. Or NOT.
+ with pretty print __str__ and __repr__ (indented as recursive)
+ with jsonDumps()
+
+ usage:
+
+ >> import catchAll as CAA
+ >> a = CAA.CatchAll()
+ >> a.tintin = "reporter"
+ >> a.milou = "dog"
+ >> print("a=%s" % a)
+ >> print("tintin: %s" % a.tintin)
+
+ as
+
+ >> a = {}
+ >> a["tintin"] = "reporter"
+ >> a["milou"] = "dog"
+ >> print("tintin: %s" % a["tintin"])
+ """
+
+ def __repr__asList(self):
+ """\
+ goal is to be unambiguous
+ an ordered list representation is better for test (and visualize) (in)equality
+ """
+ aList = []
+ for k in sorted(self.__dict__.keys()):
+ if k[0] != '_':
+ aList.append( [k, self.__dict__[k]] )
+ return self.__class__.__name__ + " = " + aList.__repr__()
+
+ def __repr__(self):
+ """goal is to be unambiguous, easy human readeable"""
+ return self._reprIndent()
+
+ def _reprIndent(self, indent=0):
+ res = ""
+ newIndent = indent + 2
+ for k in sorted(self.__dict__.keys()):
+ if k[0] != '_':
+ kk = self.__dict__[k]
+ if isinstance(kk, CatchAll): # arguments were reversed: must detect kk (or subclass) instances
+ res += "\n" + " "*newIndent + "%s: %s" % (k, kk._reprIndent(newIndent))
+ else:
+ skk = self._indent(PP.pformat(kk), newIndent)
+ res += "\n" + " "*newIndent + "%s: %s" % (k, skk)
+ return self.__class__.__name__ + "(" + res + ")"
+
+ def _indent(self, txt, indent):
+ txts = txt.split("\n")
+ if len(txts) > 1: # multiline test: count lines, not characters
+ return ("\n" + " "*indent).join(txts)
+ else:
+ return txt
+
+
+ def jsonDumps(self):
+ return jsonDumps(self)
+
import shutil
from src.options import OptResult
+import src.utilsSat as UTS
C_COMPILE_ENV_LIST = ["CC",
"LOGS",
self.product_info.name)
file_path = os.path.join(dir_where_to_put, file_name)
- src.ensure_path_exists(dir_where_to_put)
+ UTS.ensure_path_exists(dir_where_to_put)
# write the logTxtFile copy it to the destination, and then recreate
# it as it was
self.logger.logTxtFile.close()
# " file of salomTools).\n")
#
- src.ensure_path_exists(config.VARS.personalDir)
- src.ensure_path_exists(os.path.join(config.VARS.personalDir,
+ UTS.ensure_path_exists(config.VARS.personalDir)
+ UTS.ensure_path_exists(os.path.join(config.VARS.personalDir,
'Applications'))
f = open(cfg_name, 'w')
def __repr__(self):
"""complete with value, 'ok, why, value' message"""
- res = "%s: '%s' for value: %s" % (self._status, self._why, PP.pformat(self._value))
+ res = '%s: "%s" for value: %s' % (self._status, self._why, PP.pformat(self._value))
return res
def __str__(self):
"""without value, only simple 'ok, why' message"""
- res = "%s: '%s'" % (self._status, self._why)
+ res = '%s: "%s"' % (self._status, self._why)
return res
def indent(self, text, amount=5, ch=' '):
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
'''
-In this file : all functions that do a system call,
+All utility methods that do a system call,
like open a browser or an editor, or call a git command
+
+usage:
+ >> import src.system as SYSS
'''
import subprocess
import os
import tarfile
-
-from . import printcolors
+import src.returnCode as RCO
def show_in_editor(editor, filePath, logger):
- '''open filePath using editor.
+ """\
+ open filePath using editor.
:param editor str: The editor to use.
:param filePath str: The path to the file to open.
- '''
+ """
# default editor is vi
if editor is None or len(editor) == 0:
editor = 'vi'
try:
# launch cmd using subprocess.Popen
cmd = editor % filePath
- logger.write('Launched command:\n' + cmd + '\n', 5)
+ msg = "show_in_editor command: '%s'" % cmd
+ logger.debug(msg)
p = subprocess.Popen(cmd, shell=True)
p.communicate()
+ return RCO.ReturnCode("OK", msg)
except:
- logger.write(printcolors.printcError(
- _("Unable to edit file %s\n") % filePath), 1)
+ msg = _("Unable to edit file '%s'") % filePath
+ logger.error(msg)
+ return RCO.ReturnCode("KO", msg)
def git_extract(from_what, tag, where, logger, environment=None):
- '''Extracts sources from a git repository.
+ """\
+ Extracts sources from a git repository.
:param from_what str: The remote git repository.
:param tag str: The tag.
extracting.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
if not where.exists():
where.make()
if tag == "master" or tag == "HEAD":
'where': str(where),
'where_git': where_git }
- logger.write(command + "\n", 5)
+ logger.debug("git_extract \n" + command)
logger.logTxtFile.write("\n" + command + "\n")
logger.logTxtFile.flush()
return (res == 0)
def archive_extract(from_what, where, logger):
- '''Extracts sources from an archive.
+ """\
+ Extracts sources from an archive.
:param from_what str: The path to the archive.
:param where str: The path where to extract.
:param logger Logger: The logger instance to use.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
try:
archive = tarfile.open(from_what)
for i in archive.getmembers():
archive.extract(i, path=str(where))
return True, os.path.commonprefix(archive.getnames())
except Exception as exc:
- logger.write("archive_extract: %s\n" % exc)
+ logger.error("archive_extract: %s\n" % exc)
return False, None
def cvs_extract(protocol, user, server, base, tag, product, where,
logger, checkout=False, environment=None):
- '''Extracts sources from a cvs repository.
+ """\
+ Extracts sources from a cvs repository.
:param protocol str: The cvs protocol.
:param user str: The user to be used.
extracting.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
opttag = ''
if tag is not None and len(tag) > 0:
logger,
checkout=False,
environment=None):
- '''Extracts sources from a svn repository.
+ """\
+ Extracts sources from a svn repository.
:param user str: The user to be used.
:param from_what str: The remote git repository.
extracting.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
if not where.exists():
where.make()
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-"""
+"""\
utilities for sat
general useful simple methods
all-in-one import srs.utilsSat as UTS
usage:
- >> import srs.utilsSat as UTS
- >> UTS.ensure_path_exists(path)
+>> import src.utilsSat as UTS
+>> UTS.ensure_path_exists(path)
"""
import os
import errno
import stat
-from src.coloringSat import cleanColors # as shortcut
+import datetime
+import re
+import tempfile
+
+import src.returnCode as RCO
##############################################################################
# file system utilities
:param tuples list: The list of tuples to format
:return: The tabulated text. (mutiples lines)
"""
- # find the maximum length of the first value of the tuples in info
+ # find the maximum length of the first value of the tuples
smax = max(map(lambda l: len(l[0]), tuples))
# Print each item of tuples with good indentation
msg = ""
- for i in info:
+ for i in tuples:
sp = " " * (smax - len(i[0]))
- msg += sp + "%s = %s\n" % i[0:1] # tuples, may be longer
- if len(info) > 1: msg += "\n" # for long list
+ msg += sp + "%s = %s\n" % (i[0], i[1]) # tuples, may be longer
+ if len(tuples) > 1: msg += "\n" # for long list
return msg
def formatValue(label, value, suffix=""):
return res
+##############################################################################
+# log utilities (TODO: set in loggingSat class ? ...)
+##############################################################################
+_log_macro_command_file_expression = "^[0-9]{8}_+[0-9]{6}_+.*\.xml$"
+
+def date_to_datetime(date):
+ """\
+ From a string date in format YYYYMMDD_HHMMSS
+ returns year, month, day, hour, minutes, seconds
+
+ :param date str: The date in format YYYYMMDD_HHMMSS
+ :return: the same date and time in separate variables.
+ :rtype: (str,str,str,str,str,str)
+ """
+ Y = date[:4]
+ m = date[4:6]
+ dd = date[6:8]
+ H = date[9:11]
+ M = date[11:13]
+ S = date[13:15]
+ return Y, m, dd, H, M, S
+
+def timedelta_total_seconds(timedelta):
+ """\
+ Replace total_seconds from datetime module
+ in order to be compatible with old python versions
+
+ :param timedelta datetime.timedelta: The delta between two dates
+ :return: The number of seconds corresponding to timedelta.
+ :rtype: float
+ """
+ return (
+ timedelta.microseconds + 0.0 +
+ (timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
+
+def show_command_log(logFilePath, cmd, application, notShownCommands):
+ """\
+ Used in updateHatXml.
+ Determine if the log xml file logFilePath
+ has to be shown or not in the hat log.
+
+ :param logFilePath str: the path to the command xml log file
+ :param cmd str: the command of the log file
+ :param application str: the application passed as parameter
+ to the salomeTools command
+ :param notShownCommands list: the list of commands
+ that are not shown by default
+
+ :return: RCO.ReturnCode("OK") if cmd is not in notShownCommands and the application
+ in the log file corresponds to application
+ ReturnCode value is tuple (appliLog, launched_cmd)
+ """
+ # When the command is not in notShownCommands, no need to go further :
+ # Do not show
+ if cmd in notShownCommands:
+ return RCO.ReturnCode("KO", "in notShownCommands", None)
+
+ # Get the application of the log file
+ try:
+ import src.xmlManager # local import: src.xmlManager is not imported at module level (TODO: hoist)
+ logFileXml = src.xmlManager.ReadXmlFile(logFilePath)
+ except Exception:
+ msg = _("The log file '%s' cannot be read") % logFilePath
+ return RCO.ReturnCode("KO", msg, None)
+
+ if 'application' in logFileXml.xmlroot.keys():
+ appliLog = logFileXml.xmlroot.get('application')
+ launched_cmd = logFileXml.xmlroot.find('Site').attrib['launchedCommand']
+ # if it corresponds, then the log has to be shown
+ if appliLog == application:
+ return RCO.ReturnCode("OK", "appliLog == application", (appliLog, launched_cmd))
+ elif application != 'None':
+ return RCO.ReturnCode("KO", "application != 'None'", (appliLog, launched_cmd))
+
+ return RCO.ReturnCode("OK", "", (appliLog, launched_cmd))
+
+ if application == 'None':
+ return RCO.ReturnCode("OK", "application == 'None'", (None, None))
+
+ return RCO.ReturnCode("KO", "", (None, None))
+
+def list_log_file(dirPath, expression):
+ """Find all files corresponding to expression in dirPath
+
+ :param dirPath str: the directory where to search the files
+ :param expression str: the regular expression of files to find
+ :return: the list of files path and informations about it
+ :rtype: list
+ """
+ lRes = []
+ for fileName in os.listdir(dirPath):
+ # YYYYMMDD_HHMMSS_namecmd.xml
+ sExpr = expression
+ oExpr = re.compile(sExpr)
+ if oExpr.search(fileName):
+ file_name = fileName
+ if fileName.startswith("micro_"):
+ file_name = fileName[len("micro_"):]
+ # get date and hour and format it
+ date_hour_cmd_host = file_name.split('_')
+ date_not_formated = date_hour_cmd_host[0]
+ date = "%s/%s/%s" % (date_not_formated[6:8],
+ date_not_formated[4:6],
+ date_not_formated[0:4])
+ hour_not_formated = date_hour_cmd_host[1]
+ hour = "%s:%s:%s" % (hour_not_formated[0:2],
+ hour_not_formated[2:4],
+ hour_not_formated[4:6])
+ if len(date_hour_cmd_host) < 4:
+ cmd = date_hour_cmd_host[2][:-len('.xml')]
+ host = ""
+ else:
+ cmd = date_hour_cmd_host[2]
+ host = date_hour_cmd_host[3][:-len('.xml')]
+ lRes.append((os.path.join(dirPath, fileName),
+ date_not_formated,
+ date,
+ hour_not_formated,
+ hour,
+ cmd,
+ host))
+ return lRes
+
+def update_hat_xml(logDir, application=None, notShownCommands = []):
+ """\
+ Create the xml file in logDir that contain all the xml file
+ and have a name like YYYYMMDD_HHMMSS_namecmd.xml
+
+ :param logDir str: the directory to parse
+ :param application str: the name of the application if there is any
+ """
+ # Create an instance of XmlLogFile class to create hat.xml file
+ import src.xmlManager # local import: src.xmlManager is not imported at module level (TODO: hoist)
+ xmlHatFilePath = os.path.join(logDir, 'hat.xml')
+ xmlHat = src.xmlManager.XmlLogFile(xmlHatFilePath,
+ "LOGlist", {"application" : application})
+ # parse the log directory to find all the command logs,
+ # then add it to the xml file
+ lLogFile = list_log_file(logDir, _log_macro_command_file_expression)
+ for filePath, __, date, __, hour, cmd, __ in lLogFile:
+ # NOTE(review): show_command_log now returns an RCO.ReturnCode whose value is
+ # (appliLog, launched_cmd) — this 3-name tuple unpack no longer matches; the
+ # caller must be adapted (e.g. rc = show_command_log(...); showLog = rc.isOk())
+ # TODO confirm the ReturnCode value accessor and fix before merge
+ showLog, cmdAppli, full_cmd = show_command_log(filePath, cmd,
+ application, notShownCommands)
+ #if cmd not in notShownCommands:
+ if showLog:
+ # add a node to the hat.xml file
+ xmlHat.add_simple_node("LogCommand",
+ text=os.path.basename(filePath),
+ attrib = {"date" : date,
+ "hour" : hour,
+ "cmd" : cmd,
+ "application" : cmdAppli,
+ "full_command" : full_cmd})
+
+ # Write the file on the hard drive
+ xmlHat.write_tree('hat.xsl')
except:
pass
-import src
-from . import ElementTree as etree
+import src.utilsSat as UTS
+import src.ElementTree as etree
class XmlLogFile(object):
'''Class to manage writing in salomeTools xml log file
# Initialize the filePath and ensure that the directory
# that contain the file exists (make it if necessary)
self.logFile = filePath
- src.ensure_path_exists(os.path.dirname(filePath))
+ UTS.ensure_path_exists(os.path.dirname(filePath))
# Initialize the field that contain the xml in memory
self.xmlroot = etree.Element(rootname, attrib = attrib)
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2008-20xx CEA/DEN
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org or email : webmaster.salome@opencascade.com
+
+import unittest
+
+import src.debug as DBG # Easy print stderr (for DEBUG only)
+from src.catchAll import CatchAll as CA
+
+verbose = False # True #
+
+########################################################################################
+class TestCase(unittest.TestCase):
+ """Test the catchAll.py"""
+
+ def test_000(self):
+ # one shot setUp() for this TestCase
+ if verbose:
+ DBG.push_debug(True)
+ # DBG.write("assert unittest", [a for a in dir(self) if "assert" in a])
+ pass
+
+ def test_999(self):
+ # one shot tearDown() for this TestCase
+ if verbose:
+ DBG.pop_debug()
+ return
+
+ def test_005(self):
+ a = CA()
+ a.tintin = "reporter"
+ a.milou = "dog"
+ a._yoo = "abcd" # not in repr
+ self.assertEqual(a.tintin, "reporter")
+ self.assertEqual(a.milou, "dog")
+ DBG.write("test_005 str", str(a))
+ DBG.write("test_005 repr", repr(a))
+ DBG.write("test_005 jsondump", a.jsonDumps())
+ del(a.tintin)
+ self.assertFalse(hasattr(a, "tintin"))
+ self.assertEqual(a.milou, "dog")
+ self.assertIn("_yoo", a.__dict__.keys())
+
+
+ def test_010(self):
+ h = CA()
+ h.haddock = "sailor"
+ h.tintin = "reporter"
+ h.milou = "dog"
+ h._yoo = "abcd" # not in repr
+ aDict = {1: "1", 2: "22", 10: "1000000000"}
+ # long for indent view
+ h.other = ["castafiore",
+ "nestor",
+ "irmaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ aDict]
+ a = CA()
+ a.heroes = h
+ DBG.write("test_010 str", str(a))
+ DBG.write("test_010 repr", repr(a))
+ DBG.write("test_010 jsondump", a.jsonDumps())
+ r = repr(a)
+ self.assertIn("tintin:", r)
+ self.assertIn("other:", r)
+ self.assertIn("1000000000", r)
+ self.assertNotIn("abcd", r) # not in repr
+ self.assertEqual(a.heroes._yoo, "abcd") # but in a.heroes
+
+
+if __name__ == '__main__':
+ unittest.main(exit=False)
+ pass
+