# Test of SalomeLauncher.
# This test should be run in the salome environment, using "salome shell"
-# and salome application should be running.
+# and a salome application should be running.
+# The test will try to launch batch jobs on every available ressources which
+# have the can_launch_batch_jobs parameter set to True.
+# You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
+# define a customised ressource catalog.
# If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
# skiped.
class TestCompo(unittest.TestCase):
shutil.rmtree(cls.test_dir, ignore_errors=True)
os.mkdir(cls.test_dir)
+ # load catalogs
+# mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
+# ior = salome.orb.object_to_string(mc)
+# import SALOMERuntime
+# SALOMERuntime.RuntimeSALOME_setRuntime()
+# salome_runtime = SALOMERuntime.getSALOMERuntime()
+# session_catalog = salome_runtime.loadCatalog("session", ior)
+# salome_runtime.addCatalog(session_catalog)
+
+ # Get the list of possible ressources
+ ressource_param = salome.ResourceParameters()
+ ressource_param.can_launch_batch_jobs = True
+ rm = salome.lcc.getResourcesManager()
+ cls.ressources = rm.GetFittingResources(ressource_param)
+
##############################
# test of python_salome job
##############################
f.write(script_text)
f.close()
- local_result_dir = os.path.join(case_test_dir, "result_simple_py_job")
+ local_result_dir = os.path.join(case_test_dir, "result_py_job")
job_params = salome.JobParameters()
- job_params.job_name = "MyJob"
job_params.job_type = "python_salome"
job_params.job_file = job_script_file
job_params.in_files = []
job_params.out_files = ["result.txt", "subdir"]
- job_params.result_directory = local_result_dir
job_params.resource_required = salome.ResourceParameters()
job_params.resource_required.nb_proc = 1
- job_params.work_directory = "/tmp/job_salome_py" + self.suffix
-
launcher = salome.naming_service.Resolve('/SalomeLauncher')
- job_id = launcher.createJob(job_params)
-
- launcher.launchJob(job_id)
- import time
- jobState = launcher.getJobState(job_id)
- print "Job %d state: %s" % (job_id,jobState)
- while jobState != "FINISHED" and jobState != "FAILED" :
- time.sleep(5)
+ for resource in self.ressources:
+ print "Testing python_salome job on ", resource
+ job_params.result_directory = local_result_dir + resource
+ job_params.job_name = "PyJob" + resource
+ job_params.resource_required.name = resource
+ # use default working directory for this test
+
+ job_id = launcher.createJob(job_params)
+ launcher.launchJob(job_id)
+
jobState = launcher.getJobState(job_id)
print "Job %d state: %s" % (job_id,jobState)
- pass
-
- self.assertEqual(jobState, "FINISHED")
-
- # getJobResults to default directory (result_directory)
- launcher.getJobResults(job_id, "")
- try:
- f = open(os.path.join(local_result_dir, "result.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "Salut!")
- except IOError,ex:
- self.fail("IO exception:" + str(ex));
-
- try:
- f = open(os.path.join(local_result_dir, "subdir", "autre.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "Hello!")
- except IOError,ex:
- self.fail("IO exception:" + str(ex));
-
- # getJobResults to a specific directory
- mydir = os.path.join(case_test_dir, "custom_result_dir")
- launcher.getJobResults(job_id, mydir)
- try:
- f = open(os.path.join(mydir, "result.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "Salut!")
- except IOError,ex:
- self.fail("IO exception:" + str(ex));
-
- try:
- f = open(os.path.join(mydir, "subdir", "autre.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "Hello!")
- except IOError,ex:
- self.fail("IO exception:" + str(ex));
- pass
-
+ while jobState != "FINISHED" and jobState != "FAILED" :
+ time.sleep(5)
+ jobState = launcher.getJobState(job_id)
+ print "Job %d state: %s" % (job_id,jobState)
+ pass
+
+ self.assertEqual(jobState, "FINISHED")
+
+ # getJobResults to default directory (result_directory)
+ launcher.getJobResults(job_id, "")
+ try:
+ f = open(os.path.join(job_params.result_directory, "result.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "Salut!")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+
+ try:
+ f = open(os.path.join(job_params.result_directory,
+ "subdir", "autre.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "Hello!")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+
+ # getJobResults to a specific directory
+ mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
+ launcher.getJobResults(job_id, mydir)
+ try:
+ f = open(os.path.join(mydir, "result.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "Salut!")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+
+ try:
+ f = open(os.path.join(mydir, "subdir", "autre.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "Hello!")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+ pass
+ pass #for
+
os.chdir(old_dir)
##############################
in_text = in_f.read()
in_f.close()
-os.mkdir("subdir")
-f = open(os.path.join("subdir",'altul.txt'), 'w')
+os.mkdir("copie")
+f = open(os.path.join("copie",'copie.txt'), 'w')
f.write(in_text)
f.close()
"""
# write data file
f = open(os.path.join(case_test_dir, data_file), "w")
- f.write("expected data")
+ f.write("to be copied")
f.close()
# job params
- local_result_dir = os.path.join(case_test_dir, "resultats_env")
+ local_result_dir = os.path.join(case_test_dir, "result_com_job")
job_params = salome.JobParameters()
- job_params.job_name = "CommandJob"
job_params.job_type = "command"
job_params.job_file = script_file
job_params.env_file = env_file
job_params.in_files = [data_file]
- job_params.out_files = ["result.txt", "subdir"]
+ job_params.out_files = ["result.txt", "copie"]
job_params.local_directory = case_test_dir
- job_params.result_directory = local_result_dir
job_params.resource_required = salome.ResourceParameters()
job_params.resource_required.nb_proc = 1
- job_params.work_directory = "/tmp/command_job" + self.suffix
# create and launch the job
launcher = salome.naming_service.Resolve('/SalomeLauncher')
- job_id = launcher.createJob(job_params)
- launcher.launchJob(job_id)
-
- # wait for the end of the job
- import time
- jobState = launcher.getJobState(job_id)
- print "Job %d state: %s" % (job_id,jobState)
- while jobState != "FINISHED" and jobState != "FAILED" :
- time.sleep(3)
+ resManager= salome.lcc.getResourcesManager()
+
+ for resource in self.ressources:
+ print "Testing command job on ", resource
+ job_params.result_directory = local_result_dir + resource
+ job_params.job_name = "CommandJob_" + resource
+ job_params.resource_required.name = resource
+
+ # use the working directory of the resource
+ resParams = resManager.GetResourceDefinition(resource)
+ wd = os.path.join(resParams.working_directory,
+ "CommandJob_" + self.suffix)
+ job_params.work_directory = wd
+
+ job_id = launcher.createJob(job_params)
+ launcher.launchJob(job_id)
+ # wait for the end of the job
jobState = launcher.getJobState(job_id)
print "Job %d state: %s" % (job_id,jobState)
- pass
-
- # verify the results
- self.assertEqual(jobState, "FINISHED")
- launcher.getJobResults(job_id, "")
- try:
- f = open(os.path.join(local_result_dir, "result.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "expected")
- except IOError,ex:
- self.fail("IO exception:" + str(ex));
-
+ while jobState != "FINISHED" and jobState != "FAILED" :
+ time.sleep(3)
+ jobState = launcher.getJobState(job_id)
+ print "Job %d state: %s" % (job_id,jobState)
+ pass
+
+ # verify the results
+ self.assertEqual(jobState, "FINISHED")
+ launcher.getJobResults(job_id, "")
+ try:
+ f = open(os.path.join(job_params.result_directory, "result.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "expected")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+
+ try:
+ f = open(os.path.join(job_params.result_directory,
+ "copie",'copie.txt'), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "to be copied")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex));
+
##############################
# test of yacs job type
##############################
local_result_dir = os.path.join(case_test_dir, "result_yacs_job")
job_params = salome.JobParameters()
- job_params.job_name = "MyYacsJob"
job_params.job_type = "yacs_file"
job_params.job_file = job_script_file
job_params.env_file = os.path.join(case_test_dir,env_file)
- #job_params.in_files = [case_test_dir]
job_params.out_files = ["result.txt"]
- job_params.result_directory = local_result_dir
# define the interval between two YACS schema dumps (3 seconds)
import Engines
job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
-
job_params.resource_required = salome.ResourceParameters()
job_params.resource_required.nb_proc = 1
-
- job_params.work_directory = "/tmp/job_yacs" + self.suffix
-
+
launcher = salome.naming_service.Resolve('/SalomeLauncher')
- job_id = launcher.createJob(job_params)
+ resManager= salome.lcc.getResourcesManager()
- launcher.launchJob(job_id)
-
- import time
- jobState = launcher.getJobState(job_id)
- yacs_dump_success = False
- print "Job %d state: %s" % (job_id,jobState)
- while jobState != "FINISHED" and jobState != "FAILED" :
- time.sleep(5)
+ for resource in self.ressources:
+ print "Testing yacs job on ", resource
+ job_params.result_directory = local_result_dir + resource
+ job_params.job_name = "YacsJob_" + resource
+ job_params.resource_required.name = resource
+
+ # use the working directory of the resource
+ resParams = resManager.GetResourceDefinition(resource)
+ wd = os.path.join(resParams.working_directory,
+ "YacsJob_" + self.suffix)
+ job_params.work_directory = wd
+
+ job_id = launcher.createJob(job_params)
+ launcher.launchJob(job_id)
jobState = launcher.getJobState(job_id)
- yacs_dump_success = launcher.getJobDumpState(job_id, local_result_dir)
- print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
- pass
-
- self.assertEqual(jobState, "FINISHED")
-
- # Verify dumpState file is in the results
- self.assertTrue(yacs_dump_success)
- dump_file_path = os.path.join(local_result_dir, "dumpState_mySchema.xml")
- self.assertTrue(os.path.isfile(dump_file_path))
-
- # Load the schema state from the dump file and verify the state of a node
- import SALOMERuntime
- SALOMERuntime.RuntimeSALOME_setRuntime(1)
- import loader
- schema = loader.YACSLoader().load(job_script_file)
- stateParser = loader.stateParser()
- sl = loader.stateLoader(stateParser, schema)
- sl.parse(dump_file_path)
- # 106 : "DONE" state code
- self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
-
- # getJobResults to default directory (result_directory)
- launcher.getJobResults(job_id, "")
- try:
- f = open(os.path.join(local_result_dir, "result.txt"), 'r')
- text = f.read()
- f.close()
- self.assertEqual(text, "expected")
- except IOError,ex:
- self.fail("IO exception:" + str(ex))
+
+ yacs_dump_success = False
+ print "Job %d state: %s" % (job_id,jobState)
+ while jobState != "FINISHED" and jobState != "FAILED" :
+ time.sleep(5)
+ jobState = launcher.getJobState(job_id)
+ yacs_dump_success = launcher.getJobDumpState(job_id,
+ job_params.result_directory)
+ print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
+ pass
+
+ self.assertEqual(jobState, "FINISHED")
+
+ # Verify dumpState file is in the results
+ self.assertTrue(yacs_dump_success)
+ dump_file_path = os.path.join(job_params.result_directory,
+ "dumpState_mySchema.xml")
+ self.assertTrue(os.path.isfile(dump_file_path))
+
+ # Load the schema state from the dump file and verify the state of a node
+ import SALOMERuntime
+ SALOMERuntime.RuntimeSALOME_setRuntime(1)
+ import loader
+ schema = loader.YACSLoader().load(job_script_file)
+ stateParser = loader.stateParser()
+ sl = loader.stateLoader(stateParser, schema)
+ sl.parse(dump_file_path)
+ # 106 : "DONE" state code
+ self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
+
+ # getJobResults to default directory (result_directory)
+ launcher.getJobResults(job_id, "")
+ try:
+ f = open(os.path.join(job_params.result_directory, "result.txt"), 'r')
+ text = f.read()
+ f.close()
+ self.assertEqual(text, "expected")
+ except IOError,ex:
+ self.fail("IO exception:" + str(ex))
if __name__ == '__main__':
# creat study
import salome
salome.salome_init()
- unittest.main()
-
- # load catalogs
-# mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
-# ior = salome.orb.object_to_string(mc)
-# import SALOMERuntime
-# SALOMERuntime.RuntimeSALOME_setRuntime()
-# salome_runtime = SALOMERuntime.getSALOMERuntime()
-# session_catalog = salome_runtime.loadCatalog("session", ior)
-# salome_runtime.addCatalog(session_catalog)
-
+ unittest.main()
\ No newline at end of file