]> SALOME platform Git repositories - modules/kernel.git/blob - bin/appliskel/tests/concurrentSession/TestConcurrentSession.py
Salome HOME
unit tests for concurrent sessions
[modules/kernel.git] / bin / appliskel / tests / concurrentSession / TestConcurrentSession.py
1 # Copyright (C) 2013-2014  CEA/DEN, EDF R&D, OPEN CASCADE
2 #
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
7 #
8 # This library is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this library; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 #
19
20 import unittest
21 import tempfile
22
23 import os
24 import sys
25 import imp
26 from cStringIO import StringIO
27 import multiprocessing
28
29
30 class TestConcurrentLaunch(unittest.TestCase):
31   def setUp(self):
32     # Initialize path to SALOME application
33     path_to_launcher = os.getenv("SALOME_LAUNCHER")
34     appli_dir = os.path.dirname(path_to_launcher)
35
36     # Configure session startup
37     self.SALOME = imp.load_source("SALOME", os.path.join(appli_dir,"salome"))
38     self.SALOME_appli_args = ["start", "-t"]
39     self.SALOME_shell_args = ["shell"]
40   #
41   def tearDown(self):
42     pass
43   #
44   def appli(self, args=[]):
45     self.SALOME.main(self.SALOME_appli_args + args)
46   #
47   def session(self, args=[]):
48     self.SALOME.main(self.SALOME_shell_args + args)
49   #
50   def test01_SingleSession(self):
51     print "** Testing single session **"
52     self.session(["hello.py"])
53   #
54   def test02_MultiSession(self):
55     print "** Testing multi sessions **"
56     jobs = []
57     for i in range(3):
58       p = multiprocessing.Process(target=self.session, args=(["hello.py"],))
59       jobs.append(p)
60       p.start()
61
62     for j in jobs:
63       j.join()
64   #
65   def test03_SingleAppli(self):
66     print "** Testing single appli **"
67     current_directory = os.path.dirname(os.path.abspath(__file__))
68     session_log = tempfile.NamedTemporaryFile(prefix='session_', suffix='.log')
69     self.appli(["--ns-port-log=%s"%session_log.name])
70     port_number = None
71     with open(session_log.name, "r") as f:
72       port_number = f.readline()
73     self.session(["hello.py"])
74     self.session(["-p", port_number, "killSalomeWithPort.py", "args:%s"%port_number])
75     session_log.close()
76   #
77   def test04_MultiAppli(self):
78     print "** Testing multi appli **"
79     jobs = []
80     for i in range(3):
81       p = multiprocessing.Process(target=self.test03_SingleAppli)
82       jobs.append(p)
83       p.start()
84
85     for j in jobs:
86       j.join()
87   #
88 #
89
90 if __name__ == "__main__":
91   path_to_launcher = os.getenv("SALOME_LAUNCHER")
92   if not path_to_launcher:
93     msg = "Error: please set SALOME_LAUNCHER variable to the salome command of your application folder."
94     raise Exception(msg)
95
96   if not os.path.isfile("hello.py"):
97     with open("hello.py", "w") as f:
98       f.write("print 'Hello!'")
99
100   unittest.main()
101 #