1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 ## @package salome_utils
22 # @brief Set of utility functions used by SALOME python scripts.
25 Various utilities for SALOME.
28 # pragma pylint: disable=invalid-name
38 from contextlib import suppress
42 ## Setting the formatter in setVerbose() was commented out because adding a handler
43 ## breaks use of the root logger in other modules and causes many duplicated lines in logs.
44 #FORMAT = '%(levelname)s : %(asctime)s : [%(filename)s:%(funcName)s:%(lineno)s] : %(message)s'
45 #logging.basicConfig(format=FORMAT)
# Root logger (getLogger() is called without a name); handler and level are
# attached to it by positionVerbosityOfLogger().
logger = logging.getLogger()
50 Convert given `arg` to a boolean value.
51 String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc.
53 If `arg` does not represent a boolean, an exception is raised.
54 :param arg : value being converted
55 :return result of conversion: `True` or `False`
57 if isinstance(arg, bool):
59 if isinstance(arg, bytes):
60 arg = arg.decode('utf-8', errors='ignore')
61 if isinstance(arg, str):
62 if arg.lower() in ('yes', 'y', 'true', 't'):
64 if arg.lower() in ('no', 'n', 'false', 'f'):
66 raise ValueError('Not a boolean value')
72 Get current omniORB configuration.
74 The information is retrieved from the omniORB configuration file defined
75 by the OMNIORB_CONFIG environment variable.
76 If omniORB configuration file can not be accessed, a tuple of three empty
79 :return tuple of three strings: (orb_version, host_name, port_number)
84 with suppress(Exception), open(os.getenv('OMNIORB_CONFIG')) as forb:
85 regvar = re.compile(r'(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$')
86 for line in forb.readlines():
87 match = regvar.match(line)
89 orb_version = '4' if match.group(1) is None else '3'
90 hostname = match.group(2)
91 port_number = match.group(3)
93 return orb_version, hostname, port_number
def getHostFromORBcfg():
    """
    Get current omniORB host name.

    The value is the second item of the tuple returned by `getORBcfgInfo()`.

    :return host name
    """
    return getORBcfgInfo()[1]
def getPortFromORBcfg():
    """
    Get current omniORB port.

    The value is the third item of the tuple returned by `getORBcfgInfo()`.

    :return port number
    """
    return getORBcfgInfo()[2]
119 The following procedure is performed to deduce user name:
120 1. try USER (USERNAME on Windows) environment variable.
121 2. if (1) fails, try LOGNAME (un*x only).
122 3. if (2) fails, return 'unknown' as default user name
126 if sys.platform == 'win32':
127 username = os.getenv('USERNAME')
129 username = os.getenv('USER', os.getenv('LOGNAME'))
132 username = getpass.getuser()
141 The following procedure is performed to deduce host name:
142 1. try socket python module, gethostname() function
143 2. if (1) fails, try HOSTNAME environment variable
144 3. if (2) fails, try HOST environment variable
145 4. if (3) fails, use 'unknown' as default host name
146 5. finally, check that IP is configured for hostname; if not, return 'localhost'
151 with suppress(Exception):
152 host = socket.gethostname()
154 host = os.getenv('HOSTNAME', os.getenv('HOST', 'unknown'))
156 # the following line just checks that IP is configured for hostname
157 socket.gethostbyname(host)
158 except (TypeError, OSError):
def getShortHostName():
    """
    Get short host name (with domain stripped).
    See `getHostName()` for more details.

    :return short host name
    """
    # keep only the part of the host name that precedes the first dot
    with suppress(AttributeError, IndexError):
        return getHostName().split('.')[0]
    return 'unknown'  # default host name
def getAppName():
    """
    Get application name.

    The following procedure is performed to deduce application name:
    1. try APPNAME environment variable
    2. if (1) fails, return 'SALOME' as default application name

    :return application name
    """
    return os.getenv('APPNAME', 'SALOME')  # 'SALOME' is default application name
def getPortNumber(use_default=True):
    """
    Get currently used omniORB port.

    The following procedure is performed to deduce port number:
    1. try NSPORT environment variable
    2. if (1) fails, try to parse config file defined by OMNIORB_CONFIG environment variable
    3. if (2) fails, return 2809 as default port number (if use_default is `True`) or `None`
       (if use_default is `False`)

    :param use_default : if `True`, fall back to 2809 when no port can be deduced
    :return port number (or `None`, see above)
    """
    # int(None) raises TypeError when NSPORT is not set,
    # int('abc') raises ValueError when it is not a number
    with suppress(TypeError, ValueError):
        return int(os.getenv('NSPORT'))
    with suppress(TypeError, ValueError):
        port = int(getPortFromORBcfg())
        # NOTE(review): this guard is reconstructed - the lines between the
        # conversion and the final return were lost; confirm against upstream
        if port:
            return port
    return 2809 if use_default else None
