Salome HOME
Updated copyright comment
[modules/kernel.git] / bin / appliskel / tests / concurrentSession / TestConcurrentSession.py
1 #!/usr/bin/env python3
2 # Copyright (C) 2013-2024  CEA, EDF, OPEN CASCADE
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 import logging
22 import multiprocessing
23 import sys
24 import unittest
25
26 def new_instance(running_instances):
27   from salome_instance import SalomeInstance
28   instance = SalomeInstance.start()
29   print("Instance created and now running on port", instance.get_port())
30   running_instances.put(instance)
31 #
32
33 def createInstances(nb):
34   running_instances = multiprocessing.Queue()
35   processes = [
36     multiprocessing.Process(target=new_instance, args=(running_instances,))
37     for i in range(nb)
38   ]
39   return running_instances, processes
40 #
41
42 def terminateInstances(running_instances):
43   import time
44   timeout = time.time() + 60 * 10  # the test duration is about 50 s, we reasonably assume a max duration of 10mn
45
46   while not running_instances.empty() and time.time() < timeout:
47     instance = running_instances.get()
48     print("Terminate instance running on port", instance.get_port())
49     instance.stop()
50 #
51
52 def session(args=None):
53   if args is None:
54     args = []
55   try:
56     import setenv
57     setenv.main(True)
58     import runSession
59     params, args = runSession.configureSession(args, exe="salome shell")
60     return runSession.runSession(params, args)
61   except SystemExit as e:
62     if str(e) != '0':
63       logging.error(e)
64     pass
65 #
66
67 def appli(args=None):
68   if args is None:
69     args = []
70   try:
71     sys.argv = ['runSalome', '-t']
72     import setenv
73     setenv.main(True, exeName="salome start")
74     import runSalome
75     runSalome.runSalome()
76   except SystemExit as e:
77     if str(e) != '0':
78       logging.error(e)
79     pass
80 #
81
82 class TestConcurrentLaunch(unittest.TestCase):
83
84   def test01_SingleSession(self):
85     print("** Testing single session **")
86     session(["hello.py"])
87
88   def test02_MultiSession(self):
89     print("** Testing multi sessions **")
90     jobs = []
91     for i in range(9):
92       p = multiprocessing.Process(target=session, args=(["hello.py"],))
93       jobs.append(p)
94       p.start()
95
96     for j in jobs:
97       j.join()
98   #
99
100   def test03_SingleAppli(self):
101     print("** Testing single appli **")
102     running_instances, processes = createInstances(1)
103     for p in processes:
104       p.start()
105       pass
106     for p in processes:
107       p.join()
108       pass
109
110     session(["hello.py"])
111     terminateInstances(running_instances)
112   #
113
114   def test04_MultiAppli(self):
115     print("** Testing multi appli **")
116     running_instances, processes = createInstances(9)
117     for p in processes:
118       p.start()
119       pass
120     for p in processes:
121       p.join()
122       pass
123
124     session(["hello.py"])
125     terminateInstances(running_instances)
126   #
127 #
128
129 if __name__ == "__main__":
130   unittest.main()
131 #