Salome HOME
Merge 'master' branch into 'V9_dev' branch
[modules/kernel.git] / src / SALOMESDS / TestSalomeSDS.py
index 76ef0dd74182786ecdf1eb61b6f51408cb1c584b..155085b4772fc565d220972f6ae63512ced3f3e5 100644 (file)
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -23,15 +23,18 @@ import SalomeSDSClt
 import SALOME
 import salome
 import unittest
-import cPickle
+import pickle
 import gc
 import time
+from datetime import datetime
 import multiprocessing as mp
 
+nbOfSecWait=1.
+
 def obj2Str(obj):
-  return cPickle.dumps(obj,cPickle.HIGHEST_PROTOCOL)
+  return pickle.dumps(obj,pickle.HIGHEST_PROTOCOL)
 def str2Obj(strr):
-  return cPickle.loads(strr)
+  return pickle.loads(strr)
 def generateKey(varName,scopeName):
   dsm=salome.naming_service.Resolve("/DataServerManager")
   dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
@@ -48,13 +51,48 @@ def work(t):
     import TestSalomeSDSHelper0
     import os,subprocess
     fname=os.path.splitext(TestSalomeSDSHelper0.__file__)[0]+".py"
-    proc=subprocess.Popen(["python",fname],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+    proc = subprocess.Popen(["python3", fname], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     out,err=proc.communicate()
     if proc.returncode!=0:
-      print out
-      print err
+      print("-------------- work -----------")
+      print(out)
+      print(err)
+      print("~~~~~~~~~~~~~~ work ~~~~~~~~~~~")
     return proc.returncode
-
+  
+def func_test7(scopeName,cv,cv2,cv3,sharedNum):
+    salome.salome_init()
+    varName="a"
+    zeValue={"ab":[4,5,6]}
+    dsm=salome.naming_service.Resolve("/DataServerManager")
+    dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName) # should be suspended nbOfSecWait s by main process
+    assert(not isCreated)
+    ######### micro-test1 - check that all requests are suspended
+    ######## Barrier
+    with cv2:
+      cv2.notify_all()
+      sharedNum.value=True
+    with cv3:
+      cv3.wait()
+    ####### End Barrier
+    s=datetime.now()
+    t0=dss.createRdWrVarTransac(varName,obj2Str(zeValue))
+    s=(datetime.now()-s).total_seconds()
+    assert(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to wait nearly nbOfSecWait seconds
+    dss.atomicApply([t0])
+    ######### end of micro-test1
+    ######### micro-test2 - after activeRequests everything work well
+    s=datetime.now()
+    st=dss.fetchSerializedContent(varName)
+    assert(str2Obj(st)==zeValue)
+    s=(datetime.now()-s).total_seconds()
+    assert(s>=0. and s<0.05) # expect to be not locked
+    ######### end of micro-test2
+    with cv:
+      cv.notify_all()
+    dss.takeANap(nbOfSecWait) # emulate a DataServer occupation
+    pass
+  
 class SalomeSDSTest(unittest.TestCase):
   
   def testList1(self):
@@ -123,11 +161,13 @@ class SalomeSDSTest(unittest.TestCase):
     #
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
     wk=dss.waitForKeyInVar(varName,obj2Str("cd"))
-    self.assertEqual(str2Obj(wk.waitFor()),[7,8,9,10])
+    wk.waitFor()
+    self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
     #
     nbProc=8
     pool=mp.Pool(processes=nbProc)
-    asyncResult=pool.map_async(work,[(i,varName,scopeName) for i in xrange(nbProc)])
+    asyncResult=pool.map_async(work,[(i,varName,scopeName) for i in range(nbProc)])
+    print("asyncResult=", asyncResult)
     self.assertEqual(asyncResult.get(),nbProc*[0]) # <- the big test is here !
     dsm.removeDataScope(scopeName)
 
@@ -152,7 +192,8 @@ class SalomeSDSTest(unittest.TestCase):
     #
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
     wk=dss.waitForKeyInVar(varName,obj2Str("cd"))
-    self.assertEqual(str2Obj(wk.waitFor()),[7,8,9,10])
+    wk.waitFor()
+    self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
 
   def testTransaction3(self):
     scopeName="Scope1"
@@ -195,7 +236,8 @@ class SalomeSDSTest(unittest.TestCase):
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
     wk,t2=dss.waitForKeyInVarAndKillIt(varName,obj2Str("cd"))
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
-    self.assertEqual(str2Obj(wk.waitFor()),[7,8,9,10])
+    wk.waitFor()
+    self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
     dss.atomicApply([t2])
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6]})
 
