From d9da5867f38c0b04f5df834f9cdd5f49c14a136b Mon Sep 17 00:00:00 2001 From: aguerre Date: Tue, 12 Nov 2013 16:58:04 +0000 Subject: [PATCH] use file locks --- bin/PortManager.py | 185 +++++++++++++++++++++++++++++++++++------ bin/parseConfigFile.py | 3 +- 2 files changed, 163 insertions(+), 25 deletions(-) diff --git a/bin/PortManager.py b/bin/PortManager.py index da1e06cd6..56e81f106 100644 --- a/bin/PortManager.py +++ b/bin/PortManager.py @@ -27,6 +27,7 @@ import multiprocessing import time import socket +import os import sys import threading import SocketServer @@ -39,6 +40,20 @@ except: import struct import ctypes +import logging +def createLogger(): + logger = logging.getLogger(__name__) +# logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s") + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger +# +logger = createLogger() + + if sys.platform == 'win32': import multiprocessing.reduction # make sockets pickable/inheritable @@ -78,6 +93,7 @@ class _PortManager(object): # :TODO: must manage lock owner return port # def releasePort(self, port): + logger.debug("PortManager.releasePort %s"%port) with self.__lock: if port in self.__lockedPorts: self.__lockedPorts.remove(port) @@ -151,9 +167,11 @@ GET_PORT_MSG = "GET_PORT" GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT" RELEASE_PORT_MSG = "RELEASE_PORT" STOP_SERVER_MSG = "STOP_SERVER" +TEST_SERVER_MSG = "TEST_SERVER" GET_PORT_ACK_MSG = "GET_PORT" RELEASE_PORT_ACK_MSG = "RELEASE_PORT" +TEST_SERVER_ACK_MSG = "TEST_SERVER" class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): def handle(self): @@ -175,35 +193,150 @@ class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): pm.releasePort(port) response = "%s" % (RELEASE_PORT_ACK_MSG) _send(self.request, response) - #print "RELEASE_PORT:", port + logger.debug("RELEASE_PORT: %s"%port) if not pm.isBusy(): - #print "Close server" + logger.debug("Close server") + config_file, lock_file = _getConfigurationFilename() + try: + os.remove(config_file) + pmlock.release() + os.remove(lock_file) + except: + pass self.server.shutdown() #print pm elif data == STOP_SERVER_MSG: - #print "Close server" + logger.debug("Close server") self.server.shutdown() + elif data == TEST_SERVER_MSG: + _send(self.request, TEST_SERVER_ACK_MSG) # class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): pass -pm_address = ('127.0.0.1', 51843) -def __getServer(address): +#------------------------------------ +# A file locker (Linux only) +import fcntl +class PortManagerLock: + def __init__(self, filename, readonly=False, blocking=True): + self.filename = filename + # This will create it if it does not exist already + logger.debug("Create lock on %s"%filename) + mode = 'w' + if readonly: + mode = 'r' + self.handle = open(filename, mode) + self.handle.seek(0) # go back to beginning of file to read it multiple times + self.__blocking = blocking + + def acquire(self): + mode = fcntl.LOCK_EX + if not self.__blocking: # Raise an IOError exception if file has already been locked + mode = mode | fcntl.LOCK_NB + fcntl.flock(self.handle, mode) + logger.debug("lock acquired %s"%self.__blocking) + + def release(self): + fcntl.flock(self.handle, fcntl.LOCK_UN) + logger.debug("lock released") + + def __del__(self): + if logger: + logger.debug("Close lock file") + self.handle.close() +# +#------------------------------------ + +# Server address has to be shared by different processes, without any common +# ancestor. +# The "simplest" solution is to define it here as a global variable. Unfortunately, +# availability of the corresponding socket is not guaranted at all. If PortManager +# tries to use a socket it does not own, server is not created (it is identified as +# already existing), clients then connect on this socket but message passing +# between clients and server will not work and SALOME launch will crash. +# We can introduce a port verification procedure automatically called by importing +# this module (i.e. when creating the server). This procedure consists in creating +# a client which sends a specific message to the server that has to be tested. And +# loop on port numbers until a free socket is found and associated to a new server. +# +# Another approach is to let Python socket API select a free port number, then store +# it to a file on server host machine in order to be shared with clients. +# The logical part can be defined as follows. When server is started (by importing +# this module), write server port number to a specific file (do not use a temporary +# file name). Each client then read server address from this same file ; if file is +# not nound, it is an error (add appropriate processing). +# Server could also check file existence and try to use the same address as previous +# server in order to avoid opening too many unecessary sockets ; but we need to apply +# the above verification procedure. This processing is not necessary because TCP socket +# timeout will automatically close unused sockets. + +def _getConfigurationFilename(): + omniorbUserPath = os.getenv("OMNIORB_USER_PATH") + + from salome_utils import generateFileName + portmanager_config = generateFileName(omniorbUserPath, + prefix="omniORB", + suffix="PortManager", + extension="cfg", + hidden=True) + lock_file = portmanager_config + "-lock" + return (portmanager_config, lock_file) +# + +def __checkServer(): + while True: + logger.debug("CHECKING SERVER") + status = __newClient(TEST_SERVER_MSG) + if status == TEST_SERVER_ACK_MSG: + break + return (status == TEST_SERVER_ACK_MSG) +# + +def __getServerAddress(readonly=True): + address = ("localhost", 0) + config_file, lock_file = _getConfigurationFilename() + lock = PortManagerLock(config_file, readonly, blocking=True) + lock.acquire() + try: + address = eval(lock.handle.read()) + except (IOError, SyntaxError) as e: + logger.debug("no configuration file") + pass + finally: + lock.release() + return address +# + +def __setServerAddress(address): + config_file, lock_file = _getConfigurationFilename() + lock = PortManagerLock(config_file, readonly=False, blocking=True) + lock.acquire() + logger.debug("setServerAddress: %s"%str(address)) + lock.handle.write(str(address)) + lock.release() +# + +def __getServer(): + address = __getServerAddress(readonly=False) SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart + server.server_bind() # Manually bind, to support allow_reuse_address + __setServerAddress(server.server_address) + server.server_activate() return server # +pmlock = None def __startServer(): - global pm_address try: - server = __getServer(pm_address) - server.server_bind() # Manually bind, to support allow_reuse_address - server.server_activate() - pm_address = server.server_address + config_file, lock_file = _getConfigurationFilename() + global pmlock + pmlock = PortManagerLock(lock_file, readonly=False, blocking=False) + pmlock.acquire() + server = __getServer() # Start a thread with the server -- that thread will then start one # more thread for each request server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager") @@ -211,50 +344,54 @@ def __startServer(): #server_thread.setDaemon(True) server_thread.start() #print "Server loop running in thread:", server_thread.getName() - #print "Server address:", pm_address - #return address except: - #print "Server already started" - #print "Server address:", pm_address - #return pm_address + logger.debug("Server already started") pass # -def __newClient(address, message): +def __newClient(message): + address = __getServerAddress(readonly=True) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - #print "connect client to", address + logger.debug("connect client to %s"%str(address)) sock.connect(address) _send(sock, message) response = _receive(sock) if response.startswith(GET_PORT_ACK_MSG): port = int(response[len(GET_PORT_ACK_MSG)+1:]) - #print "GET_PORT:", port + logger.debug("GET_PORT: %s"%port) return port elif response == RELEASE_PORT_ACK_MSG: - #print "Received: %s" % response + logger.debug("Received: %s" % response) return 0 pass + elif response == TEST_SERVER_ACK_MSG: + logger.debug("Server is ok") + return TEST_SERVER_ACK_MSG + pass sock.close() except socket.error: - #print "Unable to connect to server" + logger.debug("Unable to connect to server") return -1 # def getPort(preferedPort=None): if preferedPort: - return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) + return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) else: - return __newClient(pm_address, GET_PORT_MSG) + return __newClient(GET_PORT_MSG) # def releasePort(port): - __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port)) + logger.debug("application asks for releasePort %s"%port) + __newClient("%s: %s"%(RELEASE_PORT_MSG,port)) # def stopServer(): - __newClient(pm_address, STOP_SERVER_MSG) + __newClient(STOP_SERVER_MSG) # # Auto start: unique instance ; no effect if called multiple times __startServer() +logger.debug("Server started... do check...") +assert(__checkServer()) diff --git a/bin/parseConfigFile.py b/bin/parseConfigFile.py index 8dd21189f..41c294334 100644 --- a/bin/parseConfigFile.py +++ b/bin/parseConfigFile.py @@ -167,7 +167,8 @@ def __processConfigFile(config, reserved = [], filename="UNKNOWN FILENAME"): pattern = re.compile('\${ ( [^}]* ) }', re.VERBOSE) # string enclosed in ${ and } expandedVal = pattern.sub(r'', expandedVal) # remove matching patterns # Trim colons - expandedVal = _trimColons(expandedVal) + if not "DLIM8VAR" in key: # special case: DISTENE licence key can contain double clons (::) + expandedVal = _trimColons(expandedVal) if key in reservedKeys: shortKey = key[len(ADD_TO_PREFIX):] -- 2.39.2