envi = ENVI.SalomeEnviron(config, ENVI.Environ(dict(os.environ)), True)
envi.set_a_product('Python', logger)
- command = "python %s --prefix=%s --config=%s" % (script, appli_dir, config_file)
+ command = """
+set -x
+which python
+python %s --prefix=%s --config=%s
+""" % (script, appli_dir, config_file)
res = UTS.Popen(command, shell=True, cwd=target_dir, env=envi.environ.environ, logger=logger)
res.raiseIfKo()
return res
return RCO.ReturnCode("KO", "to suppress in clean command")
# Check with the user if he really wants to suppress the directories
- msg = UTS.red(_("Remove the following directories:\n"))
+ msg = _("Remove the following directories:\n")
for directory in l_dir_to_suppress:
- msg += " %s\n" % directory
+ msg += " %s\n" % UTS.info(str(directory))
if runner.getAnswer(msg[:-1]) == "No":
return RCO.ReturnCode("OK", "user do not want to continue")
for path in l_paths:
strpath = str(path)
if not path.isdir():
- msg = _("The path %s does not exists (or is not a directory)\n") % strpath
+ msg = _("The path %s does not exists (or is not a directory)") % UTS.info(strpath)
logger.warning(msg)
else:
- logger.info(_("Removing %s ...") % strpath )
path.rm()
- logger.info('OK')
+ logger.info(_("Remove %s ...<OK>") % UTS.info(strpath))
if os.path.exists( prefix ) :
if not options.force :
raise Exception(
- _("The path %s already exists, use option --force to remove it.") % prefix )
+ _("The path %s already exists, use option --force to remove it.") % prefix)
else :
shutil.rmtree( prefix )
UTS.check_config_has_application(config).raiseIfKo()
# Print some informations
- logger.info(_('Getting sources of the application %s') % UTS.label(config.VARS.application))
- logger.info(" workdir = %s" % UTS.blue(config.APPLICATION.workdir))
+ msg = _('Getting sources of the application %s\n workdir = %s\n')
+ msg = msg % (UTS.label(config.VARS.application), UTS.info(config.APPLICATION.workdir))
+ logger.info(msg)
# Get the products list with products informations regarding the options
products_infos = self.get_products_list(options, config)
status = "KO"
msg = _("Some sources haven't been get")
details = [p for p in results if (results[product] == 0 or results[product] is None)]
- details = " ".join(details)
+ details = ",".join(details)
logger.info("\n%s %s: <%s>.\n%s\n" % (msg, msgCount, status, details))
- return RCO.ReturnCode(status, "%s %s" % (msg, msgCount))
+ return RCO.ReturnCode(status, "%s get sources: %s" % (msg, msgCount))
def get_source_for_dev(config, product_info, source_dir, logger, pad):
repo_git = product_info.git_info.repo
# Display informations
- msg = "%s:%s" % (coflag, repo_git)
- msg += " " * (pad + 50 - len(repo_git))
- msg += " tag:%s" % product_info.git_info.tag
- msg += "."*(10 - len(product_info.git_info.tag))
+ msg = " %s:%s tag:%s ..." % (coflag, repo_git, product_info.git_info.tag)
logger.info(msg)
# Call the system function that do the extraction in git mode
"""
# Get the application environment
- logger.info(_("Set the application environment"))
+ logger.debug(_("Set the application environment"))
env_appli = ENVI.SalomeEnviron(config, ENVI.Environ(dict(os.environ)))
env_appli.set_application_env(logger)
source_dir = UTS.Path('')
# display and log
- logger.info('%s: ' % UTS.label(product_name))
- logger.info(' ' * (max_product_name_len - len(product_name)))
+ p_len = len(product_name)
+ logger.info('%s: ...' % UTS.label(product_name))
# Remove the existing source directory if
# the product is not in development mode
is_dev = PROD.product_is_dev(product_info)
if source_dir.exists():
- logger.info("<OK>\n")
- msg = _("Nothing done because source directory existing yet.\n")
- logger.info(msg)
+ msg = _("Do nothing because source directory existing yet.")
+ logger.warning(msg)
+ logger.info("<OK>")
good_result = good_result + 1
# Do not get the sources and go to next product
continue
# print the result
if not(PROD.product_is_fixed(product_info) or PROD.product_is_native(product_info)):
- logger.info('%s\n' % res)
+ logger.info(res)
return good_result, results
def check_remote_machine(machine_name, logger):
logger.debug(_("Check the display on %s\n") % machine_name)
- ssh_cmd = 'ssh -o "StrictHostKeyChecking no" %s "ls"' % machine_name
+ ssh_cmd = """
+set -x
+ssh -o "StrictHostKeyChecking no" %s "whoami"
+""" % machine_name
res = UTS.Popen(ssh_cmd, shell=True, logger=logger)
def create_test_report(config,
raise Exception(msg)
import src.debug as DBG
kNew = kParent.appendLink(idNew)
- DBG.write("appendLinkForCommand %i for parent" % idNew, idParent, True)
+ DBG.write("appendLinkForCommand %i for parent" % idNew, idParent)
return kNew
def setAttribLinkForCommand(cmd, nameAttrib, value):
kCommand = k0.findLink(idCommand)
kLinks = kCommand.getAllIdNames()[1:] #avoid first idCommand
res = [kCommand.findLink(k) for k in kLinks]
- DBG.write("getLinksForXml", [k.toDict() for k in res], True)
+ DBG.write("getLinksForXml", [k.toDict() for k in res])
return res
def getLinksForCmd(idCommand):
handler.setFormatter(formatter)
handler.idCommandHandlers = 0
logger.addHandler(handler)
+ # as RootLogger is level WARNING
+ # my logger is not notset but low, handlers needs setlevel greater
+ logger.setLevel(LOGI.DEBUG)
+ # import src/debug as DBG
+ # tmp = (logger.getEffectiveLevel(), LOGI.NOTSET, logger.level, logger.parent.level)
+ # DBG.write("logger levels tmp, True)
if level is not None: # level could be modified during execution....
- logger.setLevel(level)
handler.setLevel(level) # on screen log as user wants
else:
- logger.setLevel(LOGI.STEP) # in xml files log step
handler.setLevel(LOGI.INFO) # on screen no log step, which are in xml files
-
+ return
+
def initLoggerAsUnittest(logger, fmt=None, level=None):
"""
_loggerDefault = getDefaultLogger()
_loggerUnittest = getUnittestLogger()
initLoggerAsDefault(_loggerDefault, '%(levelname)s :: %(message)s')
- initLoggerAsUnittest(_loggerUnittest, '%(asctime)s :: %(levelname)s :: %(message)s', level=LOGI.DEBUG)
+ initLoggerAsUnittest(_loggerUnittest, '%(asctime)s :: %(levelname)s :: %(message)s')
env["PATH"] = rootdir + ":" + env["PATH"]
# TODO setLocale not 'fr' on subprocesses, why not?
# env["LANG"] == ''
- p = SP.Popen(command, shell=True, env=env, stdout=SP.PIPE, stderr=SP.PIPE)
- res = P.communicate()
+ res = UTS.Popen(command, env=env)
return res
def setNotLocale():
import utilsSat as UTS
import src.returnCode as RCO
+import src.debug as DBG
def show_in_editor(editor, filePath, logger):
"""open filePath using editor.
cmd = editor % filePath
msg = "show_in_editor command: '%s'" % cmd
logger.debug(msg)
- p = SP.Popen(cmd, shell=True)
- p.communicate()
- return RCO.ReturnCode("OK", msg)
+ res = UTS.Popen(cmd, logger=logger)
+ return res
except:
msg = _("Unable to edit file '%s'") % filePath
logger.error(msg)
:param logger: (Logger) The logger instance to use.
:param environment: (Environ)
The environment to source when extracting.
- :return: RCO.ReturnCode OK if the extraction is successful
+ :return: (ReturnCode) OK if the extraction is successful
"""
if not where.exists():
- where.make()
+ where.make()
whe = str(where)
- if tag == "master" or tag == "HEAD":
- command = "git clone %(rem)s %(whe)s --quiet" % {'rem': from_what, 'whe': whe}
- else:
- # NOTICE: this command only works with recent version of git
- # because --work-tree does not work with an absolute path
- where_git = os.path.join(whe, ".git" )
- command = r"""rmdir %(whe)s && \
-git clone %(rem)s %(whe)s --quiet && \
-git --git-dir=%(whe_git)s --work-tree=%(whe)s checkout %(tag)s --quiet"""
- command = command % {'rem': from_what, 'tag': tag, 'whe': whe, 'whe_git': where_git }
+ where_git = os.path.join(whe, ".git" )
+ command = r"""
+set -x
+aDir=%(whe)s
+rmdir $aDir
+git clone %(rem)s $aDir --quiet
+cd $aDir
+git checkout %(tag)s --quiet
+# last command for OK/KO
+git log -n 1
+""" % {'rem': from_what, 'tag': tag, 'whe': whe }
env = environment.environ.environ
res = UTS.Popen(command, cwd=str(where.dir()), env=env, logger=logger)
+ DBG.write("git_extract", res.__repr__())
return res
def archive_extract(from_what, where, logger):
:return: (bool) True if the extraction is successful
"""
try:
- archive = tarfile.open(from_what)
- for i in archive.getmembers():
- archive.extract(i, path=str(where))
- return True, os.path.commonprefix(archive.getnames())
- except Exception as exc:
- logger.error("archive_extract: %s\n" % exc)
- return False, None
+ archive = tarfile.open(from_what)
+ for i in archive.getmembers():
+ archive.extract(i, path=str(where))
+ value = os.path.commonprefix(archive.getnames())
+ res = RCO.ReturnCode("OK", "archive_extract done", value=value)
+ except Exception as e:
+ logger.error("<KO> archive_extract problem:\n%s" % str(e))
+ res = RCO.ReturnCode("KO", "archive_extract problem", value=str(e))
+ return res
def cvs_extract(protocol, user, server, base, tag, product, where,
logger, checkout=False, environment=None):
:param checkout: (bool) If true use checkout cvs.
:param environment: (Environ)
The environment to source when extracting.
- :return: (bool) True if the extraction is successful
+ :return: (ReturnCode) OK if the extraction is successful
"""
opttag = ''
if tag is not None and len(tag) > 0:
- opttag = '-r ' + tag
+ opttag = '-r ' + tag
cmd = 'export'
if checkout:
- cmd = 'checkout'
+ cmd = 'checkout'
elif len(opttag) == 0:
- opttag = '-DNOW'
+ opttag = '-DNOW'
if len(protocol) > 0:
- root = "%s@%s:%s" % (user, server, base)
- command = "cvs -d :%(protocol)s:%(root)s %(command)s -d %(where)s %(tag)s %(product)s" % \
- { 'protocol': protocol, 'root': root, 'where': str(where.base()),
- 'tag': opttag, 'product': product, 'command': cmd }
+ root = "%s@%s:%s" % (user, server, base)
+ command = r"""
+set -x
+aDir=%(whe)s
+cvs -d :%(prot)s:%(root)s %(cmd)s -d $aDir %(tag)s %(prod)s
+# last command for OK/KO
+cd $aDir
+""" % { 'prot': protocol, 'root': root, 'whe': str(where.base()),
+ 'tag': opttag, 'prod': product, 'cmd': cmd }
else:
- command = "cvs -d %(root)s %(command)s -d %(where)s %(tag)s %(base)s/%(product)s" % \
- { 'root': server, 'base': base, 'where': str(where.base()),
- 'tag': opttag, 'product': product, 'command': cmd }
-
- logger.debug(command)
+ command = r"""
+set -x
+aDir=%(whe)s
+cvs -d %(root)s %(cmd)s -d $aDir %(tag)s %(base)s/%(prod)s
+# last command for OK/KO
+cd $aDir
+""" % { 'root': server, 'base': base, 'whe': str(where.base()),
+ 'tag': opttag, 'prod': product, 'cmd': cmd }
if not where.dir().exists():
- where.dir().make()
-
- logger.logTxtFile.write("\n" + command + "\n")
- logger.logTxtFile.flush()
- res = SP.call(command,
- cwd=str(where.dir()),
- env=environment.environ.environ,
- shell=True,
- stdout=logger.logTxtFile,
- stderr=SP.STDOUT)
- return (res == 0)
+ where.dir().make()
+
+ env = environment.environ.environ
+ res = UTS.Popen(command, cwd=str(where.dir()), env=env, logger=logger)
+ DBG.write("cvs_extract", res.__repr__())
+ return res
def svn_extract(user,
from_what,
:param checkout: (bool) If true use checkout svn.
:param environment: (Environ)
The environment to source when extracting.
- :return: (bool) True if the extraction is successful
+ :return: (ReturnCode) OK if the extraction is successful
"""
if not where.exists():
- where.make()
-
+ where.make()
+
+ repl = {
+ 'rem': from_what,
+ 'user': user,
+ 'whe': str(where),
+ 'tag' : tag,
+ }
+
if checkout:
- command = "svn checkout --username %(user)s %(remote)s %(where)s" % \
- { 'remote': from_what, 'user' : user, 'where': str(where) }
+ cmd = "svn checkout --username %(user)s %(rem)s $aDir" % repl
else:
- command = ""
- if os.path.exists(str(where)):
- command = "/bin/rm -rf %(where)s && " % \
- { 'remote': from_what, 'where': str(where) }
+ cmd = ""
+ if os.path.exists(str(where)):
+ cmd = "rm -rf $aDir\n" % repl
+ if tag == "master":
+ cmd += "svn export --username %(user)s %(rem)s $aDir" % repl
+ else:
+ cmd += "svn export -r %(tag)s --username %(user)s %(rem)s $aDir" % repl
- if tag == "master":
- command += "svn export --username %(user)s %(remote)s %(where)s" % \
- { 'remote': from_what, 'user' : user, 'where': str(where) }
- else:
- command += "svn export -r %(tag)s --username %(user)s %(remote)s %(where)s" % \
- { 'tag' : tag, 'remote': from_what, 'user' : user, 'where': str(where) }
-
- logger.logTxtFile.write(command + "\n")
-
- logger.debug(command)
- logger.logTxtFile.write("\n" + command + "\n")
- logger.logTxtFile.flush()
- res = SP.call(command,
- cwd=str(where.dir()),
- env=environment.environ.environ,
- shell=True,
- stdout=logger.logTxtFile,
- stderr=SP.STDOUT)
- return (res == 0)
+ cmd = """
+set -x
+aDir=%(whe)s
+%s
+# last command for OK/KO
+cd $aDir
+""" % cmd
+ env = environment.environ.environ
+ res = UTS.Popen(cmd, cwd=str(where.dir()), env=env, logger=logger)
+ DBG.write("svn_extract", res.__repr__())
+ return res
+
launcherDir = os.path.dirname(self.launcher)
if launcherName == 'runAppli':
# Old application
- cmd = "for i in %s/env.d/*.sh; do source ${i}; done ; echo $KERNEL_ROOT_DIR"
- cmd = cmd % launcherDir
+ cmd = r"""
+set -x
+for i in %s/env.d/*.sh; do source ${i}; done
+echo $KERNEL_ROOT_DIR
+""" % launcherDir
else:
- # New application TODO fix that horreur
- cmd = "echo -e 'import os\nprint os.environ[\"KERNEL_ROOT_DIR\"]' > tmpscript.py; %s shell tmpscript.py"
- cmd = cmd % self.launcher
+ # New application TODO fix that horror
+ cmd = r"""
+set -x
+echo -e 'import os\nprint os.environ["KERNEL_ROOT_DIR"]' > tmpscript.py
+%s shell tmpscript.py
+""" % self.launcher
p = SP.Popen(cmd, stdout=SP.PIPE, shell=True, executable='/bin/bash')
subproc_res = p.communicate()
##############################################################################
def Popen(command, shell=True, cwd=None, env=None, stdout=SP.PIPE, stderr=SP.PIPE, logger=None):
- """make subprocess.Popen(cmd), with call logger.trace and logger.error if problem"""
- try:
- proc = SP.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=stderr)
- res_out, res_err = proc.communicate()
-
- if res_err == "":
+ """
+ make subprocess.Popen(cmd), with
+ call logger.trace and logger.error if problem as returncode != 0
+ """
+ if True: #try:
+ proc = SP.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=SP.STDOUT)
+ res_out, res_err = proc.communicate() # res_err = None as stderr=SP.STDOUT
+ rc = proc.returncode
+
+ DBG.write("Popen logger returncode", (rc, res_out))
+
+ if rc == 0:
if logger is not None:
- logger.trace("OK launch command cwd=%s:\n%s" % (cwd, command))
- logger.trace("OK result command stdout:\n%s" % res_out)
+ logger.trace("<OK> launch command rc=%s cwd=<info>%s<reset>:\n%s" % (rc, cwd, command))
+ logger.trace("<OK> result command stdout&stderr:\n%s" % res_out)
return RCO.ReturnCode("OK", "command done", value=res_out)
else:
if logger is not None:
- logger.warning("KO launch command cwd=%s:\n%s" % (cwd, command))
- logger.warning("KO result command stdout:\n%s" % res_out)
- logger.warning("KO result command stderr:\n%s" % res_err)
- return RCO.ReturnCode("KO", "command problem", value=stderr)
- except Exception as e:
- logger.error("KO launch command cwd=%s:\n%s" % (cwd, command))
- logger.error("launch command exception:\n%s" % str(e))
+ logger.warning("<KO> launch command rc=%s cwd=<info>%s<reset>:\n%s" % (rc, cwd, command))
+ logger.warning("<KO> result command stdout&stderr:\n%s" % res_out)
+ return RCO.ReturnCode("KO", "command problem", value=res_out)
+ else: #except Exception as e:
+ logger.error("<KO> launch command cwd=%s:\n%s" % (cwd, command))
+ logger.error("launch command exception:\n%s" % e)
return RCO.ReturnCode("KO", "launch command problem")
def test_010(self):
cmd = "sat --help"
- stdout, stderr = SAT.launchSat(cmd)
- DBG.write("test_010 stdout", stdout)
- DBG.write("test_010 stderr", stderr)
- self.assertEqual(stderr, "")
- self.assertTrue(" - config" in stdout)
- self.assertTrue(" - prepare" in stdout)
- self.assertTrue(" - compile" in stdout)
+ res = SAT.launchSat(cmd)
+ self.assertTrue(res.isOk())
+ out = res.getValue()
+ self.assertTrue(" - config" in out)
+ self.assertTrue(" - prepare" in out)
+ self.assertTrue(" - compile" in out)
def test_011(self):
cmd = "--help"