[tools/ydefx.git] / src / pydefx / plugins / jobexecutor.py
import pydefx
import os
import random
import time
import traceback

pydefx.forceNoSalomeServers()

class Context:
  """ Per-evaluation state: the launcher proxy, plus the parameters, work
  directories and job id filled in by JobExecutor.prepare and runjob.
  """
  def __init__(self):
    self.launcher = pydefx.salome_proxy.getLauncher()

class JobExecutor:
  def __init__(self, config):
    self.config = config

  def initialize(self):
    """ Executed once, before the first evaluation.
    Put here the global processing needed by all the evaluations, such as
    copying the common files.
    """
    # Copy the common files to the root work directory.
    params = pydefx.Parameters() # global parameters
    params.loadDict(self.config["params"])
    # Use a fake, empty command so the launcher copies the files to the
    # remote file system without actually launching a job.
    command = os.path.join(os.getcwd(), "empty.sh")
    open(command, "w").close()
    params.salome_parameters.job_file = command
    params.salome_parameters.job_type = "command"
    study_module = os.path.join(os.getcwd(), self.config["studymodule"]+".py")
    infiles = list(params.salome_parameters.in_files)
    params.salome_parameters.in_files = infiles + [study_module]
    launcher = pydefx.salome_proxy.getLauncher()
    job_id = launcher.createJob(params.salome_parameters)
    launcher.exportInputFiles(job_id)

  def evaluate(self, idx, point):
    """ This is executed for every point to be evaluated.
    """
    context = Context()
    error = None
    out_values = None
    try:
      self.prepare(idx, point, context)
      if self.noRunFound(idx, point, context):
        self.runjob(idx, point, context)
      error, out_values = self.getResult(context)
    except Exception as e:
      error = str(e)
      traceback.print_exc()
    return error, out_values

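  # A "point" is a mapping of input variable names to values (it is converted
  # with dict(point) in prepare), e.g. {"x": 0.5, "y": 1.2}; idx identifies the
  # point and is used to name the per-point job directory.
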
  def prepare(self, idx, point, context):
    """
    Define the local and remote work directories and export the point data.
    """
    context.params = pydefx.Parameters()
    context.params.loadDict(self.config["params"])
    salome_parameters = context.params.salome_parameters
    root_local_dir = salome_parameters.result_directory
    root_remote_dir = salome_parameters.work_directory
    input_files = [] # common files are already copied to the root directory
    point_name = "job_"+str(idx)
    context.local_dir = os.path.join(root_local_dir, point_name)
    point_remote_dir = os.path.join(root_remote_dir, point_name)
    if not os.path.exists(context.local_dir):
      os.mkdir(context.local_dir)
    # export the point to a file
    data_file_name = "idefixdata.csv"
    data_file_path = os.path.join(context.local_dir, data_file_name)
    with open(data_file_path, "w") as f:
      # explicit dict conversion is needed for compatibility between python versions
      f.write(repr(dict(point)))
    input_files.append(data_file_path)

    #command_path = os.path.join(root_local_dir, "command.py")
    #salome_parameters.job_type = "command_salome"
    #salome_parameters.job_file = command_path

    salome_parameters.in_files = input_files
    salome_parameters.out_files = ["idefixresult.txt", "idefixerror.txt"]
    salome_parameters.work_directory = point_remote_dir
    salome_parameters.result_directory = context.local_dir

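  # Remote-side protocol for the files set up in prepare() and read back in
  # getResult() (a sketch inferred from the file names used here; the actual
  # remote script is the study module named by config["studymodule"]):
  #   point = eval(open("idefixdata.csv").read())        # repr() of the point dict
  #   ...compute outputs from point...
  #   open("idefixresult.txt", "w").write(repr(outputs))
  #   open("idefixerror.txt", "w").write("")             # empty when no error
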
  def noRunFound(self, idx, point, context):
    """ Return True if no previous run of this point is available.
    This basic implementation always returns True, so every point is run.
    """
    return True

  def runjob(self, idx, point, context):
    """
    Create, launch and wait for the end of the job.
    """
    # Poll the job state with a randomized period, which spreads out launcher
    # queries when many evaluations run concurrently.
    sleep_delay = random.randint(5, 15)
    launcher = context.launcher
    context.job_id = launcher.createJob(context.params.salome_parameters)
    launcher.launchJob(context.job_id)
    jobState = launcher.getJobState(context.job_id)
    while jobState in ("QUEUED", "IN_PROCESS", "RUNNING"):
      time.sleep(sleep_delay)
      jobState = launcher.getJobState(context.job_id)

  def getResult(self, context):
    """
    Check the job state and fetch the result files.
    """
    launcher = context.launcher
    jobState = launcher.getJobState(context.job_id)
    error = ""
    result = None
    if jobState != "FINISHED":
      error = "Job has not finished correctly."
    else:
      launcher.getJobResults(context.job_id, "")
      error_file = os.path.join(context.local_dir, "idefixerror.txt")
      result_file = os.path.join(context.local_dir, "idefixresult.txt")
      with open(error_file, "r") as f:
        error = f.read()
      with open(result_file, "r") as f:
        result_str = f.read()
        # The result file contains the repr() of the output values.
        result = eval(result_str)

    return error, result

def createExecutor(config):
  """ Factory function: create the executor of this plugin. """
  return JobExecutor(config)
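
# Usage note (a sketch, not part of this module's code): this plugin module is
# meant to be loaded by the pydefx multi-job machinery, which is expected to
# call createExecutor(config) once, then initialize(), and finally
# evaluate(idx, point) for each sample point, for instance:
#   executor = createExecutor(config)   # config holds "params" and "studymodule"
#   executor.initialize()
#   error, out_values = executor.evaluate(0, first_point)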