1 #! /usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2023 CEA, EDF
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
32 except OSError as exc: # Python >2.5
33 if exc.errno == errno.EEXIST and os.path.isdir(path):
38 # Test of SalomeLauncher.
39 # This test should be run in the salome environment, using "salome shell"
40 # and a salome application should be running.
41 # The test will try to launch batch jobs on every available resources which
42 # have the can_launch_batch_jobs parameter set to True.
43 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
44 # define a customised resource catalog.
45 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
47 class TestCompo(unittest.TestCase):
50 # Prepare the test directory
51 temp = tempfile.NamedTemporaryFile()
52 cls.test_dir = os.path.join(temp.name, "test_dir")
53 name = os.path.basename(temp.name)
55 cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
59 # mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
60 # ior = salome.orb.object_to_string(mc)
61 # import SALOMERuntime
62 # SALOMERuntime.RuntimeSALOME_setRuntime()
63 # salome_runtime = SALOMERuntime.getSALOMERuntime()
64 # session_catalog = salome_runtime.loadCatalog("session", ior)
65 # salome_runtime.addCatalog(session_catalog)
67 # Get the list of possible ressources
68 ressource_param = salome.ResourceParameters()
69 ressource_param.can_launch_batch_jobs = True
70 rm = salome.lcc.getResourcesManager()
71 cls.ressources = rm.GetFittingResources(ressource_param)
73 def verifyFile(self, path, content):
78 self.assertEqual(text, content)
80 self.fail("IO exception:" + str(ex));
82 def create_JobParameters(self):
83 job_params = salome.JobParameters()
84 job_params.wckey="P11N0:SALOME" #needed by edf clusters
85 job_params.resource_required = salome.ResourceParameters()
86 job_params.resource_required.nb_proc = 1
89 ##############################
90 # test of python_salome job
91 ##############################
92 def test_salome_py_job(self):
93 case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
94 mkdir_p(case_test_dir)
97 os.chdir(case_test_dir)
100 script_file = "myScript.py"
101 job_script_file = os.path.join(case_test_dir, script_file)
102 script_text = """#! /usr/bin/env python3
103 # -*- coding: utf-8 -*-
105 # verify import salome
110 f = open('result.txt', 'w')
116 f = open(os.path.join("subdir",'autre.txt'), 'w')
120 f = open(job_script_file, "w")
124 local_result_dir = os.path.join(case_test_dir, "result_py_job-")
125 job_params = self.create_JobParameters()
126 job_params.job_type = "python_salome"
127 job_params.job_file = job_script_file
128 job_params.in_files = []
129 job_params.out_files = ["result.txt", "subdir"]
131 launcher = salome.naming_service.Resolve('/SalomeLauncher')
133 for resource in self.ressources:
134 print("Testing python_salome job on ", resource)
135 job_params.result_directory = local_result_dir + resource
136 job_params.job_name = "PyJob" + resource
137 job_params.resource_required.name = resource
138 # use default working directory for this test
140 job_id = launcher.createJob(job_params)
141 launcher.launchJob(job_id)
143 jobState = launcher.getJobState(job_id)
144 print("Job %d state: %s" % (job_id,jobState))
145 while jobState != "FINISHED" and jobState != "FAILED" :
147 jobState = launcher.getJobState(job_id)
148 print("Job %d state: %s" % (job_id,jobState))
151 self.assertEqual(jobState, "FINISHED")
153 # getJobResults to default directory (result_directory)
154 launcher.getJobResults(job_id, "")
155 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
157 self.verifyFile(os.path.join(job_params.result_directory,
158 "subdir", "autre.txt"),
161 # getJobResults to a specific directory
162 mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
163 launcher.getJobResults(job_id, mydir)
164 self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
165 self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
170 ##############################
171 # test of command job type
172 ##############################
173 def test_command(self):
174 case_test_dir = os.path.join(TestCompo.test_dir, "command")
175 mkdir_p(case_test_dir)
179 script_file = "myEnvScript.py"
180 script_text = """#! /usr/bin/env python3
181 # -*- coding: utf-8 -*-
185 text_result = os.getenv("ENV_TEST_VAR","")
187 f = open('result.txt', 'w')
191 in_f = open("in.txt", "r")
192 in_text = in_f.read()
196 f = open(os.path.join("copie",'copie.txt'), 'w')
200 abs_script_file = os.path.join(case_test_dir, script_file)
201 f = open(abs_script_file, "w")
204 os.chmod(abs_script_file, 0o755)
207 env_file = "myEnv.sh"
208 env_text = """export ENV_TEST_VAR="expected"
210 f = open(os.path.join(case_test_dir, env_file), "w")
215 f = open(os.path.join(case_test_dir, data_file), "w")
216 f.write("to be copied")
220 local_result_dir = os.path.join(case_test_dir, "result_com_job-")
221 job_params = self.create_JobParameters()
222 job_params.job_type = "command"
223 job_params.job_file = script_file
224 job_params.env_file = env_file
225 job_params.in_files = [data_file]
226 job_params.out_files = ["result.txt", "copie"]
227 job_params.local_directory = case_test_dir
229 # create and launch the job
230 launcher = salome.naming_service.Resolve('/SalomeLauncher')
231 resManager= salome.lcc.getResourcesManager()
233 for resource in self.ressources:
234 print("Testing command job on ", resource)
235 job_params.result_directory = local_result_dir + resource
236 job_params.job_name = "CommandJob_" + resource
237 job_params.resource_required.name = resource
239 # use the working directory of the resource
240 resParams = resManager.GetResourceDefinition(resource)
241 wd = os.path.join(resParams.working_directory,
242 "CommandJob" + self.suffix)
243 job_params.work_directory = wd
245 job_id = launcher.createJob(job_params)
246 launcher.launchJob(job_id)
247 # wait for the end of the job
248 jobState = launcher.getJobState(job_id)
249 print("Job %d state: %s" % (job_id,jobState))
250 while jobState != "FINISHED" and jobState != "FAILED" :
252 jobState = launcher.getJobState(job_id)
253 print("Job %d state: %s" % (job_id,jobState))
257 self.assertEqual(jobState, "FINISHED")
258 launcher.getJobResults(job_id, "")
259 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
261 self.verifyFile(os.path.join(job_params.result_directory,
262 "copie",'copie.txt'),
265 # verify getJobWorkFile
266 mydir = os.path.join(case_test_dir, "work_dir" + resource)
267 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
268 self.assertEqual(success, True)
269 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
271 success = launcher.getJobWorkFile(job_id, "copie", mydir)
272 self.assertEqual(success, True)
273 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
277 ##############################
278 # test of yacs job type
279 ##############################
281 yacs_path = os.getenv("YACS_ROOT_DIR", "")
282 if not os.path.isdir(yacs_path):
283 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
285 case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
286 mkdir_p(case_test_dir)
289 env_file = "myEnv.sh"
290 env_text = """export ENV_TEST_VAR="expected"
292 f = open(os.path.join(case_test_dir, env_file), "w")
297 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
298 <proc name="newSchema_1">
299 <container name="DefaultContainer">
300 <property name="container_kind" value="Salome"/>
301 <property name="attached_on_cloning" value="0"/>
302 <property name="container_name" value="FactoryServer"/>
303 <property name="name" value="localhost"/>
305 <inline name="PyScript0">
306 <script><code><![CDATA[import os
307 text_result = os.getenv("ENV_TEST_VAR","")
308 f = open('result.txt', 'w')
312 <load container="DefaultContainer"/>
316 yacs_file = "mySchema.xml"
317 job_script_file = os.path.join(case_test_dir, yacs_file)
318 f = open(job_script_file, "w")
322 local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
323 job_params = self.create_JobParameters()
324 job_params.job_type = "yacs_file"
325 job_params.job_file = job_script_file
326 job_params.env_file = os.path.join(case_test_dir,env_file)
327 job_params.out_files = ["result.txt"]
329 # define the interval between two YACS schema dumps (3 seconds)
331 job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
333 launcher = salome.naming_service.Resolve('/SalomeLauncher')
334 resManager= salome.lcc.getResourcesManager()
336 for resource in self.ressources:
337 print("Testing yacs job on ", resource)
338 job_params.result_directory = local_result_dir + resource
339 job_params.job_name = "YacsJob_" + resource
340 job_params.resource_required.name = resource
342 # use the working directory of the resource
343 resParams = resManager.GetResourceDefinition(resource)
344 wd = os.path.join(resParams.working_directory,
345 "YacsJob" + self.suffix)
346 job_params.work_directory = wd
348 job_id = launcher.createJob(job_params)
349 launcher.launchJob(job_id)
350 jobState = launcher.getJobState(job_id)
352 yacs_dump_success = False
353 print("Job %d state: %s" % (job_id,jobState))
354 while jobState != "FINISHED" and jobState != "FAILED" :
356 jobState = launcher.getJobState(job_id)
357 # yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
358 yacs_dump_success = launcher.getJobDumpState(job_id,
359 job_params.result_directory)
360 print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
363 self.assertEqual(jobState, "FINISHED")
365 # Verify dumpState file is in the results
366 self.assertTrue(yacs_dump_success)
367 dump_file_path = os.path.join(job_params.result_directory,
368 "dumpState_mySchema.xml")
369 self.assertTrue(os.path.isfile(dump_file_path))
371 # Load the schema state from the dump file and verify the state of a node
373 SALOMERuntime.RuntimeSALOME_setRuntime(1)
375 schema = loader.YACSLoader().load(job_script_file)
376 stateParser = loader.stateParser()
377 sl = loader.stateLoader(stateParser, schema)
378 sl.parse(dump_file_path)
379 # 106 : "DONE" state code
380 self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
382 # getJobResults to default directory (result_directory)
383 launcher.getJobResults(job_id, "")
384 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
387 ##############################
388 # test of yacs job type using "--init_port" driver option
389 ##############################
390 def test_yacsopt(self):
391 yacs_path = os.getenv("YACS_ROOT_DIR", "")
392 if not os.path.isdir(yacs_path):
393 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
395 case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
396 mkdir_p(case_test_dir)
399 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
400 <proc name="myschema">
401 <type name="string" kind="string"/>
402 <type name="bool" kind="bool"/>
403 <type name="double" kind="double"/>
404 <type name="int" kind="int"/>
405 <container name="DefaultContainer">
406 <property name="container_kind" value="Salome"/>
407 <property name="attached_on_cloning" value="0"/>
408 <property name="container_name" value="FactoryServer"/>
409 <property name="name" value="localhost"/>
411 <inline name="mynode">
412 <script><code><![CDATA[
413 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
414 f = open('result.txt', 'w')
418 <load container="DefaultContainer"/>
419 <inport name="i" type="int"/>
420 <inport name="d" type="double"/>
421 <inport name="b" type="bool"/>
422 <inport name="s" type="string"/>
426 yacs_file = "simpleSchema.xml"
427 job_script_file = os.path.join(case_test_dir, yacs_file)
428 f = open(job_script_file, "w")
432 local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
433 job_params = self.create_JobParameters()
434 job_params.job_type = "yacs_file"
435 job_params.job_file = job_script_file
436 job_params.out_files = ["result.txt"]
438 # define the interval between two YACS schema dumps (3 seconds)
440 job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
441 "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
442 expected_result="i=5,d=3.7,b=False,s=lili"
444 launcher = salome.naming_service.Resolve('/SalomeLauncher')
445 resManager= salome.lcc.getResourcesManager()
447 for resource in self.ressources:
448 print("Testing yacs job with options on ", resource)
449 job_params.result_directory = local_result_dir + resource
450 job_params.job_name = "YacsJobOpt_" + resource
451 job_params.resource_required.name = resource
453 # use the working directory of the resource
454 resParams = resManager.GetResourceDefinition(resource)
455 wd = os.path.join(resParams.working_directory,
456 "YacsJobOpt" + self.suffix)
457 job_params.work_directory = wd
459 job_id = launcher.createJob(job_params)
460 launcher.launchJob(job_id)
461 jobState = launcher.getJobState(job_id)
463 yacs_dump_success = False
464 print("Job %d state: %s" % (job_id,jobState))
465 while jobState != "FINISHED" and jobState != "FAILED" :
467 jobState = launcher.getJobState(job_id)
468 print("Job %d state: %s " % (job_id,jobState))
471 self.assertEqual(jobState, "FINISHED")
473 # getJobResults to default directory (result_directory)
474 launcher.getJobResults(job_id, "")
475 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
478 ############################################
479 # test of command job type with pre_command
480 ############################################
481 def test_command_pre(self):
482 case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
483 mkdir_p(case_test_dir)
485 # command to be run before the job
486 pre_command = "pre_command.sh"
487 pre_command_text = "echo 'it works!' > in.txt"
488 abs_pre_command_file = os.path.join(case_test_dir, pre_command)
489 f = open(abs_pre_command_file, "w")
490 f.write(pre_command_text)
492 os.chmod(abs_pre_command_file, 0o755)
495 script_file = "myTestScript.py"
496 script_text = """#! /usr/bin/env python3
497 # -*- coding: utf-8 -*-
499 in_f = open("in.txt", "r")
500 in_text = in_f.read()
503 f = open('result.txt', 'w')
507 abs_script_file = os.path.join(case_test_dir, script_file)
508 f = open(abs_script_file, "w")
511 os.chmod(abs_script_file, 0o755)
514 local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
515 job_params = self.create_JobParameters()
516 job_params.job_type = "command"
517 job_params.job_file = script_file
518 job_params.pre_command = pre_command
519 job_params.in_files = []
520 job_params.out_files = ["result.txt"]
521 job_params.local_directory = case_test_dir
523 # create and launch the job
524 launcher = salome.naming_service.Resolve('/SalomeLauncher')
525 resManager= salome.lcc.getResourcesManager()
527 for resource in self.ressources:
528 print("Testing command job on ", resource)
529 job_params.result_directory = local_result_dir + resource
530 job_params.job_name = "CommandPreJob_" + resource
531 job_params.resource_required.name = resource
533 # use the working directory of the resource
534 resParams = resManager.GetResourceDefinition(resource)
535 wd = os.path.join(resParams.working_directory,
536 "CommandPreJob" + self.suffix)
537 job_params.work_directory = wd
539 job_id = launcher.createJob(job_params)
540 launcher.launchJob(job_id)
541 # wait for the end of the job
542 jobState = launcher.getJobState(job_id)
543 print("Job %d state: %s" % (job_id,jobState))
544 while jobState != "FINISHED" and jobState != "FAILED" :
546 jobState = launcher.getJobState(job_id)
547 print("Job %d state: %s" % (job_id,jobState))
551 self.assertEqual(jobState, "FINISHED")
552 launcher.getJobResults(job_id, "")
553 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
556 #################################
557 # test of command salome job type
558 #################################
559 def test_command_salome(self):
560 case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
561 mkdir_p(case_test_dir)
565 script_file = "myEnvScript.py"
566 script_text = """#! /usr/bin/env python3
567 # -*- coding: utf-8 -*-
570 # verify import salome
573 text_result = os.getenv("ENV_TEST_VAR","")
575 f = open('result.txt', 'w')
579 in_f = open("in.txt", "r")
580 in_text = in_f.read()
584 f = open(os.path.join("copie",'copie.txt'), 'w')
588 abs_script_file = os.path.join(case_test_dir, script_file)
589 f = open(abs_script_file, "w")
592 os.chmod(abs_script_file, 0o755)
595 env_file = "myEnv.sh"
596 env_text = """export ENV_TEST_VAR="expected"
598 f = open(os.path.join(case_test_dir, env_file), "w")
603 f = open(os.path.join(case_test_dir, data_file), "w")
604 f.write("to be copied")
608 local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
609 job_params = self.create_JobParameters()
610 job_params.job_type = "command_salome"
611 job_params.job_file = script_file
612 job_params.env_file = env_file
613 job_params.in_files = [data_file]
614 job_params.out_files = ["result.txt", "copie"]
615 job_params.local_directory = case_test_dir
617 # create and launch the job
618 launcher = salome.naming_service.Resolve('/SalomeLauncher')
619 resManager= salome.lcc.getResourcesManager()
621 for resource in self.ressources:
622 print("Testing command salome job on ", resource)
623 job_params.result_directory = local_result_dir + resource
624 job_params.job_name = "CommandSalomeJob_" + resource
625 job_params.resource_required.name = resource
627 # use the working directory of the resource
628 resParams = resManager.GetResourceDefinition(resource)
629 wd = os.path.join(resParams.working_directory,
630 "CommandSalomeJob" + self.suffix)
631 job_params.work_directory = wd
633 job_id = launcher.createJob(job_params)
634 launcher.launchJob(job_id)
635 # wait for the end of the job
636 jobState = launcher.getJobState(job_id)
637 print("Job %d state: %s" % (job_id,jobState))
638 while jobState != "FINISHED" and jobState != "FAILED" :
640 jobState = launcher.getJobState(job_id)
641 print("Job %d state: %s" % (job_id,jobState))
645 self.assertEqual(jobState, "FINISHED")
646 launcher.getJobResults(job_id, "")
647 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
649 self.verifyFile(os.path.join(job_params.result_directory,
650 "copie",'copie.txt'),
653 # verify getJobWorkFile
654 mydir = os.path.join(case_test_dir, "work_dir" + resource)
655 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
656 self.assertEqual(success, True)
657 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
659 success = launcher.getJobWorkFile(job_id, "copie", mydir)
660 self.assertEqual(success, True)
661 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
667 if __name__ == '__main__':