#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2015 CEA/DEN, EDF R&D, OPEN CASCADE
#
# Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
-# version 2.1 of the License.
+# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
-from Singleton import Singleton
-
-import multiprocessing
-import time
-import socket
-
+import os
import sys
-import threading
-import SocketServer
try:
- import cPickle as pickle
+ import cPickle as pickle #@UnusedImport
except:
- import pickle
-
-import struct
-import ctypes
-
-if sys.platform == 'win32':
- import multiprocessing.reduction # make sockets pickable/inheritable
-
-multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
-#ignore = multiprocessing.active_children() # cleanup any old processes
+ import pickle #@Reimport
-"""
-This class handles port distribution for SALOME sessions.
-In case of concurent sessions, each will have its own port number.
-"""
-class _PortManager(object): # :TODO: must manage lock owner
- __metaclass__ = Singleton
- #
- def __init__(self, startNumber = 2810, limit = 100, timeout=60):
- super(_PortManager, self).__init__()
- self.__startNumber = startNumber
- self.__limit = startNumber + limit
- self.__lockedPorts = []
- self.__lock = multiprocessing.Lock()
- self.__timeout = timeout
- self.__lastChangeTime = time.time()
- #
- # Test for prefered port number, if asked.
- def getPort(self, port=None):
- with self.__lock:
- if not port or self.isPortUsed(port):
- port = self.__startNumber
- while self.isPortUsed(port):
- if port == self.__limit:
- msg = "\n"
- msg += "Can't find a free port to launch omniNames\n"
- msg += "Try to kill the running servers and then launch SALOME again.\n"
- raise RuntimeError, msg
- port = port + 1
- #
- self.__lockedPorts.append(port)
- self.__lastChangeTime = time.time()
- return port
- #
- def releasePort(self, port):
- with self.__lock:
- if port in self.__lockedPorts:
- self.__lockedPorts.remove(port)
- self.__lastChangeTime = time.time()
- #
- def isBusy(self):
- return len(self.__lockedPorts)
- #
- def isPortUsed(self, port):
- return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
- #
- def __isNetworkConnectionActiveOnPort(self, port):
- # :NOTE: Under windows:
- # netstat options -l and -t are unavailable
- # grep command is unavailable
- from subprocess import Popen, PIPE
- (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
- import StringIO
- buf = StringIO.StringIO(stdout)
- ports = buf.readlines()
- # search for TCP - LISTEN connections
- import re
- regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
- for item in ports:
- try:
- p = int(regObj.match(item).group(1))
- if p == port: return True
- except:
- pass
- #
- def timeout(self):
- return (time.time() - self.__lastChangeTime > self.__timeout)
- #
- def __str__(self):
- with self.__lock:
- return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
- #
+import logging
+def createLogger():
+ logger = logging.getLogger(__name__)
+# logger.setLevel(logging.DEBUG)
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.DEBUG)
+ formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+ return logger
#
+logger = createLogger()
#------------------------------------
-# Communication methods
-
-_marshall = pickle.dumps
-_unmarshall = pickle.loads
-
-def _send(channel, *args):
- buf = _marshall(args)
- value = socket.htonl(len(buf))
- size = struct.pack("L",value)
- channel.send(size)
- channel.send(buf)
+# A file locker (Linux only)
+def __acquire_lock(lock):
+ if sys.platform == "win32":
+ import msvcrt
+ # lock 1 byte: file is supposed to be zero-byte long
+ msvcrt.locking(lock.fileno(), msvcrt.LK_LOCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_EX)
+#
+def __release_lock(lock):
+ if sys.platform == "win32":
+ import msvcrt
+ msvcrt.locking(lock.fileno(), msvcrt.LK_UNLCK, 1)
+ else:
+ import fcntl
+ fcntl.flock(lock, fcntl.LOCK_UN)
+#
+
+def _getConfigurationFilename():
+ omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
+
+ from salome_utils import generateFileName
+ portmanager_config = generateFileName(omniorbUserPath,
+ prefix="omniORB",
+ suffix="PortManager",
+ extension="cfg",
+ hidden=True)
+ import tempfile
+ temp = tempfile.NamedTemporaryFile()
+ lock_file = os.path.join(os.path.dirname(temp.name), ".omniORB_PortManager.lock")
+ temp.close()
+
+ return (portmanager_config, lock_file)
+#
+
+def __isPortUsed(port, busy_ports):
+ return (port in busy_ports) or __isNetworkConnectionActiveOnPort(port)
+#
+
+def __isNetworkConnectionActiveOnPort(port):
+ # :NOTE: Under windows:
+ # netstat options -l and -t are unavailable
+ # grep command is unavailable
+ from subprocess import Popen, PIPE
+ stdout, _ = Popen(['netstat','-an'], stdout=PIPE).communicate()
+ import StringIO
+ buf = StringIO.StringIO(stdout)
+ ports = buf.readlines()
+ # search for TCP - LISTEN connections
+ import re
+ regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
+ for item in ports:
+ try:
+ p = int(regObj.match(item).group(1))
+ if p == port: return True
+ except:
+ pass
#
-def _receive(channel):
- size = struct.calcsize("L")
- size = channel.recv(size)
- try:
- size = socket.ntohl(struct.unpack("L", size)[0])
- except struct.error, e:
- return ''
+def getPort(preferedPort=None):
+ logger.debug("GET PORT")
+
+ config_file, lock_file = _getConfigurationFilename()
+ oldmask = os.umask(0)
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {'busy_ports':[]}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
+ pass
- buf = ""
- while len(buf) < size:
- buf = channel.recv(size - len(buf))
+ logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
+
+ # append port
+ busy_ports = config["busy_ports"]
+ port = preferedPort
+ if not port or __isPortUsed(port, busy_ports):
+ port = 2810
+ while __isPortUsed(port, busy_ports):
+ if port == 2810+100:
+ msg = "\n"
+ msg += "Can't find a free port to launch omniNames\n"
+ msg += "Try to kill the running servers and then launch SALOME again.\n"
+ raise RuntimeError, msg
+ port = port + 1
+ logger.debug("found free port: %s"%str(port))
+ config["busy_ports"].append(port)
+
+ # write config
+ logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
+
+ # release lock
+ __release_lock(lock)
+ #
- return _unmarshall(buf)[0]
+ os.umask(oldmask)
+ logger.debug("get port: %s"%str(port))
+ return port
#
-#------------------------------------
-GET_PORT_MSG = "GET_PORT"
-GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
-RELEASE_PORT_MSG = "RELEASE_PORT"
-STOP_SERVER_MSG = "STOP_SERVER"
+def releasePort(port):
+ port = int(port)
+ logger.debug("RELEASE PORT (%s)"%port)
+
+ config_file, lock_file = _getConfigurationFilename()
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {'busy_ports':[]}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
+ pass
-GET_PORT_ACK_MSG = "GET_PORT"
-RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
+ logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
-class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
- def handle(self):
- data = _receive(self.request)
- if data == GET_PORT_MSG:
- pm = _PortManager()
- port = pm.getPort()
- response = "%s: %s" % (GET_PORT_ACK_MSG, port)
- _send(self.request, response)
- elif data.startswith(GET_PREFERED_PORT_MSG):
- port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
- pm = _PortManager()
- port = pm.getPort(port)
- response = "%s: %s" % (GET_PORT_ACK_MSG, port)
- _send(self.request, response)
- elif data.startswith(RELEASE_PORT_MSG):
- port = int(data[len(RELEASE_PORT_MSG)+1:])
- pm = _PortManager()
- pm.releasePort(port)
- response = "%s" % (RELEASE_PORT_ACK_MSG)
- _send(self.request, response)
- #print "RELEASE_PORT:", port
- if not pm.isBusy():
- #print "Close server"
- self.server.shutdown()
- #print pm
- elif data == STOP_SERVER_MSG:
- #print "Close server"
- self.server.shutdown()
-#
+ # remove port from list
+ busy_ports = config["busy_ports"]
-class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
- pass
+ if port in busy_ports:
+ busy_ports.remove(port)
+ config["busy_ports"] = busy_ports
-def __getServer(address):
- SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
- server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
- server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
- return server
-#
+ # write config
+ logger.debug("write busy_ports: %s"%str(config["busy_ports"]))
+ try:
+ with open(config_file, 'w') as f:
+ pickle.dump(config, f)
+ except IOError:
+ pass
-pm_address = ('localhost', 12345)
-def __startServer():
- try:
- server = __getServer(pm_address)
- server.server_bind() # Manually bind, to support allow_reuse_address
- server.server_activate()
- address = server.server_address
+ # release lock
+ __release_lock(lock)
- # Start a thread with the server -- that thread will then start one
- # more thread for each request
- server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
- # Exit the server thread when the main thread terminates
- #server_thread.setDaemon(True)
- server_thread.start()
- #print "Server loop running in thread:", server_thread.getName()
- #print "Server address:", address
- #return address
- except:
- #print "Server already started"
- #print "Server address:", pm_address
- #return pm_address
- pass
+ logger.debug("released port port: %s"%str(port))
#
-def __newClient(address, message):
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(address)
- _send(sock, message)
- response = _receive(sock)
- if response.startswith(GET_PORT_ACK_MSG):
- port = int(response[len(GET_PORT_ACK_MSG)+1:])
- #print "GET_PORT:", port
- return port
- elif response == RELEASE_PORT_ACK_MSG:
- #print "Received: %s" % response
- return 0
+def getBusyPorts():
+ config_file, lock_file = _getConfigurationFilename()
+ with open(lock_file, 'w') as lock:
+ # acquire lock
+ __acquire_lock(lock)
+
+ # read config
+ config = {'busy_ports':[]}
+ logger.debug("read configuration file")
+ try:
+ with open(config_file, 'r') as f:
+ config = pickle.load(f)
+ except IOError: # empty file
pass
- sock.close()
- except socket.error:
- #print "Unable to connect to server"
- return -1
-#
-def getPort(preferedPort=None):
- if preferedPort:
- return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
- else:
- return __newClient(pm_address, GET_PORT_MSG)
-#
+ logger.debug("load busy_ports: %s"%str(config["busy_ports"]))
-def releasePort(port):
- __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
-#
+ busy_ports = config["busy_ports"]
+ # release lock
+ __release_lock(lock)
-def stopServer():
- __newClient(pm_address, STOP_SERVER_MSG)
+ return busy_ports
#
-
-# Auto start: unique instance ; no effect if called multiple times
-__startServer()
-#server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager")
-#server_thread.setDaemon(True)
-#server_thread.start()