2 # Copyright (C) 2023-2024 CEA, EDF
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
29 import subprocess as sp
class TestYacsDriverOverrides(unittest.TestCase):
    """Exercise the ``--activate-custom-overrides`` option of the YACS ``driver``."""

    def test0(self):
        """
        [EDF28531] : Run a schema through ``driver --activate-custom-overrides``.

        The schema fans ``NB_OF_PARALLEL_NODES`` integers out over a dynamic
        for-each loop whose inner node multiplies each sample by
        ``KernelBasis.tony``. That attribute is only set by the container
        start-up code installed from ``yacs_driver_overrides.py``, so the
        final sum written to ``res.txt`` is correct only if the ``customize``
        hook of that file was applied.
        """
        NB_OF_PARALLEL_NODES = 100
        with tempfile.TemporaryDirectory() as tmpdirname:
            SALOMERuntime.RuntimeSALOME.setRuntime()
            r = SALOMERuntime.getSALOMERuntime()
            p = r.createProc("PerfTest0")
            p.setProperty("executor", "workloadmanager")  # important line here to avoid that gg container treat several tasks in //.
            # Type declarations. Several results are not referenced below; the
            # calls are kept as-is.
            # NOTE(review): presumably they register the types in the proc - confirm before removing.
            ti = p.createType("int", "int")
            td = p.createType("double", "double")
            tdd = p.createSequenceTc("seqdouble", "seqdouble", td)
            tddd = p.createSequenceTc("seqseqdouble", "seqseqdouble", tdd)
            tdddd = p.createSequenceTc("seqseqseqdouble", "seqseqseqdouble", tddd)
            pyobj = p.createInterfaceTc("python:obj:1.0", "pyobj", [])
            seqpyobj = p.createSequenceTc("list[pyobj]", "list[pyobj]", pyobj)
            # Container used by the remote node of the for-each loop.
            cont = p.createContainer("gg", "Salome")
            cont.setProperty("nb_parallel_procs", "2")
            cont.setAttachOnCloningStatus(True)
            cont.setProperty("attached_on_cloning", "1")
            cont.setProperty("type", "multi")
            cont.setProperty("container_name", "gg")
            # Start node: produces the list of samples 0 .. NB_OF_PARALLEL_NODES-1.
            startNode = r.createScriptNode("Salome", "start")
            startNode.setExecutionMode("local")
            startNode.setScript("""o2 = list(range({}))""".format(NB_OF_PARALLEL_NODES))
            po2 = startNode.edAddOutputPort("o2", seqpyobj)
            p.edAddChild(startNode)
            # For-each loop: one execution of internalNode per sample.
            fe = r.createForEachLoopDyn("fe", pyobj)
            p.edAddChild(fe)  # the loop must belong to the proc, like startNode, before being linked
            p.edAddCFLink(startNode, fe)
            p.edAddLink(po2, fe.edGetSeqOfSamplesPort())
            internalNode = r.createScriptNode("Salome", "internalNode")
            internalNode.setExecutionMode("remote")
            internalNode.setContainer(cont)
            # KernelBasis.tony is defined only by the container start-up code
            # installed from the overrides file written further down.
            internalNode.setScript("""import KernelBasis
ret = KernelBasis.tony*ppp
""")
            fe.edSetNode(internalNode)
            ix = internalNode.edAddInputPort("ppp", pyobj)
            oret = internalNode.edAddOutputPort("ret", pyobj)
            p.edAddLink(fe.edGetSamplePort(), ix)
            # End node: sums the loop results and dumps the total in res.txt.
            endNode = r.createScriptNode("Salome", "end")
            endNode.setExecutionMode("local")
            endNode.setContainer(None)
            ozeret = endNode.edAddOutputPort("ozeret", seqpyobj)
            izeret = endNode.edAddInputPort("izeret", seqpyobj)
            endNode.setScript("""ozeret = izeret
with open("res.txt","w") as f:
    f.write( str( sum(izeret) ) )
""")
            p.edAddChild(endNode)
            p.edAddCFLink(fe, endNode)
            p.edAddLink(oret, izeret)
            fname = os.path.join(tmpdirname, "YacsOverrides.xml")
            p.saveSchema(fname)  # the driver below loads the schema from this file
            # important part of test is here
            with open(os.path.join(tmpdirname, "yacs_driver_overrides.py"), "w") as fover:
                fover.write("""import logging
import multiprocessing as mp

def customize(cm, allresources):
    cm.SetCodeOnContainerStartUp("import KernelBasis\\nKernelBasis.tony = 5")
    cpy = {elt:allresources[elt] for elt in allresources.GetListOfEntries()}
    allresources.DeleteAllResourcesInCatalog()
    for it in cpy:
        cpy[it].nb_node = mp.cpu_count()
        cpy[it].protocol = "ssh"
        allresources.AddResourceInCatalogNoQuestion(cpy[it])
    logging.debug( repr( allresources ) )""")

            print("Start computation")
            # --activate-custom-overrides is the key point of the test.
            # sp.run waits for the driver to finish, so returncode is an
            # actual exit status and res.txt is complete when it is read.
            completed = sp.run(
                ["driver", "-v", "-vl", "DEBUG", "--activate-custom-overrides", fname],
                cwd=tmpdirname,
            )
            self.assertEqual(completed.returncode, 0, "Driver process has failed !")
            with open(os.path.join(tmpdirname, "res.txt")) as f:
                # 24750 == 5 * sum(range(100)): every sample is multiplied by KernelBasis.tony == 5.
                self.assertEqual(int(f.read()), 24750)
if __name__ == '__main__':
    # Run the test case with a text report, and fail the script (non-zero exit
    # through the uncaught exception) when any test does not pass.
    # NOTE(review): the report goes to a file inside a temporary directory
    # that is removed when the block exits, so its content is not kept - confirm this is intended.
    with tempfile.TemporaryDirectory() as dir_test:
        file_test = os.path.join(dir_test, "UnitTestsResult")
        with open(file_test, 'a') as f:
            f.write(" --- TEST src/yacsloader_swig : TestYacsOverrides.py\n")
            # unittest.makeSuite is deprecated since Python 3.11 and removed in
            # 3.13; the default loader builds the same suite.
            suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestYacsDriverOverrides)
            result = unittest.TextTestRunner(f, descriptions=1, verbosity=1).run(suite)
            if not result.wasSuccessful():
                raise RuntimeError("Test failed !")