Salome HOME
prevent from double session close
[modules/kernel.git] / bin / PortManager.py
index cd9664b51cabe3d5769a95dea6211deb8a57b748..8791bfc94c897ecdcd31e557760517a8122c7f22 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #  -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
 #
 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
@@ -8,7 +8,7 @@
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
 # License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
 #
 # This library is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
 #
-from Singleton import Singleton
-
-import multiprocessing
-import time
-import socket
-
+import os
 import sys
-import threading
-import SocketServer
 
 try:
-  import cPickle as pickle
+  import cPickle as pickle #@UnusedImport
 except:
-  import pickle
-
-import struct
-import ctypes
-
-if sys.platform == 'win32':
-  import multiprocessing.reduction    # make sockets pickable/inheritable
-
-multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
-#ignore = multiprocessing.active_children()      # cleanup any old processes
+  import pickle #@Reimport
 
-"""
-This class handles port distribution for SALOME sessions.
-In case of concurent sessions, each will have its own port number.
-"""
-class _PortManager(object): # :TODO: must manage lock owner
-  __metaclass__ = Singleton
-  #
-  def __init__(self, startNumber = 2810, limit = 100, timeout=60):
-    super(_PortManager, self).__init__()
-    self.__startNumber = startNumber
-    self.__limit = startNumber + limit
-    self.__lockedPorts = []
-    self.__lock = multiprocessing.Lock()
-    self.__timeout = timeout
-    self.__lastChangeTime = time.time()
-  #
-  # Test for prefered port number, if asked.
-  def getPort(self, port=None):
-    with self.__lock:
-      if not port or self.isPortUsed(port):
-        port = self.__startNumber
-        while self.isPortUsed(port):
-          if port == self.__limit:
-            msg  = "\n"
-            msg += "Can't find a free port to launch omniNames\n"
-            msg += "Try to kill the running servers and then launch SALOME again.\n"
-            raise RuntimeError, msg
-          port = port + 1
-      #
-      self.__lockedPorts.append(port)
-      self.__lastChangeTime = time.time()
-      return port
-  #
-  def releasePort(self, port):
-    with self.__lock:
-      if port in self.__lockedPorts:
-        self.__lockedPorts.remove(port)
-        self.__lastChangeTime = time.time()
-  #
-  def isBusy(self):
-    return len(self.__lockedPorts)
-  #
-  def isPortUsed(self, port):
-    return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
-  #
-  def __isNetworkConnectionActiveOnPort(self, port):
-    # :NOTE: Under windows:
-    #        netstat options -l and -t are unavailable
-    #        grep command is unavailable
-    from subprocess import Popen, PIPE
-    (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
-    import StringIO
-    buf = StringIO.StringIO(stdout)
-    ports = buf.readlines()
-    # search for TCP - LISTEN connections
-    import re
-    regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
-    for item in ports:
-      try:
-        p = int(regObj.match(item).group(1))
-        if p == port: return True
-      except:
-        pass
-  #
-  def timeout(self):
-    return (time.time() - self.__lastChangeTime > self.__timeout)
-  #
-  def __str__(self):
-    with self.__lock:
-      return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
-  #
+import logging
+def createLogger():
+  logger = logging.getLogger(__name__)
+#  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+  ch.setFormatter(formatter)
+  logger.addHandler(ch)
+  return logger
 #
+logger = createLogger()
 
 #------------------------------------
-# Communication methods
-
-_marshall = pickle.dumps
-_unmarshall = pickle.loads
-
-def _send(channel, *args):
-  buf = _marshall(args)
-  value = socket.htonl(len(buf))
-  size = struct.pack("L",value)
-  channel.send(size)
-  channel.send(buf)
+# A file locker (Linux only)
+def __acquire_lock(lock):
+  if sys.platform == "win32":
+    import msvcrt
+    # lock 1 byte: file is supposed to be zero-byte long
+    msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1)
+  else:
+    import fcntl
+    fcntl.flock(lock, fcntl.LOCK_EX)
+#
+def __release_lock(lock):
+  if sys.platform == "win32":
+    import msvcrt
+    msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1)
+  else:
+    import fcntl
+    fcntl.flock(lock, fcntl.LOCK_UN)
+#
+
+def _getConfigurationFilename():
+  omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+  from salome_utils import generateFileName
+  portmanager_config = generateFileName(omniorbUserPath,
+                                        prefix="omniORB",
+                                        suffix="PortManager",
+                                        extension="cfg",
+                                        hidden=True)
+  import tempfile
+  temp = tempfile.NamedTemporaryFile()
+  lock_file = os.path.join(os.path.dirname(temp.name), ".omniORB_PortManager.lock")
+  temp.close()
+
+  return (portmanager_config, lock_file)
+#
+
+def __isPortUsed(port, busy_ports):
+  return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port)
+#
+
+def __isNetworkConnectionActiveOnPort(port):
+  # :NOTE: Under windows:
+  #        netstat options -l and -t are unavailable
+  #        grep command is unavailable
+  from subprocess import Popen, PIPE
+  stdout, _ = Popen(['netstat','-an'], stdout=PIPE).communicate()
+  import StringIO
+  buf = StringIO.StringIO(stdout)
+  ports = buf.readlines()
+  # search for TCP - LISTEN connections
+  import re
+  regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
+  for item in ports:
+    try:
+      p = int(regObj.match(item).group(1))
+      if p == port: return True
+    except:
+      pass
 #
 
