2 # -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
5 # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
24 from Singleton import Singleton
26 import multiprocessing
35 import cPickle as pickle
42 if sys.platform == 'win32':
43 import multiprocessing.reduction # make sockets pickable/inheritable
45 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
46 #ignore = multiprocessing.active_children() # cleanup any old processes
49 This class handles port distribution for SALOME sessions.
50 In case of concurrent sessions, each will have its own port number.
52 class _PortManager(object): # :TODO: must manage lock owner
53 __metaclass__ = Singleton
def __init__(self, startNumber = 2810, limit = 100, timeout=60):
    """Create a manager with no reserved port.

    startNumber -- first port number tried when a port has to be chosen.
    limit       -- added to startNumber to obtain the last port number tried.
    timeout     -- number of seconds compared against the time elapsed
                   since the last reservation or release.
    """
    super(_PortManager, self).__init__()
    # Range of candidate ports: startNumber .. startNumber + limit.
    self.__startNumber = startNumber
    self.__limit = startNumber + limit
    self.__timeout = timeout
    # Nothing is reserved yet; creation counts as the latest change.
    self.__lockedPorts = []
    self.__lastChangeTime = time.time()
    self.__lock = multiprocessing.Lock()
64 # Test for preferred port number, if asked.
def getPort(self, port=None):
    """Reserve a port number and return it.

    port -- preferred port number; None (or 0) lets the manager choose.

    The preferred port is granted when isPortUsed() reports it free.
    Otherwise candidates are tried upwards from the start number.

    Raises RuntimeError when the last candidate of the range is used too.
    """
    # Requests are served by a threading server, so the check-then-reserve
    # sequence must not interleave with another reservation or release.
    with self.__lock:
        if not port or self.isPortUsed(port):
            port = self.__startNumber
            while self.isPortUsed(port):
                if port == self.__limit:
                    # NOTE(review): the statement that initialises the
                    # message is not visible in the reviewed extract; a
                    # leading newline is assumed -- confirm.
                    msg = ("\n"
                           "Can't find a free port to launch omniNames\n"
                           "Try to kill the running servers and then launch SALOME again.\n")
                    # Call form of raise: valid on Python 2 and Python 3.
                    raise RuntimeError(msg)
                port += 1
        self.__lockedPorts.append(port)
        self.__lastChangeTime = time.time()
        return port
def releasePort(self, port):
    """Drop the reservation on *port*.

    A port that is not currently reserved is ignored and does not refresh
    the last-change timestamp.
    """
    # Same lock as getPort(): the reservation list is shared between the
    # threads that serve client requests.
    with self.__lock:
        if port in self.__lockedPorts:
            self.__lockedPorts.remove(port)
            self.__lastChangeTime = time.time()
88 return len(self.__lockedPorts)
def isPortUsed(self, port):
    """Tell whether *port* is reserved here or already listened on."""
    # Cheap in-memory test first; netstat is only queried when needed.
    if port in self.__lockedPorts:
        return True
    return self.__isNetworkConnectionActiveOnPort(port)
