Salome HOME
9d59f6ab39444b94410193f45cb33ddd3e8fa8be
[modules/kernel.git] / bin / salome_utils.py
1 #  -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2007-2024  CEA, EDF, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 ## @package salome_utils
22 #  @brief Set of utility functions used by SALOME python scripts.
23
24 """
25 Various utilities for SALOME.
26 """
27
28 # pragma pylint: disable=invalid-name
29
30 import os
31 import os.path as osp
32 import re
33 import shutil
34 import socket
35 import sys
36 import tempfile
37 import logging
38 from contextlib import suppress
39
40 import psutil
41
42 ## Setting formatter in setVerbose() was commented because adding of handler
43 ## breaks using of root logger in other modules and cause many double lines in logs.
44 #FORMAT = '%(levelname)s : %(asctime)s : [%(filename)s:%(funcName)s:%(lineno)s] : %(message)s'
45 #logging.basicConfig(format=FORMAT)
46 logger = logging.getLogger()
47
48 def _try_bool(arg):
49     """
50     Convert given `arg` to a boolean value.
51     String values like 'True', 'TRUE', 'YES', 'Yes', 'y', 'NO', 'false', 'n', etc.
52     are supported.
53     If `arg` does not represent a boolean, an exception is raised.
54     :param arg : value being converted
55     :return result of conversion: `True` or `False`
56     """
57     if isinstance(arg, bool):
58         return arg
59     if isinstance(arg, bytes):
60         arg = arg.decode('utf-8', errors='ignore')
61     if isinstance(arg, str):
62         if arg.lower() in ('yes', 'y', 'true', 't'):
63             return True
64         if arg.lower() in ('no', 'n', 'false', 'f'):
65             return False
66     raise ValueError('Not a boolean value')
67
68 # ---
69
70 def getORBcfgInfo():
71     """
72     Get current omniORB configuration.
73
74     The information is retrieved from the omniORB configuration file defined
75     by the OMNIORB_CONFIG environment variable.
76     If omniORB configuration file can not be accessed, a tuple of three empty
77     strings is returned.
78
79     :return tuple of three strings: (orb_version, host_name, port_number)
80     """
81     orb_version = ''
82     hostname = ''
83     port_number = ''
84     with suppress(Exception), open(os.getenv('OMNIORB_CONFIG')) as forb:
85         regvar = re.compile(r'(ORB)?InitRef.*corbaname::(.*):(\d+)\s*$')
86         for line in forb.readlines():
87             match = regvar.match(line)
88             if match:
89                 orb_version = '4' if match.group(1) is None else '3'
90                 hostname = match.group(2)
91                 port_number = match.group(3)
92                 break
93     return orb_version, hostname, port_number
94
95 # ---
96
97 def getHostFromORBcfg():
98     """
99     Get current omniORB host name.
100     :return host name
101     """
102     return getORBcfgInfo()[1]
103
104 # ---
105
106 def getPortFromORBcfg():
107     """
108     Get current omniORB port.
109     :return port number
110     """
111     return getORBcfgInfo()[2]
112
113 # ---
114
115 def getUserName():
116     """
117     Get user name.
118
119     The following procedure is perfomed to deduce user name:
120     1. try USER (USERNAME on Windows) environment variable.
121     2. if (1) fails, try LOGNAME (un*x only).
122     3. if (2) fails, return 'unknown' as default user name
123
124     :return user name
125     """
126     if sys.platform == 'win32':
127         username = os.getenv('USERNAME')
128     else:
129         username = os.getenv('USER', os.getenv('LOGNAME'))
130     if username is None:
131         import getpass
132         username = getpass.getuser()
133     return username
134
135 # ---
136
137 def getHostName():
138     """
139     Get host name.
140
141     The following procedure is perfomed to deduce host name:
142     1. try socket python module, gethostname() function
143     2. if (1) fails, try HOSTNAME environment variable
144     3. if (2) fails, try HOST environment variable
145     4. if (3) fails, tries 'unknown' as default host name
146     5. finally, checks that IP is configured for hostname; if not, returns 'localhost'
147
148     :return host name
149     """
150     host = None
151     with suppress(Exception):
152         host = socket.gethostname()
153     if not host:
154         host = os.getenv('HOSTNAME', os.getenv('HOST', 'unknown'))
155     try:
156         # the following line just checks that IP is configured for hostname
157         socket.gethostbyname(host)
158     except (TypeError, OSError):
159         host = 'localhost'
160     return host
161
162 # ---
163
164 def getShortHostName():
165     """
166     Get short host name (with domain stripped).
167     See `getHostName()` for more details.
168     :return short host name
169     """
170     with suppress(AttributeError, IndexError):
171         return getHostName().split('.')[0]
172     return 'unknown' # default host name
173
174 # ---
175
176 def getAppName():
177     """
178     Get application name.
179     The following procedure is perfomed to deduce application name:
180     1. try APPNAME environment variable
181     2. if (1) fails, return 'SALOME' as default application name
182     :return application name
183     """
184     return os.getenv('APPNAME', 'SALOME') # 'SALOME' is default user name
185
186 def getPid():
187     return os.getpid()
188
189 # ---
190
191 def getPortNumber(use_default=True):
192     """
193     Get currently used omniORB port.
194     The following procedure is perfomed to deduce port number:
195     1. try NSPORT environment variable
196     2. if (1) fails, try to parse config file defined by OMNIORB_CONFIG environment variable
197     3. if (2) fails, return 2809 as default port number (if use_default is `True`) or `None`
198        (if use_default is `False`)
199     :return port number
200     """
201     with suppress(TypeError, ValueError):
202         return int(os.getenv('NSPORT'))
203     with suppress(TypeError, ValueError):
204         port = int(getPortFromORBcfg())
205         if port:
206             return port
207     return 2809 if use_default else None
208
209 # ---
210
211 def getHomeDir():
212     """
213     Get home directory.
214     :return home directory path
215     """
216     return osp.realpath(osp.expanduser('~'))
217
218 # ---
219
220 def getLogDir():
221     """
222     Get directory that stores log files.
223     :return path to the log directory
224     """
225     return osp.join(getTmpDir(), 'logs', getUserName())
226
227 # ---
228
229 def getTmpDir():
230     """
231     Get directory to store temporary files.
232     :return temporary directory path
233     """
234     with tempfile.NamedTemporaryFile() as tmp:
235         return osp.dirname(tmp.name)
236     return None
237
238 # ---
239
240 # pragma pylint: disable=too-many-arguments
241 def generateFileName(path, prefix=None, suffix=None, extension=None,
242                      unique=False, separator='_', hidden=False, **kwargs):
243     """
244     Generate file name.
245
246     :param path      : directory path
247     :param prefix    : file name prefix (none by default)
248     :param suffix    : file name suffix (none by default)
249     :param extension : file extension (none by default)
250     :param unique    : if `True`, function generates unique file name -
251                        in this case, if file with the generated name already
252                        exists in `path` directory, an integer suffix is appended
253                        to the file name (`False` by default)
254     :param separator : words separator ('_' by default)
255     :param hidden    : if `True`, file name is prepended with dot symbol
256                        (`False` by default)
257     :param kwargs    : additional keywrods arguments (see below)
258     :return generated file name
259
260     Additionally supported keyword parameters:
261     - with_username : use user name:
262     - with_hostname : use host name:
263     - with_port : use port number:
264     - with_app      : use application name:
265     - with_pid      : use current pid
266
267     Any of these keyword arguments can accept either explicit string value,
268     or `True` to automatically deduce value from current configuration.
269     """
270     filename = []
271
272     def _with_str(_str):
273         _str = '' if _str is None else str(_str)
274         if _str:
275             filename.append(_str)
276
277     def _with_kwarg(_kwarg, _func):
278         _value = kwargs.get(_kwarg, False)
279         try:
280             if _try_bool(_value):
281                 filename.append(str(_func()))
282         except ValueError:
283             _with_str(_value)
284
285     _with_str(prefix)
286     _with_kwarg('with_username', getUserName)
287     _with_kwarg('with_hostname', getShortHostName)
288     _with_kwarg('with_port', getPortNumber)
289     _with_kwarg('with_app', getAppName)
290     _with_kwarg('with_pid', getPid)
291     _with_str(suffix)
292
293     # raise an exception if file name is empty
294     if not filename:
295         raise ValueError('Empty file name')
296
297     # extension
298     extension = '' if extension is None else str(extension)
299     if extension.startswith('.'):
300         extension = extension[1:]
301
302     # separator
303     separator = '' if separator is None else str(separator)
304
305     def _generate(_index=None):
306         # join all components together, add index if necessary
307         if _index is not None:
308             _name = separator.join(filename+[str(_index)])
309         else:
310             _name = separator.join(filename)
311         # prepend with dot if necessary
312         if hidden:
313             _name = '.' + _name
314         # append extension if ncessary
315         if extension:
316             _name = _name + '.' + extension
317         # now get full path
318         return osp.join(path, _name)
319
320     name = _generate()
321     if unique:
322         index = 0
323         while osp.exists(name):
324             index = index + 1
325             name = _generate(index)
326     return osp.normpath(name)
327
328 # ---
329
330 def cleanDir(path):
331     """
332     Clear contents of directory.
333     :param path directory path
334     """
335     if osp.exists(path):
336         for filename in os.listdir(path):
337             file_path = osp.join(path, filename)
338             with suppress(OSError):
339                 if osp.isdir(file_path):
340                     shutil.rmtree(file_path)
341                 else:
342                     os.unlink(file_path)
343
344 # ---
345
346 def makeDir(path, mode=0o777):
347     """
348     Make directory with the specified path.
349     :param path : directory path
350     :param mode : access mode
351     """
352     try:
353         oldmask = os.umask(0)
354         os.makedirs(path, mode=mode, exist_ok=True)
355     except IOError:
356         pass
357     finally:
358         os.umask(oldmask)
359
360 # ---
361
362 def makeTmpDir(path, mode=0o777):
363     """
364     Make temporary directory with the specified path.
365     If the directory exists, clear all its contents.
366     :param path : directory path
367     :param mode : access mode
368     """
369     makeDir(path, mode)
370     cleanDir(path)
371
372 # ---
373
374 def uniteFiles(src_file, dest_file):
375     """
376     Join contents of `src_file` and `dest_file` and put result to `dest_file`.
377     File `dest_file` may not exist.
378     :param src_file  : source file path
379     :param dest_file : destination file path
380     """
381     if not osp.exists(src_file):
382         return
383
384     if osp.exists(dest_file):
385         with suppress(OSError), open(src_file, 'rb') as src, open(dest_file, 'ab') as dest:
386             dest.write(b'\n')
387             dest.write(src.read())
388     else:
389         with suppress(OSError):
390             shutil.copy(src_file, dest_file)
391
392 # --
393
394 class ColoredFormatter(logging.Formatter):
395     BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30,38)
396     COLORS = { 'WARNING': YELLOW, 'INFO': WHITE, 'DEBUG': BLUE, 'CRITICAL': YELLOW, 'ERROR': RED }
397     def __init__(self, *args, **kwargs):
398         logging.Formatter.__init__(self, *args, **kwargs)
399     def format(self, record):
400         RESET_SEQ = "\033[0m"
401         COLOR_SEQ = "\033[1;%dm"
402         import inspect
403         frame = inspect.currentframe()
404         for i in range(8):
405             frame = frame.f_back
406         record.levelname = COLOR_SEQ % ColoredFormatter.COLORS[record.levelname] + record.levelname + RESET_SEQ
407         record.msg = "{} ( callsite is {} of file \"{}\" at line {} )".format(record.msg, frame.f_code.co_name,inspect.getsourcefile(frame),inspect.getlineno(frame) )
408         return logging.Formatter.format(self, record)
409
410 class BackTraceFormatter(logging.Formatter):
411     def __init__(self, *args, **kwargs):
412         logging.Formatter.__init__(self, *args, **kwargs)
413     def format(self, record):
414         import inspect
415         frame = inspect.currentframe()
416         # go upward with ( a limit of 10 steps ) of the stack to catch the effective callsite. Not very steady....
417         # should be replaced by an analysis of frame.f_code
418         for i in range(10):
419             frame = frame.f_back
420             if inspect.getsourcefile(frame) != logging.__file__:
421                 break
422         record.msg = "{} ( callsite is {} of file \"{}\" at line {} )".format(record.msg, frame.f_code.co_name,inspect.getsourcefile(frame),inspect.getlineno(frame) )
423         return logging.Formatter.format(self, record)     
424     
425 def positionVerbosityOfLogger( verboseLevel ):
426     from packaging import version
427     current_version = version.parse("{}.{}".format(sys.version_info.major,sys.version_info.minor))
428     version_ref = version.parse("3.5.0")
429     global logger
430     formatter = None
431     if current_version >= version_ref:
432         formatter = BackTraceFormatter('%(levelname)s : %(asctime)s : %(message)s ',style='%')
433     else:
434         formatter = logging.Formatter('%(levelname)s : %(asctime)s : %(message)s ',style='%')
435     formatter.default_time_format = '%H:%M:%S'
436     formatter.default_msec_format = "%s.%03d"
437     stream_handler = logging.StreamHandler()
438     stream_handler.setFormatter(formatter)
439     logger.addHandler(stream_handler)
440     logger.setLevel(verboseLevel)
441
442 def positionVerbosityOfLoggerRegardingState():
443     positionVerbosityOfLogger( verboseLevel() )
444
445 def verbose():
446     """
447     Get current verbosity activation.
448
449     Default verbosity level is specified via the environment variable SALOME_VERBOSE,
450     e.g. in bash:
451
452         $ export SALOME_VERBOSE=1
453
454     The function `setVerbose()` can be used to explicitly set verbosity activation.
455
456     :return current verbosity level
457     """
458     import KernelBasis
459     return KernelBasis.VerbosityActivated()
460
461 # --
462
463 def setVerbose(status):
464     """
465     Change verbosity activation status.
466     The function `verbose()` can be used to get current verbosity level.
467     :param status : verbosity status
468     :type status: bool
469     """
470     import KernelBasis
471     return KernelBasis.SetVerbosityActivated( status )
472
473 # --
474
475 KernelLogLevelToLogging = {"INFO":logging.INFO, "DEBUG":logging.DEBUG, "WARNING":logging.WARNING, "ERROR":logging.ERROR}
476
477 LoggingToKernelLogLevel = {v: k for k, v in KernelLogLevelToLogging.items()}
478
479 def verboseLevel():
480     """
481     Get current verbosity level.
482
483     Default verbosity level is specified via the environment variable SALOME_VERBOSE,
484     e.g. in bash:
485
486         $ export SALOME_VERBOSE_LEVEL=7
487
488     The function `setVerboseLevel()` can be used to explicitly set verbosity level.
489
490     :return current verbosity level
491     """
492     import KernelBasis
493     return KernelLogLevelToLogging[ KernelBasis.VerbosityLevel() ]
494
495 def setVerboseLevel(level):
496     """
497     Change verbosity level.
498     The function `verboseLevel()` can be used to get current verbosity level.
499     :param level : verbosity level
500     """
501     import KernelBasis
502     KernelBasis.SetVerbosityLevel(LoggingToKernelLogLevel[ level ])
503
504 # --
505
506 def killPid(pid, sig=9):
507     """
508     Send signal `sig` to the process with given `pid`.
509
510     :param pid : PID of the process
511     :param sig : signal to send; some of possible values:
512        - 9 : kill process
513        - 0 : do nothing, just check process existence (see below)
514        NOTE: other values are not processed on Windows
515     :return result of execution:
516        -  1 : success
517        -  0 : fail, no such process
518        - -1 : fail, another reason
519     """
520     if not pid:
521         return -1
522
523     with suppress(ValueError):
524         pid = int(pid)
525
526     if sig == 0:
527         ret = 1 if psutil.pid_exists(pid) else 0
528     else:
529         if verbose():
530             print("######## killPid pid = ", pid)
531         try:
532             process = psutil.Process(pid)
533             process.terminate()
534             _, alive = psutil.wait_procs([process], timeout=5)
535             for proc in alive:
536                 proc.kill()
537             ret = 1
538         except psutil.NoSuchProcess:
539             ret = 0
540         except OSError:
541             ret = -1
542     return ret
543 # --
544
545 def getOmniNamesPid(port):
546     """
547     Get PID of omniNames process running on given `port`.
548     :param port : port number
549     :return omniNames process's PID
550     """
551     processes = {p.info['pid']: p.info['name'] for p in psutil.process_iter(['pid', 'name'])}
552     return next((c.pid for c in psutil.net_connections(kind='inet') \
553                      if str(c.laddr.port) == str(port) and processes.get(c.pid).startswith('omniNames')), None)
554 # --
555
556 def killOmniNames(port):
557     """
558     Kill omniNames process running on given `port`.
559     :param port : port number
560     """
561     with suppress(Exception):
562         killPid(getOmniNamesPid(port))
563 # --