1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2022 CEA/DEN, EDF R&D, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 ## @package salome_utils
22 # @brief Set of utility functions used by SALOME python scripts.
25 Various utilities for SALOME.
28 # pragma pylint: disable=invalid-name
37 from contextlib import suppress
43 Convert given `arg` to a boolean value.
44 String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc.
46 If `arg` does not represent a boolean, an exception is raised.
47 :param arg : value being converted
48 :return result of conversion: `True` or `False`
50 if isinstance(arg, bool):
52 if isinstance(arg, bytes):
53 arg = arg.decode('utf-8', errors='ignore')
54 if isinstance(arg, str):
55 if arg.lower() in ('yes', 'y', 'true', 't'):
57 if arg.lower() in ('no', 'n', 'false', 'f'):
59 raise ValueError('Not a boolean value')
65 Get current omniORB configuration.
67 The information is retrieved from the omniORB configuration file defined
68 by the OMNIORB_CONFIG environment variable.
69 If omniORB configuration file can not be accessed, a tuple of three empty
72 :return tuple of three strings: (orb_version, host_name, port_number)
77 with suppress(Exception), open(os.getenv('OMNIORB_CONFIG')) as forb:
78 regvar = re.compile(r'(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$')
79 for line in forb.readlines():
80 match = regvar.match(line)
82 orb_version = '4' if match.group(1) is None else '3'
83 hostname = match.group(2)
84 port_number = match.group(3)
86 return orb_version, hostname, port_number
90 def getHostFromORBcfg():
92 Get current omniORB host name.
95 return getORBcfgInfo()[1]
99 def getPortFromORBcfg():
101 Get current omniORB port.
104 return getORBcfgInfo()[2]
112 The following procedure is perfomed to deduce user name:
113 1. try USER (USERNAME on Windows) environment variable.
114 2. if (1) fails, try LOGNAME (un*x only).
115 3. if (2) fails, return 'unknown' as default user name
119 if sys.platform == 'win32':
120 username = os.getenv('USERNAME')
122 username = os.getenv('USER', os.getenv('LOGNAME'))
125 username = getpass.getuser()
134 The following procedure is perfomed to deduce host name:
135 1. try socket python module, gethostname() function
136 2. if (1) fails, try HOSTNAME environment variable
137 3. if (2) fails, try HOST environment variable
138 4. if (3) fails, tries 'unknown' as default host name
139 5. finally, checks that IP is configured for hostname; if not, returns 'localhost'
144 with suppress(Exception):
145 host = socket.gethostname()
147 host = os.getenv('HOSTNAME', os.getenv('HOST', 'unknown'))
149 # the following line just checks that IP is configured for hostname
150 socket.gethostbyname(host)
151 except (TypeError, OSError):
157 def getShortHostName():
159 Get short host name (with domain stripped).
160 See `getHostName()` for more details.
161 :return short host name
163 with suppress(AttributeError, IndexError):
164 return getHostName().split('.')[0]
165 return 'unknown' # default host name
171 Get application name.
172 The following procedure is perfomed to deduce application name:
173 1. try APPNAME environment variable
174 2. if (1) fails, return 'SALOME' as default application name
175 :return application name
177 return os.getenv('APPNAME', 'SALOME') # 'SALOME' is default user name
184 def getPortNumber(use_default=True):
186 Get currently used omniORB port.
187 The following procedure is perfomed to deduce port number:
188 1. try NSPORT environment variable
189 2. if (1) fails, try to parse config file defined by OMNIORB_CONFIG environment variable
190 3. if (2) fails, return 2809 as default port number (if use_default is `True`) or `None`
191 (if use_default is `False`)
194 with suppress(TypeError, ValueError):
195 return int(os.getenv('NSPORT'))
196 with suppress(TypeError, ValueError):
197 port = int(getPortFromORBcfg())
200 return 2809 if use_default else None
207 :return home directory path
209 return osp.realpath(osp.expanduser('~'))
215 Get directory that stores log files.
216 :return path to the log directory
218 return osp.join(getTmpDir(), 'logs', getUserName())
224 Get directory to store temporary files.
225 :return temporary directory path
227 with tempfile.NamedTemporaryFile() as tmp:
228 return osp.dirname(tmp.name)
233 # pragma pylint: disable=too-many-arguments
234 def generateFileName(path, prefix=None, suffix=None, extension=None,
235 unique=False, separator='_', hidden=False, **kwargs):
239 :param path : directory path
240 :param prefix : file name prefix (none by default)
241 :param suffix : file name suffix (none by default)
242 :param extension : file extension (none by default)
243 :param unique : if `True`, function generates unique file name -
244 in this case, if file with the generated name already
245 exists in `path` directory, an integer suffix is appended
246 to the file name (`False` by default)
247 :param separator : words separator ('_' by default)
248 :param hidden : if `True`, file name is prepended with dot symbol
250 :param kwargs : additional keywrods arguments (see below)
251 :return generated file name
253 Additionally supported keyword parameters:
254 - with_username : use user name:
255 - with_hostname : use host name:
256 - with_port : use port number:
257 - with_app : use application name:
258 - with_pid : use current pid
260 Any of these keyword arguments can accept either explicit string value,
261 or `True` to automatically deduce value from current configuration.
266 _str = '' if _str is None else str(_str)
268 filename.append(_str)
270 def _with_kwarg(_kwarg, _func):
271 _value = kwargs.get(_kwarg, False)
273 if _try_bool(_value):
274 filename.append(str(_func()))
279 _with_kwarg('with_username', getUserName)
280 _with_kwarg('with_hostname', getShortHostName)
281 _with_kwarg('with_port', getPortNumber)
282 _with_kwarg('with_app', getAppName)
283 _with_kwarg('with_pid', getPid)
286 # raise an exception if file name is empty
288 raise ValueError('Empty file name')
291 extension = '' if extension is None else str(extension)
292 if extension.startswith('.'):
293 extension = extension[1:]
296 separator = '' if separator is None else str(separator)
298 def _generate(_index=None):
299 # join all components together, add index if necessary
300 if _index is not None:
301 _name = separator.join(filename+[str(_index)])
303 _name = separator.join(filename)
304 # prepend with dot if necessary
307 # append extension if ncessary
309 _name = _name + '.' + extension
311 return osp.join(path, _name)
316 while osp.exists(name):
318 name = _generate(index)
319 return osp.normpath(name)
325 Clear contents of directory.
326 :param path directory path
329 for filename in os.listdir(path):
330 file_path = osp.join(path, filename)
331 with suppress(OSError):
332 if osp.isdir(file_path):
333 shutil.rmtree(file_path)
339 def makeDir(path, mode=0o777):
341 Make directory with the specified path.
342 :param path : directory path
343 :param mode : access mode
346 oldmask = os.umask(0)
347 os.makedirs(path, mode=mode, exist_ok=True)
355 def makeTmpDir(path, mode=0o777):
357 Make temporary directory with the specified path.
358 If the directory exists, clear all its contents.
359 :param path : directory path
360 :param mode : access mode
367 def uniteFiles(src_file, dest_file):
369 Join contents of `src_file` and `dest_file` and put result to `dest_file`.
370 File `dest_file` may not exist.
371 :param src_file : source file path
372 :param dest_file : destination file path
374 if not osp.exists(src_file):
377 if osp.exists(dest_file):
378 with suppress(OSError), open(src_file, 'rb') as src, open(dest_file, 'ab') as dest:
380 dest.write(src.read())
382 with suppress(OSError):
383 shutil.copy(src_file, dest_file)
389 Get current verbosity level.
391 Default verbosity level is specified via the environment variable SALOME_VERBOSE,
394 $ export SALOME_VERBOSE=1
396 The function `setVerbose()` can be used to explicitly set verbosity level.
398 :return current verbosity level
400 if not hasattr(verbose, 'verbosity_level'):
401 verbose.verbosity_level = 0 # default value
402 with suppress(TypeError, ValueError):
403 # from SALOME_VERBOSE environment variable
404 verbose.verbosity_level = int(os.getenv('SALOME_VERBOSE', '0'))
405 return verbose.verbosity_level
408 def setVerbose(level):
410 Change verbosity level.
411 The function `verbose()` can be used to get current verbosity level.
412 :param level : verbosity level
414 with suppress(TypeError, ValueError):
415 verbose.verbosity_level = int(level)
418 def killPid(pid, sig=9):
420 Send signal `sig` to the process with given `pid`.
422 :param pid : PID of the process
423 :param sig : signal to send; some of possible values:
425 - 0 : do nothing, just check process existence (see below)
426 NOTE: other values are not processed on Windows
427 :return result of execution:
429 - 0 : fail, no such process
430 - -1 : fail, another reason
435 with suppress(ValueError):
439 ret = 1 if psutil.pid_exists(pid) else 0
442 print("######## killPid pid = ", pid)
444 process = psutil.Process(pid)
446 _, alive = psutil.wait_procs([process], timeout=5)
450 except psutil.NoSuchProcess:
457 def getOmniNamesPid(port):
459 Get PID of omniNames process running on given `port`.
460 :param port : port number
461 :return omniNames process's PID
463 processes = {p.info['pid']: p.info['name'] for p in psutil.process_iter(['pid', 'name'])}
464 return next((c.pid for c in psutil.net_connections(kind='inet') \
465 if str(c.laddr.port) == str(port) and processes.get(c.pid).startswith('omniNames')), None)
468 def killOmniNames(port):
470 Kill omniNames process running on given `port`.
471 :param port : port number
473 with suppress(Exception):
474 killPid(getOmniNamesPid(port))