@@ -220,16 +262,18 @@ class SalomeSDSTest(unittest.TestCase):
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6]})
     wk=dss.waitForKeyInVar(varName,obj2Str("cd"))
     t1.addKeyValueInVarErrorIfAlreadyExistingNow(obj2Str("cd"),obj2Str([7,8,9,10]))
-    self.assertEqual(str2Obj(wk.waitFor()),[7,8,9,10])
+    wk.waitFor()
+    self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6]})# it is not a bug ! commit of t1 not done !
     dss.atomicApply([t1])
     self.assertEqual(dss.getAccessOfVar(varName),"RdExt")
     #
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
     wk=dss.waitForKeyInVar(varName,obj2Str("cd"))
-    self.assertEqual(str2Obj(wk.waitFor()),[7,8,9,10])
+    wk.waitFor()
+    self.assertEqual(str2Obj(dss.waitForMonoThrRev(wk)),[7,8,9,10])
     keys=[str2Obj(elt) for elt in dss.getAllKeysOfVarWithTypeDict(varName)]
-    self.assertEqual(keys,['ab','cd'])
+    self.assertEqual(set(keys),set(['ab','cd']))
 
   def testTransaction6(self):
     """ Test to test RdWr global vars with transaction"""
@@ -304,11 +348,65 @@ class SalomeSDSTest(unittest.TestCase):
     self.assertEqual(str2Obj(dss.fetchSerializedContent(varName)),{'ab':[4,5,6],'cd':[7,8,9,10]})
     pass
 
+  def testLockToDump(self):
+    """ Test to check that holdRequests method. This method wait for clean server status and hold it until activeRequests is called.
+    Warning this method expects a not overloaded machine to be run because test is based on ellapse time.
+    """
+    scopeName="Scope1"
+    varName="ab"
+    zeObj={"ab":[5,6]}
+    dsm=salome.naming_service.Resolve("/DataServerManager")
+    dsm.cleanScopesInNS()
+    if scopeName in dsm.listScopes():
+        dsm.removeDataScope(scopeName)
+    dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+    self.assertTrue(isCreated)
+    cv=mp.Condition(mp.Lock())
+    cv2=mp.Condition(mp.Lock()) # sharedNum & cv2 & cv3 for the barrier
+    cv3=mp.Condition(mp.Lock())
+    sharedNum=mp.Value('b',False)
+    p=mp.Process(target=func_test7,args=(scopeName,cv,cv2,cv3,sharedNum))
+    p.start()
+    #
+    dss,isCreated=dsm.giveADataScopeTransactionCalled(scopeName)
+    self.assertTrue(not isCreated)
+    t0=dss.createRdWrVarTransac(varName,obj2Str(zeObj))
+    dss.atomicApply([t0])
+    rs=dss.getRequestSwitcher()
+    self.assertTrue(not isCreated)
+    ######## Barrier
+    with cv2:
+      if not sharedNum.value:
+        cv2.wait()
+      sharedNum.value=False
+      pass
+    with cv3:
+      cv3.notify_all()
+    ####### End Barrier
+    rs.holdRequests() # The aim of the test
+    self.assertEqual(rs.listVars(),[varName]) # call whereas holdRequest is called
+    time.sleep(nbOfSecWait)
+    rs.activeRequests() # The aim of the test
+    ######### micro-test3 - check that holdRequests is able to wait for a non finished job
+    with cv:
+      cv.wait()
+      s=datetime.now()
+      time.sleep(0.01) # let main proc the priority
+      rs.holdRequests() # the aim of the test is here. main process is occupied 1s -> holdRequests is Expected to wait
+      s=(datetime.now()-s).total_seconds()
+      self.assertTrue(str2Obj(rs.fetchSerializedContent(varName))==zeObj) # call whereas holdRequest is called
+      rs.activeRequests()
+      self.assertTrue(s>=0.99*nbOfSecWait and s<nbOfSecWait*1.01) # expect to be not locked
+    # finishing
+    p.join()
+    pass
+
   def setUp(self):
     salome.salome_init()
     pass
   
   pass
 
-unittest.main()
+if __name__=="__main__":
+  unittest.main()