]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
use file locks
authoraguerre <aguerre>
Tue, 12 Nov 2013 16:58:04 +0000 (16:58 +0000)
committeraguerre <aguerre>
Tue, 12 Nov 2013 16:58:04 +0000 (16:58 +0000)
bin/PortManager.py
bin/parseConfigFile.py

index da1e06cd6732fa1c9e5ab4d12004e96512feae66..56e81f106e017866ca3d3e596159265475e12aac 100644 (file)
@@ -27,6 +27,7 @@ import multiprocessing
 import time
 import socket
 
+import os
 import sys
 import threading
 import SocketServer
@@ -39,6 +40,20 @@ except:
 import struct
 import ctypes
 
+import logging
+def createLogger():
+  logger = logging.getLogger(__name__)
+#  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+  ch.setFormatter(formatter)
+  logger.addHandler(ch)
+  return logger
+#
+logger = createLogger()
+
+
 if sys.platform == 'win32':
   import multiprocessing.reduction    # make sockets pickable/inheritable
 
@@ -78,6 +93,7 @@ class _PortManager(object): # :TODO: must manage lock owner
       return port
   #
   def releasePort(self, port):
+    logger.debug("PortManager.releasePort %s"%port)
     with self.__lock:
       if port in self.__lockedPorts:
         self.__lockedPorts.remove(port)
@@ -151,9 +167,11 @@ GET_PORT_MSG = "GET_PORT"
 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
 RELEASE_PORT_MSG = "RELEASE_PORT"
 STOP_SERVER_MSG = "STOP_SERVER"
+TEST_SERVER_MSG = "TEST_SERVER"
 
 GET_PORT_ACK_MSG = "GET_PORT"
 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
+TEST_SERVER_ACK_MSG = "TEST_SERVER"
 
 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
   def handle(self):
@@ -175,35 +193,150 @@ class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
       pm.releasePort(port)
       response = "%s" % (RELEASE_PORT_ACK_MSG)
       _send(self.request, response)
-      #print "RELEASE_PORT:", port
+      logger.debug("RELEASE_PORT: %s"%port)
       if not pm.isBusy():
-        #print "Close server"
+        logger.debug("Close server")
+        config_file, lock_file = _getConfigurationFilename()
+        try:
+          os.remove(config_file)
+          pmlock.release()
+          os.remove(lock_file)
+        except:
+          pass
         self.server.shutdown()
       #print pm
     elif data == STOP_SERVER_MSG:
-      #print "Close server"
+      logger.debug("Close server")
       self.server.shutdown()
+    elif data == TEST_SERVER_MSG:
+      _send(self.request, TEST_SERVER_ACK_MSG)
 #
 
 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
     pass
 
-pm_address = ('127.0.0.1', 51843)
-def __getServer(address):
+#------------------------------------
+# A file locker (Linux only)
+import fcntl
+class PortManagerLock:
+  def __init__(self, filename, readonly=False, blocking=True):
+    self.filename = filename
+    # This will create it if it does not exist already
+    logger.debug("Create lock on %s"%filename)
+    mode = 'w'
+    if readonly:
+      mode = 'r'
+    self.handle = open(filename, mode)
+    self.handle.seek(0) # go back to beginning of file to read it multiple times
+    self.__blocking = blocking
+
+  def acquire(self):
+    mode = fcntl.LOCK_EX
+    if not self.__blocking: # Raise an IOError exception if file has already been locked
+      mode = mode | fcntl.LOCK_NB
+    fcntl.flock(self.handle, mode)
+    logger.debug("lock acquired %s"%self.__blocking)
+
+  def release(self):
+    fcntl.flock(self.handle, fcntl.LOCK_UN)
+    logger.debug("lock released")
+
+  def __del__(self):
+    if logger:
+      logger.debug("Close lock file")
+    self.handle.close()
+#
+#------------------------------------
+
+# Server address has to be shared by different processes, without any common
+# ancestor.
+# The "simplest" solution is to define it here as a global variable. Unfortunately,
+# availability of the corresponding socket is not guaranted at all. If PortManager
+# tries to use a socket it does not own, server is not created (it is identified as
+# already existing), clients then connect on this socket but message passing
+# between clients and server will not work and SALOME launch will crash.
+# We can introduce a port verification procedure automatically called by importing
+# this module (i.e. when creating the server). This procedure consists in creating
+# a client which sends a specific message to the server that has to be tested. And
+# loop on port numbers until a free socket is found and associated to a new server.
+#
+# Another approach is to let Python socket API select a free port number, then store
+# it to a file on server host machine in order to be shared with clients.
+# The logical part can be defined as follows. When server is started (by importing
+# this module), write server port number to a specific file (do not use a temporary
+# file name). Each client then read server address from this same file ; if file is
+# not nound, it is an error (add appropriate processing).
+# Server could also check file existence and try to use the same address as previous
+# server in order to avoid opening too many unecessary sockets ; but we need to apply
+# the above verification procedure. This processing is not necessary because TCP socket
+# timeout will automatically close unused sockets.
+
+def _getConfigurationFilename():
+  omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+  from salome_utils import generateFileName
+  portmanager_config = generateFileName(omniorbUserPath,
+                                        prefix="omniORB",
+                                        suffix="PortManager",
+                                        extension="cfg",
+                                        hidden=True)
+  lock_file = portmanager_config + "-lock"
+  return (portmanager_config, lock_file)
+#
+
+def __checkServer():
+  while True:
+    logger.debug("CHECKING SERVER")
+    status = __newClient(TEST_SERVER_MSG)
+    if status == TEST_SERVER_ACK_MSG:
+      break
+  return (status == TEST_SERVER_ACK_MSG)
+#
+
+def __getServerAddress(readonly=True):
+  address = ("localhost", 0)
+  config_file, lock_file = _getConfigurationFilename()
+  lock = PortManagerLock(config_file, readonly, blocking=True)
+  lock.acquire()
+  try:
+    address = eval(lock.handle.read())
+  except (IOError, SyntaxError) as e:
+    logger.debug("no configuration file")
+    pass
+  finally:
+    lock.release()
+    return address
+#
+
+def __setServerAddress(address):
+  config_file, lock_file = _getConfigurationFilename()
+  lock = PortManagerLock(config_file, readonly=False, blocking=True)
+  lock.acquire()
+  logger.debug("setServerAddress: %s"%str(address))
+  lock.handle.write(str(address))
+  lock.release()
+#
+
+def __getServer():
+  address = __getServerAddress(readonly=False)
   SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
   server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
   server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
