Salome HOME
Python container is obsolete
[modules/yacs.git] / bin / PortManager.py
1 #!/usr/bin/env python
2 #  -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
4 #
5 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
7 #
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
12 #
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
21 #
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 #
24 from Singleton import Singleton
25
26 import multiprocessing
27 import time
28 import socket
29
30 import os
31 import sys
32 import threading
33 import SocketServer
34
35 try:
36   import cPickle as pickle
37 except:
38   import pickle
39
40 import struct
41 import ctypes
42
43 import logging
44 def createLogger():
45   logger = logging.getLogger(__name__)
46 #  logger.setLevel(logging.DEBUG)
47   ch = logging.StreamHandler()
48   ch.setLevel(logging.DEBUG)
49   formatter = logging.Formatter("%(levelname)s:%(threadName)s:%(message)s")
50   ch.setFormatter(formatter)
51   logger.addHandler(ch)
52   return logger
53 #
54 logger = createLogger()
55
56
57 if sys.platform == 'win32':
58   import multiprocessing.reduction    # make sockets pickable/inheritable
59
60 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
61
62 """
63 This class handles port distribution for SALOME sessions.
64 In case of concurent sessions, each will have its own port number.
65 """
66 class _PortManager(object): # :TODO: must manage lock owner
67   __metaclass__ = Singleton
68   #
69   def __init__(self, startNumber = 2810, limit = 100, timeout=60):
70     super(_PortManager, self).__init__()
71     self.__startNumber = startNumber
72     self.__limit = startNumber + limit
73     self.__lockedPorts = []
74     self.__lock = multiprocessing.Lock()
75     self.__timeout = timeout
76     self.__lastChangeTime = time.time()
77   #
78   # Test for prefered port number, if asked.
79   def getPort(self, port=None):
80     with self.__lock:
81       if not port or self.isPortUsed(port):
82         port = self.__startNumber
83         while self.isPortUsed(port):
84           if port == self.__limit:
85             msg  = "\n"
86             msg += "Can't find a free port to launch omniNames\n"
87             msg += "Try to kill the running servers and then launch SALOME again.\n"
88             raise RuntimeError, msg
89           port = port + 1
90       #
91       self.__lockedPorts.append(port)
92       self.__lastChangeTime = time.time()
93       return port
94   #
95   def releasePort(self, port):
96     logger.debug("PortManager.releasePort %s"%port)
97     with self.__lock:
98       if port in self.__lockedPorts:
99         self.__lockedPorts.remove(port)
100         self.__lastChangeTime = time.time()
101   #
102   def isBusy(self):
103     return len(self.__lockedPorts)
104   #
105   def isPortUsed(self, port):
106     return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
107   #
108   def __isNetworkConnectionActiveOnPort(self, port):
109     # :NOTE: Under windows:
110     #        netstat options -l and -t are unavailable
111     #        grep command is unavailable
112     from subprocess import Popen, PIPE
113     (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
114     import StringIO
115     buf = StringIO.StringIO(stdout)
116     ports = buf.readlines()
117     # search for TCP - LISTEN connections
118     import re
119     regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
120     for item in ports:
121       try:
122         p = int(regObj.match(item).group(1))
123         if p == port: return True
124       except:
125         pass
126   #
127   def timeout(self):
128     return (time.time() - self.__lastChangeTime > self.__timeout)
129   #
130   def __str__(self):
131     with self.__lock:
132       return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
133   #
134 #
135
136 #------------------------------------
137 # Communication methods
138
139 _marshall = pickle.dumps
140 _unmarshall = pickle.loads
141
142 def _send(channel, *args):
143   buf = _marshall(args)
144   value = socket.htonl(len(buf))
145   size = struct.pack("L",value)
146   channel.send(size)
147   channel.send(buf)
148 #
149
150 def _receive(channel):
151   size = struct.calcsize("L")
152   size = channel.recv(size)
153   try:
154     size = socket.ntohl(struct.unpack("L", size)[0])
155   except struct.error, e:
156     return ''
157
158   buf = ""
159   while len(buf) < size:
160     buf = channel.recv(size - len(buf))
161
162   return _unmarshall(buf)[0]
163 #
164 #------------------------------------
165
166 GET_PORT_MSG = "GET_PORT"
167 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
168 RELEASE_PORT_MSG = "RELEASE_PORT"
169 STOP_SERVER_MSG = "STOP_SERVER"
170 TEST_SERVER_MSG = "TEST_SERVER"
171
172 GET_PORT_ACK_MSG = "GET_PORT"
173 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
174 TEST_SERVER_ACK_MSG = "TEST_SERVER"
175
176 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
177   def handle(self):
178     data = _receive(self.request)
179     if data == GET_PORT_MSG:
180       pm = _PortManager()
181       port = pm.getPort()
182       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
183       _send(self.request, response)
184     elif data.startswith(GET_PREFERED_PORT_MSG):
185       port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
186       pm = _PortManager()
187       port = pm.getPort(port)
188       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
189       _send(self.request, response)
190     elif data.startswith(RELEASE_PORT_MSG):
191       port = int(data[len(RELEASE_PORT_MSG)+1:])
192       pm = _PortManager()
193       pm.releasePort(port)
194       response = "%s" % (RELEASE_PORT_ACK_MSG)
195       _send(self.request, response)
196       logger.debug("RELEASE_PORT: %s"%port)
197       if not pm.isBusy():
198         logger.debug("Close server")
199         config_file, lock_file = _getConfigurationFilename()
200         try:
201           os.remove(config_file)
202           pmlock.release()
203           os.remove(lock_file)
204         except:
205           pass
206         self.server.shutdown()
207       #print pm
208     elif data == STOP_SERVER_MSG:
209       logger.debug("Close server")
210       self.server.shutdown()
211     elif data == TEST_SERVER_MSG:
212       _send(self.request, TEST_SERVER_ACK_MSG)
213 #
214
215 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
216     pass
217
218 #------------------------------------
219 # A file locker (Linux only)
220 import fcntl
221 class PortManagerLock:
222   def __init__(self, filename, readonly=False, blocking=True):
223     self.filename = filename
224     # This will create it if it does not exist already
225     logger.debug("Create lock on %s"%filename)
226     mode = 'w'
227     if readonly:
228       mode = 'r'
229     self.handle = open(filename, mode)
230     self.handle.seek(0) # go back to beginning of file to read it multiple times
231     self.__blocking = blocking
232
233   def acquire(self):
234     mode = fcntl.LOCK_EX
235     if not self.__blocking: # Raise an IOError exception if file has already been locked
236       mode = mode | fcntl.LOCK_NB
237     fcntl.flock(self.handle, mode)
238     logger.debug("lock acquired %s"%self.__blocking)
239
240   def release(self):
241     fcntl.flock(self.handle, fcntl.LOCK_UN)
242     logger.debug("lock released")
243
244   def __del__(self):
245     if logger:
246       logger.debug("Close lock file")
247     self.handle.close()
248 #
249 #------------------------------------
250
251 # Server address has to be shared by different processes, without any common
252 # ancestor.
253 # The "simplest" solution is to define it here as a global variable. Unfortunately,
254 # availability of the corresponding socket is not guaranted at all. If PortManager
255 # tries to use a socket it does not own, server is not created (it is identified as
256 # already existing), clients then connect on this socket but message passing
257 # between clients and server will not work and SALOME launch will crash.
258 # We can introduce a port verification procedure automatically called by importing
259 # this module (i.e. when creating the server). This procedure consists in creating
260 # a client which sends a specific message to the server that has to be tested. And
261 # loop on port numbers until a free socket is found and associated to a new server.
262 #
263 # Another approach is to let Python socket API select a free port number, then store
264 # it to a file on server host machine in order to be shared with clients.
265 # The logical part can be defined as follows. When server is started (by importing
266 # this module), write server port number to a specific file (do not use a temporary
267 # file name). Each client then read server address from this same file ; if file is
268 # not nound, it is an error (add appropriate processing).
269 # Server could also check file existence and try to use the same address as previous
270 # server in order to avoid opening too many unecessary sockets ; but we need to apply
271 # the above verification procedure. This processing is not necessary because TCP socket
272 # timeout will automatically close unused sockets.
273
274 def _getConfigurationFilename():
275   omniorbUserPath = os.getenv("OMNIORB_USER_PATH")
276
277   from salome_utils import generateFileName
278   portmanager_config = generateFileName(omniorbUserPath,
279                                         prefix="omniORB",
280                                         suffix="PortManager",
281                                         extension="cfg",
282                                         hidden=True)
283   lock_file = portmanager_config + "-lock"
284   return (portmanager_config, lock_file)
285 #
286
287 def __checkServer():
288   while True:
289     logger.debug("CHECKING SERVER")
290     status = __newClient(TEST_SERVER_MSG)
291     if status == TEST_SERVER_ACK_MSG:
292       break
293   return (status == TEST_SERVER_ACK_MSG)
294 #
295
296 def __getServerAddress(readonly=True):
297   address = ("localhost", 0)
298   try:
299     config_file, lock_file = _getConfigurationFilename()
300     lock = PortManagerLock(config_file, readonly, blocking=True)
301     lock.acquire()
302     address = eval(lock.handle.read())
303     lock.release()
304   except (IOError, SyntaxError) as e:
305     logger.debug("no configuration file")
306     pass
307   finally:
308     return address
309 #
310
311 def __setServerAddress(address):
312   config_file, lock_file = _getConfigurationFilename()
313   lock = PortManagerLock(config_file, readonly=False, blocking=True)
314   lock.acquire()
315   logger.debug("setServerAddress: %s"%str(address))
316   lock.handle.write(str(address))
317   lock.release()
318 #
319
320 def __getServer():
321   address = __getServerAddress(readonly=False)
322   SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
323   server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
324   server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
325   server.server_bind()     # Manually bind, to support allow_reuse_address
326   __setServerAddress(server.server_address)
327   server.server_activate()
328   return server
329 #
330
331 pmlock = None
332 def __startServer():
333   try:
334     config_file, lock_file = _getConfigurationFilename()
335     global pmlock
336     pmlock = PortManagerLock(lock_file, readonly=False, blocking=False)
337     pmlock.acquire()
338
339     server = __getServer()
340     # Start a thread with the server -- that thread will then start one
341     # more thread for each request
342     server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
343     # Exit the server thread when the main thread terminates
344     #server_thread.setDaemon(True)
345     server_thread.start()
346     #print "Server loop running in thread:", server_thread.getName()
347   except:
348     logger.debug("Server already started")
349     pass
350 #
351
352 def __newClient(message):
353   address = __getServerAddress(readonly=True)
354   try:
355     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
356     logger.debug("connect client to %s"%str(address))
357     sock.connect(address)
358     _send(sock, message)
359     response = _receive(sock)
360     if response.startswith(GET_PORT_ACK_MSG):
361       port = int(response[len(GET_PORT_ACK_MSG)+1:])
362       logger.debug("GET_PORT: %s"%port)
363       return port
364     elif response == RELEASE_PORT_ACK_MSG:
365       logger.debug("Received: %s" % response)
366       return 0
367       pass
368     elif response == TEST_SERVER_ACK_MSG:
369       logger.debug("Server is ok")
370       return TEST_SERVER_ACK_MSG
371       pass
372     sock.close()
373   except socket.error:
374     logger.debug("Unable to connect to server")
375     return -1
376 #
377
378 def getPort(preferedPort=None):
379   if preferedPort:
380     return __newClient("%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
381   else:
382     return __newClient(GET_PORT_MSG)
383 #
384
385 def releasePort(port):
386   logger.debug("application asks for releasePort %s"%port)
387   __newClient("%s: %s"%(RELEASE_PORT_MSG,port))
388 #
389
390 def stopServer():
391   __newClient(STOP_SERVER_MSG)
392 #
393
394 # Auto start: unique instance ; no effect if called multiple times
395 __startServer()
396 logger.debug("Server started... do check...")
397 assert(__checkServer())