1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2021 CEA/DEN, EDF R&D, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 ## @package salome_utils
22 # @brief Set of utility functions used by SALOME python scripts.
25 Various utilities for SALOME.
28 # pragma pylint: disable=invalid-name
37 from contextlib import suppress
def _try_bool(arg):
    """
    Convert given `arg` to a boolean value.
    String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc.
    are supported (the comparison is case-insensitive); `bytes` values are
    decoded as UTF-8 first.
    If `arg` does not represent a boolean, an exception is raised.
    :param arg : value being converted
    :return result of conversion: `True` or `False`
    :raise ValueError : if `arg` is not a recognised boolean representation
    """
    if isinstance(arg, bool):
        return arg
    if isinstance(arg, bytes):
        # undecodable bytes are dropped instead of raising UnicodeDecodeError
        arg = arg.decode('utf-8', errors='ignore')
    if isinstance(arg, str):
        lowered = arg.lower()
        if lowered in ('yes', 'y', 'true', 't'):
            return True
        if lowered in ('no', 'n', 'false', 'f'):
            return False
    # any other type (int, None, ...) or unknown word is rejected
    raise ValueError('Not a boolean value')
def getORBcfgInfo():
    """
    Get current omniORB configuration.

    The information is retrieved from the omniORB configuration file defined
    by the OMNIORB_CONFIG environment variable.
    If omniORB configuration file can not be accessed, a tuple of three empty
    strings is returned.

    :return tuple of three strings: (orb_version, host_name, port_number)
    """
    orb_version, hostname, port_number = '', '', ''
    # Best effort: an unset variable (open(None) raises TypeError), a missing
    # or an unreadable file all leave the empty defaults untouched.
    with suppress(Exception), open(os.getenv('OMNIORB_CONFIG')) as forb:
        regvar = re.compile(r'(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$')
        for line in forb:
            match = regvar.match(line)
            if match:
                # a line without the 'ORB' prefix is reported as version '4',
                # a line with it as version '3'
                orb_version = '4' if match.group(1) is None else '3'
                hostname = match.group(2)
                port_number = match.group(3)
                # the first matching line wins
                break
    return orb_version, hostname, port_number
def getHostFromORBcfg():
    """
    Get current omniORB host name.
    :return host name as a string (second item of `getORBcfgInfo()` result;
            empty string if the configuration can not be read)
    """
    return getORBcfgInfo()[1]
def getPortFromORBcfg():
    """
    Get current omniORB port.
    :return port number as a string (third item of `getORBcfgInfo()` result;
            empty string if the configuration can not be read)
    """
    return getORBcfgInfo()[2]
def getUserName():
    """
    Get user name.

    The following procedure is performed to deduce user name:
    1. try USER (USERNAME on Windows) environment variable.
    2. if (1) fails, try LOGNAME (un*x only).
    3. if (2) fails, return 'unknown' as default user name

    :return user name
    """
    if sys.platform == 'win32':
        return os.getenv('USERNAME', 'unknown')
    return os.getenv('USER', os.getenv('LOGNAME', 'unknown'))
def getHostName():
    """
    Get host name.

    The following procedure is performed to deduce host name:
    1. try socket python module, gethostname() function
    2. if (1) fails, try HOSTNAME environment variable
    3. if (2) fails, try HOST environment variable
    4. if (3) fails, tries 'unknown' as default host name
    5. finally, checks that IP is configured for hostname; if not, returns 'localhost'

    :return host name
    """
    host = None
    with suppress(Exception):
        host = socket.gethostname()
    if not host:
        host = os.getenv('HOSTNAME', os.getenv('HOST', 'unknown'))
    try:
        # the following line just checks that IP is configured for hostname
        socket.gethostbyname(host)
    except (TypeError, OSError, UnicodeError):
        # UnicodeError: the name can not be encoded for the resolver
        host = 'localhost'
    return host
def getShortHostName():
    """
    Get short host name (with domain stripped).
    See `getHostName()` for more details.
    :return short host name
    """
    with suppress(AttributeError, IndexError):
        # keep only the part before the first dot
        return getHostName().split('.')[0]
    return 'unknown' # default host name
def getAppName():
    """
    Get application name.
    The following procedure is performed to deduce application name:
    1. try APPNAME environment variable
    2. if (1) fails, return 'SALOME' as default application name
    :return application name
    """
    return os.getenv('APPNAME', 'SALOME') # 'SALOME' is default application name
