# -*- coding: iso-8859-1 -*-
-# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2019 CEA/DEN, EDF R&D, OPEN CASCADE
#
# Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
Ex�cution de la fonction. Impl�mentation de la m�thode run
d�clench�e par l'appel � Thread.start().
"""
- print "##################### threadhelper.run"
+ print("##################### threadhelper.run")
if self._function is None: return
try:
self._return = self._function(*self._argv)
- except Exception, e:
- print e
+ except Exception as e:
+ print(e)
self._exception = e
self._stopevent.set()
self.notifyObserver()
return
try:
self._observer.processNotification()
- except AttributeError, att:
+ except AttributeError as att:
if str(att) == "processNotification":
- print "L'observateur n'impl�mente pas la m�thode processNotification()"
+ print("L'observateur n'impl�mente pas la m�thode processNotification()")
else:
- print "La fonction processNotification() a lev� une exception:"
- print att
- except Exception, e:
- print "La fonction processNotification() a lev� une exception:"
- print e
+ print("La fonction processNotification() a lev� une exception:")
+ print(att)
+ except Exception as e:
+ print("La fonction processNotification() a lev� une exception:")
+ print(e)
def callback(self):
if self._callbackFunction is None: return
import os
testfilename="/tmp/threadhelperTestFile"
def testIfContinue():
- print "On examine la pr�sence du fichier ", testfilename
+ print("On examine la pr�sence du fichier ", testfilename)
if os.path.exists(testfilename):
return STOP
else:
return CONTINUE
def endedAction():
- print "FINI"
+ print("FINI")
def TEST_PeriodicTimer():
periodicTimer=PeriodicTimer(1,0,testIfContinue, endedAction)
"""
Fonction qui se termine correctement
"""
- print "D�but"
+ print("D�but")
cnt=0
while ( cnt < nbsteps ):
- print "Etape ", cnt
+ print("Etape ", cnt)
time.sleep(0.6)
cnt+=1
- print "Fin"
+ print("Fin")
def function_with_exception():
"""
Fonction qui aboutie � une lev�e d'exception
"""
- print "D�but"
+ print("D�but")
cnt=0
while ( cnt < 5 ):
- print "Etape ", cnt
+ print("Etape ", cnt)
time.sleep(1)
cnt+=1
raise Exception("erreur d'ex�cution de la fonction")
- print "Fin"
+ print("Fin")
def infinite_function():
"""
fonction de dur�e infinie (tant qu'il y a du courant �l�ctrique) pour
le test du timeout.
"""
- print "D�but"
+ print("D�but")
cnt=0
while ( 1 ):
- print "Etape ", cnt
+ print("Etape ", cnt)
time.sleep(1)
cnt+=1
raise Exception("erreur")
- print "Fin"
+ print("Fin")
def runWithRunner(functionToRun):
Ex�cute la fonction avec le runner. On illustre ici la modalit�
d'utilisation du Runner.
"""
- print "###########"
+ print("###########")
runner = Runner(functionToRun)
runner.start()
while ( not runner.isEnded() ):
- print "La fonction est en cours"
+ print("La fonction est en cours")
time.sleep(0.2)
e = runner.getException()
if e is not None:
- print "La fonction s'est termin�e en erreur"
- print e
+ print("La fonction s'est termin�e en erreur")
+ print(e)
# On peut en fait la relancer
# raise e
else:
- print "La fonction s'est termin�e correctement"
+ print("La fonction s'est termin�e correctement")
def runWithTimeout(functionToRun, timeout=10):
Ex�cute la fonction avec le runner. On illustre ici la modalit�
d'utilisation du Runner.
"""
- print "runWithTimeout : DEBUT"
+ print("runWithTimeout : DEBUT")
runner = Runner(functionToRun)
runner.start()
# On se fixe un temps au del� duquel on consid�re que la fonction
# est en erreur => on tue le thread (timeout)
runner.wait(timeout)
- print "Apr�s runner.timeout(timeout)"
+ print("Apr�s runner.timeout(timeout)")
if not runner.isEnded():
runner.kill()
e = runner.getException()
if e is not None:
- print "La fonction s'est termin�e en erreur"
- print e
+ print("La fonction s'est termin�e en erreur")
+ print(e)
# On peut en fait la relancer
# raise e
else:
- print "La fonction s'est termin�e correctement"
+ print("La fonction s'est termin�e correctement")
- print "runWithTimeout : FIN"
+ print("runWithTimeout : FIN")
import sys
sys.exit(0)
def myCallbackFunction():
- print "myCallbackFunction: the job is ended"
+ print("myCallbackFunction: the job is ended")
def TEST_runWithCallback():
return False
runnerId = runner.getId()
- print "A runner has been started with id="+str(runnerId)
+ print("A runner has been started with id="+str(runnerId))
cpt = 0
while ( not runner.isEnded() ):
- print "Waiting notification from process "+str(runner.getId())+", step n°"+str(cpt)
+ print("Waiting notification from process "+str(runner.getId())+", step n°"+str(cpt))
time.sleep(0.2)
cpt+=1
return True
if __name__ == "__main__":
- import unittester
+ from . import unittester
#TEST_PeriodicTimer()
#TEST_Runner()
#TEST_Timeout()