Salome HOME
Add the preprocess feature to SALOME_Launcher.
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import unittest
5 import os
6 import sys
7 import time
8 import tempfile
9 import errno
10
def mkdir_p(path):
  """Create directory `path` with any missing parents (like ``mkdir -p``).

  An already existing directory is silently accepted.  Every other OSError,
  including EEXIST when `path` is not a directory, is re-raised.
  """
  try:
    os.makedirs(path)
  except OSError as err:
    is_existing_dir = err.errno == errno.EEXIST and os.path.isdir(path)
    if not is_existing_dir:
      raise
19
# Test of SalomeLauncher.
# This test should be run in the salome environment, using "salome shell",
# and a salome application should be running.
# The test will try to launch batch jobs on every available resource which
# has the can_launch_batch_jobs parameter set to True.
# You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
# define a customised resource catalog.
# If YACS_ROOT_DIR is not set, the tests that submit a YACS schema will be
# skipped.
29 class TestCompo(unittest.TestCase):
30   @classmethod
31   def setUpClass(cls):
32     # Prepare the test directory
33     temp = tempfile.NamedTemporaryFile()
34     cls.test_dir = os.path.join(temp.name, "test_dir")
35     name = os.path.basename(temp.name)
36     temp.close()
37     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
38     mkdir_p(cls.test_dir)
39
40     # load catalogs
41 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
42 #    ior = salome.orb.object_to_string(mc)
43 #    import SALOMERuntime
44 #    SALOMERuntime.RuntimeSALOME_setRuntime()
45 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
46 #    session_catalog = salome_runtime.loadCatalog("session", ior)
47 #    salome_runtime.addCatalog(session_catalog)
48
49     # Get the list of possible ressources
50     ressource_param = salome.ResourceParameters()
51     ressource_param.can_launch_batch_jobs = True
52     rm = salome.lcc.getResourcesManager()
53     cls.ressources = rm.GetFittingResources(ressource_param)
54
55   def verifyFile(self, path, content):
56     try:
57       f = open(path, 'r')
58       text = f.read()
59       f.close()
60       self.assertEqual(text, content)
61     except IOError,ex:
62       self.fail("IO exception:" + str(ex));
63
64   def create_JobParameters(self):
65     job_params = salome.JobParameters()
66     job_params.wckey="P11U50:CARBONES" #needed by edf clusters
67     job_params.resource_required = salome.ResourceParameters()
68     job_params.resource_required.nb_proc = 1
69     return job_params
70
71   ##############################
72   # test of python_salome job
73   ##############################
74   def test_salome_py_job(self):
75     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
76     mkdir_p(case_test_dir)
77
78     old_dir = os.getcwd()
79     os.chdir(case_test_dir)
80
81     # job script
82     script_file = "myScript.py"
83     job_script_file = os.path.join(case_test_dir, script_file)
84     script_text = """#! /usr/bin/env python
85 # -*- coding: utf-8 -*-
86
87 # verify import salome
88 import salome
89 salome.salome_init()
90
91 f = open('result.txt', 'w')
92 f.write("Salut!")
93 f.close()
94
95 import os
96 os.mkdir("subdir")
97 f = open(os.path.join("subdir",'autre.txt'), 'w')
98 f.write("Hello!")
99 f.close()
100 """
101     f = open(job_script_file, "w")
102     f.write(script_text)
103     f.close()
104
105     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
106     job_params = self.create_JobParameters()
107     job_params.job_type = "python_salome"
108     job_params.job_file = job_script_file
109     job_params.in_files = []
110     job_params.out_files = ["result.txt", "subdir"]
111
112     launcher = salome.naming_service.Resolve('/SalomeLauncher')
113
114     for resource in self.ressources:
115       print "Testing python_salome job on ", resource
116       job_params.result_directory = local_result_dir + resource
117       job_params.job_name = "PyJob" + resource
118       job_params.resource_required.name = resource
119       # use default working directory for this test
120
121       job_id = launcher.createJob(job_params)
122       launcher.launchJob(job_id)
123
124       jobState = launcher.getJobState(job_id)
125       print "Job %d state: %s" % (job_id,jobState)
126       while jobState != "FINISHED" and jobState != "FAILED" :
127         time.sleep(5)
128         jobState = launcher.getJobState(job_id)
129         print "Job %d state: %s" % (job_id,jobState)
130         pass
131
132       self.assertEqual(jobState, "FINISHED")
133
134       # getJobResults to default directory (result_directory)
135       launcher.getJobResults(job_id, "")
136       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
137                       "Salut!")
138       self.verifyFile(os.path.join(job_params.result_directory,
139                                    "subdir", "autre.txt"),
140                       "Hello!")
141
142       # getJobResults to a specific directory
143       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
144       launcher.getJobResults(job_id, mydir)
145       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
146       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
147       pass #for
148
149     os.chdir(old_dir)
150
151   ##############################
152   # test of command job type
153   ##############################
154   def test_command(self):
155     case_test_dir = os.path.join(TestCompo.test_dir, "command")
156     mkdir_p(case_test_dir)
157
158     # job script
159     data_file = "in.txt"
160     script_file = "myEnvScript.py"
161     script_text = """#! /usr/bin/env python
162 # -*- coding: utf-8 -*-
163
164 import os,sys
165
166 text_result = os.getenv("ENV_TEST_VAR","")
167
168 f = open('result.txt', 'w')
169 f.write(text_result)
170 f.close()
171
172 in_f = open("in.txt", "r")
173 in_text = in_f.read()
174 in_f.close()
175
176 os.mkdir("copie")
177 f = open(os.path.join("copie",'copie.txt'), 'w')
178 f.write(in_text)
179 f.close()
180 """
181     abs_script_file = os.path.join(case_test_dir, script_file)
182     f = open(abs_script_file, "w")
183     f.write(script_text)
184     f.close()
185     os.chmod(abs_script_file, 0o755)
186
187     #environement script
188     env_file = "myEnv.sh"
189     env_text = """export ENV_TEST_VAR="expected"
190 """
191     f = open(os.path.join(case_test_dir, env_file), "w")
192     f.write(env_text)
193     f.close()
194
195     # write data file
196     f = open(os.path.join(case_test_dir, data_file), "w")
197     f.write("to be copied")
198     f.close()
199
200     # job params
201     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
202     job_params = self.create_JobParameters()
203     job_params.job_type = "command"
204     job_params.job_file = script_file
205     job_params.env_file = env_file
206     job_params.in_files = [data_file]
207     job_params.out_files = ["result.txt", "copie"]
208     job_params.local_directory = case_test_dir
209
210     # create and launch the job
211     launcher = salome.naming_service.Resolve('/SalomeLauncher')
212     resManager= salome.lcc.getResourcesManager()
213
214     for resource in self.ressources:
215       print "Testing command job on ", resource
216       job_params.result_directory = local_result_dir + resource
217       job_params.job_name = "CommandJob_" + resource
218       job_params.resource_required.name = resource
219
220       # use the working directory of the resource
221       resParams = resManager.GetResourceDefinition(resource)
222       wd = os.path.join(resParams.working_directory,
223                         "CommandJob" + self.suffix)
224       job_params.work_directory = wd
225
226       job_id = launcher.createJob(job_params)
227       launcher.launchJob(job_id)
228       # wait for the end of the job
229       jobState = launcher.getJobState(job_id)
230       print "Job %d state: %s" % (job_id,jobState)
231       while jobState != "FINISHED" and jobState != "FAILED" :
232         time.sleep(3)
233         jobState = launcher.getJobState(job_id)
234         print "Job %d state: %s" % (job_id,jobState)
235         pass
236
237       # verify the results
238       self.assertEqual(jobState, "FINISHED")
239       launcher.getJobResults(job_id, "")
240       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
241                       "expected")
242       self.verifyFile(os.path.join(job_params.result_directory,
243                                    "copie",'copie.txt'),
244                       "to be copied")
245
246       # verify getJobWorkFile
247       mydir = os.path.join(case_test_dir, "work_dir" + resource)
248       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
249       self.assertEqual(success, True)
250       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
251
252       success = launcher.getJobWorkFile(job_id, "copie", mydir)
253       self.assertEqual(success, True)
254       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
255                       "to be copied")
256
257
258   ##############################
259   # test of yacs job type
260   ##############################
261   def test_yacs(self):
262     yacs_path = os.getenv("YACS_ROOT_DIR", "")
263     if not os.path.isdir(yacs_path):
264       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
265
266     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
267     mkdir_p(case_test_dir)
268
269     #environement script
270     env_file = "myEnv.sh"
271     env_text = """export ENV_TEST_VAR="expected"
272 """
273     f = open(os.path.join(case_test_dir, env_file), "w")
274     f.write(env_text)
275     f.close()
276
277     # job script
278     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
279 <proc name="newSchema_1">
280    <property name="DefaultStudyID" value="1"/>
281    <container name="DefaultContainer">
282       <property name="container_kind" value="Salome"/>
283       <property name="attached_on_cloning" value="0"/>
284       <property name="container_name" value="FactoryServer"/>
285       <property name="name" value="localhost"/>
286    </container>
287    <inline name="PyScript0">
288       <script><code><![CDATA[import os
289 text_result = os.getenv("ENV_TEST_VAR","")
290 f = open('result.txt', 'w')
291 f.write(text_result)
292 f.close()
293 ]]></code></script>
294       <load container="DefaultContainer"/>
295    </inline>
296 </proc>
297 """
298     yacs_file = "mySchema.xml"
299     job_script_file = os.path.join(case_test_dir, yacs_file)
300     f = open(job_script_file, "w")
301     f.write(script_text)
302     f.close()
303
304     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
305     job_params = self.create_JobParameters()
306     job_params.job_type = "yacs_file"
307     job_params.job_file = job_script_file
308     job_params.env_file = os.path.join(case_test_dir,env_file)
309     job_params.out_files = ["result.txt"]
310
311     # define the interval between two YACS schema dumps (3 seconds)
312     import Engines
313     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
314
315     launcher = salome.naming_service.Resolve('/SalomeLauncher')
316     resManager= salome.lcc.getResourcesManager()
317
318     for resource in self.ressources:
319       print "Testing yacs job on ", resource
320       job_params.result_directory = local_result_dir + resource
321       job_params.job_name = "YacsJob_" + resource
322       job_params.resource_required.name = resource
323
324       # use the working directory of the resource
325       resParams = resManager.GetResourceDefinition(resource)
326       wd = os.path.join(resParams.working_directory,
327                         "YacsJob" + self.suffix)
328       job_params.work_directory = wd
329
330       job_id = launcher.createJob(job_params)
331       launcher.launchJob(job_id)
332       jobState = launcher.getJobState(job_id)
333
334       yacs_dump_success = False
335       print "Job %d state: %s" % (job_id,jobState)
336       while jobState != "FINISHED" and jobState != "FAILED" :
337         time.sleep(5)
338         jobState = launcher.getJobState(job_id)
339 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
340         yacs_dump_success = launcher.getJobDumpState(job_id,
341                                               job_params.result_directory)
342         print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
343         pass
344
345       self.assertEqual(jobState, "FINISHED")
346
347       # Verify dumpState file is in the results
348       self.assertTrue(yacs_dump_success)
349       dump_file_path = os.path.join(job_params.result_directory,
350                                     "dumpState_mySchema.xml")
351       self.assertTrue(os.path.isfile(dump_file_path))
352
353       # Load the schema state from the dump file and verify the state of a node
354       import SALOMERuntime
355       SALOMERuntime.RuntimeSALOME_setRuntime(1)
356       import loader
357       schema = loader.YACSLoader().load(job_script_file)
358       stateParser = loader.stateParser()
359       sl = loader.stateLoader(stateParser, schema)
360       sl.parse(dump_file_path)
361       # 106 : "DONE" state code
362       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
363
364       # getJobResults to default directory (result_directory)
365       launcher.getJobResults(job_id, "")
366       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
367                       "expected")
368
369   ##############################
370   # test of yacs job type using "--init_port" driver option
371   ##############################
372   def test_yacsopt(self):
373     yacs_path = os.getenv("YACS_ROOT_DIR", "")
374     if not os.path.isdir(yacs_path):
375       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
376
377     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
378     mkdir_p(case_test_dir)
379
380     # job script
381     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
382 <proc name="myschema">
383    <property name="DefaultStudyID" value="1"/>
384    <type name="string" kind="string"/>
385    <type name="bool" kind="bool"/>
386    <type name="double" kind="double"/>
387    <type name="int" kind="int"/>
388    <container name="DefaultContainer">
389       <property name="container_kind" value="Salome"/>
390       <property name="attached_on_cloning" value="0"/>
391       <property name="container_name" value="FactoryServer"/>
392       <property name="name" value="localhost"/>
393    </container>
394    <inline name="mynode">
395       <script><code><![CDATA[
396 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
397 f = open('result.txt', 'w')
398 f.write(text_result)
399 f.close()
400 ]]></code></script>
401       <load container="DefaultContainer"/>
402       <inport name="i" type="int"/>
403       <inport name="d" type="double"/>
404       <inport name="b" type="bool"/>
405       <inport name="s" type="string"/>
406    </inline>
407 </proc>
408 """
409     yacs_file = "simpleSchema.xml"
410     job_script_file = os.path.join(case_test_dir, yacs_file)
411     f = open(job_script_file, "w")
412     f.write(script_text)
413     f.close()
414
415     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
416     job_params = self.create_JobParameters()
417     job_params.job_type = "yacs_file"
418     job_params.job_file = job_script_file
419     job_params.out_files = ["result.txt"]
420
421     # define the interval between two YACS schema dumps (3 seconds)
422     import Engines
423     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
424                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
425     expected_result="i=5,d=3.7,b=False,s=lili"
426
427     launcher = salome.naming_service.Resolve('/SalomeLauncher')
428     resManager= salome.lcc.getResourcesManager()
429
430     for resource in self.ressources:
431       print "Testing yacs job with options on ", resource
432       job_params.result_directory = local_result_dir + resource
433       job_params.job_name = "YacsJobOpt_" + resource
434       job_params.resource_required.name = resource
435
436       # use the working directory of the resource
437       resParams = resManager.GetResourceDefinition(resource)
438       wd = os.path.join(resParams.working_directory,
439                         "YacsJobOpt" + self.suffix)
440       job_params.work_directory = wd
441
442       job_id = launcher.createJob(job_params)
443       launcher.launchJob(job_id)
444       jobState = launcher.getJobState(job_id)
445
446       yacs_dump_success = False
447       print "Job %d state: %s" % (job_id,jobState)
448       while jobState != "FINISHED" and jobState != "FAILED" :
449         time.sleep(5)
450         jobState = launcher.getJobState(job_id)
451         print "Job %d state: %s " % (job_id,jobState)
452         pass
453
454       self.assertEqual(jobState, "FINISHED")
455
456       # getJobResults to default directory (result_directory)
457       launcher.getJobResults(job_id, "")
458       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
459                       expected_result)
460
461   ############################################
462   # test of command job type with pre_command
463   ############################################
464   def test_command_pre(self):
465     case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
466     mkdir_p(case_test_dir)
467
468     # command to be run before the job
469     pre_command = "echo 'it works!' > in.txt"
470     
471     # job script
472     script_file = "myTestScript.py"
473     script_text = """#! /usr/bin/env python
474 # -*- coding: utf-8 -*-
475
476 in_f = open("in.txt", "r")
477 in_text = in_f.read()
478 in_f.close()
479
480 f = open('result.txt', 'w')
481 f.write(in_text)
482 f.close()
483 """
484     abs_script_file = os.path.join(case_test_dir, script_file)
485     f = open(abs_script_file, "w")
486     f.write(script_text)
487     f.close()
488     os.chmod(abs_script_file, 0o755)
489
490     # job params
491     local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
492     job_params = self.create_JobParameters()
493     job_params.job_type = "command"
494     job_params.job_file = script_file
495     job_params.pre_command = pre_command
496     job_params.in_files = []
497     job_params.out_files = ["result.txt"]
498     job_params.local_directory = case_test_dir
499
500     # create and launch the job
501     launcher = salome.naming_service.Resolve('/SalomeLauncher')
502     resManager= salome.lcc.getResourcesManager()
503
504     for resource in self.ressources:
505       print "Testing command job on ", resource
506       job_params.result_directory = local_result_dir + resource
507       job_params.job_name = "CommandPreJob_" + resource
508       job_params.resource_required.name = resource
509
510       # use the working directory of the resource
511       resParams = resManager.GetResourceDefinition(resource)
512       wd = os.path.join(resParams.working_directory,
513                         "CommandPreJob" + self.suffix)
514       job_params.work_directory = wd
515
516       job_id = launcher.createJob(job_params)
517       launcher.launchJob(job_id)
518       # wait for the end of the job
519       jobState = launcher.getJobState(job_id)
520       print "Job %d state: %s" % (job_id,jobState)
521       while jobState != "FINISHED" and jobState != "FAILED" :
522         time.sleep(3)
523         jobState = launcher.getJobState(job_id)
524         print "Job %d state: %s" % (job_id,jobState)
525         pass
526
527       # verify the results
528       self.assertEqual(jobState, "FINISHED")
529       launcher.getJobResults(job_id, "")
530       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
531                       "it works!\n")
532
if __name__ == '__main__':
  # The test methods use ``salome`` as a module-level name: import and
  # initialise it here, before the tests are run.
  import salome
  salome.salome_init()
  unittest.main()