SALOME platform Git repositories - modules/yacs.git/blob - src/yacsloader_swig/Test/StdAloneYacsLoaderTest1.py
Salome HOME
[EDF30875] : mistype leading to a non activation of squeeze mode
[modules/yacs.git] / src / yacsloader_swig / Test / StdAloneYacsLoaderTest1.py
1 #!/usr/bin/env python3
2 # Copyright (C) 2006-2024  CEA, EDF
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #
20
import os
import shutil
import tempfile
import unittest

import pilot
import SALOMERuntime
import loader
27
28 class StdAloneYacsLoaderTest1(unittest.TestCase):
29
30   def setUp(self):
31     SALOMERuntime.RuntimeSALOME_setRuntime()
32     self.r = pilot.getRuntime()
33     self.l = loader.YACSLoader()# self.l.load("foreachImbr_tmp.xml")
34     self.workdir = tempfile.mkdtemp(suffix=".yacstest")
35     pass
36
37   def test1(self):
38     """tests imbrication of foreach loop."""
39     SALOMERuntime.RuntimeSALOME_setRuntime()
40     l=loader.YACSLoader()
41     ex=pilot.ExecutorSwig()
42     p=self.r.createProc("pr")
43     td=p.createType("double","double")
44     td2=p.createSequenceTc("seqdbl","seqdbl",td)
45     td3=p.createSequenceTc("seqdblvec","seqdblvec",td2)
46     td4=p.createSequenceTc("seqseqdblvec","seqseqdblvec",td3)
47     node1=self.r.createScriptNode("","node1")
48     node1.setScript("o1=[([1,1],[2,2,2]),([10],[11,11],[12,12,12]),([20],[21,21],[22,22,22],[23,23,23,23])]")
49     o1=node1.edAddOutputPort("o1",td4)
50     p.edAddChild(node1)
51     node2=self.r.createForEachLoop("node2",td3)
52     p.edAddChild(node2)
53     p.edAddCFLink(node1,node2)
54     p.edAddLink(o1,node2.edGetSeqOfSamplesPort())
55     node2.edGetNbOfBranchesPort().edInitInt(2)
56     #
57     node20=self.r.createBloc("node20")
58     node2.edAddChild(node20)
59     node200=self.r.createForEachLoop("node200",td2)
60     node20.edAddChild(node200)
61     node200.edGetNbOfBranchesPort().edInitInt(2)
62     p.edAddLink(node2.edGetSamplePort(),node200.edGetSeqOfSamplesPort())
63     node2000=self.r.createForEachLoop("node2000",td)
64     node2000.edGetNbOfBranchesPort().edInitInt(2)
65     node200.edAddChild(node2000)
66     p.edAddLink(node200.edGetSamplePort(),node2000.edGetSeqOfSamplesPort())
67     node20000=self.r.createScriptNode("","node20000")
68     node2000.edAddChild(node20000)
69     i1=node20000.edAddInputPort("i1",td)
70     o2=node20000.edAddOutputPort("o2",td)
71     node20000.setScript("o2=i1+2")
72     p.edAddLink(node2000.edGetSamplePort(),i1)
73     #
74     node3=self.r.createScriptNode("","node3")
75     node3.setScript("o3=i2")
76     p.edAddChild(node3)
77     i2=node3.edAddInputPort("i2",td4)
78     o3=node3.edAddOutputPort("o3",td4)
79     p.edAddCFLink(node2,node3)
80     p.edAddLink(o2,i2)
81     ex = pilot.ExecutorSwig()
82     self.assertEqual(p.getState(),pilot.READY)
83     ex.RunW(p,0)
84     self.assertEqual(p.getState(),pilot.DONE)
85     zeResu=node3.getOutputPort("o3").get()
86     self.assertEqual(zeResu,[[[3.,3.],[4.,4.,4.]],[[12.],[13.,13.],[14.,14.,14.]],[[22.],[23.,23.],[24.,24.,24.],[25.,25.,25.,25.]]])
87     fname = os.path.join(self.workdir, "foreachImbrBuildFS.xml")
88     p.saveSchema(fname)
89     pass
90
91   def test2(self):
92     """ Non regression test. When input/output declared as pyobj hiding a string type to go to or from a ForEachLoop it previous lead
93     to an error.
94     """
95     fname=os.path.join(self.workdir, "BugPyObjStrInYacs.xml")
96     p=self.r.createProc("pr")
97     tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
98     tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
99     #
100     node0=self.r.createScriptNode("Salome","node0")
101     node0.setScript("o1=[\"a\",\"bc\"]")
102     o1=node0.edAddOutputPort("o1",tc1)
103     p.edAddChild(node0)
104     #
105     node1=self.r.createForEachLoop("node1",tc0)
106     p.edAddChild(node1)
107     p.edAddCFLink(node0,node1)
108     node1.edGetNbOfBranchesPort().edInitInt(1)
109     p.edAddLink(o1,node1.edGetSeqOfSamplesPort())
110     #
111     node10=self.r.createScriptNode("Salome","node10")
112     node10.setScript("o1=3*i1")
113     i10_1=node10.edAddInputPort("i1",tc0)
114     o10_1=node10.edAddOutputPort("o1",tc0)
115     node1.edAddChild(node10)
116     p.edAddLink(node1.edGetSamplePort(),i10_1)
117     #
118     node2=self.r.createScriptNode("Salome","node2")
119     node2.setScript("o1=i1")
120     i2_1=node2.edAddInputPort("i1",tc1)
121     o2_1=node2.edAddOutputPort("o1",tc1)
122     p.edAddChild(node2)
123     p.edAddCFLink(node1,node2)
124     p.edAddLink(o10_1,i2_1)
125     ##
126     p.saveSchema(fname)
127     p=self.l.load(fname)
128     ##
129     ex = pilot.ExecutorSwig()
130     self.assertEqual(p.getState(),pilot.READY)
131     ex.RunW(p,0)
132     self.assertEqual(p.getState(),pilot.DONE)
133     #
134     data = p.getChildByName("node2").getOutputPort("o1").get()
135     self.assertEqual(data,['aaa','bcbcbc'])
136     pass
137
138   def test3(self):
139     """ Non regression test Mantis 23234 CEA1726"""
140     fname=os.path.join(self.workdir, "test23234.xml")
141     p=self.r.createProc("Test23234")
142     ti=p.createType("int","int")
143     initNode=self.r.createScriptNode("","init")
144     initNode_n=initNode.edAddOutputPort("n",ti)
145     initNode.setScript("n=10")
146     p.edAddChild(initNode)
147     #
148     endNode=self.r.createScriptNode("","checkResu")
149     endNode_n=endNode.edAddInputPort("n",ti)
150     endNode_tot=endNode.edAddInputPort("tot",ti)
151     endNode_error=endNode.edAddOutputPort("error",ti)
152     endNode.setScript("error=tot-n*(n+1)/2")
153     p.edAddChild(endNode)
154     #
155     fl=self.r.createForLoop("ForLoop_sum_1_n")
156     p.edAddChild(fl)
157     #
158     p.edAddCFLink(initNode,fl)
159     p.edAddCFLink(fl,endNode)
160     #
161     summ=self.r.createFuncNode("","sum")
162     summ_i=summ.edAddInputPort("i",ti)
163     summ_total=summ.edAddOutputPort("total",ti)
164     summ.setScript("""n=0
165 def sum(i):
166    global n
167    n+=i+1
168    return n""")
169     summ.setFname("sum")
170     fl.edAddChild(summ)
171     #
172     p.edAddLink(fl.edGetIndexPort(),summ_i)
173     p.edAddLink(initNode_n,fl.edGetNbOfTimesInputPort())
174     p.edAddLink(initNode_n,endNode_n)
175     p.edAddLink(summ_total,endNode_tot)
176     #
177     p.saveSchema(fname)
178     ex=pilot.ExecutorSwig()
179     self.assertEqual(p.getState(),pilot.READY)
180     ex.RunW(p,0)
181     self.assertEqual(p.getState(),pilot.DONE)
182     self.assertEqual(endNode_error.getPyObj(),0)
183     pass
184
185   def test4(self):
186     """ test linked to TestSaveLoadRun.test20. This is a smaller test coming from EDF autotest"""
187     p=self.r.createProc("test26")
188     n=self.r.createScriptNode("","node1")
189     n.setScript("import os")
190     p.edAddChild(n)
191     n.setState(pilot.DISABLED)
192     #
193     ex=pilot.ExecutorSwig()
194     self.assertEqual(p.getState(),pilot.READY)
195     ex.RunW(p,0)
196     self.assertEqual(p.getState(),pilot.ACTIVATED)
197     self.assertEqual(n.getState(),pilot.DISABLED) # <- test is here.
198     pass
199
200   def test5(self):
201     """ Test focusing P13268. If I connect a list[pyobj] output inside a ForEach to a list[pyobj] outside a foreach it works now."""
202     #self.assertTrue(False)
203     fname=os.path.join(self.workdir, "testP1328.xml")
204     p=self.r.createProc("testP1328")
205     tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
206     tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
207     n0=self.r.createScriptNode("","n0")
208     n1=self.r.createForEachLoop("n1",tc0)
209     n10=self.r.createScriptNode("","n10")
210     n2=self.r.createScriptNode("","n2")
211     p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; n1.edAddChild(n10)
212     n0.setScript("o2=[[elt] for elt in range(10)]")
213     n10.setScript("o6=2*i5")
214     n2.setScript("assert(i8==[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])")
215     o2=n0.edAddOutputPort("o2",tc1)
216     i5=n10.edAddInputPort("i5",tc0)
217     o6=n10.edAddOutputPort("o6",tc1) # the goal of test is here ! tc1 NOT tc0 !
218     i8=n2.edAddInputPort("i8",tc1)
219     #
220     p.edAddCFLink(n0,n1)
221     p.edAddCFLink(n1,n2)
222     #
223     p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
224     p.edAddLink(n1.edGetSamplePort(),i5)
225     p.edAddLink(o6,i8) # important link for the test !
226     #
227     n1.edGetNbOfBranchesPort().edInitInt(1)
228     #
229     p.saveSchema(fname)
230     #
231     ex=pilot.ExecutorSwig()
232     self.assertEqual(p.getState(),pilot.READY)
233     ex.RunW(p,0)
234     self.assertEqual(p.getState(),pilot.DONE)
235     self.assertEqual(p.getChildByName("n2").getInputPort("i8").getPyObj(),[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])
236     pass
237
238   def test6(self):
239     """ Test focusing on P13766. Test of a connection of 2 foreach at same level where the output pyobj is connected to the list[pyobj] input samples of the 2nd foreach"""
240     fname=os.path.join(self.workdir, "testP13766.xml")
241     p=self.r.createProc("testP13766")
242     tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
243     tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
244     n0=self.r.createScriptNode("","n0")
245     n1=self.r.createForEachLoop("n1",tc0)
246     n2=self.r.createForEachLoop("n2",tc0)
247     n10=self.r.createScriptNode("","n10")
248     n20=self.r.createScriptNode("","n20")
249     n3=self.r.createScriptNode("","n3")
250     p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; p.edAddChild(n3) ; n1.edAddChild(n10) ; n2.edAddChild(n20)
251     n0.setScript("o2=[[elt] for elt in range(10)]")
252     n10.setScript("o6=3*i5")
253     n3.setScript("assert(i8==[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])")
254     n20.setScript("o10=2*i9")
255     o2=n0.edAddOutputPort("o2",tc1)
256     i5=n10.edAddInputPort("i5",tc0)
257     o6=n10.edAddOutputPort("o6",tc0)
258     i9=n20.edAddInputPort("i9",tc0)
259     o10=n20.edAddOutputPort("o10",tc0)
260     i8=n3.edAddInputPort("i8",tc1)
261     #
262     p.edAddCFLink(n0,n1)
263     p.edAddCFLink(n1,n2)
264     p.edAddCFLink(n2,n3)
265     #
266     p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
267     p.edAddLink(o6,n2.edGetSeqOfSamplesPort())# test is here !
268     p.edAddLink(n1.edGetSamplePort(),i5)
269     p.edAddLink(n2.edGetSamplePort(),i9)
270     p.edAddLink(o10,i8) 
271     #
272     n1.edGetNbOfBranchesPort().edInitInt(1)
273     n2.edGetNbOfBranchesPort().edInitInt(1)
274     #
275     p.saveSchema(fname)
276     #
277     ex=pilot.ExecutorSwig()
278     self.assertEqual(p.getState(),pilot.READY)
279     ex.RunW(p,0)
280     self.assertEqual(p.getState(),pilot.DONE)
281     self.assertEqual(p.getChildByName("n3").getInputPort("i8").getPyObj(),[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])
282     pass
283
284   def test7(self):
285     """EDF17963 : Python3 porting. Py3 Pickeling generates more often byte(0) into the bytes. This reveals an incorrect management of Python Bytes -> Any String that leads to truncated bytes."""
286
287     entree_script="""Study = "toto"
288 print("Entree", Study)"""
289
290     pyscript0_script="""entier = 42
291 print("PyScript0",entier)
292 """
293
294     sortie_script="""import numpy as np
295 assert(isinstance(resultats,np.ndarray))
296 assert(resultats==np.array(range(1),dtype=np.int32))
297 """
298
299     nbWorkers=1
300
301     SALOMERuntime.RuntimeSALOME.setRuntime()
302     r=SALOMERuntime.getSALOMERuntime()
303     #
304     p0=r.createProc("run")
305     #
306     td=p0.createType("double","double")
307     ti=p0.createType("int","int")
308     ts=p0.createType("string","string")
309     tp=p0.createInterfaceTc("python:obj:1.0","pyobj",[])
310     tdd=p0.createSequenceTc("seqdouble","seqdouble",td)
311     tds=p0.createSequenceTc("seqstr","seqstr",ts)
312     tdp=p0.createSequenceTc("list[pyobj]","list[pyobj]",tp)
313     #
314     n0 = r.createScriptNode("Salome","Entree")
315     n0.setExecutionMode("local")
316     n0.setScript(entree_script)
317     o0 = n0.edAddOutputPort("Study",tp)
318     p0.edAddChild(n0)
319     #
320     n1 = r.createOptimizerLoop("MainLoop","async_plugin.py","myalgosync",True)
321     n1.edGetNbOfBranchesPort().edInitInt(nbWorkers)
322     p0.edAddChild(n1)
323     #
324     n10=r.createScriptNode("Salome","PyScript0")
325     n10.setScript(pyscript0_script)
326     i1 = n10.edAddInputPort("double",td)
327     o1 = n10.edAddOutputPort("entier",ti)
328     n10.setExecutionMode("local")
329     n1.edAddChild(n10)
330     #
331     n2 = r.createScriptNode("Salome","Sortie")
332     n2.setExecutionMode("local")
333     n2.setScript(sortie_script)
334     i2 = n2.edAddInputPort("resultats",tp)
335     p0.edAddChild(n2)
336     #
337     p0.edAddCFLink(n0,n1)
338     p0.edAddCFLink(n1,n2)
339     #
340     p0.edAddLink(o0,n1.getInputPort("algoInit"))
341     p0.edAddLink(n1.getOutputPort("algoResults"),i2)
342     p0.edAddLink(n1.getOutputPort("evalSamples"),i1)
343     p0.edAddLink(o1,n1.getInputPort("evalResults"))
344     #
345     #p0.saveSchema(fname)
346     #
347     ex=pilot.ExecutorSwig()
348     ex.RunW(p0,0)
349     self.assertTrue(p0.getEffectiveState() == pilot.DONE)
350     pass
351   
352   def tearDown(self):
353     del self.r
354     del self.l
355     pass
356
357   pass
358
# Run the whole test case when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()