2 # Copyright (C) 2013-2019 CEA/DEN, EDF R&D, OPEN CASCADE
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
27 from io import StringIO
28 import multiprocessing
31 def new_instance(running_instances):
32 from salome_instance import SalomeInstance
33 instance = SalomeInstance.start()
34 print("Instance created and now running on port", instance.get_port())
35 running_instances.put(instance)
38 class TestConcurrentLaunch(unittest.TestCase):
39 def __createInstances(self, nb):
40 running_instances = multiprocessing.Queue()
42 multiprocessing.Process(target=new_instance, args=(running_instances,))
45 return running_instances, processes
47 def __terminateInstances(self, running_instances):
49 timeout = time.time() + 60*10 # the test duration is about 50 s, we reasonably assume a max duration of 10mn
51 while not running_instances.empty() and time.time() < timeout:
52 instance = running_instances.get()
53 print("Terminate instance running on port", instance.get_port())
57 def appli(self, args=None):
61 sys.argv = ['runSalome', '-t']
63 setenv.main(True, exeName="salome start")
66 except SystemExit as e:
71 def session(self, args=None):
78 params, args = runSession.configureSession(args, exe="salome shell")
79 return runSession.runSession(params, args)
80 except SystemExit as e:
85 def test01_SingleSession(self):
86 print("** Testing single session **")
87 self.session(["hello.py"])
89 def test02_MultiSession(self):
90 print("** Testing multi sessions **")
93 p = multiprocessing.Process(target=self.session, args=(["hello.py"],))
100 def test03_SingleAppli(self):
101 print("** Testing single appli **")
102 running_instances, processes = self.__createInstances(1)
110 self.session(["hello.py"])
111 self.__terminateInstances(running_instances)
113 def test04_MultiAppli(self):
114 print("** Testing multi appli **")
115 running_instances, processes = self.__createInstances(9)
123 self.session(["hello.py"])
124 self.__terminateInstances(running_instances)
128 if __name__ == "__main__":