1 # Copyright (C) 2013-2017 CEA/DEN, EDF R&D, OPEN CASCADE
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
8 # This library is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # Lesser General Public License for more details.
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this library; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
26 from io import StringIO
27 import multiprocessing
def new_instance(running_instances):
    """Start one SALOME instance and hand it over through a queue.

    The function is used as a ``multiprocessing.Process`` target: the
    freshly started instance is pushed onto ``running_instances`` so that
    the process which owns the queue can retrieve it later.

    Args:
        running_instances: queue-like object exposing ``put()``; callers in
            this file pass a ``multiprocessing.Queue``.
    """
    # Imported at call time rather than at module load.
    from salome_instance import SalomeInstance

    started = SalomeInstance.start()
    port = started.get_port()
    print("Instance created and now running on port", port)
    running_instances.put(started)
# Test case with four tests (test01..test04). Each visible test runs "hello.py"
# through self.session(); test03/test04 first create SALOME instances and
# terminate them afterwards.
# NOTE(review): this listing still carries line-number prefixes, has lost its
# indentation, and skips several numbered lines, so some statements below are
# visibly incomplete. Comments describe only what is shown.
37 class TestConcurrentLaunch(unittest.TestCase):
# Create a multiprocessing.Queue plus Process object(s) whose target is
# new_instance (which receives the queue), and return (queue, processes).
# NOTE(review): the lines that bind `processes` and use `nb` are not visible
# here - presumably one Process is built per requested instance; confirm.
38 def __createInstances(self, nb):
39 running_instances = multiprocessing.Queue()
41 multiprocessing.Process(target=new_instance, args=(running_instances,))
44 return running_instances, processes
# Drain `running_instances`, printing the port of each instance taken from the
# queue, until the queue is empty or a deadline of 60*10 seconds has passed.
# NOTE(review): the statement that actually stops each instance is not visible
# in this listing - confirm against the full file.
46 def __terminateInstances(self, running_instances):
48 timeout = time.time() + 60*10 # the test duration is about 50 s, we reasonably assume a max duration of 10mn
50 while not running_instances.empty() and time.time() < timeout:
51 instance = running_instances.get()
52 print("Terminate instance running on port", instance.get_port())
# Overwrite sys.argv with ['runSalome', '-t'] and call
# setenv.main(True, exeName="salome start"); SystemExit is caught.
# NOTE(review): the matching `try:`, the handler body and any use of `args`
# are not visible in this listing.
56 def appli(self, args=None):
60 sys.argv = ['runSalome', '-t']
62 setenv.main(True, exeName="salome start")
65 except SystemExit as e:
# Configure a "salome shell" session from `args` with
# runSession.configureSession and return what runSession.runSession yields
# for the resulting (params, args); SystemExit is caught.
# NOTE(review): the matching `try:` and the handler body are not visible.
70 def session(self, args=None):
77 params, args = runSession.configureSession(args, exe="salome shell")
78 return runSession.runSession(params, args)
79 except SystemExit as e:
# Run hello.py through a single direct call to self.session().
84 def test01_SingleSession(self):
85 print("** Testing single session **")
86 self.session(["hello.py"])
# Build multiprocessing.Process object(s) whose target is self.session with
# ["hello.py"] as argument.
# NOTE(review): how many processes are created and where they are started and
# joined is not visible in this listing.
88 def test02_MultiSession(self):
89 print("** Testing multi sessions **")
92 p = multiprocessing.Process(target=self.session, args=(["hello.py"],))
# Request one instance via __createInstances(1), run hello.py in a session,
# then call __terminateInstances on the queue.
# NOTE(review): the numbered lines between 101 and 109 are absent -
# presumably the processes are started and joined there; confirm.
99 def test03_SingleAppli(self):
100 print("** Testing single appli **")
101 running_instances, processes = self.__createInstances(1)
109 self.session(["hello.py"])
110 self.__terminateInstances(running_instances)
# Same sequence as test03_SingleAppli, but requesting nine instances.
# NOTE(review): the numbered lines between 114 and 122 are absent -
# presumably the processes are started and joined there; confirm.
112 def test04_MultiAppli(self):
113 print("** Testing multi appli **")
114 running_instances, processes = self.__createInstances(9)
122 self.session(["hello.py"])
123 self.__terminateInstances(running_instances)
127 if __name__ == "__main__":