Salome HOME
Use /tmp for temporary files needed by salome tests.
[modules/yacs.git] / src / yacsloader_swig / Test / StdAloneYacsLoaderTest1.py
1 # Copyright (C) 2006-2019  CEA/DEN, EDF R&D
2 #
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
7 #
8 # This library is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU Lesser General Public
14 # License along with this library; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 #
19
20 import pilot
21 import SALOMERuntime
22 import loader
23 import unittest
24 import tempfile
25 import os
26
class StdAloneYacsLoaderTest1(unittest.TestCase):
  """Tests that build YACS schemas in Python, run them and check the results.

  Each test builds a proc through the runtime, optionally saves it as an XML
  schema in a per-test temporary directory, executes it with
  ``pilot.ExecutorSwig`` and checks the final states and port values.
  """

  def setUp(self):
    """Set the SALOME runtime, create a loader and a scratch directory."""
    import shutil  # local import: only needed for the cleanup registered below

    SALOMERuntime.RuntimeSALOME_setRuntime()
    self.r = pilot.getRuntime()
    self.l = loader.YACSLoader()
    self.workdir = tempfile.mkdtemp(suffix=".yacstest")
    # Remove the scratch directory once the test is over, whatever its outcome,
    # so that repeated runs do not pile up directories in the temp location.
    self.addCleanup(shutil.rmtree, self.workdir, ignore_errors=True)

  def test1(self):
    """tests imbrication of foreach loop."""
    SALOMERuntime.RuntimeSALOME_setRuntime()
    p=self.r.createProc("pr")
    td=p.createType("double","double")
    td2=p.createSequenceTc("seqdbl","seqdbl",td)
    td3=p.createSequenceTc("seqdblvec","seqdblvec",td2)
    td4=p.createSequenceTc("seqseqdblvec","seqseqdblvec",td3)
    node1=self.r.createScriptNode("","node1")
    node1.setScript("o1=[([1,1],[2,2,2]),([10],[11,11],[12,12,12]),([20],[21,21],[22,22,22],[23,23,23,23])]")
    o1=node1.edAddOutputPort("o1",td4)
    p.edAddChild(node1)
    node2=self.r.createForEachLoop("node2",td3)
    p.edAddChild(node2)
    p.edAddCFLink(node1,node2)
    p.edAddLink(o1,node2.edGetSeqOfSamplesPort())
    node2.edGetNbOfBranchesPort().edInitInt(2)
    # second level : foreach over each element of the outer sample
    node20=self.r.createBloc("node20")
    node2.edAddChild(node20)
    node200=self.r.createForEachLoop("node200",td2)
    node20.edAddChild(node200)
    node200.edGetNbOfBranchesPort().edInitInt(2)
    p.edAddLink(node2.edGetSamplePort(),node200.edGetSeqOfSamplesPort())
    # third level : foreach over doubles, each one processed by node20000
    node2000=self.r.createForEachLoop("node2000",td)
    node2000.edGetNbOfBranchesPort().edInitInt(2)
    node200.edAddChild(node2000)
    p.edAddLink(node200.edGetSamplePort(),node2000.edGetSeqOfSamplesPort())
    node20000=self.r.createScriptNode("","node20000")
    node2000.edAddChild(node20000)
    i1=node20000.edAddInputPort("i1",td)
    o2=node20000.edAddOutputPort("o2",td)
    node20000.setScript("o2=i1+2")
    p.edAddLink(node2000.edGetSamplePort(),i1)
    # final node receiving the output of the innermost node
    node3=self.r.createScriptNode("","node3")
    node3.setScript("o3=i2")
    p.edAddChild(node3)
    i2=node3.edAddInputPort("i2",td4)
    o3=node3.edAddOutputPort("o3",td4)
    p.edAddCFLink(node2,node3)
    p.edAddLink(o2,i2)
    ex = pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.DONE)
    zeResu=node3.getOutputPort("o3").get()
    self.assertEqual(zeResu,[[[3.,3.],[4.,4.,4.]],[[12.],[13.,13.],[14.,14.,14.]],[[22.],[23.,23.],[24.,24.,24.],[25.,25.,25.,25.]]])
    fname = os.path.join(self.workdir, "foreachImbrBuildFS.xml")
    p.saveSchema(fname)

  def test2(self):
    """Non regression test. When an input/output declared as pyobj hides a
    string going to or coming from a ForEachLoop, it previously led to an error.
    """
    fname=os.path.join(self.workdir, "BugPyObjStrInYacs.xml")
    p=self.r.createProc("pr")
    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
    #
    node0=self.r.createScriptNode("Salome","node0")
    node0.setScript("o1=[\"a\",\"bc\"]")
    o1=node0.edAddOutputPort("o1",tc1)
    p.edAddChild(node0)
    #
    node1=self.r.createForEachLoop("node1",tc0)
    p.edAddChild(node1)
    p.edAddCFLink(node0,node1)
    node1.edGetNbOfBranchesPort().edInitInt(1)
    p.edAddLink(o1,node1.edGetSeqOfSamplesPort())
    #
    node10=self.r.createScriptNode("Salome","node10")
    node10.setScript("o1=3*i1")
    i10_1=node10.edAddInputPort("i1",tc0)
    o10_1=node10.edAddOutputPort("o1",tc0)
    node1.edAddChild(node10)
    p.edAddLink(node1.edGetSamplePort(),i10_1)
    #
    node2=self.r.createScriptNode("Salome","node2")
    node2.setScript("o1=i1")
    i2_1=node2.edAddInputPort("i1",tc1)
    o2_1=node2.edAddOutputPort("o1",tc1)
    p.edAddChild(node2)
    p.edAddCFLink(node1,node2)
    p.edAddLink(o10_1,i2_1)
    # save the schema then run the reloaded proc, not the one built above
    p.saveSchema(fname)
    p=self.l.load(fname)
    #
    ex = pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.DONE)
    #
    self.assertEqual(p.getChildByName("node2").getOutputPort("o1").get(),['aaa','bcbcbc'])

  def test3(self):
    """Non regression test Mantis 23234 CEA1726"""
    fname=os.path.join(self.workdir, "test23234.xml")
    p=self.r.createProc("Test23234")
    ti=p.createType("int","int")
    initNode=self.r.createScriptNode("","init")
    initNode_n=initNode.edAddOutputPort("n",ti)
    initNode.setScript("n=10")
    p.edAddChild(initNode)
    #
    endNode=self.r.createScriptNode("","checkResu")
    endNode_n=endNode.edAddInputPort("n",ti)
    endNode_tot=endNode.edAddInputPort("tot",ti)
    endNode_error=endNode.edAddOutputPort("error",ti)
    endNode.setScript("error=tot-n*(n+1)/2")
    p.edAddChild(endNode)
    #
    fl=self.r.createForLoop("ForLoop_sum_1_n")
    p.edAddChild(fl)
    #
    p.edAddCFLink(initNode,fl)
    p.edAddCFLink(fl,endNode)
    #
    summ=self.r.createFuncNode("","sum")
    summ_i=summ.edAddInputPort("i",ti)
    summ_total=summ.edAddOutputPort("total",ti)
    summ.setScript("""n=0
def sum(i):
   global n
   n+=i+1
   return n""")
    summ.setFname("sum")
    fl.edAddChild(summ)
    #
    p.edAddLink(fl.edGetIndexPort(),summ_i)
    p.edAddLink(initNode_n,fl.edGetNbOfTimesInputPort())
    p.edAddLink(initNode_n,endNode_n)
    p.edAddLink(summ_total,endNode_tot)
    #
    p.saveSchema(fname)
    ex=pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.DONE)
    self.assertEqual(endNode_error.getPyObj(),0)

  def test4(self):
    """test linked to TestSaveLoadRun.test20. This is a smaller test coming from EDF autotest"""
    p=self.r.createProc("test26")
    n=self.r.createScriptNode("","node1")
    n.setScript("import os")
    p.edAddChild(n)
    n.setState(pilot.DISABLED)
    #
    ex=pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.ACTIVATED)
    self.assertEqual(n.getState(),pilot.DISABLED) # <- test is here.

  def test5(self):
    """Test focusing P13268. If I connect a list[pyobj] output inside a ForEach to a list[pyobj] outside a foreach it works now."""
    fname=os.path.join(self.workdir, "testP1328.xml")
    p=self.r.createProc("testP1328")
    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
    n0=self.r.createScriptNode("","n0")
    n1=self.r.createForEachLoop("n1",tc0)
    n10=self.r.createScriptNode("","n10")
    n2=self.r.createScriptNode("","n2")
    p.edAddChild(n0)
    p.edAddChild(n1)
    p.edAddChild(n2)
    n1.edAddChild(n10)
    n0.setScript("o2=[[elt] for elt in range(10)]")
    n10.setScript("o6=2*i5")
    n2.setScript("assert(i8==[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])")
    o2=n0.edAddOutputPort("o2",tc1)
    i5=n10.edAddInputPort("i5",tc0)
    o6=n10.edAddOutputPort("o6",tc1) # the goal of test is here ! tc1 NOT tc0 !
    i8=n2.edAddInputPort("i8",tc1)
    #
    p.edAddCFLink(n0,n1)
    p.edAddCFLink(n1,n2)
    #
    p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
    p.edAddLink(n1.edGetSamplePort(),i5)
    p.edAddLink(o6,i8) # important link for the test !
    #
    n1.edGetNbOfBranchesPort().edInitInt(1)
    #
    p.saveSchema(fname)
    #
    ex=pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.DONE)
    self.assertEqual(p.getChildByName("n2").getInputPort("i8").getPyObj(),[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])

  def test6(self):
    """Test focusing on P13766. Test of a connection of 2 foreach at same level where the output pyobj is connected to the list[pyobj] input samples of the 2nd foreach"""
    fname=os.path.join(self.workdir, "testP13766.xml")
    p=self.r.createProc("testP13766")
    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
    n0=self.r.createScriptNode("","n0")
    n1=self.r.createForEachLoop("n1",tc0)
    n2=self.r.createForEachLoop("n2",tc0)
    n10=self.r.createScriptNode("","n10")
    n20=self.r.createScriptNode("","n20")
    n3=self.r.createScriptNode("","n3")
    p.edAddChild(n0)
    p.edAddChild(n1)
    p.edAddChild(n2)
    p.edAddChild(n3)
    n1.edAddChild(n10)
    n2.edAddChild(n20)
    n0.setScript("o2=[[elt] for elt in range(10)]")
    n10.setScript("o6=3*i5")
    n3.setScript("assert(i8==[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])")
    n20.setScript("o10=2*i9")
    o2=n0.edAddOutputPort("o2",tc1)
    i5=n10.edAddInputPort("i5",tc0)
    o6=n10.edAddOutputPort("o6",tc0)
    i9=n20.edAddInputPort("i9",tc0)
    o10=n20.edAddOutputPort("o10",tc0)
    i8=n3.edAddInputPort("i8",tc1)
    #
    p.edAddCFLink(n0,n1)
    p.edAddCFLink(n1,n2)
    p.edAddCFLink(n2,n3)
    #
    p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
    p.edAddLink(o6,n2.edGetSeqOfSamplesPort())# test is here !
    p.edAddLink(n1.edGetSamplePort(),i5)
    p.edAddLink(n2.edGetSamplePort(),i9)
    p.edAddLink(o10,i8)
    #
    n1.edGetNbOfBranchesPort().edInitInt(1)
    n2.edGetNbOfBranchesPort().edInitInt(1)
    #
    p.saveSchema(fname)
    #
    ex=pilot.ExecutorSwig()
    self.assertEqual(p.getState(),pilot.READY)
    ex.RunW(p,0)
    self.assertEqual(p.getState(),pilot.DONE)
    self.assertEqual(p.getChildByName("n3").getInputPort("i8").getPyObj(),[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])

  def test7(self):
    """EDF17963 : Python3 porting. Py3 pickling generates more often byte(0) into the bytes. This reveals an incorrect management of Python Bytes -> Any String that leads to truncated bytes."""

    entree_script="""Study = "toto"
print("Entree", Study)"""

    pyscript0_script="""entier = 42
print("PyScript0",entier)
"""

    sortie_script="""import numpy as np
assert(isinstance(resultats,np.ndarray))
"""

    nbWorkers=1

    SALOMERuntime.RuntimeSALOME.setRuntime()
    r=SALOMERuntime.getSALOMERuntime()
    #
    p0=r.createProc("run")
    #
    td=p0.createType("double","double")
    ti=p0.createType("int","int")
    ts=p0.createType("string","string")
    tp=p0.createInterfaceTc("python:obj:1.0","pyobj",[])
    tdd=p0.createSequenceTc("seqdouble","seqdouble",td)
    tds=p0.createSequenceTc("seqstr","seqstr",ts)
    tdp=p0.createSequenceTc("list[pyobj]","list[pyobj]",tp)
    #
    n0 = r.createScriptNode("Salome","Entree")
    n0.setExecutionMode("local")
    n0.setScript(entree_script)
    o0 = n0.edAddOutputPort("Study",tp)
    p0.edAddChild(n0)
    #
    n1 = r.createOptimizerLoop("MainLoop","async_plugin.py","myalgosync",True)
    n1.edGetNbOfBranchesPort().edInitInt(nbWorkers)
    p0.edAddChild(n1)
    #
    n10=r.createScriptNode("Salome","PyScript0")
    n10.setScript(pyscript0_script)
    i1 = n10.edAddInputPort("double",td)
    o1 = n10.edAddOutputPort("entier",ti)
    n10.setExecutionMode("local")
    n1.edAddChild(n10)
    #
    n2 = r.createScriptNode("Salome","Sortie")
    n2.setExecutionMode("local")
    n2.setScript(sortie_script)
    i2 = n2.edAddInputPort("resultats",tp)
    p0.edAddChild(n2)
    #
    p0.edAddCFLink(n0,n1)
    p0.edAddCFLink(n1,n2)
    #
    p0.edAddLink(o0,n1.getInputPort("algoInit"))
    p0.edAddLink(n1.getOutputPort("algoResults"),i2)
    p0.edAddLink(n1.getOutputPort("evalSamples"),i1)
    p0.edAddLink(o1,n1.getInputPort("evalResults"))
    #
    ex=pilot.ExecutorSwig()
    ex.RunW(p0,0)
    # assertEqual rather than assertTrue(a == b): a failure reports both states
    self.assertEqual(p0.getEffectiveState(),pilot.DONE)

  def tearDown(self):
    """Drop the references to the runtime and the loader held by the test."""
    del self.r
    del self.l
355
# Run every test case of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()