import SALOME
import salome
import unittest
-import cPickle
+import pickle
import gc
import time
from datetime import datetime
nbOfSecWait=1.
def obj2Str(obj):
- return cPickle.dumps(obj,cPickle.HIGHEST_PROTOCOL)
+ return pickle.dumps(obj,pickle.HIGHEST_PROTOCOL)
def str2Obj(strr):
- return cPickle.loads(strr)
+ return pickle.loads(strr)
def generateKey(varName,scopeName):
dsm=salome.naming_service.Resolve("/DataServerManager")
dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
import TestSalomeSDSHelper0
import os,subprocess
fname=os.path.splitext(TestSalomeSDSHelper0.__file__)[0]+".py"
- proc=subprocess.Popen(["python",fname],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ proc = subprocess.Popen(["python3", fname], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out,err=proc.communicate()
if proc.returncode!=0:
- print out
- print err
+ print("-------------- work -----------")
+ print(out)
+ print(err)
+ print("~~~~~~~~~~~~~~ work ~~~~~~~~~~~")
return proc.returncode
-def func_test7(scopeName,l,l2,cv):
+def func_test7(scopeName,cv,cv2,cv3,sharedNum):
salome.salome_init()
varName="a"
zeValue={"ab":[4,5,6]}
dsm=salome.naming_service.Resolve("/DataServerManager")
dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName) # should be suspended nbOfSecWait s by main process
assert(not isCreated)
- l.release() # tell manager that I'm ready
- l2.acquire() # wait for manager to start micro-test1
######### micro-test1 - check that all requests are suspended
+ ######## Barrier
+ with cv2:
+ cv2.notify_all()
+ sharedNum.value=True
+ with cv3:
+ cv3.wait()
+ ####### End Barrier
s=datetime.now()
t0=dss.createRdWrVarTransac(varName,obj2Str(zeValue))
s=(datetime.now()-s).total_seconds()
#
nbProc=8
pool=mp.Pool(processes=nbProc)
- asyncResult=pool.map_async(work,[(i,varName,scopeName) for i in xrange(nbProc)])
+ asyncResult=pool.map_async(work,[(i,varName,scopeName) for i in range(nbProc)])
+ print("asyncResult=", asyncResult)
self.assertEqual(asyncResult.get(),nbProc*[0]) # <- the big test is here !
dsm.removeDataScope(scopeName)
wk.waitFor()
self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
keys=[str2Obj(elt) for elt in dss.getAllKeysOfVarWithTypeDict(varName)]
- self.assertEqual(keys,['ab','cd'])
+ self.assertEqual(set(keys),set(['ab','cd']))
def testTransaction6(self):
""" Test to test RdWr global vars with transaction"""
Warning this method expects a not overloaded machine to be run because test is based on ellapse time.
"""
scopeName="Scope1"
+ varName="ab"
+ zeObj={"ab":[5,6]}
dsm=salome.naming_service.Resolve("/DataServerManager")
dsm.cleanScopesInNS()
if scopeName in dsm.listScopes():
dsm.removeDataScope(scopeName)
- # l is for main process sync. to be sure to launch test when sub process is ready
- # l2 lock is for sub process sync.
- l=mp.Lock(); l2=mp.Lock()
- l.acquire() ; l2.acquire()
- cv=mp.Condition(mp.Lock())
dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
self.assertTrue(isCreated)
- p=mp.Process(target=func_test7,args=(scopeName,l,l2,cv))
+ cv=mp.Condition(mp.Lock())
+ cv2=mp.Condition(mp.Lock()) # sharedNum & cv2 & cv3 for the barrier
+ cv3=mp.Condition(mp.Lock())
+ sharedNum=mp.Value('b',False)
+ p=mp.Process(target=func_test7,args=(scopeName,cv,cv2,cv3,sharedNum))
p.start()
- l.acquire()
- rs=dss.getRequestSwitcher() ; rs.holdRequests() # The aim of the test
- l2.release() # tell slave process that it's ready for micro-test1
+ #
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+ self.assertTrue(not isCreated)
+ t0=dss.createRdWrVarTransac(varName,obj2Str(zeObj))
+ dss.atomicApply([t0])
+ rs=dss.getRequestSwitcher()
+ self.assertTrue(not isCreated)
+ ######## Barrier
+ with cv2:
+ if not sharedNum.value:
+ cv2.wait()
+ sharedNum.value=False
+ pass
+ with cv3:
+ cv3.notify_all()
+ ####### End Barrier
+ rs.holdRequests() # The aim of the test
+ self.assertEqual(rs.listVars(),[varName]) # call whereas holdRequest is called
time.sleep(nbOfSecWait)
rs.activeRequests() # The aim of the test
######### micro-test3 - check that holdRequests is able to wait for a non finished job
time.sleep(0.01) # let main proc the priority
rs.holdRequests() # the aim of the test is here. main process is occupied 1s -> holdRequests is Expected to wait
s=(datetime.now()-s).total_seconds()
+ self.assertTrue(str2Obj(rs.fetchSerializedContent(varName))==zeObj) # call whereas holdRequest is called
rs.activeRequests()
self.assertTrue(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
# finishing