print("~~~~~~~~~~~~~~ work ~~~~~~~~~~~")
return proc.returncode
-def func_test7(scopeName,l,l2,cv):
+def func_test7(scopeName,cv,cv2,cv3,sharedNum):
salome.salome_init()
varName="a"
zeValue={"ab":[4,5,6]}
dsm=salome.naming_service.Resolve("/DataServerManager")
dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName) # should be suspended nbOfSecWait s by main process
assert(not isCreated)
- l.release() # tell manager that I'm ready
- l2.acquire() # wait for manager to start micro-test1
######### micro-test1 - check that all requests are suspended
+ ######## Barrier
+ with cv2:
+ cv2.notify_all()
+ sharedNum.value=True
+ with cv3:
+ cv3.wait()
+ ####### End Barrier
s=datetime.now()
t0=dss.createRdWrVarTransac(varName,obj2Str(zeValue))
s=(datetime.now()-s).total_seconds()
Warning this method expects a not overloaded machine to be run because test is based on ellapse time.
"""
scopeName="Scope1"
+ varName="ab"
+ zeObj={"ab":[5,6]}
dsm=salome.naming_service.Resolve("/DataServerManager")
dsm.cleanScopesInNS()
if scopeName in dsm.listScopes():
dsm.removeDataScope(scopeName)
- # l is for main process sync. to be sure to launch test when sub process is ready
- # l2 lock is for sub process sync.
- l=mp.Lock(); l2=mp.Lock()
- l.acquire() ; l2.acquire()
- cv=mp.Condition(mp.Lock())
dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
- #assert(isCreated)
- p=mp.Process(target=func_test7,args=(scopeName,l,l2,cv))
+ self.assertTrue(isCreated)
+ cv=mp.Condition(mp.Lock())
+ cv2=mp.Condition(mp.Lock()) # sharedNum & cv2 & cv3 for the barrier
+ cv3=mp.Condition(mp.Lock())
+ sharedNum=mp.Value('b',False)
+ p=mp.Process(target=func_test7,args=(scopeName,cv,cv2,cv3,sharedNum))
p.start()
- l.acquire()
- rs=dss.getRequestSwitcher() ; rs.holdRequests() # The aim of the test
- l2.release() # tell slave process that it's ready for micro-test1
+ #
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+ self.assertTrue(not isCreated)
+ t0=dss.createRdWrVarTransac(varName,obj2Str(zeObj))
+ dss.atomicApply([t0])
+ rs=dss.getRequestSwitcher()
+ self.assertTrue(not isCreated)
+ ######## Barrier
+ with cv2:
+ if not sharedNum.value:
+ cv2.wait()
+ sharedNum.value=False
+ pass
+ with cv3:
+ cv3.notify_all()
+ ####### End Barrier
+ rs.holdRequests() # The aim of the test
+ self.assertEqual(rs.listVars(),[varName]) # call whereas holdRequest is called
time.sleep(nbOfSecWait)
rs.activeRequests() # The aim of the test
######### micro-test3 - check that holdRequests is able to wait for a non finished job
time.sleep(0.01) # let main proc the priority
rs.holdRequests() # the aim of the test is here. main process is occupied 1s -> holdRequests is Expected to wait
s=(datetime.now()-s).total_seconds()
+ self.assertTrue(str2Obj(rs.fetchSerializedContent(varName))==zeObj) # call whereas holdRequest is called
rs.activeRequests()
- assert(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
+ self.assertTrue(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
# finishing
p.join()
pass
pass
-unittest.main()
+if __name__=="__main__":
+ unittest.main()