return SALOME::RequestSwitcher::_narrow(obj);
}
+void DataScopeServerBase::takeANap(CORBA::Double napDurationInSec)
+{
+ if(napDurationInSec<0.)
+ throw Exception("DataScopeServerBase::takeANap : negative value !");
+#ifndef WIN32
+ struct timespec req,rem;
+ long nbSec((long)napDurationInSec);
+ double remainTime(napDurationInSec-(double)nbSec);
+ req.tv_sec=nbSec;
+ req.tv_nsec=(long)(remainTime*1000000.);
+ int ret(nanosleep(&req,&rem));
+ if(ret!=0)
+ throw Exception("DataScopeServerBase::takeANap : nap not finished as expected !");
+#else
+ throw Exception("DataScopeServerBase::takeANap : not implemented for Windows !");
+#endif
+}
+
void DataScopeServerBase::initializePython(int argc, char *argv[])
{
Py_Initialize();
import cPickle
import gc
import time
+from datetime import datetime
import multiprocessing as mp
+nbOfSecWait=1.
+
def obj2Str(obj):
return cPickle.dumps(obj,cPickle.HIGHEST_PROTOCOL)
def str2Obj(strr):
print out
print err
return proc.returncode
-
+
+def func_test7(scopeName,l,l2,cv):
+ salome.salome_init()
+ varName="a"
+ zeValue={"ab":[4,5,6]}
+ dsm=salome.naming_service.Resolve("/DataServerManager")
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName) # should be suspended nbOfSecWait s by main process
+ assert(not isCreated)
+ l.release() # tell manager that I'm ready
+ l2.acquire() # wait for manager to start micro-test1
+ ######### micro-test1 - check that all requests are suspended
+ s=datetime.now()
+ t0=dss.createRdWrVarTransac(varName,obj2Str(zeValue))
+ s=(datetime.now()-s).total_seconds()
+ assert(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to wait nearly nbOfSecWait seconds
+ dss.atomicApply([t0])
+ ######### end of micro-test1
+ ######### micro-test2 - after activeRequests everything work well
+ s=datetime.now()
+ st=dss.fetchSerializedContent(varName)
+ assert(str2Obj(st)==zeValue)
+ s=(datetime.now()-s).total_seconds()
+ assert(s>=0. and s<0.05) # expect to be not locked
+ ######### end of micro-test2
+ with cv:
+ cv.notify_all()
+ dss.takeANap(nbOfSecWait) # emulate a DataServer occupation
+ pass
+
class SalomeSDSTest(unittest.TestCase):
def testList1(self):
self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
pass
+ def testLockToDump(self):
+ """ Test to check that holdRequests method. This method wait for clean server status and hold it until activeRequests is called.
+ Warning this method expects a not overloaded machine to be run because test is based on ellapse time.
+ """
+ scopeName="Scope1"
+ dsm=salome.naming_service.Resolve("/DataServerManager")
+ dsm.cleanScopesInNS()
+ if scopeName in dsm.listScopes():
+ dsm.removeDataScope(scopeName)
+ # l is for main process sync. to be sure to launch test when sub process is ready
+ # l2 lock is for sub process sync.
+ l=mp.Lock(); l2=mp.Lock()
+ l.acquire() ; l2.acquire()
+ cv=mp.Condition(mp.Lock())
+ dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+ #assert(isCreated)
+ p=mp.Process(target=func_test7,args=(scopeName,l,l2,cv))
+ p.start()
+ l.acquire()
+ rs=dss.getRequestSwitcher() ; rs.holdRequests() # The aim of the test
+ l2.release() # tell slave process that it's ready for micro-test1
+ time.sleep(nbOfSecWait)
+ rs.activeRequests() # The aim of the test
+ ######### micro-test3 - check that holdRequests is able to wait for a non finished job
+ with cv:
+ cv.wait()
+ s=datetime.now()
+ time.sleep(0.01) # let main proc the priority
+ rs.holdRequests() # the aim of the test is here. main process is occupied 1s -> holdRequests is Expected to wait
+ s=(datetime.now()-s).total_seconds()
+ rs.activeRequests()
+ assert(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
+ # finishing
+ p.join()
+ pass
+
def setUp(self):
salome.salome_init()
pass