#
from Singleton import Singleton
-from multiprocessing.managers import SyncManager
import multiprocessing
import time
import socket
-import Queue
+
import sys
-import os
+import threading
+import SocketServer
+
+try:
+ import cPickle as pickle
+except:
+ import pickle
+
+import struct
+import ctypes
+
+if sys.platform == 'win32':
+ import multiprocessing.reduction # make sockets pickable/inheritable
+
+multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
+#ignore = multiprocessing.active_children() # cleanup any old processes
"""
This class handles port distribution for SALOME sessions.
In case of concurent sessions, each will have its own port number.
"""
-class PortManager(object): # :TODO: must manage lock owner
+class _PortManager(object): # :TODO: must manage lock owner
__metaclass__ = Singleton
#
def __init__(self, startNumber = 2810, limit = 100, timeout=60):
- super(PortManager, self).__init__()
+ super(_PortManager, self).__init__()
self.__startNumber = startNumber
self.__limit = startNumber + limit
self.__lockedPorts = []
self.__timeout = timeout
self.__lastChangeTime = time.time()
#
- def getPort(self):
+ # Test for prefered port number, if asked.
+ def getPort(self, port=None):
with self.__lock:
- port = self.__startNumber
- while self.isPortUsed(port):
- if port == self.__limit:
- msg = "\n"
- msg += "Can't find a free port to launch omniNames\n"
- msg += "Try to kill the running servers and then launch SALOME again.\n"
- raise RuntimeError, msg
- port = port + 1
-
+ if not port or self.isPortUsed(port):
+ port = self.__startNumber
+ while self.isPortUsed(port):
+ if port == self.__limit:
+ msg = "\n"
+ msg += "Can't find a free port to launch omniNames\n"
+ msg += "Try to kill the running servers and then launch SALOME again.\n"
+ raise RuntimeError, msg
+ port = port + 1
+ #
self.__lockedPorts.append(port)
self.__lastChangeTime = time.time()
return port
#
def __str__(self):
with self.__lock:
- return "PortManager: list of locked ports:" + str(self.__lockedPorts)
+ return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
#
#
-def __build_server(ip, port, authkey):
- message_queue = multiprocessing.Queue()
-
- class MyManager(SyncManager):
- pass
+#------------------------------------
+# Communication methods
- MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
- MyManager.register("get_message_queue", callable=lambda: message_queue)
+_marshall = pickle.dumps
+_unmarshall = pickle.loads
- manager = MyManager(address=(ip, port), authkey=authkey)
+def _send(channel, *args):
+ buf = _marshall(args)
+ value = socket.htonl(len(buf))
+ size = struct.pack("L",value)
+ channel.send(size)
+ channel.send(buf)
+#
+def _receive(channel):
+ size = struct.calcsize("L")
+ size = channel.recv(size)
try:
- manager.get_server()
- manager.start()
- except (EOFError, socket.error):
- print 'Server already started on %s:%s'%(ip,port)
- sys.exit(1)
+ size = socket.ntohl(struct.unpack("L", size)[0])
+ except struct.error, e:
+ return ''
- return manager
+ buf = ""
+ while len(buf) < size:
+ buf = channel.recv(size - len(buf))
+
+ return _unmarshall(buf)[0]
#
+#------------------------------------
-def __build_client(ip, port, authkey):
- class MyManager(SyncManager):
- pass
+GET_PORT_MSG = "GET_PORT"
+GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
+RELEASE_PORT_MSG = "RELEASE_PORT"
+STOP_SERVER_MSG = "STOP_SERVER"
- MyManager.register("PortManager")
- MyManager.register("get_message_queue")
+GET_PORT_ACK_MSG = "GET_PORT"
+RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
- manager = MyManager(address=(ip, port), authkey=authkey)
- try:
- manager.connect()
- except socket.error:
- raise Exception("Unable to connect to server on %s:%s"%(ip, port))
- return manager
+class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
+ def handle(self):
+ data = _receive(self.request)
+ if data == GET_PORT_MSG:
+ pm = _PortManager()
+ port = pm.getPort()
+ response = "%s: %s" % (GET_PORT_ACK_MSG, port)
+ _send(self.request, response)
+ elif data.startswith(GET_PREFERED_PORT_MSG):
+ port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
+ pm = _PortManager()
+ port = pm.getPort(port)
+ response = "%s: %s" % (GET_PORT_ACK_MSG, port)
+ _send(self.request, response)
+ elif data.startswith(RELEASE_PORT_MSG):
+ port = int(data[len(RELEASE_PORT_MSG)+1:])
+ pm = _PortManager()
+ pm.releasePort(port)
+ response = "%s" % (RELEASE_PORT_ACK_MSG)
+ _send(self.request, response)
+ #print "RELEASE_PORT:", port
+ if not pm.isBusy():
+ #print "Close server"
+ self.server.shutdown()
+ #print pm
+ elif data == STOP_SERVER_MSG:
+ #print "Close server"
+ self.server.shutdown()
#
-def __run_server(ip, port, authkey, timeout):
- theserver = __build_server(ip, port, authkey)
- shared_mesq = theserver.get_message_queue()
- print 'PortManager server started on %s:%s'%(ip,port)
- portManager = theserver.PortManager(timeout=timeout)
-
- while portManager.isBusy() or not portManager.timeout():
- try:
- message = shared_mesq.get(block=False)
- print message
- except Queue.Empty:
- pass
+class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
+ pass
- print "PortManager server is shuting down..."
- time.sleep(2)
- theserver.shutdown()
+def __getServer(address):
+ SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
+ server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
+ server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
+ return server
#
-#
-semaphore = None
-#
-def __run_client(manager, execute_func):
- with semaphore:
- name = multiprocessing.current_process().name
- processId = os.getpid()
+pm_address = ('localhost', 12345)
+def __startServer():
+ try:
+ server = __getServer(pm_address)
+ server.server_bind() # Manually bind, to support allow_reuse_address
+ server.server_activate()
+ address = server.server_address
- # do the job
- if execute_func:
- execute_func(manager, name, processId)
+ # Start a thread with the server -- that thread will then start one
+ # more thread for each request
+ server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
+ # Exit the server thread when the main thread terminates
+ #server_thread.setDaemon(True)
+ server_thread.start()
+ #print "Server loop running in thread:", server_thread.getName()
+ #print "Server address:", address
+ #return address
+ except:
+ #print "Server already started"
+ #print "Server address:", pm_address
+ #return pm_address
+ pass
#
+def __newClient(address, message):
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(address)
+ _send(sock, message)
+ response = _receive(sock)
+ if response.startswith(GET_PORT_ACK_MSG):
+ port = int(response[len(GET_PORT_ACK_MSG)+1:])
+ #print "GET_PORT:", port
+ return port
+ elif response == RELEASE_PORT_ACK_MSG:
+ #print "Received: %s" % response
+ return 0
+ pass
+ sock.close()
+ except socket.error:
+ #print "Unable to connect to server"
+ return -1
#
-local_ip = '127.0.0.1'
-port = 5000
-authkey = 'salome_port_manager_access'
-#
-lock = multiprocessing.Lock()
-def start_server(nbSimultaneous=10, timeout=10):
- with lock:
- procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,))
- procServer.start()
- global semaphore
- semaphore = multiprocessing.Semaphore(nbSimultaneous)
- time.sleep(2)
+
+def getPort(preferedPort=None):
+ if preferedPort:
+ return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
+ else:
+ return __newClient(pm_address, GET_PORT_MSG)
#
-def start_client(ip=local_ip, execute_func=None, name="anonymous"):
- manager = __build_client(ip, port, authkey)
- p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,))
- p.start()
- return manager
+def releasePort(port):
+ __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
#
-client_id = 0
-def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
- global client_id
- for i in range(nbClients):
- start_client(ip, execute_func, name=name_prefix+"_"+str(client_id))
- client_id = client_id + 1
+def stopServer():
+ __newClient(pm_address, STOP_SERVER_MSG)
#
+
+# Auto start: unique instance ; no effect if called multiple times
+__startServer()
+#server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager")
+#server_thread.setDaemon(True)
+#server_thread.start()
The SALOME user can use the following scripts:
-runAppli
+runAppli [DEPRECATED]
Launches a SALOME Session
(similar to ${KERNEL_ROOT_DIR}/bin/salome/runSalome but with a different
name to avoid confusions).
-runSession
+runSession [DEPRECATED]
Launches a shell script in the SALOME application environment, with access
to the current SALOME session (naming service), if any.
Without arguments, the script is interactive. With arguments, the script
executes the command in the SALOME application environment.
-runConsole
+runConsole [DEPRECATED]
Gives a python console connected to the current SALOME Session.
It is also possible to use runSession, then python.
-runTests
- Similar to runSession, used for unit testing. runTests defines a new
- configuration for naming service (new port number) to avoid interferences
- with a running SALOME session. runSession tries to use an already existing
- naming service definition from a running session (hostname & port number).
+salome [NEW]
+ A single Python command to start SALOME:
+ salome [start]: replace runAppli
+ salome shell: replace runSession
+ salome console: replace runConsole
+
SALOME internal run scripts
---------------------------
getAppliPath.py
Used by other scripts to define the Application Path.
-searchFreePort.sh
- Used by other scripts to find a free port for naming service.
-
For remote calls, SALOME uses one script.
runRemote.sh
export SalomeAppConfig=${HOME}/${APPLI}
- where SalomeAppConfig designates the directory containing SalomeApp.xml.
- Note that ${APPLI} is already defined by the calling scripts when
+ where SalomeAppConfig designates the directory containing SalomeApp.xml.
+ Note that ${APPLI} is already defined by the calling scripts when
env.d/configGUI.sh is sourced.
###############################################
############### IMPORTANT NOTE ################
###############################################
-# The runConsole script is obsolete. #
-# Please consider the new salome.py launcher. #
+# The runAppli script is obsolete. #
+# Please consider the new salome launcher. #
###############################################
############### IMPORTANT NOTE ################
###############################################
# The runConsole script is obsolete. #
-# Please consider the new salome.py launcher. #
+# Please consider the new salome launcher. #
###############################################
###############################################
############### IMPORTANT NOTE ################
###############################################
-# The runConsole script is obsolete. #
-# Please consider the new salome.py launcher. #
+# The runSession script is obsolete. #
+# Please consider the new salome launcher. #
###############################################
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
-from PortManager import start_server, start_clients
-import time
-import datetime
-import random
+from PortManager import getPort, releasePort, stopServer
import sys
+import multiprocessing
import unittest
-def hello(manager, name, processId):
- seconds = random.randrange(3, 10)
- startTime = datetime.datetime.now().time()
-
- portManager = manager.PortManager()
- message_queue = manager.get_message_queue()
- port = portManager.getPort()
- message_queue.put("+ Process %s (%d) says: Hello! at %s on port %d and waits for %d seconds"%(name, processId, startTime, port, seconds))
- time.sleep(seconds)
- endTime = datetime.datetime.now().time()
- portManager.releasePort(port)
- message_queue.put("- Process %s (%d) finished at %s --> port %d is free"%(name, processId, endTime, port))
+def port_reservation(prefered=None, test=None, expected=None):
+ if prefered:
+ port = getPort(prefered)
+ else:
+ port = getPort()
+ print "port = %s"%port
+
+ if expected:
+ print "expected = %s"%expected
+ test.assertTrue(port == expected)
#
class TestMinimalExample(unittest.TestCase):
+ @classmethod
+ def tearDownClass(cls):
+ stopServer()
+ stopServer() # no effect
+ #
+ def testSequential(self):
+ print "BEGIN testSequential"
+ processes = [
+ multiprocessing.Process(target=port_reservation)
+ for i in range(3)
+ ]
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+ # Try to get specific port number
+ expected = 2872
+ p = multiprocessing.Process(target=port_reservation, args=(2872, self,expected,))
+ p.start()
+ p.join()
+
+ # Try to get specific port number
+ p = multiprocessing.Process(target=port_reservation, args=(2812, self,))
+ p.start()
+ p.join()
+
+ # Release port
+ p = multiprocessing.Process(target=releasePort, args=(2812,))
+ p.start()
+ p.join()
+
+ # Try to get specific port number
+ expected = 2812
+ p = multiprocessing.Process(target=port_reservation, args=(2812, self,expected,))
+ p.start()
+ p.join()
+
+ print "END testSequential"
+ #
def testConcurrent(self):
- nbProcesses = 10
- nbSimultaneous = 4
- ip = '127.0.0.1'
+ print "BEGIN testConcurrent"
+ processes = [
+ multiprocessing.Process(target=port_reservation)
+ for i in range(3)
+ ]
+
+ # Try to get specific port number
+ p = multiprocessing.Process(target=port_reservation, args=(2852,))
+ processes.append(p)
+
+ # Try to get specific port number
+ p = multiprocessing.Process(target=port_reservation, args=(2812,))
+ processes.append(p)
+
+ # Release port
+ p = multiprocessing.Process(target=releasePort, args=(2812,))
+ processes.append(p)
+
+ # Try to get specific port number
+ p = multiprocessing.Process(target=port_reservation, args=(2812,))
+ processes.append(p)
+
+ for p in processes:
+ p.start()
- # start server
- start_server(nbSimultaneous=nbSimultaneous, timeout=5)
+ for p in processes:
+ p.join()
- # start several clients
- start_clients(nbProcesses, ip, hello)
+ print "END testConcurrent"
#
#
import sys
import unittest
import multiprocessing
+import imp
+
+def unwrap_self_session(arg, **kwarg):
+ return TestConcurrentLaunch.session(*arg, **kwarg)
+#
class TestConcurrentLaunch(unittest.TestCase):
@classmethod
# Initialize path to SALOME application
path_to_launcher = os.getenv("SALOME_LAUNCHER")
appli_dir = os.path.dirname(path_to_launcher)
- envd_dir = os.path.join(appli_dir, "env.d")
+ cls.envd_dir = os.path.join(appli_dir, "env.d")
# Configure session startup
cls.SALOME = imp.load_source("SALOME", os.path.join(appli_dir,"salome"))
- cls.SALOME_args = ["shell", "--config="+envd_dir]
+ #cls.SALOME_args = ["shell", "--config="+cls.envd_dir]
+ cls.SALOME_args = ["--config="+cls.envd_dir]
+ #
+ @classmethod
+ def tearDownClass(cls):
+ args = ["killall", "--config="+cls.envd_dir]
+ cls.SALOME.main(args)
+ pass
#
def session(self, args=[]):
self.SALOME.main(self.SALOME_args + args)
#
- def testSingleSession(self):
+ def test01_SingleSession(self):
print "** Testing single session **"
self.session()
#
- def testMultiSession(self):
+ def test02_MultiSession(self):
print "** Testing multi sessions **"
jobs = []
for i in range(3):
- p = multiprocessing.Process(target=session, args=(self,))
+ p = multiprocessing.Process(target=unwrap_self_session, args=([self],))
jobs.append(p)
p.start()
"""
if not port: return
+ from PortManager import releasePort
+ releasePort(port)
+
from salome_utils import generateFileName
# set OMNIORB_CONFIG variable to the proper file
Parameters:
- port - port number
"""
+ import PortManager
+ PortManager.releasePort(port)
+
from salome_utils import getShortHostName, getHostName
# try to shutdown session nomally
# Process --print-port option
if cmd_opts.print_port:
- import PortManager
- PortManager.start_server(nbSimultaneous=25, timeout=10)
from searchFreePort import searchFreePort
searchFreePort({})
print "port:%s"%(os.environ['NSPORT'])
+ import PortManager
+ PortManager.releasePort(os.environ['NSPORT'])
sys.exit(0)
pass
###############################################
############### IMPORTANT NOTE ################
###############################################
-# The runConsole script is obsolete. #
-# Please consider the new salome.py launcher. #
+# The runSalome script is obsolete. #
+# Please consider the new salome launcher. #
###############################################
# **********************************************************
# This file is used by m4 files in many modules to detect SALOME KERNEL.
-# Its initial purpose (starting salome) can be replaced by new salome.py command.
+# Its initial purpose (starting salome) can be replaced by new salome command.
# When definitively switching from autotools to Cmake, this file may be removed.
# **********************************************************
import json
from salomeLauncherUtils import formatScriptsAndArgs
import subprocess
-import PortManager
# -----------------------------------------------------------------------------
try:
killMyPort(my_port)
except:
- print "problem in LocalPortKill(), killMyPort("<<port<<")"
+ print "problem in LocalPortKill(), killMyPort(%s)"%port
pass
pass
args, modules_list, modules_root_dir = pickle.load(fenv)
fenv.close()
kill_salome(args)
- PortManager.start_server(nbSimultaneous=25, timeout=10)
from searchFreePort import searchFreePort
searchFreePort(args, 0)
clt = useSalome(args, modules_list, modules_root_dir)
test = False
pass
if test:
- PortManager.start_server(nbSimultaneous=25, timeout=10)
from searchFreePort import searchFreePort
searchFreePort(args, save_config, args.get('useport'))
pass
import sys
import string
import socket
-import PortManager
_session = None
def startSession(modules=[]):
global _session
if _session: return
- PortManager.start_server(nbSimultaneous=25, timeout=10)
from searchFreePort import searchFreePort
searchFreePort()
_session = SalomeSession(modules)
Returns first found free port number.
"""
- import PortManager
- client = PortManager.start_client() # :NOTE: might specify a (remote) IP
- portManager = client.PortManager()
+ #import PortManager
+ #client = PortManager.start_client() # :NOTE: might specify a (remote) IP
+ #portManager = client.PortManager()
+ from PortManager import getPort
+ port = getPort(use_port)
+
+ #PortManager.getPort()
+ #PortManager.test_value()
if use_port:
print "Check if port can be used: %d" % use_port,
- if not portManager.isPortUsed(use_port):
+ #if not portManager.isPortUsed(use_port):
+ if port == use_port:
print "- OK"
__setup_config(use_port, args, save_config)
return
pass
#
print "Searching for a free port for naming service:",
- port = portManager.getPort()
+ #port = portManager.getPort()
print "%s - OK"%(port)
__setup_config(port, args, save_config)
- port = portManager.releasePort(port)
+ #port = portManager.releasePort(port)
#