X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=bin%2FPortManager.py;h=ced2ad296a56f28f7c2f388c0e63bb196362eaea;hb=873ec599707c3abbea2c3203aa4d8c1e98e2dcd2;hp=0ad853a9bc05e0594413c847706a8a4339d157db;hpb=da3a8fb8adeebef45153575af5040d7f44d89416;p=modules%2Fkernel.git diff --git a/bin/PortManager.py b/bin/PortManager.py index 0ad853a9b..ced2ad296 100644 --- a/bin/PortManager.py +++ b/bin/PortManager.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2017 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -8,7 +8,7 @@ # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either -# version 2.1 of the License. +# version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -21,29 +21,22 @@ # # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com # -from Singleton import Singleton - -import multiprocessing -import time -import socket - import os import sys -import threading -import SocketServer try: - import cPickle as pickle + import cPickle as pickle #@UnusedImport except: - import pickle + import pickle #@Reimport -import struct -import ctypes +__PORT_MIN_NUMBER = 2810 +__PORT_MAX_NUMBER = 2910 import logging def createLogger(): logger = logging.getLogger(__name__) # logger.setLevel(logging.DEBUG) + logger.setLevel(logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s") @@ -53,345 +46,238 @@ def createLogger(): # logger = createLogger() - -if sys.platform == 'win32': - import multiprocessing.reduction # make sockets pickable/inheritable - -multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. - -""" -This class handles port distribution for SALOME sessions. -In case of concurent sessions, each will have its own port number. -""" -class _PortManager(object): # :TODO: must manage lock owner - __metaclass__ = Singleton - # - def __init__(self, startNumber = 2810, limit = 100, timeout=60): - super(_PortManager, self).__init__() - self.__startNumber = startNumber - self.__limit = startNumber + limit - self.__lockedPorts = [] - self.__lock = multiprocessing.Lock() - self.__timeout = timeout - self.__lastChangeTime = time.time() - # - # Test for prefered port number, if asked. - def getPort(self, port=None): - with self.__lock: - if not port or self.isPortUsed(port): - port = self.__startNumber - while self.isPortUsed(port): - if port == self.__limit: - msg = "\n" - msg += "Can't find a free port to launch omniNames\n" - msg += "Try to kill the running servers and then launch SALOME again.\n" - raise RuntimeError, msg - port = port + 1 - # - self.__lockedPorts.append(port) - self.__lastChangeTime = time.time() - return port - # - def releasePort(self, port): - logger.debug("PortManager.releasePort %s"%port) - with self.__lock: - if port in self.__lockedPorts: - self.__lockedPorts.remove(port) - self.__lastChangeTime = time.time() - # - def isBusy(self): - return len(self.__lockedPorts) - # - def isPortUsed(self, port): - return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port) - # - def __isNetworkConnectionActiveOnPort(self, port): - # :NOTE: Under windows: - # netstat options -l and -t are unavailable - # grep command is unavailable - from subprocess import Popen, PIPE - (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate() - import StringIO - buf = StringIO.StringIO(stdout) - ports = buf.readlines() - # search for TCP - LISTEN connections - import re - regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); - for item in ports: - try: - p = int(regObj.match(item).group(1)) - if p == port: return True - except: - pass - # - def timeout(self): - return (time.time() - self.__lastChangeTime > self.__timeout) - # - def __str__(self): - with self.__lock: - return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts)) - # -# - #------------------------------------ -# Communication methods - -_marshall = pickle.dumps -_unmarshall = pickle.loads - -def _send(channel, *args): - buf = _marshall(args) - value = socket.htonl(len(buf)) - size = struct.pack("L",value) - channel.send(size) - channel.send(buf) -# - -def _receive(channel): - size = struct.calcsize("L") - size = channel.recv(size) - try: - size = socket.ntohl(struct.unpack("L", size)[0]) - except struct.error, e: - return '' - - buf = "" - while len(buf) < size: - buf = channel.recv(size - len(buf)) - - return _unmarshall(buf)[0] -# -#------------------------------------ - -GET_PORT_MSG = "GET_PORT" -GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT" -RELEASE_PORT_MSG = "RELEASE_PORT" -STOP_SERVER_MSG = "STOP_SERVER" -TEST_SERVER_MSG = "TEST_SERVER" - -GET_PORT_ACK_MSG = "GET_PORT" -RELEASE_PORT_ACK_MSG = "RELEASE_PORT" -TEST_SERVER_ACK_MSG = "TEST_SERVER" - -class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): - def handle(self): - data = _receive(self.request) - if data == GET_PORT_MSG: - pm = _PortManager() - port = pm.getPort() - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(GET_PREFERED_PORT_MSG): - port = int(data[len(GET_PREFERED_PORT_MSG)+1:]) - pm = _PortManager() - port = pm.getPort(port) - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(RELEASE_PORT_MSG): - port = int(data[len(RELEASE_PORT_MSG)+1:]) - pm = _PortManager() - pm.releasePort(port) - response = "%s" % (RELEASE_PORT_ACK_MSG) - _send(self.request, response) - logger.debug("RELEASE_PORT: %s"%port) - if not pm.isBusy(): - logger.debug("Close server") - config_file, lock_file = _getConfigurationFilename() - try: - os.remove(config_file) - pmlock.release() - os.remove(lock_file) - except: - pass - self.server.shutdown() - #print pm - elif data == STOP_SERVER_MSG: - logger.debug("Close server") - self.server.shutdown() - elif data == TEST_SERVER_MSG: - _send(self.request, TEST_SERVER_ACK_MSG) +# A file locker +def __acquire_lock(lock): + logger.debug("ACQUIRE LOCK") + if sys.platform == "win32": + import msvcrt + # lock 1 byte: file is supposed to be zero-byte long + msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + fcntl.flock(lock, fcntl.LOCK_EX) + logger.debug("LOCK ACQUIRED") # - -class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - pass - -#------------------------------------ -# A file locker (Linux only) -import fcntl -class PortManagerLock: - def __init__(self, filename, readonly=False, blocking=True): - self.filename = filename - # This will create it if it does not exist already - logger.debug("Create lock on %s"%filename) - mode = 'w' - if readonly: - mode = 'r' - self.handle = open(filename, mode) - self.handle.seek(0) # go back to beginning of file to read it multiple times - self.__blocking = blocking - - def acquire(self): - mode = fcntl.LOCK_EX - if not self.__blocking: # Raise an IOError exception if file has already been locked - mode = mode | fcntl.LOCK_NB - fcntl.flock(self.handle, mode) - logger.debug("lock acquired %s"%self.__blocking) - - def release(self): - fcntl.flock(self.handle, fcntl.LOCK_UN) - logger.debug("lock released") - - def __del__(self): - if logger: - logger.debug("Close lock file") - self.handle.close() +def __release_lock(lock): + logger.debug("RELEASE LOCK") + if sys.platform == "win32": + import msvcrt + msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + fcntl.flock(lock, fcntl.LOCK_UN) + logger.debug("LOCK RELEASED") # #------------------------------------ -# Server address has to be shared by different processes, without any common -# ancestor. -# The "simplest" solution is to define it here as a global variable. Unfortunately, -# availability of the corresponding socket is not guaranted at all. If PortManager -# tries to use a socket it does not own, server is not created (it is identified as -# already existing), clients then connect on this socket but message passing -# between clients and server will not work and SALOME launch will crash. -# We can introduce a port verification procedure automatically called by importing -# this module (i.e. when creating the server). This procedure consists in creating -# a client which sends a specific message to the server that has to be tested. And -# loop on port numbers until a free socket is found and associated to a new server. -# -# Another approach is to let Python socket API select a free port number, then store -# it to a file on server host machine in order to be shared with clients. -# The logical part can be defined as follows. When server is started (by importing -# this module), write server port number to a specific file (do not use a temporary -# file name). Each client then read server address from this same file ; if file is -# not nound, it is an error (add appropriate processing). -# Server could also check file existence and try to use the same address as previous -# server in order to avoid opening too many unecessary sockets ; but we need to apply -# the above verification procedure. This processing is not necessary because TCP socket -# timeout will automatically close unused sockets. - def _getConfigurationFilename(): omniorbUserPath = os.getenv("OMNIORB_USER_PATH") from salome_utils import generateFileName portmanager_config = generateFileName(omniorbUserPath, - prefix="omniORB", + prefix="salome", suffix="PortManager", extension="cfg", hidden=True) - lock_file = portmanager_config + "-lock" + import tempfile + temp = tempfile.NamedTemporaryFile() + lock_file = os.path.join(os.path.dirname(temp.name), ".salome_PortManager.lock") + temp.close() + return (portmanager_config, lock_file) # -def __checkServer(): - while True: - logger.debug("CHECKING SERVER") - status = __newClient(TEST_SERVER_MSG) - if status == TEST_SERVER_ACK_MSG: - break - return (status == TEST_SERVER_ACK_MSG) +def __isPortUsed(port, config): + busy_ports = [] + for ports in config.values(): + busy_ports += ports + return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port) # -def __getServerAddress(readonly=True): - address = ("localhost", 0) - try: - config_file, lock_file = _getConfigurationFilename() - lock = PortManagerLock(config_file, readonly, blocking=True) - lock.acquire() - address = eval(lock.handle.read()) - lock.release() - except (IOError, SyntaxError) as e: - logger.debug("no configuration file") +def __isNetworkConnectionActiveOnPort(port): + # :NOTE: Under windows: + # netstat options -l and -t are unavailable + # grep command is unavailable + if sys.platform == "win32": + cmd = ['netstat','-a','-n','-p tcp'] + else: + cmd = ['netstat','-ant'] pass - finally: - return address + + err = None + try: + from subprocess import Popen, PIPE, STDOUT + p = Popen(cmd, stdout=PIPE, stderr=STDOUT) + out, err = p.communicate() + except: + print "Error when trying to access active network connections." + if err: print err + import traceback + traceback.print_exc() + return False + + import StringIO + buf = StringIO.StringIO(out) + ports = buf.readlines() + # search for TCP - LISTEN connections + import re + regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); + for item in ports: + try: + p = int(regObj.match(item).group(1)) + if p == port: return True + except: + pass + return False # -def __setServerAddress(address): +def getPort(preferedPort=None): + logger.debug("GET PORT") + config_file, lock_file = _getConfigurationFilename() - lock = PortManagerLock(config_file, readonly=False, blocking=True) - lock.acquire() - logger.debug("setServerAddress: %s"%str(address)) - lock.handle.write(str(address)) - lock.release() -# + oldmask = os.umask(0) + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except: + logger.info("Problem loading PortManager file: %s"%config_file) + # In this case config dictionary is reset + + logger.debug("load config: %s"%str(config)) + appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown") + try: + config[appli_path] + except KeyError: + config[appli_path] = [] + + # append port + port = preferedPort + if not port or __isPortUsed(port, config): + port = __PORT_MIN_NUMBER + while __isPortUsed(port, config): + if port == __PORT_MAX_NUMBER: + msg = "\n" + msg += "Can't find a free port to launch omniNames\n" + msg += "Try to kill the running servers and then launch SALOME again.\n" + raise RuntimeError, msg + logger.debug("Port %s seems to be busy"%str(port)) + port = port + 1 + logger.debug("found free port: %s"%str(port)) + config[appli_path].append(port) + + # write config + logger.debug("write config: %s"%str(config)) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: + pass -def __getServer(): - address = __getServerAddress(readonly=False) - SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately - server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind - server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart - server.server_bind() # Manually bind, to support allow_reuse_address - __setServerAddress(server.server_address) - server.server_activate() - return server -# + # release lock + __release_lock(lock) + # -pmlock = None -def __startServer(): - try: - config_file, lock_file = _getConfigurationFilename() - global pmlock - pmlock = PortManagerLock(lock_file, readonly=False, blocking=False) - pmlock.acquire() - - server = __getServer() - # Start a thread with the server -- that thread will then start one - # more thread for each request - server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager") - # Exit the server thread when the main thread terminates - #server_thread.setDaemon(True) - server_thread.start() - #print "Server loop running in thread:", server_thread.getName() - except: - logger.debug("Server already started") - pass + os.umask(oldmask) + logger.debug("get port: %s"%str(port)) + return port # -def __newClient(message): - address = __getServerAddress(readonly=True) - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - logger.debug("connect client to %s"%str(address)) - sock.connect(address) - _send(sock, message) - response = _receive(sock) - if response.startswith(GET_PORT_ACK_MSG): - port = int(response[len(GET_PORT_ACK_MSG)+1:]) - logger.debug("GET_PORT: %s"%port) - return port - elif response == RELEASE_PORT_ACK_MSG: - logger.debug("Received: %s" % response) - return 0 +def releasePort(port): + port = int(port) + logger.debug("RELEASE PORT (%s)"%port) + + config_file, lock_file = _getConfigurationFilename() + oldmask = os.umask(0) + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file pass - elif response == TEST_SERVER_ACK_MSG: - logger.debug("Server is ok") - return TEST_SERVER_ACK_MSG + + logger.debug("load config: %s"%str(config)) + appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown") + try: + config[appli_path] + except KeyError: + config[appli_path] = [] + + # remove port from list + ports_info = config[appli_path] + config[appli_path] = [x for x in ports_info if x != port] + + # write config + logger.debug("write config: %s"%str(config)) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: pass - sock.close() - except socket.error: - logger.debug("Unable to connect to server") - return -1 -# -def getPort(preferedPort=None): - if preferedPort: - return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) - else: - return __newClient(GET_PORT_MSG) -# + # release lock + __release_lock(lock) -def releasePort(port): - logger.debug("application asks for releasePort %s"%port) - __newClient("%s: %s"%(RELEASE_PORT_MSG,port)) -# + logger.debug("released port port: %s"%str(port)) -def stopServer(): - __newClient(STOP_SERVER_MSG) + os.umask(oldmask) # -# Auto start: unique instance ; no effect if called multiple times -__startServer() -logger.debug("Server started... do check...") -assert(__checkServer()) +def getBusyPorts(): + config_file, lock_file = _getConfigurationFilename() + oldmask = os.umask(0) + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file + pass + + logger.debug("load config: %s"%str(config)) + appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown") + try: + config[appli_path] + except KeyError: + config[appli_path] = [] + + # Scan all possible ports to determine which ones are owned by other applications + ports_info = { 'this': [], 'other': [] } + my_busy_ports = config[appli_path] + for port in range(__PORT_MIN_NUMBER, __PORT_MAX_NUMBER): + if __isPortUsed(port, config): + logger.debug("Port %s seems to be busy"%str(port)) + if port in my_busy_ports: + ports_info["this"].append(port) + else: + ports_info["other"].append(port) + + logger.debug("all busy_ports: %s"%str(ports_info)) + + sorted_ports = { 'this': sorted(ports_info['this']), + 'other': sorted(ports_info['other']) } + + # release lock + __release_lock(lock) + + os.umask(oldmask) + return sorted_ports +#