2 # -*- coding: utf-8 -*-
14 except OSError as exc: # Python >2.5
15 if exc.errno == errno.EEXIST and os.path.isdir(path):
20 # Test of SalomeLauncher.
21 # This test should be run in the salome environment, using "salome shell"
22 # and a salome application should be running.
23 # The test will try to launch batch jobs on all available resources which
24 # have the can_launch_batch_jobs parameter set to True.
25 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
26 # define a customised resource catalog.
27 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
29 class TestCompo(unittest.TestCase):
32 # Prepare the test directory
33 temp = tempfile.NamedTemporaryFile()
34 cls.test_dir = os.path.join(temp.name, "test_dir")
35 name = os.path.basename(temp.name)
37 cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
41 # mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
42 # ior = salome.orb.object_to_string(mc)
43 # import SALOMERuntime
44 # SALOMERuntime.RuntimeSALOME_setRuntime()
45 # salome_runtime = SALOMERuntime.getSALOMERuntime()
46 # session_catalog = salome_runtime.loadCatalog("session", ior)
47 # salome_runtime.addCatalog(session_catalog)
49 # Get the list of possible resources
50 ressource_param = salome.ResourceParameters()
51 ressource_param.can_launch_batch_jobs = True
52 rm = salome.lcc.getResourcesManager()
53 cls.ressources = rm.GetFittingResources(ressource_param)
55 def verifyFile(self, path, content):
60 self.assertEqual(text, content)
62 self.fail("IO exception:" + str(ex));
64 def create_JobParameters(self):
65 job_params = salome.JobParameters()
66 job_params.wckey="P11U50:CARBONES" #needed by edf clusters
67 job_params.resource_required = salome.ResourceParameters()
68 job_params.resource_required.nb_proc = 1
71 ##############################
72 # test of python_salome job
73 ##############################
74 def test_salome_py_job(self):
75 case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
76 mkdir_p(case_test_dir)
79 os.chdir(case_test_dir)
82 script_file = "myScript.py"
83 job_script_file = os.path.join(case_test_dir, script_file)
84 script_text = """#! /usr/bin/env python
85 # -*- coding: utf-8 -*-
87 # verify import salome
91 f = open('result.txt', 'w')
97 f = open(os.path.join("subdir",'autre.txt'), 'w')
101 f = open(job_script_file, "w")
105 local_result_dir = os.path.join(case_test_dir, "result_py_job-")
106 job_params = self.create_JobParameters()
107 job_params.job_type = "python_salome"
108 job_params.job_file = job_script_file
109 job_params.in_files = []
110 job_params.out_files = ["result.txt", "subdir"]
112 launcher = salome.naming_service.Resolve('/SalomeLauncher')
114 for resource in self.ressources:
115 print "Testing python_salome job on ", resource
116 job_params.result_directory = local_result_dir + resource
117 job_params.job_name = "PyJob" + resource
118 job_params.resource_required.name = resource
119 # use default working directory for this test
121 job_id = launcher.createJob(job_params)
122 launcher.launchJob(job_id)
124 jobState = launcher.getJobState(job_id)
125 print "Job %d state: %s" % (job_id,jobState)
126 while jobState != "FINISHED" and jobState != "FAILED" :
128 jobState = launcher.getJobState(job_id)
129 print "Job %d state: %s" % (job_id,jobState)
132 self.assertEqual(jobState, "FINISHED")
134 # getJobResults to default directory (result_directory)
135 launcher.getJobResults(job_id, "")
136 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
138 self.verifyFile(os.path.join(job_params.result_directory,
139 "subdir", "autre.txt"),
142 # getJobResults to a specific directory
143 mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
144 launcher.getJobResults(job_id, mydir)
145 self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
146 self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
151 ##############################
152 # test of command job type
153 ##############################
154 def test_command(self):
155 case_test_dir = os.path.join(TestCompo.test_dir, "command")
156 mkdir_p(case_test_dir)
160 script_file = "myEnvScript.py"
161 script_text = """#! /usr/bin/env python
162 # -*- coding: utf-8 -*-
166 text_result = os.getenv("ENV_TEST_VAR","")
168 f = open('result.txt', 'w')
172 in_f = open("in.txt", "r")
173 in_text = in_f.read()
177 f = open(os.path.join("copie",'copie.txt'), 'w')
181 abs_script_file = os.path.join(case_test_dir, script_file)
182 f = open(abs_script_file, "w")
185 os.chmod(abs_script_file, 0o755)
188 env_file = "myEnv.sh"
189 env_text = """export ENV_TEST_VAR="expected"
191 f = open(os.path.join(case_test_dir, env_file), "w")
196 f = open(os.path.join(case_test_dir, data_file), "w")
197 f.write("to be copied")
201 local_result_dir = os.path.join(case_test_dir, "result_com_job-")
202 job_params = self.create_JobParameters()
203 job_params.job_type = "command"
204 job_params.job_file = script_file
205 job_params.env_file = env_file
206 job_params.in_files = [data_file]
207 job_params.out_files = ["result.txt", "copie"]
208 job_params.local_directory = case_test_dir
210 # create and launch the job
211 launcher = salome.naming_service.Resolve('/SalomeLauncher')
212 resManager= salome.lcc.getResourcesManager()
214 for resource in self.ressources:
215 print "Testing command job on ", resource
216 job_params.result_directory = local_result_dir + resource
217 job_params.job_name = "CommandJob_" + resource
218 job_params.resource_required.name = resource
220 # use the working directory of the resource
221 resParams = resManager.GetResourceDefinition(resource)
222 wd = os.path.join(resParams.working_directory,
223 "CommandJob" + self.suffix)
224 job_params.work_directory = wd
226 job_id = launcher.createJob(job_params)
227 launcher.launchJob(job_id)
228 # wait for the end of the job
229 jobState = launcher.getJobState(job_id)
230 print "Job %d state: %s" % (job_id,jobState)
231 while jobState != "FINISHED" and jobState != "FAILED" :
233 jobState = launcher.getJobState(job_id)
234 print "Job %d state: %s" % (job_id,jobState)
238 self.assertEqual(jobState, "FINISHED")
239 launcher.getJobResults(job_id, "")
240 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
242 self.verifyFile(os.path.join(job_params.result_directory,
243 "copie",'copie.txt'),
246 # verify getJobWorkFile
247 mydir = os.path.join(case_test_dir, "work_dir" + resource)
248 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
249 self.assertEqual(success, True)
250 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
252 success = launcher.getJobWorkFile(job_id, "copie", mydir)
253 self.assertEqual(success, True)
254 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
258 ##############################
259 # test of yacs job type
260 ##############################
262 yacs_path = os.getenv("YACS_ROOT_DIR", "")
263 if not os.path.isdir(yacs_path):
264 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
266 case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
267 mkdir_p(case_test_dir)
270 env_file = "myEnv.sh"
271 env_text = """export ENV_TEST_VAR="expected"
273 f = open(os.path.join(case_test_dir, env_file), "w")
278 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
279 <proc name="newSchema_1">
280 <property name="DefaultStudyID" value="1"/>
281 <container name="DefaultContainer">
282 <property name="container_kind" value="Salome"/>
283 <property name="attached_on_cloning" value="0"/>
284 <property name="container_name" value="FactoryServer"/>
285 <property name="name" value="localhost"/>
287 <inline name="PyScript0">
288 <script><code><![CDATA[import os
289 text_result = os.getenv("ENV_TEST_VAR","")
290 f = open('result.txt', 'w')
294 <load container="DefaultContainer"/>
298 yacs_file = "mySchema.xml"
299 job_script_file = os.path.join(case_test_dir, yacs_file)
300 f = open(job_script_file, "w")
304 local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
305 job_params = self.create_JobParameters()
306 job_params.job_type = "yacs_file"
307 job_params.job_file = job_script_file
308 job_params.env_file = os.path.join(case_test_dir,env_file)
309 job_params.out_files = ["result.txt"]
311 # define the interval between two YACS schema dumps (3 seconds)
313 job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
315 launcher = salome.naming_service.Resolve('/SalomeLauncher')
316 resManager= salome.lcc.getResourcesManager()
318 for resource in self.ressources:
319 print "Testing yacs job on ", resource
320 job_params.result_directory = local_result_dir + resource
321 job_params.job_name = "YacsJob_" + resource
322 job_params.resource_required.name = resource
324 # use the working directory of the resource
325 resParams = resManager.GetResourceDefinition(resource)
326 wd = os.path.join(resParams.working_directory,
327 "YacsJob" + self.suffix)
328 job_params.work_directory = wd
330 job_id = launcher.createJob(job_params)
331 launcher.launchJob(job_id)
332 jobState = launcher.getJobState(job_id)
334 yacs_dump_success = False
335 print "Job %d state: %s" % (job_id,jobState)
336 while jobState != "FINISHED" and jobState != "FAILED" :
338 jobState = launcher.getJobState(job_id)
339 # yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
340 yacs_dump_success = launcher.getJobDumpState(job_id,
341 job_params.result_directory)
342 print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
345 self.assertEqual(jobState, "FINISHED")
347 # Verify dumpState file is in the results
348 self.assertTrue(yacs_dump_success)
349 dump_file_path = os.path.join(job_params.result_directory,
350 "dumpState_mySchema.xml")
351 self.assertTrue(os.path.isfile(dump_file_path))
353 # Load the schema state from the dump file and verify the state of a node
355 SALOMERuntime.RuntimeSALOME_setRuntime(1)
357 schema = loader.YACSLoader().load(job_script_file)
358 stateParser = loader.stateParser()
359 sl = loader.stateLoader(stateParser, schema)
360 sl.parse(dump_file_path)
361 # 106 : "DONE" state code
362 self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
364 # getJobResults to default directory (result_directory)
365 launcher.getJobResults(job_id, "")
366 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
369 ##############################
370 # test of yacs job type using "--init_port" driver option
371 ##############################
372 def test_yacsopt(self):
373 yacs_path = os.getenv("YACS_ROOT_DIR", "")
374 if not os.path.isdir(yacs_path):
375 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
377 case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
378 mkdir_p(case_test_dir)
381 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
382 <proc name="myschema">
383 <property name="DefaultStudyID" value="1"/>
384 <type name="string" kind="string"/>
385 <type name="bool" kind="bool"/>
386 <type name="double" kind="double"/>
387 <type name="int" kind="int"/>
388 <container name="DefaultContainer">
389 <property name="container_kind" value="Salome"/>
390 <property name="attached_on_cloning" value="0"/>
391 <property name="container_name" value="FactoryServer"/>
392 <property name="name" value="localhost"/>
394 <inline name="mynode">
395 <script><code><![CDATA[
396 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
397 f = open('result.txt', 'w')
401 <load container="DefaultContainer"/>
402 <inport name="i" type="int"/>
403 <inport name="d" type="double"/>
404 <inport name="b" type="bool"/>
405 <inport name="s" type="string"/>
409 yacs_file = "simpleSchema.xml"
410 job_script_file = os.path.join(case_test_dir, yacs_file)
411 f = open(job_script_file, "w")
415 local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
416 job_params = self.create_JobParameters()
417 job_params.job_type = "yacs_file"
418 job_params.job_file = job_script_file
419 job_params.out_files = ["result.txt"]
421 # pass initial values for the input ports of node "mynode" via the YACS driver options
423 job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
424 "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
425 expected_result="i=5,d=3.7,b=False,s=lili"
427 launcher = salome.naming_service.Resolve('/SalomeLauncher')
428 resManager= salome.lcc.getResourcesManager()
430 for resource in self.ressources:
431 print "Testing yacs job with options on ", resource
432 job_params.result_directory = local_result_dir + resource
433 job_params.job_name = "YacsJobOpt_" + resource
434 job_params.resource_required.name = resource
436 # use the working directory of the resource
437 resParams = resManager.GetResourceDefinition(resource)
438 wd = os.path.join(resParams.working_directory,
439 "YacsJobOpt" + self.suffix)
440 job_params.work_directory = wd
442 job_id = launcher.createJob(job_params)
443 launcher.launchJob(job_id)
444 jobState = launcher.getJobState(job_id)
446 yacs_dump_success = False
447 print "Job %d state: %s" % (job_id,jobState)
448 while jobState != "FINISHED" and jobState != "FAILED" :
450 jobState = launcher.getJobState(job_id)
451 print "Job %d state: %s " % (job_id,jobState)
454 self.assertEqual(jobState, "FINISHED")
456 # getJobResults to default directory (result_directory)
457 launcher.getJobResults(job_id, "")
458 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
461 ############################################
462 # test of command job type with pre_command
463 ############################################
464 def test_command_pre(self):
465 case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
466 mkdir_p(case_test_dir)
468 # command to be run before the job
469 pre_command = "pre_command.sh"
470 pre_command_text = "echo 'it works!' > in.txt"
471 abs_pre_command_file = os.path.join(case_test_dir, pre_command)
472 f = open(abs_pre_command_file, "w")
473 f.write(pre_command_text)
475 os.chmod(abs_pre_command_file, 0o755)
478 script_file = "myTestScript.py"
479 script_text = """#! /usr/bin/env python
480 # -*- coding: utf-8 -*-
482 in_f = open("in.txt", "r")
483 in_text = in_f.read()
486 f = open('result.txt', 'w')
490 abs_script_file = os.path.join(case_test_dir, script_file)
491 f = open(abs_script_file, "w")
494 os.chmod(abs_script_file, 0o755)
497 local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
498 job_params = self.create_JobParameters()
499 job_params.job_type = "command"
500 job_params.job_file = script_file
501 job_params.pre_command = pre_command
502 job_params.in_files = []
503 job_params.out_files = ["result.txt"]
504 job_params.local_directory = case_test_dir
506 # create and launch the job
507 launcher = salome.naming_service.Resolve('/SalomeLauncher')
508 resManager= salome.lcc.getResourcesManager()
510 for resource in self.ressources:
511 print "Testing command job on ", resource
512 job_params.result_directory = local_result_dir + resource
513 job_params.job_name = "CommandPreJob_" + resource
514 job_params.resource_required.name = resource
516 # use the working directory of the resource
517 resParams = resManager.GetResourceDefinition(resource)
518 wd = os.path.join(resParams.working_directory,
519 "CommandPreJob" + self.suffix)
520 job_params.work_directory = wd
522 job_id = launcher.createJob(job_params)
523 launcher.launchJob(job_id)
524 # wait for the end of the job
525 jobState = launcher.getJobState(job_id)
526 print "Job %d state: %s" % (job_id,jobState)
527 while jobState != "FINISHED" and jobState != "FAILED" :
529 jobState = launcher.getJobState(job_id)
530 print "Job %d state: %s" % (job_id,jobState)
534 self.assertEqual(jobState, "FINISHED")
535 launcher.getJobResults(job_id, "")
536 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
539 #################################
540 # test of command salome job type
541 #################################
542 def test_command_salome(self):
543 case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
544 mkdir_p(case_test_dir)
548 script_file = "myEnvScript.py"
549 script_text = """#! /usr/bin/env python
550 # -*- coding: utf-8 -*-
553 # verify import salome
556 text_result = os.getenv("ENV_TEST_VAR","")
558 f = open('result.txt', 'w')
562 in_f = open("in.txt", "r")
563 in_text = in_f.read()
567 f = open(os.path.join("copie",'copie.txt'), 'w')
571 abs_script_file = os.path.join(case_test_dir, script_file)
572 f = open(abs_script_file, "w")
575 os.chmod(abs_script_file, 0o755)
578 env_file = "myEnv.sh"
579 env_text = """export ENV_TEST_VAR="expected"
581 f = open(os.path.join(case_test_dir, env_file), "w")
586 f = open(os.path.join(case_test_dir, data_file), "w")
587 f.write("to be copied")
591 local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
592 job_params = self.create_JobParameters()
593 job_params.job_type = "command_salome"
594 job_params.job_file = script_file
595 job_params.env_file = env_file
596 job_params.in_files = [data_file]
597 job_params.out_files = ["result.txt", "copie"]
598 job_params.local_directory = case_test_dir
600 # create and launch the job
601 launcher = salome.naming_service.Resolve('/SalomeLauncher')
602 resManager= salome.lcc.getResourcesManager()
604 for resource in self.ressources:
605 print "Testing command salome job on ", resource
606 job_params.result_directory = local_result_dir + resource
607 job_params.job_name = "CommandSalomeJob_" + resource
608 job_params.resource_required.name = resource
610 # use the working directory of the resource
611 resParams = resManager.GetResourceDefinition(resource)
612 wd = os.path.join(resParams.working_directory,
613 "CommandSalomeJob" + self.suffix)
614 job_params.work_directory = wd
616 job_id = launcher.createJob(job_params)
617 launcher.launchJob(job_id)
618 # wait for the end of the job
619 jobState = launcher.getJobState(job_id)
620 print "Job %d state: %s" % (job_id,jobState)
621 while jobState != "FINISHED" and jobState != "FAILED" :
623 jobState = launcher.getJobState(job_id)
624 print "Job %d state: %s" % (job_id,jobState)
628 self.assertEqual(jobState, "FINISHED")
629 launcher.getJobResults(job_id, "")
630 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
632 self.verifyFile(os.path.join(job_params.result_directory,
633 "copie",'copie.txt'),
636 # verify getJobWorkFile
637 mydir = os.path.join(case_test_dir, "work_dir" + resource)
638 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
639 self.assertEqual(success, True)
640 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
642 success = launcher.getJobWorkFile(job_id, "copie", mydir)
643 self.assertEqual(success, True)
644 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
650 if __name__ == '__main__':