Salome HOME
Merge remote branch 'origin/abn/newpy_pv4-1' into V7_3_1_BR
[modules/kernel.git] / bin / PortManager.py
index 6c4ecb59640de9271969825b81ceb45cb057690f..0ad853a9bc05e0594413c847706a8a4339d157db 100644 (file)
 #
 from Singleton import Singleton
 
-from multiprocessing.managers import SyncManager
 import multiprocessing
 import time
 import socket
-import Queue
-import sys
+
 import os
+import sys
+import threading
+import SocketServer
+
+try:
+  import cPickle as pickle
+except:
+  import pickle
+
+import struct
+import ctypes
+
+import logging
+def createLogger():
+  logger = logging.getLogger(__name__)
+#  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+  ch.setFormatter(formatter)
+  logger.addHandler(ch)
+  return logger
+#
+logger = createLogger()
+
+
+if sys.platform == 'win32':
+  import multiprocessing.reduction    # make sockets pickable/inheritable
+
+multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
 
 """
 This class handles port distribution for SALOME sessions.
 In case of concurent sessions, each will have its own port number.
 """
-class PortManager(object): # :TODO: must manage lock owner
+class _PortManager(object): # :TODO: must manage lock owner
   __metaclass__ = Singleton
   #
   def __init__(self, startNumber = 2810, limit = 100, timeout=60):
-    super(PortManager, self).__init__()
+    super(_PortManager, self).__init__()
     self.__startNumber = startNumber
     self.__limit = startNumber + limit
     self.__lockedPorts = []
@@ -47,22 +75,25 @@ class PortManager(object): # :TODO: must manage lock owner
     self.__timeout = timeout
     self.__lastChangeTime = time.time()
   #
-  def getPort(self):
+  # Test for prefered port number, if asked.
+  def getPort(self, port=None):
     with self.__lock:
-      port = self.__startNumber
-      while self.isPortUsed(port):
-        if port == self.__limit:
-          msg  = "\n"
-          msg += "Can't find a free port to launch omniNames\n"
-          msg += "Try to kill the running servers and then launch SALOME again.\n"
-          raise RuntimeError, msg
-        port = port + 1
-
+      if not port or self.isPortUsed(port):
+        port = self.__startNumber
+        while self.isPortUsed(port):
+          if port == self.__limit:
+            msg  = "\n"
+            msg += "Can't find a free port to launch omniNames\n"
+            msg += "Try to kill the running servers and then launch SALOME again.\n"
+            raise RuntimeError, msg
+          port = port + 1
+      #
       self.__lockedPorts.append(port)
       self.__lastChangeTime = time.time()
       return port
   #
   def releasePort(self, port):
+    logger.debug("PortManager.releasePort %s"%port)
     with self.__lock:
       if port in self.__lockedPorts:
         self.__lockedPorts.remove(port)
@@ -98,103 +129,269 @@ class PortManager(object): # :TODO: must manage lock owner
   #
   def __str__(self):
     with self.__lock:
-      return "PortManager: list of locked ports:" + str(self.__lockedPorts)
+      return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
   #
 #
 
-def __build_server(ip, port, authkey):
-  message_queue = multiprocessing.Queue()
+#------------------------------------
+# Communication methods
 
-  class MyManager(SyncManager):
-    pass
-
-  MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
-  MyManager.register("get_message_queue", callable=lambda: message_queue)
+_marshall = pickle.dumps
+_unmarshall = pickle.loads
 
-  manager = MyManager(address=(ip, port), authkey=authkey)
+def _send(channel, *args):
+  buf = _marshall(args)
+  value = socket.htonl(len(buf))
+  size = struct.pack("L",value)
+  channel.send(size)
+  channel.send(buf)
+#
 
+def _receive(channel):
+  size = struct.calcsize("L")
+  size = channel.recv(size)
   try:
-    manager.get_server()
-    manager.start()
-  except (EOFError, socket.error):
-    print 'Server already started on %s:%s'%(ip,port)
-    sys.exit(1)
+    size = socket.ntohl(struct.unpack("L", size)[0])
+  except struct.error, e:
+    return ''
+
+  buf = ""
+  while len(buf) < size:
+    buf = channel.recv(size - len(buf))
+
+  return _unmarshall(buf)[0]
+#
+#------------------------------------
+
+GET_PORT_MSG = "GET_PORT"
+GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
+RELEASE_PORT_MSG = "RELEASE_PORT"
+STOP_SERVER_MSG = "STOP_SERVER"
+TEST_SERVER_MSG = "TEST_SERVER"
+
+GET_PORT_ACK_MSG = "GET_PORT"
+RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
+TEST_SERVER_ACK_MSG = "TEST_SERVER"
 
