Salome HOME
86d5d61aa20731fb7f2027d3560d0f7b1e4f9a49
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import unittest
5 import os
6 import sys
7 import time
8
# Test of SalomeLauncher.
# This test should be run in the salome environment, using "salome shell",
# and a salome application should be running.
# The test will try to launch batch jobs on every available resource which
# has the can_launch_batch_jobs parameter set to True.
# You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
# define a customised resource catalog.
# If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
# skipped.
18 class TestCompo(unittest.TestCase):
19   @classmethod
20   def setUpClass(cls):
21     # Prepare the test directory
22     import shutil
23     cls.test_dir = os.path.join(os.getcwd(), "test_dir")
24     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")
25     shutil.rmtree(cls.test_dir, ignore_errors=True)
26     os.mkdir(cls.test_dir)
27     
28     # load catalogs
29 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
30 #    ior = salome.orb.object_to_string(mc)
31 #    import SALOMERuntime
32 #    SALOMERuntime.RuntimeSALOME_setRuntime()
33 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
34 #    session_catalog = salome_runtime.loadCatalog("session", ior)
35 #    salome_runtime.addCatalog(session_catalog)
36
37     # Get the list of possible ressources
38     ressource_param = salome.ResourceParameters()
39     ressource_param.can_launch_batch_jobs = True
40     rm = salome.lcc.getResourcesManager()
41     cls.ressources = rm.GetFittingResources(ressource_param)
42
43   def verifyFile(self, path, content):
44     try:
45       f = open(path, 'r')
46       text = f.read()
47       f.close()
48       self.assertEqual(text, content)
49     except IOError,ex:
50       self.fail("IO exception:" + str(ex));
51
52   ##############################
53   # test of python_salome job
54   ##############################
55   def test_salome_py_job(self):
56     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
57     os.mkdir(case_test_dir)
58     
59     old_dir = os.getcwd()
60     os.chdir(case_test_dir)
61     
62     # job script
63     script_file = "myScript.py"
64     job_script_file = os.path.join(case_test_dir, script_file)
65     script_text = """#! /usr/bin/env python
66 # -*- coding: utf-8 -*-
67
68 # verify import salome
69 import salome
70 salome.salome_init()
71
72 f = open('result.txt', 'w')
73 f.write("Salut!")
74 f.close()
75
76 import os
77 os.mkdir("subdir")
78 f = open(os.path.join("subdir",'autre.txt'), 'w')
79 f.write("Hello!")
80 f.close()
81 """
82     f = open(job_script_file, "w")
83     f.write(script_text)
84     f.close()
85     
86     local_result_dir = os.path.join(case_test_dir, "result_py_job")
87     job_params = salome.JobParameters()
88     job_params.job_type = "python_salome"
89     job_params.job_file = job_script_file
90     job_params.in_files = []
91     job_params.out_files = ["result.txt", "subdir"]
92     job_params.resource_required = salome.ResourceParameters()
93     job_params.resource_required.nb_proc = 1
94     
95     launcher = salome.naming_service.Resolve('/SalomeLauncher')
96     
97     for resource in self.ressources:
98       print "Testing python_salome job on ", resource
99       job_params.result_directory = local_result_dir + resource
100       job_params.job_name = "PyJob" + resource
101       job_params.resource_required.name = resource
102       # use default working directory for this test
103
104       job_id = launcher.createJob(job_params)
105       launcher.launchJob(job_id)
106
107       jobState = launcher.getJobState(job_id)
108       print "Job %d state: %s" % (job_id,jobState)
109       while jobState != "FINISHED" and jobState != "FAILED" :
110         time.sleep(5)
111         jobState = launcher.getJobState(job_id)
112         print "Job %d state: %s" % (job_id,jobState)
113         pass
114
115       self.assertEqual(jobState, "FINISHED")
116
117       # getJobResults to default directory (result_directory)
118       launcher.getJobResults(job_id, "")
119       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
120                       "Salut!")
121       self.verifyFile(os.path.join(job_params.result_directory,
122                                    "subdir", "autre.txt"),
123                       "Hello!")
124
125       # getJobResults to a specific directory
126       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
127       launcher.getJobResults(job_id, mydir)
128       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
129       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
130       pass #for
131
132     os.chdir(old_dir)
133     
134   ##############################
135   # test of command job type
136   ##############################
137   def test_command(self):
138     case_test_dir = os.path.join(TestCompo.test_dir, "command")
139     os.mkdir(case_test_dir)
140     
141     # job script
142     data_file = "in.txt"
143     script_file = "myEnvScript.py"
144     script_text = """#! /usr/bin/env python
145 # -*- coding: utf-8 -*-
146
147 import os,sys
148
149 text_result = os.getenv("ENV_TEST_VAR","")
150
151 f = open('result.txt', 'w')
152 f.write(text_result)
153 f.close()
154
155 in_f = open("in.txt", "r")
156 in_text = in_f.read()
157 in_f.close()
158
159 os.mkdir("copie")
160 f = open(os.path.join("copie",'copie.txt'), 'w')
161 f.write(in_text)
162 f.close()
163 """
164     abs_script_file = os.path.join(case_test_dir, script_file)
165     f = open(abs_script_file, "w")
166     f.write(script_text)
167     f.close()
168     os.chmod(abs_script_file, 0o755)
169     
170     #environement script
171     env_file = "myEnv.sh"
172     env_text = """export ENV_TEST_VAR="expected"
173 """
174     f = open(os.path.join(case_test_dir, env_file), "w")
175     f.write(env_text)
176     f.close()
177     
178     # write data file
179     f = open(os.path.join(case_test_dir, data_file), "w")
180     f.write("to be copied")
181     f.close()
182     
183     # job params
184     local_result_dir = os.path.join(case_test_dir, "result_com_job")
185     job_params = salome.JobParameters()
186     job_params.job_type = "command"
187     job_params.job_file = script_file
188     job_params.env_file = env_file
189     job_params.in_files = [data_file]
190     job_params.out_files = ["result.txt", "copie"]
191     job_params.local_directory = case_test_dir
192     job_params.resource_required = salome.ResourceParameters()
193     job_params.resource_required.nb_proc = 1
194     
195     # create and launch the job
196     launcher = salome.naming_service.Resolve('/SalomeLauncher')
197     resManager= salome.lcc.getResourcesManager()
198
199     for resource in self.ressources:
200       print "Testing command job on ", resource
201       job_params.result_directory = local_result_dir + resource
202       job_params.job_name = "CommandJob_" + resource
203       job_params.resource_required.name = resource
204
205       # use the working directory of the resource
206       resParams = resManager.GetResourceDefinition(resource)
207       wd = os.path.join(resParams.working_directory,
208                         "CommandJob_" + self.suffix)
209       job_params.work_directory = wd
210
211       job_id = launcher.createJob(job_params)
212       launcher.launchJob(job_id)
213       # wait for the end of the job
214       jobState = launcher.getJobState(job_id)
215       print "Job %d state: %s" % (job_id,jobState)
216       while jobState != "FINISHED" and jobState != "FAILED" :
217         time.sleep(3)
218         jobState = launcher.getJobState(job_id)
219         print "Job %d state: %s" % (job_id,jobState)
220         pass
221
222       # verify the results
223       self.assertEqual(jobState, "FINISHED")
224       launcher.getJobResults(job_id, "")
225       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
226                       "expected")
227       self.verifyFile(os.path.join(job_params.result_directory,
228                                    "copie",'copie.txt'),
229                       "to be copied")
230
231       # verify getJobWorkFile
232       mydir = os.path.join(case_test_dir, "work_dir" + resource)
233       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
234       self.assertEqual(success, True)
235       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
236
237       success = launcher.getJobWorkFile(job_id, "copie", mydir)
238       self.assertEqual(success, True)
239       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
240                       "to be copied")
241
242
243   ##############################
244   # test of yacs job type
245   ##############################
246   def test_yacs(self):
247     yacs_path = os.getenv("YACS_ROOT_DIR", "")
248     if not os.path.isdir(yacs_path):
249       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
250     
251     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
252     os.mkdir(case_test_dir)
253     
254     #environement script
255     env_file = "myEnv.sh"
256     env_text = """export ENV_TEST_VAR="expected"
257 """
258     f = open(os.path.join(case_test_dir, env_file), "w")
259     f.write(env_text)
260     f.close()
261     
262     # job script
263     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
264 <proc name="newSchema_1">
265    <property name="DefaultStudyID" value="1"/>
266    <container name="DefaultContainer">
267       <property name="container_kind" value="Salome"/>
268       <property name="attached_on_cloning" value="0"/>
269       <property name="container_name" value="FactoryServer"/>
270       <property name="name" value="localhost"/>
271    </container>
272    <inline name="PyScript0">
273       <script><code><![CDATA[import os
274 text_result = os.getenv("ENV_TEST_VAR","")
275 f = open('result.txt', 'w')
276 f.write(text_result)
277 f.close()
278 ]]></code></script>
279       <load container="DefaultContainer"/>
280    </inline>
281 </proc>
282 """
283     yacs_file = "mySchema.xml"
284     job_script_file = os.path.join(case_test_dir, yacs_file)
285     f = open(job_script_file, "w")
286     f.write(script_text)
287     f.close()
288     
289     local_result_dir = os.path.join(case_test_dir, "result_yacs_job")
290     job_params = salome.JobParameters()
291     job_params.job_type = "yacs_file"
292     job_params.job_file = job_script_file
293     job_params.env_file = os.path.join(case_test_dir,env_file)
294     job_params.out_files = ["result.txt"]
295     
296     # define the interval between two YACS schema dumps (3 seconds)
297     import Engines
298     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
299     job_params.resource_required = salome.ResourceParameters()
300     job_params.resource_required.nb_proc = 1
301
302     launcher = salome.naming_service.Resolve('/SalomeLauncher')
303     resManager= salome.lcc.getResourcesManager()
304     
305     for resource in self.ressources:
306       print "Testing yacs job on ", resource
307       job_params.result_directory = local_result_dir + resource
308       job_params.job_name = "YacsJob_" + resource
309       job_params.resource_required.name = resource
310
311       # use the working directory of the resource
312       resParams = resManager.GetResourceDefinition(resource)
313       wd = os.path.join(resParams.working_directory,
314                         "YacsJob_" + self.suffix)
315       job_params.work_directory = wd
316
317       job_id = launcher.createJob(job_params)
318       launcher.launchJob(job_id)
319       jobState = launcher.getJobState(job_id)
320
321       yacs_dump_success = False
322       print "Job %d state: %s" % (job_id,jobState)
323       while jobState != "FINISHED" and jobState != "FAILED" :
324         time.sleep(5)
325         jobState = launcher.getJobState(job_id)
326 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
327         yacs_dump_success = launcher.getJobDumpState(job_id,
328                                               job_params.result_directory)
329         print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
330         pass
331
332       self.assertEqual(jobState, "FINISHED")
333
334       # Verify dumpState file is in the results
335       self.assertTrue(yacs_dump_success)
336       dump_file_path = os.path.join(job_params.result_directory,
337                                     "dumpState_mySchema.xml")
338       self.assertTrue(os.path.isfile(dump_file_path))
339
340       # Load the schema state from the dump file and verify the state of a node
341       import SALOMERuntime
342       SALOMERuntime.RuntimeSALOME_setRuntime(1)
343       import loader
344       schema = loader.YACSLoader().load(job_script_file)
345       stateParser = loader.stateParser()
346       sl = loader.stateLoader(stateParser, schema)
347       sl.parse(dump_file_path)
348       # 106 : "DONE" state code
349       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
350
351       # getJobResults to default directory (result_directory)
352       launcher.getJobResults(job_id, "")
353       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
354                       "expected")
355     
if __name__ == '__main__':
    # Script entry point: create the study, then run the tests.
    # This module-level import is what binds the global name `salome` used by
    # the TestCompo methods; no other import of it is visible in this file.
    import salome
    # Called once, before any test runs.
    salome.salome_init()
    unittest.main()