2 # -*- coding: utf-8 -*-
14 except OSError as exc: # Python >2.5
15 if exc.errno == errno.EEXIST and os.path.isdir(path):
20 # Test of SalomeLauncher.
21 # This test should be run in the salome environment, using "salome shell"
22 # and a salome application should be running.
23 # The test will try to launch batch jobs on every available resources which
24 # have the can_launch_batch_jobs parameter set to True.
25 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
26 # define a customised resource catalog.
27 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
29 class TestCompo(unittest.TestCase):
32 # Prepare the test directory
33 temp = tempfile.NamedTemporaryFile()
34 cls.test_dir = os.path.join(temp.name, "test_dir")
35 name = os.path.basename(temp.name)
37 cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
41 # mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
42 # ior = salome.orb.object_to_string(mc)
43 # import SALOMERuntime
44 # SALOMERuntime.RuntimeSALOME_setRuntime()
45 # salome_runtime = SALOMERuntime.getSALOMERuntime()
46 # session_catalog = salome_runtime.loadCatalog("session", ior)
47 # salome_runtime.addCatalog(session_catalog)
49 # Get the list of possible ressources
50 ressource_param = salome.ResourceParameters()
51 ressource_param.can_launch_batch_jobs = True
52 rm = salome.lcc.getResourcesManager()
53 cls.ressources = rm.GetFittingResources(ressource_param)
55 def verifyFile(self, path, content):
60 self.assertEqual(text, content)
62 self.fail("IO exception:" + str(ex));
64 def create_JobParameters(self):
65 job_params = salome.JobParameters()
66 job_params.wckey="P11U5:CARBONES" #needed by edf clusters
67 job_params.resource_required = salome.ResourceParameters()
68 job_params.resource_required.nb_proc = 1
71 ##############################
72 # test of python_salome job
73 ##############################
74 def test_salome_py_job(self):
75 case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
76 mkdir_p(case_test_dir)
79 os.chdir(case_test_dir)
82 script_file = "myScript.py"
83 job_script_file = os.path.join(case_test_dir, script_file)
84 script_text = """#! /usr/bin/env python
85 # -*- coding: utf-8 -*-
87 # verify import salome
91 f = open('result.txt', 'w')
97 f = open(os.path.join("subdir",'autre.txt'), 'w')
101 f = open(job_script_file, "w")
105 local_result_dir = os.path.join(case_test_dir, "result_py_job-")
106 job_params = self.create_JobParameters()
107 job_params.job_type = "python_salome"
108 job_params.job_file = job_script_file
109 job_params.in_files = []
110 job_params.out_files = ["result.txt", "subdir"]
112 launcher = salome.naming_service.Resolve('/SalomeLauncher')
114 for resource in self.ressources:
115 print("Testing python_salome job on ", resource)
116 job_params.result_directory = local_result_dir + resource
117 job_params.job_name = "PyJob" + resource
118 job_params.resource_required.name = resource
119 # use default working directory for this test
121 job_id = launcher.createJob(job_params)
122 launcher.launchJob(job_id)
124 jobState = launcher.getJobState(job_id)
125 print("Job %d state: %s" % (job_id,jobState))
126 while jobState != "FINISHED" and jobState != "FAILED" :
128 jobState = launcher.getJobState(job_id)
129 print("Job %d state: %s" % (job_id,jobState))
132 self.assertEqual(jobState, "FINISHED")
134 # getJobResults to default directory (result_directory)
135 launcher.getJobResults(job_id, "")
136 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
138 self.verifyFile(os.path.join(job_params.result_directory,
139 "subdir", "autre.txt"),
142 # getJobResults to a specific directory
143 mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
144 launcher.getJobResults(job_id, mydir)
145 self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
146 self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
151 ##############################
152 # test of command job type
153 ##############################
154 def test_command(self):
155 case_test_dir = os.path.join(TestCompo.test_dir, "command")
156 mkdir_p(case_test_dir)
160 script_file = "myEnvScript.py"
161 script_text = """#! /usr/bin/env python
162 # -*- coding: utf-8 -*-
166 text_result = os.getenv("ENV_TEST_VAR","")
168 f = open('result.txt', 'w')
172 in_f = open("in.txt", "r")
173 in_text = in_f.read()
177 f = open(os.path.join("copie",'copie.txt'), 'w')
181 abs_script_file = os.path.join(case_test_dir, script_file)
182 f = open(abs_script_file, "w")
185 os.chmod(abs_script_file, 0o755)
188 env_file = "myEnv.sh"
189 env_text = """export ENV_TEST_VAR="expected"
191 f = open(os.path.join(case_test_dir, env_file), "w")
196 f = open(os.path.join(case_test_dir, data_file), "w")
197 f.write("to be copied")
201 local_result_dir = os.path.join(case_test_dir, "result_com_job-")
202 job_params = self.create_JobParameters()
203 job_params.job_type = "command"
204 job_params.job_file = script_file
205 job_params.env_file = env_file
206 job_params.in_files = [data_file]
207 job_params.out_files = ["result.txt", "copie"]
208 job_params.local_directory = case_test_dir
210 # create and launch the job
211 launcher = salome.naming_service.Resolve('/SalomeLauncher')
212 resManager= salome.lcc.getResourcesManager()
214 for resource in self.ressources:
215 print("Testing command job on ", resource)
216 job_params.result_directory = local_result_dir + resource
217 job_params.job_name = "CommandJob_" + resource
218 job_params.resource_required.name = resource
220 # use the working directory of the resource
221 resParams = resManager.GetResourceDefinition(resource)
222 wd = os.path.join(resParams.working_directory,
223 "CommandJob" + self.suffix)
224 job_params.work_directory = wd
226 job_id = launcher.createJob(job_params)
227 launcher.launchJob(job_id)
228 # wait for the end of the job
229 jobState = launcher.getJobState(job_id)
230 print("Job %d state: %s" % (job_id,jobState))
231 while jobState != "FINISHED" and jobState != "FAILED" :
233 jobState = launcher.getJobState(job_id)
234 print("Job %d state: %s" % (job_id,jobState))
238 self.assertEqual(jobState, "FINISHED")
239 launcher.getJobResults(job_id, "")
240 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
242 self.verifyFile(os.path.join(job_params.result_directory,
243 "copie",'copie.txt'),
246 # verify getJobWorkFile
247 mydir = os.path.join(case_test_dir, "work_dir" + resource)
248 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
249 self.assertEqual(success, True)
250 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
252 success = launcher.getJobWorkFile(job_id, "copie", mydir)
253 self.assertEqual(success, True)
254 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
258 ##############################
259 # test of yacs job type
260 ##############################
262 yacs_path = os.getenv("YACS_ROOT_DIR", "")
263 if not os.path.isdir(yacs_path):
264 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
266 case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
267 mkdir_p(case_test_dir)
270 env_file = "myEnv.sh"
271 env_text = """export ENV_TEST_VAR="expected"
273 f = open(os.path.join(case_test_dir, env_file), "w")
278 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
279 <proc name="newSchema_1">
280 <container name="DefaultContainer">
281 <property name="container_kind" value="Salome"/>
282 <property name="attached_on_cloning" value="0"/>
283 <property name="container_name" value="FactoryServer"/>
284 <property name="name" value="localhost"/>
286 <inline name="PyScript0">
287 <script><code><![CDATA[import os
288 text_result = os.getenv("ENV_TEST_VAR","")
289 f = open('result.txt', 'w')
293 <load container="DefaultContainer"/>
297 yacs_file = "mySchema.xml"
298 job_script_file = os.path.join(case_test_dir, yacs_file)
299 f = open(job_script_file, "w")
303 local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
304 job_params = self.create_JobParameters()
305 job_params.job_type = "yacs_file"
306 job_params.job_file = job_script_file
307 job_params.env_file = os.path.join(case_test_dir,env_file)
308 job_params.out_files = ["result.txt"]
310 # define the interval between two YACS schema dumps (3 seconds)
312 job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
314 launcher = salome.naming_service.Resolve('/SalomeLauncher')
315 resManager= salome.lcc.getResourcesManager()
317 for resource in self.ressources:
318 print("Testing yacs job on ", resource)
319 job_params.result_directory = local_result_dir + resource
320 job_params.job_name = "YacsJob_" + resource
321 job_params.resource_required.name = resource
323 # use the working directory of the resource
324 resParams = resManager.GetResourceDefinition(resource)
325 wd = os.path.join(resParams.working_directory,
326 "YacsJob" + self.suffix)
327 job_params.work_directory = wd
329 job_id = launcher.createJob(job_params)
330 launcher.launchJob(job_id)
331 jobState = launcher.getJobState(job_id)
333 yacs_dump_success = False
334 print("Job %d state: %s" % (job_id,jobState))
335 while jobState != "FINISHED" and jobState != "FAILED" :
337 jobState = launcher.getJobState(job_id)
338 # yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
339 yacs_dump_success = launcher.getJobDumpState(job_id,
340 job_params.result_directory)
341 print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
344 self.assertEqual(jobState, "FINISHED")
346 # Verify dumpState file is in the results
347 self.assertTrue(yacs_dump_success)
348 dump_file_path = os.path.join(job_params.result_directory,
349 "dumpState_mySchema.xml")
350 self.assertTrue(os.path.isfile(dump_file_path))
352 # Load the schema state from the dump file and verify the state of a node
354 SALOMERuntime.RuntimeSALOME_setRuntime(1)
356 schema = loader.YACSLoader().load(job_script_file)
357 stateParser = loader.stateParser()
358 sl = loader.stateLoader(stateParser, schema)
359 sl.parse(dump_file_path)
360 # 106 : "DONE" state code
361 self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
363 # getJobResults to default directory (result_directory)
364 launcher.getJobResults(job_id, "")
365 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
368 ##############################
369 # test of yacs job type using "--init_port" driver option
370 ##############################
371 def test_yacsopt(self):
372 yacs_path = os.getenv("YACS_ROOT_DIR", "")
373 if not os.path.isdir(yacs_path):
374 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
376 case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
377 mkdir_p(case_test_dir)
380 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
381 <proc name="myschema">
382 <type name="string" kind="string"/>
383 <type name="bool" kind="bool"/>
384 <type name="double" kind="double"/>
385 <type name="int" kind="int"/>
386 <container name="DefaultContainer">
387 <property name="container_kind" value="Salome"/>
388 <property name="attached_on_cloning" value="0"/>
389 <property name="container_name" value="FactoryServer"/>
390 <property name="name" value="localhost"/>
392 <inline name="mynode">
393 <script><code><![CDATA[
394 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
395 f = open('result.txt', 'w')
399 <load container="DefaultContainer"/>
400 <inport name="i" type="int"/>
401 <inport name="d" type="double"/>
402 <inport name="b" type="bool"/>
403 <inport name="s" type="string"/>
407 yacs_file = "simpleSchema.xml"
408 job_script_file = os.path.join(case_test_dir, yacs_file)
409 f = open(job_script_file, "w")
413 local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
414 job_params = self.create_JobParameters()
415 job_params.job_type = "yacs_file"
416 job_params.job_file = job_script_file
417 job_params.out_files = ["result.txt"]
419 # define the interval between two YACS schema dumps (3 seconds)
421 job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
422 "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
423 expected_result="i=5,d=3.7,b=False,s=lili"
425 launcher = salome.naming_service.Resolve('/SalomeLauncher')
426 resManager= salome.lcc.getResourcesManager()
428 for resource in self.ressources:
429 print("Testing yacs job with options on ", resource)
430 job_params.result_directory = local_result_dir + resource
431 job_params.job_name = "YacsJobOpt_" + resource
432 job_params.resource_required.name = resource
434 # use the working directory of the resource
435 resParams = resManager.GetResourceDefinition(resource)
436 wd = os.path.join(resParams.working_directory,
437 "YacsJobOpt" + self.suffix)
438 job_params.work_directory = wd
440 job_id = launcher.createJob(job_params)
441 launcher.launchJob(job_id)
442 jobState = launcher.getJobState(job_id)
444 yacs_dump_success = False
445 print("Job %d state: %s" % (job_id,jobState))
446 while jobState != "FINISHED" and jobState != "FAILED" :
448 jobState = launcher.getJobState(job_id)
449 print("Job %d state: %s " % (job_id,jobState))
452 self.assertEqual(jobState, "FINISHED")
454 # getJobResults to default directory (result_directory)
455 launcher.getJobResults(job_id, "")
456 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
459 ############################################
460 # test of command job type with pre_command
461 ############################################
462 def test_command_pre(self):
463 case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
464 mkdir_p(case_test_dir)
466 # command to be run before the job
467 pre_command = "pre_command.sh"
468 pre_command_text = "echo 'it works!' > in.txt"
469 abs_pre_command_file = os.path.join(case_test_dir, pre_command)
470 f = open(abs_pre_command_file, "w")
471 f.write(pre_command_text)
473 os.chmod(abs_pre_command_file, 0o755)
476 script_file = "myTestScript.py"
477 script_text = """#! /usr/bin/env python
478 # -*- coding: utf-8 -*-
480 in_f = open("in.txt", "r")
481 in_text = in_f.read()
484 f = open('result.txt', 'w')
488 abs_script_file = os.path.join(case_test_dir, script_file)
489 f = open(abs_script_file, "w")
492 os.chmod(abs_script_file, 0o755)
495 local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
496 job_params = self.create_JobParameters()
497 job_params.job_type = "command"
498 job_params.job_file = script_file
499 job_params.pre_command = pre_command
500 job_params.in_files = []
501 job_params.out_files = ["result.txt"]
502 job_params.local_directory = case_test_dir
504 # create and launch the job
505 launcher = salome.naming_service.Resolve('/SalomeLauncher')
506 resManager= salome.lcc.getResourcesManager()
508 for resource in self.ressources:
509 print "Testing command job on ", resource
510 job_params.result_directory = local_result_dir + resource
511 job_params.job_name = "CommandPreJob_" + resource
512 job_params.resource_required.name = resource
514 # use the working directory of the resource
515 resParams = resManager.GetResourceDefinition(resource)
516 wd = os.path.join(resParams.working_directory,
517 "CommandPreJob" + self.suffix)
518 job_params.work_directory = wd
520 job_id = launcher.createJob(job_params)
521 launcher.launchJob(job_id)
522 # wait for the end of the job
523 jobState = launcher.getJobState(job_id)
524 print "Job %d state: %s" % (job_id,jobState)
525 while jobState != "FINISHED" and jobState != "FAILED" :
527 jobState = launcher.getJobState(job_id)
528 print "Job %d state: %s" % (job_id,jobState)
532 self.assertEqual(jobState, "FINISHED")
533 launcher.getJobResults(job_id, "")
534 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
537 #################################
538 # test of command salome job type
539 #################################
540 def test_command_salome(self):
541 case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
542 mkdir_p(case_test_dir)
546 script_file = "myEnvScript.py"
547 script_text = """#! /usr/bin/env python
548 # -*- coding: utf-8 -*-
551 # verify import salome
554 text_result = os.getenv("ENV_TEST_VAR","")
556 f = open('result.txt', 'w')
560 in_f = open("in.txt", "r")
561 in_text = in_f.read()
565 f = open(os.path.join("copie",'copie.txt'), 'w')
569 abs_script_file = os.path.join(case_test_dir, script_file)
570 f = open(abs_script_file, "w")
573 os.chmod(abs_script_file, 0o755)
576 env_file = "myEnv.sh"
577 env_text = """export ENV_TEST_VAR="expected"
579 f = open(os.path.join(case_test_dir, env_file), "w")
584 f = open(os.path.join(case_test_dir, data_file), "w")
585 f.write("to be copied")
589 local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
590 job_params = self.create_JobParameters()
591 job_params.job_type = "command_salome"
592 job_params.job_file = script_file
593 job_params.env_file = env_file
594 job_params.in_files = [data_file]
595 job_params.out_files = ["result.txt", "copie"]
596 job_params.local_directory = case_test_dir
598 # create and launch the job
599 launcher = salome.naming_service.Resolve('/SalomeLauncher')
600 resManager= salome.lcc.getResourcesManager()
602 for resource in self.ressources:
603 print "Testing command salome job on ", resource
604 job_params.result_directory = local_result_dir + resource
605 job_params.job_name = "CommandSalomeJob_" + resource
606 job_params.resource_required.name = resource
608 # use the working directory of the resource
609 resParams = resManager.GetResourceDefinition(resource)
610 wd = os.path.join(resParams.working_directory,
611 "CommandSalomeJob" + self.suffix)
612 job_params.work_directory = wd
614 job_id = launcher.createJob(job_params)
615 launcher.launchJob(job_id)
616 # wait for the end of the job
617 jobState = launcher.getJobState(job_id)
618 print "Job %d state: %s" % (job_id,jobState)
619 while jobState != "FINISHED" and jobState != "FAILED" :
621 jobState = launcher.getJobState(job_id)
622 print "Job %d state: %s" % (job_id,jobState)
626 self.assertEqual(jobState, "FINISHED")
627 launcher.getJobResults(job_id, "")
628 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
630 self.verifyFile(os.path.join(job_params.result_directory,
631 "copie",'copie.txt'),
634 # verify getJobWorkFile
635 mydir = os.path.join(case_test_dir, "work_dir" + resource)
636 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
637 self.assertEqual(success, True)
638 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
640 success = launcher.getJobWorkFile(job_id, "copie", mydir)
641 self.assertEqual(success, True)
642 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
648 if __name__ == '__main__':