Salome HOME
Copyright update 2021
[modules/yacs.git] / src / yacsloader_swig / Test / testProgress.py
1 # Copyright (C) 2006-2021  CEA/DEN, EDF R&D
2 #
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
7 #
8 # This library is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this library; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 #
19
20 import sys
21 import pilot
22 import SALOMERuntime
23 import loader
24 import unittest
25
26 class TestEdit(unittest.TestCase):
27
28     def setUp(self):
29         SALOMERuntime.RuntimeSALOME_setRuntime()
30         self.r = pilot.getRuntime()
31         self.l = loader.YACSLoader()
32         self.e = pilot.ExecutorSwig()
33         pass
34
35     def test_progress(self):
36   
37         p=self.r.createProc("pr")
38         ti=p.getTypeCode("int")
39         td=p.getTypeCode("double")
40         ts=p.getTypeCode("string")
41
42         #BLOC
43         b=self.r.createBloc("b1")
44         p.edAddChild(b)
45         n1=self.r.createScriptNode("","node1")
46         b.edAddChild(n1)
47         n1.setScript("p1=p1+10")
48         n1.edAddInputPort("p1",ti)
49         n1.edAddOutputPort("p1",ti)
50         n2=self.r.createScriptNode("","node2")
51         b.edAddChild(n2)
52         n2.setScript("p1=2*p1")
53         n2.edAddInputPort("p1",ti)
54         n2.edAddOutputPort("p1",ti)
55         b.edAddDFLink(n1.getOutputPort("p1"),n2.getInputPort("p1"))
56         #initialisation ports
57         n1.getInputPort("p1").edInitPy(5)
58
59         #FOR LOOP
60         loop=self.r.createForLoop("l1")
61         p.edAddChild(loop)
62         ip=loop.getInputPort("nsteps")
63         ip.edInitPy(3)
64         n10=self.r.createScriptNode("","node10")
65         loop.edSetNode(n10)
66         n10.setScript("p1=p1+10")
67         n10.edAddInputPort("p1",ti)
68         n10.edAddOutputPort("p1",ti)
69         n10.getInputPort("p1").edInitPy(5)
70         
71
72         #WHILE LOOP
73         wh=self.r.createWhileLoop("w1")
74         p.edAddChild(wh)
75         n20=self.r.createScriptNode("","node3")
76         n20.setScript("p1=0")
77         n20.edAddOutputPort("p1",ti)
78         wh.edSetNode(n20)
79         cport=wh.getInputPort("condition")
80         cport.edInitBool(True)
81         p.edAddLink(n20.getOutputPort("p1"),cport)
82
83
84         #FOR EACH LOOP
85         fe=self.r.createForEachLoop("fe1",td)
86         p.edAddChild(fe)
87         n30=self.r.createScriptNode("","node3")
88         n30.setScript("import time \ntime.sleep(1) \np1=p1+3.\n")
89         n30.edAddInputPort("p1",td)
90         n30.edAddOutputPort("p1",td)
91         fe.edSetNode(n30)
92         p.edAddLink(fe.getOutputPort("evalSamples"),n30.getInputPort("p1"))
93         fe.getInputPort("nbBranches").edInitPy(2)
94         fe.getInputPort("SmplsCollection").edInitPy([1.,2.,3.,4.,5.,6.])
95
96         #SWITCH
97         n40=self.r.createScriptNode("","node3")
98         n40.setScript("p1=3.5")
99         n40.edAddOutputPort("p1",td)
100         p.edAddChild(n40)
101         #switch
102         sw=self.r.createSwitch("sw1")
103         p.edAddChild(sw)
104         nk1=self.r.createScriptNode("","ncas1")
105         nk1.setScript("p1=p1+3.")
106         nk1.edAddInputPort("p1",td)
107         nk1.edAddOutputPort("p1",td)
108         sw.edSetNode(1,nk1)
109         ndef=self.r.createScriptNode("","ndefault")
110         ndef.setScript("p1=p1+5.")
111         ndef.edAddInputPort("p1",td)
112         ndef.edAddOutputPort("p1",td)
113         sw.edSetDefaultNode(ndef)
114         #initialise the select port
115         sw.getInputPort("select").edInitPy(1)
116         #connection of internal nodes
117         p.edAddDFLink(n40.getOutputPort("p1"),nk1.getInputPort("p1"))
118         p.edAddDFLink(n40.getOutputPort("p1"),ndef.getInputPort("p1"))
119
120         import time
121         import threading
122         self.assertEqual(p.getGlobalProgressPercent(),0)
123         self.assertEqual(p.getState(),pilot.READY)
124         myRun = threading.Thread(None, self.e.RunW, None, (p,0))
125         myRun.start()
126         time.sleep(1.5)
127         self.assertGreater(p.getGlobalProgressPercent(),0)
128         self.assertLess(p.getGlobalProgressPercent(),100)
129         myRun.join()
130         self.assertEqual(p.getState(),pilot.DONE)
131         self.assertEqual(p.getGlobalProgressPercent(),100)
132          
133 if __name__ == '__main__':
134   import os
135   U = os.getenv('USER')
136   with open("/tmp/" + U + "/UnitTestsResult", 'a') as f:
137       f.write("  --- TEST src/yacsloader: testProgress.py\n")
138       suite = unittest.makeSuite(TestEdit)
139       result=unittest.TextTestRunner(f, descriptions=1, verbosity=3).run(suite)
140   sys.exit(not result.wasSuccessful())