]> SALOME platform Git repositories - modules/kernel.git/blob - src/Launcher/Test/test_launcher.py
Salome HOME
Compilation under Windows: add missing header
[modules/kernel.git] / src / Launcher / Test / test_launcher.py
1 #! /usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2021  CEA/DEN, EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21
22 import unittest
23 import os
24 import sys
25 import time
26 import tempfile
27 import errno
28
29 def mkdir_p(path):
30   try:
31     os.makedirs(path)
32   except OSError as exc:  # Python >2.5
33     if exc.errno == errno.EEXIST and os.path.isdir(path):
34       pass
35     else:
36       raise
37
38 # Test of SalomeLauncher.
39 # This test should be run in the salome environment, using "salome shell"
40 # and a salome application should be running.
41 # The test will try to launch batch jobs on every available resources which
42 # have the can_launch_batch_jobs parameter set to True.
43 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
44 # define a customised resource catalog.
45 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
46 # skipped.
47 class TestCompo(unittest.TestCase):
48   @classmethod
49   def setUpClass(cls):
50     # Prepare the test directory
51     temp = tempfile.NamedTemporaryFile()
52     cls.test_dir = os.path.join(temp.name, "test_dir")
53     name = os.path.basename(temp.name)
54     temp.close()
55     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
56     mkdir_p(cls.test_dir)
57
58     # load catalogs
59 #    mc = salome.naming_service.Resolve('/Kernel/ModulCatalog')
60 #    ior = salome.orb.object_to_string(mc)
61 #    import SALOMERuntime
62 #    SALOMERuntime.RuntimeSALOME_setRuntime()
63 #    salome_runtime = SALOMERuntime.getSALOMERuntime()
64 #    session_catalog = salome_runtime.loadCatalog("session", ior)
65 #    salome_runtime.addCatalog(session_catalog)
66
67     # Get the list of possible ressources
68     ressource_param = salome.ResourceParameters()
69     ressource_param.can_launch_batch_jobs = True
70     rm = salome.lcc.getResourcesManager()
71     cls.ressources = rm.GetFittingResources(ressource_param)
72
73   def verifyFile(self, path, content):
74     try:
75       f = open(path, 'r')
76       text = f.read()
77       f.close()
78       self.assertEqual(text, content)
79     except IOError as ex:
80       self.fail("IO exception:" + str(ex));
81
82   def create_JobParameters(self):
83     job_params = salome.JobParameters()
84     job_params.wckey="P11N0:SALOME" #needed by edf clusters
85     job_params.resource_required = salome.ResourceParameters()
86     job_params.resource_required.nb_proc = 1
87     return job_params
88
89   ##############################
90   # test of python_salome job
91   ##############################
92   def test_salome_py_job(self):
93     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
94     mkdir_p(case_test_dir)
95
96     old_dir = os.getcwd()
97     os.chdir(case_test_dir)
98
99     # job script
100     script_file = "myScript.py"
101     job_script_file = os.path.join(case_test_dir, script_file)
102     script_text = """#! /usr/bin/env python3
103 # -*- coding: utf-8 -*-
104
105 # verify import salome
106 import salome
107 salome.salome_init()
108
109 f = open('result.txt', 'w')
110 f.write("Salut!")
111 f.close()
112
113 import os
114 os.mkdir("subdir")
115 f = open(os.path.join("subdir",'autre.txt'), 'w')
116 f.write("Hello!")
117 f.close()
118 """
119     f = open(job_script_file, "w")
120     f.write(script_text)
121     f.close()
122
123     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
124     job_params = self.create_JobParameters()
125     job_params.job_type = "python_salome"
126     job_params.job_file = job_script_file
127     job_params.in_files = []
128     job_params.out_files = ["result.txt", "subdir"]
129
130     launcher = salome.naming_service.Resolve('/SalomeLauncher')
131
132     for resource in self.ressources:
133       print("Testing python_salome job on ", resource)
134       job_params.result_directory = local_result_dir + resource
135       job_params.job_name = "PyJob" + resource
136       job_params.resource_required.name = resource
137       # use default working directory for this test
138
139       job_id = launcher.createJob(job_params)
140       launcher.launchJob(job_id)
141
142       jobState = launcher.getJobState(job_id)
143       print("Job %d state: %s" % (job_id,jobState))
144       while jobState != "FINISHED" and jobState != "FAILED" :
145         time.sleep(5)
146         jobState = launcher.getJobState(job_id)
147         print("Job %d state: %s" % (job_id,jobState))
148         pass
149
150       self.assertEqual(jobState, "FINISHED")
151
152       # getJobResults to default directory (result_directory)
153       launcher.getJobResults(job_id, "")
154       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
155                       "Salut!")
156       self.verifyFile(os.path.join(job_params.result_directory,
157                                    "subdir", "autre.txt"),
158                       "Hello!")
159
160       # getJobResults to a specific directory
161       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
162       launcher.getJobResults(job_id, mydir)
163       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
164       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
165       pass #for
166
167     os.chdir(old_dir)
168
169   ##############################
170   # test of command job type
171   ##############################
172   def test_command(self):
173     case_test_dir = os.path.join(TestCompo.test_dir, "command")
174     mkdir_p(case_test_dir)
175
176     # job script
177     data_file = "in.txt"
178     script_file = "myEnvScript.py"
179     script_text = """#! /usr/bin/env python3
180 # -*- coding: utf-8 -*-
181
182 import os,sys
183
184 text_result = os.getenv("ENV_TEST_VAR","")
185
186 f = open('result.txt', 'w')
187 f.write(text_result)
188 f.close()
189
190 in_f = open("in.txt", "r")
191 in_text = in_f.read()
192 in_f.close()
193
194 os.mkdir("copie")
195 f = open(os.path.join("copie",'copie.txt'), 'w')
196 f.write(in_text)
197 f.close()
198 """
199     abs_script_file = os.path.join(case_test_dir, script_file)
200     f = open(abs_script_file, "w")
201     f.write(script_text)
202     f.close()
203     os.chmod(abs_script_file, 0o755)
204
205     #environment script
206     env_file = "myEnv.sh"
207     env_text = """export ENV_TEST_VAR="expected"
208 """
209     f = open(os.path.join(case_test_dir, env_file), "w")
210     f.write(env_text)
211     f.close()
212
213     # write data file
214     f = open(os.path.join(case_test_dir, data_file), "w")
215     f.write("to be copied")
216     f.close()
217
218     # job params
219     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
220     job_params = self.create_JobParameters()
221     job_params.job_type = "command"
222     job_params.job_file = script_file
223     job_params.env_file = env_file
224     job_params.in_files = [data_file]
225     job_params.out_files = ["result.txt", "copie"]
226     job_params.local_directory = case_test_dir
227
228     # create and launch the job
229     launcher = salome.naming_service.Resolve('/SalomeLauncher')
230     resManager= salome.lcc.getResourcesManager()
231
232     for resource in self.ressources:
233       print("Testing command job on ", resource)
234       job_params.result_directory = local_result_dir + resource
235       job_params.job_name = "CommandJob_" + resource
236       job_params.resource_required.name = resource
237
238       # use the working directory of the resource
239       resParams = resManager.GetResourceDefinition(resource)
240       wd = os.path.join(resParams.working_directory,
241                         "CommandJob" + self.suffix)
242       job_params.work_directory = wd
243
244       job_id = launcher.createJob(job_params)
245       launcher.launchJob(job_id)
246       # wait for the end of the job
247       jobState = launcher.getJobState(job_id)
248       print("Job %d state: %s" % (job_id,jobState))
249       while jobState != "FINISHED" and jobState != "FAILED" :
250         time.sleep(3)
251         jobState = launcher.getJobState(job_id)
252         print("Job %d state: %s" % (job_id,jobState))
253         pass
254
255       # verify the results
256       self.assertEqual(jobState, "FINISHED")
257       launcher.getJobResults(job_id, "")
258       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
259                       "expected")
260       self.verifyFile(os.path.join(job_params.result_directory,
261                                    "copie",'copie.txt'),
262                       "to be copied")
263
264       # verify getJobWorkFile
265       mydir = os.path.join(case_test_dir, "work_dir" + resource)
266       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
267       self.assertEqual(success, True)
268       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
269
270       success = launcher.getJobWorkFile(job_id, "copie", mydir)
271       self.assertEqual(success, True)
272       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
273                       "to be copied")
274
275
276   ##############################
277   # test of yacs job type
278   ##############################
279   def test_yacs(self):
280     yacs_path = os.getenv("YACS_ROOT_DIR", "")
281     if not os.path.isdir(yacs_path):
282       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
283
284     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
285     mkdir_p(case_test_dir)
286
287     #environment script
288     env_file = "myEnv.sh"
289     env_text = """export ENV_TEST_VAR="expected"
290 """
291     f = open(os.path.join(case_test_dir, env_file), "w")
292     f.write(env_text)
293     f.close()
294
295     # job script
296     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
297 <proc name="newSchema_1">
298    <container name="DefaultContainer">
299       <property name="container_kind" value="Salome"/>
300       <property name="attached_on_cloning" value="0"/>
301       <property name="container_name" value="FactoryServer"/>
302       <property name="name" value="localhost"/>
303    </container>
304    <inline name="PyScript0">
305       <script><code><![CDATA[import os
306 text_result = os.getenv("ENV_TEST_VAR","")
307 f = open('result.txt', 'w')
308 f.write(text_result)
309 f.close()
310 ]]></code></script>
311       <load container="DefaultContainer"/>
312    </inline>
313 </proc>
314 """
315     yacs_file = "mySchema.xml"
316     job_script_file = os.path.join(case_test_dir, yacs_file)
317     f = open(job_script_file, "w")
318     f.write(script_text)
319     f.close()
320
321     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
322     job_params = self.create_JobParameters()
323     job_params.job_type = "yacs_file"
324     job_params.job_file = job_script_file
325     job_params.env_file = os.path.join(case_test_dir,env_file)
326     job_params.out_files = ["result.txt"]
327
328     # define the interval between two YACS schema dumps (3 seconds)
329     import Engines
330     job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
331
332     launcher = salome.naming_service.Resolve('/SalomeLauncher')
333     resManager= salome.lcc.getResourcesManager()
334
335     for resource in self.ressources:
336       print("Testing yacs job on ", resource)
337       job_params.result_directory = local_result_dir + resource
338       job_params.job_name = "YacsJob_" + resource
339       job_params.resource_required.name = resource
340
341       # use the working directory of the resource
342       resParams = resManager.GetResourceDefinition(resource)
343       wd = os.path.join(resParams.working_directory,
344                         "YacsJob" + self.suffix)
345       job_params.work_directory = wd
346
347       job_id = launcher.createJob(job_params)
348       launcher.launchJob(job_id)
349       jobState = launcher.getJobState(job_id)
350
351       yacs_dump_success = False
352       print("Job %d state: %s" % (job_id,jobState))
353       while jobState != "FINISHED" and jobState != "FAILED" :
354         time.sleep(5)
355         jobState = launcher.getJobState(job_id)
356 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
357         yacs_dump_success = launcher.getJobDumpState(job_id,
358                                               job_params.result_directory)
359         print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
360         pass
361
362       self.assertEqual(jobState, "FINISHED")
363
364       # Verify dumpState file is in the results
365       self.assertTrue(yacs_dump_success)
366       dump_file_path = os.path.join(job_params.result_directory,
367                                     "dumpState_mySchema.xml")
368       self.assertTrue(os.path.isfile(dump_file_path))
369
370       # Load the schema state from the dump file and verify the state of a node
371       import SALOMERuntime
372       SALOMERuntime.RuntimeSALOME_setRuntime(1)
373       import loader
374       schema = loader.YACSLoader().load(job_script_file)
375       stateParser = loader.stateParser()
376       sl = loader.stateLoader(stateParser, schema)
377       sl.parse(dump_file_path)
378       # 106 : "DONE" state code
379       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
380
381       # getJobResults to default directory (result_directory)
382       launcher.getJobResults(job_id, "")
383       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
384                       "expected")
385
386   ##############################
387   # test of yacs job type using "--init_port" driver option
388   ##############################
389   def test_yacsopt(self):
390     yacs_path = os.getenv("YACS_ROOT_DIR", "")
391     if not os.path.isdir(yacs_path):
392       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
393
394     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
395     mkdir_p(case_test_dir)
396
397     # job script
398     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
399 <proc name="myschema">
400    <type name="string" kind="string"/>
401    <type name="bool" kind="bool"/>
402    <type name="double" kind="double"/>
403    <type name="int" kind="int"/>
404    <container name="DefaultContainer">
405       <property name="container_kind" value="Salome"/>
406       <property name="attached_on_cloning" value="0"/>
407       <property name="container_name" value="FactoryServer"/>
408       <property name="name" value="localhost"/>
409    </container>
410    <inline name="mynode">
411       <script><code><![CDATA[
412 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
413 f = open('result.txt', 'w')
414 f.write(text_result)
415 f.close()
416 ]]></code></script>
417       <load container="DefaultContainer"/>
418       <inport name="i" type="int"/>
419       <inport name="d" type="double"/>
420       <inport name="b" type="bool"/>
421       <inport name="s" type="string"/>
422    </inline>
423 </proc>
424 """
425     yacs_file = "simpleSchema.xml"
426     job_script_file = os.path.join(case_test_dir, yacs_file)
427     f = open(job_script_file, "w")
428     f.write(script_text)
429     f.close()
430
431     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
432     job_params = self.create_JobParameters()
433     job_params.job_type = "yacs_file"
434     job_params.job_file = job_script_file
435     job_params.out_files = ["result.txt"]
436
437     # define the interval between two YACS schema dumps (3 seconds)
438     import Engines
439     job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
440                "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
441     expected_result="i=5,d=3.7,b=False,s=lili"
442
443     launcher = salome.naming_service.Resolve('/SalomeLauncher')
444     resManager= salome.lcc.getResourcesManager()
445
446     for resource in self.ressources:
447       print("Testing yacs job with options on ", resource)
448       job_params.result_directory = local_result_dir + resource
449       job_params.job_name = "YacsJobOpt_" + resource
450       job_params.resource_required.name = resource
451
452       # use the working directory of the resource
453       resParams = resManager.GetResourceDefinition(resource)
454       wd = os.path.join(resParams.working_directory,
455                         "YacsJobOpt" + self.suffix)
456       job_params.work_directory = wd
457
458       job_id = launcher.createJob(job_params)
459       launcher.launchJob(job_id)
460       jobState = launcher.getJobState(job_id)
461
462       yacs_dump_success = False
463       print("Job %d state: %s" % (job_id,jobState))
464       while jobState != "FINISHED" and jobState != "FAILED" :
465         time.sleep(5)
466         jobState = launcher.getJobState(job_id)
467         print("Job %d state: %s " % (job_id,jobState))
468         pass
469
470       self.assertEqual(jobState, "FINISHED")
471
472       # getJobResults to default directory (result_directory)
473       launcher.getJobResults(job_id, "")
474       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
475                       expected_result)
476
477   ############################################
478   # test of command job type with pre_command
479   ############################################
480   def test_command_pre(self):
481     case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
482     mkdir_p(case_test_dir)
483
484     # command to be run before the job
485     pre_command = "pre_command.sh"
486     pre_command_text = "echo 'it works!' > in.txt"
487     abs_pre_command_file = os.path.join(case_test_dir, pre_command)
488     f = open(abs_pre_command_file, "w")
489     f.write(pre_command_text)
490     f.close()
491     os.chmod(abs_pre_command_file, 0o755)
492     
493     # job script
494     script_file = "myTestScript.py"
495     script_text = """#! /usr/bin/env python3
496 # -*- coding: utf-8 -*-
497
498 in_f = open("in.txt", "r")
499 in_text = in_f.read()
500 in_f.close()
501
502 f = open('result.txt', 'w')
503 f.write(in_text)
504 f.close()
505 """
506     abs_script_file = os.path.join(case_test_dir, script_file)
507     f = open(abs_script_file, "w")
508     f.write(script_text)
509     f.close()
510     os.chmod(abs_script_file, 0o755)
511
512     # job params
513     local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
514     job_params = self.create_JobParameters()
515     job_params.job_type = "command"
516     job_params.job_file = script_file
517     job_params.pre_command = pre_command
518     job_params.in_files = []
519     job_params.out_files = ["result.txt"]
520     job_params.local_directory = case_test_dir
521
522     # create and launch the job
523     launcher = salome.naming_service.Resolve('/SalomeLauncher')
524     resManager= salome.lcc.getResourcesManager()
525
526     for resource in self.ressources:
527       print("Testing command job on ", resource)
528       job_params.result_directory = local_result_dir + resource
529       job_params.job_name = "CommandPreJob_" + resource
530       job_params.resource_required.name = resource
531
532       # use the working directory of the resource
533       resParams = resManager.GetResourceDefinition(resource)
534       wd = os.path.join(resParams.working_directory,
535                         "CommandPreJob" + self.suffix)
536       job_params.work_directory = wd
537
538       job_id = launcher.createJob(job_params)
539       launcher.launchJob(job_id)
540       # wait for the end of the job
541       jobState = launcher.getJobState(job_id)
542       print("Job %d state: %s" % (job_id,jobState))
543       while jobState != "FINISHED" and jobState != "FAILED" :
544         time.sleep(3)
545         jobState = launcher.getJobState(job_id)
546         print("Job %d state: %s" % (job_id,jobState))
547         pass
548
549       # verify the results
550       self.assertEqual(jobState, "FINISHED")
551       launcher.getJobResults(job_id, "")
552       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
553                       "it works!\n")
554
555   #################################
556   # test of command salome job type
557   #################################
558   def test_command_salome(self):
559     case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
560     mkdir_p(case_test_dir)
561
562     # job script
563     data_file = "in.txt"
564     script_file = "myEnvScript.py"
565     script_text = """#! /usr/bin/env python3
566 # -*- coding: utf-8 -*-
567
568 import os,sys
569 # verify import salome
570 import salome
571
572 text_result = os.getenv("ENV_TEST_VAR","")
573
574 f = open('result.txt', 'w')
575 f.write(text_result)
576 f.close()
577
578 in_f = open("in.txt", "r")
579 in_text = in_f.read()
580 in_f.close()
581
582 os.mkdir("copie")
583 f = open(os.path.join("copie",'copie.txt'), 'w')
584 f.write(in_text)
585 f.close()
586 """
587     abs_script_file = os.path.join(case_test_dir, script_file)
588     f = open(abs_script_file, "w")
589     f.write(script_text)
590     f.close()
591     os.chmod(abs_script_file, 0o755)
592
593     #environment script
594     env_file = "myEnv.sh"
595     env_text = """export ENV_TEST_VAR="expected"
596 """
597     f = open(os.path.join(case_test_dir, env_file), "w")
598     f.write(env_text)
599     f.close()
600
601     # write data file
602     f = open(os.path.join(case_test_dir, data_file), "w")
603     f.write("to be copied")
604     f.close()
605
606     # job params
607     local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
608     job_params = self.create_JobParameters()
609     job_params.job_type = "command_salome"
610     job_params.job_file = script_file
611     job_params.env_file = env_file
612     job_params.in_files = [data_file]
613     job_params.out_files = ["result.txt", "copie"]
614     job_params.local_directory = case_test_dir
615
616     # create and launch the job
617     launcher = salome.naming_service.Resolve('/SalomeLauncher')
618     resManager= salome.lcc.getResourcesManager()
619
620     for resource in self.ressources:
621       print("Testing command salome job on ", resource)
622       job_params.result_directory = local_result_dir + resource
623       job_params.job_name = "CommandSalomeJob_" + resource
624       job_params.resource_required.name = resource
625
626       # use the working directory of the resource
627       resParams = resManager.GetResourceDefinition(resource)
628       wd = os.path.join(resParams.working_directory,
629                         "CommandSalomeJob" + self.suffix)
630       job_params.work_directory = wd
631
632       job_id = launcher.createJob(job_params)
633       launcher.launchJob(job_id)
634       # wait for the end of the job
635       jobState = launcher.getJobState(job_id)
636       print("Job %d state: %s" % (job_id,jobState))
637       while jobState != "FINISHED" and jobState != "FAILED" :
638         time.sleep(3)
639         jobState = launcher.getJobState(job_id)
640         print("Job %d state: %s" % (job_id,jobState))
641         pass
642
643       # verify the results
644       self.assertEqual(jobState, "FINISHED")
645       launcher.getJobResults(job_id, "")
646       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
647                       "expected")
648       self.verifyFile(os.path.join(job_params.result_directory,
649                                    "copie",'copie.txt'),
650                       "to be copied")
651
652       # verify getJobWorkFile
653       mydir = os.path.join(case_test_dir, "work_dir" + resource)
654       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
655       self.assertEqual(success, True)
656       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
657
658       success = launcher.getJobWorkFile(job_id, "copie", mydir)
659       self.assertEqual(success, True)
660       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
661                       "to be copied")
662       pass
663     pass
664   pass
665
666 if __name__ == '__main__':
667     # create study
668     import salome
669     salome.salome_init()
670     unittest.main()