]> SALOME platform Git repositories - modules/kernel.git/blob - src/Launcher/Test/test_launcher.py
Salome HOME
339e4c8751b67e843c8e78a79a4528f7775544b5
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import unittest
5 import os
6 import sys
7 import time
8 import tempfile
9 import errno
10
def mkdir_p(path):
  """Create directory `path` with any missing parents, like `mkdir -p`.

  An already existing directory is silently accepted. Any other OSError,
  including `path` existing but not being a directory, is re-raised.
  """
  try:
    os.makedirs(path)
  except OSError as err:
    already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
    if not already_a_directory:
      raise
19
20 # Test of SalomeLauncher.
21 # This test should be run in the salome environment, using "salome shell"
22 # and a salome application should be running.
23 # The test will try to launch batch jobs on every available resource which
24 # has the can_launch_batch_jobs parameter set to True.
25 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
26 # define a customised resource catalog.
27 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
28 # skipped.
29 class TestCompo(unittest.TestCase):
30   @classmethod
31   def setUpClass(cls):
32     # Prepare the test directory
33     temp = tempfile.NamedTemporaryFile()
34     cls.test_dir = os.path.join(temp.name, "test_dir")
35     name = os.path.basename(temp.name)
36     temp.close()
37     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
38     mkdir_p(cls.test_dir)
39
40     # load catalogs
41 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
42 #    ior = salome.orb.object_to_string(mc)
43 #    import SALOMERuntime
44 #    SALOMERuntime.RuntimeSALOME_setRuntime()
45 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
46 #    session_catalog = salome_runtime.loadCatalog("session", ior)
47 #    salome_runtime.addCatalog(session_catalog)
48
49     # Get the list of possible ressources
50     ressource_param = salome.ResourceParameters()
51     ressource_param.can_launch_batch_jobs = True
52     rm = salome.lcc.getResourcesManager()
53     cls.ressources = rm.GetFittingResources(ressource_param)
54
55   def verifyFile(self, path, content):
56     try:
57       f = open(path, 'r')
58       text = f.read()
59       f.close()
60       self.assertEqual(text, content)
61     except IOError as ex:
62       self.fail("IO exception:" + str(ex));
63
64   def create_JobParameters(self):
65     job_params = salome.JobParameters()
66     job_params.wckey="P11U5:CARBONES" #needed by edf clusters
67     job_params.resource_required = salome.ResourceParameters()
68     job_params.resource_required.nb_proc = 1
69     return job_params
70
71   ##############################
72   # test of python_salome job
73   ##############################
74   def test_salome_py_job(self):
75     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
76     mkdir_p(case_test_dir)
77
78     old_dir = os.getcwd()
79     os.chdir(case_test_dir)
80
81     # job script
82     script_file = "myScript.py"
83     job_script_file = os.path.join(case_test_dir, script_file)
84     script_text = """#! /usr/bin/env python
85 # -*- coding: utf-8 -*-
86
87 # verify import salome
88 import salome
89 salome.salome_init()
90
91 f = open('result.txt', 'w')
92 f.write("Salut!")
93 f.close()
94
95 import os
96 os.mkdir("subdir")
97 f = open(os.path.join("subdir",'autre.txt'), 'w')
98 f.write("Hello!")
99 f.close()
100 """
101     f = open(job_script_file, "w")
102     f.write(script_text)
103     f.close()
104
105     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
106     job_params = self.create_JobParameters()
107     job_params.job_type = "python_salome"
108     job_params.job_file = job_script_file
109     job_params.in_files = []
110     job_params.out_files = ["result.txt", "subdir"]
111
112     launcher = salome.naming_service.Resolve('/SalomeLauncher')
113
114     for resource in self.ressources:
115       print("Testing python_salome job on ", resource)
116       job_params.result_directory = local_result_dir + resource
117       job_params.job_name = "PyJob" + resource
118       job_params.resource_required.name = resource
119       # use default working directory for this test
120
121       job_id = launcher.createJob(job_params)
122       launcher.launchJob(job_id)
123
124       jobState = launcher.getJobState(job_id)
125       print("Job %d state: %s" % (job_id,jobState))
126       while jobState != "FINISHED" and jobState != "FAILED" :
127         time.sleep(5)
128         jobState = launcher.getJobState(job_id)
129         print("Job %d state: %s" % (job_id,jobState))
130         pass
131
132       self.assertEqual(jobState, "FINISHED")
133
134       # getJobResults to default directory (result_directory)
135       launcher.getJobResults(job_id, "")
136       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
137                       "Salut!")
138       self.verifyFile(os.path.join(job_params.result_directory,
139                                    "subdir", "autre.txt"),
140                       "Hello!")
141
142       # getJobResults to a specific directory
143       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
144       launcher.getJobResults(job_id, mydir)
145       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
146       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
147       pass #for
148
149     os.chdir(old_dir)
150
151   ##############################
152   # test of command job type
153   ##############################
154   def test_command(self):
155     case_test_dir = os.path.join(TestCompo.test_dir, "command")
156     mkdir_p(case_test_dir)
157
158     # job script
159     data_file = "in.txt"
160     script_file = "myEnvScript.py"
161     script_text = """#! /usr/bin/env python
162 # -*- coding: utf-8 -*-
163
164 import os,sys
165
166 text_result = os.getenv("ENV_TEST_VAR","")
167
168 f = open('result.txt', 'w')
169 f.write(text_result)
170 f.close()
171
172 in_f = open("in.txt", "r")
173 in_text = in_f.read()
174 in_f.close()
175
176 os.mkdir("copie")
177 f = open(os.path.join("copie",'copie.txt'), 'w')
178 f.write(in_text)
179 f.close()
180 """
181     abs_script_file = os.path.join(case_test_dir, script_file)
182     f = open(abs_script_file, "w")
183     f.write(script_text)
184     f.close()
185     os.chmod(abs_script_file, 0o755)
186
187     #environment script
188     env_file = "myEnv.sh"
189     env_text = """export ENV_TEST_VAR="expected"
190 """
191     f = open(os.path.join(case_test_dir, env_file), "w")
192     f.write(env_text)
193     f.close()
194
195     # write data file
196     f = open(os.path.join(case_test_dir, data_file), "w")
197     f.write("to be copied")
198     f.close()
199
200     # job params
201     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
202     job_params = self.create_JobParameters()
203     job_params.job_type = "command"
204     job_params.job_file = script_file
205     job_params.env_file = env_file
206     job_params.in_files = [data_file]
207     job_params.out_files = ["result.txt", "copie"]
208     job_params.local_directory = case_test_dir
209
210     # create and launch the job
211     launcher = salome.naming_service.Resolve('/SalomeLauncher')
212     resManager= salome.lcc.getResourcesManager()
213
214     for resource in self.ressources:
215       print("Testing command job on ", resource)
216       job_params.result_directory = local_result_dir + resource
217       job_params.job_name = "CommandJob_" + resource
218       job_params.resource_required.name = resource
219
220       # use the working directory of the resource
221       resParams = resManager.GetResourceDefinition(resource)
222       wd = os.path.join(resParams.working_directory,
223                         "CommandJob" + self.suffix)
224       job_params.work_directory = wd
225
226       job_id = launcher.createJob(job_params)
227       launcher.launchJob(job_id)
228       # wait for the end of the job
229       jobState = launcher.getJobState(job_id)
230       print("Job %d state: %s" % (job_id,jobState))
231       while jobState != "FINISHED" and jobState != "FAILED" :
232         time.sleep(3)
233         jobState = launcher.getJobState(job_id)
234         print("Job %d state: %s" % (job_id,jobState))
235         pass
236
237       # verify the results
238       self.assertEqual(jobState, "FINISHED")
239       launcher.getJobResults(job_id, "")
240       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
241                       "expected")
242       self.verifyFile(os.path.join(job_params.result_directory,
243                                    "copie",'copie.txt'),
244                       "to be copied")
245
246       # verify getJobWorkFile
247       mydir = os.path.join(case_test_dir, "work_dir" + resource)
248       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
249       self.assertEqual(success, True)
250       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
251
252       success = launcher.getJobWorkFile(job_id, "copie", mydir)
253       self.assertEqual(success, True)
254       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
255                       "to be copied")
256
257
258   ##############################
259   # test of yacs job type
260   ##############################
261   def test_yacs(self):
262     yacs_path = os.getenv("YACS_ROOT_DIR", "")
263     if not os.path.isdir(yacs_path):
264       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
265
266     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
267     mkdir_p(case_test_dir)
268
269     #environment script
270     env_file = "myEnv.sh"
271     env_text = """export ENV_TEST_VAR="expected"
272 """
273     f = open(os.path.join(case_test_dir, env_file), "w")
274     f.write(env_text)
275     f.close()
276
277     # job script
278     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
279 <proc name="newSchema_1">
280    <container name="DefaultContainer">
281       <property name="container_kind" value="Salome"/>
282       <property name="attached_on_cloning" value="0"/>
283       <property name="container_name" value="FactoryServer"/>
284       <property name="name" value="localhost"/>
285    </container>
286    <inline name="PyScript0">
287       <script><code><![CDATA[import os
288 text_result = os.getenv("ENV_TEST_VAR","")
289 f = open('result.txt', 'w')
290 f.write(text_result)
291 f.close()
292 ]]></code></script>
293       <load container="DefaultContainer"/>
294    </inline>
295 </proc>
296 """
297     yacs_file = "mySchema.xml"
298     job_script_file = os.path.join(case_test_dir, yacs_file)
299     f = open(job_script_file, "w")
300     f.write(script_text)
301     f.close()
302
303     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
304     job_params = self.create_JobParameters()
305     job_params.job_type = "yacs_file"
306     job_params.job_file = job_script_file
307     job_params.env_file = os.path.join(case_test_dir,env_file)
308     job_params.out_files = ["result.txt"]
309
310     # define the interval between two YACS schema dumps (3 seconds)
311     import Engines
312     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
313
314     launcher = salome.naming_service.Resolve('/SalomeLauncher')
315     resManager= salome.lcc.getResourcesManager()
316
317     for resource in self.ressources:
318       print("Testing yacs job on ", resource)
319       job_params.result_directory = local_result_dir + resource
320       job_params.job_name = "YacsJob_" + resource
321       job_params.resource_required.name = resource
322
323       # use the working directory of the resource
324       resParams = resManager.GetResourceDefinition(resource)
325       wd = os.path.join(resParams.working_directory,
326                         "YacsJob" + self.suffix)
327       job_params.work_directory = wd
328
329       job_id = launcher.createJob(job_params)
330       launcher.launchJob(job_id)
331       jobState = launcher.getJobState(job_id)
332
333       yacs_dump_success = False
334       print("Job %d state: %s" % (job_id,jobState))
335       while jobState != "FINISHED" and jobState != "FAILED" :
336         time.sleep(5)
337         jobState = launcher.getJobState(job_id)
338 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
339         yacs_dump_success = launcher.getJobDumpState(job_id,
340                                               job_params.result_directory)
341         print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
342         pass
343
344       self.assertEqual(jobState, "FINISHED")
345
346       # Verify dumpState file is in the results
347       self.assertTrue(yacs_dump_success)
348       dump_file_path = os.path.join(job_params.result_directory,
349                                     "dumpState_mySchema.xml")
350       self.assertTrue(os.path.isfile(dump_file_path))
351
352       # Load the schema state from the dump file and verify the state of a node
353       import SALOMERuntime
354       SALOMERuntime.RuntimeSALOME_setRuntime(1)
355       import loader
356       schema = loader.YACSLoader().load(job_script_file)
357       stateParser = loader.stateParser()
358       sl = loader.stateLoader(stateParser, schema)
359       sl.parse(dump_file_path)
360       # 106 : "DONE" state code
361       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
362
363       # getJobResults to default directory (result_directory)
364       launcher.getJobResults(job_id, "")
365       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
366                       "expected")
367
368   ##############################
369   # test of yacs job type using "--init_port" driver option
370   ##############################
371   def test_yacsopt(self):
372     yacs_path = os.getenv("YACS_ROOT_DIR", "")
373     if not os.path.isdir(yacs_path):
374       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
375
376     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
377     mkdir_p(case_test_dir)
378
379     # job script
380     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
381 <proc name="myschema">
382    <type name="string" kind="string"/>
383    <type name="bool" kind="bool"/>
384    <type name="double" kind="double"/>
385    <type name="int" kind="int"/>
386    <container name="DefaultContainer">
387       <property name="container_kind" value="Salome"/>
388       <property name="attached_on_cloning" value="0"/>
389       <property name="container_name" value="FactoryServer"/>
390       <property name="name" value="localhost"/>
391    </container>
392    <inline name="mynode">
393       <script><code><![CDATA[
394 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
395 f = open('result.txt', 'w')
396 f.write(text_result)
397 f.close()
398 ]]></code></script>
399       <load container="DefaultContainer"/>
400       <inport name="i" type="int"/>
401       <inport name="d" type="double"/>
402       <inport name="b" type="bool"/>
403       <inport name="s" type="string"/>
404    </inline>
405 </proc>
406 """
407     yacs_file = "simpleSchema.xml"
408     job_script_file = os.path.join(case_test_dir, yacs_file)
409     f = open(job_script_file, "w")
410     f.write(script_text)
411     f.close()
412
413     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
414     job_params = self.create_JobParameters()
415     job_params.job_type = "yacs_file"
416     job_params.job_file = job_script_file
417     job_params.out_files = ["result.txt"]
418
419     # define the interval between two YACS schema dumps (3 seconds)
420     import Engines
421     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
422                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
423     expected_result="i=5,d=3.7,b=False,s=lili"
424
425     launcher = salome.naming_service.Resolve('/SalomeLauncher')
426     resManager= salome.lcc.getResourcesManager()
427
428     for resource in self.ressources:
429       print("Testing yacs job with options on ", resource)
430       job_params.result_directory = local_result_dir + resource
431       job_params.job_name = "YacsJobOpt_" + resource
432       job_params.resource_required.name = resource
433
434       # use the working directory of the resource
435       resParams = resManager.GetResourceDefinition(resource)
436       wd = os.path.join(resParams.working_directory,
437                         "YacsJobOpt" + self.suffix)
438       job_params.work_directory = wd
439
440       job_id = launcher.createJob(job_params)
441       launcher.launchJob(job_id)
442       jobState = launcher.getJobState(job_id)
443
444       yacs_dump_success = False
445       print("Job %d state: %s" % (job_id,jobState))
446       while jobState != "FINISHED" and jobState != "FAILED" :
447         time.sleep(5)
448         jobState = launcher.getJobState(job_id)
449         print("Job %d state: %s " % (job_id,jobState))
450         pass
451
452       self.assertEqual(jobState, "FINISHED")
453
454       # getJobResults to default directory (result_directory)
455       launcher.getJobResults(job_id, "")
456       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
457                       expected_result)
458
459   ############################################
460   # test of command job type with pre_command
461   ############################################
462   def test_command_pre(self):
463     case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
464     mkdir_p(case_test_dir)
465
466     # command to be run before the job
467     pre_command = "pre_command.sh"
468     pre_command_text = "echo 'it works!' > in.txt"
469     abs_pre_command_file = os.path.join(case_test_dir, pre_command)
470     f = open(abs_pre_command_file, "w")
471     f.write(pre_command_text)
472     f.close()
473     os.chmod(abs_pre_command_file, 0o755)
474     
475     # job script
476     script_file = "myTestScript.py"
477     script_text = """#! /usr/bin/env python
478 # -*- coding: utf-8 -*-
479
480 in_f = open("in.txt", "r")
481 in_text = in_f.read()
482 in_f.close()
483
484 f = open('result.txt', 'w')
485 f.write(in_text)
486 f.close()
487 """
488     abs_script_file = os.path.join(case_test_dir, script_file)
489     f = open(abs_script_file, "w")
490     f.write(script_text)
491     f.close()
492     os.chmod(abs_script_file, 0o755)
493
494     # job params
495     local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
496     job_params = self.create_JobParameters()
497     job_params.job_type = "command"
498     job_params.job_file = script_file
499     job_params.pre_command = pre_command
500     job_params.in_files = []
501     job_params.out_files = ["result.txt"]
502     job_params.local_directory = case_test_dir
503
504     # create and launch the job
505     launcher = salome.naming_service.Resolve('/SalomeLauncher')
506     resManager= salome.lcc.getResourcesManager()
507
508     for resource in self.ressources:
509       print "Testing command job on ", resource
510       job_params.result_directory = local_result_dir + resource
511       job_params.job_name = "CommandPreJob_" + resource
512       job_params.resource_required.name = resource
513
514       # use the working directory of the resource
515       resParams = resManager.GetResourceDefinition(resource)
516       wd = os.path.join(resParams.working_directory,
517                         "CommandPreJob" + self.suffix)
518       job_params.work_directory = wd
519
520       job_id = launcher.createJob(job_params)
521       launcher.launchJob(job_id)
522       # wait for the end of the job
523       jobState = launcher.getJobState(job_id)
524       print "Job %d state: %s" % (job_id,jobState)
525       while jobState != "FINISHED" and jobState != "FAILED" :
526         time.sleep(3)
527         jobState = launcher.getJobState(job_id)
528         print "Job %d state: %s" % (job_id,jobState)
529         pass
530
531       # verify the results
532       self.assertEqual(jobState, "FINISHED")
533       launcher.getJobResults(job_id, "")
534       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
535                       "it works!\n")
536
537   #################################
538   # test of command salome job type
539   #################################
540   def test_command_salome(self):
541     case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
542     mkdir_p(case_test_dir)
543
544     # job script
545     data_file = "in.txt"
546     script_file = "myEnvScript.py"
547     script_text = """#! /usr/bin/env python
548 # -*- coding: utf-8 -*-
549
550 import os,sys
551 # verify import salome
552 import salome
553
554 text_result = os.getenv("ENV_TEST_VAR","")
555
556 f = open('result.txt', 'w')
557 f.write(text_result)
558 f.close()
559
560 in_f = open("in.txt", "r")
561 in_text = in_f.read()
562 in_f.close()
563
564 os.mkdir("copie")
565 f = open(os.path.join("copie",'copie.txt'), 'w')
566 f.write(in_text)
567 f.close()
568 """
569     abs_script_file = os.path.join(case_test_dir, script_file)
570     f = open(abs_script_file, "w")
571     f.write(script_text)
572     f.close()
573     os.chmod(abs_script_file, 0o755)
574
575     #environment script
576     env_file = "myEnv.sh"
577     env_text = """export ENV_TEST_VAR="expected"
578 """
579     f = open(os.path.join(case_test_dir, env_file), "w")
580     f.write(env_text)
581     f.close()
582
583     # write data file
584     f = open(os.path.join(case_test_dir, data_file), "w")
585     f.write("to be copied")
586     f.close()
587
588     # job params
589     local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
590     job_params = self.create_JobParameters()
591     job_params.job_type = "command_salome"
592     job_params.job_file = script_file
593     job_params.env_file = env_file
594     job_params.in_files = [data_file]
595     job_params.out_files = ["result.txt", "copie"]
596     job_params.local_directory = case_test_dir
597
598     # create and launch the job
599     launcher = salome.naming_service.Resolve('/SalomeLauncher')
600     resManager= salome.lcc.getResourcesManager()
601
602     for resource in self.ressources:
603       print "Testing command salome job on ", resource
604       job_params.result_directory = local_result_dir + resource
605       job_params.job_name = "CommandSalomeJob_" + resource
606       job_params.resource_required.name = resource
607
608       # use the working directory of the resource
609       resParams = resManager.GetResourceDefinition(resource)
610       wd = os.path.join(resParams.working_directory,
611                         "CommandSalomeJob" + self.suffix)
612       job_params.work_directory = wd
613
614       job_id = launcher.createJob(job_params)
615       launcher.launchJob(job_id)
616       # wait for the end of the job
617       jobState = launcher.getJobState(job_id)
618       print "Job %d state: %s" % (job_id,jobState)
619       while jobState != "FINISHED" and jobState != "FAILED" :
620         time.sleep(3)
621         jobState = launcher.getJobState(job_id)
622         print "Job %d state: %s" % (job_id,jobState)
623         pass
624
625       # verify the results
626       self.assertEqual(jobState, "FINISHED")
627       launcher.getJobResults(job_id, "")
628       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
629                       "expected")
630       self.verifyFile(os.path.join(job_params.result_directory,
631                                    "copie",'copie.txt'),
632                       "to be copied")
633
634       # verify getJobWorkFile
635       mydir = os.path.join(case_test_dir, "work_dir" + resource)
636       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
637       self.assertEqual(success, True)
638       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
639
640       success = launcher.getJobWorkFile(job_id, "copie", mydir)
641       self.assertEqual(success, True)
642       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
643                       "to be copied")
644       pass
645     pass
646   pass
647
if __name__ == '__main__':
  # The tests use `salome` as a module-level name: it is imported here and
  # the session is initialised (create study) before the tests are run.
  import salome
  salome.salome_init()
  unittest.main()