]> SALOME platform Git repositories - modules/kernel.git/blob - bin/PortManager.py
Salome HOME
6c4ecb59640de9271969825b81ceb45cb057690f
[modules/kernel.git] / bin / PortManager.py
1 #!/usr/bin/env python
2 #  -*- coding: iso-8859-1 -*-
3 # Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
4 #
5 # Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
6 # CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
7 #
8 # This library is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU Lesser General Public
10 # License as published by the Free Software Foundation; either
11 # version 2.1 of the License.
12 #
13 # This library is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this library; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
21 #
22 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 #
24 from Singleton import Singleton
25
26 from multiprocessing.managers import SyncManager
27 import multiprocessing
28 import time
29 import socket
30 import Queue
31 import sys
32 import os
33
34 """
35 This class handles port distribution for SALOME sessions.
36 In case of concurent sessions, each will have its own port number.
37 """
38 class PortManager(object): # :TODO: must manage lock owner
39   __metaclass__ = Singleton
40   #
41   def __init__(self, startNumber = 2810, limit = 100, timeout=60):
42     super(PortManager, self).__init__()
43     self.__startNumber = startNumber
44     self.__limit = startNumber + limit
45     self.__lockedPorts = []
46     self.__lock = multiprocessing.Lock()
47     self.__timeout = timeout
48     self.__lastChangeTime = time.time()
49   #
50   def getPort(self):
51     with self.__lock:
52       port = self.__startNumber
53       while self.isPortUsed(port):
54         if port == self.__limit:
55           msg  = "\n"
56           msg += "Can't find a free port to launch omniNames\n"
57           msg += "Try to kill the running servers and then launch SALOME again.\n"
58           raise RuntimeError, msg
59         port = port + 1
60
61       self.__lockedPorts.append(port)
62       self.__lastChangeTime = time.time()
63       return port
64   #
65   def releasePort(self, port):
66     with self.__lock:
67       if port in self.__lockedPorts:
68         self.__lockedPorts.remove(port)
69         self.__lastChangeTime = time.time()
70   #
71   def isBusy(self):
72     return len(self.__lockedPorts)
73   #
74   def isPortUsed(self, port):
75     return (port in self.__lockedPorts) or self.__isNetworkConnectionActiveOnPort(port)
76   #
77   def __isNetworkConnectionActiveOnPort(self, port):
78     # :NOTE: Under windows:
79     #        netstat options -l and -t are unavailable
80     #        grep command is unavailable
81     from subprocess import Popen, PIPE
82     (stdout, stderr) = Popen(['netstat','-an'], stdout=PIPE).communicate()
83     import StringIO
84     buf = StringIO.StringIO(stdout)
85     ports = buf.readlines()
86     # search for TCP - LISTEN connections
87     import re
88     regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
89     for item in ports:
90       try:
91         p = int(regObj.match(item).group(1))
92         if p == port: return True
93       except:
94         pass
95   #
96   def timeout(self):
97     return (time.time() - self.__lastChangeTime > self.__timeout)
98   #
99   def __str__(self):
100     with self.__lock:
101       return "PortManager: list of locked ports:" + str(self.__lockedPorts)
102   #
103 #
104
105 def __build_server(ip, port, authkey):
106   message_queue = multiprocessing.Queue()
107
108   class MyManager(SyncManager):
109     pass
110
111   MyManager.register("PortManager", PortManager, exposed=['getPort', 'releasePort', 'isBusy', 'isPortUsed', 'timeout', '__str__'])
112   MyManager.register("get_message_queue", callable=lambda: message_queue)
113
114   manager = MyManager(address=(ip, port), authkey=authkey)
115
116   try:
117     manager.get_server()
118     manager.start()
119   except (EOFError, socket.error):
120     print 'Server already started on %s:%s'%(ip,port)
121     sys.exit(1)
122
123   return manager
124 #
125
126 def __build_client(ip, port, authkey):
127   class MyManager(SyncManager):
128     pass
129
130   MyManager.register("PortManager")
131   MyManager.register("get_message_queue")
132
133   manager = MyManager(address=(ip, port), authkey=authkey)
134   try:
135     manager.connect()
136   except socket.error:
137     raise Exception("Unable to connect to server on %s:%s"%(ip, port))
138   return manager
139 #
140
141 def __run_server(ip, port, authkey, timeout):
142   theserver = __build_server(ip, port, authkey)
143   shared_mesq = theserver.get_message_queue()
144   print 'PortManager server started on %s:%s'%(ip,port)
145   portManager = theserver.PortManager(timeout=timeout)
146
147   while portManager.isBusy() or not portManager.timeout():
148     try:
149       message = shared_mesq.get(block=False)
150       print message
151     except Queue.Empty:
152       pass
153
154   print "PortManager server is shuting down..."
155   time.sleep(2)
156   theserver.shutdown()
157 #
158
159 #
160 semaphore = None
161 #
162 def __run_client(manager, execute_func):
163   with semaphore:
164     name = multiprocessing.current_process().name
165     processId = os.getpid()
166
167     # do the job
168     if execute_func:
169       execute_func(manager, name, processId)
170 #
171
172 #
173 local_ip = '127.0.0.1'
174 port = 5000
175 authkey = 'salome_port_manager_access'
176 #
177 lock = multiprocessing.Lock()
178 def start_server(nbSimultaneous=10, timeout=10):
179   with lock:
180     procServer = multiprocessing.Process(target=__run_server, args=(local_ip,port,authkey,timeout,))
181     procServer.start()
182     global semaphore
183     semaphore = multiprocessing.Semaphore(nbSimultaneous)
184     time.sleep(2)
185 #
186
187 def start_client(ip=local_ip, execute_func=None, name="anonymous"):
188   manager = __build_client(ip, port, authkey)
189   p = multiprocessing.Process(target=__run_client, name=name, args=(manager,execute_func,))
190   p.start()
191   return manager
192 #
193
194 client_id = 0
195 def start_clients(nbClients, ip, execute_func, name_prefix="Client"):
196   global client_id
197   for i in range(nbClients):
198     start_client(ip, execute_func, name=name_prefix+"_"+str(client_id))
199     client_id = client_id + 1
200 #