From b6a130e47e34b0dc8cc16900bba14fc4dad1116a Mon Sep 17 00:00:00 2001 From: SONOLET Aymeric Date: Wed, 6 Dec 2023 12:45:42 +0100 Subject: [PATCH] improve: Replace apt list with dpkg-query for speed fix(update): check for apt if dpkg is not found --- src/product.py | 23 +- src/system.py | 714 +++++++++++++++++++++++++++++-------------------- 2 files changed, 434 insertions(+), 303 deletions(-) diff --git a/src/product.py b/src/product.py index 28948da..8a456eb 100644 --- a/src/product.py +++ b/src/product.py @@ -896,17 +896,17 @@ def check_installation(config, product_info): build_dep_ko=[] for pkg in build_pkg: if "KO" in build_pkg[pkg]: - build_dep_ko.append(pkg) + build_dep_ko.append(pkg) if build_dep_ko: - # the product is not installed : display message and return error status - msg="Please install them with %s before compiling salome" % check_cmd[0] - print("\nmissing compile time dependencies : ") - for md in build_dep_ko: - print(md) - print(msg) - return False - else: - return True + # the product is not installed : display message and return error status + msg="Please install them with %s before compiling salome" % check_cmd[0] + print(build_pkg) + print("\nmissing compile time dependencies : ") + for md in build_dep_ko: + print(md) + print(msg) + return False + return True install_dir = product_info.install_dir if src.product.product_is_fixed(product_info): @@ -1312,7 +1312,8 @@ def check_system_dep(distrib, check_cmd, product_info): if "rpm_dev" in additional_sysinfo: for pkg in additional_sysinfo.rpm_dev: build_dep[pkg]=src.system.check_system_pkg(check_cmd,pkg) - if check_cmd[0]=="apt": + #if check_cmd[0]=="apt" or check_cmd[0]=="dpkg-query": + else: if "apt" in sysinfo: for pkg in sysinfo.apt: runtime_dep[pkg]=src.system.check_system_pkg(check_cmd,pkg) diff --git a/src/system.py b/src/system.py index 0d5c2ea..206fcb8 100644 --- a/src/system.py +++ b/src/system.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -#-*- coding:utf-8 -*- +# -*- coding:utf-8 -*- # Copyright (C) 2010-2013 CEA/DEN # # This library is free software; you can redistribute it and/or @@ -16,17 +16,17 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -''' +""" In this file : all functions that do a system call, like open a browser or an editor, or call a git command -''' +""" import os import subprocess as SP import time import tarfile import time - + import debug as DBG import utilsSat as UTS @@ -34,111 +34,124 @@ import src from . import printcolors + def show_in_editor(editor, filePath, logger): - '''open filePath using editor. - + """open filePath using editor. + :param editor str: The editor to use. :param filePath str: The path to the file to open. - ''' + """ # default editor is vi if editor is None or len(editor) == 0: - editor = 'vi' - - if '%s' not in editor: - editor += ' %s' + editor = "vi" + + if "%s" not in editor: + editor += " %s" try: # launch cmd using subprocess.Popen cmd = editor % filePath - logger.write('Launched command:\n' + cmd + '\n', 5) + logger.write("Launched command:\n" + cmd + "\n", 5) p = SP.Popen(cmd, shell=True) p.communicate() except: - logger.write(printcolors.printcError(_("Unable to edit file %s\n") - % filePath), 1) + logger.write( + printcolors.printcError(_("Unable to edit file %s\n") % filePath), 1 + ) + def show_in_webbrowser(editor, filePath, logger): - '''open filePath using web browser firefox, chromium etc... + """open filePath using web browser firefox, chromium etc... if file is xml, previous http sever is done before to fix new security problems - + :param editor str: The web browser to use. :param filePath str: The path to the file to open. - ''' + """ import psutil + # default editor is firefox if editor is None or len(editor) == 0: - editor = 'firefox' - + editor = "firefox" + path, namefile = os.path.split(filePath) basefile, ext = os.path.splitext(namefile) # previouly http.server 8765/6/7... kill ... or not ? TODO wait and see REX - port = os.getenv('SAT_PORT_LOG', '8765') + port = os.getenv("SAT_PORT_LOG", "8765") for proc in psutil.process_iter(): - # help(proc) - cmdline = " ".join(proc.cmdline()) - if "python3 -m http.server %s" % port in cmdline: - print("kill previous process '%s'" % cmdline) - proc.kill() # TODO may be not owner ? -> change 8766/7/8... as SAT_PORT_LOG - + # help(proc) + cmdline = " ".join(proc.cmdline()) + if "python3 -m http.server %s" % port in cmdline: + print("kill previous process '%s'" % cmdline) + proc.kill() # TODO may be not owner ? -> change 8766/7/8... as SAT_PORT_LOG + cmd = """ set -x cd %(path)s python3 -m http.server %(port)s &> /dev/null & %(editor)s http://localhost:%(port)s/%(namefile)s -""" % {"path": path, "editor": editor, "namefile": namefile, 'port': port} +""" % { + "path": path, + "editor": editor, + "namefile": namefile, + "port": port, + } # print("show_in_webbrowser:\n%s" % cmd) - + try: # launch cmd using subprocess.Popen - logger.write('Launched command:\n%s\n' % cmd, 5) + logger.write("Launched command:\n%s\n" % cmd, 5) p = SP.Popen(cmd, shell=True, stdout=SP.PIPE, stderr=SP.STDOUT) - res_out, _ = p.communicate() # _ = None as stderr=SP.STDOUT + res_out, _ = p.communicate() # _ = None as stderr=SP.STDOUT # print("Launched command stdout:\n%s" % res_out) except Exception as e: - logger.write(printcolors.printcError(_("Unable to display file %s\n%s\n") - % (filePath, e)), 1) - + logger.write( + printcolors.printcError( + _("Unable to display file %s\n%s\n") % (filePath, e) + ), + 1, + ) + def git_describe(repo_path): - '''Use git describe --tags command to return tag description of the git repository" + """Use git describe --tags command to return tag description of the git repository" :param repo_path str: The git repository to describe - ''' - git_cmd="cd %s;git describe --tags" % repo_path + """ + git_cmd = "cd %s;git describe --tags" % repo_path p = SP.Popen(git_cmd, shell=True, stdin=SP.PIPE, stdout=SP.PIPE, stderr=SP.PIPE) p.wait() if p.returncode != 0: return False else: - tag_description=p.stdout.readlines()[0].strip() + tag_description = p.stdout.readlines()[0].strip() # with python3 this utf8 bytes should be decoded if isinstance(tag_description, bytes): - tag_description=tag_description.decode("utf-8", "ignore") + tag_description = tag_description.decode("utf-8", "ignore") return tag_description def git_extract(from_what, tag, git_options, where, logger, environment=None): - '''Extracts sources from a git repository. -87 - :param from_what str: The remote git repository. - :param tag str: The tag. - :param git_options str: git options - :param where str: The path where to extract. - :param logger Logger: The logger instance to use. - :param environment src.environment.Environ: The environment to source when extracting. - :return: True if the extraction is successful - :rtype: boolean - ''' - DBG.write("git_extract", [from_what, tag, str(where)]) - if not where.exists(): - where.make() - where_git = os.path.join(str(where), ".git") - if tag == "master" or tag == "HEAD": - if src.architecture.is_windows(): - cmd = "git clone %(git_options)s %(remote)s %(where)s" - else: - cmd = r""" + """Extracts sources from a git repository. + 87 + :param from_what str: The remote git repository. + :param tag str: The tag. + :param git_options str: git options + :param where str: The path where to extract. + :param logger Logger: The logger instance to use. + :param environment src.environment.Environ: The environment to source when extracting. + :return: True if the extraction is successful + :rtype: boolean + """ + DBG.write("git_extract", [from_what, tag, str(where)]) + if not where.exists(): + where.make() + where_git = os.path.join(str(where), ".git") + if tag == "master" or tag == "HEAD": + if src.architecture.is_windows(): + cmd = "git clone %(git_options)s %(remote)s %(where)s" + else: + cmd = r""" set -x git clone %(git_options)s %(remote)s %(where)s res=$? @@ -147,15 +160,21 @@ if [ $res -eq 0 ]; then fi exit $res """ - cmd = cmd % {'git_options': git_options, 'remote': from_what, 'tag': tag, 'where': str(where), 'where_git': where_git} - else: - # NOTICE: this command only works with recent version of git - # because --work-tree does not work with an absolute path - if src.architecture.is_windows(): - cmd = "rm -rf %(where)s && git clone %(git_options)s %(remote)s %(where)s && git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(tag)s" + cmd = cmd % { + "git_options": git_options, + "remote": from_what, + "tag": tag, + "where": str(where), + "where_git": where_git, + } else: -# for sat compile --update : changes the date of directory, only for branches, not tag - cmd = r""" + # NOTICE: this command only works with recent version of git + # because --work-tree does not work with an absolute path + if src.architecture.is_windows(): + cmd = "rm -rf %(where)s && git clone %(git_options)s %(remote)s %(where)s && git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(tag)s" + else: + # for sat compile --update : changes the date of directory, only for branches, not tag + cmd = r""" set -x rm -rf %(where)s git clone %(git_options)s %(remote)s %(where)s && \ @@ -167,142 +186,174 @@ if [ $res -eq 0 -a $? -ne 0 ]; then fi exit $res """ - cmd = cmd % {'git_options': git_options, - 'remote': from_what, - 'tag': tag, - 'where': str(where), - 'where_git': where_git} - - - cmd=cmd.replace('date_format', '"%ai"') - logger.logTxtFile.write("\n" + cmd + "\n") - logger.logTxtFile.flush() - - DBG.write("cmd", cmd) - # git commands may fail sometimes for various raisons - # (big module, network troubles, tuleap maintenance) - # therefore we give several tries - i_try = 0 - max_number_of_tries = 3 - sleep_delay = 30 # seconds - while (True): - i_try += 1 - rc = UTS.Popen(cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger) - if rc.isOk() or (i_try>=max_number_of_tries): - break - logger.write('\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n' % \ - (sleep_delay, i_try + 1, max_number_of_tries), 3) - time.sleep(sleep_delay) # wait a little - - return rc.isOk() + cmd = cmd % { + "git_options": git_options, + "remote": from_what, + "tag": tag, + "where": str(where), + "where_git": where_git, + } + + cmd = cmd.replace("date_format", '"%ai"') + logger.logTxtFile.write("\n" + cmd + "\n") + logger.logTxtFile.flush() + + DBG.write("cmd", cmd) + # git commands may fail sometimes for various raisons + # (big module, network troubles, tuleap maintenance) + # therefore we give several tries + i_try = 0 + max_number_of_tries = 3 + sleep_delay = 30 # seconds + while True: + i_try += 1 + rc = UTS.Popen( + cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger + ) + if rc.isOk() or (i_try >= max_number_of_tries): + break + logger.write( + "\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n" + % (sleep_delay, i_try + 1, max_number_of_tries), + 3, + ) + time.sleep(sleep_delay) # wait a little + + return rc.isOk() + def git_checkout(tag, git_options, where, logger, environment=None): - '''Checkout sources from a git repository. -87 - :param tag str: The tag. - :param git_options str: git options - :param where str: The path where to extract. - :param logger Logger: The logger instance to use. - :param environment src.environment.Environ: The environment to source when extracting. - :return: True if the extraction is successful - :rtype: boolean - ''' - DBG.write("git_checkout", [tag, str(where)]) - if not where.exists(): - where.make() - where_git = os.path.join(str(where), ".git") - cmd = r""" + """Checkout sources from a git repository. + 87 + :param tag str: The tag. + :param git_options str: git options + :param where str: The path where to extract. + :param logger Logger: The logger instance to use. + :param environment src.environment.Environ: The environment to source when extracting. + :return: True if the extraction is successful + :rtype: boolean + """ + DBG.write("git_checkout", [tag, str(where)]) + if not where.exists(): + where.make() + where_git = os.path.join(str(where), ".git") + cmd = r""" git --git-dir=%(where_git)s --work-tree=%(where)s checkout %(git_options)s --guess %(tag)s +res=$? +if [ $res -eq 0 ]; then + touch -d "$(git --git-dir=%(where_git)s log -1 --format=date_format)" %(where)s +fi +exit $res """ - cmd = cmd % {'git_options': git_options, 'tag': tag, 'where': str(where), 'where_git': where_git} + cmd = cmd % { + "git_options": git_options, + "tag": tag, + "where": str(where), + "where_git": where_git, + } + + cmd = cmd.replace("date_format", '"%ai"') + cmd = cmd.replace("--no-guess ", "") + isOk = launch_command(cmd, logger, where, environment) + return isOk - cmd=cmd.replace('date_format', '"%ai"') - cmd=cmd.replace('--no-guess ', '') - isOk = launch_command(cmd, logger, where, environment) - return isOk def git_pull(from_what, tag, git_options, where, logger, environment=None): - '''Checkout sources from a git repository. -87 - :param from_what str: The remote git repository. - :param tag str: The tag. - :param git_options str: git options - :param where str: The path where to extract. - :param logger Logger: The logger instance to use. - :param environment src.environment.Environ: The environment to source when extracting. - :return: True if the extraction is successful - :rtype: boolean - ''' - DBG.write("git_pull", [from_what, tag, str(where)]) - if not where.exists(): - where.make() - where_git = os.path.join(str(where), ".git") - cmd = r""" + """Checkout sources from a git repository. + 87 + :param from_what str: The remote git repository. + :param tag str: The tag. + :param git_options str: git options + :param where str: The path where to extract. + :param logger Logger: The logger instance to use. + :param environment src.environment.Environ: The environment to source when extracting. + :return: True if the extraction is successful + :rtype: boolean + """ + DBG.write("git_pull", [from_what, tag, str(where)]) + if not where.exists(): + where.make() + where_git = os.path.join(str(where), ".git") + cmd = r""" git --git-dir=%(where_git)s --work-tree=%(where)s pull %(git_options)s --recurse-submodule --ff-only origin %(tag)s """ - cmd = cmd % {'git_options': git_options, 'tag': tag, 'where': str(where), 'where_git': where_git} + cmd = cmd % { + "git_options": git_options, + "tag": tag, + "where": str(where), + "where_git": where_git, + } - cmd=cmd.replace('date_format', '"%ai"') - isOk = launch_command(cmd, logger, where, environment) - return isOk + cmd = cmd.replace("date_format", '"%ai"') + isOk = launch_command(cmd, logger, where, environment) + return isOk def launch_command(cmd, logger, where, environment): - logger.logTxtFile.write("\n" + cmd + "\n") - logger.logTxtFile.flush() - - DBG.write("cmd", cmd) - # git commands may fail sometimes for various raisons - # (big module, network troubles, tuleap maintenance) - # therefore we give several tries - i_try = 0 - max_number_of_tries = 3 - sleep_delay = 30 # seconds - while (True): - i_try += 1 - rc = UTS.Popen(cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger) - if rc.isOk() or (i_try>=max_number_of_tries): - break - logger.write('\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n' % \ - (sleep_delay, i_try + 1, max_number_of_tries), 3) - time.sleep(sleep_delay) # wait a little - - return rc.isOk() - - -def git_extract_sub_dir(from_what, tag, git_options, where, sub_dir, logger, environment=None): - '''Extracts sources from a subtree sub_dir of a git repository. - - :param from_what str: The remote git repository. - :param tag str: The tag. - :param git_options str: git options - :param where str: The path where to extract. - :param sub_dir str: The relative path of subtree to extract. - :param logger Logger: The logger instance to use. - :param environment src.environment.Environ: The environment to source when extracting. - :return: True if the extraction is successful - :rtype: boolean - ''' - strWhere = str(where) - tmpWhere = strWhere + '_tmp' - parentWhere = os.path.dirname(strWhere) - if not os.path.exists(parentWhere): - logger.error("not existing directory: %s" % parentWhere) - return False - if os.path.isdir(strWhere): - logger.error("do not override existing directory: %s" % strWhere) - return False - aDict = {'git_options': git_options, - 'remote': from_what, - 'tag': tag, - 'sub_dir': sub_dir, - 'where': strWhere, - 'parentWhere': parentWhere, - 'tmpWhere': tmpWhere, - } - DBG.write("git_extract_sub_dir", aDict) - if not src.architecture.is_windows(): - cmd = r""" + logger.logTxtFile.write("\n" + cmd + "\n") + logger.logTxtFile.flush() + + DBG.write("cmd", cmd) + # git commands may fail sometimes for various raisons + # (big module, network troubles, tuleap maintenance) + # therefore we give several tries + i_try = 0 + max_number_of_tries = 3 + sleep_delay = 30 # seconds + while True: + i_try += 1 + rc = UTS.Popen( + cmd, cwd=str(where.dir()), env=environment.environ.environ, logger=logger + ) + if rc.isOk() or (i_try >= max_number_of_tries): + break + logger.write( + "\ngit command failed! Wait %d seconds and give an other try (%d/%d)\n" + % (sleep_delay, i_try + 1, max_number_of_tries), + 3, + ) + time.sleep(sleep_delay) # wait a little + + return rc.isOk() + + +def git_extract_sub_dir( + from_what, tag, git_options, where, sub_dir, logger, environment=None +): + """Extracts sources from a subtree sub_dir of a git repository. + + :param from_what str: The remote git repository. + :param tag str: The tag. + :param git_options str: git options + :param where str: The path where to extract. + :param sub_dir str: The relative path of subtree to extract. + :param logger Logger: The logger instance to use. + :param environment src.environment.Environ: The environment to source when extracting. + :return: True if the extraction is successful + :rtype: boolean + """ + strWhere = str(where) + tmpWhere = strWhere + "_tmp" + parentWhere = os.path.dirname(strWhere) + if not os.path.exists(parentWhere): + logger.error("not existing directory: %s" % parentWhere) + return False + if os.path.isdir(strWhere): + logger.error("do not override existing directory: %s" % strWhere) + return False + aDict = { + "git_options": git_options, + "remote": from_what, + "tag": tag, + "sub_dir": sub_dir, + "where": strWhere, + "parentWhere": parentWhere, + "tmpWhere": tmpWhere, + } + DBG.write("git_extract_sub_dir", aDict) + if not src.architecture.is_windows(): + cmd = ( + r""" set -x export tmpDir=%(tmpWhere)s && \ rm -rf $tmpDir @@ -312,9 +363,12 @@ git checkout %(tag)s && \ mv %(sub_dir)s %(where)s && \ git log -1 > %(where)s/README_git_log.txt && \ rm -rf $tmpDir -""" % aDict - else: - cmd = r""" +""" + % aDict + ) + else: + cmd = ( + r""" set tmpDir=%(tmpWhere)s && \ rm -rf $tmpDir @@ -324,26 +378,32 @@ git checkout %(tag)s && \ mv %(sub_dir)s %(where)s && \ git log -1 > %(where)s/README_git_log.txt && \ rm -rf $tmpDir -""" % aDict +""" + % aDict + ) + + DBG.write("cmd", cmd) - DBG.write("cmd", cmd) + for nbtry in range(0, 3): # retries case of network problem + rc = UTS.Popen( + cmd, cwd=parentWhere, env=environment.environ.environ, logger=logger + ) + if rc.isOk(): + break + time.sleep(30) # wait a little - for nbtry in range(0,3): # retries case of network problem - rc = UTS.Popen(cmd, cwd=parentWhere, env=environment.environ.environ, logger=logger) - if rc.isOk(): break - time.sleep(30) # wait a little + return rc.isOk() - return rc.isOk() def archive_extract(from_what, where, logger): - '''Extracts sources from an archive. - + """Extracts sources from an archive. + :param from_what str: The path to the archive. :param where str: The path where to extract. :param logger Logger: The logger instance to use. :return: True if the extraction is successful :rtype: boolean - ''' + """ try: archive = tarfile.open(from_what) for i in archive.getmembers(): @@ -353,10 +413,21 @@ def archive_extract(from_what, where, logger): logger.write("archive_extract: %s\n" % exc) return False, None -def cvs_extract(protocol, user, server, base, tag, product, where, - logger, checkout=False, environment=None): - '''Extracts sources from a cvs repository. - + +def cvs_extract( + protocol, + user, + server, + base, + tag, + product, + where, + logger, + checkout=False, + environment=None, +): + """Extracts sources from a cvs repository. + :param protocol str: The cvs protocol. :param user str: The user to be used. :param server str: The remote cvs server. @@ -370,27 +441,43 @@ def cvs_extract(protocol, user, server, base, tag, product, where, extracting. :return: True if the extraction is successful :rtype: boolean - ''' + """ - opttag = '' + opttag = "" if tag is not None and len(tag) > 0: - opttag = '-r ' + tag + opttag = "-r " + tag - cmd = 'export' + cmd = "export" if checkout: - cmd = 'checkout' + cmd = "checkout" elif len(opttag) == 0: - opttag = '-DNOW' - + opttag = "-DNOW" + if len(protocol) > 0: root = "%s@%s:%s" % (user, server, base) - command = "cvs -d :%(protocol)s:%(root)s %(command)s -d %(where)s %(tag)s %(product)s" % \ - { 'protocol': protocol, 'root': root, 'where': str(where.base()), - 'tag': opttag, 'product': product, 'command': cmd } + command = ( + "cvs -d :%(protocol)s:%(root)s %(command)s -d %(where)s %(tag)s %(product)s" + % { + "protocol": protocol, + "root": root, + "where": str(where.base()), + "tag": opttag, + "product": product, + "command": cmd, + } + ) else: - command = "cvs -d %(root)s %(command)s -d %(where)s %(tag)s %(base)s/%(product)s" % \ - { 'root': server, 'base': base, 'where': str(where.base()), - 'tag': opttag, 'product': product, 'command': cmd } + command = ( + "cvs -d %(root)s %(command)s -d %(where)s %(tag)s %(base)s/%(product)s" + % { + "root": server, + "base": base, + "where": str(where.base()), + "tag": opttag, + "product": product, + "command": cmd, + } + ) logger.write(command + "\n", 5) @@ -398,23 +485,21 @@ def cvs_extract(protocol, user, server, base, tag, product, where, where.dir().make() logger.logTxtFile.write("\n" + command + "\n") - logger.logTxtFile.flush() - res = SP.call(command, cwd=str(where.dir()), - env=environment.environ.environ, - shell=True, - stdout=logger.logTxtFile, - stderr=SP.STDOUT) - return (res == 0) - -def svn_extract(user, - from_what, - tag, - where, - logger, - checkout=False, - environment=None): - '''Extracts sources from a svn repository. - + logger.logTxtFile.flush() + res = SP.call( + command, + cwd=str(where.dir()), + env=environment.environ.environ, + shell=True, + stdout=logger.logTxtFile, + stderr=SP.STDOUT, + ) + return res == 0 + + +def svn_extract(user, from_what, tag, where, logger, checkout=False, environment=None): + """Extracts sources from a svn repository. + :param user str: The user to be used. :param from_what str: The remote git repository. :param tag str: The tag. @@ -425,107 +510,152 @@ def svn_extract(user, extracting. :return: True if the extraction is successful :rtype: boolean - ''' + """ if not where.exists(): where.make() if checkout: - command = "svn checkout --username %(user)s %(remote)s %(where)s" % \ - { 'remote': from_what, 'user' : user, 'where': str(where) } + command = "svn checkout --username %(user)s %(remote)s %(where)s" % { + "remote": from_what, + "user": user, + "where": str(where), + } else: command = "" if os.path.exists(str(where)): - command = "/bin/rm -rf %(where)s && " % \ - { 'remote': from_what, 'where': str(where) } - + command = "/bin/rm -rf %(where)s && " % { + "remote": from_what, + "where": str(where), + } + if tag == "master": - command += "svn export --username %(user)s %(remote)s %(where)s" % \ - { 'remote': from_what, 'user' : user, 'where': str(where) } + command += "svn export --username %(user)s %(remote)s %(where)s" % { + "remote": from_what, + "user": user, + "where": str(where), + } else: - command += "svn export -r %(tag)s --username %(user)s %(remote)s %(where)s" % \ - { 'tag' : tag, 'remote': from_what, 'user' : user, 'where': str(where) } - + command += ( + "svn export -r %(tag)s --username %(user)s %(remote)s %(where)s" + % {"tag": tag, "remote": from_what, "user": user, "where": str(where)} + ) + logger.logTxtFile.write(command + "\n") - + logger.write(command + "\n", 5) logger.logTxtFile.write("\n" + command + "\n") logger.logTxtFile.flush() - res = SP.call(command, cwd=str(where.dir()), - env=environment.environ.environ, - shell=True, - stdout=logger.logTxtFile, - stderr=SP.STDOUT) - return (res == 0) + res = SP.call( + command, + cwd=str(where.dir()), + env=environment.environ.environ, + shell=True, + stdout=logger.logTxtFile, + stderr=SP.STDOUT, + ) + return res == 0 + def get_pkg_check_cmd(dist_name): - '''Build the command to use for checking if a linux package is installed or not.''' + """Build the command to use for checking if a linux package is installed or not.""" - if dist_name in ["CO","FD","MG","MD","CO","OS"]: # linux using rpm - linux="RH" - manager_msg_err="Error : command failed because sat was not able to find apt command" + if dist_name in ["CO", "FD", "MG", "MD", "CO", "OS"]: # linux using rpm + linux = "RH" + manager_msg_err = ( + "Error : command failed because sat was not able to find apt command" + ) else: - linux="DB" - manager_msg_err="Error : command failed because sat was not able to find rpm command" + linux = "DB" + manager_msg_err = ( + "Error : command failed because sat was not able to find rpm command" + ) # 1- search for an installed package manager (rpm on rh, apt on db) - cmd_which_rpm=["which", "rpm"] - cmd_which_apt=["which", "apt"] - with open(os.devnull, 'w') as devnull: + cmd_which_rpm = ["which", "rpm"] + cmd_which_apt = ["which", "apt"] + cmd_which_dpkg = ["which", "dpkg-query"] + with open(os.devnull, "w") as devnull: # 1) we search for apt (debian based systems) - completed=SP.call(cmd_which_apt,stdout=devnull, stderr=SP.STDOUT) - if completed==0 and linux=="DB": - cmd_is_package_installed=["apt", "list", "--installed"] + completed = SP.call(cmd_which_dpkg, stdout=devnull, stderr=SP.STDOUT) + if completed == 0 and linux == "DB": + cmd_is_package_installed = ["dpkg-query", "--no-pager", "-l"] else: - # 2) if apt not found search for rpm (redhat) - completed=SP.call(cmd_which_rpm,stdout=devnull, stderr=SP.STDOUT) # only 3.8! ,capture_output=True) - if completed==0 and linux=="RH": - cmd_is_package_installed=["rpm", "-q"] + # 2) if dpkg not found search for rpm (redhat) + completed = SP.call(cmd_which_apt, stdout=devnull, stderr=SP.STDOUT) + if completed == 0 and linux == "DB": + cmd_is_package_installed = ["apt", "list", "--installed"] else: - # no package manager was found corresponding to dist_name - raise src.SatException(manager_msg_err) + # 2) if apt and dpkg not found search for rpm (redhat) + completed = SP.call( + cmd_which_rpm, stdout=devnull, stderr=SP.STDOUT + ) # only 3.8! ,capture_output=True) + if completed == 0 and linux == "RH": + cmd_is_package_installed = ["rpm", "-q"] + else: + # no package manager was found corresponding to dist_name + raise src.SatException(manager_msg_err) return cmd_is_package_installed -def check_system_pkg(check_cmd,pkg): - '''Check if a package is installed + +def check_system_pkg(check_cmd, pkg): + """Check if a package is installed :param check_cmd list: the list of command to use system package manager :param user str: the pkg name to check :rtype: str :return: a string with package name with status un message - ''' + """ # build command - FNULL = open(os.devnull, 'w') - cmd_is_package_installed=[] + FNULL = open(os.devnull, "w") + cmd_is_package_installed = [] for cmd in check_cmd: cmd_is_package_installed.append(cmd) cmd_is_package_installed.append(pkg) - - if check_cmd[0]=="apt": + if check_cmd[0] == "apt": # special treatment for apt # apt output is too messy for being used # some debian packages have version numbers in their name, we need to add a * # also apt do not return status, we need to use grep - # and apt output is too messy for being used - cmd_is_package_installed[-1]+="*" # we don't specify in pyconf the exact name because of version numbers - p=SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL) + # and apt output is too messy for being used + cmd_is_package_installed[ + -1 + ] += "*" # we don't specify in pyconf the exact name because of version numbers + p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL) try: - output = SP.check_output(['grep', pkg], stdin=p.stdout) - msg_status=src.printcolors.printcSuccess("OK") + output = SP.check_output(["grep", pkg], stdin=p.stdout) + msg_status = src.printcolors.printcSuccess("OK") except: - msg_status=src.printcolors.printcError("KO") - msg_status+=" (package is not installed!)\n" + msg_status = src.printcolors.printcError("KO") + msg_status += " (package is not installed!)\n" + elif check_cmd[0] == "dpkg-query": + # special treatment for dpkg-query + # some debian packages have version numbers in their name, we need to add a * + # also dpkg-query do not return status, we need to use grep + # and dpkg-query output is too messy for being used + cmd_is_package_installed[-1] = ( + cmd_is_package_installed[-1] + "*" + ) # we don't specify in pyconf the exact name because of version numbers + p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL) + try: + output = SP.check_output(["grep", "^ii"], stdin=p.stdout) + msg_status = src.printcolors.printcSuccess("OK") + except SP.CalledProcessError: + msg_status = src.printcolors.printcError("KO") + msg_status += " (package is not installed!)\n" else: - p=SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL) + p = SP.Popen(cmd_is_package_installed, stdout=SP.PIPE, stderr=FNULL) output, err = p.communicate() rc = p.returncode - if rc==0: - msg_status=src.printcolors.printcSuccess("OK") + if rc == 0: + msg_status = src.printcolors.printcSuccess("OK") # in python3 output is a byte and should be decoded if isinstance(output, bytes): output = output.decode("utf-8", "ignore") - msg_status+=" (" + output.replace('\n',' ') + ")\n" # remove output trailing \n + msg_status += ( + " (" + output.replace("\n", " ") + ")\n" + ) # remove output trailing \n else: - msg_status=src.printcolors.printcError("KO") - msg_status+=" (package is not installed!)\n" + msg_status = src.printcolors.printcError("KO") + msg_status += " (package is not installed!)\n" return msg_status -- 2.39.2