]> SALOME platform Git repositories - modules/kernel.git/blob - bin/appliskel/tests/concurrentSession/TestMinimalExample.py
Salome HOME
Merge branch 'master' of https://git.salome-platform.org/git/modules/kernel
[modules/kernel.git] / bin / appliskel / tests / concurrentSession / TestMinimalExample.py
1 #!/usr/bin/env python
2 # Copyright (C) 2013-2014  CEA/DEN, EDF R&D, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 import os
22 import sys
23 import multiprocessing
24 import unittest
25
26 def port_reservation(obtained_ports, prefered=None, test=None, expected=None):
27   from PortManager import getPort
28   if prefered:
29     port = getPort(prefered)
30   else:
31     port = getPort()
32   print "obtained port = %s"%port
33
34   obtained_ports.put(port)
35
36   if expected:
37     test.assertTrue(port == expected, "used = %s, expected = %s"%(port, expected))
38 #
39
40 class TestMinimalExample(unittest.TestCase):
41   def testSequential(self):
42     from PortManager import releasePort, getBusyPorts
43     print "\nBEGIN testSequential"
44     print "Busy ports", getBusyPorts()
45     obtained_ports = multiprocessing.Queue()
46
47     processes = [
48       multiprocessing.Process(target=port_reservation, args=(obtained_ports,))
49       for i in range(3)
50       ]
51
52     for p in processes:
53       p.start()
54
55     for p in processes:
56       p.join()
57
58     print "Busy ports", getBusyPorts()
59     # Try to get specific port number
60     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2872, self, 2872,))
61     p.start()
62     p.join()
63
64     # Try to get specific port number
65     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2812, self,))
66     p.start()
67     p.join()
68
69     # Try to get specific port number
70     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2812, self,))
71     p.start()
72     p.join()
73
74     # Release port
75     print "release port 2812"
76     p = multiprocessing.Process(target=releasePort, args=(2812,))
77     p.start()
78     p.join()
79
80     # Try to get specific port number
81     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2812, self, 2812,))
82     p.start()
83     p.join()
84
85     # Release ports
86     print "Busy ports", getBusyPorts()
87     while not obtained_ports.empty():
88       port = obtained_ports.get()
89       print "release port", port
90       p = multiprocessing.Process(target=releasePort, args=(port,))
91       p.start()
92       p.join()
93
94     print "END testSequential"
95   #
96
97   def testConcurrent(self):
98     from PortManager import releasePort, getBusyPorts
99     print "\nBEGIN testConcurrent"
100     print "Busy ports", getBusyPorts()
101     obtained_ports = multiprocessing.Queue()
102     processes = [
103       multiprocessing.Process(target=port_reservation, args=(obtained_ports,))
104
105       for i in range(3)
106       ]
107
108     # Try to get specific port number
109     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2872, self, 2872,))
110     processes.append(p)
111
112     # Try to get specific port number
113     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2812,))
114     processes.append(p)
115
116     # Try to get specific port number
117     p = multiprocessing.Process(target=port_reservation, args=(obtained_ports, 2812,))
118     processes.append(p)
119
120     for p in processes:
121       p.start()
122
123     for p in processes:
124       p.join()
125
126     # Release ports
127     print "Busy ports", getBusyPorts()
128     while not obtained_ports.empty():
129       port = obtained_ports.get()
130       print "release port", port
131       p = multiprocessing.Process(target=releasePort, args=(port,))
132       p.start()
133       p.join()
134
135     print "END testConcurrent"
136   #
137 #
138
139 if __name__ == "__main__":
140   omniorb_user_path = os.getenv("OMNIORB_USER_PATH")
141   if not omniorb_user_path:
142     msg = "Error: please set OMNIORB_USER_PATH variable."
143     raise Exception(msg)
144
145   try:
146     import PortManager
147   except ImportError:
148     msg = "Error: can't import PortManager; please check PYTHONPATH variable."
149     raise Exception(msg)
150
151   unittest.main()
152 #