Salome HOME
Updated copyright comment
[modules/yacs.git] / src / yacsloader_swig / Test / testProgress.py
1 #!/usr/bin/env python3
2 # Copyright (C) 2006-2024  CEA, EDF
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
21 import sys
22 import pilot
23 import SALOMERuntime
24 import loader
25 import unittest
26
class TestEdit(unittest.TestCase):
    """Check the global progress percentage reported by a YACS proc.

    A proc containing a bloc, a for loop, a while loop, a for-each loop and
    a switch is built programmatically, executed in a background thread and
    its ``getGlobalProgressPercent()`` value is checked before, during and
    after the run.
    """

    def setUp(self):
        """Select the SALOME runtime and create the loader and executor."""
        SALOMERuntime.RuntimeSALOME_setRuntime()
        self.r = pilot.getRuntime()
        self.l = loader.YACSLoader()
        self.e = pilot.ExecutorSwig()

    def test_progress(self):
        """Progress is 0 before the run, strictly between 0 and 100 while
        the proc is running, and 100 once it is DONE.

        NOTE(review): the mid-run check samples the progress 1.5 seconds
        after the start, so it depends on the execution speed of the host.
        """
        import threading
        import time

        p = self.r.createProc("pr")
        ti = p.getTypeCode("int")
        td = p.getTypeCode("double")

        # BLOC: node1 adds 10 to p1, node2 doubles the result.
        b = self.r.createBloc("b1")
        p.edAddChild(b)
        n1 = self.r.createScriptNode("", "node1")
        b.edAddChild(n1)
        n1.setScript("p1=p1+10")
        n1.edAddInputPort("p1", ti)
        n1.edAddOutputPort("p1", ti)
        n2 = self.r.createScriptNode("", "node2")
        b.edAddChild(n2)
        n2.setScript("p1=2*p1")
        n2.edAddInputPort("p1", ti)
        n2.edAddOutputPort("p1", ti)
        b.edAddDFLink(n1.getOutputPort("p1"), n2.getInputPort("p1"))
        # Initialise the entry port of the bloc.
        n1.getInputPort("p1").edInitPy(5)

        # FOR LOOP: "nsteps" is initialised to 3, the body adds 10 to p1.
        loop = self.r.createForLoop("l1")
        p.edAddChild(loop)
        ip = loop.getInputPort("nsteps")
        ip.edInitPy(3)
        n10 = self.r.createScriptNode("", "node10")
        loop.edSetNode(n10)
        n10.setScript("p1=p1+10")
        n10.edAddInputPort("p1", ti)
        n10.edAddOutputPort("p1", ti)
        n10.getInputPort("p1").edInitPy(5)

        # WHILE LOOP: the condition starts at True and is then fed by the
        # body output p1=0 (presumably ending the loop after one pass).
        wh = self.r.createWhileLoop("w1")
        p.edAddChild(wh)
        n20 = self.r.createScriptNode("", "node3")
        n20.setScript("p1=0")
        n20.edAddOutputPort("p1", ti)
        wh.edSetNode(n20)
        cport = wh.getInputPort("condition")
        cport.edInitBool(True)
        p.edAddLink(n20.getOutputPort("p1"), cport)

        # FOR EACH LOOP: six samples on two branches; the body sleeps one
        # second per sample (presumably so that the run lasts long enough
        # for an intermediate progress value to be observed).
        fe = self.r.createForEachLoop("fe1", td)
        p.edAddChild(fe)
        n30 = self.r.createScriptNode("", "node3")
        n30.setScript("import time \ntime.sleep(1) \np1=p1+3.\n")
        n30.edAddInputPort("p1", td)
        n30.edAddOutputPort("p1", td)
        fe.edSetNode(n30)
        p.edAddLink(fe.getOutputPort("evalSamples"), n30.getInputPort("p1"))
        fe.getInputPort("nbBranches").edInitPy(2)
        fe.getInputPort("SmplsCollection").edInitPy([1., 2., 3., 4., 5., 6.])

        # SWITCH: a script node produces p1=3.5 and feeds both the case-1
        # node and the default node of the switch.
        n40 = self.r.createScriptNode("", "node3")
        n40.setScript("p1=3.5")
        n40.edAddOutputPort("p1", td)
        p.edAddChild(n40)
        sw = self.r.createSwitch("sw1")
        p.edAddChild(sw)
        nk1 = self.r.createScriptNode("", "ncas1")
        nk1.setScript("p1=p1+3.")
        nk1.edAddInputPort("p1", td)
        nk1.edAddOutputPort("p1", td)
        sw.edSetNode(1, nk1)
        ndef = self.r.createScriptNode("", "ndefault")
        ndef.setScript("p1=p1+5.")
        ndef.edAddInputPort("p1", td)
        ndef.edAddOutputPort("p1", td)
        sw.edSetDefaultNode(ndef)
        # Initialise the select port.
        sw.getInputPort("select").edInitPy(1)
        # Connect the producer to the nodes held by the switch.
        p.edAddDFLink(n40.getOutputPort("p1"), nk1.getInputPort("p1"))
        p.edAddDFLink(n40.getOutputPort("p1"), ndef.getInputPort("p1"))

        # Nothing has been executed yet.
        self.assertEqual(p.getGlobalProgressPercent(), 0)
        self.assertEqual(p.getState(), pilot.READY)

        # Run the proc in a background thread so that the progress can be
        # sampled from this thread while the execution is under way.
        myRun = threading.Thread(None, self.e.RunW, None, (p, 0))
        myRun.start()
        try:
            time.sleep(1.5)
            # Sample once so that both bounds are checked on the same value.
            progress = p.getGlobalProgressPercent()
            self.assertGreater(progress, 0)
            self.assertLess(progress, 100)
        finally:
            # Always wait for the executor, even when a mid-run assertion
            # fails, so that no execution is left running after the test.
            myRun.join()

        self.assertEqual(p.getState(), pilot.DONE)
        self.assertEqual(p.getGlobalProgressPercent(), 100)
133
if __name__ == '__main__':
    import os
    import tempfile

    # The report is written to a file named "UnitTestsResult" inside a fresh
    # temporary directory; the directory is not removed afterwards.
    dir_test = tempfile.mkdtemp(suffix=".yacstest")
    file_test = os.path.join(dir_test, "UnitTestsResult")
    with open(file_test, 'a') as f:
        f.write("  --- TEST src/yacsloader: testProgress.py\n")
        # unittest.makeSuite() was deprecated in Python 3.11 and removed in
        # 3.13; the default loader collects the same test methods.
        suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestEdit)
        result = unittest.TextTestRunner(f, descriptions=1, verbosity=3).run(suite)
    # Exit status is 0 when every test passed, 1 otherwise.
    sys.exit(not result.wasSuccessful())