Salome HOME
Windows fix
[modules/kernel.git] / bin / PortManager.py
1 #!/usr/bin/env python
2 #  -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
4 #
5 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
7 #
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
12 #
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
21 #
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 #
24 from Singleton import Singleton
25
26 import multiprocessing
27 import time
28 import socket
29
30 import sys
31 import threading
32 import SocketServer
33
34 try:
35   import cPickle as pickle
36 except:
37   import pickle
38
39 import struct
40 import ctypes
41
42 if sys.platform == 'win32':
43   import multiprocessing.reduction    # make sockets pickable/inheritable
44
45 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
46
47 """
48 This class handles port distribution for SALOME sessions.
49 In case of concurent sessions, each will have its own port number.
50 """
51 class _PortManager(object): # :TODO: must manage lock owner
52   __metaclass__ = Singleton
53   #
54   def __init__(self, startNumber = 2810, limit = 100, timeout=60):
55     super(_PortManager, self).__init__()
56     self.__startNumber = startNumber
57     self.__limit = startNumber + limit
58     self.__lockedPorts = []
59     self.__lock = multiprocessing.Lock()
60     self.__timeout = timeout
61     self.__lastChangeTime = time.time()
62   #
63   # Test for prefered port number, if asked.
64   def getPort(self, port=None):
65     with self.__lock:
66       if not port or self.isPortUsed(port):
67         port = self.__startNumber
68         while self.isPortUsed(port):
69           if port == self.__limit:
70             msg  = "\n"
71             msg += "Can't find a free port to launch omniNames\n"
72             msg += "Try to kill the running servers and then launch SALOME again.\n"
73             raise RuntimeError, msg
74           port = port + 1
75       #
76       self.__lockedPorts.append(port)
77       self.__lastChangeTime = time.time()
78       return port
79   #
80   def releasePort(self, port):
81     with self.__lock:
82       if port in self.__lockedPorts:
83         self.__lockedPorts.remove(port)
84         self.__lastChangeTime = time.time()
85   #
86   def isBusy(self):
87     return len(self.__lockedPorts)
88   #
89   def isPortUsed(self, port):
90     return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
91   #
92   def __isNetworkConnectionActiveOnPort(self, port):
93     # :NOTE: Under windows:
94     #        netstat options -l and -t are unavailable
95     #        grep command is unavailable
96     from subprocess import Popen, PIPE
97     (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
98     import StringIO
99     buf = StringIO.StringIO(stdout)
100     ports = buf.readlines()
101     # search for TCP - LISTEN connections
102     import re
103     regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
104     for item in ports:
105       try:
106         p = int(regObj.match(item).group(1))
107         if p == port: return True
108       except:
109         pass
110   #
111   def timeout(self):
112     return (time.time() - self.__lastChangeTime > self.__timeout)
113   #
114   def __str__(self):
115     with self.__lock:
116       return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
117   #
118 #
119
120 #------------------------------------
121 # Communication methods
122
123 _marshall = pickle.dumps
124 _unmarshall = pickle.loads
125
126 def _send(channel, *args):
127   buf = _marshall(args)
128   value = socket.htonl(len(buf))
129   size = struct.pack("L",value)
130   channel.send(size)
131   channel.send(buf)
132 #
133
134 def _receive(channel):
135   size = struct.calcsize("L")
136   size = channel.recv(size)
137   try:
138     size = socket.ntohl(struct.unpack("L", size)[0])
139   except struct.error, e:
140     return ''
141
142   buf = ""
143   while len(buf) < size:
144     buf = channel.recv(size - len(buf))
145
146   return _unmarshall(buf)[0]
147 #
148 #------------------------------------
149
150 GET_PORT_MSG = "GET_PORT"
151 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
152 RELEASE_PORT_MSG = "RELEASE_PORT"
153 STOP_SERVER_MSG = "STOP_SERVER"
154
155 GET_PORT_ACK_MSG = "GET_PORT"
156 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
157
158 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
159   def handle(self):
160     data = _receive(self.request)
161     if data == GET_PORT_MSG:
162       pm = _PortManager()
163       port = pm.getPort()
164       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
165       _send(self.request, response)
166     elif data.startswith(GET_PREFERED_PORT_MSG):
167       port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
168       pm = _PortManager()
169       port = pm.getPort(port)
170       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
171       _send(self.request, response)
172     elif data.startswith(RELEASE_PORT_MSG):
173       port = int(data[len(RELEASE_PORT_MSG)+1:])
174       pm = _PortManager()
175       pm.releasePort(port)
176       response = "%s" % (RELEASE_PORT_ACK_MSG)
177       _send(self.request, response)
178       #print "RELEASE_PORT:", port
179       if not pm.isBusy():
180         #print "Close server"
181         self.server.shutdown()
182       #print pm
183     elif data == STOP_SERVER_MSG:
184       #print "Close server"
185       self.server.shutdown()
186 #
187
188 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
189     pass
190
191 pm_address = ('127.0.0.1', 51843)
192 def __getServer(address):
193   SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
194   server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
195   server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
196   return server
197 #
198
199 def __startServer():
200   global pm_address
201   try:
202     server = __getServer(pm_address)
203     server.server_bind()     # Manually bind, to support allow_reuse_address
204     server.server_activate()
205     pm_address = server.server_address
206
207     # Start a thread with the server -- that thread will then start one
208     # more thread for each request
209     server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
210     # Exit the server thread when the main thread terminates
211     #server_thread.setDaemon(True)
212     server_thread.start()
213     #print "Server loop running in thread:", server_thread.getName()
214     #print "Server address:", pm_address
215     #return address
216   except:
217     #print "Server already started"
218     #print "Server address:", pm_address
219     #return pm_address
220     pass
221 #
222
223 def __newClient(address, message):
224   try:
225     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226     #print "connect client to", address
227     sock.connect(address)
228     _send(sock, message)
229     response = _receive(sock)
230     if response.startswith(GET_PORT_ACK_MSG):
231       port = int(response[len(GET_PORT_ACK_MSG)+1:])
232       #print "GET_PORT:", port
233       return port
234     elif response == RELEASE_PORT_ACK_MSG:
235       #print "Received: %s" % response
236       return 0
237       pass
238     sock.close()
239   except socket.error:
240     #print "Unable to connect to server"
241     return -1
242 #
243
244 def getPort(preferedPort=None):
245   if preferedPort:
246     return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
247   else:
248     return __newClient(pm_address, GET_PORT_MSG)
249 #
250
251 def releasePort(port):
252   __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
253 #
254
255 def stopServer():
256   __newClient(pm_address, STOP_SERVER_MSG)
257 #
258
259 # Auto start: unique instance ; no effect if called multiple times
260 __startServer()