-  return manager
+class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
+  def handle(self):
+    data = _receive(self.request)
+    if data == GET_PORT_MSG:
+      pm = _PortManager()
+      port = pm.getPort()
+      response = "%s: %s" % (GET_PORT_ACK_MSG, port)
+      _send(self.request, response)
+    elif data.startswith(GET_PREFERED_PORT_MSG):
+      port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
+      pm = _PortManager()
+      port = pm.getPort(port)
+      response = "%s: %s" % (GET_PORT_ACK_MSG, port)
+      _send(self.request, response)
+    elif data.startswith(RELEASE_PORT_MSG):
+      port = int(data[len(RELEASE_PORT_MSG)+1:])
+      pm = _PortManager()
+      pm.releasePort(port)
+      response = "%s" % (RELEASE_PORT_ACK_MSG)
+      _send(self.request, response)
+      logger.debug("RELEASE_PORT: %s"%port)
+      if not pm.isBusy():
+        logger.debug("Close server")
+        config_file, lock_file = _getConfigurationFilename()
+        try:
+          os.remove(config_file)
+          pmlock.release()
+          os.remove(lock_file)
+        except:
+          pass
+        self.server.shutdown()
+      #print pm
+    elif data == STOP_SERVER_MSG:
+      logger.debug("Close server")
+      self.server.shutdown()
+    elif data == TEST_SERVER_MSG:
+      _send(self.request, TEST_SERVER_ACK_MSG)
 #
 
-def __build_client(ip, port, authkey):
-  class MyManager(SyncManager):
+class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
     pass
 
-  MyManager.register("PortManager")
-  MyManager.register("get_message_queue")
+#------------------------------------
+# A file locker (Linux only)
+import fcntl
+class PortManagerLock:
+  def __init__(self, filename, readonly=False, blocking=True):
+    self.filename = filename
+    # This will create it if it does not exist already
+    logger.debug("Create lock on %s"%filename)
+    mode = 'w'
+    if readonly:
+      mode = 'r'
+    self.handle = open(filename, mode)
+    self.handle.seek(0) # go back to beginning of file to read it multiple times
+    self.__blocking = blocking
 
-  manager = MyManager(address=(ip, port), authkey=authkey)
-  try:
-    manager.connect()
-  except socket.error:
-    raise Exception("Unable to connect to server on %s:%s"%(ip, port))
-  return manager
+  def acquire(self):
+    mode = fcntl.LOCK_EX
+    if not self.__blocking: # Raise an IOError exception if file has already been locked
+      mode = mode | fcntl.LOCK_NB
+    fcntl.flock(self.handle, mode)
+    logger.debug("lock acquired %s"%self.__blocking)
+
+  def release(self):
+    fcntl.flock(self.handle, fcntl.LOCK_UN)
+    logger.debug("lock released")
+
+  def __del__(self):
+    if logger:
+      logger.debug("Close lock file")
+    self.handle.close()
 #
+#------------------------------------
 
-def __run_server(ip, port, authkey, timeout):
-  theserver = __build_server(ip, port, authkey)
-  shared_mesq = theserver.get_message_queue()
-  print 'PortManager server started on %s:%s'%(ip,port)
-  portManager = theserver.PortManager(timeout=timeout)
+# Server address has to be shared by different processes, without any common
+# ancestor.
+# The "simplest" solution is to define it here as a global variable. Unfortunately,
+# availability of the corresponding socket is not guaranted at all. If PortManager
+# tries to use a socket it does not own, server is not created (it is identified as
+# already existing), clients then connect on this socket but message passing
+# between clients and server will not work and SALOME launch will crash.
+# We can introduce a port verification procedure automatically called by importing
+# this module (i.e. when creating the server). This procedure consists in creating
+# a client which sends a specific message to the server that has to be tested. And
+# loop on port numbers until a free socket is found and associated to a new server.
+#
+# Another approach is to let Python socket API select a free port number, then store
+# it to a file on server host machine in order to be shared with clients.
+# The logical part can be defined as follows. When server is started (by importing
+# this module), write server port number to a specific file (do not use a temporary
+# file name). Each client then read server address from this same file ; if file is
+# not nound, it is an error (add appropriate processing).
+# Server could also check file existence and try to use the same address as previous
+# server in order to avoid opening too many unecessary sockets ; but we need to apply
+# the above verification procedure. This processing is not necessary because TCP socket
+# timeout will automatically close unused sockets.
 
-  while portManager.isBusy() or not portManager.timeout():
-    try:
-      message = shared_mesq.get(block=False)
-      print message
-    except Queue.Empty:
-      pass
+def _getConfigurationFilename():
+  omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+  from salome_utils import generateFileName
+  portmanager_config = generateFileName(omniorbUserPath,
+                                        prefix="omniORB",
+                                        suffix="PortManager",
+                                        extension="cfg",
+                                        hidden=True)
+  lock_file = portmanager_config + "-lock"
+  return (portmanager_config, lock_file)
+#
 
