X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=src%2Fyacsloader_swig%2FTest%2FStdAloneYacsLoaderTest1.py;h=776c1e5b6fc84905728cb71df68b3c7de62c819a;hb=255150e4eec79294bbd06c7c464f309b4a264960;hp=776905907b0062f84865162a7244ba5786c76599;hpb=36bd81077bd426edebd0ba9bc5b2e6c70b9e5f4d;p=modules%2Fyacs.git diff --git a/src/yacsloader_swig/Test/StdAloneYacsLoaderTest1.py b/src/yacsloader_swig/Test/StdAloneYacsLoaderTest1.py index 776905907..776c1e5b6 100644 --- a/src/yacsloader_swig/Test/StdAloneYacsLoaderTest1.py +++ b/src/yacsloader_swig/Test/StdAloneYacsLoaderTest1.py @@ -1,4 +1,5 @@ -# Copyright (C) 2006-2015 CEA/DEN, EDF R&D +#!/usr/bin/env python3 +# Copyright (C) 2006-2023 CEA, EDF # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -21,6 +22,8 @@ import pilot import SALOMERuntime import loader import unittest +import tempfile +import os class StdAloneYacsLoaderTest1(unittest.TestCase): @@ -28,6 +31,7 @@ class StdAloneYacsLoaderTest1(unittest.TestCase): SALOMERuntime.RuntimeSALOME_setRuntime() self.r = pilot.getRuntime() self.l = loader.YACSLoader()# self.l.load("foreachImbr_tmp.xml") + self.workdir = tempfile.mkdtemp(suffix=".yacstest") pass def test1(self): @@ -80,14 +84,15 @@ class StdAloneYacsLoaderTest1(unittest.TestCase): self.assertEqual(p.getState(),pilot.DONE) zeResu=node3.getOutputPort("o3").get() self.assertEqual(zeResu,[[[3.,3.],[4.,4.,4.]],[[12.],[13.,13.],[14.,14.,14.]],[[22.],[23.,23.],[24.,24.,24.],[25.,25.,25.,25.]]]) - p.saveSchema("foreachImbrBuildFS.xml") + fname = os.path.join(self.workdir, "foreachImbrBuildFS.xml") + p.saveSchema(fname) pass def test2(self): """ Non regression test. When input/output declared as pyobj hiding a string type to go to or from a ForEachLoop it previous lead to an error. """ - fname="BugPyObjStrInYacs.xml" + fname=os.path.join(self.workdir, "BugPyObjStrInYacs.xml") p=self.r.createProc("pr") tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[]) tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0) @@ -129,6 +134,220 @@ class StdAloneYacsLoaderTest1(unittest.TestCase): self.assertEqual(p.getChildByName("node2").getOutputPort("o1").get(),['aaa','bcbcbc']) pass + def test3(self): + """ Non regression test Mantis 23234 CEA1726""" + fname=os.path.join(self.workdir, "test23234.xml") + p=self.r.createProc("Test23234") + ti=p.createType("int","int") + initNode=self.r.createScriptNode("","init") + initNode_n=initNode.edAddOutputPort("n",ti) + initNode.setScript("n=10") + p.edAddChild(initNode) + # + endNode=self.r.createScriptNode("","checkResu") + endNode_n=endNode.edAddInputPort("n",ti) + endNode_tot=endNode.edAddInputPort("tot",ti) + endNode_error=endNode.edAddOutputPort("error",ti) + endNode.setScript("error=tot-n*(n+1)/2") + p.edAddChild(endNode) + # + fl=self.r.createForLoop("ForLoop_sum_1_n") + p.edAddChild(fl) + # + p.edAddCFLink(initNode,fl) + p.edAddCFLink(fl,endNode) + # + summ=self.r.createFuncNode("","sum") + summ_i=summ.edAddInputPort("i",ti) + summ_total=summ.edAddOutputPort("total",ti) + summ.setScript("""n=0 +def sum(i): + global n + n+=i+1 + return n""") + summ.setFname("sum") + fl.edAddChild(summ) + # + p.edAddLink(fl.edGetIndexPort(),summ_i) + p.edAddLink(initNode_n,fl.edGetNbOfTimesInputPort()) + p.edAddLink(initNode_n,endNode_n) + p.edAddLink(summ_total,endNode_tot) + # + p.saveSchema(fname) + ex=pilot.ExecutorSwig() + self.assertEqual(p.getState(),pilot.READY) + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.DONE) + self.assertEqual(endNode_error.getPyObj(),0) + pass + + def test4(self): + """ test linked to TestSaveLoadRun.test20. This is a smaller test coming from EDF autotest""" + p=self.r.createProc("test26") + n=self.r.createScriptNode("","node1") + n.setScript("import os") + p.edAddChild(n) + n.setState(pilot.DISABLED) + # + ex=pilot.ExecutorSwig() + self.assertEqual(p.getState(),pilot.READY) + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.ACTIVATED) + self.assertEqual(n.getState(),pilot.DISABLED) # <- test is here. + pass + + def test5(self): + """ Test focusing P13268. If I connect a list[pyobj] output inside a ForEach to a list[pyobj] outside a foreach it works now.""" + #self.assertTrue(False) + fname=os.path.join(self.workdir, "testP1328.xml") + p=self.r.createProc("testP1328") + tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[]) + tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0) + n0=self.r.createScriptNode("","n0") + n1=self.r.createForEachLoop("n1",tc0) + n10=self.r.createScriptNode("","n10") + n2=self.r.createScriptNode("","n2") + p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; n1.edAddChild(n10) + n0.setScript("o2=[[elt] for elt in range(10)]") + n10.setScript("o6=2*i5") + n2.setScript("assert(i8==[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]])") + o2=n0.edAddOutputPort("o2",tc1) + i5=n10.edAddInputPort("i5",tc0) + o6=n10.edAddOutputPort("o6",tc1) # the goal of test is here ! tc1 NOT tc0 ! + i8=n2.edAddInputPort("i8",tc1) + # + p.edAddCFLink(n0,n1) + p.edAddCFLink(n1,n2) + # + p.edAddLink(o2,n1.edGetSeqOfSamplesPort()) + p.edAddLink(n1.edGetSamplePort(),i5) + p.edAddLink(o6,i8) # important link for the test ! + # + n1.edGetNbOfBranchesPort().edInitInt(1) + # + p.saveSchema(fname) + # + ex=pilot.ExecutorSwig() + self.assertEqual(p.getState(),pilot.READY) + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.DONE) + self.assertEqual(p.getChildByName("n2").getInputPort("i8").getPyObj(),[[0,0],[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9]]) + pass + + def test6(self): + """ Test focusing on P13766. Test of a connection of 2 foreach at same level where the output pyobj is connected to the list[pyobj] input samples of the 2nd foreach""" + fname=os.path.join(self.workdir, "testP13766.xml") + p=self.r.createProc("testP13766") + tc0=p.createInterfaceTc("python:obj:1.0","pyobj",[]) + tc1=p.createSequenceTc("list[pyobj]","list[pyobj]",tc0) + n0=self.r.createScriptNode("","n0") + n1=self.r.createForEachLoop("n1",tc0) + n2=self.r.createForEachLoop("n2",tc0) + n10=self.r.createScriptNode("","n10") + n20=self.r.createScriptNode("","n20") + n3=self.r.createScriptNode("","n3") + p.edAddChild(n0) ; p.edAddChild(n1) ; p.edAddChild(n2) ; p.edAddChild(n3) ; n1.edAddChild(n10) ; n2.edAddChild(n20) + n0.setScript("o2=[[elt] for elt in range(10)]") + n10.setScript("o6=3*i5") + n3.setScript("assert(i8==[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]])") + n20.setScript("o10=2*i9") + o2=n0.edAddOutputPort("o2",tc1) + i5=n10.edAddInputPort("i5",tc0) + o6=n10.edAddOutputPort("o6",tc0) + i9=n20.edAddInputPort("i9",tc0) + o10=n20.edAddOutputPort("o10",tc0) + i8=n3.edAddInputPort("i8",tc1) + # + p.edAddCFLink(n0,n1) + p.edAddCFLink(n1,n2) + p.edAddCFLink(n2,n3) + # + p.edAddLink(o2,n1.edGetSeqOfSamplesPort()) + p.edAddLink(o6,n2.edGetSeqOfSamplesPort())# test is here ! + p.edAddLink(n1.edGetSamplePort(),i5) + p.edAddLink(n2.edGetSamplePort(),i9) + p.edAddLink(o10,i8) + # + n1.edGetNbOfBranchesPort().edInitInt(1) + n2.edGetNbOfBranchesPort().edInitInt(1) + # + p.saveSchema(fname) + # + ex=pilot.ExecutorSwig() + self.assertEqual(p.getState(),pilot.READY) + ex.RunW(p,0) + self.assertEqual(p.getState(),pilot.DONE) + self.assertEqual(p.getChildByName("n3").getInputPort("i8").getPyObj(),[[0,0,0,0,0,0],[1,1,1,1,1,1],[2,2,2,2,2,2],[3,3,3,3,3,3],[4,4,4,4,4,4],[5,5,5,5,5,5],[6,6,6,6,6,6],[7,7,7,7,7,7],[8,8,8,8,8,8],[9,9,9,9,9,9]]) + pass + + def test7(self): + """EDF17963 : Python3 porting. Py3 Pickeling generates more often byte(0) into the bytes. This reveals an incorrect management of Python Bytes -> Any String that leads to truncated bytes.""" + + entree_script="""Study = "toto" +print("Entree", Study)""" + + pyscript0_script="""entier = 42 +print("PyScript0",entier) +""" + + sortie_script="""import numpy as np +assert(isinstance(resultats,np.ndarray)) +assert(resultats==np.array(range(1),dtype=np.int32)) +""" + + nbWorkers=1 + + SALOMERuntime.RuntimeSALOME.setRuntime() + r=SALOMERuntime.getSALOMERuntime() + # + p0=r.createProc("run") + # + td=p0.createType("double","double") + ti=p0.createType("int","int") + ts=p0.createType("string","string") + tp=p0.createInterfaceTc("python:obj:1.0","pyobj",[]) + tdd=p0.createSequenceTc("seqdouble","seqdouble",td) + tds=p0.createSequenceTc("seqstr","seqstr",ts) + tdp=p0.createSequenceTc("list[pyobj]","list[pyobj]",tp) + # + n0 = r.createScriptNode("Salome","Entree") + n0.setExecutionMode("local") + n0.setScript(entree_script) + o0 = n0.edAddOutputPort("Study",tp) + p0.edAddChild(n0) + # + n1 = r.createOptimizerLoop("MainLoop","async_plugin.py","myalgosync",True) + n1.edGetNbOfBranchesPort().edInitInt(nbWorkers) + p0.edAddChild(n1) + # + n10=r.createScriptNode("Salome","PyScript0") + n10.setScript(pyscript0_script) + i1 = n10.edAddInputPort("double",td) + o1 = n10.edAddOutputPort("entier",ti) + n10.setExecutionMode("local") + n1.edAddChild(n10) + # + n2 = r.createScriptNode("Salome","Sortie") + n2.setExecutionMode("local") + n2.setScript(sortie_script) + i2 = n2.edAddInputPort("resultats",tp) + p0.edAddChild(n2) + # + p0.edAddCFLink(n0,n1) + p0.edAddCFLink(n1,n2) + # + p0.edAddLink(o0,n1.getInputPort("algoInit")) + p0.edAddLink(n1.getOutputPort("algoResults"),i2) + p0.edAddLink(n1.getOutputPort("evalSamples"),i1) + p0.edAddLink(o1,n1.getInputPort("evalResults")) + # + #p0.saveSchema(fname) + # + ex=pilot.ExecutorSwig() + ex.RunW(p0,0) + self.assertTrue(p0.getEffectiveState() == pilot.DONE) + pass + def tearDown(self): del self.r del self.l