X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=bin%2FPortManager.py;h=8791bfc94c897ecdcd31e557760517a8122c7f22;hb=3454d640afacac24e1c958a60ba7b55ad9326584;hp=cd9664b51cabe3d5769a95dea6211deb8a57b748;hpb=9817903f2ee7b0a92e68fa1441412f1c80bb5f56;p=modules%2Fkernel.git diff --git a/bin/PortManager.py b/bin/PortManager.py index cd9664b51..8791bfc94 100644 --- a/bin/PortManager.py +++ b/bin/PortManager.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: iso-8859-1 -*- -# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE +# Copyright (C) 2007-2015 CEA/DEN, EDF R&D, OPEN CASCADE # # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -8,7 +8,7 @@ # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either -# version 2.1 of the License. +# version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -21,242 +21,201 @@ # # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com # -from Singleton import Singleton - -import multiprocessing -import time -import socket - +import os import sys -import threading -import SocketServer try: - import cPickle as pickle + import cPickle as pickle #@UnusedImport except: - import pickle - -import struct -import ctypes - -if sys.platform == 'win32': - import multiprocessing.reduction # make sockets pickable/inheritable - -multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. -#ignore = multiprocessing.active_children() # cleanup any old processes + import pickle #@Reimport -""" -This class handles port distribution for SALOME sessions. -In case of concurent sessions, each will have its own port number. -""" -class _PortManager(object): # :TODO: must manage lock owner - __metaclass__ = Singleton - # - def __init__(self, startNumber = 2810, limit = 100, timeout=60): - super(_PortManager, self).__init__() - self.__startNumber = startNumber - self.__limit = startNumber + limit - self.__lockedPorts = [] - self.__lock = multiprocessing.Lock() - self.__timeout = timeout - self.__lastChangeTime = time.time() - # - # Test for prefered port number, if asked. - def getPort(self, port=None): - with self.__lock: - if not port or self.isPortUsed(port): - port = self.__startNumber - while self.isPortUsed(port): - if port == self.__limit: - msg = "\n" - msg += "Can't find a free port to launch omniNames\n" - msg += "Try to kill the running servers and then launch SALOME again.\n" - raise RuntimeError, msg - port = port + 1 - # - self.__lockedPorts.append(port) - self.__lastChangeTime = time.time() - return port - # - def releasePort(self, port): - with self.__lock: - if port in self.__lockedPorts: - self.__lockedPorts.remove(port) - self.__lastChangeTime = time.time() - # - def isBusy(self): - return len(self.__lockedPorts) - # - def isPortUsed(self, port): - return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port) - # - def __isNetworkConnectionActiveOnPort(self, port): - # :NOTE: Under windows: - # netstat options -l and -t are unavailable - # grep command is unavailable - from subprocess import Popen, PIPE - (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate() - import StringIO - buf = StringIO.StringIO(stdout) - ports = buf.readlines() - # search for TCP - LISTEN connections - import re - regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); - for item in ports: - try: - p = int(regObj.match(item).group(1)) - if p == port: return True - except: - pass - # - def timeout(self): - return (time.time() - self.__lastChangeTime > self.__timeout) - # - def __str__(self): - with self.__lock: - return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts)) - # +import logging +def createLogger(): + logger = logging.getLogger(__name__) +# logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s") + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger # +logger = createLogger() #------------------------------------ -# Communication methods - -_marshall = pickle.dumps -_unmarshall = pickle.loads - -def _send(channel, *args): - buf = _marshall(args) - value = socket.htonl(len(buf)) - size = struct.pack("L",value) - channel.send(size) - channel.send(buf) +# A file locker (Linux only) +def __acquire_lock(lock): + if sys.platform == "win32": + import msvcrt + # lock 1 byte: file is supposed to be zero-byte long + msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + fcntl.flock(lock, fcntl.LOCK_EX) +# +def __release_lock(lock): + if sys.platform == "win32": + import msvcrt + msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + fcntl.flock(lock, fcntl.LOCK_UN) +# + +def _getConfigurationFilename(): + omniorbUserPath = os.getenv("OMNIORB_USER_PATH") + + from salome_utils import generateFileName + portmanager_config = generateFileName(omniorbUserPath, + prefix="omniORB", + suffix="PortManager", + extension="cfg", + hidden=True) + import tempfile + temp = tempfile.NamedTemporaryFile() + lock_file = os.path.join(os.path.dirname(temp.name), ".omniORB_PortManager.lock") + temp.close() + + return (portmanager_config, lock_file) +# + +def __isPortUsed(port, busy_ports): + return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port) +# + +def __isNetworkConnectionActiveOnPort(port): + # :NOTE: Under windows: + # netstat options -l and -t are unavailable + # grep command is unavailable + from subprocess import Popen, PIPE + stdout, _ = Popen(['netstat','-an'], stdout=PIPE).communicate() + import StringIO + buf = StringIO.StringIO(stdout) + ports = buf.readlines() + # search for TCP - LISTEN connections + import re + regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE ); + for item in ports: + try: + p = int(regObj.match(item).group(1)) + if p == port: return True + except: + pass # -def _receive(channel): - size = struct.calcsize("L") - size = channel.recv(size) - try: - size = socket.ntohl(struct.unpack("L", size)[0]) - except struct.error, e: - return '' +def getPort(preferedPort=None): + logger.debug("GET PORT") + + config_file, lock_file = _getConfigurationFilename() + oldmask = os.umask(0) + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file + pass - buf = "" - while len(buf) < size: - buf = channel.recv(size - len(buf)) + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) + + # append port + busy_ports = config["busy_ports"] + port = preferedPort + if not port or __isPortUsed(port, busy_ports): + port = 2810 + while __isPortUsed(port, busy_ports): + if port == 2810+100: + msg = "\n" + msg += "Can't find a free port to launch omniNames\n" + msg += "Try to kill the running servers and then launch SALOME again.\n" + raise RuntimeError, msg + port = port + 1 + logger.debug("found free port: %s"%str(port)) + config["busy_ports"].append(port) + + # write config + logger.debug("write busy_ports: %s"%str(config["busy_ports"])) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: + pass + + # release lock + __release_lock(lock) + # - return _unmarshall(buf)[0] + os.umask(oldmask) + logger.debug("get port: %s"%str(port)) + return port # -#------------------------------------ -GET_PORT_MSG = "GET_PORT" -GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT" -RELEASE_PORT_MSG = "RELEASE_PORT" -STOP_SERVER_MSG = "STOP_SERVER" +def releasePort(port): + port = int(port) + logger.debug("RELEASE PORT (%s)"%port) + + config_file, lock_file = _getConfigurationFilename() + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file + pass -GET_PORT_ACK_MSG = "GET_PORT" -RELEASE_PORT_ACK_MSG = "RELEASE_PORT" + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) -class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): - def handle(self): - data = _receive(self.request) - if data == GET_PORT_MSG: - pm = _PortManager() - port = pm.getPort() - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(GET_PREFERED_PORT_MSG): - port = int(data[len(GET_PREFERED_PORT_MSG)+1:]) - pm = _PortManager() - port = pm.getPort(port) - response = "%s: %s" % (GET_PORT_ACK_MSG, port) - _send(self.request, response) - elif data.startswith(RELEASE_PORT_MSG): - port = int(data[len(RELEASE_PORT_MSG)+1:]) - pm = _PortManager() - pm.releasePort(port) - response = "%s" % (RELEASE_PORT_ACK_MSG) - _send(self.request, response) - #print "RELEASE_PORT:", port - if not pm.isBusy(): - #print "Close server" - self.server.shutdown() - #print pm - elif data == STOP_SERVER_MSG: - #print "Close server" - self.server.shutdown() -# + # remove port from list + busy_ports = config["busy_ports"] -class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - pass + if port in busy_ports: + busy_ports.remove(port) + config["busy_ports"] = busy_ports -def __getServer(address): - SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately - server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind - server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart - return server -# + # write config + logger.debug("write busy_ports: %s"%str(config["busy_ports"])) + try: + with open(config_file, 'w') as f: + pickle.dump(config, f) + except IOError: + pass -pm_address = ('localhost', 12345) -def __startServer(): - try: - server = __getServer(pm_address) - server.server_bind() # Manually bind, to support allow_reuse_address - server.server_activate() - address = server.server_address + # release lock + __release_lock(lock) - # Start a thread with the server -- that thread will then start one - # more thread for each request - server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager") - # Exit the server thread when the main thread terminates - #server_thread.setDaemon(True) - server_thread.start() - #print "Server loop running in thread:", server_thread.getName() - #print "Server address:", address - #return address - except: - #print "Server already started" - #print "Server address:", pm_address - #return pm_address - pass + logger.debug("released port port: %s"%str(port)) # -def __newClient(address, message): - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(address) - _send(sock, message) - response = _receive(sock) - if response.startswith(GET_PORT_ACK_MSG): - port = int(response[len(GET_PORT_ACK_MSG)+1:]) - #print "GET_PORT:", port - return port - elif response == RELEASE_PORT_ACK_MSG: - #print "Received: %s" % response - return 0 +def getBusyPorts(): + config_file, lock_file = _getConfigurationFilename() + with open(lock_file, 'w') as lock: + # acquire lock + __acquire_lock(lock) + + # read config + config = {'busy_ports':[]} + logger.debug("read configuration file") + try: + with open(config_file, 'r') as f: + config = pickle.load(f) + except IOError: # empty file pass - sock.close() - except socket.error: - #print "Unable to connect to server" - return -1 -# -def getPort(preferedPort=None): - if preferedPort: - return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) - else: - return __newClient(pm_address, GET_PORT_MSG) -# + logger.debug("load busy_ports: %s"%str(config["busy_ports"])) -def releasePort(port): - __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port)) -# + busy_ports = config["busy_ports"] + # release lock + __release_lock(lock) -def stopServer(): - __newClient(pm_address, STOP_SERVER_MSG) + return busy_ports # - -# Auto start: unique instance ; no effect if called multiple times -__startServer() -#server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager") -#server_thread.setDaemon(True) -#server_thread.start()