2 # -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
5 # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
24 from Singleton import Singleton
26 import multiprocessing
35 import cPickle as pickle
42 if sys.platform == 'win32':
43 import multiprocessing.reduction # make sockets pickable/inheritable
45 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
48 This class handles port distribution for SALOME sessions.
49 In case of concurent sessions, each will have its own port number.
51 class _PortManager(object): # :TODO: must manage lock owner
52 __metaclass__ = Singleton
54 def __init__(self, startNumber = 2810, limit = 100, timeout=60):
55 super(_PortManager, self).__init__()
56 self.__startNumber = startNumber
57 self.__limit = startNumber + limit
58 self.__lockedPorts = []
59 self.__lock = multiprocessing.Lock()
60 self.__timeout = timeout
61 self.__lastChangeTime = time.time()
63 # Test for prefered port number, if asked.
64 def getPort(self, port=None):
66 if not port or self.isPortUsed(port):
67 port = self.__startNumber
68 while self.isPortUsed(port):
69 if port == self.__limit:
71 msg += "Can't find a free port to launch omniNames\n"
72 msg += "Try to kill the running servers and then launch SALOME again.\n"
73 raise RuntimeError, msg
76 self.__lockedPorts.append(port)
77 self.__lastChangeTime = time.time()
80 def releasePort(self, port):
82 if port in self.__lockedPorts:
83 self.__lockedPorts.remove(port)
84 self.__lastChangeTime = time.time()
87 return len(self.__lockedPorts)
89 def isPortUsed(self, port):
90 return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
92 def __isNetworkConnectionActiveOnPort(self, port):
93 # :NOTE: Under windows:
94 # netstat options -l and -t are unavailable
95 # grep command is unavailable
96 from subprocess import Popen, PIPE
97 (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
99 buf = StringIO.StringIO(stdout)
100 ports = buf.readlines()
101 # search for TCP - LISTEN connections
103 regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
106 p = int(regObj.match(item).group(1))
107 if p == port: return True
112 return (time.time() - self.__lastChangeTime > self.__timeout)
116 return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
120 #------------------------------------
121 # Communication methods
123 _marshall = pickle.dumps
124 _unmarshall = pickle.loads
126 def _send(channel, *args):
127 buf = _marshall(args)
128 value = socket.htonl(len(buf))
129 size = struct.pack("L",value)
134 def _receive(channel):
135 size = struct.calcsize("L")
136 size = channel.recv(size)
138 size = socket.ntohl(struct.unpack("L", size)[0])
139 except struct.error, e:
143 while len(buf) < size:
144 buf = channel.recv(size - len(buf))
146 return _unmarshall(buf)[0]
148 #------------------------------------
150 GET_PORT_MSG = "GET_PORT"
151 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
152 RELEASE_PORT_MSG = "RELEASE_PORT"
153 STOP_SERVER_MSG = "STOP_SERVER"
155 GET_PORT_ACK_MSG = "GET_PORT"
156 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
158 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
160 data = _receive(self.request)
161 if data == GET_PORT_MSG:
164 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
165 _send(self.request, response)
166 elif data.startswith(GET_PREFERED_PORT_MSG):
167 port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
169 port = pm.getPort(port)
170 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
171 _send(self.request, response)
172 elif data.startswith(RELEASE_PORT_MSG):
173 port = int(data[len(RELEASE_PORT_MSG)+1:])
176 response = "%s" % (RELEASE_PORT_ACK_MSG)
177 _send(self.request, response)
178 #print "RELEASE_PORT:", port
180 #print "Close server"
181 self.server.shutdown()
183 elif data == STOP_SERVER_MSG:
184 #print "Close server"
185 self.server.shutdown()
188 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
191 pm_address = ('127.0.0.1', 51843)
192 def __getServer(address):
193 SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
194 server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
195 server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
202 server = __getServer(pm_address)
203 server.server_bind() # Manually bind, to support allow_reuse_address
204 server.server_activate()
205 pm_address = server.server_address
207 # Start a thread with the server -- that thread will then start one
208 # more thread for each request
209 server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
210 # Exit the server thread when the main thread terminates
211 #server_thread.setDaemon(True)
212 server_thread.start()
213 #print "Server loop running in thread:", server_thread.getName()
214 #print "Server address:", pm_address
217 #print "Server already started"
218 #print "Server address:", pm_address
223 def __newClient(address, message):
225 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226 #print "connect client to", address
227 sock.connect(address)
229 response = _receive(sock)
230 if response.startswith(GET_PORT_ACK_MSG):
231 port = int(response[len(GET_PORT_ACK_MSG)+1:])
232 #print "GET_PORT:", port
234 elif response == RELEASE_PORT_ACK_MSG:
235 #print "Received: %s" % response
240 #print "Unable to connect to server"
244 def getPort(preferedPort=None):
246 return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
248 return __newClient(pm_address, GET_PORT_MSG)
251 def releasePort(port):
252 __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
256 __newClient(pm_address, STOP_SERVER_MSG)
259 # Auto start: unique instance ; no effect if called multiple times