1 # Copyright (C) 2013-2015 CEA/DEN, EDF R&D, OPEN CASCADE
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
8 # This library is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # Lesser General Public License for more details.
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this library; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
26 from cStringIO import StringIO
27 import multiprocessing
30 def new_instance(running_instances):
31 from salome_instance import SalomeInstance
32 instance = SalomeInstance.start()
33 print "Instance created and now running on port", instance.get_port()
34 running_instances.put(instance)
37 class TestConcurrentLaunch(unittest.TestCase):
38 def __createInstances(self, nb):
39 running_instances = multiprocessing.Queue()
41 multiprocessing.Process(target=new_instance, args=(running_instances,))
44 return running_instances, processes
46 def __terminateInstances(self, running_instances):
47 while not running_instances.empty():
48 instance = running_instances.get()
49 print "Terminate instance running on port", instance.get_port()
53 def appli(self, args=None):
57 sys.argv = ['runSalome', '-t']
59 setenv.main(True, exeName="salome start")
67 def session(self, args=None):
74 params, args = runSession.configureSession(args, exe="salome shell")
75 return runSession.runSession(params, args)
81 def test01_SingleSession(self):
82 print "** Testing single session **"
83 self.session(["hello.py"])
85 def test02_MultiSession(self):
86 print "** Testing multi sessions **"
89 p = multiprocessing.Process(target=self.session, args=(["hello.py"],))
96 def test03_SingleAppli(self):
97 print "** Testing single appli **"
98 running_instances, processes = self.__createInstances(1)
106 self.session(["hello.py"])
107 self.__terminateInstances(running_instances)
109 def test04_MultiAppli(self):
110 print "** Testing multi appli **"
111 running_instances, processes = self.__createInstances(9)
119 self.session(["hello.py"])
120 self.__terminateInstances(running_instances)
124 if __name__ == "__main__":
125 if not os.path.isfile("hello.py"):
126 with open("hello.py", "w") as f:
127 f.write("print 'Hello!'")