class SalomeSDSTest(unittest.TestCase):
def testList1(self):
- a=SalomeSDSClt.CreateRdExtGlobalVar([],"a","Scope0")
+ scopeName = "Scope0"
+ a=SalomeSDSClt.CreateRdExtGlobalVar([],"a",scopeName)
self.assertEqual(a.local_copy(),[])
a.append(5)
self.assertEqual(a.local_copy(),[5])
a[4].append(7)
self.assertEqual(a.local_copy(),[5,["rt",8],5,["rt",8],["rt",8,7]])
a.ptr().getMyDataScopeServer().deleteVar("a")
+ del a
+ import gc
+ gc.collect(0)
+ salome.dsm.removeDataScope(scopeName)
pass
def testDict1(self):
- a=SalomeSDSClt.CreateRdExtGlobalVar({},"a","Scope0")
+ scopeName = "Scope0"
+ a=SalomeSDSClt.CreateRdExtGlobalVar({},"a",scopeName)
a["ab"]=4
self.assertEqual(a.local_copy(),{"ab":4})
a["cd"]=[5]
a["gh"]["cd"].append(99) ; a["cd"].append(88)
self.assertEqual(a.local_copy(),{"ab":4,"cd":[5,77,88],"ef":["a","bb","ccc"],"gh":{"ab":4,"cd":[5,77,99],"ef":["a","bb","ccc"]}})
a.ptr().getMyDataScopeServer().deleteVar("a")
+ del a
+ import gc
+ gc.collect(0)
+ salome.dsm.removeDataScope(scopeName)
pass
def testReadOnly1(self):
- a=SalomeSDSClt.CreateRdOnlyGlobalVar({"ab":4,"cd":[5,77]},"a","Scope0")
+ scopeName = "Scope0"
+ #
+ a=SalomeSDSClt.CreateRdOnlyGlobalVar({"ab":4,"cd":[5,77]},"a",scopeName)
self.assertEqual(a.local_copy(),{"ab":4,"cd":[5,77]})
self.assertRaises(Exception,a.__getitem__,"ab")
a.ptr().getMyDataScopeServer().deleteVar("a")
+ del a
+ import gc
+ gc.collect(0)
+ #
+ salome.dsm.removeDataScope(scopeName)
def testTransaction1(self):
scopeName="Scope1"
wk=dss.waitForKeyInVar(varName,obj2Str("cd"))
wk.waitFor()
self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
+ dsm.removeDataScope(scopeName)
def testTransaction3(self):
scopeName="Scope1"
t2=dss.removeKeyInVarErrorIfNotAlreadyExisting(varName,obj2Str("ab"))
dss.atomicApply([t2])
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'cd':[7,8,9,10]})
+ dsm.removeDataScope(scopeName)
def testTransaction4(self):
scopeName="Scope1"
self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
dss.atomicApply([t2])
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6]})
+ dsm.removeDataScope(scopeName)
def testTransaction5(self):
""" Like testTransaction2 but without transactions. """
self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
keys=[str2Obj(elt) for elt in dss.getAllKeysOfVarWithTypeDict(varName)]
self.assertEqual(set(keys),set(['ab','cd']))
+ dsm.removeDataScope(scopeName)
def testTransaction6(self):
""" Test to test RdWr global vars with transaction"""
dss.atomicApply([t1])
self.assertEqual(dss.getAccessOfVar(varName),"RdExt")
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
+ dsm.removeDataScope(scopeName)
pass
def testTransaction8(self):
t4=dss.createRdExtVarFreeStyleTransac(varName,obj2Str(value),funcContent)
self.assertRaises(SALOME.SALOME_Exception,dss.atomicApply,[t4]) # d is in dict pointed by var. Func returns false -> rejected
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),value3)
+ dsm.removeDataScope(scopeName)
pass
def testTransaction9(self):
t1.addKeyValueInVarErrorIfAlreadyExistingNow(obj2Str("c"),obj2Str(3))
dss.atomicApply([t1])
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),value2)
+ dsm.removeDataScope(scopeName)
pass
self.assertTrue(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
# finishing
p.join()
+ dsm.removeDataScope(scopeName)
pass
def setUp(self):