-# Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
+# Copyright (C) 2007-2014 CEA/DEN, EDF R&D, OPEN CASCADE
#
# Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
import sys
import time
-# Import libBatch library
-from libBatch_Swig import *
+# Import libbatch library
+from libbatch import *
def work():
print "*******************************************************************************************"
print "*******************************************************************************************"
# eventually remove any previous result
- if (os.path.exists('result.txt')):
- os.remove('result.txt')
+ if (os.path.exists("resultdir/seconddirname/result.txt")):
+ os.remove("resultdir/seconddirname/result.txt")
# Define the job...
job = Job()
# ... and its parameters ...
p = {}
- p['EXECUTABLE'] = './copied-test-script.sh'
- p['NAME'] = 'Test_Local_SH'
- p['WORKDIR'] = '/tmp'
- p['INFILE'] = [('seta.sh', 'copied-seta.sh'), ('setb.sh', 'copied-setb.sh'),
- ('test-script.sh', 'copied-test-script.sh')]
- p['OUTFILE'] = [('result.txt', 'orig-result.txt')]
+ p[EXECUTABLE] = config.TEST_SOURCE_DIR + "/test_script.py";
+ p[ARGUMENTS] = ["copied_seta.py", "copied_setb.py", "orig_result.txt"];
+ p[NAME] = 'Test_Python_Local_SH'
+ p[WORKDIR] = config.TEST_LOCAL_SH_WORKDIR
+ p[INFILE] = [(config.TEST_SOURCE_DIR + '/seta.py', 'copied_seta.py'),
+ (config.TEST_SOURCE_DIR + '/setb.py', 'copied_setb.py')]
+ p[OUTFILE] = [('result.txt', 'orig_result.txt')]
job.setParametre(p)
# ... and its environment
e = {}
+ e["MYENVVAR"] = "MYVALUE";
job.setEnvironnement(e)
print job
# Get the catalog
c = BatchManagerCatalog.getInstance()
- # Create a BatchManager of type Local_SSH on localhost
- bm = c('SH')('localhost')
+ # Create a BatchManager of type Local_SH on localhost
+ bm = c('LOCAL')('localhost', '', SH)
# Submit the job to the BatchManager
jobid = bm.submitJob(job)
jobid.queryJob()
# Wait for the end of the job
- state = 'Unknown'
- while state != 'Done':
- time.sleep(0.1)
- jinfo = jobid.queryJob()
- try:
- state = jinfo.getParametre()['STATE']
- except KeyError:
- pass
+ state = bm.waitForJobEnd(jobid, config.TEST_LOCAL_SH_TIMEOUT);
- print "Job", jobid, "is done"
- # wait for 2 more seconds for the copy of output files and the cleanup
- # (there's no cleaner way to do that yet)
- time.sleep(2)
+ if state == FINISHED:
+ print "Job", jobid, "is done"
+ bm.importOutputFiles(job, "resultdir/seconddirname")
+ elif state == FAILED:
+ print "Job", jobid, " finished in error"
+ bm.importOutputFiles(job, "resultdir/seconddirname")
+ return 1
+ else:
+ print "Timeout while executing job"
+ return 1
- # test the result file
- exp = "c = 12"
- f = open('result.txt')
- res = f.read().strip()
- print "result found : %s, expected : %s" % (res, exp)
+ if state != FINISHED and state != FAILED:
+ print "Error: Job not finished after timeout"
+ return 1;
- if (res == exp):
- return 0
+ # test the result file
+ res = {}
+ execfile('resultdir/seconddirname/result.txt', res)
+ if (res["c"] == 12 and res["MYENVVAR"] == "MYVALUE"):
+ print "OK, Expected result found."
+ return 0
else:
- return 1
+ print "result found : %s, expected : %s" % (res, 'res["c"] == 12 and res["MYENVVAR"] == "MYVALUE"')
+ return 1
if __name__ == "__main__":
retcode = work()