]> SALOME platform Git repositories - modules/kernel.git/blob - src/Launcher/Test/test_launcher.py
Salome HOME
0023299: [CEA] Finalize multi-study removal
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import unittest
5 import os
6 import sys
7 import time
8
9 # Test of SalomeLauncher.
10 # This test should be run in the salome environment, using "salome shell"
11 # and a salome application should be running.
12 # The test will try to launch batch jobs on every available ressources which
13 # have the can_launch_batch_jobs parameter set to True.
14 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
15 # define a customised ressource catalog.
16 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
17 # skiped.
18 class TestCompo(unittest.TestCase):
19   @classmethod
20   def setUpClass(cls):
21     # Prepare the test directory
22     import shutil
23     cls.test_dir = os.path.join(os.getcwd(), "test_dir")
24     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")
25     shutil.rmtree(cls.test_dir, ignore_errors=True)
26     os.mkdir(cls.test_dir)
27     
28     # load catalogs
29 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
30 #    ior = salome.orb.object_to_string(mc)
31 #    import SALOMERuntime
32 #    SALOMERuntime.RuntimeSALOME_setRuntime()
33 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
34 #    session_catalog = salome_runtime.loadCatalog("session", ior)
35 #    salome_runtime.addCatalog(session_catalog)
36
37     # Get the list of possible ressources
38     ressource_param = salome.ResourceParameters()
39     ressource_param.can_launch_batch_jobs = True
40     rm = salome.lcc.getResourcesManager()
41     cls.ressources = rm.GetFittingResources(ressource_param)
42
43   def verifyFile(self, path, content):
44     try:
45       f = open(path, 'r')
46       text = f.read()
47       f.close()
48       self.assertEqual(text, content)
49     except IOError,ex:
50       self.fail("IO exception:" + str(ex));
51
52   ##############################
53   # test of python_salome job
54   ##############################
55   def test_salome_py_job(self):
56     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
57     os.mkdir(case_test_dir)
58     
59     old_dir = os.getcwd()
60     os.chdir(case_test_dir)
61     
62     # job script
63     script_file = "myScript.py"
64     job_script_file = os.path.join(case_test_dir, script_file)
65     script_text = """#! /usr/bin/env python
66 # -*- coding: utf-8 -*-
67
68 # verify import salome
69 import salome
70 salome.salome_init()
71
72 f = open('result.txt', 'w')
73 f.write("Salut!")
74 f.close()
75
76 import os
77 os.mkdir("subdir")
78 f = open(os.path.join("subdir",'autre.txt'), 'w')
79 f.write("Hello!")
80 f.close()
81 """
82     f = open(job_script_file, "w")
83     f.write(script_text)
84     f.close()
85     
86     local_result_dir = os.path.join(case_test_dir, "result_py_job")
87     job_params = salome.JobParameters()
88     job_params.job_type = "python_salome"
89     job_params.job_file = job_script_file
90     job_params.in_files = []
91     job_params.out_files = ["result.txt", "subdir"]
92     job_params.resource_required = salome.ResourceParameters()
93     job_params.resource_required.nb_proc = 1
94     
95     launcher = salome.naming_service.Resolve('/SalomeLauncher')
96     
97     for resource in self.ressources:
98       print "Testing python_salome job on ", resource
99       job_params.result_directory = local_result_dir + resource
100       job_params.job_name = "PyJob" + resource
101       job_params.resource_required.name = resource
102       # use default working directory for this test
103
104       job_id = launcher.createJob(job_params)
105       launcher.launchJob(job_id)
106
107       jobState = launcher.getJobState(job_id)
108       print "Job %d state: %s" % (job_id,jobState)
109       while jobState != "FINISHED" and jobState != "FAILED" :
110         time.sleep(5)
111         jobState = launcher.getJobState(job_id)
112         print "Job %d state: %s" % (job_id,jobState)
113         pass
114
115       self.assertEqual(jobState, "FINISHED")
116
117       # getJobResults to default directory (result_directory)
118       launcher.getJobResults(job_id, "")
119       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
120                       "Salut!")
121       self.verifyFile(os.path.join(job_params.result_directory,
122                                    "subdir", "autre.txt"),
123                       "Hello!")
124
125       # getJobResults to a specific directory
126       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
127       launcher.getJobResults(job_id, mydir)
128       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
129       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
130       pass #for
131
132     os.chdir(old_dir)
133     
134   ##############################
135   # test of command job type
136   ##############################
137   def test_command(self):
138     case_test_dir = os.path.join(TestCompo.test_dir, "command")
139     os.mkdir(case_test_dir)
140     
141     # job script
142     data_file = "in.txt"
143     script_file = "myEnvScript.py"
144     script_text = """#! /usr/bin/env python
145 # -*- coding: utf-8 -*-
146
147 import os,sys
148
149 text_result = os.getenv("ENV_TEST_VAR","")
150
151 f = open('result.txt', 'w')
152 f.write(text_result)
153 f.close()
154
155 in_f = open("in.txt", "r")
156 in_text = in_f.read()
157 in_f.close()
158
159 os.mkdir("copie")
160 f = open(os.path.join("copie",'copie.txt'), 'w')
161 f.write(in_text)
162 f.close()
163 """
164     abs_script_file = os.path.join(case_test_dir, script_file)
165     f = open(abs_script_file, "w")
166     f.write(script_text)
167     f.close()
168     os.chmod(abs_script_file, 0o755)
169     
170     #environement script
171     env_file = "myEnv.sh"
172     env_text = """export ENV_TEST_VAR="expected"
173 """
174     f = open(os.path.join(case_test_dir, env_file), "w")
175     f.write(env_text)
176     f.close()
177     
178     # write data file
179     f = open(os.path.join(case_test_dir, data_file), "w")
180     f.write("to be copied")
181     f.close()
182     
183     # job params
184     local_result_dir = os.path.join(case_test_dir, "result_com_job")
185     job_params = salome.JobParameters()
186     job_params.job_type = "command"
187     job_params.job_file = script_file
188     job_params.env_file = env_file
189     job_params.in_files = [data_file]
190     job_params.out_files = ["result.txt", "copie"]
191     job_params.local_directory = case_test_dir
192     job_params.resource_required = salome.ResourceParameters()
193     job_params.resource_required.nb_proc = 1
194     
195     # create and launch the job
196     launcher = salome.naming_service.Resolve('/SalomeLauncher')
197     resManager= salome.lcc.getResourcesManager()
198
199     for resource in self.ressources:
200       print "Testing command job on ", resource
201       job_params.result_directory = local_result_dir + resource
202       job_params.job_name = "CommandJob_" + resource
203       job_params.resource_required.name = resource
204
205       # use the working directory of the resource
206       resParams = resManager.GetResourceDefinition(resource)
207       wd = os.path.join(resParams.working_directory,
208                         "CommandJob_" + self.suffix)
209       job_params.work_directory = wd
210
211       job_id = launcher.createJob(job_params)
212       launcher.launchJob(job_id)
213       # wait for the end of the job
214       jobState = launcher.getJobState(job_id)
215       print "Job %d state: %s" % (job_id,jobState)
216       while jobState != "FINISHED" and jobState != "FAILED" :
217         time.sleep(3)
218         jobState = launcher.getJobState(job_id)
219         print "Job %d state: %s" % (job_id,jobState)
220         pass
221
222       # verify the results
223       self.assertEqual(jobState, "FINISHED")
224       launcher.getJobResults(job_id, "")
225       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
226                       "expected")
227       self.verifyFile(os.path.join(job_params.result_directory,
228                                    "copie",'copie.txt'),
229                       "to be copied")
230
231       # verify getJobWorkFile
232       mydir = os.path.join(case_test_dir, "work_dir" + resource)
233       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
234       self.assertEqual(success, True)
235       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
236
237       success = launcher.getJobWorkFile(job_id, "copie", mydir)
238       self.assertEqual(success, True)
239       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
240                       "to be copied")
241
242
243   ##############################
244   # test of yacs job type
245   ##############################
246   def test_yacs(self):
247     yacs_path = os.getenv("YACS_ROOT_DIR", "")
248     if not os.path.isdir(yacs_path):
249       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
250     
251     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
252     os.mkdir(case_test_dir)
253     
254     #environement script
255     env_file = "myEnv.sh"
256     env_text = """export ENV_TEST_VAR="expected"
257 """
258     f = open(os.path.join(case_test_dir, env_file), "w")
259     f.write(env_text)
260     f.close()
261     
262     # job script
263     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
264 <proc name="newSchema_1">
265    <container name="DefaultContainer">
266       <property name="container_kind" value="Salome"/>
267       <property name="attached_on_cloning" value="0"/>
268       <property name="container_name" value="FactoryServer"/>
269       <property name="name" value="localhost"/>
270    </container>
271    <inline name="PyScript0">
272       <script><code><![CDATA[import os
273 text_result = os.getenv("ENV_TEST_VAR","")
274 f = open('result.txt', 'w')
275 f.write(text_result)
276 f.close()
277 ]]></code></script>
278       <load container="DefaultContainer"/>
279    </inline>
280 </proc>
281 """
282     yacs_file = "mySchema.xml"
283     job_script_file = os.path.join(case_test_dir, yacs_file)
284     f = open(job_script_file, "w")
285     f.write(script_text)
286     f.close()
287     
288     local_result_dir = os.path.join(case_test_dir, "result_yacs_job")
289     job_params = salome.JobParameters()
290     job_params.job_type = "yacs_file"
291     job_params.job_file = job_script_file
292     job_params.env_file = os.path.join(case_test_dir,env_file)
293     job_params.out_files = ["result.txt"]
294     
295     # define the interval between two YACS schema dumps (3 seconds)
296     import Engines
297     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
298     job_params.resource_required = salome.ResourceParameters()
299     job_params.resource_required.nb_proc = 1
300
301     launcher = salome.naming_service.Resolve('/SalomeLauncher')
302     resManager= salome.lcc.getResourcesManager()
303     
304     for resource in self.ressources:
305       print "Testing yacs job on ", resource
306       job_params.result_directory = local_result_dir + resource
307       job_params.job_name = "YacsJob_" + resource
308       job_params.resource_required.name = resource
309
310       # use the working directory of the resource
311       resParams = resManager.GetResourceDefinition(resource)
312       wd = os.path.join(resParams.working_directory,
313                         "YacsJob_" + self.suffix)
314       job_params.work_directory = wd
315
316       job_id = launcher.createJob(job_params)
317       launcher.launchJob(job_id)
318       jobState = launcher.getJobState(job_id)
319
320       yacs_dump_success = False
321       print "Job %d state: %s" % (job_id,jobState)
322       while jobState != "FINISHED" and jobState != "FAILED" :
323         time.sleep(5)
324         jobState = launcher.getJobState(job_id)
325 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
326         yacs_dump_success = launcher.getJobDumpState(job_id,
327                                               job_params.result_directory)
328         print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
329         pass
330
331       self.assertEqual(jobState, "FINISHED")
332
333       # Verify dumpState file is in the results
334       self.assertTrue(yacs_dump_success)
335       dump_file_path = os.path.join(job_params.result_directory,
336                                     "dumpState_mySchema.xml")
337       self.assertTrue(os.path.isfile(dump_file_path))
338
339       # Load the schema state from the dump file and verify the state of a node
340       import SALOMERuntime
341       SALOMERuntime.RuntimeSALOME_setRuntime(1)
342       import loader
343       schema = loader.YACSLoader().load(job_script_file)
344       stateParser = loader.stateParser()
345       sl = loader.stateLoader(stateParser, schema)
346       sl.parse(dump_file_path)
347       # 106 : "DONE" state code
348       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
349
350       # getJobResults to default directory (result_directory)
351       launcher.getJobResults(job_id, "")
352       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
353                       "expected")
354     
355   ##############################
356   # test of yacs job type using "--init_port" driver option
357   ##############################
358   def test_yacsopt(self):
359     yacs_path = os.getenv("YACS_ROOT_DIR", "")
360     if not os.path.isdir(yacs_path):
361       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
362     
363     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
364     os.mkdir(case_test_dir)
365     
366     # job script
367     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
368 <proc name="myschema">
369    <type name="string" kind="string"/>
370    <type name="bool" kind="bool"/>
371    <type name="double" kind="double"/>
372    <type name="int" kind="int"/>
373    <container name="DefaultContainer">
374       <property name="container_kind" value="Salome"/>
375       <property name="attached_on_cloning" value="0"/>
376       <property name="container_name" value="FactoryServer"/>
377       <property name="name" value="localhost"/>
378    </container>
379    <inline name="mynode">
380       <script><code><![CDATA[
381 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
382 f = open('result.txt', 'w')
383 f.write(text_result)
384 f.close()
385 ]]></code></script>
386       <load container="DefaultContainer"/>
387       <inport name="i" type="int"/>
388       <inport name="d" type="double"/>
389       <inport name="b" type="bool"/>
390       <inport name="s" type="string"/>
391    </inline>
392 </proc>
393 """
394     yacs_file = "simpleSchema.xml"
395     job_script_file = os.path.join(case_test_dir, yacs_file)
396     f = open(job_script_file, "w")
397     f.write(script_text)
398     f.close()
399     
400     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job")
401     job_params = salome.JobParameters()
402     job_params.job_type = "yacs_file"
403     job_params.job_file = job_script_file
404     #job_params.env_file = os.path.join(case_test_dir,env_file)
405     job_params.out_files = ["result.txt"]
406     
407     # define the interval between two YACS schema dumps (3 seconds)
408     import Engines
409     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
410                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
411     expected_result="i=5,d=3.7,b=False,s=lili"
412     job_params.resource_required = salome.ResourceParameters()
413     job_params.resource_required.nb_proc = 1
414
415     launcher = salome.naming_service.Resolve('/SalomeLauncher')
416     resManager= salome.lcc.getResourcesManager()
417     
418     for resource in self.ressources:
419       print "Testing yacs job with options on ", resource
420       job_params.result_directory = local_result_dir + resource
421       job_params.job_name = "YacsJobOpt_" + resource
422       job_params.resource_required.name = resource
423
424       # use the working directory of the resource
425       resParams = resManager.GetResourceDefinition(resource)
426       wd = os.path.join(resParams.working_directory,
427                         "YacsJobOpt_" + self.suffix)
428       job_params.work_directory = wd
429
430       job_id = launcher.createJob(job_params)
431       launcher.launchJob(job_id)
432       jobState = launcher.getJobState(job_id)
433
434       yacs_dump_success = False
435       print "Job %d state: %s" % (job_id,jobState)
436       while jobState != "FINISHED" and jobState != "FAILED" :
437         time.sleep(5)
438         jobState = launcher.getJobState(job_id)
439         print "Job %d state: %s " % (job_id,jobState)
440         pass
441
442       self.assertEqual(jobState, "FINISHED")
443
444       # getJobResults to default directory (result_directory)
445       launcher.getJobResults(job_id, "")
446       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
447                       expected_result)
448
449 if __name__ == '__main__':
450     # creat study
451     import salome
452     salome.salome_init()
453     unittest.main()