def getPortNumber(use_default=True):
    """
    Get currently used omniORB port.
    The following procedure is performed to deduce port number:
    1. try NSPORT environment variable
    2. if (1) fails, try to parse config file defined by OMNIORB_CONFIG environment variable
    3. if (2) fails, return 2809 as default port number (if use_default is `True`) or `None`
       (if use_default is `False`)
    :param use_default : whether to fall back to the default port 2809
    :return port number (integer) or `None`
    """
    # int(None) raises TypeError when NSPORT is unset, ValueError when it is
    # not a number; in both cases fall through to the next source
    with suppress(TypeError, ValueError):
        return int(os.getenv('NSPORT'))
    with suppress(TypeError, ValueError):
        port = int(getPortFromORBcfg())
        # a zero port is treated as "not configured"
        if port:
            return port
    return 2809 if use_default else None
201 :return home directory path
203 return osp.realpath(osp.expanduser('~'))
209 Get directory that stores log files.
210 :return path to the log directory
212 return osp.join(getTmpDir(), 'logs', getUserName())
def getTmpDir():
    """
    Get directory to store temporary files.
    :return temporary directory path
    """
    # Ask `tempfile` directly for its default directory instead of creating
    # (and immediately deleting) a temporary file on every call.
    return tempfile.gettempdir()
# pragma pylint: disable=too-many-arguments
def generateFileName(path, prefix=None, suffix=None, extension=None,
                     unique=False, separator='_', hidden=False, **kwargs):
    """
    Generate file name.

    The name is built from the following components, in this order: prefix,
    user name, host name, port number, application name, pid, suffix;
    components are joined with `separator`.

    :param path      : directory path
    :param prefix    : file name prefix (none by default)
    :param suffix    : file name suffix (none by default)
    :param extension : file extension (none by default)
    :param unique    : if `True`, function generates unique file name -
                       in this case, if file with the generated name already
                       exists in `path` directory, an integer suffix is appended
                       to the file name (`False` by default)
    :param separator : words separator ('_' by default)
    :param hidden    : if `True`, file name is prepended with dot symbol
                       (`False` by default)
    :param kwargs    : additional keywords arguments (see below)
    :return generated file name
    :raise ValueError : if no component is given, i.e. file name is empty

    Additionally supported keyword parameters:
    - with_username : use user name
    - with_hostname : use host name
    - with_port     : use port number
    - with_app      : use application name
    - with_pid      : use current pid

    Any of these keyword arguments can accept either explicit string value,
    or `True` to automatically deduce value from current configuration.
    """
    filename = []

    def _with_str(_str):
        # append a component; `None` and empty strings are skipped
        _str = '' if _str is None else str(_str)
        if _str:
            filename.append(_str)

    def _with_kwarg(_kwarg, _func):
        _value = kwargs.get(_kwarg, False)
        try:
            # boolean-like value: deduce the component only if it is true
            if _try_bool(_value):
                filename.append(str(_func()))
        except ValueError:
            # not a boolean: the value itself is the component
            _with_str(_value)

    _with_str(prefix)
    _with_kwarg('with_username', getUserName)
    _with_kwarg('with_hostname', getShortHostName)
    _with_kwarg('with_port', getPortNumber)
    _with_kwarg('with_app', getAppName)
    _with_kwarg('with_pid', getPid)
    _with_str(suffix)

    # raise an exception if file name is empty
    if not filename:
        raise ValueError('Empty file name')

    # extension (stored without leading dot)
    extension = '' if extension is None else str(extension)
    if extension.startswith('.'):
        extension = extension[1:]

    # separator
    separator = '' if separator is None else str(separator)

    def _generate(_index=None):
        # join all components together, add index if necessary
        if _index is not None:
            _name = separator.join(filename+[str(_index)])
        else:
            _name = separator.join(filename)
        # prepend with dot if necessary
        if hidden:
            _name = '.' + _name
        # append extension if necessary
        if extension:
            _name = _name + '.' + extension
        # now get full path
        return osp.join(path, _name)

    name = _generate()
    if unique:
        # try increasing indices until a free name is found
        index = 0
        while osp.exists(name):
            index += 1
            name = _generate(index)
    return osp.normpath(name)
319 Clear contents of directory.
320 :param path : directory path
323 for filename in os.listdir(path):
324 file_path = osp.join(path, filename)
325 with suppress(OSError):
326 if osp.isdir(file_path):
327 shutil.rmtree(file_path)
def makeDir(path, mode=0o777):
    """
    Make directory with the specified path.
    Missing parent directories are created as well; an already existing
    directory is not an error. The process umask is cleared while the
    directory is created, so that `mode` is applied as given, and restored
    afterwards. Errors raised while creating the directory are ignored
    (best effort).
    :param path : directory path
    :param mode : access mode
    """
    # taken before `try` so that `finally` always has a mask to restore
    oldmask = os.umask(0)
    try:
        with suppress(OSError):
            os.makedirs(path, mode=mode, exist_ok=True)
    finally:
        os.umask(oldmask)