214 :return home directory path
216 return osp.realpath(osp.expanduser('~'))
222 Get directory that stores log files.
223 :return path to the log directory
225 return osp.join(getTmpDir(), 'logs', getUserName())
def getTmpDir():
    """
    Get directory to store temporary files.

    :return temporary directory path
    """
    # this is the directory NamedTemporaryFile() uses by default; asking for it
    # directly avoids creating (and deleting) a scratch file just to learn it
    return tempfile.gettempdir()
240 # pragma pylint: disable=too-many-arguments
241 def generateFileName(path, prefix=None, suffix=None, extension=None,
242 unique=False, separator='_', hidden=False, **kwargs):
246 :param path : directory path
247 :param prefix : file name prefix (none by default)
248 :param suffix : file name suffix (none by default)
249 :param extension : file extension (none by default)
250 :param unique : if `True`, function generates unique file name -
251 in this case, if file with the generated name already
252 exists in `path` directory, an integer suffix is appended
253 to the file name (`False` by default)
254 :param separator : words separator ('_' by default)
255 :param hidden : if `True`, file name is prepended with dot symbol
257 :param kwargs : additional keyword arguments (see below)
258 :return generated file name
260 Additionally supported keyword parameters:
261 - with_username : use user name:
262 - with_hostname : use host name:
263 - with_port : use port number:
264 - with_app : use application name:
265 - with_pid : use current pid
267 Any of these keyword arguments can accept either explicit string value,
268 or `True` to automatically deduce value from current configuration.
273 _str = '' if _str is None else str(_str)
275 filename.append(_str)
277 def _with_kwarg(_kwarg, _func):
278 _value = kwargs.get(_kwarg, False)
280 if _try_bool(_value):
281 filename.append(str(_func()))
286 _with_kwarg('with_username', getUserName)
287 _with_kwarg('with_hostname', getShortHostName)
288 _with_kwarg('with_port', getPortNumber)
289 _with_kwarg('with_app', getAppName)
290 _with_kwarg('with_pid', getPid)
293 # raise an exception if file name is empty
295 raise ValueError('Empty file name')
298 extension = '' if extension is None else str(extension)
299 if extension.startswith('.'):
300 extension = extension[1:]
303 separator = '' if separator is None else str(separator)
305 def _generate(_index=None):
306 # join all components together, add index if necessary
307 if _index is not None:
308 _name = separator.join(filename+[str(_index)])
310 _name = separator.join(filename)
311 # prepend with dot if necessary
314 # append extension if necessary
316 _name = _name + '.' + extension
318 return osp.join(path, _name)
323 while osp.exists(name):
325 name = _generate(index)
326 return osp.normpath(name)
332 Clear contents of directory.
333 :param path directory path
336 for filename in os.listdir(path):
337 file_path = osp.join(path, filename)
338 with suppress(OSError):
339 if osp.isdir(file_path):
340 shutil.rmtree(file_path)
346 def makeDir(path, mode=0o777):
348 Make directory with the specified path.
349 :param path : directory path
350 :param mode : access mode
353 oldmask = os.umask(0)
354 os.makedirs(path, mode=mode, exist_ok=True)
362 def makeTmpDir(path, mode=0o777):
364 Make temporary directory with the specified path.
365 If the directory exists, clear all its contents.
366 :param path : directory path
367 :param mode : access mode
374 def uniteFiles(src_file, dest_file):
376 Join contents of `src_file` and `dest_file` and put result to `dest_file`.
377 File `dest_file` may not exist.
378 :param src_file : source file path
379 :param dest_file : destination file path
381 if not osp.exists(src_file):
384 if osp.exists(dest_file):
385 with suppress(OSError), open(src_file, 'rb') as src, open(dest_file, 'ab') as dest:
387 dest.write(src.read())
389 with suppress(OSError):
390 shutil.copy(src_file, dest_file)
394 class ColoredFormatter(logging.Formatter):
395 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30,38)
396 COLORS = { 'WARNING': YELLOW, 'INFO': WHITE, 'DEBUG': BLUE, 'CRITICAL': YELLOW, 'ERROR': RED }
397 def __init__(self, *args, **kwargs):
398 logging.Formatter.__init__(self, *args, **kwargs)
399 def format(self, record):
400 RESET_SEQ = "\033[0m"
401 COLOR_SEQ = "\033[1;%dm"
403 frame = inspect.currentframe()
406 record.levelname = COLOR_SEQ % ColoredFormatter.COLORS[record.levelname] + record.levelname + RESET_SEQ
407 record.msg = "{} ( callsite is {} of file \"{}\" at line {} )".format(record.msg, frame.f_code.co_name,inspect.getsourcefile(frame),inspect.getlineno(frame) )
408 return logging.Formatter.format(self, record)
410 class BackTraceFormatter(logging.Formatter):
411 def __init__(self, *args, **kwargs):
412 logging.Formatter.__init__(self, *args, **kwargs)
413 def format(self, record):
415 frame = inspect.currentframe()
416 # go upward with ( a limit of 10 steps ) of the stack to catch the effective callsite. Not very steady....
417 # should be replaced by an analysis of frame.f_code
420 if inspect.getsourcefile(frame) != logging.__file__:
422 record.msg = "{} ( callsite is {} of file \"{}\" at line {} )".format(record.msg, frame.f_code.co_name,inspect.getsourcefile(frame),inspect.getlineno(frame) )
423 return logging.Formatter.format(self, record)
425 def positionVerbosityOfLogger( verboseLevel ):
426 from packaging import version
427 current_version = version.parse("{}.{}".format(sys.version_info.major,sys.version_info.minor))
428 version_ref = version.parse("3.5.0")
431 if current_version >= version_ref:
432 formatter = BackTraceFormatter('%(levelname)s : %(asctime)s : %(message)s ',style='%')
434 formatter = logging.Formatter('%(levelname)s : %(asctime)s : %(message)s ',style='%')
435 formatter.default_time_format = '%H:%M:%S'
436 formatter.default_msec_format = "%s.%03d"
437 stream_handler = logging.StreamHandler()
438 stream_handler.setFormatter(formatter)
439 logger.addHandler(stream_handler)
440 logger.setLevel(verboseLevel)
def positionVerbosityOfLoggerRegardingState():
    """
    Set up the logger according to the current verbosity level.

    Shortcut for `positionVerbosityOfLogger(verboseLevel())`.
    """
    positionVerbosityOfLogger(verboseLevel())
