]> SALOME platform Git repositories - modules/kernel.git/blob - bin/PortManager.py
Salome HOME
cd9664b51cabe3d5769a95dea6211deb8a57b748
[modules/kernel.git] / bin / PortManager.py
1 #!/usr/bin/env python
2 #  -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
4 #
5 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
7 #
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
12 #
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
21 #
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 #
24 from Singleton import Singleton
25
26 import multiprocessing
27 import time
28 import socket
29
30 import sys
31 import threading
32 import SocketServer
33
34 try:
35   import cPickle as pickle
36 except:
37   import pickle
38
39 import struct
40 import ctypes
41
42 if sys.platform == 'win32':
43   import multiprocessing.reduction    # make sockets pickable/inheritable
44
45 multiprocessing.freeze_support() # Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
46 #ignore = multiprocessing.active_children()      # cleanup any old processes
47
48 """
49 This class handles port distribution for SALOME sessions.
50 In case of concurent sessions, each will have its own port number.
51 """
52 class _PortManager(object): # :TODO: must manage lock owner
53   __metaclass__ = Singleton
54   #
55   def __init__(self, startNumber = 2810, limit = 100, timeout=60):
56     super(_PortManager, self).__init__()
57     self.__startNumber = startNumber
58     self.__limit = startNumber + limit
59     self.__lockedPorts = []
60     self.__lock = multiprocessing.Lock()
61     self.__timeout = timeout
62     self.__lastChangeTime = time.time()
63   #
64   # Test for prefered port number, if asked.
65   def getPort(self, port=None):
66     with self.__lock:
67       if not port or self.isPortUsed(port):
68         port = self.__startNumber
69         while self.isPortUsed(port):
70           if port == self.__limit:
71             msg  = "\n"
72             msg += "Can't find a free port to launch omniNames\n"
73             msg += "Try to kill the running servers and then launch SALOME again.\n"
74             raise RuntimeError, msg
75           port = port + 1
76       #
77       self.__lockedPorts.append(port)
78       self.__lastChangeTime = time.time()
79       return port
80   #
81   def releasePort(self, port):
82     with self.__lock:
83       if port in self.__lockedPorts:
84         self.__lockedPorts.remove(port)
85         self.__lastChangeTime = time.time()
86   #
87   def isBusy(self):
88     return len(self.__lockedPorts)
89   #
90   def isPortUsed(self, port):
91     return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
92   #
93   def __isNetworkConnectionActiveOnPort(self, port):
94     # :NOTE: Under windows:
95     #        netstat options -l and -t are unavailable
96     #        grep command is unavailable
97     from subprocess import Popen, PIPE
98     (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
99     import StringIO
100     buf = StringIO.StringIO(stdout)
101     ports = buf.readlines()
102     # search for TCP - LISTEN connections
103     import re
104     regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
105     for item in ports:
106       try:
107         p = int(regObj.match(item).group(1))
108         if p == port: return True
109       except:
110         pass
111   #
112   def timeout(self):
113     return (time.time() - self.__lastChangeTime > self.__timeout)
114   #
115   def __str__(self):
116     with self.__lock:
117       return "_PortManager: list of locked ports:" + str(sorted(self.__lockedPorts))
118   #
119 #
120
121 #------------------------------------
122 # Communication methods
123
124 _marshall = pickle.dumps
125 _unmarshall = pickle.loads
126
127 def _send(channel, *args):
128   buf = _marshall(args)
129   value = socket.htonl(len(buf))
130   size = struct.pack("L",value)
131   channel.send(size)
132   channel.send(buf)
133 #
134
135 def _receive(channel):
136   size = struct.calcsize("L")
137   size = channel.recv(size)
138   try:
139     size = socket.ntohl(struct.unpack("L", size)[0])
140   except struct.error, e:
141     return ''
142
143   buf = ""
144   while len(buf) < size:
145     buf = channel.recv(size - len(buf))
146
147   return _unmarshall(buf)[0]
148 #
149 #------------------------------------
150
151 GET_PORT_MSG = "GET_PORT"
152 GET_PREFERED_PORT_MSG = "GET_PREFERED_PORT"
153 RELEASE_PORT_MSG = "RELEASE_PORT"
154 STOP_SERVER_MSG = "STOP_SERVER"
155
156 GET_PORT_ACK_MSG = "GET_PORT"
157 RELEASE_PORT_ACK_MSG = "RELEASE_PORT"
158
159 class _ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
160   def handle(self):
161     data = _receive(self.request)
162     if data == GET_PORT_MSG:
163       pm = _PortManager()
164       port = pm.getPort()
165       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
166       _send(self.request, response)
167     elif data.startswith(GET_PREFERED_PORT_MSG):
168       port = int(data[len(GET_PREFERED_PORT_MSG)+1:])
169       pm = _PortManager()
170       port = pm.getPort(port)
171       response = "%s: %s" % (GET_PORT_ACK_MSG, port)
172       _send(self.request, response)
173     elif data.startswith(RELEASE_PORT_MSG):
174       port = int(data[len(RELEASE_PORT_MSG)+1:])
175       pm = _PortManager()
176       pm.releasePort(port)
177       response = "%s" % (RELEASE_PORT_ACK_MSG)
178       _send(self.request, response)
179       #print "RELEASE_PORT:", port
180       if not pm.isBusy():
181         #print "Close server"
182         self.server.shutdown()
183       #print pm
184     elif data == STOP_SERVER_MSG:
185       #print "Close server"
186       self.server.shutdown()
187 #
188
189 class _ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
190     pass
191
192 def __getServer(address):
193   SocketServer.ThreadingTCPServer.allow_reuse_address = True # can be restarted immediately
194   server = _ThreadedTCPServer(address, _ThreadedTCPRequestHandler, False) # Do not automatically bind
195   server.allow_reuse_address = True # Prevent 'cannot bind to address' errors on restart
196   return server
197 #
198
199 pm_address = ('localhost', 12345)
200 def __startServer():
201   try:
202     server = __getServer(pm_address)
203     server.server_bind()     # Manually bind, to support allow_reuse_address
204     server.server_activate()
205     address = server.server_address
206
207     # Start a thread with the server -- that thread will then start one
208     # more thread for each request
209     server_thread = threading.Thread(target=server.serve_forever, name="SALOME_PortManager")
210     # Exit the server thread when the main thread terminates
211     #server_thread.setDaemon(True)
212     server_thread.start()
213     #print "Server loop running in thread:", server_thread.getName()
214     #print "Server address:", address
215     #return address
216   except:
217     #print "Server already started"
218     #print "Server address:", pm_address
219     #return pm_address
220     pass
221 #
222
223 def __newClient(address, message):
224   try:
225     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226     sock.connect(address)
227     _send(sock, message)
228     response = _receive(sock)
229     if response.startswith(GET_PORT_ACK_MSG):
230       port = int(response[len(GET_PORT_ACK_MSG)+1:])
231       #print "GET_PORT:", port
232       return port
233     elif response == RELEASE_PORT_ACK_MSG:
234       #print "Received: %s" % response
235       return 0
236       pass
237     sock.close()
238   except socket.error:
239     #print "Unable to connect to server"
240     return -1
241 #
242
243 def getPort(preferedPort=None):
244   if preferedPort:
245     return __newClient(pm_address, "%s: %s"%(GET_PREFERED_PORT_MSG,preferedPort))
246   else:
247     return __newClient(pm_address, GET_PORT_MSG)
248 #
249
250 def releasePort(port):
251   __newClient(pm_address, "%s: %s"%(RELEASE_PORT_MSG,port))
252 #
253
254 def stopServer():
255   __newClient(pm_address, STOP_SERVER_MSG)
256 #
257
258 # Auto start: unique instance ; no effect if called multiple times
259 __startServer()
260 #server_thread = threading.Thread(target=__startServer, name="SALOME_PortManager")
261 #server_thread.setDaemon(True)
262 #server_thread.start()