Salome HOME
Typo-fix by Kunda
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import unittest
5 import os
6 import sys
7 import time
8 import tempfile
9 import errno
10
def mkdir_p(path):
  """Create directory `path` and any missing parents, like `mkdir -p`.

  An already existing directory at `path` is not an error. Any other
  OSError raised by os.makedirs (for instance when `path` exists but is
  not a directory) is propagated unchanged.
  """
  try:
    os.makedirs(path)
  except OSError as err:  # "as" syntax needs Python > 2.5
    already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
    if not already_a_directory:
      raise
19
# Test of SalomeLauncher.
# This test should be run in the SALOME environment, using "salome shell",
# and a SALOME application should be running.
# The test tries to launch batch jobs on every available resource which has
# the can_launch_batch_jobs parameter set to True.
# You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
# define a customised resource catalog.
# If YACS_ROOT_DIR is not set, the tests submitting a YACS schema are skipped.
class TestCompo(unittest.TestCase):
  """Functional tests submitting jobs through the SalomeLauncher service.

  Every test loops over the resources selected in setUpClass, submits one
  job per resource, polls it until it ends and checks the files fetched back.
  """

  # Job states after which polling stops.
  _FINAL_JOB_STATES = ("FINISHED", "FAILED")

  @classmethod
  def setUpClass(cls):
    """Create the scratch directory and list the resources to test.

    Sets:
      cls.test_dir   -- root directory of the files written by the tests
      cls.suffix     -- time stamp and PID, appended to the names of the
                        working directories requested on the resources
      cls.ressources -- names returned by GetFittingResources for resources
                        with can_launch_batch_jobs set

    Raises unittest.SkipTest when the salome module cannot be imported.
    """
    # The module-level name "salome" only exists when this file is run as a
    # script, so import it here to keep the class usable from a test runner.
    try:
      import salome
    except ImportError as exc:
      raise unittest.SkipTest("Needs the SALOME environment: %s" % exc)
    # NOTE(review): assumes salome_init() may safely be called again after
    # the call made in the __main__ block -- confirm for the targeted KERNEL.
    salome.salome_init()

    # Prepare the test directory. mkdtemp creates a fresh private directory,
    # so there is no window between picking a name and creating it.
    cls.test_dir = os.path.join(tempfile.mkdtemp(), "test_dir")
    cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S") + "-%s" % os.getpid()
    mkdir_p(cls.test_dir)

    # Get the list of possible resources
    resource_param = salome.ResourceParameters()
    resource_param.can_launch_batch_jobs = True
    rm = salome.lcc.getResourcesManager()
    cls.ressources = rm.GetFittingResources(resource_param)

  @staticmethod
  def _write_file(path, text):
    """Write `text` to `path`, replacing any previous content."""
    with open(path, "w") as f:
      f.write(text)

  def _require_yacs(self):
    """Skip the current test unless YACS_ROOT_DIR names a directory."""
    yacs_path = os.getenv("YACS_ROOT_DIR", "")
    if not os.path.isdir(yacs_path):
      self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")

  def _wait_for_job(self, launcher, job_id, delay):
    """Poll `launcher` until job `job_id` is FINISHED or FAILED.

    The state is printed after every query and `delay` seconds are slept
    between two queries. Returns the final state.
    """
    state = launcher.getJobState(job_id)
    print("Job %d state: %s" % (job_id, state))
    while state not in self._FINAL_JOB_STATES:
      time.sleep(delay)
      state = launcher.getJobState(job_id)
      print("Job %d state: %s" % (job_id, state))
    return state

  def verifyFile(self, path, content):
    """Fail the test unless the file `path` is readable and holds `content`."""
    try:
      with open(path, 'r') as f:
        text = f.read()
    except IOError as ex:
      self.fail("IO exception:" + str(ex))
    self.assertEqual(text, content)

  ##############################
  # test of python_salome job
  ##############################
  def test_salome_py_job(self):
    """Run a python_salome job on each resource and fetch its results.

    The results are fetched once to the default result directory and once
    to an explicit directory; both copies are checked.
    """
    import salome

    case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
    mkdir_p(case_test_dir)

    # Run from the case directory. The cleanup restores the previous
    # directory even when an assertion below fails.
    self.addCleanup(os.chdir, os.getcwd())
    os.chdir(case_test_dir)

    # job script
    script_file = "myScript.py"
    job_script_file = os.path.join(case_test_dir, script_file)
    script_text = """#! /usr/bin/env python
# -*- coding: utf-8 -*-

# verify import salome
import salome
salome.salome_init()

f = open('result.txt', 'w')
f.write("Salut!")
f.close()

import os
os.mkdir("subdir")
f = open(os.path.join("subdir",'autre.txt'), 'w')
f.write("Hello!")
f.close()
"""
    self._write_file(job_script_file, script_text)

    local_result_dir = os.path.join(case_test_dir, "result_py_job-")
    job_params = salome.JobParameters()
    job_params.job_type = "python_salome"
    job_params.job_file = job_script_file
    job_params.in_files = []
    job_params.out_files = ["result.txt", "subdir"]
    job_params.resource_required = salome.ResourceParameters()
    job_params.resource_required.nb_proc = 1

    launcher = salome.naming_service.Resolve('/SalomeLauncher')

    for resource in self.ressources:
      print("Testing python_salome job on %s" % resource)
      job_params.result_directory = local_result_dir + resource
      job_params.job_name = "PyJob" + resource
      job_params.resource_required.name = resource
      # use default working directory for this test

      job_id = launcher.createJob(job_params)
      launcher.launchJob(job_id)
      jobState = self._wait_for_job(launcher, job_id, 5)

      self.assertEqual(jobState, "FINISHED")

      # getJobResults to default directory (result_directory)
      launcher.getJobResults(job_id, "")
      self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
                      "Salut!")
      self.verifyFile(os.path.join(job_params.result_directory,
                                   "subdir", "autre.txt"),
                      "Hello!")

      # getJobResults to a specific directory
      mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
      launcher.getJobResults(job_id, mydir)
      self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
      self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")

  ##############################
  # test of command job type
  ##############################
  def test_command(self):
    """Run a command job with an environment file and an input file.

    Checks the fetched results, then checks getJobWorkFile on a single file
    and on a directory of the job's working directory.
    """
    import salome

    case_test_dir = os.path.join(TestCompo.test_dir, "command")
    mkdir_p(case_test_dir)

    # job script
    data_file = "in.txt"
    script_file = "myEnvScript.py"
    script_text = """#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os,sys

text_result = os.getenv("ENV_TEST_VAR","")

f = open('result.txt', 'w')
f.write(text_result)
f.close()

in_f = open("in.txt", "r")
in_text = in_f.read()
in_f.close()

os.mkdir("copie")
f = open(os.path.join("copie",'copie.txt'), 'w')
f.write(in_text)
f.close()
"""
    abs_script_file = os.path.join(case_test_dir, script_file)
    self._write_file(abs_script_file, script_text)
    os.chmod(abs_script_file, 0o755)

    # environment script
    env_file = "myEnv.sh"
    env_text = """export ENV_TEST_VAR="expected"
"""
    self._write_file(os.path.join(case_test_dir, env_file), env_text)

    # write data file
    self._write_file(os.path.join(case_test_dir, data_file), "to be copied")

    # job params: file names are relative to local_directory
    local_result_dir = os.path.join(case_test_dir, "result_com_job-")
    job_params = salome.JobParameters()
    job_params.job_type = "command"
    job_params.job_file = script_file
    job_params.env_file = env_file
    job_params.in_files = [data_file]
    job_params.out_files = ["result.txt", "copie"]
    job_params.local_directory = case_test_dir
    job_params.resource_required = salome.ResourceParameters()
    job_params.resource_required.nb_proc = 1

    # create and launch the job
    launcher = salome.naming_service.Resolve('/SalomeLauncher')
    resManager = salome.lcc.getResourcesManager()

    for resource in self.ressources:
      print("Testing command job on %s" % resource)
      job_params.result_directory = local_result_dir + resource
      job_params.job_name = "CommandJob_" + resource
      job_params.resource_required.name = resource

      # use the working directory of the resource
      resParams = resManager.GetResourceDefinition(resource)
      wd = os.path.join(resParams.working_directory,
                        "CommandJob" + self.suffix)
      job_params.work_directory = wd

      job_id = launcher.createJob(job_params)
      launcher.launchJob(job_id)
      # wait for the end of the job
      jobState = self._wait_for_job(launcher, job_id, 3)

      # verify the results
      self.assertEqual(jobState, "FINISHED")
      launcher.getJobResults(job_id, "")
      self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
                      "expected")
      self.verifyFile(os.path.join(job_params.result_directory,
                                   "copie", 'copie.txt'),
                      "to be copied")

      # verify getJobWorkFile
      mydir = os.path.join(case_test_dir, "work_dir" + resource)
      success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
      self.assertEqual(success, True)
      self.verifyFile(os.path.join(mydir, "result.txt"), "expected")

      success = launcher.getJobWorkFile(job_id, "copie", mydir)
      self.assertEqual(success, True)
      self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
                      "to be copied")

  ##############################
  # test of yacs job type
  ##############################
  def test_yacs(self):
    """Run a yacs_file job, checking the schema dump and the result file.

    Skipped when YACS_ROOT_DIR does not name a directory.
    """
    self._require_yacs()
    import salome

    case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
    mkdir_p(case_test_dir)

    # environment script
    env_file = "myEnv.sh"
    env_text = """export ENV_TEST_VAR="expected"
"""
    self._write_file(os.path.join(case_test_dir, env_file), env_text)

    # job script
    script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
