Salome HOME
Merge branch 'V9_11_BR'
[modules/kernel.git] / src / Launcher_SWIG / Test / test_swig_launcher.py
1 #! /usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2023  CEA, EDF
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21
22 import unittest
23 import os
24 import sys
25 import time
26 import tempfile
27 import errno
28
29 def mkdir_p(path):
30   try:
31     os.makedirs(path)
32   except OSError as exc:  # Python >2.5
33     if exc.errno == errno.EEXIST and os.path.isdir(path):
34       pass
35     else:
36       raise
37
38 import pylauncher
39 def createLauncher():
40   launcher = pylauncher.Launcher_cpp()
41   launcher.SetResourcesManager(createResourcesManager())
42   return launcher
43
44 def createResourcesManager():
45   # localhost is defined anyway, even if the catalog file does not exist.
46   catalog_path = os.environ.get("USER_CATALOG_RESOURCES_FILE", "")
47   return pylauncher.ResourcesManager_cpp(catalog_path)
48
49 def createJobParameters():
50   jp = pylauncher.JobParameters_cpp()
51   jp.resource_required = createResourceParameters()
52   return jp
53
54 def createResourceParameters():
55   return pylauncher.resourceParams()
56
57 # Test of SalomeLauncher.
58 # This test should be run in the salome environment, using "salome shell".
59 # It does not need a salome application running.
60 # The test will try to launch batch jobs on every available resources which
61 # have the can_launch_batch_jobs parameter set to True.
62 # You can use the environment variable USER_CATALOG_RESOURCES_FILE in order to
63 # define a customised resource catalog.
64 # If YACS_ROOT_DIR is not set, the test of submitting a YACS schema will be
65 # skipped.
66 class TestCompo(unittest.TestCase):
67   @classmethod
68   def setUpClass(cls):
69     # Prepare the test directory
70     temp = tempfile.NamedTemporaryFile()
71     cls.test_dir = os.path.join(temp.name, "test_dir")
72     name = os.path.basename(temp.name)
73     temp.close()
74     cls.suffix = time.strftime("-%Y-%m-%d-%H-%M-%S")+"-%s"%(os.getpid())
75     mkdir_p(cls.test_dir)
76
77     # Get the list of possible ressources
78     ressource_param = createResourceParameters()
79     ressource_param.can_launch_batch_jobs = True
80     rm = createResourcesManager()
81     cls.ressources = rm.GetFittingResources(ressource_param)
82
83   def verifyFile(self, path, content):
84     try:
85       f = open(path, 'r')
86       text = f.read()
87       f.close()
88       self.assertEqual(text, content)
89     except IOError as ex:
90       self.fail("IO exception:" + str(ex));
91
92   def create_JobParameters(self):
93     job_params = createJobParameters()
94     job_params.wckey="P11U5:CARBONES" #needed by edf clusters
95     job_params.resource_required.nb_proc = 1
96     return job_params
97
98   ##############################
99   # test of python_salome job
100   ##############################
101   def test_salome_py_job(self):
102     case_test_dir = os.path.join(TestCompo.test_dir, "salome_py")
103     mkdir_p(case_test_dir)
104
105     old_dir = os.getcwd()
106     os.chdir(case_test_dir)
107
108     # job script
109     script_file = "myScript.py"
110     job_script_file = os.path.join(case_test_dir, script_file)
111     script_text = """#! /usr/bin/env python3
112 # -*- coding: utf-8 -*-
113
114 # verify import salome
115 import salome
116 salome.salome_init()
117
118 f = open('result.txt', 'w')
119 f.write("Salut!")
120 f.close()
121
122 import os
123 os.mkdir("subdir")
124 f = open(os.path.join("subdir",'autre.txt'), 'w')
125 f.write("Hello!")
126 f.close()
127 """
128     f = open(job_script_file, "w")
129     f.write(script_text)
130     f.close()
131
132     local_result_dir = os.path.join(case_test_dir, "result_py_job-")
133     job_params = self.create_JobParameters()
134     job_params.job_type = "python_salome"
135     job_params.job_file = job_script_file
136     job_params.in_files = []
137     job_params.out_files = ["result.txt", "subdir"]
138
139     launcher = createLauncher()
140
141     for resource in self.ressources:
142       print("Testing python_salome job on ", resource)
143       job_params.result_directory = local_result_dir + resource
144       job_params.job_name = "PyJob" + resource
145       job_params.resource_required.name = resource
146       # use default working directory for this test
147
148       job_id = launcher.createJob(job_params)
149       launcher.launchJob(job_id)
150
151       jobState = launcher.getJobState(job_id)
152       print("Job %d state: %s" % (job_id,jobState))
153       while jobState != "FINISHED" and jobState != "FAILED" :
154         time.sleep(5)
155         jobState = launcher.getJobState(job_id)
156         print("Job %d state: %s" % (job_id,jobState))
157         pass
158
159       self.assertEqual(jobState, "FINISHED")
160
161       # getJobResults to default directory (result_directory)
162       launcher.getJobResults(job_id, "")
163       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
164                       "Salut!")
165       self.verifyFile(os.path.join(job_params.result_directory,
166                                    "subdir", "autre.txt"),
167                       "Hello!")
168
169       # getJobResults to a specific directory
170       mydir = os.path.join(case_test_dir, "custom_result_dir" + resource)
171       launcher.getJobResults(job_id, mydir)
172       self.verifyFile(os.path.join(mydir, "result.txt"), "Salut!")
173       self.verifyFile(os.path.join(mydir, "subdir", "autre.txt"), "Hello!")
174       pass #for
175
176     os.chdir(old_dir)
177
178   ##############################
179   # test of command job type
180   ##############################
181   def test_command(self):
182     case_test_dir = os.path.join(TestCompo.test_dir, "command")
183     mkdir_p(case_test_dir)
184
185     # job script
186     data_file = "in.txt"
187     script_file = "myEnvScript.py"
188     script_text = """#! /usr/bin/env python3
189 # -*- coding: utf-8 -*-
190
191 import os,sys
192
193 text_result = os.getenv("ENV_TEST_VAR","")
194
195 f = open('result.txt', 'w')
196 f.write(text_result)
197 f.close()
198
199 in_f = open("in.txt", "r")
200 in_text = in_f.read()
201 in_f.close()
202
203 os.mkdir("copie")
204 f = open(os.path.join("copie",'copie.txt'), 'w')
205 f.write(in_text)
206 f.close()
207 """
208     abs_script_file = os.path.join(case_test_dir, script_file)
209     f = open(abs_script_file, "w")
210     f.write(script_text)
211     f.close()
212     os.chmod(abs_script_file, 0o755)
213
214     #environment script
215     env_file = "myEnv.sh"
216     env_text = """export ENV_TEST_VAR="expected"
217 """
218     f = open(os.path.join(case_test_dir, env_file), "w")
219     f.write(env_text)
220     f.close()
221
222     # write data file
223     f = open(os.path.join(case_test_dir, data_file), "w")
224     f.write("to be copied")
225     f.close()
226
227     # job params
228     local_result_dir = os.path.join(case_test_dir, "result_com_job-")
229     job_params = self.create_JobParameters()
230     job_params.job_type = "command"
231     job_params.job_file = script_file
232     job_params.env_file = env_file
233     job_params.in_files = [data_file]
234     job_params.out_files = ["result.txt", "copie"]
235     job_params.local_directory = case_test_dir
236
237     # create and launch the job
238     launcher = createLauncher()
239     resManager= createResourcesManager()
240
241     for resource in self.ressources:
242       print("Testing command job on ", resource)
243       job_params.result_directory = local_result_dir + resource
244       job_params.job_name = "CommandJob_" + resource
245       job_params.resource_required.name = resource
246
247       # use the working directory of the resource
248       resParams = resManager.GetResourceDefinition(resource)
249       wd = os.path.join(resParams.working_directory,
250                         "CommandJob" + self.suffix)
251       job_params.work_directory = wd
252
253       job_id = launcher.createJob(job_params)
254       launcher.launchJob(job_id)
255       # wait for the end of the job
256       jobState = launcher.getJobState(job_id)
257       print("Job %d state: %s" % (job_id,jobState))
258       while jobState != "FINISHED" and jobState != "FAILED" :
259         time.sleep(3)
260         jobState = launcher.getJobState(job_id)
261         print("Job %d state: %s" % (job_id,jobState))
262         pass
263
264       # verify the results
265       self.assertEqual(jobState, "FINISHED")
266       launcher.getJobResults(job_id, "")
267       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
268                       "expected")
269       self.verifyFile(os.path.join(job_params.result_directory,
270                                    "copie",'copie.txt'),
271                       "to be copied")
272
273       # verify getJobWorkFile
274       mydir = os.path.join(case_test_dir, "work_dir" + resource)
275       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
276       self.assertEqual(success, True)
277       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
278
279       success = launcher.getJobWorkFile(job_id, "copie", mydir)
280       self.assertEqual(success, True)
281       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
282                       "to be copied")
283
284
285   ##############################
286   # test of yacs job type
287   ##############################
288   def test_yacs(self):
289     yacs_path = os.getenv("YACS_ROOT_DIR", "")
290     if not os.path.isdir(yacs_path):
291       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
292
293     case_test_dir = os.path.join(TestCompo.test_dir, "yacs")
294     mkdir_p(case_test_dir)
295
296     #environment script
297     env_file = "myEnv.sh"
298     env_text = """export ENV_TEST_VAR="expected"
299 """
300     f = open(os.path.join(case_test_dir, env_file), "w")
301     f.write(env_text)
302     f.close()
303
304     # job script
305     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
306 <proc name="newSchema_1">
307    <container name="DefaultContainer">
308       <property name="container_kind" value="Salome"/>
309       <property name="attached_on_cloning" value="0"/>
310       <property name="container_name" value="FactoryServer"/>
311       <property name="name" value="localhost"/>
312    </container>
313    <inline name="PyScript0">
314       <script><code><![CDATA[import os
315 text_result = os.getenv("ENV_TEST_VAR","")
316 f = open('result.txt', 'w')
317 f.write(text_result)
318 f.close()
319 ]]></code></script>
320       <load container="DefaultContainer"/>
321    </inline>
322 </proc>
323 """
324     yacs_file = "mySchema.xml"
325     job_script_file = os.path.join(case_test_dir, yacs_file)
326     f = open(job_script_file, "w")
327     f.write(script_text)
328     f.close()
329
330     local_result_dir = os.path.join(case_test_dir, "result_yacs_job-")
331     job_params = self.create_JobParameters()
332     job_params.job_type = "yacs_file"
333     job_params.job_file = job_script_file
334     job_params.env_file = os.path.join(case_test_dir,env_file)
335     job_params.out_files = ["result.txt"]
336
337     # define the interval between two YACS schema dumps (3 seconds)
338     #import Engines
339     #job_params.specific_parameters = [Engines.Parameter("EnableDumpYACS", "3")]
340     job_params.specific_parameters = {"EnableDumpYACS": "3"}
341
342     launcher = createLauncher()
343     resManager= createResourcesManager()
344
345     for resource in self.ressources:
346       print("Testing yacs job on ", resource)
347       job_params.result_directory = local_result_dir + resource
348       job_params.job_name = "YacsJob_" + resource
349       job_params.resource_required.name = resource
350
351       # use the working directory of the resource
352       resParams = resManager.GetResourceDefinition(resource)
353       wd = os.path.join(resParams.working_directory,
354                         "YacsJob" + self.suffix)
355       job_params.work_directory = wd
356
357       job_id = launcher.createJob(job_params)
358       launcher.launchJob(job_id)
359       jobState = launcher.getJobState(job_id)
360
361       yacs_dump_success = False
362       print("Job %d state: %s" % (job_id,jobState))
363       while jobState != "FINISHED" and jobState != "FAILED" :
364         time.sleep(5)
365         jobState = launcher.getJobState(job_id)
366 #        yacs_dump_success = launcher.getJobWorkFile(job_id, "dumpState_mySchema.xml",
367         yacs_dump_success = launcher.getJobDumpState(job_id,
368                                               job_params.result_directory)
369         print("Job %d state: %s - dump: %s" % (job_id,jobState, yacs_dump_success))
370         pass
371
372       self.assertEqual(jobState, "FINISHED")
373
374       # Verify dumpState file is in the results
375       self.assertTrue(yacs_dump_success)
376       dump_file_path = os.path.join(job_params.result_directory,
377                                     "dumpState_mySchema.xml")
378       self.assertTrue(os.path.isfile(dump_file_path))
379
380       """
381       # Load the schema state from the dump file and verify the state of a node
382       import SALOMERuntime
383       SALOMERuntime.RuntimeSALOME_setRuntime(1)
384       import loader
385       schema = loader.YACSLoader().load(job_script_file)
386       stateParser = loader.stateParser()
387       sl = loader.stateLoader(stateParser, schema)
388       sl.parse(dump_file_path)
389       # 106 : "DONE" state code
390       self.assertEqual(106, schema.getChildByName("PyScript0").getEffectiveState())
391       """
392
393       # getJobResults to default directory (result_directory)
394       launcher.getJobResults(job_id, "")
395       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
396                       "expected")
397
398   ##############################
399   # test of yacs job type using "--init_port" driver option
400   ##############################
401   def test_yacsopt(self):
402     yacs_path = os.getenv("YACS_ROOT_DIR", "")
403     if not os.path.isdir(yacs_path):
404       self.skipTest("Needs YACS module to run. Please define YACS_ROOT_DIR.")
405
406     case_test_dir = os.path.join(TestCompo.test_dir, "yacs_opt")
407     mkdir_p(case_test_dir)
408
409     # job script
410     script_text = """<?xml version='1.0' encoding='iso-8859-1' ?>
411 <proc name="myschema">
412    <type name="string" kind="string"/>
413    <type name="bool" kind="bool"/>
414    <type name="double" kind="double"/>
415    <type name="int" kind="int"/>
416    <container name="DefaultContainer">
417       <property name="container_kind" value="Salome"/>
418       <property name="attached_on_cloning" value="0"/>
419       <property name="container_name" value="FactoryServer"/>
420       <property name="name" value="localhost"/>
421    </container>
422    <inline name="mynode">
423       <script><code><![CDATA[
424 text_result = "i=%s,d=%s,b=%s,s=%s" % (i,d,b,s)
425 f = open('result.txt', 'w')
426 f.write(text_result)
427 f.close()
428 ]]></code></script>
429       <load container="DefaultContainer"/>
430       <inport name="i" type="int"/>
431       <inport name="d" type="double"/>
432       <inport name="b" type="bool"/>
433       <inport name="s" type="string"/>
434    </inline>
435 </proc>
436 """
437     yacs_file = "simpleSchema.xml"
438     job_script_file = os.path.join(case_test_dir, yacs_file)
439     f = open(job_script_file, "w")
440     f.write(script_text)
441     f.close()
442
443     local_result_dir = os.path.join(case_test_dir, "result_yacsopt_job-")
444     job_params = self.create_JobParameters()
445     job_params.job_type = "yacs_file"
446     job_params.job_file = job_script_file
447     job_params.out_files = ["result.txt"]
448
449     # define the interval between two YACS schema dumps (3 seconds)
450     #import Engines
451     #job_params.specific_parameters = [Engines.Parameter("YACSDriverOptions",
452     #           "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili")]
453     job_params.specific_parameters = {"YACSDriverOptions":
454                  "-imynode.i=5 -imynode.d=3.7 -imynode.b=False -imynode.s=lili"}
455     expected_result="i=5,d=3.7,b=False,s=lili"
456
457     launcher = createLauncher()
458     resManager= createResourcesManager()
459
460     for resource in self.ressources:
461       print("Testing yacs job with options on ", resource)
462       job_params.result_directory = local_result_dir + resource
463       job_params.job_name = "YacsJobOpt_" + resource
464       job_params.resource_required.name = resource
465
466       # use the working directory of the resource
467       resParams = resManager.GetResourceDefinition(resource)
468       wd = os.path.join(resParams.working_directory,
469                         "YacsJobOpt" + self.suffix)
470       job_params.work_directory = wd
471
472       job_id = launcher.createJob(job_params)
473       launcher.launchJob(job_id)
474       jobState = launcher.getJobState(job_id)
475
476       yacs_dump_success = False
477       print("Job %d state: %s" % (job_id,jobState))
478       while jobState != "FINISHED" and jobState != "FAILED" :
479         time.sleep(5)
480         jobState = launcher.getJobState(job_id)
481         print("Job %d state: %s " % (job_id,jobState))
482         pass
483
484       self.assertEqual(jobState, "FINISHED")
485
486       # getJobResults to default directory (result_directory)
487       launcher.getJobResults(job_id, "")
488       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
489                       expected_result)
490
491   ############################################
492   # test of command job type with pre_command
493   ############################################
494   def test_command_pre(self):
495     case_test_dir = os.path.join(TestCompo.test_dir, "command_pre")
496     mkdir_p(case_test_dir)
497
498     # command to be run before the job
499     pre_command = "pre_command.sh"
500     pre_command_text = "echo 'it works!' > in.txt"
501     abs_pre_command_file = os.path.join(case_test_dir, pre_command)
502     f = open(abs_pre_command_file, "w")
503     f.write(pre_command_text)
504     f.close()
505     os.chmod(abs_pre_command_file, 0o755)
506     
507     # job script
508     script_file = "myTestScript.py"
509     script_text = """#! /usr/bin/env python3
510 # -*- coding: utf-8 -*-
511
512 in_f = open("in.txt", "r")
513 in_text = in_f.read()
514 in_f.close()
515
516 f = open('result.txt', 'w')
517 f.write(in_text)
518 f.close()
519 """
520     abs_script_file = os.path.join(case_test_dir, script_file)
521     f = open(abs_script_file, "w")
522     f.write(script_text)
523     f.close()
524     os.chmod(abs_script_file, 0o755)
525
526     # job params
527     local_result_dir = os.path.join(case_test_dir, "result_com_pre_job-")
528     job_params = self.create_JobParameters()
529     job_params.job_type = "command"
530     job_params.job_file = script_file
531     job_params.pre_command = pre_command
532     job_params.in_files = []
533     job_params.out_files = ["result.txt"]
534     job_params.local_directory = case_test_dir
535
536     # create and launch the job
537     launcher = createLauncher()
538     resManager= createResourcesManager()
539
540     for resource in self.ressources:
541       print("Testing command job on ", resource)
542       job_params.result_directory = local_result_dir + resource
543       job_params.job_name = "CommandPreJob_" + resource
544       job_params.resource_required.name = resource
545
546       # use the working directory of the resource
547       resParams = resManager.GetResourceDefinition(resource)
548       wd = os.path.join(resParams.working_directory,
549                         "CommandPreJob" + self.suffix)
550       job_params.work_directory = wd
551
552       job_id = launcher.createJob(job_params)
553       launcher.launchJob(job_id)
554       # wait for the end of the job
555       jobState = launcher.getJobState(job_id)
556       print("Job %d state: %s" % (job_id,jobState))
557       while jobState != "FINISHED" and jobState != "FAILED" :
558         time.sleep(3)
559         jobState = launcher.getJobState(job_id)
560         print("Job %d state: %s" % (job_id,jobState))
561         pass
562
563       # verify the results
564       self.assertEqual(jobState, "FINISHED")
565       launcher.getJobResults(job_id, "")
566       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
567                       "it works!\n")
568
569   #################################
570   # test of command salome job type
571   #################################
572   def test_command_salome(self):
573     case_test_dir = os.path.join(TestCompo.test_dir, "command_salome")
574     mkdir_p(case_test_dir)
575
576     # job script
577     data_file = "in.txt"
578     script_file = "myEnvScript.py"
579     script_text = """#! /usr/bin/env python3
580 # -*- coding: utf-8 -*-
581
582 import os,sys
583 # verify import salome
584 import salome
585
586 text_result = os.getenv("ENV_TEST_VAR","")
587
588 f = open('result.txt', 'w')
589 f.write(text_result)
590 f.close()
591
592 in_f = open("in.txt", "r")
593 in_text = in_f.read()
594 in_f.close()
595
596 os.mkdir("copie")
597 f = open(os.path.join("copie",'copie.txt'), 'w')
598 f.write(in_text)
599 f.close()
600 """
601     abs_script_file = os.path.join(case_test_dir, script_file)
602     f = open(abs_script_file, "w")
603     f.write(script_text)
604     f.close()
605     os.chmod(abs_script_file, 0o755)
606
607     #environment script
608     env_file = "myEnv.sh"
609     env_text = """export ENV_TEST_VAR="expected"
610 """
611     f = open(os.path.join(case_test_dir, env_file), "w")
612     f.write(env_text)
613     f.close()
614
615     # write data file
616     f = open(os.path.join(case_test_dir, data_file), "w")
617     f.write("to be copied")
618     f.close()
619
620     # job params
621     local_result_dir = os.path.join(case_test_dir, "result_comsalome_job-")
622     job_params = self.create_JobParameters()
623     job_params.job_type = "command_salome"
624     job_params.job_file = script_file
625     job_params.env_file = env_file
626     job_params.in_files = [data_file]
627     job_params.out_files = ["result.txt", "copie"]
628     job_params.local_directory = case_test_dir
629
630     # create and launch the job
631     launcher = createLauncher()
632     resManager= createResourcesManager()
633
634     for resource in self.ressources:
635       print("Testing command salome job on ", resource)
636       job_params.result_directory = local_result_dir + resource
637       job_params.job_name = "CommandSalomeJob_" + resource
638       job_params.resource_required.name = resource
639
640       # use the working directory of the resource
641       resParams = resManager.GetResourceDefinition(resource)
642       wd = os.path.join(resParams.working_directory,
643                         "CommandSalomeJob" + self.suffix)
644       job_params.work_directory = wd
645
646       job_id = launcher.createJob(job_params)
647       launcher.launchJob(job_id)
648       # wait for the end of the job
649       jobState = launcher.getJobState(job_id)
650       print("Job %d state: %s" % (job_id,jobState))
651       while jobState != "FINISHED" and jobState != "FAILED" :
652         time.sleep(3)
653         jobState = launcher.getJobState(job_id)
654         print("Job %d state: %s" % (job_id,jobState))
655         pass
656
657       # verify the results
658       self.assertEqual(jobState, "FINISHED")
659       launcher.getJobResults(job_id, "")
660       self.verifyFile(os.path.join(job_params.result_directory, "result.txt"),
661                       "expected")
662       self.verifyFile(os.path.join(job_params.result_directory,
663                                    "copie",'copie.txt'),
664                       "to be copied")
665
666       # verify getJobWorkFile
667       mydir = os.path.join(case_test_dir, "work_dir" + resource)
668       success = launcher.getJobWorkFile(job_id, "result.txt", mydir)
669       self.assertEqual(success, True)
670       self.verifyFile(os.path.join(mydir, "result.txt"), "expected")
671
672       success = launcher.getJobWorkFile(job_id, "copie", mydir)
673       self.assertEqual(success, True)
674       self.verifyFile(os.path.join(mydir, "copie", "copie.txt"),
675                       "to be copied")
676       pass
677     pass
678   pass
679
680 if __name__ == '__main__':
681     # create study
682     unittest.main()