-  print "PortManager server is shuting down..."
-  time.sleep(2)
-  theserver.shutdown()
+def __checkServer():
+  while True:
+    logger.debug("CHECKING SERVER")
+    status = __newClient(TEST_SERVER_MSG)
+    if status == TEST_SERVER_ACK_MSG:
+      break
+  return (status == TEST_SERVER_ACK_MSG)
 #
 
+def __getServerAddress(readonly=True):
+  address = ("localhost", 0)
+  try:
+    config_file, lock_file = _getConfigurationFilename()
+    lock = PortManagerLock(config_file, readonly, blocking=True)
+    lock.acquire()
+    address = eval(lock.handle.read())
+    lock.release()
+  except (IOError, SyntaxError) as e:
+    logger.debug("no configuration file")
+    pass
+  finally:
+    return address
 #
-semaphore = None
+
+def __setServerAddress(address):
+  config_file, lock_file = _getConfigurationFilename()
+  lock = PortManagerLock(config_file, readonly=False, blocking=True)
+  lock.acquire()
+  logger.debug("setServerAddress: %s"%str(address))
+  lock.handle.write(str(address))
+  lock.release()
 #
-def __run_client(manager, execute_func):
-  with semaphore:
-    name = multiprocessing.current_process().name
-    processId = os.getpid()
 
-    # do the job
-    if execute_func:
-      execute_func(manager, name, processId)
+def __getServer():
+  address = __getServerAddress(readonly=False)
+  SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
+  server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
+  server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
+  server.server_bind()     # Manually bind, to support allow_reuse_address
+  __setServerAddress(server.server_address)
+  server.server_activate()
+  return server
 #
 
+pmlock = None
+def __startServer():
+  try:
+    config_file, lock_file = _getConfigurationFilename()
+    global pmlock
+    pmlock = PortManagerLock(lock_file, readonly=False, blocking=False)
+    pmlock.acquire()
+
+    server = __getServer()
+    # Start a thread with the server -- that thread will then start one
+    # more thread for each request
+    server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
+    # Exit the server thread when the main thread terminates
+    #server_thread.setDaemon(True)
+    server_thread.start()
+    #print "Server loop running in thread:", server_thread.getName()
+  except:
+    logger.debug("Server already started")
+    pass
 #
-local_ip = '127.0.0.1'
-port = 5000
-authkey = 'salome_port_manager_access'
+
+def __newClient(message):
+  address = __getServerAddress(readonly=True)
+  try:
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    logger.debug("connect client to %s"%str(address))
+    sock.connect(address)
+    _send(sock, message)
+    response = _receive(sock)
+    if response.startswith(GET_PORT_ACK_MSG):
+      port = int(response[len(GET_PORT_ACK_MSG)+1:])
+      logger.debug("GET_PORT: %s"%port)
+      return port
+    elif response == RELEASE_PORT_ACK_MSG:
+      logger.debug("Received: %s" % response)
+      return 0
+      pass
+    elif response == TEST_SERVER_ACK_MSG:
+      logger.debug("Server is ok")
+      return TEST_SERVER_ACK_MSG
+      pass
+    sock.close()
+  except socket.error:
+    logger.debug("Unable to connect to server")
+    return -1
 #
-lock = multiprocessing.Lock()
-def start_server(nbSimultaneous=10, timeout=10):
-  with lock:
-    procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,))
-    procServer.start()
-    global semaphore
-    semaphore = multiprocessing.Semaphore(nbSimultaneous)
-    time.sleep(2)
+
+def getPort(preferedPort=None):
+  if preferedPort:
+    return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
+  else:
+    return __newClient(GET_PORT_MSG)
 #
 
-def start_client(ip=local_ip, execute_func=None, name="anonymous"):
-  manager = __build_client(ip, port, authkey)
-  p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,))
-  p.start()
-  return manager
+def releasePort(port):
+  logger.debug("application asks for releasePort %s"%port)
+  __newClient("%s: %s"%(RELEASE_PORT_MSG,port))
 #
 
-client_id = 0
-def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
-  global client_id
-  for i in range(nbClients):
-    start_client(ip, execute_func, name=name_prefix+"_"+str(client_id))
-    client_id = client_id + 1
+def stopServer():
+  __newClient(STOP_SERVER_MSG)
 #
+
+# Auto start: unique instance ; no effect if called multiple times
+__startServer()
+logger.debug("Server started... do check...")
+assert(__checkServer())