2 # Copyright (C) 2006-2023 CEA/DEN, EDF R&D
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Lesser General Public License for more details.
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
# Test case for resuming a YACS schema: it loads samples/bloc2.xml, configures
# a breakpoint run, saves the executor state to a file, then parses that file
# back and configures a second run, checking node states after each phase.
# NOTE(review): this listing is flattened (no indentation) and the leading
# numbers jump, so some lines are not visible here. In particular no `def`
# header is visible for the fixture statements below (presumably setUp and
# tearDown -- confirm against the full file).
32 class TestResume(unittest.TestCase):
# Fixture: set up the SALOME runtime, then create the schema loader and the
# executor and load the sample schema that the test drives.
35 SALOMERuntime.RuntimeSALOME_setRuntime(1)
36 self.l = loader.YACSLoader()
37 self.e = pilot.ExecutorSwig()
38 self.p = self.l.load("samples/bloc2.xml")
# The state dump is written into a fresh temporary directory; nothing in the
# visible code removes that directory afterwards.
39 workdir = tempfile.mkdtemp(suffix=".yacstest")
40 self.statefile = os.path.join(workdir, 'dumpPartialBloc2.xml')
# Cleanup: shut down containers through the container manager from salome.lcc.
# NOTE(review): salome.lcc is used without a visible initialisation call --
# confirm it is done in a line missing from this view.
45 cm = salome.lcc.getContainerManager()
46 cm.ShutdownContainers()
# Single test: partial execution up to a breakpoint, state dump, then a run
# from the reloaded state.
49 def test1_PartialExec(self):
50 # --- stop execution after breakpoint
53 print("================= Start of PARTIALEXEC ===================")
# NOTE(review): `brp` is not assigned anywhere in the visible lines -- confirm
# it is defined in a line missing from this view.
55 self.e.setListOfBreakPoints(brp)
56 self.e.setExecMode(2) # YACS::STOPBEFORENODES
57 #self.e.displayDot(self.p)
# Thread named "breakpoint" whose target is self.e.RunPy(self.p, 0).
# NOTE(review): no start()/join() on run1 is visible in this excerpt.
58 run1 = threading.Thread(None, self.e.RunPy, "breakpoint", (self.p,0))
62 #self.e.displayDot(self.p)
# Dump the executor state to the temporary file, then stop the execution.
63 self.e.saveState(self.statefile)
64 #self.e.displayDot(self.p)
65 self.e.stopExecution()
66 #self.e.displayDot(self.p)
# Expected effective states after the partial run. 101 and 106 are raw numeric
# state codes -- presumably "not executed yet" and "done"; TODO confirm against
# the YACS state enumeration.
67 self.assertEqual(101, self.p.getChildByName('b1.b2.node1').getEffectiveState())
68 self.assertEqual(106, self.p.getChildByName('b1.node1').getEffectiveState())
69 print("================= reach BREAKPOINT PARTIAL EXEC ==========")
71 # --- reload state from previous partial execution then exec
74 print("================= Start of EXECLOADEDSTATE ===============")
# Parse the saved state file with a stateLoader built from a stateParser and
# the already loaded schema self.p.
75 sp = loader.stateParser()
76 sl = loader.stateLoader(sp,self.p)
77 sl.parse(self.statefile)
78 #self.e.displayDot(self.p)
79 self.e.setExecMode(0) # YACS::CONTINUE
# Thread named "loadState" whose target is self.e.RunPy(self.p, 0, True, True).
# NOTE(review): the meaning of the two extra True arguments is not visible
# here (presumably "resume from loaded state" flags -- confirm), and no
# start()/join() on run2 is visible in this excerpt.
80 run2 = threading.Thread(None, self.e.RunPy, "loadState", (self.p,0,True,True))
84 #self.e.displayDot(self.p)
# After the second run every listed node is expected to report state 106.
86 self.assertEqual(106, self.p.getChildByName('node1').getEffectiveState())
87 self.assertEqual(106, self.p.getChildByName('node2').getEffectiveState())
88 self.assertEqual(106, self.p.getChildByName('b1.node1').getEffectiveState())
89 self.assertEqual(106, self.p.getChildByName('b1.node2').getEffectiveState())
90 self.assertEqual(106, self.p.getChildByName('b1.b2.node1').getEffectiveState())
91 self.assertEqual(106, self.p.getChildByName('b1.b2.node2').getEffectiveState())
92 self.assertEqual(106, self.p.getChildByName('b1.b2.loop1.node1').getEffectiveState())
93 print("================= End of EXECLOADEDSTATE =================")
if __name__ == '__main__':
    # Script entry point: run the TestResume cases and append the textual
    # report to a "UnitTestsResult" file inside a fresh temporary directory.
    dir_test = tempfile.mkdtemp(suffix=".yacstest")
    file_test = os.path.join(dir_test, "UnitTestsResult")
    with open(file_test, 'a') as f:
        f.write(" --- TEST src/yacsloader: testResume.py\n")
        # unittest.makeSuite() is deprecated since Python 3.11 and removed in
        # 3.13; the default loader builds the same suite on every version.
        suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestResume)
        result = unittest.TextTestRunner(f, descriptions=1, verbosity=1).run(suite)
    # Exit status 0 when all tests passed, 1 otherwise; the report file is
    # already closed at this point.
    sys.exit(not result.wasSuccessful())