#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2017 CEA/DEN, EDF R&D, OPEN CASCADE
#
# Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
-from Singleton import Singleton
-
-from multiprocessing.managers import SyncManager
-import multiprocessing
-import time
-import socket
-import Queue
-import sys
import os
+import sys
-"""
-This class handles port distribution for SALOME sessions.
-In case of concurent sessions, each will have its own port number.
-"""
-class PortManager(object): # :TODO: must manage lock owner
- __metaclass__ = Singleton
- #
- def __init__(self, startNumber = 2810, limit = 100, timeout=60):
- super(PortManager, self).__init__()
- self.__startNumber = startNumber
- self.__limit = startNumber + limit
- self.__lockedPorts = []
- self.__lock = multiprocessing.Lock()
- self.__timeout = timeout
- self.__lastChangeTime = time.time()
- #
- def getPort(self):
- with self.__lock:
- port = self.__startNumber
- while self.isPortUsed(port):
- if port == self.__limit:
+try:
+ import cPickle as pickle #@UnusedImport
+except:
+ import pickle #@Reimport
+
+__PORT_MIN_NUMBER = 2810
+__PORT_MAX_NUMBER = 2910
+
+import logging
+def createLogger():
+ logger = logging.getLogger(__name__)
+# logger.setLevel(logging.DEBUG)
+ logger.setLevel(logging.INFO)
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.DEBUG)
+ formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+ return logger
+#
+logger = createLogger()
+
+#------------------------------------
+# A file locker
+def __acquire_lock(lock):
+ logger.debug("ACQUIRE LOCK")
+ if sys.platform == "win32":
+ import msvcrt
+ # lock 1 byte: file is supposed to be zero-byte long
+ msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_EX)
+ logger.debug("LOCK ACQUIRED")
+#
+def __release_lock(lock):
+ logger.debug("RELEASE LOCK")
+ if sys.platform == "win32":
+ import msvcrt
+ msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_UN)
+ logger.debug("LOCK RELEASED")
+#
+#------------------------------------
+
+def _getConfigurationFilename():
+ omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+ from salome_utils import generateFileName
+ portmanager_config = generateFileName(omniorbUserPath,
+ prefix="salome",
+ suffix="PortManager",
+ extension="cfg",
+ hidden=True)
+ import tempfile
+ temp = tempfile.NamedTemporaryFile()
+ lock_file = os.path.join(os.path.dirname(temp.name), ".salome_PortManager.lock")
+ temp.close()
+
+ return (portmanager_config, lock_file)
+#
+
+def __isPortUsed(port, config):
+ busy_ports = []
+ for ports in config.values():
+ busy_ports += ports
+ return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port)
+#
+
+def __isNetworkConnectionActiveOnPort(port):
+ # :NOTE: Under windows:
+ # netstat options -l and -t are unavailable
+ # grep command is unavailable
+ if sys.platform == "win32":
+ cmd = ['netstat','-a','-n','-p tcp']
+ else:
+ cmd = ['netstat','-ant']
+ pass
+
+ err = None
+ try:
+ from subprocess import Popen, PIPE, STDOUT
+ p = Popen(cmd, stdout=PIPE, stderr=STDOUT)
+ out, err = p.communicate()
+ except:
+ print "Error when trying to access active network connections."
+ if err: print err
+ import traceback
+ traceback.print_exc()
+ return False
+
+ import StringIO
+ buf = StringIO.StringIO(out)
+ ports = buf.readlines()
+ # search for TCP - LISTEN connections
+ import re
+ regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
+ for item in ports:
+ try:
+ p = int(regObj.match(item).group(1))
+ if p == port: return True
+ except:
+ pass
+ return False
+#
+
+def getPort(preferedPort=None):
+ logger.debug("GET PORT")
+
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except:
+ logger.info("Problem loading PortManager file: %s"%config_file)
+ # In this case config dictionary is reset
+
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
+
+ # append port
+ port = preferedPort
+ if not port or __isPortUsed(port, config):
+ port = __PORT_MIN_NUMBER
+ while __isPortUsed(port, config):
+ if port == __PORT_MAX_NUMBER:
msg = "\n"
msg += "Can't find a free port to launch omniNames\n"
msg += "Try to kill the running servers and then launch SALOME again.\n"
raise RuntimeError, msg
+ logger.debug("Port %s seems to be busy"%str(port))
port = port + 1
+ logger.debug("found free port: %s"%str(port))
+ config[appli_path].append(port)
- self.__lockedPorts.append(port)
- self.__lastChangeTime = time.time()
- return port
- #
- def releasePort(self, port):
- with self.__lock:
- if port in self.__lockedPorts:
- self.__lockedPorts.remove(port)
- self.__lastChangeTime = time.time()
- #
- def isBusy(self):
- return len(self.__lockedPorts)
- #
- def isPortUsed(self, port):
- return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
- #
- def __isNetworkConnectionActiveOnPort(self, port):
- # :NOTE: Under windows:
- # netstat options -l and -t are unavailable
- # grep command is unavailable
- from subprocess import Popen, PIPE
- (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
- import StringIO
- buf = StringIO.StringIO(stdout)
- ports = buf.readlines()
- # search for TCP - LISTEN connections
- import re
- regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
- for item in ports:
- try:
- p = int(regObj.match(item).group(1))
- if p == port: return True
- except:
- pass
- #
- def timeout(self):
- return (time.time() - self.__lastChangeTime > self.__timeout)
- #
- def __str__(self):
- with self.__lock:
- return "PortManager: list of locked ports:" + str(self.__lockedPorts)
+ # write config
+ logger.debug("write config: %s"%str(config))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
+
+ # release lock
+ __release_lock(lock)
#
+
+ os.umask(oldmask)
+ logger.debug("get port: %s"%str(port))
+ return port
#
-def __build_server(ip, port, authkey):
- message_queue = multiprocessing.Queue()
+def releasePort(port):
+ port = int(port)
+ logger.debug("RELEASE PORT (%s)"%port)
- class MyManager(SyncManager):
- pass
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
- MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
- MyManager.register("get_message_queue", callable=lambda: message_queue)
+ # read config
+ config = {}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
+ pass
- manager = MyManager(address=(ip, port), authkey=authkey)
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
- try:
- manager.get_server()
- manager.start()
- except (EOFError, socket.error):
- print 'Server already started on %s:%s'%(ip,port)
- sys.exit(1)
+ # remove port from list
+ ports_info = config[appli_path]
+ config[appli_path] = [x for x in ports_info if x != port]
- return manager
-#
+ # write config
+ logger.debug("write config: %s"%str(config))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
-def __build_client(ip, port, authkey):
- class MyManager(SyncManager):
- pass
+ # release lock
+ __release_lock(lock)
- MyManager.register("PortManager")
- MyManager.register("get_message_queue")
+ logger.debug("released port port: %s"%str(port))
- manager = MyManager(address=(ip, port), authkey=authkey)
- try:
- manager.connect()
- except socket.error:
- raise Exception("Unable to connect to server on %s:%s"%(ip, port))
- return manager
+ os.umask(oldmask)
#
-def __run_server(ip, port, authkey, timeout):
- theserver = __build_server(ip, port, authkey)
- shared_mesq = theserver.get_message_queue()
- print 'PortManager server started on %s:%s'%(ip,port)
- portManager = theserver.PortManager(timeout=timeout)
+def getBusyPorts():
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
- while portManager.isBusy() or not portManager.timeout():
+ # read config
+ config = {}
+ logger.debug("read configuration file")
try:
- message = shared_mesq.get(block=False)
- print message
- except Queue.Empty:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
pass
- print "PortManager server is shuting down..."
- time.sleep(2)
- theserver.shutdown()
-#
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
-#
-semaphore = None
-#
-def __run_client(manager, execute_func):
- with semaphore:
- name = multiprocessing.current_process().name
- processId = os.getpid()
-
- # do the job
- if execute_func:
- execute_func(manager, name, processId)
-#
+ # Scan all possible ports to determine which ones are owned by other applications
+ ports_info = { 'this': [], 'other': [] }
+ my_busy_ports = config[appli_path]
+ for port in range(__PORT_MIN_NUMBER, __PORT_MAX_NUMBER):
+ if __isPortUsed(port, config):
+ logger.debug("Port %s seems to be busy"%str(port))
+ if port in my_busy_ports:
+ ports_info["this"].append(port)
+ else:
+ ports_info["other"].append(port)
-#
-local_ip = '127.0.0.1'
-port = 5000
-authkey = 'salome_port_manager_access'
-#
-lock = multiprocessing.Lock()
-def start_server(nbSimultaneous=10, timeout=10):
- with lock:
- procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,))
- procServer.start()
- global semaphore
- semaphore = multiprocessing.Semaphore(nbSimultaneous)
- time.sleep(2)
-#
+ logger.debug("all busy_ports: %s"%str(ports_info))
-def start_client(ip=local_ip, execute_func=None, name="anonymous"):
- manager = __build_client(ip, port, authkey)
- p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,))
- p.start()
- return manager
-#
+ sorted_ports = { 'this': sorted(ports_info['this']),
+ 'other': sorted(ports_info['other']) }
+
+ # release lock
+ __release_lock(lock)
-client_id = 0
-def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
- global client_id
- for i in range(nbClients):
- start_client(ip, execute_func, name=name_prefix+"_"+str(client_id))
- client_id = client_id + 1
+ os.umask(oldmask)
+ return sorted_ports
#