2 # -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
5 # Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
24 from Singleton import Singleton
26 import multiprocessing
36 import cPickle as pickle
45 logger = logging.getLogger(__name__)
46 # logger.setLevel(logging.DEBUG)
47 ch = logging.StreamHandler()
48 ch.setLevel(logging.DEBUG)
49 formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
50 ch.setFormatter(formatter)
54 logger = createLogger()
57 if sys.platform == 'win32':
58 import multiprocessing.reduction # make sockets pickable/inheritable
60 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
63 This class handles port distribution for SALOME sessions.
64 In case of concurent sessions, each will have its own port number.
66 class _PortManager(object): # :TODO: must manage lock owner
67 __metaclass__ = Singleton
69 def __init__(self, startNumber = 2810, limit = 100, timeout=60):
70 super(_PortManager, self).__init__()
71 self.__startNumber = startNumber
72 self.__limit = startNumber + limit
73 self.__lockedPorts = []
74 self.__lock = multiprocessing.Lock()
75 self.__timeout = timeout
76 self.__lastChangeTime = time.time()
78 # Test for prefered port number, if asked.
79 def getPort(self, port=None):
81 if not port or self.isPortUsed(port):
82 port = self.__startNumber
83 while self.isPortUsed(port):
84 if port == self.__limit:
86 msg += "Can't find a free port to launch omniNames\n"
87 msg += "Try to kill the running servers and then launch SALOME again.\n"
88 raise RuntimeError, msg
91 self.__lockedPorts.append(port)
92 self.__lastChangeTime = time.time()
95 def releasePort(self, port):
96 logger.debug("PortManager.releasePort %s"%port)
98 if port in self.__lockedPorts:
99 self.__lockedPorts.remove(port)
100 self.__lastChangeTime = time.time()
103 return len(self.__lockedPorts)
105 def isPortUsed(self, port):
106 return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
108 def __isNetworkConnectionActiveOnPort(self, port):
109 # :NOTE: Under windows:
110 # netstat options -l and -t are unavailable
111 # grep command is unavailable
112 from subprocess import Popen, PIPE
113 (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
115 buf = StringIO.StringIO(stdout)
116 ports = buf.readlines()
117 # search for TCP - LISTEN connections
119 regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
122 p = int(regObj.match(item).group(1))
123 if p == port: return True
128 return (time.time() - self.__lastChangeTime > self.__timeout)
132 return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
136 #------------------------------------
137 # Communication methods
139 _marshall = pickle.dumps
140 _unmarshall = pickle.loads
142 def _send(channel, *args):
143 buf = _marshall(args)
144 value = socket.htonl(len(buf))
145 size = struct.pack("L",value)
150 def _receive(channel):
151 size = struct.calcsize("L")
152 size = channel.recv(size)
154 size = socket.ntohl(struct.unpack("L", size)[0])
155 except struct.error, e:
159 while len(buf) < size:
160 buf = channel.recv(size - len(buf))
162 return _unmarshall(buf)[0]
164 #------------------------------------
166 GET_PORT_MSG = "GET_PORT"
167 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
168 RELEASE_PORT_MSG = "RELEASE_PORT"
169 STOP_SERVER_MSG = "STOP_SERVER"
170 TEST_SERVER_MSG = "TEST_SERVER"
172 GET_PORT_ACK_MSG = "GET_PORT"
173 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
174 TEST_SERVER_ACK_MSG = "TEST_SERVER"
176 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
178 data = _receive(self.request)
179 if data == GET_PORT_MSG:
182 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
183 _send(self.request, response)
184 elif data.startswith(GET_PREFERED_PORT_MSG):
185 port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
187 port = pm.getPort(port)
188 response = "%s: %s" % (GET_PORT_ACK_MSG, port)
189 _send(self.request, response)
190 elif data.startswith(RELEASE_PORT_MSG):
191 port = int(data[len(RELEASE_PORT_MSG)+1:])
194 response = "%s" % (RELEASE_PORT_ACK_MSG)
195 _send(self.request, response)
196 logger.debug("RELEASE_PORT: %s"%port)
198 logger.debug("Close server")
199 config_file, lock_file = _getConfigurationFilename()
201 os.remove(config_file)
206 self.server.shutdown()
208 elif data == STOP_SERVER_MSG:
209 logger.debug("Close server")
210 self.server.shutdown()
211 elif data == TEST_SERVER_MSG:
212 _send(self.request, TEST_SERVER_ACK_MSG)
215 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
218 #------------------------------------
219 # A file locker (Linux only)
221 class PortManagerLock:
222 def __init__(self, filename, readonly=False, blocking=True):
223 self.filename = filename
224 # This will create it if it does not exist already
225 logger.debug("Create lock on %s"%filename)
229 self.handle = open(filename, mode)
230 self.handle.seek(0) # go back to beginning of file to read it multiple times
231 self.__blocking = blocking
235 if not self.__blocking: # Raise an IOError exception if file has already been locked
236 mode = mode | fcntl.LOCK_NB
237 fcntl.flock(self.handle, mode)
238 logger.debug("lock acquired %s"%self.__blocking)
241 fcntl.flock(self.handle, fcntl.LOCK_UN)
242 logger.debug("lock released")
246 logger.debug("Close lock file")
249 #------------------------------------
251 # Server address has to be shared by different processes, without any common
253 # The "simplest" solution is to define it here as a global variable. Unfortunately,
254 # availability of the corresponding socket is not guaranted at all. If PortManager
255 # tries to use a socket it does not own, server is not created (it is identified as
256 # already existing), clients then connect on this socket but message passing
257 # between clients and server will not work and SALOME launch will crash.
258 # We can introduce a port verification procedure automatically called by importing
259 # this module (i.e. when creating the server). This procedure consists in creating
260 # a client which sends a specific message to the server that has to be tested. And
261 # loop on port numbers until a free socket is found and associated to a new server.
263 # Another approach is to let Python socket API select a free port number, then store
264 # it to a file on server host machine in order to be shared with clients.
265 # The logical part can be defined as follows. When server is started (by importing
266 # this module), write server port number to a specific file (do not use a temporary
267 # file name). Each client then read server address from this same file ; if file is
268 # not nound, it is an error (add appropriate processing).
269 # Server could also check file existence and try to use the same address as previous
270 # server in order to avoid opening too many unecessary sockets ; but we need to apply
271 # the above verification procedure. This processing is not necessary because TCP socket
272 # timeout will automatically close unused sockets.
274 def _getConfigurationFilename():
275 omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
277 from salome_utils import generateFileName
278 portmanager_config = generateFileName(omniorbUserPath,
280 suffix="PortManager",
283 lock_file = portmanager_config + "-lock"
284 return (portmanager_config, lock_file)
289 logger.debug("CHECKING SERVER")
290 status = __newClient(TEST_SERVER_MSG)
291 if status == TEST_SERVER_ACK_MSG:
293 return (status == TEST_SERVER_ACK_MSG)
296 def __getServerAddress(readonly=True):
297 address = ("localhost", 0)
299 config_file, lock_file = _getConfigurationFilename()
300 lock = PortManagerLock(config_file, readonly, blocking=True)
302 address = eval(lock.handle.read())
304 except (IOError, SyntaxError) as e:
305 logger.debug("no configuration file")
311 def __setServerAddress(address):
312 config_file, lock_file = _getConfigurationFilename()
313 lock = PortManagerLock(config_file, readonly=False, blocking=True)
315 logger.debug("setServerAddress: %s"%str(address))
316 lock.handle.write(str(address))
321 address = __getServerAddress(readonly=False)
322 SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
323 server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
324 server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
325 server.server_bind() # Manually bind, to support allow_reuse_address
326 __setServerAddress(server.server_address)
327 server.server_activate()
334 config_file, lock_file = _getConfigurationFilename()
336 pmlock = PortManagerLock(lock_file, readonly=False, blocking=False)
339 server = __getServer()
340 # Start a thread with the server -- that thread will then start one
341 # more thread for each request
342 server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
343 # Exit the server thread when the main thread terminates
344 #server_thread.setDaemon(True)
345 server_thread.start()
346 #print "Server loop running in thread:", server_thread.getName()
348 logger.debug("Server already started")
352 def __newClient(message):
353 address = __getServerAddress(readonly=True)
355 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
356 logger.debug("connect client to %s"%str(address))
357 sock.connect(address)
359 response = _receive(sock)
360 if response.startswith(GET_PORT_ACK_MSG):
361 port = int(response[len(GET_PORT_ACK_MSG)+1:])
362 logger.debug("GET_PORT: %s"%port)
364 elif response == RELEASE_PORT_ACK_MSG:
365 logger.debug("Received: %s" % response)
368 elif response == TEST_SERVER_ACK_MSG:
369 logger.debug("Server is ok")
370 return TEST_SERVER_ACK_MSG
374 logger.debug("Unable to connect to server")
378 def getPort(preferedPort=None):
380 return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
382 return __newClient(GET_PORT_MSG)
385 def releasePort(port):
386 logger.debug("application asks for releasePort %s"%port)
387 __newClient("%s: %s"%(RELEASE_PORT_MSG,port))
391 __newClient(STOP_SERVER_MSG)
394 # Auto start: unique instance ; no effect if called multiple times
396 logger.debug("Server started... do check...")
397 assert(__checkServer())