#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2017 CEA/DEN, EDF R&D, OPEN CASCADE
#
# Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
-from Singleton import Singleton
-
-import multiprocessing
-import time
-import socket
-
+import os
import sys
-import threading
-import SocketServer
try:
- import cPickle as pickle
+ import cPickle as pickle #@UnusedImport
except:
- import pickle
-
-import struct
-import ctypes
-
-if sys.platform == 'win32':
- import multiprocessing.reduction # make sockets pickable/inheritable
+ import pickle #@Reimport
-multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
+__PORT_MIN_NUMBER = 2810
+__PORT_MAX_NUMBER = 2910
-"""
-This class handles port distribution for SALOME sessions.
-In case of concurent sessions, each will have its own port number.
-"""
-class _PortManager(object): # :TODO: must manage lock owner
- __metaclass__ = Singleton
- #
- def __init__(self, startNumber = 2810, limit = 100, timeout=60):
- super(_PortManager, self).__init__()
- self.__startNumber = startNumber
- self.__limit = startNumber + limit
- self.__lockedPorts = []
- self.__lock = multiprocessing.Lock()
- self.__timeout = timeout
- self.__lastChangeTime = time.time()
- #
- # Test for prefered port number, if asked.
- def getPort(self, port=None):
- with self.__lock:
- if not port or self.isPortUsed(port):
- port = self.__startNumber
- while self.isPortUsed(port):
- if port == self.__limit:
- msg = "\n"
- msg += "Can't find a free port to launch omniNames\n"
- msg += "Try to kill the running servers and then launch SALOME again.\n"
- raise RuntimeError, msg
- port = port + 1
- #
- self.__lockedPorts.append(port)
- self.__lastChangeTime = time.time()
- return port
- #
- def releasePort(self, port):
- with self.__lock:
- if port in self.__lockedPorts:
- self.__lockedPorts.remove(port)
- self.__lastChangeTime = time.time()
- #
- def isBusy(self):
- return len(self.__lockedPorts)
- #
- def isPortUsed(self, port):
- return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
- #
- def __isNetworkConnectionActiveOnPort(self, port):
- # :NOTE: Under windows:
- # netstat options -l and -t are unavailable
- # grep command is unavailable
- from subprocess import Popen, PIPE
- (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
- import StringIO
- buf = StringIO.StringIO(stdout)
- ports = buf.readlines()
- # search for TCP - LISTEN connections
- import re
- regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
- for item in ports:
- try:
- p = int(regObj.match(item).group(1))
- if p == port: return True
- except:
- pass
- #
- def timeout(self):
- return (time.time() - self.__lastChangeTime > self.__timeout)
- #
- def __str__(self):
- with self.__lock:
- return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
- #
+import logging
+def createLogger():
+ logger = logging.getLogger(__name__)
+# logger.setLevel(logging.DEBUG)
+ logger.setLevel(logging.INFO)
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.DEBUG)
+ formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+ return logger
#
+logger = createLogger()
#------------------------------------
-# Communication methods
-
-_marshall = pickle.dumps
-_unmarshall = pickle.loads
-
-def _send(channel, *args):
- buf = _marshall(args)
- value = socket.htonl(len(buf))
- size = struct.pack("L",value)
- channel.send(size)
- channel.send(buf)
+# A file locker
+def __acquire_lock(lock):
+ logger.debug("ACQUIRE LOCK")
+ if sys.platform == "win32":
+ import msvcrt
+ # lock 1 byte: file is supposed to be zero-byte long
+ msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_EX)
+ logger.debug("LOCK ACQUIRED")
+#
+def __release_lock(lock):
+ logger.debug("RELEASE LOCK")
+ if sys.platform == "win32":
+ import msvcrt
+ msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_UN)
+ logger.debug("LOCK RELEASED")
#
+#------------------------------------
-def _receive(channel):
- size = struct.calcsize("L")
- size = channel.recv(size)
- try:
- size = socket.ntohl(struct.unpack("L", size)[0])
- except struct.error, e:
- return ''
+def _getConfigurationFilename():
+ omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
- buf = ""
- while len(buf) < size:
- buf = channel.recv(size - len(buf))
+ from salome_utils import generateFileName
+ portmanager_config = generateFileName(omniorbUserPath,
+ prefix="salome",
+ suffix="PortManager",
+ extension="cfg",
+ hidden=True)
+ import tempfile
+ temp = tempfile.NamedTemporaryFile()
+ lock_file = os.path.join(os.path.dirname(temp.name), ".salome_PortManager.lock")
+ temp.close()
- return _unmarshall(buf)[0]
+ return (portmanager_config, lock_file)
#
-#------------------------------------
-GET_PORT_MSG = "GET_PORT"
-GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
-RELEASE_PORT_MSG = "RELEASE_PORT"
-STOP_SERVER_MSG = "STOP_SERVER"
-
-GET_PORT_ACK_MSG = "GET_PORT"
-RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
-
-class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
- def handle(self):
- data = _receive(self.request)
- if data == GET_PORT_MSG:
- pm = _PortManager()
- port = pm.getPort()
- response = "%s: %s" % (GET_PORT_ACK_MSG, port)
- _send(self.request, response)
- elif data.startswith(GET_PREFERED_PORT_MSG):
- port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
- pm = _PortManager()
- port = pm.getPort(port)
- response = "%s: %s" % (GET_PORT_ACK_MSG, port)
- _send(self.request, response)
- elif data.startswith(RELEASE_PORT_MSG):
- port = int(data[len(RELEASE_PORT_MSG)+1:])
- pm = _PortManager()
- pm.releasePort(port)
- response = "%s" % (RELEASE_PORT_ACK_MSG)
- _send(self.request, response)
- #print "RELEASE_PORT:", port
- if not pm.isBusy():
- #print "Close server"
- self.server.shutdown()
- #print pm
- elif data == STOP_SERVER_MSG:
- #print "Close server"
- self.server.shutdown()
+def __isPortUsed(port, config):
+ busy_ports = []
+ for ports in config.values():
+ busy_ports += ports
+ return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port)
#
-class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
+def __isNetworkConnectionActiveOnPort(port):
+ # :NOTE: Under windows:
+ # netstat options -l and -t are unavailable
+ # grep command is unavailable
+ if sys.platform == "win32":
+ cmd = ['netstat','-a','-n','-p tcp']
+ else:
+ cmd = ['netstat','-ant']
pass
-pm_address = ('127.0.0.1', 51843)
-def __getServer(address):
- SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
- server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
- server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
- return server
-#
-
-def __startServer():
- global pm_address
+ err = None
try:
- server = __getServer(pm_address)
- server.server_bind() # Manually bind, to support allow_reuse_address
- server.server_activate()
- pm_address = server.server_address
-
- # Start a thread with the server -- that thread will then start one
- # more thread for each request
- server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
- # Exit the server thread when the main thread terminates
- #server_thread.setDaemon(True)
- server_thread.start()
- #print "Server loop running in thread:", server_thread.getName()
- #print "Server address:", pm_address
- #return address
+ from subprocess import Popen, PIPE, STDOUT
+ p = Popen(cmd, stdout=PIPE, stderr=STDOUT)
+ out, err = p.communicate()
except:
- #print "Server already started"
- #print "Server address:", pm_address
- #return pm_address
- pass
-#
+ print "Error when trying to access active network connections."
+ if err: print err
+ import traceback
+ traceback.print_exc()
+ return False
-def __newClient(address, message):
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- #print "connect client to", address
- sock.connect(address)
- _send(sock, message)
- response = _receive(sock)
- if response.startswith(GET_PORT_ACK_MSG):
- port = int(response[len(GET_PORT_ACK_MSG)+1:])
- #print "GET_PORT:", port
- return port
- elif response == RELEASE_PORT_ACK_MSG:
- #print "Received: %s" % response
- return 0
+ import StringIO
+ buf = StringIO.StringIO(out)
+ ports = buf.readlines()
+ # search for TCP - LISTEN connections
+ import re
+ regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
+ for item in ports:
+ try:
+ p = int(regObj.match(item).group(1))
+ if p == port: return True
+ except:
pass
- sock.close()
- except socket.error:
- #print "Unable to connect to server"
- return -1
+ return False
#
def getPort(preferedPort=None):
- if preferedPort:
- return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
- else:
- return __newClient(pm_address, GET_PORT_MSG)
+ logger.debug("GET PORT")
+
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except:
+ logger.info("Problem loading PortManager file: %s"%config_file)
+ # In this case config dictionary is reset
+
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
+
+ # append port
+ port = preferedPort
+ if not port or __isPortUsed(port, config):
+ port = __PORT_MIN_NUMBER
+ while __isPortUsed(port, config):
+ if port == __PORT_MAX_NUMBER:
+ msg = "\n"
+ msg += "Can't find a free port to launch omniNames\n"
+ msg += "Try to kill the running servers and then launch SALOME again.\n"
+ raise RuntimeError, msg
+ logger.debug("Port %s seems to be busy"%str(port))
+ port = port + 1
+ logger.debug("found free port: %s"%str(port))
+ config[appli_path].append(port)
+
+ # write config
+ logger.debug("write config: %s"%str(config))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
+
+ # release lock
+ __release_lock(lock)
+ #
+
+ os.umask(oldmask)
+ logger.debug("get port: %s"%str(port))
+ return port
#
def releasePort(port):
- __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
-#
+ port = int(port)
+ logger.debug("RELEASE PORT (%s)"%port)
+
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
+ pass
+
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
-def stopServer():
- __newClient(pm_address, STOP_SERVER_MSG)
+ # remove port from list
+ ports_info = config[appli_path]
+ config[appli_path] = [x for x in ports_info if x != port]
+
+ # write config
+ logger.debug("write config: %s"%str(config))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
+
+ # release lock
+ __release_lock(lock)
+
+ logger.debug("released port port: %s"%str(port))
+
+ os.umask(oldmask)
#
-# Auto start: unique instance ; no effect if called multiple times
-__startServer()
+def getBusyPorts():
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
+ pass
+
+ logger.debug("load config: %s"%str(config))
+ appli_path = os.getenv("ABSOLUTE_APPLI_PATH", "unknown")
+ try:
+ config[appli_path]
+ except KeyError:
+ config[appli_path] = []
+
+ # Scan all possible ports to determine which ones are owned by other applications
+ ports_info = { 'this': [], 'other': [] }
+ my_busy_ports = config[appli_path]
+ for port in range(__PORT_MIN_NUMBER, __PORT_MAX_NUMBER):
+ if __isPortUsed(port, config):
+ logger.debug("Port %s seems to be busy"%str(port))
+ if port in my_busy_ports:
+ ports_info["this"].append(port)
+ else:
+ ports_info["other"].append(port)
+
+ logger.debug("all busy_ports: %s"%str(ports_info))
+
+ sorted_ports = { 'this': sorted(ports_info['this']),
+ 'other': sorted(ports_info['other']) }
+
+ # release lock
+ __release_lock(lock)
+
+ os.umask(oldmask)
+ return sorted_ports
+#