def verbose():
    """
    Get current verbosity activation.

    Default verbosity level is specified via the environment variable SALOME_VERBOSE,
    e.g.:

        $ export SALOME_VERBOSE=1

    The function `setVerbose()` can be used to explicitly set verbosity activation.

    :return current verbosity level
    """
    # imported here rather than at module level
    import KernelBasis
    return KernelBasis.VerbosityActivated()
def setVerbose(status):
    """
    Change verbosity activation status.
    The function `verbose()` can be used to get current verbosity level.

    :param status : verbosity status
    :return whatever `KernelBasis.SetVerbosityActivated()` returns
    """
    # imported here rather than at module level
    import KernelBasis
    return KernelBasis.SetVerbosityActivated(status)
# Log level names (as handled by KernelBasis) -> `logging` module levels
KernelLogLevelToLogging = {
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}

# Reverse mapping: `logging` module levels -> log level names
LoggingToKernelLogLevel = {v: k for k, v in KernelLogLevelToLogging.items()}
def verboseLevel():
    """
    Get current verbosity level.

    Default verbosity level is specified via an environment variable, e.g.:

        $ export SALOME_VERBOSE_LEVEL=7

    NOTE(review): the original text names SALOME_VERBOSE while its example sets
    SALOME_VERBOSE_LEVEL - confirm which variable is actually read.

    The function `setVerboseLevel()` can be used to explicitly set verbosity level.

    :return current verbosity level, as a `logging` module level
    """
    # imported here rather than at module level
    import KernelBasis
    # translate the level name reported by KernelBasis into a `logging` level;
    # a name missing from the mapping raises KeyError
    return KernelLogLevelToLogging[KernelBasis.VerbosityLevel()]
def setVerboseLevel(level):
    """
    Change verbosity level.
    The function `verboseLevel()` can be used to get current verbosity level.

    :param level : verbosity level, one of the `logging` levels known to
                   `LoggingToKernelLogLevel` (any other value raises KeyError)
    """
    # imported here rather than at module level
    import KernelBasis
    KernelBasis.SetVerbosityLevel(LoggingToKernelLogLevel[level])
506 def killPid(pid, sig=9):
508 Send signal `sig` to the process with given `pid`.
510 :param pid : PID of the process
511 :param sig : signal to send; some of possible values:
513 - 0 : do nothing, just check process existence (see below)
514 NOTE: other values are not processed on Windows
515 :return result of execution:
517 - 0 : fail, no such process
518 - -1 : fail, another reason
523 with suppress(ValueError):
527 ret = 1 if psutil.pid_exists(pid) else 0
530 print("######## killPid pid = ", pid)
532 process = psutil.Process(pid)
534 _, alive = psutil.wait_procs([process], timeout=5)
538 except psutil.NoSuchProcess:
def getOmniNamesPid(port):
    """
    Get PID of omniNames process running on given `port`.

    :param port : port number
    :return omniNames process's PID (`None` if there is no such process)
    """
    # process names by PID; a name is None when it could not be retrieved
    names = {p.info['pid']: p.info['name'] for p in psutil.process_iter(['pid', 'name'])}
    wanted = str(port)
    for conn in psutil.net_connections(kind='inet'):
        # local address may be empty, in which case there is no port to compare
        local_port = getattr(conn.laddr, 'port', None)
        if local_port is None or str(local_port) != wanted:
            continue
        # conn.pid is None when the owner of the socket cannot be determined,
        # and the process may be missing from (or unnamed in) the snapshot above
        name = names.get(conn.pid) or ''
        if name.startswith('omniNames'):
            return conn.pid
    return None
def killOmniNames(port):
    """
    Kill omniNames process running on given `port`.

    This is a best-effort operation: any error raised while looking up or
    killing the process is silently ignored.

    :param port : port number
    """
    with suppress(Exception):
        killPid(getOmniNamesPid(port))