From: aguerre Date: Thu, 3 Oct 2013 15:50:41 +0000 (+0000) Subject: New approach for PortManager X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=9817903f2ee7b0a92e68fa1441412f1c80bb5f56;p=modules%2Fyacs.git New approach for PortManager --- diff --git a/bin/PortManager.py b/bin/PortManager.py index 6c4ecb596..cd9664b51 100644 --- a/bin/PortManager.py +++ b/bin/PortManager.py @@ -23,23 +23,37 @@ # from Singleton import Singleton -from multiprocessing.managers import SyncManager import multiprocessing import time import socket -import Queue + import sys -import os +import threading +import SocketServer + +try: + import cPickle as pickle +except: + import pickle + +import struct +import ctypes + +if sys.platform == 'win32': + import multiprocessing.reduction # make sockets pickable/inheritable + +multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. +#ignore = multiprocessing.active_children() # cleanup any old processes """ This class handles port distribution for SALOME sessions. In case of concurent sessions, each will have its own port number. """ -class PortManager(object): # :TODO: must manage lock owner +class _PortManager(object): # :TODO: must manage lock owner __metaclass__ = Singleton # def __init__(self, startNumber = 2810, limit = 100, timeout=60): - super(PortManager, self).__init__() + super(_PortManager, self).__init__() self.__startNumber = startNumber self.__limit = startNumber + limit self.__lockedPorts = [] @@ -47,17 +61,19 @@ class PortManager(object): # :TODO: must manage lock owner self.__timeout = timeout self.__lastChangeTime = time.time() # - def getPort(self): + # Test for prefered port number, if asked. + def getPort(self, port=None): with self.__lock: - port = self.__startNumber - while self.isPortUsed(port): - if port == self.__limit: - msg = "\n" - msg += "Can't find a free port to launch omniNames\n" - msg += "Try to kill the running servers and then launch SALOME again.\n" - raise RuntimeError, msg - port = port + 1 - + if not port or self.isPortUsed(port): + port = self.__startNumber + while self.isPortUsed(port): + if port == self.__limit: + msg = "\n" + msg += "Can't find a free port to launch omniNames\n" + msg += "Try to kill the running servers and then launch SALOME again.\n" + raise RuntimeError, msg + port = port + 1 + # self.__lockedPorts.append(port) self.__lastChangeTime = time.time() return port @@ -98,103 +114,149 @@ class PortManager(object): # :TODO: must manage lock owner # def __str__(self): with self.__lock: - return "PortManager: list of locked ports:" + str(self.__lockedPorts) + return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts)) # # -def __build_server(ip, port, authkey): - message_queue = multiprocessing.Queue() - - class MyManager(SyncManager): - pass +#------------------------------------ +# Communication methods - MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__']) - MyManager.register("get_message_queue", callable=lambda: message_queue) +_marshall = pickle.dumps +_unmarshall = pickle.loads - manager = MyManager(address=(ip, port), authkey=authkey) +def _send(channel, *args): + buf = _marshall(args) + value = socket.htonl(len(buf)) + size = struct.pack("L",value) + channel.send(size) + channel.send(buf) +# +def _receive(channel): + size = struct.calcsize("L") + size = channel.recv(size) try: - manager.get_server() - manager.start() - except (EOFError, socket.error): - print 'Server already started on %s:%s'%(ip,port) - sys.exit(1) + size = socket.ntohl(struct.unpack("L", size)[0]) + except struct.error, e: + return '' - return manager + buf = "" + while len(buf) < size: + buf = channel.recv(size - len(buf)) + + return _unmarshall(buf)[0] # +#------------------------------------ -def __build_client(ip, port, authkey): - class MyManager(SyncManager): - pass +GET_PORT_MSG = "GET_PORT" +GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT" +RELEASE_PORT_MSG = "RELEASE_PORT" +STOP_SERVER_MSG = "STOP_SERVER" - MyManager.register("PortManager") - MyManager.register("get_message_queue") +GET_PORT_ACK_MSG = "GET_PORT" +RELEASE_PORT_ACK_MSG = "RELEASE_PORT" - manager = MyManager(address=(ip, port), authkey=authkey) - try: - manager.connect() - except socket.error: - raise Exception("Unable to connect to server on %s:%s"%(ip, port)) - return manager +class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): + def handle(self): + data = _receive(self.request) + if data == GET_PORT_MSG: + pm = _PortManager() + port = pm.getPort() + response = "%s: %s" % (GET_PORT_ACK_MSG, port) + _send(self.request, response) + elif data.startswith(GET_PREFERED_PORT_MSG): + port = int(data[len(GET_PREFERED_PORT_MSG)+1:]) + pm = _PortManager() + port = pm.getPort(port) + response = "%s: %s" % (GET_PORT_ACK_MSG, port) + _send(self.request, response) + elif data.startswith(RELEASE_PORT_MSG): + port = int(data[len(RELEASE_PORT_MSG)+1:]) + pm = _PortManager() + pm.releasePort(port) + response = "%s" % (RELEASE_PORT_ACK_MSG) + _send(self.request, response) + #print "RELEASE_PORT:", port + if not pm.isBusy(): + #print "Close server" + self.server.shutdown() + #print pm + elif data == STOP_SERVER_MSG: + #print "Close server" + self.server.shutdown() # -def __run_server(ip, port, authkey, timeout): - theserver = __build_server(ip, port, authkey) - shared_mesq = theserver.get_message_queue() - print 'PortManager server started on %s:%s'%(ip,port) - portManager = theserver.PortManager(timeout=timeout) - - while portManager.isBusy() or not portManager.timeout(): - try: - message = shared_mesq.get(block=False) - print message - except Queue.Empty: - pass +class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + pass - print "PortManager server is shuting down..." - time.sleep(2) - theserver.shutdown() +def __getServer(address): + SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately + server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind + server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart + return server # -# -semaphore = None -# -def __run_client(manager, execute_func): - with semaphore: - name = multiprocessing.current_process().name - processId = os.getpid() +pm_address = ('localhost', 12345) +def __startServer(): + try: + server = __getServer(pm_address) + server.server_bind() # Manually bind, to support allow_reuse_address + server.server_activate() + address = server.server_address - # do the job - if execute_func: - execute_func(manager, name, processId) + # Start a thread with the server -- that thread will then start one + # more thread for each request + server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager") + # Exit the server thread when the main thread terminates + #server_thread.setDaemon(True) + server_thread.start() + #print "Server loop running in thread:", server_thread.getName() + #print "Server address:", address + #return address + except: + #print "Server already started" + #print "Server address:", pm_address + #return pm_address + pass # +def __newClient(address, message): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(address) + _send(sock, message) + response = _receive(sock) + if response.startswith(GET_PORT_ACK_MSG): + port = int(response[len(GET_PORT_ACK_MSG)+1:]) + #print "GET_PORT:", port + return port + elif response == RELEASE_PORT_ACK_MSG: + #print "Received: %s" % response + return 0 + pass + sock.close() + except socket.error: + #print "Unable to connect to server" + return -1 # -local_ip = '127.0.0.1' -port = 5000 -authkey = 'salome_port_manager_access' -# -lock = multiprocessing.Lock() -def start_server(nbSimultaneous=10, timeout=10): - with lock: - procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,)) - procServer.start() - global semaphore - semaphore = multiprocessing.Semaphore(nbSimultaneous) - time.sleep(2) + +def getPort(preferedPort=None): + if preferedPort: + return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort)) + else: + return __newClient(pm_address, GET_PORT_MSG) # -def start_client(ip=local_ip, execute_func=None, name="anonymous"): - manager = __build_client(ip, port, authkey) - p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,)) - p.start() - return manager +def releasePort(port): + __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port)) # -client_id = 0 -def start_clients(nbClients, ip, execute_func, name_prefix="Client"): - global client_id - for i in range(nbClients): - start_client(ip, execute_func, name=name_prefix+"_"+str(client_id)) - client_id = client_id + 1 +def stopServer(): + __newClient(pm_address, STOP_SERVER_MSG) # + +# Auto start: unique instance ; no effect if called multiple times +__startServer() +#server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager") +#server_thread.setDaemon(True) +#server_thread.start() diff --git a/bin/appliskel/README b/bin/appliskel/README index 4650c6011..39feb0052 100644 --- a/bin/appliskel/README +++ b/bin/appliskel/README @@ -44,26 +44,27 @@ User run scripts The SALOME user can use the following scripts: -runAppli +runAppli [DEPRECATED] Launches a SALOME Session (similar to ${KERNEL_ROOT_DIR}/bin/salome/runSalome but with a different name to avoid confusions). -runSession +runSession [DEPRECATED] Launches a shell script in the SALOME application environment, with access to the current SALOME session (naming service), if any. Without arguments, the script is interactive. With arguments, the script executes the command in the SALOME application environment. -runConsole +runConsole [DEPRECATED] Gives a python console connected to the current SALOME Session. It is also possible to use runSession, then python. -runTests - Similar to runSession, used for unit testing. runTests defines a new - configuration for naming service (new port number) to avoid interferences - with a running SALOME session. runSession tries to use an already existing - naming service definition from a running session (hostname & port number). +salome [NEW] + A single Python command to start SALOME: + salome [start]: replace runAppli + salome shell: replace runSession + salome console: replace runConsole + SALOME internal run scripts --------------------------- @@ -74,9 +75,6 @@ envd getAppliPath.py Used by other scripts to define the Application Path. -searchFreePort.sh - Used by other scripts to find a free port for naming service. - For remote calls, SALOME uses one script. runRemote.sh @@ -123,6 +121,6 @@ configGUI.sh export SalomeAppConfig=${HOME}/${APPLI} - where SalomeAppConfig designates the directory containing SalomeApp.xml. - Note that ${APPLI} is already defined by the calling scripts when + where SalomeAppConfig designates the directory containing SalomeApp.xml. + Note that ${APPLI} is already defined by the calling scripts when env.d/configGUI.sh is sourced. diff --git a/bin/appliskel/runAppli b/bin/appliskel/runAppli index 984706190..08fb373c0 100755 --- a/bin/appliskel/runAppli +++ b/bin/appliskel/runAppli @@ -25,8 +25,8 @@ ############################################### ############### IMPORTANT NOTE ################ ############################################### -# The runConsole script is obsolete. # -# Please consider the new salome.py launcher. # +# The runAppli script is obsolete. # +# Please consider the new salome launcher. # ############################################### diff --git a/bin/appliskel/runConsole b/bin/appliskel/runConsole index 9667e7e89..80a4bd7b4 100755 --- a/bin/appliskel/runConsole +++ b/bin/appliskel/runConsole @@ -26,7 +26,7 @@ ############### IMPORTANT NOTE ################ ############################################### # The runConsole script is obsolete. # -# Please consider the new salome.py launcher. # +# Please consider the new salome launcher. # ############################################### diff --git a/bin/appliskel/runSession b/bin/appliskel/runSession index 294cae70e..bc1772908 100755 --- a/bin/appliskel/runSession +++ b/bin/appliskel/runSession @@ -29,8 +29,8 @@ ############################################### ############### IMPORTANT NOTE ################ ############################################### -# The runConsole script is obsolete. # -# Please consider the new salome.py launcher. # +# The runSession script is obsolete. # +# Please consider the new salome launcher. # ############################################### diff --git a/bin/appliskel/tests/concurrentSession/TestMinimalExample.py b/bin/appliskel/tests/concurrentSession/TestMinimalExample.py index 7022585db..2c1462830 100644 --- a/bin/appliskel/tests/concurrentSession/TestMinimalExample.py +++ b/bin/appliskel/tests/concurrentSession/TestMinimalExample.py @@ -18,38 +18,96 @@ # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com # -from PortManager import start_server, start_clients -import time -import datetime -import random +from PortManager import getPort, releasePort, stopServer import sys +import multiprocessing import unittest -def hello(manager, name, processId): - seconds = random.randrange(3, 10) - startTime = datetime.datetime.now().time() - - portManager = manager.PortManager() - message_queue = manager.get_message_queue() - port = portManager.getPort() - message_queue.put("+ Process %s (%d) says: Hello! at %s on port %d and waits for %d seconds"%(name, processId, startTime, port, seconds)) - time.sleep(seconds) - endTime = datetime.datetime.now().time() - portManager.releasePort(port) - message_queue.put("- Process %s (%d) finished at %s --> port %d is free"%(name, processId, endTime, port)) +def port_reservation(prefered=None, test=None, expected=None): + if prefered: + port = getPort(prefered) + else: + port = getPort() + print "port = %s"%port + + if expected: + print "expected = %s"%expected + test.assertTrue(port == expected) # class TestMinimalExample(unittest.TestCase): + @classmethod + def tearDownClass(cls): + stopServer() + stopServer() # no effect + # + def testSequential(self): + print "BEGIN testSequential" + processes = [ + multiprocessing.Process(target=port_reservation) + for i in range(3) + ] + + for p in processes: + p.start() + + for p in processes: + p.join() + + # Try to get specific port number + expected = 2872 + p = multiprocessing.Process(target=port_reservation, args=(2872, self,expected,)) + p.start() + p.join() + + # Try to get specific port number + p = multiprocessing.Process(target=port_reservation, args=(2812, self,)) + p.start() + p.join() + + # Release port + p = multiprocessing.Process(target=releasePort, args=(2812,)) + p.start() + p.join() + + # Try to get specific port number + expected = 2812 + p = multiprocessing.Process(target=port_reservation, args=(2812, self,expected,)) + p.start() + p.join() + + print "END testSequential" + # def testConcurrent(self): - nbProcesses = 10 - nbSimultaneous = 4 - ip = '127.0.0.1' + print "BEGIN testConcurrent" + processes = [ + multiprocessing.Process(target=port_reservation) + for i in range(3) + ] + + # Try to get specific port number + p = multiprocessing.Process(target=port_reservation, args=(2852,)) + processes.append(p) + + # Try to get specific port number + p = multiprocessing.Process(target=port_reservation, args=(2812,)) + processes.append(p) + + # Release port + p = multiprocessing.Process(target=releasePort, args=(2812,)) + processes.append(p) + + # Try to get specific port number + p = multiprocessing.Process(target=port_reservation, args=(2812,)) + processes.append(p) + + for p in processes: + p.start() - # start server - start_server(nbSimultaneous=nbSimultaneous, timeout=5) + for p in processes: + p.join() - # start several clients - start_clients(nbProcesses, ip, hello) + print "END testConcurrent" # # diff --git a/bin/appliskel/tests/concurrentSession/testConcurrentSession.py b/bin/appliskel/tests/concurrentSession/testConcurrentSession.py index 5b857cb98..008e0b7bd 100644 --- a/bin/appliskel/tests/concurrentSession/testConcurrentSession.py +++ b/bin/appliskel/tests/concurrentSession/testConcurrentSession.py @@ -22,6 +22,11 @@ import os import sys import unittest import multiprocessing +import imp + +def unwrap_self_session(arg, **kwarg): + return TestConcurrentLaunch.session(*arg, **kwarg) +# class TestConcurrentLaunch(unittest.TestCase): @classmethod @@ -29,25 +34,32 @@ class TestConcurrentLaunch(unittest.TestCase): # Initialize path to SALOME application path_to_launcher = os.getenv("SALOME_LAUNCHER") appli_dir = os.path.dirname(path_to_launcher) - envd_dir = os.path.join(appli_dir, "env.d") + cls.envd_dir = os.path.join(appli_dir, "env.d") # Configure session startup cls.SALOME = imp.load_source("SALOME", os.path.join(appli_dir,"salome")) - cls.SALOME_args = ["shell", "--config="+envd_dir] + #cls.SALOME_args = ["shell", "--config="+cls.envd_dir] + cls.SALOME_args = ["--config="+cls.envd_dir] + # + @classmethod + def tearDownClass(cls): + args = ["killall", "--config="+cls.envd_dir] + cls.SALOME.main(args) + pass # def session(self, args=[]): self.SALOME.main(self.SALOME_args + args) # - def testSingleSession(self): + def test01_SingleSession(self): print "** Testing single session **" self.session() # - def testMultiSession(self): + def test02_MultiSession(self): print "** Testing multi sessions **" jobs = [] for i in range(3): - p = multiprocessing.Process(target=session, args=(self,)) + p = multiprocessing.Process(target=unwrap_self_session, args=([self],)) jobs.append(p) p.start() diff --git a/bin/killSalomeWithPort.py b/bin/killSalomeWithPort.py index 84774cef3..2e2bf65c8 100755 --- a/bin/killSalomeWithPort.py +++ b/bin/killSalomeWithPort.py @@ -141,6 +141,9 @@ def shutdownMyPort(port, cleanup=True): """ if not port: return + from PortManager import releasePort + releasePort(port) + from salome_utils import generateFileName # set OMNIORB_CONFIG variable to the proper file @@ -186,6 +189,9 @@ def killMyPort(port): Parameters: - port - port number """ + import PortManager + PortManager.releasePort(port) + from salome_utils import getShortHostName, getHostName # try to shutdown session nomally diff --git a/bin/launchConfigureParser.py b/bin/launchConfigureParser.py index 7043c5154..7937325cd 100755 --- a/bin/launchConfigureParser.py +++ b/bin/launchConfigureParser.py @@ -878,11 +878,11 @@ def get_env(theAdditionalOptions=[], appname=salomeappname, cfgname=salomecfgnam # Process --print-port option if cmd_opts.print_port: - import PortManager - PortManager.start_server(nbSimultaneous=25, timeout=10) from searchFreePort import searchFreePort searchFreePort({}) print "port:%s"%(os.environ['NSPORT']) + import PortManager + PortManager.releasePort(os.environ['NSPORT']) sys.exit(0) pass diff --git a/bin/runSalome b/bin/runSalome index 6c363e1e1..1662d3fe0 100755 --- a/bin/runSalome +++ b/bin/runSalome @@ -25,14 +25,14 @@ ############################################### ############### IMPORTANT NOTE ################ ############################################### -# The runConsole script is obsolete. # -# Please consider the new salome.py launcher. # +# The runSalome script is obsolete. # +# Please consider the new salome launcher. # ############################################### # ********************************************************** # This file is used by m4 files in many modules to detect SALOME KERNEL. -# Its initial purpose (starting salome) can be replaced by new salome.py command. +# Its initial purpose (starting salome) can be replaced by new salome command. # When definitively switching from autotools to Cmake, this file may be removed. # ********************************************************** diff --git a/bin/runSalome.py b/bin/runSalome.py index acb9c981f..2a3852720 100755 --- a/bin/runSalome.py +++ b/bin/runSalome.py @@ -34,7 +34,6 @@ from server import process_id, Server import json from salomeLauncherUtils import formatScriptsAndArgs import subprocess -import PortManager # ----------------------------------------------------------------------------- @@ -66,7 +65,7 @@ def givenPortKill(port): try: killMyPort(my_port) except: - print "problem in LocalPortKill(), killMyPort("<