<proc name="newSchema_1">
   <property name="DefaultStudyID" value="1"/>
   <container name="DefaultContainer">
      <property name="container_kind" value="Salome"/>
      <property name="attached_on_cloning" value="0"/>
      <property name="container_name" value="FactoryServer"/>
      <property name="name" value="localhost"/>
   </container>
   <inline name="PyScript0">
      <script><code><![CDATA[import os
text_result = os.getenv("ENV_TEST_VAR","")
f = open('result.txt', 'w')
f.write(text_result)
f.close()
]]></code></script>
      <load container="DefaultContainer"/>
   </inline>
</proc>
"""
    yacs_file = "mySchema.xml"
    job_script_file = os.path.join(case_test_dir, yacs_file)
    self._write_file(job_script_file, script_text)

    local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
    job_params = salome.JobParameters()
    job_params.job_type = "yacs_file"
    job_params.job_file = job_script_file
    job_params.env_file = os.path.join(case_test_dir, env_file)
    job_params.out_files = ["result.txt"]

    # define the interval between two YACS schema dumps (3 seconds)
    import Engines
    job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
    job_params.resource_required = salome.ResourceParameters()
    job_params.resource_required.nb_proc = 1

    launcher = salome.naming_service.Resolve('/SalomeLauncher')
    resManager = salome.lcc.getResourcesManager()

    for resource in self.ressources:
      print("Testing yacs job on %s" % resource)
      job_params.result_directory = local_result_dir + resource
      job_params.job_name = "YacsJob_" + resource
      job_params.resource_required.name = resource

      # use the working directory of the resource
      resParams = resManager.GetResourceDefinition(resource)
      wd = os.path.join(resParams.working_directory,
                        "YacsJob" + self.suffix)
      job_params.work_directory = wd

      job_id = launcher.createJob(job_params)
      launcher.launchJob(job_id)
      jobState = launcher.getJobState(job_id)

      # Polled by hand rather than with _wait_for_job because the dump state
      # is fetched at every iteration while the job is still running.
      yacs_dump_success = False
      print("Job %d state: %s" % (job_id, jobState))
      while jobState not in self._FINAL_JOB_STATES:
        time.sleep(5)
        jobState = launcher.getJobState(job_id)
        yacs_dump_success = launcher.getJobDumpState(job_id,
                                              job_params.result_directory)
        print("Job %d state: %s - dump: %s" % (job_id, jobState,
                                               yacs_dump_success))

      self.assertEqual(jobState, "FINISHED")

      # Verify dumpState file is in the results
      self.assertTrue(yacs_dump_success)
      dump_file_path = os.path.join(job_params.result_directory,
                                    "dumpState_mySchema.xml")
      self.assertTrue(os.path.isfile(dump_file_path))

      # Load the schema state from the dump file and verify the state of a node
      import SALOMERuntime
      SALOMERuntime.RuntimeSALOME_setRuntime(1)
      import loader
      schema = loader.YACSLoader().load(job_script_file)
      stateParser = loader.stateParser()
      sl = loader.stateLoader(stateParser, schema)
      sl.parse(dump_file_path)
      # 106 : "DONE" state code
      self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())

      # getJobResults to default directory (result_directory)
      launcher.getJobResults(job_id, "")
      self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
                      "expected")

  ##############################
  # test of yacs job type using "--init_port" driver option
  ##############################
  def test_yacsopt(self):
    """Run a yacs_file job whose input ports are set by driver options.

    Skipped when YACS_ROOT_DIR does not name a directory.
    """
    self._require_yacs()
    import salome

    case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
    mkdir_p(case_test_dir)

    # job script
    script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
<proc name="myschema">
   <property name="DefaultStudyID" value="1"/>
   <type name="string" kind="string"/>
   <type name="bool" kind="bool"/>
   <type name="double" kind="double"/>
   <type name="int" kind="int"/>
   <container name="DefaultContainer">
      <property name="container_kind" value="Salome"/>
      <property name="attached_on_cloning" value="0"/>
      <property name="container_name" value="FactoryServer"/>
      <property name="name" value="localhost"/>
   </container>
   <inline name="mynode">
      <script><code><![CDATA[
text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
f = open('result.txt', 'w')
f.write(text_result)
f.close()
]]></code></script>
      <load container="DefaultContainer"/>
      <inport name="i" type="int"/>
      <inport name="d" type="double"/>
      <inport name="b" type="bool"/>
      <inport name="s" type="string"/>
   </inline>
</proc>
"""
    yacs_file = "simpleSchema.xml"
    job_script_file = os.path.join(case_test_dir, yacs_file)
    self._write_file(job_script_file, script_text)

    local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
    job_params = salome.JobParameters()
    job_params.job_type = "yacs_file"
    job_params.job_file = job_script_file
    job_params.out_files = ["result.txt"]

    # give the values of the input ports of "mynode" as YACS driver options
    import Engines
    job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
               "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
    expected_result = "i=5,d=3.7,b=False,s=lili"
    job_params.resource_required = salome.ResourceParameters()
    job_params.resource_required.nb_proc = 1

    launcher = salome.naming_service.Resolve('/SalomeLauncher')
    resManager = salome.lcc.getResourcesManager()

    for resource in self.ressources:
      print("Testing yacs job with options on %s" % resource)
      job_params.result_directory = local_result_dir + resource
      job_params.job_name = "YacsJobOpt_" + resource
      job_params.resource_required.name = resource

      # use the working directory of the resource
      resParams = resManager.GetResourceDefinition(resource)
      wd = os.path.join(resParams.working_directory,
                        "YacsJobOpt" + self.suffix)
      job_params.work_directory = wd

      job_id = launcher.createJob(job_params)
      launcher.launchJob(job_id)
      jobState = self._wait_for_job(launcher, job_id, 5)

      self.assertEqual(jobState, "FINISHED")

      # getJobResults to default directory (result_directory)
      launcher.getJobResults(job_id, "")
      self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
                      expected_result)
462
if __name__ == '__main__':
    # Script entry point: import salome at module scope and initialise it
    # before handing over to unittest (the original note here read
    # "create study").
    import salome
    salome.salome_init()
    unittest.main()