import pickle
import gc
import time
+from datetime import datetime
import multiprocessing as mp
+nbOfSecWait=1.
+
def obj2Str(obj):
return pickle.dumps(obj,pickle.HIGHEST_PROTOCOL)
def str2Obj(strr):
import TestSalomeSDSHelper0
import os,subprocess
fname=os.path.splitext(TestSalomeSDSHelper0.__file__)[0]+".py"
- proc=subprocess.Popen(["python",fname],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ proc = subprocess.Popen(["python3", fname], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out,err=proc.communicate()
if proc.returncode!=0:
+ print("-------------- work -----------")
print(out)
print(err)
+ print("~~~~~~~~~~~~~~ work ~~~~~~~~~~~")
return proc.returncode
-
+
+def func_test7(scopeName,cv,cv2,cv3,sharedNum):
+ salome.salome_init()
+ varName="a"
+ zeValue={"ab":[4,5,6]}
+ dsm=salome.naming_service.Resolve("/DataServerManager")
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName) # should be suspended nbOfSecWait s by main process
+ assert(not isCreated)
+ ######### micro-test1 - check that all requests are suspended
+ ######## Barrier
+ with cv2:
+ cv2.notify_all()
+ sharedNum.value=True
+ with cv3:
+ cv3.wait()
+ ####### End Barrier
+ s=datetime.now()
+ t0=dss.createRdWrVarTransac(varName,obj2Str(zeValue))
+ s=(datetime.now()-s).total_seconds()
+ assert(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to wait nearly nbOfSecWait seconds
+ dss.atomicApply([t0])
+ ######### end of micro-test1
+ ######### micro-test2 - after activeRequests everything work well
+ s=datetime.now()
+ st=dss.fetchSerializedContent(varName)
+ assert(str2Obj(st)==zeValue)
+ s=(datetime.now()-s).total_seconds()
+ assert(s>=0. and s<0.05) # expect to be not locked
+ ######### end of micro-test2
+ with cv:
+ cv.notify_all()
+ dss.takeANap(nbOfSecWait) # emulate a DataServer occupation
+ pass
+
class SalomeSDSTest(unittest.TestCase):
def testList1(self):
nbProc=8
pool=mp.Pool(processes=nbProc)
asyncResult=pool.map_async(work,[(i,varName,scopeName) for i in range(nbProc)])
+ print("asyncResult=", asyncResult)
self.assertEqual(asyncResult.get(),nbProc*[0]) # <- the big test is here !
dsm.removeDataScope(scopeName)
wk.waitFor()
self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
keys=[str2Obj(elt) for elt in dss.getAllKeysOfVarWithTypeDict(varName)]
- self.assertEqual(keys,['ab','cd'])
+ self.assertEqual(set(keys),set(['ab','cd']))
def testTransaction6(self):
""" Test to test RdWr global vars with transaction"""
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
pass
+ def testLockToDump(self):
+ """ Test to check that holdRequests method. This method wait for clean server status and hold it until activeRequests is called.
+ Warning this method expects a not overloaded machine to be run because test is based on ellapse time.
+ """
+ scopeName="Scope1"
+ varName="ab"
+ zeObj={"ab":[5,6]}
+ dsm=salome.naming_service.Resolve("/DataServerManager")
+ dsm.cleanScopesInNS()
+ if scopeName in dsm.listScopes():
+ dsm.removeDataScope(scopeName)
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+ self.assertTrue(isCreated)
+ cv=mp.Condition(mp.Lock())
+ cv2=mp.Condition(mp.Lock()) # sharedNum & cv2 & cv3 for the barrier
+ cv3=mp.Condition(mp.Lock())
+ sharedNum=mp.Value('b',False)
+ p=mp.Process(target=func_test7,args=(scopeName,cv,cv2,cv3,sharedNum))
+ p.start()
+ #
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+ self.assertTrue(not isCreated)
+ t0=dss.createRdWrVarTransac(varName,obj2Str(zeObj))
+ dss.atomicApply([t0])
+ rs=dss.getRequestSwitcher()
+ self.assertTrue(not isCreated)
+ ######## Barrier
+ with cv2:
+ if not sharedNum.value:
+ cv2.wait()
+ sharedNum.value=False
+ pass
+ with cv3:
+ cv3.notify_all()
+ ####### End Barrier
+ rs.holdRequests() # The aim of the test
+ self.assertEqual(rs.listVars(),[varName]) # call whereas holdRequest is called
+ time.sleep(nbOfSecWait)
+ rs.activeRequests() # The aim of the test
+ ######### micro-test3 - check that holdRequests is able to wait for a non finished job
+ with cv:
+ cv.wait()
+ s=datetime.now()
+ time.sleep(0.01) # let main proc the priority
+ rs.holdRequests() # the aim of the test is here. main process is occupied 1s -> holdRequests is Expected to wait
+ s=(datetime.now()-s).total_seconds()
+ self.assertTrue(str2Obj(rs.fetchSerializedContent(varName))==zeObj) # call whereas holdRequest is called
+ rs.activeRequests()
+ self.assertTrue(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
+ # finishing
+ p.join()
+ pass
+
def setUp(self):
salome.salome_init()
pass
pass
-unittest.main()
+if __name__=="__main__":
+ unittest.main()