def __isNetworkConnectionActiveOnPort(self, port):
    """Return True when `netstat -an` lists a listening TCP socket on *port*.

    port -- port number, compared as an integer with the number extracted
            from each matching netstat line.

    Returns False when no line of the listing matches.
    """
    # :NOTE: Under windows:
    #        netstat options -l and -t are unavailable
    #        grep command is unavailable
    # so the whole listing is requested and filtered here.
    import re
    from subprocess import Popen, PIPE

    # universal_newlines=True makes communicate() return text instead of
    # bytes, so the pattern below applies unchanged.
    stdout, _ = Popen(['netstat', '-an'], stdout=PIPE,
                      universal_newlines=True).communicate()

    # search for TCP - LISTEN connections
    regObj = re.compile(".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE)
    for item in stdout.splitlines():
        found = regObj.match(item)
        # Header lines and non-listening sockets simply do not match; test
        # for that explicitly instead of relying on an exception.
        if found is not None and int(found.group(1)) == port:
            return True
    return False
113 return (time.time() - self.__lastChangeTime > self.__timeout)
117 return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
121 #------------------------------------
122 # Communication methods
124 _marshall = pickle.dumps
125 _unmarshall = pickle.loads
127 def _send(channel, *args):
128 buf = _marshall(args)
129 value = socket.htonl(len(buf))
130 size = struct.pack("L",value)
135 def _receive(channel):
136 size = struct.calcsize("L")
137 size = channel.recv(size)
139 size = socket.ntohl(struct.unpack("L", size)[0])
140 except struct.error, e:
144 while len(buf) < size:
145 buf = channel.recv(size - len(buf))
147 return _unmarshall(buf)[0]
149 #------------------------------------
# Request texts compared against incoming messages by the request handler.
# The preferred-port and release requests are sent as "<name>: <port>".
151 GET_PORT_MSG = "GET_PORT"
152 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
153 RELEASE_PORT_MSG = "RELEASE_PORT"
154 STOP_SERVER_MSG = "STOP_SERVER"
# Reply texts: the port reply is sent as "<name>: <port>", the release
# reply as the bare name. Each has the same text as its request above.
156 GET_PORT_ACK_MSG = "GET_PORT"
157 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
# Request handler: reads one framed message from the connection and
# dispatches on its text.
# NOTE(review): the method header and the statements that bind `pm` are not
# visible in the reviewed extract; the comments below cover only what is shown.
159 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
161 data = _receive(self.request)
# Plain port request: the reply is "<GET_PORT_ACK_MSG>: <port>".
162 if data == GET_PORT_MSG:
165 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
166 _send(self.request, response)
# Preferred-port request: the number follows the message name and one
# separator character; int() accepts the blank left in front of it.
167 elif data.startswith(GET_PREFERED_PORT_MSG):
168 port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
170 port = pm.getPort(port)
171 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
172 _send(self.request, response)
# Release request: same "<name>: <port>" layout, acknowledged with the
# bare RELEASE_PORT_ACK_MSG text.
173 elif data.startswith(RELEASE_PORT_MSG):
174 port = int(data[len(RELEASE_PORT_MSG)+1:])
177 response = "%s" % (RELEASE_PORT_ACK_MSG)
178 _send(self.request, response)
179 #print "RELEASE_PORT:", port
# NOTE(review): this shutdown is presumably guarded by a condition on a
# line not shown here -- confirm before relying on it.
181 #print "Close server"
182 self.server.shutdown()
# Explicit stop request: shut the server down.
184 elif data == STOP_SERVER_MSG:
185 #print "Close server"
186 self.server.shutdown()
# TCP server combined with ThreadingMixIn, so each request is handled in a
# thread of its own.
189 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __getServer(address):
    """Build an unbound _ThreadedTCPServer for *address* and return it.

    The caller is expected to call server_bind() and server_activate().
    """
    # can be restarted immediately
    SocketServer.ThreadingTCPServer.allow_reuse_address = True
    # Do not automatically bind: the flag below must be set on the instance
    # before server_bind() is called.
    srv = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler,
                             bind_and_activate=False)
    # Prevent 'cannot bind to address' errors on restart
    srv.allow_reuse_address = True
    return srv
# Address passed to __getServer() to create the server and to __newClient()
# by the client-side helpers.
199 pm_address = ('localhost', 12345)
# Server start-up sequence.
# NOTE(review): the enclosing function header and its error handling are
# not visible in the reviewed extract.
202 server = __getServer(pm_address)
203 server.server_bind() # Manually bind, to support allow_reuse_address
204 server.server_activate()
205 address = server.server_address
207 # Start a thread with the server -- that thread will then start one
208 # more thread for each request
209 server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
210 # Exit the server thread when the main thread terminates
# setDaemon() is commented out, so the thread keeps its default daemon flag.
211 #server_thread.setDaemon(True)
212 server_thread.start()
213 #print "Server loop running in thread:", server_thread.getName()
214 #print "Server address:", address
217 #print "Server already started"
218 #print "Server address:", pm_address
# Open a TCP connection to *address* and read the server's reply.
# NOTE(review): the statement sending *message*, the return statements and
# the error handling are not visible in the reviewed extract.
223 def __newClient(address, message):
225 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226 sock.connect(address)
# NOTE(review): no sock.close() appears on the lines shown -- confirm the
# socket is closed on every return path.
228 response = _receive(sock)
# Port reply: the number follows the acknowledgement name and one separator
# character; int() accepts the blank left in front of it.
229 if response.startswith(GET_PORT_ACK_MSG):
230 port = int(response[len(GET_PORT_ACK_MSG)+1:])
231 #print "GET_PORT:", port
# Release acknowledgement: the reply is the bare acknowledgement text.
233 elif response == RELEASE_PORT_ACK_MSG:
234 #print "Received: %s" % response
239 #print "Unable to connect to server"
# Client-side entry point: asks the server at pm_address for a port and
# returns whatever __newClient() returns.
# NOTE(review): the condition choosing between the two requests below is not
# visible in the reviewed extract; presumably it tests preferedPort.
243 def getPort(preferedPort=None):
# Request carrying the preferred port as "<GET_PREFERED_PORT_MSG>: <port>".
245 return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
# Request without a preference.
247 return __newClient(pm_address, GET_PORT_MSG)
def releasePort(port):
    """Ask the server at pm_address to release *port*.

    The request is sent as "<RELEASE_PORT_MSG>: <port>"; nothing is returned.
    """
    request = "%s: %s" % (RELEASE_PORT_MSG, port)
    __newClient(pm_address, request)
255 __newClient(pm_address, STOP_SERVER_MSG)
258 # Auto start: unique instance ; no effect if called multiple times
260 #server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager")
261 #server_thread.setDaemon(True)
262 #server_thread.start()