Salome HOME
updated copyright message
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2023  CEA, EDF
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21
22 import unittest
23 import os
24 import sys
25 import time
26 import tempfile
27 import errno
28
29 def mkdir_p(path):
30   try:
31     os.makedirs(path)
32   except OSError as exc:  # Python >2.5
33     if exc.errno == errno.EEXIST and os.path.isdir(path):
34       pass
35     else:
36       raise
37
38 # Test of SalomeLauncher.
39 # This test should be run in the salome environment, using "salome shell"
40 # and a salome application should be running.
41 # The test will try to launch batch jobs on every available resources which
42 # have the can_launch_batch_jobs parameter set to True.
43 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
44 # define a customised resource catalog.
45 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
46 # skipped.
47 class TestCompo(unittest.TestCase):
48   @classmethod
49   def setUpClass(cls):
50     # Prepare the test directory
51     temp = tempfile.NamedTemporaryFile()
52     cls.test_dir = os.path.join(temp.name, "test_dir")
53     name = os.path.basename(temp.name)
54     temp.close()
55     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
56     mkdir_p(cls.test_dir)
57
58     # load catalogs
59 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
60 #    ior = salome.orb.object_to_string(mc)
61 #    import SALOMERuntime
62 #    SALOMERuntime.RuntimeSALOME_setRuntime()
63 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
64 #    session_catalog = salome_runtime.loadCatalog("session", ior)
65 #    salome_runtime.addCatalog(session_catalog)
66
67     # Get the list of possible ressources
68     ressource_param = salome.ResourceParameters()
69     ressource_param.can_launch_batch_jobs = True
70     rm = salome.lcc.getResourcesManager()
71     cls.ressources = rm.GetFittingResources(ressource_param)
72
73   def verifyFile(self, path, content):
74     try:
75       f = open(path, 'r')
76       text = f.read()
77       f.close()
78       self.assertEqual(text, content)
79     except IOError as ex:
80       self.fail("IO exception:" + str(ex));
81
82   def create_JobParameters(self):
83     job_params = salome.JobParameters()
84     job_params.wckey="P11N0:SALOME" #needed by edf clusters
85     job_params.resource_required = salome.ResourceParameters()
86     job_params.resource_required.nb_proc = 1
87     return job_params
88
89   ##############################
90   # test of python_salome job
91   ##############################
92   def test_salome_py_job(self):
93     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
94     mkdir_p(case_test_dir)
95
96     old_dir = os.getcwd()
97     os.chdir(case_test_dir)
98
99     # job script
100     script_file = "myScript.py"
101     job_script_file = os.path.join(case_test_dir, script_file)
102     script_text = """#! /usr/bin/env python3
103 # -*- coding: utf-8 -*-
104
105 # verify import salome
106 import salome
107 salome.standalone()
108 salome.salome_init()
109
110 f = open('result.txt', 'w')
111 f.write("Salut!")
112 f.close()
113
114 import os
115 os.mkdir("subdir")
116 f = open(os.path.join("subdir",'autre.txt'), 'w')
117 f.write("Hello!")
118 f.close()
119 """
120     f = open(job_script_file, "w")
121     f.write(script_text)
122     f.close()
123
124     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
125     job_params = self.create_JobParameters()
126     job_params.job_type = "python_salome"
127     job_params.job_file = job_script_file
128     job_params.in_files = []
129     job_params.out_files = ["result.txt", "subdir"]
130
131     launcher = salome.naming_service.Resolve('/SalomeLauncher')
132
133     for resource in self.ressources:
134       print("Testing python_salome job on ", resource)
135       job_params.result_directory = local_result_dir + resource
136       job_params.job_name = "PyJob" + resource
137       job_params.resource_required.name = resource
138       # use default working directory for this test
139
140       job_id = launcher.createJob(job_params)
141       launcher.launchJob(job_id)
142
143       jobState = launcher.getJobState(job_id)
144       print("Job %d state: %s" % (job_id,jobState))
145       while jobState != "FINISHED" and jobState != "FAILED" :
146         time.sleep(5)
147         jobState = launcher.getJobState(job_id)
148         print("Job %d state: %s" % (job_id,jobState))
149         pass
150
151       self.assertEqual(jobState, "FINISHED")
152
153       # getJobResults to default directory (result_directory)
154       launcher.getJobResults(job_id, "")
155       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
156                       "Salut!")
157       self.verifyFile(os.path.join(job_params.result_directory,
158                                    "subdir", "autre.txt"),
159                       "Hello!")
160
161       # getJobResults to a specific directory
162       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
163       launcher.getJobResults(job_id, mydir)
164       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
165       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
166       pass #for
167
168     os.chdir(old_dir)
169
170   ##############################
171   # test of command job type
172   ##############################
173   def test_command(self):
174     case_test_dir = os.path.join(TestCompo.test_dir, "command")
175     mkdir_p(case_test_dir)
176
177     # job script
178     data_file = "in.txt"
179     script_file = "myEnvScript.py"
180     script_text = """#! /usr/bin/env python3
181 # -*- coding: utf-8 -*-
182
183 import os,sys
184
185 text_result = os.getenv("ENV_TEST_VAR","")
186
187 f = open('result.txt', 'w')
188 f.write(text_result)
189 f.close()
190
191 in_f = open("in.txt", "r")
192 in_text = in_f.read()
193 in_f.close()
194
195 os.mkdir("copie")
196 f = open(os.path.join("copie",'copie.txt'), 'w')
197 f.write(in_text)
198 f.close()
199 """
200     abs_script_file = os.path.join(case_test_dir, script_file)
201     f = open(abs_script_file, "w")
202     f.write(script_text)
203     f.close()
204     os.chmod(abs_script_file, 0o755)
205
206     #environment script
207     env_file = "myEnv.sh"
208     env_text = """export ENV_TEST_VAR="expected"
209 """
210     f = open(os.path.join(case_test_dir, env_file), "w")
211     f.write(env_text)
212     f.close()
213
214     # write data file
215     f = open(os.path.join(case_test_dir, data_file), "w")
216     f.write("to be copied")
217     f.close()
218
219     # job params
220     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
221     job_params = self.create_JobParameters()
222     job_params.job_type = "command"
223     job_params.job_file = script_file
224     job_params.env_file = env_file
225     job_params.in_files = [data_file]
226     job_params.out_files = ["result.txt", "copie"]
227     job_params.local_directory = case_test_dir
228
229     # create and launch the job
230     launcher = salome.naming_service.Resolve('/SalomeLauncher')
231     resManager= salome.lcc.getResourcesManager()
232
233     for resource in self.ressources:
234       print("Testing command job on ", resource)
235       job_params.result_directory = local_result_dir + resource
236       job_params.job_name = "CommandJob_" + resource
237       job_params.resource_required.name = resource
238
239       # use the working directory of the resource
240       resParams = resManager.GetResourceDefinition(resource)
241       wd = os.path.join(resParams.working_directory,
242                         "CommandJob" + self.suffix)
243       job_params.work_directory = wd
244
245       job_id = launcher.createJob(job_params)
246       launcher.launchJob(job_id)
247       # wait for the end of the job
248       jobState = launcher.getJobState(job_id)
249       print("Job %d state: %s" % (job_id,jobState))
250       while jobState != "FINISHED" and jobState != "FAILED" :
251         time.sleep(3)
252         jobState = launcher.getJobState(job_id)
253         print("Job %d state: %s" % (job_id,jobState))
254         pass
255
256       # verify the results
257       self.assertEqual(jobState, "FINISHED")
258       launcher.getJobResults(job_id, "")
259       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
260                       "expected")
261       self.verifyFile(os.path.join(job_params.result_directory,
262                                    "copie",'copie.txt'),
263                       "to be copied")
264
265       # verify getJobWorkFile
266       mydir = os.path.join(case_test_dir, "work_dir" + resource)
267       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
268       self.assertEqual(success, True)
269       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
270
271       success = launcher.getJobWorkFile(job_id, "copie", mydir)
272       self.assertEqual(success, True)
273       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
274                       "to be copied")
275
276
277   ##############################
278   # test of yacs job type
279   ##############################
280   def test_yacs(self):
281     yacs_path = os.getenv("YACS_ROOT_DIR", "")
282     if not os.path.isdir(yacs_path):
283       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
284
285     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
286     mkdir_p(case_test_dir)
287
288     #environment script
289     env_file = "myEnv.sh"
290     env_text = """export ENV_TEST_VAR="expected"
291 """
292     f = open(os.path.join(case_test_dir, env_file), "w")
293     f.write(env_text)
294     f.close()
295
296     # job script
297     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
298 <proc name="newSchema_1">
299    <container name="DefaultContainer">
300       <property name="container_kind" value="Salome"/>
301       <property name="attached_on_cloning" value="0"/>
302       <property name="container_name" value="FactoryServer"/>
303       <property name="name" value="localhost"/>
304    </container>
305    <inline name="PyScript0">
306       <script><code><![CDATA[import os
307 text_result = os.getenv("ENV_TEST_VAR","")
308 f = open('result.txt', 'w')
309 f.write(text_result)
310 f.close()
311 ]]></code></script>
312       <load container="DefaultContainer"/>
313    </inline>
314 </proc>
315 """
316     yacs_file = "mySchema.xml"
317     job_script_file = os.path.join(case_test_dir, yacs_file)
318     f = open(job_script_file, "w")
319     f.write(script_text)
320     f.close()
321
322     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
323     job_params = self.create_JobParameters()
324     job_params.job_type = "yacs_file"
325     job_params.job_file = job_script_file
326     job_params.env_file = os.path.join(case_test_dir,env_file)
327     job_params.out_files = ["result.txt"]
328
329     # define the interval between two YACS schema dumps (3 seconds)
330     import Engines
331     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
332
333     launcher = salome.naming_service.Resolve('/SalomeLauncher')
334     resManager= salome.lcc.getResourcesManager()
335
336     for resource in self.ressources:
337       print("Testing yacs job on ", resource)
338       job_params.result_directory = local_result_dir + resource
339       job_params.job_name = "YacsJob_" + resource
340       job_params.resource_required.name = resource
341
342       # use the working directory of the resource
343       resParams = resManager.GetResourceDefinition(resource)
344       wd = os.path.join(resParams.working_directory,
345                         "YacsJob" + self.suffix)
346       job_params.work_directory = wd
347
348       job_id = launcher.createJob(job_params)
349       launcher.launchJob(job_id)
350       jobState = launcher.getJobState(job_id)
351
352       yacs_dump_success = False
353       print("Job %d state: %s" % (job_id,jobState))
354       while jobState != "FINISHED" and jobState != "FAILED" :
355         time.sleep(5)
356         jobState = launcher.getJobState(job_id)
357 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
358         yacs_dump_success = launcher.getJobDumpState(job_id,
359                                               job_params.result_directory)
360         print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
361         pass
362
363       self.assertEqual(jobState, "FINISHED")
364
365       # Verify dumpState file is in the results
366       self.assertTrue(yacs_dump_success)
367       dump_file_path = os.path.join(job_params.result_directory,
368                                     "dumpState_mySchema.xml")
369       self.assertTrue(os.path.isfile(dump_file_path))
370
371       # Load the schema state from the dump file and verify the state of a node
372       import SALOMERuntime
373       SALOMERuntime.RuntimeSALOME_setRuntime(1)
374       import loader
375       schema = loader.YACSLoader().load(job_script_file)
376       stateParser = loader.stateParser()
377       sl = loader.stateLoader(stateParser, schema)
378       sl.parse(dump_file_path)
379       # 106 : "DONE" state code
380       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
381
382       # getJobResults to default directory (result_directory)
383       launcher.getJobResults(job_id, "")
384       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
385                       "expected")
386
387   ##############################
388   # test of yacs job type using "--init_port" driver option
389   ##############################
390   def test_yacsopt(self):
391     yacs_path = os.getenv("YACS_ROOT_DIR", "")
392     if not os.path.isdir(yacs_path):
393       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
394
395     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
396     mkdir_p(case_test_dir)
397
398     # job script
399     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
400 <proc name="myschema">
401    <type name="string" kind="string"/>
402    <type name="bool" kind="bool"/>
403    <type name="double" kind="double"/>
404    <type name="int" kind="int"/>
405    <container name="DefaultContainer">
406       <property name="container_kind" value="Salome"/>
407       <property name="attached_on_cloning" value="0"/>
408       <property name="container_name" value="FactoryServer"/>
409       <property name="name" value="localhost"/>
410    </container>
411    <inline name="mynode">
412       <script><code><![CDATA[
413 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
414 f = open('result.txt', 'w')
415 f.write(text_result)
416 f.close()
417 ]]></code></script>
418       <load container="DefaultContainer"/>
419       <inport name="i" type="int"/>
420       <inport name="d" type="double"/>
421       <inport name="b" type="bool"/>
422       <inport name="s" type="string"/>
423    </inline>
424 </proc>
425 """
426     yacs_file = "simpleSchema.xml"
427     job_script_file = os.path.join(case_test_dir, yacs_file)
428     f = open(job_script_file, "w")
429     f.write(script_text)
430     f.close()
431
432     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
433     job_params = self.create_JobParameters()
434     job_params.job_type = "yacs_file"
435     job_params.job_file = job_script_file
436     job_params.out_files = ["result.txt"]
437
438     # define the interval between two YACS schema dumps (3 seconds)
439     import Engines
440     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
441                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
442     expected_result="i=5,d=3.7,b=False,s=lili"
443
444     launcher = salome.naming_service.Resolve('/SalomeLauncher')
445     resManager= salome.lcc.getResourcesManager()
446
447     for resource in self.ressources:
448       print("Testing yacs job with options on ", resource)
449       job_params.result_directory = local_result_dir + resource
450       job_params.job_name = "YacsJobOpt_" + resource
451       job_params.resource_required.name = resource
452
453       # use the working directory of the resource
454       resParams = resManager.GetResourceDefinition(resource)
455       wd = os.path.join(resParams.working_directory,
456                         "YacsJobOpt" + self.suffix)
457       job_params.work_directory = wd
458
459       job_id = launcher.createJob(job_params)
460       launcher.launchJob(job_id)
461       jobState = launcher.getJobState(job_id)
462
463       yacs_dump_success = False
464       print("Job %d state: %s" % (job_id,jobState))
465       while jobState != "FINISHED" and jobState != "FAILED" :
466         time.sleep(5)
467         jobState = launcher.getJobState(job_id)
468         print("Job %d state: %s " % (job_id,jobState))
469         pass
470
471       self.assertEqual(jobState, "FINISHED")
472
473       # getJobResults to default directory (result_directory)
474       launcher.getJobResults(job_id, "")
475       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
476                       expected_result)
477
478   ############################################
479   # test of command job type with pre_command
480   ############################################
481   def test_command_pre(self):
482     case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
483     mkdir_p(case_test_dir)
484
485     # command to be run before the job
486     pre_command = "pre_command.sh"
487     pre_command_text = "echo 'it works!' > in.txt"
488     abs_pre_command_file = os.path.join(case_test_dir, pre_command)
489     f = open(abs_pre_command_file, "w")
490     f.write(pre_command_text)
491     f.close()
492     os.chmod(abs_pre_command_file, 0o755)
493     
494     # job script
495     script_file = "myTestScript.py"
496     script_text = """#! /usr/bin/env python3
497 # -*- coding: utf-8 -*-
498
499 in_f = open("in.txt", "r")
500 in_text = in_f.read()
501 in_f.close()
502
503 f = open('result.txt', 'w')
504 f.write(in_text)
505 f.close()
506 """
507     abs_script_file = os.path.join(case_test_dir, script_file)
508     f = open(abs_script_file, "w")
509     f.write(script_text)
510     f.close()
511     os.chmod(abs_script_file, 0o755)
512
513     # job params
514     local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
515     job_params = self.create_JobParameters()
516     job_params.job_type = "command"
517     job_params.job_file = script_file
518     job_params.pre_command = pre_command
519     job_params.in_files = []
520     job_params.out_files = ["result.txt"]
521     job_params.local_directory = case_test_dir
522
523     # create and launch the job
524     launcher = salome.naming_service.Resolve('/SalomeLauncher')
525     resManager= salome.lcc.getResourcesManager()
526
527     for resource in self.ressources:
528       print("Testing command job on ", resource)
529       job_params.result_directory = local_result_dir + resource
530       job_params.job_name = "CommandPreJob_" + resource
531       job_params.resource_required.name = resource
532
533       # use the working directory of the resource
534       resParams = resManager.GetResourceDefinition(resource)
535       wd = os.path.join(resParams.working_directory,
536                         "CommandPreJob" + self.suffix)
537       job_params.work_directory = wd
538
539       job_id = launcher.createJob(job_params)
540       launcher.launchJob(job_id)
541       # wait for the end of the job
542       jobState = launcher.getJobState(job_id)
543       print("Job %d state: %s" % (job_id,jobState))
544       while jobState != "FINISHED" and jobState != "FAILED" :
545         time.sleep(3)
546         jobState = launcher.getJobState(job_id)
547         print("Job %d state: %s" % (job_id,jobState))
548         pass
549
550       # verify the results
551       self.assertEqual(jobState, "FINISHED")
552       launcher.getJobResults(job_id, "")
553       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
554                       "it works!\n")
555
556   #################################
557   # test of command salome job type
558   #################################
559   def test_command_salome(self):
560     case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
561     mkdir_p(case_test_dir)
562
563     # job script
564     data_file = "in.txt"
565     script_file = "myEnvScript.py"
566     script_text = """#! /usr/bin/env python3
567 # -*- coding: utf-8 -*-
568
569 import os,sys
570 # verify import salome
571 import salome
572
573 text_result = os.getenv("ENV_TEST_VAR","")
574
575 f = open('result.txt', 'w')
576 f.write(text_result)
577 f.close()
578
579 in_f = open("in.txt", "r")
580 in_text = in_f.read()
581 in_f.close()
582
583 os.mkdir("copie")
584 f = open(os.path.join("copie",'copie.txt'), 'w')
585 f.write(in_text)
586 f.close()
587 """
588     abs_script_file = os.path.join(case_test_dir, script_file)
589     f = open(abs_script_file, "w")
590     f.write(script_text)
591     f.close()
592     os.chmod(abs_script_file, 0o755)
593
594     #environment script
595     env_file = "myEnv.sh"
596     env_text = """export ENV_TEST_VAR="expected"
597 """
598     f = open(os.path.join(case_test_dir, env_file), "w")
599     f.write(env_text)
600     f.close()
601
602     # write data file
603     f = open(os.path.join(case_test_dir, data_file), "w")
604     f.write("to be copied")
605     f.close()
606
607     # job params
608     local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
609     job_params = self.create_JobParameters()
610     job_params.job_type = "command_salome"
611     job_params.job_file = script_file
612     job_params.env_file = env_file
613     job_params.in_files = [data_file]
614     job_params.out_files = ["result.txt", "copie"]
615     job_params.local_directory = case_test_dir
616
617     # create and launch the job
618     launcher = salome.naming_service.Resolve('/SalomeLauncher')
619     resManager= salome.lcc.getResourcesManager()
620
621     for resource in self.ressources:
622       print("Testing command salome job on ", resource)
623       job_params.result_directory = local_result_dir + resource
624       job_params.job_name = "CommandSalomeJob_" + resource
625       job_params.resource_required.name = resource
626
627       # use the working directory of the resource
628       resParams = resManager.GetResourceDefinition(resource)
629       wd = os.path.join(resParams.working_directory,
630                         "CommandSalomeJob" + self.suffix)
631       job_params.work_directory = wd
632
633       job_id = launcher.createJob(job_params)
634       launcher.launchJob(job_id)
635       # wait for the end of the job
636       jobState = launcher.getJobState(job_id)
637       print("Job %d state: %s" % (job_id,jobState))
638       while jobState != "FINISHED" and jobState != "FAILED" :
639         time.sleep(3)
640         jobState = launcher.getJobState(job_id)
641         print("Job %d state: %s" % (job_id,jobState))
642         pass
643
644       # verify the results
645       self.assertEqual(jobState, "FINISHED")
646       launcher.getJobResults(job_id, "")
647       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
648                       "expected")
649       self.verifyFile(os.path.join(job_params.result_directory,
650                                    "copie",'copie.txt'),
651                       "to be copied")
652
653       # verify getJobWorkFile
654       mydir = os.path.join(case_test_dir, "work_dir" + resource)
655       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
656       self.assertEqual(success, True)
657       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
658
659       success = launcher.getJobWorkFile(job_id, "copie", mydir)
660       self.assertEqual(success, True)
661       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
662                       "to be copied")
663       pass
664     pass
665   pass
666
667 if __name__ == '__main__':
668     # create study
669     import salome
670     salome.standalone()
671     salome.salome_init()
672     unittest.main()