1 #! /usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2023 CEA, EDF
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
32 except OSError as exc: # Python >2.5
33 if exc.errno == errno.EEXIST and os.path.isdir(path):
40 launcher = pylauncher.Launcher_cpp()
41 launcher.SetResourcesManager(createResourcesManager())
44 def createResourcesManager():
45 # localhost is defined anyway, even if the catalog file does not exist.
46 catalog_path = os.environ.get("USER_CATALOG_RESOURCES_FILE", "")
47 return pylauncher.ResourcesManager_cpp(catalog_path)
49 def createJobParameters():
50 jp = pylauncher.JobParameters_cpp()
51 jp.resource_required = createResourceParameters()
54 def createResourceParameters():
55 return pylauncher.resourceParams()
57 # Test of SalomeLauncher.
58 # This test should be run in the salome environment, using "salome shell".
59 # It does not need a salome application running.
60 # The test will try to launch batch jobs on every available resources which
61 # have the can_launch_batch_jobs parameter set to True.
62 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
63 # define a customised resource catalog.
64 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
66 class TestCompo(unittest.TestCase):
69 # Prepare the test directory
70 temp = tempfile.NamedTemporaryFile()
71 cls.test_dir = os.path.join(temp.name, "test_dir")
72 name = os.path.basename(temp.name)
74 cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
77 # Get the list of possible ressources
78 ressource_param = createResourceParameters()
79 ressource_param.can_launch_batch_jobs = True
80 rm = createResourcesManager()
81 cls.ressources = rm.GetFittingResources(ressource_param)
83 def verifyFile(self, path, content):
88 self.assertEqual(text, content)
90 self.fail("IO exception:" + str(ex));
92 def create_JobParameters(self):
93 job_params = createJobParameters()
94 job_params.wckey="P11U5:CARBONES" #needed by edf clusters
95 job_params.resource_required.nb_proc = 1
98 ##############################
99 # test of python_salome job
100 ##############################
101 def test_salome_py_job(self):
102 case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
103 mkdir_p(case_test_dir)
105 old_dir = os.getcwd()
106 os.chdir(case_test_dir)
109 script_file = "myScript.py"
110 job_script_file = os.path.join(case_test_dir, script_file)
111 script_text = """#! /usr/bin/env python3
112 # -*- coding: utf-8 -*-
114 # verify import salome
118 f = open('result.txt', 'w')
124 f = open(os.path.join("subdir",'autre.txt'), 'w')
128 f = open(job_script_file, "w")
132 local_result_dir = os.path.join(case_test_dir, "result_py_job-")
133 job_params = self.create_JobParameters()
134 job_params.job_type = "python_salome"
135 job_params.job_file = job_script_file
136 job_params.in_files = []
137 job_params.out_files = ["result.txt", "subdir"]
139 launcher = createLauncher()
141 for resource in self.ressources:
142 print("Testing python_salome job on ", resource)
143 job_params.result_directory = local_result_dir + resource
144 job_params.job_name = "PyJob" + resource
145 job_params.resource_required.name = resource
146 # use default working directory for this test
148 job_id = launcher.createJob(job_params)
149 launcher.launchJob(job_id)
151 jobState = launcher.getJobState(job_id)
152 print("Job %d state: %s" % (job_id,jobState))
153 while jobState != "FINISHED" and jobState != "FAILED" :
155 jobState = launcher.getJobState(job_id)
156 print("Job %d state: %s" % (job_id,jobState))
159 self.assertEqual(jobState, "FINISHED")
161 # getJobResults to default directory (result_directory)
162 launcher.getJobResults(job_id, "")
163 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
165 self.verifyFile(os.path.join(job_params.result_directory,
166 "subdir", "autre.txt"),
169 # getJobResults to a specific directory
170 mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
171 launcher.getJobResults(job_id, mydir)
172 self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
173 self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
178 ##############################
179 # test of command job type
180 ##############################
181 def test_command(self):
182 case_test_dir = os.path.join(TestCompo.test_dir, "command")
183 mkdir_p(case_test_dir)
187 script_file = "myEnvScript.py"
188 script_text = """#! /usr/bin/env python3
189 # -*- coding: utf-8 -*-
193 text_result = os.getenv("ENV_TEST_VAR","")
195 f = open('result.txt', 'w')
199 in_f = open("in.txt", "r")
200 in_text = in_f.read()
204 f = open(os.path.join("copie",'copie.txt'), 'w')
208 abs_script_file = os.path.join(case_test_dir, script_file)
209 f = open(abs_script_file, "w")
212 os.chmod(abs_script_file, 0o755)
215 env_file = "myEnv.sh"
216 env_text = """export ENV_TEST_VAR="expected"
218 f = open(os.path.join(case_test_dir, env_file), "w")
223 f = open(os.path.join(case_test_dir, data_file), "w")
224 f.write("to be copied")
228 local_result_dir = os.path.join(case_test_dir, "result_com_job-")
229 job_params = self.create_JobParameters()
230 job_params.job_type = "command"
231 job_params.job_file = script_file
232 job_params.env_file = env_file
233 job_params.in_files = [data_file]
234 job_params.out_files = ["result.txt", "copie"]
235 job_params.local_directory = case_test_dir
237 # create and launch the job
238 launcher = createLauncher()
239 resManager= createResourcesManager()
241 for resource in self.ressources:
242 print("Testing command job on ", resource)
243 job_params.result_directory = local_result_dir + resource
244 job_params.job_name = "CommandJob_" + resource
245 job_params.resource_required.name = resource
247 # use the working directory of the resource
248 resParams = resManager.GetResourceDefinition(resource)
249 wd = os.path.join(resParams.working_directory,
250 "CommandJob" + self.suffix)
251 job_params.work_directory = wd
253 job_id = launcher.createJob(job_params)
254 launcher.launchJob(job_id)
255 # wait for the end of the job
256 jobState = launcher.getJobState(job_id)
257 print("Job %d state: %s" % (job_id,jobState))
258 while jobState != "FINISHED" and jobState != "FAILED" :
260 jobState = launcher.getJobState(job_id)
261 print("Job %d state: %s" % (job_id,jobState))
265 self.assertEqual(jobState, "FINISHED")
266 launcher.getJobResults(job_id, "")
267 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
269 self.verifyFile(os.path.join(job_params.result_directory,
270 "copie",'copie.txt'),
273 # verify getJobWorkFile
274 mydir = os.path.join(case_test_dir, "work_dir" + resource)
275 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
276 self.assertEqual(success, True)
277 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
279 success = launcher.getJobWorkFile(job_id, "copie", mydir)
280 self.assertEqual(success, True)
281 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
285 ##############################
286 # test of yacs job type
287 ##############################
289 yacs_path = os.getenv("YACS_ROOT_DIR", "")
290 if not os.path.isdir(yacs_path):
291 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
293 case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
294 mkdir_p(case_test_dir)
297 env_file = "myEnv.sh"
298 env_text = """export ENV_TEST_VAR="expected"
300 f = open(os.path.join(case_test_dir, env_file), "w")
305 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
306 <proc name="newSchema_1">
307 <container name="DefaultContainer">
308 <property name="container_kind" value="Salome"/>
309 <property name="attached_on_cloning" value="0"/>
310 <property name="container_name" value="FactoryServer"/>
311 <property name="name" value="localhost"/>
313 <inline name="PyScript0">
314 <script><code><![CDATA[import os
315 text_result = os.getenv("ENV_TEST_VAR","")
316 f = open('result.txt', 'w')
320 <load container="DefaultContainer"/>
324 yacs_file = "mySchema.xml"
325 job_script_file = os.path.join(case_test_dir, yacs_file)
326 f = open(job_script_file, "w")
330 local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
331 job_params = self.create_JobParameters()
332 job_params.job_type = "yacs_file"
333 job_params.job_file = job_script_file
334 job_params.env_file = os.path.join(case_test_dir,env_file)
335 job_params.out_files = ["result.txt"]
337 # define the interval between two YACS schema dumps (3 seconds)
339 #job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
340 job_params.specific_parameters = {"EnableDumpYACS": "3"}
342 launcher = createLauncher()
343 resManager= createResourcesManager()
345 for resource in self.ressources:
346 print("Testing yacs job on ", resource)
347 job_params.result_directory = local_result_dir + resource
348 job_params.job_name = "YacsJob_" + resource
349 job_params.resource_required.name = resource
351 # use the working directory of the resource
352 resParams = resManager.GetResourceDefinition(resource)
353 wd = os.path.join(resParams.working_directory,
354 "YacsJob" + self.suffix)
355 job_params.work_directory = wd
357 job_id = launcher.createJob(job_params)
358 launcher.launchJob(job_id)
359 jobState = launcher.getJobState(job_id)
361 yacs_dump_success = False
362 print("Job %d state: %s" % (job_id,jobState))
363 while jobState != "FINISHED" and jobState != "FAILED" :
365 jobState = launcher.getJobState(job_id)
366 # yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
367 yacs_dump_success = launcher.getJobDumpState(job_id,
368 job_params.result_directory)
369 print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
372 self.assertEqual(jobState, "FINISHED")
374 # Verify dumpState file is in the results
375 self.assertTrue(yacs_dump_success)
376 dump_file_path = os.path.join(job_params.result_directory,
377 "dumpState_mySchema.xml")
378 self.assertTrue(os.path.isfile(dump_file_path))
381 # Load the schema state from the dump file and verify the state of a node
383 SALOMERuntime.RuntimeSALOME_setRuntime(1)
385 schema = loader.YACSLoader().load(job_script_file)
386 stateParser = loader.stateParser()
387 sl = loader.stateLoader(stateParser, schema)
388 sl.parse(dump_file_path)
389 # 106 : "DONE" state code
390 self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
393 # getJobResults to default directory (result_directory)
394 launcher.getJobResults(job_id, "")
395 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
398 ##############################
399 # test of yacs job type using "--init_port" driver option
400 ##############################
401 def test_yacsopt(self):
402 yacs_path = os.getenv("YACS_ROOT_DIR", "")
403 if not os.path.isdir(yacs_path):
404 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
406 case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
407 mkdir_p(case_test_dir)
410 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
411 <proc name="myschema">
412 <type name="string" kind="string"/>
413 <type name="bool" kind="bool"/>
414 <type name="double" kind="double"/>
415 <type name="int" kind="int"/>
416 <container name="DefaultContainer">
417 <property name="container_kind" value="Salome"/>
418 <property name="attached_on_cloning" value="0"/>
419 <property name="container_name" value="FactoryServer"/>
420 <property name="name" value="localhost"/>
422 <inline name="mynode">
423 <script><code><![CDATA[
424 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
425 f = open('result.txt', 'w')
429 <load container="DefaultContainer"/>
430 <inport name="i" type="int"/>
431 <inport name="d" type="double"/>
432 <inport name="b" type="bool"/>
433 <inport name="s" type="string"/>
437 yacs_file = "simpleSchema.xml"
438 job_script_file = os.path.join(case_test_dir, yacs_file)
439 f = open(job_script_file, "w")
443 local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
444 job_params = self.create_JobParameters()
445 job_params.job_type = "yacs_file"
446 job_params.job_file = job_script_file
447 job_params.out_files = ["result.txt"]
449 # define the interval between two YACS schema dumps (3 seconds)
451 #job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
452 # "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
453 job_params.specific_parameters = {"YACSDriverOptions":
454 "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili"}
455 expected_result="i=5,d=3.7,b=False,s=lili"
457 launcher = createLauncher()
458 resManager= createResourcesManager()
460 for resource in self.ressources:
461 print("Testing yacs job with options on ", resource)
462 job_params.result_directory = local_result_dir + resource
463 job_params.job_name = "YacsJobOpt_" + resource
464 job_params.resource_required.name = resource
466 # use the working directory of the resource
467 resParams = resManager.GetResourceDefinition(resource)
468 wd = os.path.join(resParams.working_directory,
469 "YacsJobOpt" + self.suffix)
470 job_params.work_directory = wd
472 job_id = launcher.createJob(job_params)
473 launcher.launchJob(job_id)
474 jobState = launcher.getJobState(job_id)
476 yacs_dump_success = False
477 print("Job %d state: %s" % (job_id,jobState))
478 while jobState != "FINISHED" and jobState != "FAILED" :
480 jobState = launcher.getJobState(job_id)
481 print("Job %d state: %s " % (job_id,jobState))
484 self.assertEqual(jobState, "FINISHED")
486 # getJobResults to default directory (result_directory)
487 launcher.getJobResults(job_id, "")
488 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
491 ############################################
492 # test of command job type with pre_command
493 ############################################
494 def test_command_pre(self):
495 case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
496 mkdir_p(case_test_dir)
498 # command to be run before the job
499 pre_command = "pre_command.sh"
500 pre_command_text = "echo 'it works!' > in.txt"
501 abs_pre_command_file = os.path.join(case_test_dir, pre_command)
502 f = open(abs_pre_command_file, "w")
503 f.write(pre_command_text)
505 os.chmod(abs_pre_command_file, 0o755)
508 script_file = "myTestScript.py"
509 script_text = """#! /usr/bin/env python3
510 # -*- coding: utf-8 -*-
512 in_f = open("in.txt", "r")
513 in_text = in_f.read()
516 f = open('result.txt', 'w')
520 abs_script_file = os.path.join(case_test_dir, script_file)
521 f = open(abs_script_file, "w")
524 os.chmod(abs_script_file, 0o755)
527 local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
528 job_params = self.create_JobParameters()
529 job_params.job_type = "command"
530 job_params.job_file = script_file
531 job_params.pre_command = pre_command
532 job_params.in_files = []
533 job_params.out_files = ["result.txt"]
534 job_params.local_directory = case_test_dir
536 # create and launch the job
537 launcher = createLauncher()
538 resManager= createResourcesManager()
540 for resource in self.ressources:
541 print("Testing command job on ", resource)
542 job_params.result_directory = local_result_dir + resource
543 job_params.job_name = "CommandPreJob_" + resource
544 job_params.resource_required.name = resource
546 # use the working directory of the resource
547 resParams = resManager.GetResourceDefinition(resource)
548 wd = os.path.join(resParams.working_directory,
549 "CommandPreJob" + self.suffix)
550 job_params.work_directory = wd
552 job_id = launcher.createJob(job_params)
553 launcher.launchJob(job_id)
554 # wait for the end of the job
555 jobState = launcher.getJobState(job_id)
556 print("Job %d state: %s" % (job_id,jobState))
557 while jobState != "FINISHED" and jobState != "FAILED" :
559 jobState = launcher.getJobState(job_id)
560 print("Job %d state: %s" % (job_id,jobState))
564 self.assertEqual(jobState, "FINISHED")
565 launcher.getJobResults(job_id, "")
566 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
569 #################################
570 # test of command salome job type
571 #################################
572 def test_command_salome(self):
573 case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
574 mkdir_p(case_test_dir)
578 script_file = "myEnvScript.py"
579 script_text = """#! /usr/bin/env python3
580 # -*- coding: utf-8 -*-
583 # verify import salome
586 text_result = os.getenv("ENV_TEST_VAR","")
588 f = open('result.txt', 'w')
592 in_f = open("in.txt", "r")
593 in_text = in_f.read()
597 f = open(os.path.join("copie",'copie.txt'), 'w')
601 abs_script_file = os.path.join(case_test_dir, script_file)
602 f = open(abs_script_file, "w")
605 os.chmod(abs_script_file, 0o755)
608 env_file = "myEnv.sh"
609 env_text = """export ENV_TEST_VAR="expected"
611 f = open(os.path.join(case_test_dir, env_file), "w")
616 f = open(os.path.join(case_test_dir, data_file), "w")
617 f.write("to be copied")
621 local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
622 job_params = self.create_JobParameters()
623 job_params.job_type = "command_salome"
624 job_params.job_file = script_file
625 job_params.env_file = env_file
626 job_params.in_files = [data_file]
627 job_params.out_files = ["result.txt", "copie"]
628 job_params.local_directory = case_test_dir
630 # create and launch the job
631 launcher = createLauncher()
632 resManager= createResourcesManager()
634 for resource in self.ressources:
635 print("Testing command salome job on ", resource)
636 job_params.result_directory = local_result_dir + resource
637 job_params.job_name = "CommandSalomeJob_" + resource
638 job_params.resource_required.name = resource
640 # use the working directory of the resource
641 resParams = resManager.GetResourceDefinition(resource)
642 wd = os.path.join(resParams.working_directory,
643 "CommandSalomeJob" + self.suffix)
644 job_params.work_directory = wd
646 job_id = launcher.createJob(job_params)
647 launcher.launchJob(job_id)
648 # wait for the end of the job
649 jobState = launcher.getJobState(job_id)
650 print("Job %d state: %s" % (job_id,jobState))
651 while jobState != "FINISHED" and jobState != "FAILED" :
653 jobState = launcher.getJobState(job_id)
654 print("Job %d state: %s" % (job_id,jobState))
658 self.assertEqual(jobState, "FINISHED")
659 launcher.getJobResults(job_id, "")
660 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
662 self.verifyFile(os.path.join(job_params.result_directory,
663 "copie",'copie.txt'),
666 # verify getJobWorkFile
667 mydir = os.path.join(case_test_dir, "work_dir" + resource)
668 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
669 self.assertEqual(success, True)
670 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
672 success = launcher.getJobWorkFile(job_id, "copie", mydir)
673 self.assertEqual(success, True)
674 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
680 if __name__ == '__main__':