#!/usr/bin/env python
-#-*- coding:utf-8 -*-
+# -*- coding:utf-8 -*-
# Copyright (C) 2010-2013 CEA/DEN
#
# This library is free software; you can redistribute it and/or
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-'''
+"""
In this file : all functions that do a system call,
like open a browser or an editor, or call a git command
-'''
+"""
import os
import subprocess as SP
import time
import tarfile
import time
-
+
import debug as DBG
import utilsSat as UTS
from . import printcolors
+
def show_in_editor(editor, filePath, logger):
- '''open filePath using editor.
-
+ """open filePath using editor.
+
:param editor str: The editor to use.
:param filePath str: The path to the file to open.
- '''
+ """
# default editor is vi
if editor is None or len(editor) == 0:
- editor = 'vi'
-
- if '%s' not in editor:
- editor += ' %s'
+ editor = "vi"
+
+ if "%s" not in editor:
+ editor += " %s"
try:
# launch cmd using subprocess.Popen
cmd = editor % filePath
- logger.write('Launched command:\n' + cmd + '\n', 5)
+ logger.write("Launched command:\n" + cmd + "\n", 5)
p = SP.Popen(cmd, shell=True)
p.communicate()
except:
- logger.write(printcolors.printcError(_("Unable to edit file %s\n")
- % filePath), 1)
+ logger.write(
+ printcolors.printcError(_("Unable to edit file %s\n") % filePath), 1
+ )
+
def show_in_webbrowser(editor, filePath, logger):
- '''open filePath using web browser firefox, chromium etc...
+ """open filePath using web browser firefox, chromium etc...
if file is xml, previous http sever is done before to fix new security problems
-
+
:param editor str: The web browser to use.
:param filePath str: The path to the file to open.
- '''
+ """
import psutil
+
# default editor is firefox
if editor is None or len(editor) == 0:
- editor = 'firefox'
-
+ editor = "firefox"
+
path, namefile = os.path.split(filePath)
basefile, ext = os.path.splitext(namefile)
# previouly http.server 8765/6/7... kill ... or not ? TODO wait and see REX
- port = os.getenv('SAT_PORT_LOG', '8765')
+ port = os.getenv("SAT_PORT_LOG", "8765")
for proc in psutil.process_iter():
- # help(proc)
- cmdline = " ".join(proc.cmdline())
- if "python3 -m http.server %s" % port in cmdline:
- print("kill previous process '%s'" % cmdline)
- proc.kill() # TODO may be not owner ? -> change 8766/7/8... as SAT_PORT_LOG
-
+ # help(proc)
+ cmdline = " ".join(proc.cmdline())
+ if "python3 -m http.server %s" % port in cmdline:
+ print("kill previous process '%s'" % cmdline)
+ proc.kill() # TODO may be not owner ? -> change 8766/7/8... as SAT_PORT_LOG
+
cmd = """
set -x
cd %(path)s
python3 -m http.server %(port)s &> /dev/null &
%(editor)s http://localhost:%(port)s/%(namefile)s
-""" % {"path": path, "editor": editor, "namefile": namefile, 'port': port}
+""" % {
+ "path": path,
+ "editor": editor,
+ "namefile": namefile,
+ "port": port,
+ }
# print("show_in_webbrowser:\n%s" % cmd)
-
+
try:
# launch cmd using subprocess.Popen
- logger.write('Launched command:\n%s\n' % cmd, 5)
+ logger.write("Launched command:\n%s\n" % cmd, 5)
p = SP.Popen(cmd, shell=True, stdout=SP.PIPE, stderr=SP.STDOUT)
- res_out, _ = p.communicate() # _ = None as stderr=SP.STDOUT
+ res_out, _ = p.communicate() # _ = None as stderr=SP.STDOUT
# print("Launched command stdout:\n%s" % res_out)
except Exception as e:
- logger.write(printcolors.printcError(_("Unable to display file %s\n%s\n")
- % (filePath, e)), 1)
-
+ logger.write(
+ printcolors.printcError(
+ _("Unable to display file %s\n%s\n") % (filePath, e)
+ ),
+ 1,
+ )
+
def git_describe(repo_path):
- '''Use git describe --tags command to return tag description of the git repository"
+ """Use git describe --tags command to return tag description of the git repository"
:param repo_path str: The git repository to describe
- '''
- git_cmd="cd %s;git describe --tags" % repo_path
+ """
+ git_cmd = "cd %s;git describe --tags" % repo_path
p = SP.Popen(git_cmd, shell=True, stdin=SP.PIPE, stdout=SP.PIPE, stderr=SP.PIPE)
p.wait()
if p.returncode != 0:
return False
else:
- tag_description=p.stdout.readlines()[0].strip()
+ tag_description = p.stdout.readlines()[0].strip()
# with python3 this utf8 bytes should be decoded
if isinstance(tag_description, bytes):
- tag_description=tag_description.decode("utf-8", "ignore")
+ tag_description = tag_description.decode("utf-8", "ignore")
return tag_description
def git_extract(from_what, tag, git_options, where, logger, environment=None):
- '''Extracts sources from a git repository.
-87
- :param from_what str: The remote git repository.
- :param tag str: The tag.
- :param git_options str: git options
- :param where str: The path where to extract.
- :param logger Logger: The logger instance to use.
- :param environment src.environment.Environ: The environment to source when extracting.
- :return: True if the extraction is successful
- :rtype: boolean
- '''
- DBG.write("git_extract", [from_what, tag, str(where)])
- if not where.exists():
- where.make()
- where_git = os.path.join(str(where), ".git")
- if tag == "master" or tag == "HEAD":
- if src.architecture.is_windows():
- cmd = "git clone %(git_options)s %(remote)s %(where)s"
- else:
- cmd = r"""
+ """Extracts sources from a git repository.
+ 87
+ :param from_what str: The remote git repository.
+ :param tag str: The tag.
+ :param git_options str: git options
+ :param where str: The path where to extract.
+ :param logger Logger: The logger instance to use.
+ :param environment src.environment.Environ: The environment to source when extracting.
+ :return: True if the extraction is successful
+ :rtype: boolean
+ """
+ DBG.write("git_extract", [from_what, tag, str(where)])
+ if not where.exists():
+ where.make()
+ where_git = os.path.join(str(where), ".git")
+ if tag == "master" or tag == "HEAD":
+ if src.architecture.is_windows():
+ cmd = "git clone %(git_options)s %(remote)s %(where)s"
+ else:
+ cmd = r"""
set -x
git clone %(git_options)s %(remote)s %(where)s
res=$?
fi
exit $res
"""
- cmd = cmd % {'git_options': git_options, 'remote': from_what, 'tag': tag, 'where': str(where), 'where_git': where_git}
- else:
- # NOTICE: this command only works with recent version of git
- # because --work-tree does not work with an absolute path
- if src.architecture.is_windows():
- cmd = "rm -rf %(where)s && git clone %(git_options)s %(remote)s %(where)s && git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(tag)s"
+ cmd = cmd % {
+ "git_options": git_options,
+ "remote": from_what,
+ "tag": tag,
+ "where": str(where),
+ "where_git": where_git,
+ }
else:
-# for sat compile --update : changes the date of directory, only for branches, not tag
- cmd = r"""
+ # NOTICE: this command only works with recent version of git
+ # because --work-tree does not work with an absolute path
+ if src.architecture.is_windows():
+ cmd = "rm -rf %(where)s && git clone %(git_options)s %(remote)s %(where)s && git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(tag)s"
+ else:
+ # for sat compile --update : changes the date of directory, only for branches, not tag
+ cmd = r"""
set -x
rm -rf %(where)s
git clone %(git_options)s %(remote)s %(where)s && \
fi
exit $res
"""
- cmd = cmd % {'git_options': git_options,
- 'remote': from_what,
- 'tag': tag,
- 'where': str(where),
- 'where_git': where_git}
-
-
- cmd=cmd.replace('date_format', '"%ai"')
- logger.logTxtFile.write("\n" + cmd + "\n")
- logger.logTxtFile.flush()
-
- DBG.write("cmd", cmd)
- # git commands may fail sometimes for various raisons
- # (big module, network troubles, tuleap maintenance)
- # therefore we give several tries
- i_try = 0
- max_number_of_tries = 3
- sleep_delay = 30 # seconds
- while (True):
- i_try += 1
- rc = UTS.Popen(cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger)
- if rc.isOk() or (i_try>=max_number_of_tries):
- break
- logger.write('\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n' % \
- (sleep_delay, i_try + 1, max_number_of_tries), 3)
- time.sleep(sleep_delay) # wait a little
-
- return rc.isOk()
+ cmd = cmd % {
+ "git_options": git_options,
+ "remote": from_what,
+ "tag": tag,
+ "where": str(where),
+ "where_git": where_git,
+ }
+
+ cmd = cmd.replace("date_format", '"%ai"')
+ logger.logTxtFile.write("\n" + cmd + "\n")
+ logger.logTxtFile.flush()
+
+ DBG.write("cmd", cmd)
+ # git commands may fail sometimes for various raisons
+ # (big module, network troubles, tuleap maintenance)
+ # therefore we give several tries
+ i_try = 0
+ max_number_of_tries = 3
+ sleep_delay = 30 # seconds
+ while True:
+ i_try += 1
+ rc = UTS.Popen(
+ cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger
+ )
+ if rc.isOk() or (i_try >= max_number_of_tries):
+ break
+ logger.write(
+ "\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n"
+ % (sleep_delay, i_try + 1, max_number_of_tries),
+ 3,
+ )
+ time.sleep(sleep_delay) # wait a little
+
+ return rc.isOk()
+
def git_checkout(tag, git_options, where, logger, environment=None):
- '''Checkout sources from a git repository.
-87
- :param tag str: The tag.
- :param git_options str: git options
- :param where str: The path where to extract.
- :param logger Logger: The logger instance to use.
- :param environment src.environment.Environ: The environment to source when extracting.
- :return: True if the extraction is successful
- :rtype: boolean
- '''
- DBG.write("git_checkout", [tag, str(where)])
- if not where.exists():
- where.make()
- where_git = os.path.join(str(where), ".git")
- cmd = r"""
+ """Checkout sources from a git repository.
+ 87
+ :param tag str: The tag.
+ :param git_options str: git options
+ :param where str: The path where to extract.
+ :param logger Logger: The logger instance to use.
+ :param environment src.environment.Environ: The environment to source when extracting.
+ :return: True if the extraction is successful
+ :rtype: boolean
+ """
+ DBG.write("git_checkout", [tag, str(where)])
+ if not where.exists():
+ where.make()
+ where_git = os.path.join(str(where), ".git")
+ cmd = r"""
git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(git_options)s --guess %(tag)s
+res=$?
+if [ $res -eq 0 ]; then
+ touch -d "$(git --git-dir=%(where_git)s log -1 --format=date_format)" %(where)s
+fi
+exit $res
"""
- cmd = cmd % {'git_options': git_options, 'tag': tag, 'where': str(where), 'where_git': where_git}
+ cmd = cmd % {
+ "git_options": git_options,
+ "tag": tag,
+ "where": str(where),
+ "where_git": where_git,
+ }
+
+ cmd = cmd.replace("date_format", '"%ai"')
+ cmd = cmd.replace("--no-guess ", "")
+ isOk = launch_command(cmd, logger, where, environment)
+ return isOk
- cmd=cmd.replace('date_format', '"%ai"')
- cmd=cmd.replace('--no-guess ', '')
- isOk = launch_command(cmd, logger, where, environment)
- return isOk
def git_pull(from_what, tag, git_options, where, logger, environment=None):
- '''Checkout sources from a git repository.
-87
- :param from_what str: The remote git repository.
- :param tag str: The tag.
- :param git_options str: git options
- :param where str: The path where to extract.
- :param logger Logger: The logger instance to use.
- :param environment src.environment.Environ: The environment to source when extracting.
- :return: True if the extraction is successful
- :rtype: boolean
- '''
- DBG.write("git_pull", [from_what, tag, str(where)])
- if not where.exists():
- where.make()
- where_git = os.path.join(str(where), ".git")
- cmd = r"""
+ """Checkout sources from a git repository.
+ 87
+ :param from_what str: The remote git repository.
+ :param tag str: The tag.
+ :param git_options str: git options
+ :param where str: The path where to extract.
+ :param logger Logger: The logger instance to use.
+ :param environment src.environment.Environ: The environment to source when extracting.
+ :return: True if the extraction is successful
+ :rtype: boolean
+ """
+ DBG.write("git_pull", [from_what, tag, str(where)])
+ if not where.exists():
+ where.make()
+ where_git = os.path.join(str(where), ".git")
+ cmd = r"""
git --git-dir=%(where_git)s --work-tree=%(where)s pull %(git_options)s --recurse-submodule --ff-only origin %(tag)s
"""
- cmd = cmd % {'git_options': git_options, 'tag': tag, 'where': str(where), 'where_git': where_git}
+ cmd = cmd % {
+ "git_options": git_options,
+ "tag": tag,
+ "where": str(where),
+ "where_git": where_git,
+ }
- cmd=cmd.replace('date_format', '"%ai"')
- isOk = launch_command(cmd, logger, where, environment)
- return isOk
+ cmd = cmd.replace("date_format", '"%ai"')
+ isOk = launch_command(cmd, logger, where, environment)
+ return isOk
def launch_command(cmd, logger, where, environment):
- logger.logTxtFile.write("\n" + cmd + "\n")
- logger.logTxtFile.flush()
-
- DBG.write("cmd", cmd)
- # git commands may fail sometimes for various raisons
- # (big module, network troubles, tuleap maintenance)
- # therefore we give several tries
- i_try = 0
- max_number_of_tries = 3
- sleep_delay = 30 # seconds
- while (True):
- i_try += 1
- rc = UTS.Popen(cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger)
- if rc.isOk() or (i_try>=max_number_of_tries):
- break
- logger.write('\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n' % \
- (sleep_delay, i_try + 1, max_number_of_tries), 3)
- time.sleep(sleep_delay) # wait a little
-
- return rc.isOk()
-
-
-def git_extract_sub_dir(from_what, tag, git_options, where, sub_dir, logger, environment=None):
- '''Extracts sources from a subtree sub_dir of a git repository.
-
- :param from_what str: The remote git repository.
- :param tag str: The tag.
- :param git_options str: git options
- :param where str: The path where to extract.
- :param sub_dir str: The relative path of subtree to extract.
- :param logger Logger: The logger instance to use.
- :param environment src.environment.Environ: The environment to source when extracting.
- :return: True if the extraction is successful
- :rtype: boolean
- '''
- strWhere = str(where)
- tmpWhere = strWhere + '_tmp'
- parentWhere = os.path.dirname(strWhere)
- if not os.path.exists(parentWhere):
- logger.error("not existing directory: %s" % parentWhere)
- return False
- if os.path.isdir(strWhere):
- logger.error("do not override existing directory: %s" % strWhere)
- return False
- aDict = {'git_options': git_options,
- 'remote': from_what,
- 'tag': tag,
- 'sub_dir': sub_dir,
- 'where': strWhere,
- 'parentWhere': parentWhere,
- 'tmpWhere': tmpWhere,
- }
- DBG.write("git_extract_sub_dir", aDict)
- if not src.architecture.is_windows():
- cmd = r"""
+ logger.logTxtFile.write("\n" + cmd + "\n")
+ logger.logTxtFile.flush()
+
+ DBG.write("cmd", cmd)
+ # git commands may fail sometimes for various raisons
+ # (big module, network troubles, tuleap maintenance)
+ # therefore we give several tries
+ i_try = 0
+ max_number_of_tries = 3
+ sleep_delay = 30 # seconds
+ while True:
+ i_try += 1
+ rc = UTS.Popen(
+ cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger
+ )
+ if rc.isOk() or (i_try >= max_number_of_tries):
+ break
+ logger.write(
+ "\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n"
+ % (sleep_delay, i_try + 1, max_number_of_tries),
+ 3,
+ )
+ time.sleep(sleep_delay) # wait a little
+
+ return rc.isOk()
+
+
+def git_extract_sub_dir(
+ from_what, tag, git_options, where, sub_dir, logger, environment=None
+):
+ """Extracts sources from a subtree sub_dir of a git repository.
+
+ :param from_what str: The remote git repository.
+ :param tag str: The tag.
+ :param git_options str: git options
+ :param where str: The path where to extract.
+ :param sub_dir str: The relative path of subtree to extract.
+ :param logger Logger: The logger instance to use.
+ :param environment src.environment.Environ: The environment to source when extracting.
+ :return: True if the extraction is successful
+ :rtype: boolean
+ """
+ strWhere = str(where)
+ tmpWhere = strWhere + "_tmp"
+ parentWhere = os.path.dirname(strWhere)
+ if not os.path.exists(parentWhere):
+ logger.error("not existing directory: %s" % parentWhere)
+ return False
+ if os.path.isdir(strWhere):
+ logger.error("do not override existing directory: %s" % strWhere)
+ return False
+ aDict = {
+ "git_options": git_options,
+ "remote": from_what,
+ "tag": tag,
+ "sub_dir": sub_dir,
+ "where": strWhere,
+ "parentWhere": parentWhere,
+ "tmpWhere": tmpWhere,
+ }
+ DBG.write("git_extract_sub_dir", aDict)
+ if not src.architecture.is_windows():
+ cmd = (
+ r"""
set -x
export tmpDir=%(tmpWhere)s && \
rm -rf $tmpDir
mv %(sub_dir)s %(where)s && \
git log -1 > %(where)s/README_git_log.txt && \
rm -rf $tmpDir
-""" % aDict
- else:
- cmd = r"""
+"""
+ % aDict
+ )
+ else:
+ cmd = (
+ r"""
set tmpDir=%(tmpWhere)s && \
rm -rf $tmpDir
mv %(sub_dir)s %(where)s && \
git log -1 > %(where)s/README_git_log.txt && \
rm -rf $tmpDir
-""" % aDict
+"""
+ % aDict
+ )
+
+ DBG.write("cmd", cmd)
- DBG.write("cmd", cmd)
+ for nbtry in range(0, 3): # retries case of network problem
+ rc = UTS.Popen(
+ cmd, cwd=parentWhere, env=environment.environ.environ, logger=logger
+ )
+ if rc.isOk():
+ break
+ time.sleep(30) # wait a little
- for nbtry in range(0,3): # retries case of network problem
- rc = UTS.Popen(cmd, cwd=parentWhere, env=environment.environ.environ, logger=logger)
- if rc.isOk(): break
- time.sleep(30) # wait a little
+ return rc.isOk()
- return rc.isOk()
def archive_extract(from_what, where, logger):
- '''Extracts sources from an archive.
-
+ """Extracts sources from an archive.
+
:param from_what str: The path to the archive.
:param where str: The path where to extract.
:param logger Logger: The logger instance to use.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
try:
archive = tarfile.open(from_what)
for i in archive.getmembers():
logger.write("archive_extract: %s\n" % exc)
return False, None
-def cvs_extract(protocol, user, server, base, tag, product, where,
- logger, checkout=False, environment=None):
- '''Extracts sources from a cvs repository.
-
+
+def cvs_extract(
+ protocol,
+ user,
+ server,
+ base,
+ tag,
+ product,
+ where,
+ logger,
+ checkout=False,
+ environment=None,
+):
+ """Extracts sources from a cvs repository.
+
:param protocol str: The cvs protocol.
:param user str: The user to be used.
:param server str: The remote cvs server.
extracting.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
- opttag = ''
+ opttag = ""
if tag is not None and len(tag) > 0:
- opttag = '-r ' + tag
+ opttag = "-r " + tag
- cmd = 'export'
+ cmd = "export"
if checkout:
- cmd = 'checkout'
+ cmd = "checkout"
elif len(opttag) == 0:
- opttag = '-DNOW'
-
+ opttag = "-DNOW"
+
if len(protocol) > 0:
root = "%s@%s:%s" % (user, server, base)
- command = "cvs -d :%(protocol)s:%(root)s %(command)s -d %(where)s %(tag)s %(product)s" % \
- { 'protocol': protocol, 'root': root, 'where': str(where.base()),
- 'tag': opttag, 'product': product, 'command': cmd }
+ command = (
+ "cvs -d :%(protocol)s:%(root)s %(command)s -d %(where)s %(tag)s %(product)s"
+ % {
+ "protocol": protocol,
+ "root": root,
+ "where": str(where.base()),
+ "tag": opttag,
+ "product": product,
+ "command": cmd,
+ }
+ )
else:
- command = "cvs -d %(root)s %(command)s -d %(where)s %(tag)s %(base)s/%(product)s" % \
- { 'root': server, 'base': base, 'where': str(where.base()),
- 'tag': opttag, 'product': product, 'command': cmd }
+ command = (
+ "cvs -d %(root)s %(command)s -d %(where)s %(tag)s %(base)s/%(product)s"
+ % {
+ "root": server,
+ "base": base,
+ "where": str(where.base()),
+ "tag": opttag,
+ "product": product,
+ "command": cmd,
+ }
+ )
logger.write(command + "\n", 5)
where.dir().make()
logger.logTxtFile.write("\n" + command + "\n")
- logger.logTxtFile.flush()
- res = SP.call(command, cwd=str(where.dir()),
- env=environment.environ.environ,
- shell=True,
- stdout=logger.logTxtFile,
- stderr=SP.STDOUT)
- return (res == 0)
-
-def svn_extract(user,
- from_what,
- tag,
- where,
- logger,
- checkout=False,
- environment=None):
- '''Extracts sources from a svn repository.
-
+ logger.logTxtFile.flush()
+ res = SP.call(
+ command,
+ cwd=str(where.dir()),
+ env=environment.environ.environ,
+ shell=True,
+ stdout=logger.logTxtFile,
+ stderr=SP.STDOUT,
+ )
+ return res == 0
+
+
+def svn_extract(user, from_what, tag, where, logger, checkout=False, environment=None):
+ """Extracts sources from a svn repository.
+
:param user str: The user to be used.
:param from_what str: The remote git repository.
:param tag str: The tag.
extracting.
:return: True if the extraction is successful
:rtype: boolean
- '''
+ """
if not where.exists():
where.make()
if checkout:
- command = "svn checkout --username %(user)s %(remote)s %(where)s" % \
- { 'remote': from_what, 'user' : user, 'where': str(where) }
+ command = "svn checkout --username %(user)s %(remote)s %(where)s" % {
+ "remote": from_what,
+ "user": user,
+ "where": str(where),
+ }
else:
command = ""
if os.path.exists(str(where)):
- command = "/bin/rm -rf %(where)s && " % \
- { 'remote': from_what, 'where': str(where) }
-
+ command = "/bin/rm -rf %(where)s && " % {
+ "remote": from_what,
+ "where": str(where),
+ }
+
if tag == "master":
- command += "svn export --username %(user)s %(remote)s %(where)s" % \
- { 'remote': from_what, 'user' : user, 'where': str(where) }
+ command += "svn export --username %(user)s %(remote)s %(where)s" % {
+ "remote": from_what,
+ "user": user,
+ "where": str(where),
+ }
else:
- command += "svn export -r %(tag)s --username %(user)s %(remote)s %(where)s" % \
- { 'tag' : tag, 'remote': from_what, 'user' : user, 'where': str(where) }
-
+ command += (
+ "svn export -r %(tag)s --username %(user)s %(remote)s %(where)s"
+ % {"tag": tag, "remote": from_what, "user": user, "where": str(where)}
+ )
+
logger.logTxtFile.write(command + "\n")
-
+
logger.write(command + "\n", 5)
logger.logTxtFile.write("\n" + command + "\n")
logger.logTxtFile.flush()
- res = SP.call(command, cwd=str(where.dir()),
- env=environment.environ.environ,
- shell=True,
- stdout=logger.logTxtFile,
- stderr=SP.STDOUT)
- return (res == 0)
+ res = SP.call(
+ command,
+ cwd=str(where.dir()),
+ env=environment.environ.environ,
+ shell=True,
+ stdout=logger.logTxtFile,
+ stderr=SP.STDOUT,
+ )
+ return res == 0
+
def get_pkg_check_cmd(dist_name):
- '''Build the command to use for checking if a linux package is installed or not.'''
+ """Build the command to use for checking if a linux package is installed or not."""
- if dist_name in ["CO","FD","MG","MD","CO","OS"]: # linux using rpm
- linux="RH"
- manager_msg_err="Error : command failed because sat was not able to find apt command"
+ if dist_name in ["CO", "FD", "MG", "MD", "CO", "OS"]: # linux using rpm
+ linux = "RH"
+ manager_msg_err = (
+ "Error : command failed because sat was not able to find apt command"
+ )
else:
- linux="DB"
- manager_msg_err="Error : command failed because sat was not able to find rpm command"
+ linux = "DB"
+ manager_msg_err = (
+ "Error : command failed because sat was not able to find rpm command"
+ )
# 1- search for an installed package manager (rpm on rh, apt on db)
- cmd_which_rpm=["which", "rpm"]
- # cmd_which_apt=["which", "apt"]
- cmd_which_apt=["which", "dpkg-query"]
- with open(os.devnull, 'w') as devnull:
+ cmd_which_rpm = ["which", "rpm"]
+ cmd_which_apt = ["which", "apt"]
+ cmd_which_dpkg = ["which", "dpkg-query"]
+ with open(os.devnull, "w") as devnull:
# 1) we search for apt (debian based systems)
- completed=SP.call(cmd_which_apt,stdout=devnull, stderr=SP.STDOUT)
- if completed==0 and linux=="DB":
- # cmd_is_package_installed=["apt", "list", "--installed"]
- cmd_is_package_installed=["dpkg-query", "--no-pager", "-l"]
+ completed = SP.call(cmd_which_dpkg, stdout=devnull, stderr=SP.STDOUT)
+ if completed == 0 and linux == "DB":
+ cmd_is_package_installed = ["dpkg-query", "--no-pager", "-l"]
else:
- # 2) if apt not found search for rpm (redhat)
- completed=SP.call(cmd_which_rpm,stdout=devnull, stderr=SP.STDOUT) # only 3.8! ,capture_output=True)
- if completed==0 and linux=="RH":
- cmd_is_package_installed=["rpm", "-q"]
+ # 2) if dpkg not found search for rpm (redhat)
+ completed = SP.call(cmd_which_apt, stdout=devnull, stderr=SP.STDOUT)
+ if completed == 0 and linux == "DB":
+ cmd_is_package_installed = ["apt", "list", "--installed"]
else:
- # no package manager was found corresponding to dist_name
- raise src.SatException(manager_msg_err)
+ # 2) if apt and dpkg not found search for rpm (redhat)
+ completed = SP.call(
+ cmd_which_rpm, stdout=devnull, stderr=SP.STDOUT
+ ) # only 3.8! ,capture_output=True)
+ if completed == 0 and linux == "RH":
+ cmd_is_package_installed = ["rpm", "-q"]
+ else:
+ # no package manager was found corresponding to dist_name
+ raise src.SatException(manager_msg_err)
return cmd_is_package_installed
-def check_system_pkg(check_cmd,pkg):
- '''Check if a package is installed
+
+def check_system_pkg(check_cmd, pkg):
+ """Check if a package is installed
:param check_cmd list: the list of command to use system package manager
:param user str: the pkg name to check
:rtype: str
:return: a string with package name with status un message
- '''
+ """
# build command
- FNULL = open(os.devnull, 'w')
- cmd_is_package_installed=[]
+ FNULL = open(os.devnull, "w")
+ cmd_is_package_installed = []
for cmd in check_cmd:
cmd_is_package_installed.append(cmd)
cmd_is_package_installed.append(pkg)
-
- if check_cmd[0]=="apt":
+ if check_cmd[0] == "apt":
# special treatment for apt
# apt output is too messy for being used
# some debian packages have version numbers in their name, we need to add a *
# also apt do not return status, we need to use grep
- # and apt output is too messy for being used
- cmd_is_package_installed[-1]+="*" # we don't specify in pyconf the exact name because of version numbers
- p=SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
+ # and apt output is too messy for being used
+ cmd_is_package_installed[
+ -1
+ ] += "*" # we don't specify in pyconf the exact name because of version numbers
+ p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
try:
- output = SP.check_output(['grep', pkg], stdin=p.stdout)
- msg_status=src.printcolors.printcSuccess("OK")
+ output = SP.check_output(["grep", pkg], stdin=p.stdout)
+ msg_status = src.printcolors.printcSuccess("OK")
except:
- msg_status=src.printcolors.printcError("KO")
- msg_status+=" (package is not installed!)\n"
- elif check_cmd[0]=="dpkg-query":
+ msg_status = src.printcolors.printcError("KO")
+ msg_status += " (package is not installed!)\n"
+ elif check_cmd[0] == "dpkg-query":
# special treatment for dpkg-query
# some debian packages have version numbers in their name, we need to add a *
# also dpkg-query do not return status, we need to use grep
- # and dpkg-query output is too messy for being used
- cmd_is_package_installed[-1] = cmd_is_package_installed[-1] + "*" # we don't specify in pyconf the exact name because of version numbers
- p=SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
+ # and dpkg-query output is too messy for being used
+ cmd_is_package_installed[-1] = (
+ cmd_is_package_installed[-1] + "*"
+ ) # we don't specify in pyconf the exact name because of version numbers
+ p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
try:
- output = SP.check_output(['grep', "^ii"], stdin=p.stdout)
- msg_status=src.printcolors.printcSuccess("OK")
+ output = SP.check_output(["grep", "^ii"], stdin=p.stdout)
+ msg_status = src.printcolors.printcSuccess("OK")
except SP.CalledProcessError:
- msg_status=src.printcolors.printcError("KO")
- msg_status+=" (package is not installed!)\n"
+ msg_status = src.printcolors.printcError("KO")
+ msg_status += " (package is not installed!)\n"
else:
- p=SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
+ p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL)
output, err = p.communicate()
rc = p.returncode
- if rc==0:
- msg_status=src.printcolors.printcSuccess("OK")
+ if rc == 0:
+ msg_status = src.printcolors.printcSuccess("OK")
# in python3 output is a byte and should be decoded
if isinstance(output, bytes):
output = output.decode("utf-8", "ignore")
- msg_status+=" (" + output.replace('\n',' ') + ")\n" # remove output trailing \n
+ msg_status += (
+ " (" + output.replace("\n", " ") + ")\n"
+ ) # remove output trailing \n
else:
- msg_status=src.printcolors.printcError("KO")
- msg_status+=" (package is not installed!)\n"
+ msg_status = src.printcolors.printcError("KO")
+ msg_status += " (package is not installed!)\n"
return msg_status