# -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2023 CEA, EDF, OPEN CASCADE
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
-# ---
-# File : salome_utils.py
-# Author : Vadim SANDLER, Open CASCADE S.A.S. (vadim.sandler@opencascade.com)
-# ---
-
## @package salome_utils
-# \brief Set of utility functions used by SALOME python scripts.
+# @brief Set of utility functions used by SALOME python scripts.
-#
-# Exported functions
-#
+"""
+Various utilities for SALOME.
+"""
-__all__ = [
- 'getORBcfgInfo',
- 'getHostFromORBcfg',
- 'getPortFromORBcfg',
- 'getUserName',
- 'getHostName',
- 'getShortHostName',
- 'getAppName',
- 'getPortNumber',
- 'getTmpDir',
- 'getHomeDir',
- 'generateFileName',
- 'makeTmpDir',
- 'uniteFiles',
- ]
+# pragma pylint: disable=invalid-name
-# ---
+import os
+import os.path as osp
+import re
+import shutil
+import socket
+import sys
+import tempfile
+from contextlib import suppress
+
+import psutil
-def _try_bool( arg ):
+def _try_bool(arg):
"""
- Check if specified parameter represents boolean value and returns its value.
- String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc
+ Convert given `arg` to a boolean value.
+ String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc.
are supported.
- If <arg> does not represent a boolean, an exception is raised.
+ If `arg` does not represent a boolean, an exception is raised.
+ :param arg : value being converted
+ :return result of conversion: `True` or `False`
"""
- import types
- if type( arg ) == types.BooleanType :
+ if isinstance(arg, bool):
return arg
- elif type( arg ) == types.StringType :
- v = str( arg ).lower()
- if v in [ "yes", "y", "true" ]: return True
- elif v in [ "no", "n", "false" ]: return False
- pass
- raise Exception("Not boolean value")
+ if isinstance(arg, bytes):
+ arg = arg.decode('utf-8', errors='ignore')
+ if isinstance(arg, str):
+ if arg.lower() in ('yes', 'y', 'true', 't'):
+ return True
+ if arg.lower() in ('no', 'n', 'false', 'f'):
+ return False
+ raise ValueError('Not a boolean value')
# ---
def getORBcfgInfo():
"""
- Get omniORB current configuration.
- Returns a list of three values: [ orb_version, host_name, port_number ].
-
+ Get current omniORB configuration.
+
The information is retrieved from the omniORB configuration file defined
by the OMNIORB_CONFIG environment variable.
- If omniORB configuration file can not be accessed, a list of three empty
+ If omniORB configuration file can not be accessed, a tuple of three empty
strings is returned.
- """
- import os, re
- ret = [ "", "", "" ]
- try:
- f = open( os.getenv( "OMNIORB_CONFIG" ) )
- lines = f.readlines()
- f.close()
- regvar = re.compile( "(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$" )
- for l in lines:
- try:
- m = regvar.match( l )
- if m:
- if m.group(1) is None:
- ret[0] = "4"
- else:
- ret[0] = "3"
- pass
- ret[1] = m.group(2)
- ret[2] = m.group(3)
- break
- pass
- except:
- pass
- pass
- pass
- except:
- pass
- return ret
+
+ :return tuple of three strings: (orb_version, host_name, port_number)
+ """
+ orb_version = ''
+ hostname = ''
+ port_number = ''
+ with suppress(Exception), open(os.getenv('OMNIORB_CONFIG')) as forb:
+ regvar = re.compile(r'(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$')
+ for line in forb.readlines():
+ match = regvar.match(line)
+ if match:
+ orb_version = '4' if match.group(1) is None else '3'
+ hostname = match.group(2)
+ port_number = match.group(3)
+ break
+ return orb_version, hostname, port_number
# ---
def getHostFromORBcfg():
"""
- Get current omniORB host.
+ Get current omniORB host name.
+ :return host name
"""
return getORBcfgInfo()[1]
+
# ---
def getPortFromORBcfg():
"""
Get current omniORB port.
+ :return port number
"""
return getORBcfgInfo()[2]
def getUserName():
"""
- Get user name:
- 1. try USER environment variable
- 2. if fails, return 'unknown' as default user name
+ Get user name.
+
+ The following procedure is perfomed to deduce user name:
+ 1. try USER (USERNAME on Windows) environment variable.
+ 2. if (1) fails, try LOGNAME (un*x only).
+ 3. if (2) fails, return 'unknown' as default user name
+
+ :return user name
"""
- import os
- return os.getenv( "USER", "unknown" ) # 'unknown' is default user name
+ if sys.platform == 'win32':
+ username = os.getenv('USERNAME')
+ else:
+ username = os.getenv('USER', os.getenv('LOGNAME'))
+ if username is None:
+ import getpass
+ username = getpass.getuser()
+ return username
# ---
def getHostName():
"""
- Get host name:
- 1. try socket python module gethostname() function
- 2. if fails, try HOSTNAME environment variable
- 3. if fails, try HOST environment variable
- 4. if fails, return 'unknown' as default host name
+ Get host name.
+
+ The following procedure is perfomed to deduce host name:
+ 1. try socket python module, gethostname() function
+ 2. if (1) fails, try HOSTNAME environment variable
+ 3. if (2) fails, try HOST environment variable
+ 4. if (3) fails, tries 'unknown' as default host name
+ 5. finally, checks that IP is configured for hostname; if not, returns 'localhost'
+
+ :return host name
"""
- import os
- try:
- import socket
+ host = None
+ with suppress(Exception):
host = socket.gethostname()
- except:
- host = None
- pass
- if not host: host = os.getenv("HOSTNAME")
- if not host: host = os.getenv("HOST")
- if not host: host = "unknown" # 'unknown' is default host name
+ if not host:
+ host = os.getenv('HOSTNAME', os.getenv('HOST', 'unknown'))
+ try:
+ # the following line just checks that IP is configured for hostname
+ socket.gethostbyname(host)
+ except (TypeError, OSError):
+ host = 'localhost'
return host
# ---
def getShortHostName():
"""
- Get short host name:
- 1. try socket python module gethostname() function
- 2. if fails, try HOSTNAME environment variable
- 3. if fails, try HOST environment variable
- 4. if fails, return 'unknown' as default host name
+ Get short host name (with domain stripped).
+ See `getHostName()` for more details.
+ :return short host name
"""
- try:
+ with suppress(AttributeError, IndexError):
return getHostName().split('.')[0]
- except:
- pass
- return "unknown" # 'unknown' is default host name
-
+ return 'unknown' # default host name
+
# ---
def getAppName():
"""
- Get application name:
+ Get application name.
+ The following procedure is perfomed to deduce application name:
1. try APPNAME environment variable
- 2. if fails, return 'SALOME' as default application name
+ 2. if (1) fails, return 'SALOME' as default application name
+ :return application name
"""
- import os
- return os.getenv( "APPNAME", "SALOME" ) # 'SALOME' is default user name
+ return os.getenv('APPNAME', 'SALOME') # 'SALOME' is default user name
+
+def getPid():
+ return os.getpid()
# ---
def getPortNumber(use_default=True):
"""
- Get current naming server port number:
+ Get currently used omniORB port.
+ The following procedure is perfomed to deduce port number:
1. try NSPORT environment variable
- 1. if fails, try to parse config file defined by OMNIORB_CONFIG environment variable
- 2. if fails, return 2809 as default port number (if use_default is True) or None (id use_default is False)
- """
- import os
- try:
- return int( os.getenv( "NSPORT" ) )
- except:
- pass
- try:
- port = int( getPortFromORBcfg() )
- if port is not None: return port
- except:
- pass
- if use_default: return 2809 # '2809' is default port number
- return None
+ 2. if (1) fails, try to parse config file defined by OMNIORB_CONFIG environment variable
+ 3. if (2) fails, return 2809 as default port number (if use_default is `True`) or `None`
+ (if use_default is `False`)
+ :return port number
+ """
+ with suppress(TypeError, ValueError):
+ return int(os.getenv('NSPORT'))
+ with suppress(TypeError, ValueError):
+ port = int(getPortFromORBcfg())
+ if port:
+ return port
+ return 2809 if use_default else None
# ---
def getHomeDir():
"""
Get home directory.
+ :return home directory path
"""
- import os, sys
- if sys.platform == "win32":
- # for Windows the home directory is detected in the following way:
- # 1. try USERPROFILE env variable
- # 2. try combination of HOMEDRIVE and HOMEPATH env variables
- # 3. try HOME env variable
- # TODO: on Windows, also GetUserProfileDirectoryW() system function might be used
- dir = os.getenv("USERPROFILE")
- if not dir and os.getenv("HOMEDRIVE") and os.getenv("HOMEPATH"):
- dir = os.path.join(os.getenv("HOMEDRIVE"), os.getenv("HOMEPATH"))
- if not dir:
- dir = os.getenv("HOME")
- pass
- else:
- # for Linux: use HOME variable
- dir = os.getenv("HOME")
- pass
- return dir
+ return osp.realpath(osp.expanduser('~'))
+
+# ---
+
+def getLogDir():
+ """
+ Get directory that stores log files.
+ :return path to the log directory
+ """
+ return osp.join(getTmpDir(), 'logs', getUserName())
+
# ---
def getTmpDir():
"""
- Get directory to be used for the temporary files.
+ Get directory to store temporary files.
+ :return temporary directory path
"""
- import os, sys
- if sys.platform == "win32":
- # for Windows: temporarily using home directory for tmp files;
- # to be replaced with TEMP environment variable later...
- dir = os.getenv("HOME")
- else:
- # for Linux: use /tmp/logs/{user} folder
- dir = os.path.join( '/tmp', 'logs', getUserName() )
- pass
- return dir
+ with tempfile.NamedTemporaryFile() as tmp:
+ return osp.dirname(tmp.name)
+ return None
# ---
-def generateFileName( dir, prefix = None, suffix = None, extension = None,
- unique = False, separator = "_", hidden = False, **kwargs ):
- """
- Generate file name by sepecified parameters. If necessary, file name
- can be generated to be unique.
-
- Parameters:
- - dir : directory path
- - prefix : file prefix (not added by default)
- - suffix : file suffix (not added by default)
- - extension : file extension (not added by default)
- - unique : if this parameter is True, the unique file name is generated:
- in this case, if the file with the generated name already exists
- in the <dir> directory, an integer suffix is added to the end of the
- file name. This parameter is False by default.
- - separator : separator of the words ('_' by default)
- - hidden : if this parameter is True, the file name is prepended by . (dot)
- symbol. This parameter is False by default.
-
- Other keyword parameters are:
- - with_username : 'add user name' flag/option:
- * boolean value can be passed to determine user name automatically
- * string value to be used as user name
- - with_hostname : 'add host name' flag/option:
- * boolean value can be passed to determine host name automatically
- * string value to be used as host name
- - with_port : 'add port number' flag/option:
- * boolean value can be passed to determine port number automatically
- * string value to be used as port number
- - with_app : 'add application name' flag/option:
- * boolean value can be passed to determine application name automatically
- * string value to be used as application name
- All <with_...> parameters are optional.
- """
- supported = [ 'with_username', 'with_hostname', 'with_port', 'with_app' ]
- from launchConfigureParser import verbose
+# pragma pylint: disable=too-many-arguments
+def generateFileName(path, prefix=None, suffix=None, extension=None,
+ unique=False, separator='_', hidden=False, **kwargs):
+ """
+ Generate file name.
+
+ :param path : directory path
+ :param prefix : file name prefix (none by default)
+ :param suffix : file name suffix (none by default)
+ :param extension : file extension (none by default)
+ :param unique : if `True`, function generates unique file name -
+ in this case, if file with the generated name already
+ exists in `path` directory, an integer suffix is appended
+ to the file name (`False` by default)
+ :param separator : words separator ('_' by default)
+ :param hidden : if `True`, file name is prepended with dot symbol
+ (`False` by default)
+ :param kwargs : additional keywrods arguments (see below)
+ :return generated file name
+
+ Additionally supported keyword parameters:
+ - with_username : use user name:
+ - with_hostname : use host name:
+ - with_port : use port number:
+ - with_app : use application name:
+ - with_pid : use current pid
+
+ Any of these keyword arguments can accept either explicit string value,
+ or `True` to automatically deduce value from current configuration.
+ """
filename = []
- # separator
- if separator is None:
- separator = ""
- pass
- else:
- separator = str( separator )
- pass
- # prefix (if specified)
- if prefix is not None:
- filename.append( str( prefix ) )
- pass
- # additional keywords
- ### check unsupported parameters
- for kw in kwargs:
- if kw not in supported and verbose():
- print 'Warning! salome_utilitie.py: generateFileName(): parameter %s is not supported' % kw
- pass
- pass
- ### process supported keywords
- for kw in supported:
- if kw not in kwargs: continue
- ### user name
- if kw == 'with_username':
- try:
- # auto user name ?
- if _try_bool( kwargs[kw] ): filename.append( getUserName() )
- pass
- except:
- # user name given as parameter
- filename.append( kwargs[kw] )
- pass
- pass
- ### host name
- elif kw == 'with_hostname':
- try:
- # auto host name ?
- if _try_bool( kwargs[kw] ): filename.append( getShortHostName() )
- pass
- except:
- # host name given as parameter
- filename.append( kwargs[kw] )
- pass
- pass
- ### port number
- elif kw == 'with_port':
- try:
- # auto port number ?
- if _try_bool( kwargs[kw] ): filename.append( str( getPortNumber() ) )
- pass
- except:
- # port number given as parameter
- filename.append( str( kwargs[kw] ) )
- pass
- pass
- ### application name
- elif kw == 'with_app':
- try:
- # auto application name ?
- if _try_bool( kwargs[kw] ): filename.append( getAppName() )
- pass
- except:
- # application name given as parameter
- filename.append( kwargs[kw] )
- pass
- pass
- pass
- # suffix (if specified)
- if suffix is not None:
- filename.append( str( suffix ) )
- pass
+
+ def _with_str(_str):
+ _str = '' if _str is None else str(_str)
+ if _str:
+ filename.append(_str)
+
+ def _with_kwarg(_kwarg, _func):
+ _value = kwargs.get(_kwarg, False)
+ try:
+ if _try_bool(_value):
+ filename.append(str(_func()))
+ except ValueError:
+ _with_str(_value)
+
+ _with_str(prefix)
+ _with_kwarg('with_username', getUserName)
+ _with_kwarg('with_hostname', getShortHostName)
+ _with_kwarg('with_port', getPortNumber)
+ _with_kwarg('with_app', getAppName)
+ _with_kwarg('with_pid', getPid)
+ _with_str(suffix)
+
# raise an exception if file name is empty
if not filename:
- raise Exception("Empty file name")
- #
- if extension is not None and extension.startswith("."): extension = extension[1:]
- #
- import os
- name = separator.join( filename )
- if hidden: name = "." + name # add dot for hidden files
- if extension: name = name + "." + str( extension ) # add extension if defined
- name = os.path.join( dir, name )
+ raise ValueError('Empty file name')
+
+ # extension
+ extension = '' if extension is None else str(extension)
+ if extension.startswith('.'):
+ extension = extension[1:]
+
+ # separator
+ separator = '' if separator is None else str(separator)
+
+ def _generate(_index=None):
+ # join all components together, add index if necessary
+ if _index is not None:
+ _name = separator.join(filename+[str(_index)])
+ else:
+ _name = separator.join(filename)
+ # prepend with dot if necessary
+ if hidden:
+ _name = '.' + _name
+ # append extension if ncessary
+ if extension:
+ _name = _name + '.' + extension
+ # now get full path
+ return osp.join(path, _name)
+
+ name = _generate()
if unique:
- # create unique file name
index = 0
- while os.path.exists( name ):
+ while osp.exists(name):
index = index + 1
- name = separator.join( filename ) + separator + str( index )
- if hidden: name = "." + name # add dot for hidden files
- if extension: name = name + "." + str( extension ) # add extension if defined
- name = os.path.join( dir, name )
- pass
- pass
- return os.path.normpath(name)
+ name = _generate(index)
+ return osp.normpath(name)
# ---
-def makeTmpDir( path, mode=0777 ):
+def cleanDir(path):
"""
- Make temporary directory with the specified path.
- If the directory exists then clear its contents.
-
- Parameters:
- - path : absolute path to the directory to be created.
- - mode : access mode
- """
- import os
- if os.path.exists( path ):
- import sys
- if sys.platform == "win32":
- os.system( "rmdir /S /Q " + '"' + path + '"' )
- os.system( "mkdir " + '"' + path + '"' )
- else:
- os.system( "rm -rf " + path + "/*" )
- else:
- dirs = path.split("/")
- shift1 = shift2 = 0
- if not dirs[0]: shift1 = 1
- if dirs[-1]: shift2 = 1
- for i in range(1+shift1,len(dirs)+shift2):
- p = "/".join(dirs[:i])
- try:
- os.mkdir(p, mode)
- os.chmod(p, mode)
- except:
- pass
+ Clear contents of directory.
+ :param path directory path
+ """
+ if osp.exists(path):
+ for filename in os.listdir(path):
+ file_path = osp.join(path, filename)
+ with suppress(OSError):
+ if osp.isdir(file_path):
+ shutil.rmtree(file_path)
+ else:
+ os.unlink(file_path)
# ---
-def uniteFiles( src_file, dest_file ):
+def makeDir(path, mode=0o777):
+ """
+ Make directory with the specified path.
+ :param path : directory path
+ :param mode : access mode
"""
- Unite contents of the source file with contents of the destination file
- and put result of the uniting to the destination file.
- If the destination file does not exist then the source file is simply
- copied to its path.
+ try:
+ oldmask = os.umask(0)
+ os.makedirs(path, mode=mode, exist_ok=True)
+ except IOError:
+ pass
+ finally:
+ os.umask(oldmask)
+
+# ---
- Parameters:
- - src_file : absolute path to the source file
- - dest_file : absolute path to the destination file
+def makeTmpDir(path, mode=0o777):
"""
- import os
+ Make temporary directory with the specified path.
+ If the directory exists, clear all its contents.
+ :param path : directory path
+ :param mode : access mode
+ """
+ makeDir(path, mode)
+ cleanDir(path)
- if not os.path.exists( src_file ):
+# ---
+
+def uniteFiles(src_file, dest_file):
+ """
+ Join contents of `src_file` and `dest_file` and put result to `dest_file`.
+ File `dest_file` may not exist.
+ :param src_file : source file path
+ :param dest_file : destination file path
+ """
+ if not osp.exists(src_file):
return
- pass
- if os.path.exists( dest_file ):
- # add a symbol of new line to contents of the destination file (just in case)
- dest = open( dest_file, 'r' )
- dest_lines = dest.readlines()
- dest.close()
+ if osp.exists(dest_file):
+ with suppress(OSError), open(src_file, 'rb') as src, open(dest_file, 'ab') as dest:
+ dest.write(b'\n')
+ dest.write(src.read())
+ else:
+ with suppress(OSError):
+ shutil.copy(src_file, dest_file)
+
+# --
- dest_lines.append( "\n" )
+def verbose():
+ """
+ Get current verbosity level.
- dest = open( dest_file, 'w' )
- dest.writelines( dest_lines )
- dest.close()
+ Default verbosity level is specified via the environment variable SALOME_VERBOSE,
+ e.g. in bash:
- import sys
- if sys.platform == "win32":
- command = "type " + '"' + src_file + '"' + " >> " + '"' + dest_file + '"'
- else:
- command = "cat " + src_file + " >> " + dest_file
- pass
- pass
- else:
- import sys
- if sys.platform == "win32":
- command = "copy " + '"' + src_file + '"' + " " + '"' + dest_file + '"' + " > nul"
- else:
- command = "cp " + src_file + " " + dest_file
- pass
- pass
+ $ export SALOME_VERBOSE=1
- os.system( command )
+ The function `setVerbose()` can be used to explicitly set verbosity level.
+ :return current verbosity level
+ """
+ if not hasattr(verbose, 'verbosity_level'):
+ verbose.verbosity_level = 0 # default value
+ with suppress(TypeError, ValueError):
+ # from SALOME_VERBOSE environment variable
+ verbose.verbosity_level = int(os.getenv('SALOME_VERBOSE', '0'))
+ return verbose.verbosity_level
# --
-_verbose = None
+def setVerbose(level):
+ """
+ Change verbosity level.
+ The function `verbose()` can be used to get current verbosity level.
+ :param level : verbosity level
+ """
+ with suppress(TypeError, ValueError):
+ verbose.verbosity_level = int(level)
+# --
-def verbose():
+def killPid(pid, sig=9):
"""
- Get verbosity level. Default verbosity level is specified via the environment variable
- SALOME_VERBOSE, e.g.:
- [bash %] export SALOME_VERBOSE=1
- The function setVerbose() can be used to change verbosity level explicitly.
+ Send signal `sig` to the process with given `pid`.
+
+ :param pid : PID of the process
+ :param sig : signal to send; some of possible values:
+ - 9 : kill process
+ - 0 : do nothing, just check process existence (see below)
+ NOTE: other values are not processed on Windows
+ :return result of execution:
+ - 1 : success
+ - 0 : fail, no such process
+ - -1 : fail, another reason
"""
- global _verbose
- # verbose has already been called
- if _verbose is not None:
- return _verbose
- # first time
- try:
- from os import getenv
- _verbose = int(getenv('SALOME_VERBOSE'))
- except:
- _verbose = 0
- pass
- #
- return _verbose
+ if not pid:
+ return -1
-def setVerbose(level):
+ with suppress(ValueError):
+ pid = int(pid)
+
+ if sig == 0:
+ ret = 1 if psutil.pid_exists(pid) else 0
+ else:
+ if verbose():
+ print("######## killPid pid = ", pid)
+ try:
+ process = psutil.Process(pid)
+ process.terminate()
+ _, alive = psutil.wait_procs([process], timeout=5)
+ for proc in alive:
+ proc.kill()
+ ret = 1
+ except psutil.NoSuchProcess:
+ ret = 0
+ except OSError:
+ ret = -1
+ return ret
+# --
+
+def getOmniNamesPid(port):
"""
- Change verbosity level. The function verbose() can be used to get current verbosity level.
+ Get PID of omniNames process running on given `port`.
+ :param port : port number
+ :return omniNames process's PID
"""
- global _verbose
- _verbose = level
- return
+ processes = {p.info['pid']: p.info['name'] for p in psutil.process_iter(['pid', 'name'])}
+ return next((c.pid for c in psutil.net_connections(kind='inet') \
+ if str(c.laddr.port) == str(port) and processes.get(c.pid).startswith('omniNames')), None)
+# --
+def killOmniNames(port):
+ """
+ Kill omniNames process running on given `port`.
+ :param port : port number
+ """
+ with suppress(Exception):
+ killPid(getOmniNamesPid(port))
+# --