2 # -*- coding: utf-8 -*-
9 # Test of SalomeLauncher.
10 # This test should be run in the salome environment, using "salome shell"
11 # and a salome application should be running.
12 # The test will try to launch batch jobs on every available resource which
13 # has the can_launch_batch_jobs parameter set to True.
14 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
15 # define a customised resource catalog.
16 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be skipped.
18 class TestCompo(unittest.TestCase):
21 # Prepare the test directory
23 cls.test_dir = os.path.join(os.getcwd(), "test_dir")
24 cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")
25 shutil.rmtree(cls.test_dir, ignore_errors=True)
26 os.mkdir(cls.test_dir)
29 # mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
30 # ior = salome.orb.object_to_string(mc)
31 # import SALOMERuntime
32 # SALOMERuntime.RuntimeSALOME_setRuntime()
33 # salome_runtime = SALOMERuntime.getSALOMERuntime()
34 # session_catalog = salome_runtime.loadCatalog("session", ior)
35 # salome_runtime.addCatalog(session_catalog)
37 # Get the list of possible resources
38 ressource_param = salome.ResourceParameters()
39 ressource_param.can_launch_batch_jobs = True
40 rm = salome.lcc.getResourcesManager()
41 cls.ressources = rm.GetFittingResources(ressource_param)
43 def verifyFile(self, path, content):
# Test helper: check that a file holds exactly the expected text.
#   path    -- path of the file to check
#   content -- expected text
# `text` is compared with `content` through assertEqual, and an exception
# `ex` is turned into a test failure through self.fail.
# NOTE(review): the statements that open/read `path` into `text` and the
# try/except that binds `ex` are not visible in this listing -- presumably
# `text` is the whole content of `path`; confirm against the full file.
48 self.assertEqual(text, content)
50 self.fail("IO exception:" + str(ex));
52 ##############################
53 # test of python_salome job
54 ##############################
55 def test_salome_py_job(self):
# Submit a "python_salome" job on every resource of self.ressources and
# check the files it produces.
# The job script is created as <test_dir>/salome_py/myScript.py and given
# to the launcher as job_file; "result.txt" and "subdir" are requested as
# output files.
# For each resource the job is created and launched, its state is polled
# until it is "FINISHED" or "FAILED", and "FINISHED" is asserted. The
# results are then fetched twice -- into result_directory (empty string
# argument) and into a custom directory -- and checked with verifyFile.
# NOTE(review): several lines of this method (end of the script text, the
# write/close calls, the body of the polling loop, the expected values of
# the first two verifyFile calls) are not visible in this listing.
56 case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
57 os.mkdir(case_test_dir)
# the case directory becomes the current working directory of the test
60 os.chdir(case_test_dir)
63 script_file = "myScript.py"
64 job_script_file = os.path.join(case_test_dir, script_file)
65 script_text = """#! /usr/bin/env python
66 # -*- coding: utf-8 -*-
68 # verify import salome
72 f = open('result.txt', 'w')
78 f = open(os.path.join("subdir",'autre.txt'), 'w')
82 f = open(job_script_file, "w")
86 local_result_dir = os.path.join(case_test_dir, "result_py_job")
87 job_params = salome.JobParameters()
88 job_params.job_type = "python_salome"
89 job_params.job_file = job_script_file
90 job_params.in_files = []
91 job_params.out_files = ["result.txt", "subdir"]
92 job_params.resource_required = salome.ResourceParameters()
93 job_params.resource_required.nb_proc = 1
95 launcher = salome.naming_service.Resolve('/SalomeLauncher')
# the same job_params object is reused for every resource; only the
# resource-dependent fields are overwritten in the loop
97 for resource in self.ressources:
98 print "Testing python_salome job on ", resource
# one result directory per resource: the resource name is appended to the
# base path
99 job_params.result_directory = local_result_dir + resource
100 job_params.job_name = "PyJob" + resource
101 job_params.resource_required.name = resource
102 # use default working directory for this test
104 job_id = launcher.createJob(job_params)
105 launcher.launchJob(job_id)
# poll the launcher until the job reaches one of the two terminal states
107 jobState = launcher.getJobState(job_id)
108 print "Job %d state: %s" % (job_id,jobState)
109 while jobState != "FINISHED" and jobState != "FAILED" :
111 jobState = launcher.getJobState(job_id)
112 print "Job %d state: %s" % (job_id,jobState)
115 self.assertEqual(jobState, "FINISHED")
117 # getJobResults to default directory (result_directory)
118 launcher.getJobResults(job_id, "")
119 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
121 self.verifyFile(os.path.join(job_params.result_directory,
122 "subdir", "autre.txt"),
125 # getJobResults to a specific directory
126 mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
127 launcher.getJobResults(job_id, mydir)
128 self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
129 self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
134 ##############################
135 # test of command job type
136 ##############################
137 def test_command(self):
# Submit a "command" job on every resource of self.ressources and check
# the files it produces.
# Three files are created in <test_dir>/command: the script myEnvScript.py
# (made executable with mode 0o755), the environment file myEnv.sh whose
# text exports ENV_TEST_VAR="expected", and an input file `data_file`
# containing "to be copied".
# job_file, env_file and in_files are given as bare file names and
# local_directory is set to the case directory -- presumably the launcher
# resolves the names against local_directory; confirm in the launcher docs.
# For each resource the job runs in a "CommandJob_" + suffix sub-directory
# of the resource's working_directory. Once the job is "FINISHED" the
# results are fetched into result_directory, then getJobWorkFile is
# exercised on a single file ("result.txt") and on a directory ("copie").
# NOTE(review): several lines of this method (most of the script text, the
# write/close calls, the definition of data_file, the body of the polling
# loop, some expected values) are not visible in this listing.
138 case_test_dir = os.path.join(TestCompo.test_dir, "command")
139 os.mkdir(case_test_dir)
143 script_file = "myEnvScript.py"
144 script_text = """#! /usr/bin/env python
145 # -*- coding: utf-8 -*-
149 text_result = os.getenv("ENV_TEST_VAR","")
151 f = open('result.txt', 'w')
155 in_f = open("in.txt", "r")
156 in_text = in_f.read()
160 f = open(os.path.join("copie",'copie.txt'), 'w')
164 abs_script_file = os.path.join(case_test_dir, script_file)
165 f = open(abs_script_file, "w")
168 os.chmod(abs_script_file, 0o755)
171 env_file = "myEnv.sh"
172 env_text = """export ENV_TEST_VAR="expected"
174 f = open(os.path.join(case_test_dir, env_file), "w")
179 f = open(os.path.join(case_test_dir, data_file), "w")
180 f.write("to be copied")
184 local_result_dir = os.path.join(case_test_dir, "result_com_job")
185 job_params = salome.JobParameters()
186 job_params.job_type = "command"
187 job_params.job_file = script_file
188 job_params.env_file = env_file
189 job_params.in_files = [data_file]
190 job_params.out_files = ["result.txt", "copie"]
191 job_params.local_directory = case_test_dir
192 job_params.resource_required = salome.ResourceParameters()
193 job_params.resource_required.nb_proc = 1
195 # create and launch the job
196 launcher = salome.naming_service.Resolve('/SalomeLauncher')
197 resManager= salome.lcc.getResourcesManager()
# the same job_params object is reused for every resource; only the
# resource-dependent fields are overwritten in the loop
199 for resource in self.ressources:
200 print "Testing command job on ", resource
201 job_params.result_directory = local_result_dir + resource
202 job_params.job_name = "CommandJob_" + resource
203 job_params.resource_required.name = resource
205 # use the working directory of the resource
206 resParams = resManager.GetResourceDefinition(resource)
# self.suffix is a timestamp built with time.strftime in the class setup
207 wd = os.path.join(resParams.working_directory,
208 "CommandJob_" + self.suffix)
209 job_params.work_directory = wd
211 job_id = launcher.createJob(job_params)
212 launcher.launchJob(job_id)
213 # wait for the end of the job
214 jobState = launcher.getJobState(job_id)
215 print "Job %d state: %s" % (job_id,jobState)
216 while jobState != "FINISHED" and jobState != "FAILED" :
218 jobState = launcher.getJobState(job_id)
219 print "Job %d state: %s" % (job_id,jobState)
223 self.assertEqual(jobState, "FINISHED")
# empty string: the results go to job_params.result_directory, which is
# where the two verifyFile calls below look for them
224 launcher.getJobResults(job_id, "")
225 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
227 self.verifyFile(os.path.join(job_params.result_directory,
228 "copie",'copie.txt'),
231 # verify getJobWorkFile
232 mydir = os.path.join(case_test_dir, "work_dir" + resource)
233 success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
234 self.assertEqual(success, True)
235 self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
# same call with a directory name instead of a file name
237 success = launcher.getJobWorkFile(job_id, "copie", mydir)
238 self.assertEqual(success, True)
239 self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
243 ##############################
244 # test of yacs job type
245 ##############################
247 yacs_path = os.getenv("YACS_ROOT_DIR", "")
248 if not os.path.isdir(yacs_path):
249 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
251 case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
252 os.mkdir(case_test_dir)
255 env_file = "myEnv.sh"
256 env_text = """export ENV_TEST_VAR="expected"
258 f = open(os.path.join(case_test_dir, env_file), "w")
263 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
264 <proc name="newSchema_1">
265 <container name="DefaultContainer">
266 <property name="container_kind" value="Salome"/>
267 <property name="attached_on_cloning" value="0"/>
268 <property name="container_name" value="FactoryServer"/>
269 <property name="name" value="localhost"/>
271 <inline name="PyScript0">
272 <script><code><![CDATA[import os
273 text_result = os.getenv("ENV_TEST_VAR","")
274 f = open('result.txt', 'w')
278 <load container="DefaultContainer"/>
282 yacs_file = "mySchema.xml"
283 job_script_file = os.path.join(case_test_dir, yacs_file)
284 f = open(job_script_file, "w")
288 local_result_dir = os.path.join(case_test_dir, "result_yacs_job")
289 job_params = salome.JobParameters()
290 job_params.job_type = "yacs_file"
291 job_params.job_file = job_script_file
292 job_params.env_file = os.path.join(case_test_dir,env_file)
293 job_params.out_files = ["result.txt"]
295 # define the interval between two YACS schema dumps (3 seconds)
297 job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
298 job_params.resource_required = salome.ResourceParameters()
299 job_params.resource_required.nb_proc = 1
301 launcher = salome.naming_service.Resolve('/SalomeLauncher')
302 resManager= salome.lcc.getResourcesManager()
304 for resource in self.ressources:
305 print "Testing yacs job on ", resource
306 job_params.result_directory = local_result_dir + resource
307 job_params.job_name = "YacsJob_" + resource
308 job_params.resource_required.name = resource
310 # use the working directory of the resource
311 resParams = resManager.GetResourceDefinition(resource)
312 wd = os.path.join(resParams.working_directory,
313 "YacsJob_" + self.suffix)
314 job_params.work_directory = wd
316 job_id = launcher.createJob(job_params)
317 launcher.launchJob(job_id)
318 jobState = launcher.getJobState(job_id)
320 yacs_dump_success = False
321 print "Job %d state: %s" % (job_id,jobState)
322 while jobState != "FINISHED" and jobState != "FAILED" :
324 jobState = launcher.getJobState(job_id)
325 # yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
326 yacs_dump_success = launcher.getJobDumpState(job_id,
327 job_params.result_directory)
328 print "Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success)
331 self.assertEqual(jobState, "FINISHED")
333 # Verify dumpState file is in the results
334 self.assertTrue(yacs_dump_success)
335 dump_file_path = os.path.join(job_params.result_directory,
336 "dumpState_mySchema.xml")
337 self.assertTrue(os.path.isfile(dump_file_path))
339 # Load the schema state from the dump file and verify the state of a node
341 SALOMERuntime.RuntimeSALOME_setRuntime(1)
343 schema = loader.YACSLoader().load(job_script_file)
344 stateParser = loader.stateParser()
345 sl = loader.stateLoader(stateParser, schema)
346 sl.parse(dump_file_path)
347 # 106 : "DONE" state code
348 self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
350 # getJobResults to default directory (result_directory)
351 launcher.getJobResults(job_id, "")
352 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
355 ##############################
356 # test of yacs job type using "--init_port" driver option
357 ##############################
358 def test_yacsopt(self):
# Submit a "yacs_file" job whose schema inputs are set through the
# "YACSDriverOptions" specific parameter, on every resource of
# self.ressources.
# The test is skipped when YACS_ROOT_DIR does not name an existing
# directory.
# The schema text (stored as <test_dir>/yacs_opt/simpleSchema.xml) declares
# an inline node "mynode" with input ports i, d, b and s; its script
# formats them as "i=%s,d=%s,b=%s,s=%s" and opens result.txt for writing.
# For each resource the job runs in a "YacsJobOpt_" + suffix sub-directory
# of the resource's working_directory. The final state must be "FINISHED";
# result.txt is then fetched into result_directory and checked with
# verifyFile.
# NOTE(review): several lines of this method (parts of the schema text, the
# write/close calls, the body of the polling loop, the second argument of
# the final verifyFile call -- presumably expected_result) are not visible
# in this listing.
359 yacs_path = os.getenv("YACS_ROOT_DIR", "")
360 if not os.path.isdir(yacs_path):
361 self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
363 case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
364 os.mkdir(case_test_dir)
367 script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
368 <proc name="myschema">
369 <type name="string" kind="string"/>
370 <type name="bool" kind="bool"/>
371 <type name="double" kind="double"/>
372 <type name="int" kind="int"/>
373 <container name="DefaultContainer">
374 <property name="container_kind" value="Salome"/>
375 <property name="attached_on_cloning" value="0"/>
376 <property name="container_name" value="FactoryServer"/>
377 <property name="name" value="localhost"/>
379 <inline name="mynode">
380 <script><code><![CDATA[
381 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
382 f = open('result.txt', 'w')
386 <load container="DefaultContainer"/>
387 <inport name="i" type="int"/>
388 <inport name="d" type="double"/>
389 <inport name="b" type="bool"/>
390 <inport name="s" type="string"/>
394 yacs_file = "simpleSchema.xml"
395 job_script_file = os.path.join(case_test_dir, yacs_file)
396 f = open(job_script_file, "w")
400 local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job")
401 job_params = salome.JobParameters()
402 job_params.job_type = "yacs_file"
403 job_params.job_file = job_script_file
404 #job_params.env_file = os.path.join(case_test_dir,env_file)
405 job_params.out_files = ["result.txt"]
407 # define the interval between two YACS schema dumps (3 seconds)
# NOTE(review): the comment above does not match the code that follows. No
# dump interval is set here; the only specific parameter is
# "YACSDriverOptions", whose value assigns mynode.i, mynode.d, mynode.b and
# mynode.s. Presumably "-i<port>=<value>" initialises an input port of the
# schema -- confirm against the YACS driver documentation.
409 job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
410 "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
# same four values, laid out with the format string used by the node script
411 expected_result="i=5,d=3.7,b=False,s=lili"
412 job_params.resource_required = salome.ResourceParameters()
413 job_params.resource_required.nb_proc = 1
415 launcher = salome.naming_service.Resolve('/SalomeLauncher')
416 resManager= salome.lcc.getResourcesManager()
# the same job_params object is reused for every resource; only the
# resource-dependent fields are overwritten in the loop
418 for resource in self.ressources:
419 print "Testing yacs job with options on ", resource
420 job_params.result_directory = local_result_dir + resource
421 job_params.job_name = "YacsJobOpt_" + resource
422 job_params.resource_required.name = resource
424 # use the working directory of the resource
425 resParams = resManager.GetResourceDefinition(resource)
426 wd = os.path.join(resParams.working_directory,
427 "YacsJobOpt_" + self.suffix)
428 job_params.work_directory = wd
430 job_id = launcher.createJob(job_params)
431 launcher.launchJob(job_id)
432 jobState = launcher.getJobState(job_id)
# NOTE(review): yacs_dump_success is assigned but never read in the visible
# part of this test.
434 yacs_dump_success = False
435 print "Job %d state: %s" % (job_id,jobState)
# poll the launcher until the job reaches one of the two terminal states
436 while jobState != "FINISHED" and jobState != "FAILED" :
438 jobState = launcher.getJobState(job_id)
439 print "Job %d state: %s " % (job_id,jobState)
442 self.assertEqual(jobState, "FINISHED")
444 # getJobResults to default directory (result_directory)
445 launcher.getJobResults(job_id, "")
446 self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
449 if __name__ == '__main__':