Salome HOME
prevent from double session close
[modules/kernel.git] / bin / PortManager.py
index 6c4ecb59640de9271969825b81ceb45cb057690f..8791bfc94c897ecdcd31e557760517a8122c7f22 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #  -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
 #
 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
@@ -8,7 +8,7 @@
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
 # License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
 #
 # This library is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
 #
-from Singleton import Singleton
-
-from multiprocessing.managers import SyncManager
-import multiprocessing
-import time
-import socket
-import Queue
-import sys
 import os
+import sys
 
-"""
-This class handles port distribution for SALOME sessions.
-In case of concurent sessions, each will have its own port number.
-"""
-class PortManager(object): # :TODO: must manage lock owner
-  __metaclass__ = Singleton
-  #
-  def __init__(self, startNumber = 2810, limit = 100, timeout=60):
-    super(PortManager, self).__init__()
-    self.__startNumber = startNumber
-    self.__limit = startNumber + limit
-    self.__lockedPorts = []
-    self.__lock = multiprocessing.Lock()
-    self.__timeout = timeout
-    self.__lastChangeTime = time.time()
-  #
-  def getPort(self):
-    with self.__lock:
-      port = self.__startNumber
-      while self.isPortUsed(port):
-        if port == self.__limit:
+try:
+  import cPickle as pickle #@UnusedImport
+except:
+  import pickle #@Reimport
+
+import logging
+def createLogger():
+  logger = logging.getLogger(__name__)
+#  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+  ch.setFormatter(formatter)
+  logger.addHandler(ch)
+  return logger
+#
+logger = createLogger()
+
+#------------------------------------
+# A file locker (Linux only)
+def __acquire_lock(lock):
+  if sys.platform == "win32":
+    import msvcrt
+    # lock 1 byte: file is supposed to be zero-byte long
+    msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1)
+  else:
+    import fcntl
+    fcntl.flock(lock, fcntl.LOCK_EX)
+#
+def __release_lock(lock):
+  if sys.platform == "win32":
+    import msvcrt
+    msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1)
+  else:
+    import fcntl
+    fcntl.flock(lock, fcntl.LOCK_UN)
+#
+
+def _getConfigurationFilename():
+  omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+  from salome_utils import generateFileName
+  portmanager_config = generateFileName(omniorbUserPath,
+                                        prefix="omniORB",
+                                        suffix="PortManager",
+                                        extension="cfg",
+                                        hidden=True)
+  import tempfile
+  temp = tempfile.NamedTemporaryFile()
+  lock_file = os.path.join(os.path.dirname(temp.name), ".omniORB_PortManager.lock")
+  temp.close()
+
+  return (portmanager_config, lock_file)
+#
+
+def __isPortUsed(port, busy_ports):
+  return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port)
+#
+
+def __isNetworkConnectionActiveOnPort(port):
+  # :NOTE: Under windows:
+  #        netstat options -l and -t are unavailable
+  #        grep command is unavailable
+  from subprocess import Popen, PIPE
+  stdout, _ = Popen(['netstat','-an'], stdout=PIPE).communicate()
+  import StringIO
+  buf = StringIO.StringIO(stdout)
+  ports = buf.readlines()
+  # search for TCP - LISTEN connections
+  import re
+  regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
+  for item in ports:
+    try:
+      p = int(regObj.match(item).group(1))
+      if p == port: return True
+    except:
+      pass
+#
+
+def getPort(preferedPort=None):
+  logger.debug("GET PORT")
+
+  config_file, lock_file = _getConfigurationFilename()
+  oldmask = os.umask(0)
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
+
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
+    try:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
+      pass
+
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
+
+    # append port
+    busy_ports = config["busy_ports"]
+    port = preferedPort
+    if not port or __isPortUsed(port, busy_ports):
+      port = 2810
+      while __isPortUsed(port, busy_ports):
+        if port == 2810+100:
           msg  = "\n"
           msg += "Can't find a free port to launch omniNames\n"
           msg += "Try to kill the running servers and then launch SALOME again.\n"
           raise RuntimeError, msg
         port = port + 1
+    logger.debug("found free port: %s"%str(port))
+    config["busy_ports"].append(port)
 
