2 # Copyright (C) 2013-2023 CEA, EDF, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 import multiprocessing
26 def new_instance(running_instances):
27 from salome_instance import SalomeInstance
28 instance = SalomeInstance.start()
29 print("Instance created and now running on port", instance.get_port())
30 running_instances.put(instance)
33 def createInstances(nb):
34 running_instances = multiprocessing.Queue()
36 multiprocessing.Process(target=new_instance, args=(running_instances,))
39 return running_instances, processes
42 def terminateInstances(running_instances):
44 timeout = time.time() + 60 * 10 # the test duration is about 50 s, we reasonably assume a max duration of 10mn
46 while not running_instances.empty() and time.time() < timeout:
47 instance = running_instances.get()
48 print("Terminate instance running on port", instance.get_port())
52 def session(args=None):
59 params, args = runSession.configureSession(args, exe="salome shell")
60 return runSession.runSession(params, args)
61 except SystemExit as e:
71 sys.argv = ['runSalome', '-t']
73 setenv.main(True, exeName="salome start")
76 except SystemExit as e:
82 class TestConcurrentLaunch(unittest.TestCase):
84 def test01_SingleSession(self):
85 print("** Testing single session **")
88 def test02_MultiSession(self):
89 print("** Testing multi sessions **")
92 p = multiprocessing.Process(target=session, args=(["hello.py"],))
100 def test03_SingleAppli(self):
101 print("** Testing single appli **")
102 running_instances, processes = createInstances(1)
110 session(["hello.py"])
111 terminateInstances(running_instances)
114 def test04_MultiAppli(self):
115 print("** Testing multi appli **")
116 running_instances, processes = createInstances(9)
124 session(["hello.py"])
125 terminateInstances(running_instances)
129 if __name__ == "__main__":