+  server.server_bind()     # Manually bind, to support allow_reuse_address
+  __setServerAddress(server.server_address)
+  server.server_activate()
   return server
 #
 
+pmlock = None
 def __startServer():
-  global pm_address
   try:
-    server = __getServer(pm_address)
-    server.server_bind()     # Manually bind, to support allow_reuse_address
-    server.server_activate()
-    pm_address = server.server_address
+    config_file, lock_file = _getConfigurationFilename()
+    global pmlock
+    pmlock = PortManagerLock(lock_file, readonly=False, blocking=False)
+    pmlock.acquire()
 
+    server = __getServer()
     # Start a thread with the server -- that thread will then start one
     # more thread for each request
     server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
@@ -211,50 +344,54 @@ def __startServer():
     #server_thread.setDaemon(True)
     server_thread.start()
     #print "Server loop running in thread:", server_thread.getName()
-    #print "Server address:", pm_address
-    #return address
   except:
-    #print "Server already started"
-    #print "Server address:", pm_address
-    #return pm_address
+    logger.debug("Server already started")
     pass
 #
 
-def __newClient(address, message):
+def __newClient(message):
+  address = __getServerAddress(readonly=True)
   try:
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    #print "connect client to", address
+    logger.debug("connect client to %s"%str(address))
     sock.connect(address)
     _send(sock, message)
     response = _receive(sock)
     if response.startswith(GET_PORT_ACK_MSG):
       port = int(response[len(GET_PORT_ACK_MSG)+1:])
-      #print "GET_PORT:", port
+      logger.debug("GET_PORT: %s"%port)
       return port
     elif response == RELEASE_PORT_ACK_MSG:
-      #print "Received: %s" % response
+      logger.debug("Received: %s" % response)
       return 0
       pass
+    elif response == TEST_SERVER_ACK_MSG:
+      logger.debug("Server is ok")
+      return TEST_SERVER_ACK_MSG
+      pass
     sock.close()
   except socket.error:
-    #print "Unable to connect to server"
+    logger.debug("Unable to connect to server")
     return -1
 #
 
 def getPort(preferedPort=None):
   if preferedPort:
-    return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
+    return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
   else:
-    return __newClient(pm_address, GET_PORT_MSG)
+    return __newClient(GET_PORT_MSG)
 #
 
 def releasePort(port):
-  __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
+  logger.debug("application asks for releasePort %s"%port)
+  __newClient("%s: %s"%(RELEASE_PORT_MSG,port))
 #
 
 def stopServer():
-  __newClient(pm_address, STOP_SERVER_MSG)
+  __newClient(STOP_SERVER_MSG)
 #
 
 # Auto start: unique instance ; no effect if called multiple times
 __startServer()
+logger.debug("Server started... do check...")
+assert(__checkServer())
index 8dd21189f503699cf8dfeb19684368826df8c46d..41c294334191cce7b4f00df73c986e816a5e7e5c 100644 (file)
@@ -167,7 +167,8 @@ def __processConfigFile(config, reserved = [], filename="UNKNOWN FILENAME"):
         pattern = re.compile('\${ ( [^}]* ) }', re.VERBOSE) # string enclosed in ${ and }
         expandedVal = pattern.sub(r'', expandedVal) # remove matching patterns
         # Trim colons
-        expandedVal = _trimColons(expandedVal)
+        if not "DLIM8VAR" in key: # special case: DISTENE licence key can contain double clons (::)
+          expandedVal = _trimColons(expandedVal)
 
         if key in reservedKeys:
           shortKey = key[len(ADD_TO_PREFIX):]