-      self.__lockedPorts.append(port)
-      self.__lastChangeTime = time.time()
-      return port
-  #
-  def releasePort(self, port):
-    with self.__lock:
-      if port in self.__lockedPorts:
-        self.__lockedPorts.remove(port)
-        self.__lastChangeTime = time.time()
-  #
-  def isBusy(self):
-    return len(self.__lockedPorts)
-  #
-  def isPortUsed(self, port):
-    return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
-  #
-  def __isNetworkConnectionActiveOnPort(self, port):
-    # :NOTE: Under windows:
-    #        netstat options -l and -t are unavailable
-    #        grep command is unavailable
-    from subprocess import Popen, PIPE
-    (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
-    import StringIO
-    buf = StringIO.StringIO(stdout)
-    ports = buf.readlines()
-    # search for TCP - LISTEN connections
-    import re
-    regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
-    for item in ports:
-      try:
-        p = int(regObj.match(item).group(1))
-        if p == port: return True
-      except:
-        pass
-  #
-  def timeout(self):
-    return (time.time() - self.__lastChangeTime > self.__timeout)
-  #
-  def __str__(self):
-    with self.__lock:
-      return "PortManager: list of locked ports:" + str(self.__lockedPorts)
+    # write config
+    logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+    try:
+      with open(config_file, 'w') as f:
+        pickle.dump(config, f)
+    except IOError:
+      pass
+
+    # release lock
+    __release_lock(lock)
   #
+
+  os.umask(oldmask)
+  logger.debug("get port: %s"%str(port))
+  return port
 #
 
-def __build_server(ip, port, authkey):
-  message_queue = multiprocessing.Queue()
+def releasePort(port):
+  port = int(port)
+  logger.debug("RELEASE PORT (%s)"%port)
 
-  class MyManager(SyncManager):
-    pass
+  config_file, lock_file = _getConfigurationFilename()
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
 
-  MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
-  MyManager.register("get_message_queue", callable=lambda: message_queue)
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
+    try:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
+      pass
 
-  manager = MyManager(address=(ip, port), authkey=authkey)
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
 
-  try:
-    manager.get_server()
-    manager.start()
-  except (EOFError, socket.error):
-    print 'Server already started on %s:%s'%(ip,port)
-    sys.exit(1)
+    # remove port from list
+    busy_ports = config["busy_ports"]
 
-  return manager
-#
+    if port in busy_ports:
+      busy_ports.remove(port)
+      config["busy_ports"] = busy_ports
 
-def __build_client(ip, port, authkey):
-  class MyManager(SyncManager):
-    pass
+    # write config
+    logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+    try:
+      with open(config_file, 'w') as f:
+        pickle.dump(config, f)
+    except IOError:
+      pass
 
-  MyManager.register("PortManager")
-  MyManager.register("get_message_queue")
+    # release lock
+    __release_lock(lock)
 
-  manager = MyManager(address=(ip, port), authkey=authkey)
-  try:
-    manager.connect()
-  except socket.error:
-    raise Exception("Unable to connect to server on %s:%s"%(ip, port))
-  return manager
+    logger.debug("released port port: %s"%str(port))
 #
 
-def __run_server(ip, port, authkey, timeout):
-  theserver = __build_server(ip, port, authkey)
-  shared_mesq = theserver.get_message_queue()
-  print 'PortManager server started on %s:%s'%(ip,port)
-  portManager = theserver.PortManager(timeout=timeout)
+def getBusyPorts():
+  config_file, lock_file = _getConfigurationFilename()
+  with open(lock_file, 'w') as lock:
+    # acquire lock
+    __acquire_lock(lock)
 
-  while portManager.isBusy() or not portManager.timeout():
+    # read config
+    config = {'busy_ports':[]}
+    logger.debug("read configuration file")
     try:
-      message = shared_mesq.get(block=False)
-      print message
-    except Queue.Empty:
+      with open(config_file, 'r') as f:
+        config = pickle.load(f)
+    except IOError: # empty file
       pass
 
-  print "PortManager server is shuting down..."
-  time.sleep(2)
-  theserver.shutdown()
-#
-
-#
-semaphore = None
-#
-def __run_client(manager, execute_func):
-  with semaphore:
-    name = multiprocessing.current_process().name
-    processId = os.getpid()
-
-    # do the job
-    if execute_func:
-      execute_func(manager, name, processId)
-#
+    logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
 
-#
-local_ip = '127.0.0.1'
-port = 5000
-authkey = 'salome_port_manager_access'
-#
-lock = multiprocessing.Lock()
-def start_server(nbSimultaneous=10, timeout=10):
-  with lock:
-    procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,))
-    procServer.start()
-    global semaphore
-    semaphore = multiprocessing.Semaphore(nbSimultaneous)
-    time.sleep(2)
-#
-
-def start_client(ip=local_ip, execute_func=None, name="anonymous"):
-  manager = __build_client(ip, port, authkey)
-  p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,))
-  p.start()
-  return manager
-#
+    busy_ports = config["busy_ports"]
+    # release lock
+    __release_lock(lock)
 
-client_id = 0
-def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
-  global client_id
-  for i in range(nbClients):
-    start_client(ip, execute_func, name=name_prefix+"_"+str(client_id))
-    client_id = client_id + 1
+    return busy_ports
 #