Salome HOME
updated copyright message
[modules/yacs.git] / src / yacsloader_swig / Test / StdAloneYacsLoaderTest1.py
old mode 100755 (executable)
new mode 100644 (file)
index 1059186..776c1e5
@@ -1,4 +1,5 @@
-# Copyright (C) 2006-2014  CEA/DEN, EDF R&D
+#!/usr/bin/env python3
+# Copyright (C) 2006-2023  CEA, EDF
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -21,6 +22,8 @@ import pilot
 import SALOMERuntime
 import loader
 import unittest
+import tempfile
+import os
 
 class StdAloneYacsLoaderTest1(unittest.TestCase):
 
@@ -28,10 +31,11 @@ class StdAloneYacsLoaderTest1(unittest.TestCase):
     SALOMERuntime.RuntimeSALOME_setRuntime()
     self.r = pilot.getRuntime()
     self.l = loader.YACSLoader()# self.l.load("foreachImbr_tmp.xml")
+    self.workdir = tempfile.mkdtemp(suffix=".yacstest")
     pass
 
   def test1(self):
-    """tests imbrication od"""
+    """tests imbrication of foreach loop."""
     SALOMERuntime.RuntimeSALOME_setRuntime()
     l=loader.YACSLoader()
     ex=pilot.ExecutorSwig()
@@ -80,9 +84,270 @@ class StdAloneYacsLoaderTest1(unittest.TestCase):
     self.assertEqual(p.getState(),pilot.DONE)
     zeResu=node3.getOutputPort("o3").get()
     self.assertEqual(zeResu,[[[3.,3.],[4.,4.,4.]],[[12.],[13.,13.],[14.,14.,14.]],[[22.],[23.,23.],[24.,24.,24.],[25.,25.,25.,25.]]])
-    p.saveSchema("foreachImbrBuildFS.xml")
+    fname = os.path.join(self.workdir, "foreachImbrBuildFS.xml")
+    p.saveSchema(fname)
     pass
 