349 def makeTmpDir(path, mode=0o777):
351 Make temporary directory with the specified path.
352 If the directory exists, clear all its contents.
353 :param path : directory path
354 :param mode : access mode
def uniteFiles(src_file, dest_file):
    """
    Join contents of `src_file` and `dest_file` and put result to `dest_file`.
    File `dest_file` may not exist: in this case `src_file` is simply copied.
    Nothing is done if `src_file` does not exist. I/O errors are ignored
    (best effort).
    :param src_file  : source file path
    :param dest_file : destination file path
    """
    if not osp.exists(src_file):
        return

    if osp.exists(dest_file):
        with suppress(OSError), open(src_file, 'rb') as src, open(dest_file, 'ab') as dest:
            # start the appended contents on a new line
            # NOTE(review): separator restored from a line missing in the
            # reviewed text - confirm against the reference implementation
            dest.write(b'\n')
            # stream the data instead of loading the whole file in memory
            shutil.copyfileobj(src, dest)
    else:
        with suppress(OSError):
            shutil.copy(src_file, dest_file)
def verbose():
    """
    Get current verbosity level.

    Default verbosity level is specified via the environment variable SALOME_VERBOSE,
    for example:

        $ export SALOME_VERBOSE=1

    The function `setVerbose()` can be used to explicitly set verbosity level.

    The level is stored as an attribute of this function: the environment
    variable is read on the first call only.

    :return current verbosity level
    """
    if not hasattr(verbose, 'verbosity_level'):
        verbose.verbosity_level = 0 # default value
        with suppress(TypeError, ValueError):
            # from SALOME_VERBOSE environment variable
            verbose.verbosity_level = int(os.getenv('SALOME_VERBOSE', '0'))
    return verbose.verbosity_level
def setVerbose(level):
    """
    Change verbosity level.
    The function `verbose()` can be used to get current verbosity level.
    A value that can not be converted to an integer is silently ignored and
    the current level is kept.
    :param level : verbosity level
    """
    with suppress(TypeError, ValueError):
        verbose.verbosity_level = int(level)
def killPid(pid, sig=9):
    """
    Send signal `sig` to the process with given `pid`.

    :param pid : PID of the process
    :param sig : signal to send; some of possible values:
       - 9 : kill process
       - 0 : do nothing, just check process existence (see below)
       NOTE: other values are not processed on Windows
    :return result of execution:
       -  1 : success
       -  0 : fail, no such process
       - -1 : fail, another reason

    For any non-zero `sig` the process is first asked to terminate; if it is
    still alive after 5 seconds, it is killed.
    """
    # check pid: empty / zero / non-numeric values can not be processed
    if not pid:
        return -1
    try:
        pid = int(pid)
    except (TypeError, ValueError):
        return -1

    if sig == 0:
        ret = 1 if psutil.pid_exists(pid) else 0
    else:
        if verbose():
            print("######## killPid pid = ", pid)
        try:
            process = psutil.Process(pid)
            process.terminate()
            _, alive = psutil.wait_procs([process], timeout=5)
            for proc in alive:
                proc.kill()
            ret = 1
        except psutil.NoSuchProcess:
            ret = 0
        except (psutil.Error, OSError):
            # e.g. psutil.AccessDenied, which is not an OSError
            ret = -1
    return ret
def getOmniNamesPid(port):
    """
    Get PID of omniNames process running on given `port`.
    :param port : port number (integer or string)
    :return omniNames process's PID or `None` if no such process is found
    """
    names = {p.info['pid']: p.info['name'] for p in psutil.process_iter(['pid', 'name'])}
    wanted = str(port)
    for conn in psutil.net_connections(kind='inet'):
        # skip connections without a local address
        if not conn.laddr or str(conn.laddr.port) != wanted:
            continue
        # owner pid or process name may be unknown (`None`), e.g. when the
        # process belongs to another user: treat it as "not omniNames"
        if (names.get(conn.pid) or '').startswith('omniNames'):
            return conn.pid
    return None
def killOmniNames(port):
    """
    Kill omniNames process running on given `port`.
    Any error raised while looking for or killing the process is ignored
    (best effort).
    :param port : port number
    """
    with suppress(Exception):
        killPid(getOmniNamesPid(port))