2 # -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
5 # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
24 from Singleton import Singleton
26 from multiprocessing.managers import SyncManager
27 import multiprocessing
"""
This class handles port distribution for SALOME sessions.
In case of concurrent sessions, each will have its own port number.
"""
class PortManager(object):  # :TODO: must manage lock owner
    """Hand out port numbers to SALOME sessions.

    A port returned by getPort() stays "locked" until releasePort() is
    called for it.  A port also counts as used when ``netstat -an`` reports
    a TCP socket listening on it.

    NOTE(review): this class was recovered from a listing that had lost its
    indentation and several lines.  The names of getPort, isBusy, timeout
    and __str__ follow the ``exposed`` list passed to MyManager.register in
    __build_server; the ``with self.__lock`` statements and the initial
    value of the error message are reconstructed -- confirm against a
    reference copy of this file.
    """
    __metaclass__ = Singleton

    def __init__(self, startNumber=2810, limit=100, timeout=60):
        """
        startNumber -- first port number tried by getPort()
        limit       -- getPort() gives up once startNumber + limit is in use
        timeout     -- seconds without any change after which timeout()
                       becomes true
        """
        super(PortManager, self).__init__()
        self.__startNumber = startNumber
        self.__limit = startNumber + limit
        self.__lockedPorts = []
        self.__lock = multiprocessing.Lock()
        self.__timeout = timeout
        self.__lastChangeTime = time.time()

    def getPort(self):
        """Lock and return the first unused port, counting up from the start.

        Raises RuntimeError when the search reaches the upper limit and that
        port is used as well.
        """
        with self.__lock:
            port = self.__startNumber
            while self.isPortUsed(port):
                if port == self.__limit:
                    msg = "\n"
                    msg += "Can't find a free port to launch omniNames\n"
                    msg += "Try to kill the running servers and then launch SALOME again.\n"
                    raise RuntimeError(msg)
                port = port + 1

            self.__lockedPorts.append(port)
            self.__lastChangeTime = time.time()
            return port

    def releasePort(self, port):
        """Unlock ``port``; ports that are not locked are ignored."""
        with self.__lock:
            if port in self.__lockedPorts:
                self.__lockedPorts.remove(port)
                self.__lastChangeTime = time.time()

    def isBusy(self):
        """Return the number of locked ports (non-zero means busy)."""
        return len(self.__lockedPorts)

    def isPortUsed(self, port):
        """Tell whether ``port`` is locked here or listened on by a process."""
        return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)

    def __isNetworkConnectionActiveOnPort(self, port):
        """Return True when netstat shows a TCP socket listening on ``port``."""
        # :NOTE: Under windows:
        #        netstat options -l and -t are unavailable
        #        grep command is unavailable
        import re
        from subprocess import Popen, PIPE

        # universal_newlines gives text output under both Python 2 and 3.
        stdout, _ = Popen(['netstat', '-an'], stdout=PIPE,
                          universal_newlines=True).communicate()

        # search for TCP - LISTEN connections
        regObj = re.compile(".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE)
        for item in stdout.splitlines():
            found = regObj.match(item)
            # Most netstat lines are not TCP listeners and do not match.
            if found is not None and int(found.group(1)) == port:
                return True
        return False

    def timeout(self):
        """Return True once the last lock/unlock is older than the timeout."""
        return (time.time() - self.__lastChangeTime) > self.__timeout

    def __str__(self):
        with self.__lock:
            return "PortManager: list of locked ports:" + str(self.__lockedPorts)
def __build_server(ip, port, authkey):
    """Create and start the manager process that serves the PortManager.

    The manager listens on (ip, port), is protected by ``authkey`` and
    exposes two callables: ``PortManager`` and ``get_message_queue`` (a
    queue created here).  Returns the started manager.

    NOTE(review): the listing this function was recovered from is missing
    the body of the ``try`` and everything after the "already started"
    message.  The get_server()/start() calls, the exit and the final return
    are reconstructed -- confirm against a reference copy of this file.
    """
    message_queue = multiprocessing.Queue()

    class MyManager(SyncManager):
        pass

    MyManager.register(
        "PortManager", PortManager,
        exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
    MyManager.register("get_message_queue", callable=lambda: message_queue)

    manager = MyManager(address=(ip, port), authkey=authkey)

    try:
        # Binding in this process first surfaces an address conflict as
        # socket.error; a failure inside the spawned server shows up as
        # EOFError from start().
        manager.get_server()
        manager.start()
    except (EOFError, socket.error):
        print('Server already started on %s:%s' % (ip, port))
        raise SystemExit(1)

    return manager
def __build_client(ip, port, authkey):
    """Return a manager connected to the port-manager server at (ip, port).

    Raises Exception("Unable to connect to server on ...") when the
    connection fails.

    NOTE(review): the connect() call, the caught exception type and the
    final return are missing from the listing this function was recovered
    from and are reconstructed -- confirm against a reference copy.
    """
    class MyManager(SyncManager):
        pass

    # Client side: only the names are declared, the callables live in the
    # server process.
    MyManager.register("PortManager")
    MyManager.register("get_message_queue")

    manager = MyManager(address=(ip, port), authkey=authkey)
    try:
        manager.connect()
    except socket.error:
        raise Exception("Unable to connect to server on %s:%s" % (ip, port))
    return manager
def __run_server(ip, port, authkey, timeout):
    """Run the port-manager server until it is idle and timed out.

    Builds the server, creates the shared PortManager with ``timeout`` and
    keeps polling the message queue while ports are locked or the
    PortManager has not timed out yet.

    NOTE(review): the try/except around the queue read, the handling of a
    received message and the statements after the shutdown message are
    missing from the listing this function was recovered from; they are
    reconstructed -- confirm against a reference copy of this file.
    """
    try:
        from Queue import Empty  # Python 2
    except ImportError:
        from queue import Empty  # Python 3

    theserver = __build_server(ip, port, authkey)
    shared_mesq = theserver.get_message_queue()
    print('PortManager server started on %s:%s' % (ip, port))
    portManager = theserver.PortManager(timeout=timeout)

    while portManager.isBusy() or not portManager.timeout():
        try:
            message = shared_mesq.get(block=False)
        except Empty:
            # Nothing queued; go back to checking the loop condition.
            pass
        else:
            print(message)

    print("PortManager server is shuting down...")
    time.sleep(2)
    theserver.shutdown()
def __run_client(manager, execute_func):
    """Body of a client process: call ``execute_func`` with the manager.

    ``execute_func`` receives (manager, process name, process id).  Nothing
    is called when it is None, which is start_client()'s default.

    NOTE(review): ``semaphore`` is expected to be the module-level semaphore
    assigned by start_server(); the statement using it is missing from the
    listing this function was recovered from and is reconstructed -- confirm
    against a reference copy of this file.
    """
    with semaphore:
        name = multiprocessing.current_process().name
        processId = os.getpid()

        # do the job
        if execute_func:
            execute_func(manager, name, processId)
# Address the server is started on, and default address for start_client().
local_ip = '127.0.0.1'
# NOTE(review): start_server() and start_client() read a module-level `port`
# whose assignment is missing from the listing this file was recovered from;
# 5000 is a reconstructed value -- confirm before relying on it.
port = 5000
# Key handed to both the server and the client managers.  Written as a bytes
# literal: same value under Python 2, required type under Python 3.
authkey = b'salome_port_manager_access'

# NOTE(review): presumably serialises start_server() calls; its use is not
# visible in the recovered listing.
lock = multiprocessing.Lock()
def start_server(nbSimultaneous=10, timeout=10):
    """Start the port-manager server in a separate process.

    nbSimultaneous -- initial value of the module-level ``semaphore``
    timeout        -- forwarded to __run_server, which passes it on to the
                      shared PortManager

    NOTE(review): the use of ``lock``, the process start, the ``global``
    declaration and the pause are missing from the listing this function
    was recovered from and are reconstructed -- confirm against a reference
    copy of this file.
    """
    global semaphore
    with lock:
        procServer = multiprocessing.Process(
            target=__run_server, args=(local_ip, port, authkey, timeout,))
        procServer.start()
        semaphore = multiprocessing.Semaphore(nbSimultaneous)
        # Leave the server process some time to come up.
        time.sleep(2)
def start_client(ip=local_ip, execute_func=None, name="anonymous"):
    """Connect to the server at ``ip`` and run ``execute_func`` in a process.

    The new process is called ``name`` and runs __run_client with the
    connected manager.  Returns that manager.

    NOTE(review): the process start and the return value are missing from
    the listing this function was recovered from and are reconstructed --
    confirm against a reference copy of this file.
    """
    manager = __build_client(ip, port, authkey)
    p = multiprocessing.Process(
        target=__run_client, name=name, args=(manager, execute_func,))
    p.start()
    return manager
# Counter used to give every client started by start_clients() its own name.
# NOTE(review): the loop reads client_id before any visible assignment, so
# the initialisation and the `global` declaration are reconstructed --
# confirm against a reference copy of this file.
client_id = 0


def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
    """Start ``nbClients`` clients named ``<name_prefix>_<client_id>``."""
    global client_id
    for _ in range(nbClients):
        start_client(ip, execute_func, name=name_prefix + "_" + str(client_id))
        client_id = client_id + 1