-def _receive(channel):
-  size = struct.calcsize("L")
-  size = channel.recv(size)
-  try:
-    size = socket.ntohl(struct.unpack("L", size)[0])
-  except struct.error, e:
-    return ''
+def getPort(preferedPort=None):
+  logger.debug("GET PORT")
+
+  config_file, lock_file = _getConfigurationFilename()
+  oldmask = os.umask(0)
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
+
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
+    try:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
+      pass
 
-  buf = ""
-  while len(buf) < size:
-    buf = channel.recv(size - len(buf))
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
+
+    # append port
+    busy_ports = config["busy_ports"]
+    port = preferedPort
+    if not port or __isPortUsed(port, busy_ports):
+      port = 2810
+      while __isPortUsed(port, busy_ports):
+        if port == 2810+100:
+          msg  = "\n"
+          msg += "Can't find a free port to launch omniNames\n"
+          msg += "Try to kill the running servers and then launch SALOME again.\n"
+          raise RuntimeError, msg
+        port = port + 1
+    logger.debug("found free port: %s"%str(port))
+    config["busy_ports"].append(port)
+
+    # write config
+    logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+    try:
+      with open(config_file, 'w') as f:
+        pickle.dump(config, f)
+    except IOError:
+      pass
+
+    # release lock
+    __release_lock(lock)
+  #
 
-  return _unmarshall(buf)[0]
+  os.umask(oldmask)
+  logger.debug("get port: %s"%str(port))
+  return port
 #
-#------------------------------------
 
-GET_PORT_MSG = "GET_PORT"
-GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
-RELEASE_PORT_MSG = "RELEASE_PORT"
-STOP_SERVER_MSG = "STOP_SERVER"
+def releasePort(port):
+  port = int(port)
+  logger.debug("RELEASE PORT (%s)"%port)
+
+  config_file, lock_file = _getConfigurationFilename()
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
+
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
+    try:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
+      pass
 
-GET_PORT_ACK_MSG = "GET_PORT"
-RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
 
-class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
-  def handle(self):
-    data = _receive(self.request)
-    if data == GET_PORT_MSG:
-      pm = _PortManager()
-      port = pm.getPort()
-      response = "%s: %s" % (GET_PORT_ACK_MSG, port)
-      _send(self.request, response)
-    elif data.startswith(GET_PREFERED_PORT_MSG):
-      port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
-      pm = _PortManager()
-      port = pm.getPort(port)
-      response = "%s: %s" % (GET_PORT_ACK_MSG, port)
-      _send(self.request, response)
-    elif data.startswith(RELEASE_PORT_MSG):
-      port = int(data[len(RELEASE_PORT_MSG)+1:])
-      pm = _PortManager()
-      pm.releasePort(port)
-      response = "%s" % (RELEASE_PORT_ACK_MSG)
-      _send(self.request, response)
-      #print "RELEASE_PORT:", port
-      if not pm.isBusy():
-        #print "Close server"
-        self.server.shutdown()
-      #print pm
-    elif data == STOP_SERVER_MSG:
-      #print "Close server"
-      self.server.shutdown()
-#
+    # remove port from list
+    busy_ports = config["busy_ports"]
 
-class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
-    pass
+    if port in busy_ports:
+      busy_ports.remove(port)
+      config["busy_ports"] = busy_ports
 
-def __getServer(address):
-  SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
-  server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
-  server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
-  return server
-#
+    # write config
+    logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+    try:
+      with open(config_file, 'w') as f:
+        pickle.dump(config, f)
+    except IOError:
+      pass
 
-pm_address = ('localhost', 12345)
-def __startServer():
-  try:
-    server = __getServer(pm_address)
-    server.server_bind()     # Manually bind, to support allow_reuse_address
-    server.server_activate()
-    address = server.server_address
+    # release lock
+    __release_lock(lock)
 
-    # Start a thread with the server -- that thread will then start one
-    # more thread for each request
-    server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
-    # Exit the server thread when the main thread terminates
-    #server_thread.setDaemon(True)
-    server_thread.start()
-    #print "Server loop running in thread:", server_thread.getName()
-    #print "Server address:", address
-    #return address
-  except:
-    #print "Server already started"
-    #print "Server address:", pm_address
-    #return pm_address
-    pass
+    logger.debug("released port port: %s"%str(port))
 #
 
-def __newClient(address, message):
-  try:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(address)
-    _send(sock, message)
-    response = _receive(sock)
-    if response.startswith(GET_PORT_ACK_MSG):
-      port = int(response[len(GET_PORT_ACK_MSG)+1:])
-      #print "GET_PORT:", port
-      return port
-    elif response == RELEASE_PORT_ACK_MSG:
-      #print "Received: %s" % response
-      return 0
+def getBusyPorts():
+  config_file, lock_file = _getConfigurationFilename()
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
+
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
+    try:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
       pass
-    sock.close()
-  except socket.error:
-    #print "Unable to connect to server"
-    return -1
-#
 
-def getPort(preferedPort=None):
-  if preferedPort:
-    return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
-  else:
-    return __newClient(pm_address, GET_PORT_MSG)
-#
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
 
-def releasePort(port):
-  __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
-#
+    busy_ports = config["busy_ports"]
+    # release lock
+    __release_lock(lock)
 
-def stopServer():
-  __newClient(pm_address, STOP_SERVER_MSG)
+    return busy_ports
 #
-
-# Auto start: unique instance ; no effect if called multiple times
-__startServer()
-#server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager")
-#server_thread.setDaemon(True)
-#server_thread.start()