+  def test2(self):
+    """ Non regression test. When input/output declared as pyobj hiding a string type to go to or from a ForEachLoop it previous lead
+    to an error.
+    """
+    fname=os.path.join(self.workdir, "BugPyObjStrInYacs.xml")
+    p=self.r.createProc("pr")
+    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
+    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
+    #
+    node0=self.r.createScriptNode("Salome","node0")
+    node0.setScript("o1=[\"a\",\"bc\"]")
+    o1=node0.edAddOutputPort("o1",tc1)
+    p.edAddChild(node0)
+    #
+    node1=self.r.createForEachLoop("node1",tc0)
+    p.edAddChild(node1)
+    p.edAddCFLink(node0,node1)
+    node1.edGetNbOfBranchesPort().edInitInt(1)
+    p.edAddLink(o1,node1.edGetSeqOfSamplesPort())
+    #
+    node10=self.r.createScriptNode("Salome","node10")
+    node10.setScript("o1=3*i1")
+    i10_1=node10.edAddInputPort("i1",tc0)
+    o10_1=node10.edAddOutputPort("o1",tc0)
+    node1.edAddChild(node10)
+    p.edAddLink(node1.edGetSamplePort(),i10_1)
+    #
+    node2=self.r.createScriptNode("Salome","node2")
+    node2.setScript("o1=i1")
+    i2_1=node2.edAddInputPort("i1",tc1)
+    o2_1=node2.edAddOutputPort("o1",tc1)
+    p.edAddChild(node2)
+    p.edAddCFLink(node1,node2)
+    p.edAddLink(o10_1,i2_1)
+    ##
+    p.saveSchema(fname)
+    p=self.l.load(fname)
+    ##
+    ex = pilot.ExecutorSwig()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+    #
+    self.assertEqual(p.getChildByName("node2").getOutputPort("o1").get(),['aaa','bcbcbc'])
+    pass
+
+  def test3(self):
+    """ Non regression test Mantis 23234 CEA1726"""
+    fname=os.path.join(self.workdir, "test23234.xml")
+    p=self.r.createProc("Test23234")
+    ti=p.createType("int","int")
+    initNode=self.r.createScriptNode("","init")
+    initNode_n=initNode.edAddOutputPort("n",ti)
+    initNode.setScript("n=10")
+    p.edAddChild(initNode)
+    #
+    endNode=self.r.createScriptNode("","checkResu")
+    endNode_n=endNode.edAddInputPort("n",ti)
+    endNode_tot=endNode.edAddInputPort("tot",ti)
+    endNode_error=endNode.edAddOutputPort("error",ti)
+    endNode.setScript("error=tot-n*(n+1)/2")
+    p.edAddChild(endNode)
+    #
+    fl=self.r.createForLoop("ForLoop_sum_1_n")
+    p.edAddChild(fl)
+    #
+    p.edAddCFLink(initNode,fl)
+    p.edAddCFLink(fl,endNode)
+    #
+    summ=self.r.createFuncNode("","sum")
+    summ_i=summ.edAddInputPort("i",ti)
+    summ_total=summ.edAddOutputPort("total",ti)
+    summ.setScript("""n=0
+def sum(i):
+   global n
+   n+=i+1
+   return n""")
+    summ.setFname("sum")
+    fl.edAddChild(summ)
+    #
+    p.edAddLink(fl.edGetIndexPort(),summ_i)
+    p.edAddLink(initNode_n,fl.edGetNbOfTimesInputPort())
+    p.edAddLink(initNode_n,endNode_n)
+    p.edAddLink(summ_total,endNode_tot)
+    #
+    p.saveSchema(fname)
+    ex=pilot.ExecutorSwig()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+    self.assertEqual(endNode_error.getPyObj(),0)
+    pass
+
+  def test4(self):
+    """ test linked to TestSaveLoadRun.test20. This is a smaller test coming from EDF autotest"""
+    p=self.r.createProc("test26")
+    n=self.r.createScriptNode("","node1")
+    n.setScript("import os")
+    p.edAddChild(n)
+    n.setState(pilot.DISABLED)
+    #
+    ex=pilot.ExecutorSwig()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.ACTIVATED)
+    self.assertEqual(n.getState(),pilot.DISABLED) # <- test is here.
+    pass
+
+  def test5(self):
+    """ Test focusing P13268. If I connect a list[pyobj] output inside a ForEach to a list[pyobj] outside a foreach it works now."""
+    #self.assertTrue(False)
+    fname=os.path.join(self.workdir, "testP1328.xml")
+    p=self.r.createProc("testP1328")
+    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
+    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
+    n0=self.r.createScriptNode("","n0")
+    n1=self.r.createForEachLoop("n1",tc0)
+    n10=self.r.createScriptNode("","n10")
+    n2=self.r.createScriptNode("","n2")
+    p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; n1.edAddChild(n10)
+    n0.setScript("o2=[[elt] for elt in range(10)]")
+    n10.setScript("o6=2*i5")
+    n2.setScript("assert(i8==[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])")
+    o2=n0.edAddOutputPort("o2",tc1)
+    i5=n10.edAddInputPort("i5",tc0)
+    o6=n10.edAddOutputPort("o6",tc1) # the goal of test is here ! tc1 NOT tc0 !
+    i8=n2.edAddInputPort("i8",tc1)
+    #
+    p.edAddCFLink(n0,n1)
+    p.edAddCFLink(n1,n2)
+    #
+    p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
+    p.edAddLink(n1.edGetSamplePort(),i5)
+    p.edAddLink(o6,i8) # important link for the test !
+    #
+    n1.edGetNbOfBranchesPort().edInitInt(1)
+    #
+    p.saveSchema(fname)
+    #
+    ex=pilot.ExecutorSwig()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+    self.assertEqual(p.getChildByName("n2").getInputPort("i8").getPyObj(),[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])
+    pass
+
+  def test6(self):
+    """ Test focusing on P13766. Test of a connection of 2 foreach at same level where the output pyobj is connected to the list[pyobj] input samples of the 2nd foreach"""
+    fname=os.path.join(self.workdir, "testP13766.xml")
+    p=self.r.createProc("testP13766")
+    tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[])
+    tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0)
+    n0=self.r.createScriptNode("","n0")
+    n1=self.r.createForEachLoop("n1",tc0)
+    n2=self.r.createForEachLoop("n2",tc0)
+    n10=self.r.createScriptNode("","n10")
+    n20=self.r.createScriptNode("","n20")
+    n3=self.r.createScriptNode("","n3")
+    p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; p.edAddChild(n3) ; n1.edAddChild(n10) ; n2.edAddChild(n20)
+    n0.setScript("o2=[[elt] for elt in range(10)]")
+    n10.setScript("o6=3*i5")
+    n3.setScript("assert(i8==[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])")
+    n20.setScript("o10=2*i9")
+    o2=n0.edAddOutputPort("o2",tc1)
+    i5=n10.edAddInputPort("i5",tc0)
+    o6=n10.edAddOutputPort("o6",tc0)
+    i9=n20.edAddInputPort("i9",tc0)
+    o10=n20.edAddOutputPort("o10",tc0)
+    i8=n3.edAddInputPort("i8",tc1)
+    #
+    p.edAddCFLink(n0,n1)
+    p.edAddCFLink(n1,n2)
+    p.edAddCFLink(n2,n3)
+    #
+    p.edAddLink(o2,n1.edGetSeqOfSamplesPort())
+    p.edAddLink(o6,n2.edGetSeqOfSamplesPort())# test is here !
+    p.edAddLink(n1.edGetSamplePort(),i5)
+    p.edAddLink(n2.edGetSamplePort(),i9)
+    p.edAddLink(o10,i8) 
+    #
+    n1.edGetNbOfBranchesPort().edInitInt(1)
+    n2.edGetNbOfBranchesPort().edInitInt(1)
+    #
+    p.saveSchema(fname)
+    #
+    ex=pilot.ExecutorSwig()
+    self.assertEqual(p.getState(),pilot.READY)
+    ex.RunW(p,0)
+    self.assertEqual(p.getState(),pilot.DONE)
+    self.assertEqual(p.getChildByName("n3").getInputPort("i8").getPyObj(),[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])
+    pass
+
+  def test7(self):
+    """EDF17963 : Python3 porting. Py3 Pickeling generates more often byte(0) into the bytes. This reveals an incorrect management of Python Bytes -> Any String that leads to truncated bytes."""
+
+    entree_script="""Study = "toto"
+print("Entree", Study)"""
+
+    pyscript0_script="""entier = 42
+print("PyScript0",entier)
+"""
+
+    sortie_script="""import numpy as np
+assert(isinstance(resultats,np.ndarray))
+assert(resultats==np.array(range(1),dtype=np.int32))
+"""
+
+    nbWorkers=1
+
+    SALOMERuntime.RuntimeSALOME.setRuntime()
+    r=SALOMERuntime.getSALOMERuntime()
+    #
+    p0=r.createProc("run")
+    #
+    td=p0.createType("double","double")
+    ti=p0.createType("int","int")
+    ts=p0.createType("string","string")
+    tp=p0.createInterfaceTc("python:obj:1.0","pyobj",[])
+    tdd=p0.createSequenceTc("seqdouble","seqdouble",td)
+    tds=p0.createSequenceTc("seqstr","seqstr",ts)
+    tdp=p0.createSequenceTc("list[pyobj]","list[pyobj]",tp)
+    #
+    n0 = r.createScriptNode("Salome","Entree")
+    n0.setExecutionMode("local")
+    n0.setScript(entree_script)
+    o0 = n0.edAddOutputPort("Study",tp)
+    p0.edAddChild(n0)
+    #
+    n1 = r.createOptimizerLoop("MainLoop","async_plugin.py","myalgosync",True)
+    n1.edGetNbOfBranchesPort().edInitInt(nbWorkers)
+    p0.edAddChild(n1)
+    #
+    n10=r.createScriptNode("Salome","PyScript0")
+    n10.setScript(pyscript0_script)
+    i1 = n10.edAddInputPort("double",td)
+    o1 = n10.edAddOutputPort("entier",ti)
+    n10.setExecutionMode("local")
+    n1.edAddChild(n10)
+    #
+    n2 = r.createScriptNode("Salome","Sortie")
+    n2.setExecutionMode("local")
+    n2.setScript(sortie_script)
+    i2 = n2.edAddInputPort("resultats",tp)
+    p0.edAddChild(n2)
+    #
+    p0.edAddCFLink(n0,n1)
+    p0.edAddCFLink(n1,n2)
+    #
+    p0.edAddLink(o0,n1.getInputPort("algoInit"))
+    p0.edAddLink(n1.getOutputPort("algoResults"),i2)
+    p0.edAddLink(n1.getOutputPort("evalSamples"),i1)
+    p0.edAddLink(o1,n1.getInputPort("evalResults"))
+    #
+    #p0.saveSchema(fname)
+    #
+    ex=pilot.ExecutorSwig()
+    ex.RunW(p0,0)
+    self.assertTrue(p0.getEffectiveState() == pilot.DONE)
+    pass
+  
   def tearDown(self):
     del self.r
     del self.l