Salome HOME
Work in progress.
[tools/ydefx.git] / src / pydefx / multijob / executor.py
1 import pydefx
2 import os
3 import pickle
4 import time
5 import traceback
6
class Context:
  """Per-evaluation scratch object.

  It is created with a launcher obtained from pydefx.salome_proxy; the
  executor methods attach further attributes to it (params, local_dir,
  job_id) while a point is being processed.
  """

  def __init__(self):
    # Fetch the launcher once so that every step of one evaluation
    # works with the same object.
    launcher = pydefx.salome_proxy.getLauncher()
    self.launcher = launcher
11
class JobExecutor:
  """Evaluate points by creating and running one launcher job per point.

  The configuration dictionary given to the constructor is read through the
  keys "params" (loaded into a pydefx.Parameters object) and "studymodule"
  (name, without extension, of a python file in the current directory).
  """

  # Job states, as returned by launcher.getJobState, for which runjob keeps
  # polling.
  _PENDING_STATES = ("QUEUED", "IN_PROCESS", "RUNNING")

  def __init__(self, config):
    """ Keep the configuration dictionary for later use.
    """
    self.config = config

  def initialize(self):
    """ This is executed before the first evaluation.
    Put here global processing needed by all the evaluations like the copy of
    common files.
    """
    # Copy the common files to the root work directory
    params = pydefx.Parameters() # global parameters
    params.loadDict(self.config["params"])
    # Use a fake empty command: the launcher is only used to copy some files
    # on the remote file system, without launching a job.
    command = os.path.join(os.getcwd(), "empty.sh")
    with open(command, "w"):
      pass # only create (or truncate) the file
    params.salome_parameters.job_file = command
    params.salome_parameters.job_type = "command"
    study_module = os.path.join(os.getcwd(), self.config["studymodule"]+".py")
    infiles = list(params.salome_parameters.in_files)
    params.salome_parameters.in_files = infiles + [study_module]
    launcher = pydefx.salome_proxy.getLauncher()
    job_id = launcher.createJob(params.salome_parameters)
    # The job is created but never launched: only its input files are exported.
    launcher.exportInputFiles(job_id)

  def evaluate(self, idx, point):
    """ This is executed for every point to be evaluated.

    idx is used to build the name of the job directory ("job_<idx>") and
    point is the object written to the data file of the job.
    Return a tuple (error, out_values). Any exception raised while preparing,
    running or fetching the job is printed and converted to an error string,
    out_values being None in that case.
    """
    context = Context()
    error = None
    out_values = None
    try:
      self.prepare(idx, point, context)
      if self.noRunFound(idx, point, context):
        self.runjob(idx, point, context)
      error, out_values = self.getResult(context)
    except Exception as e:
      # getResult uses an empty string when there is no error, so an
      # exception without message must not be reported as an empty string.
      error = str(e) or repr(e)
      traceback.print_exc()
    return error, out_values

  def prepare(self, idx, point, context):
    """
    Define local and remote work directory.
    Define job script.

    The point is written to the file "idefixdata.csv" in the local directory
    of the job, which becomes the only input file of the job. The attributes
    params and local_dir are set on context.
    """
    context.params = pydefx.Parameters()
    context.params.loadDict(self.config["params"])
    salome_parameters = context.params.salome_parameters
    root_local_dir = salome_parameters.result_directory
    root_remote_dir = salome_parameters.work_directory
    point_name = "job_"+str(idx)
    context.local_dir = os.path.join(root_local_dir, point_name)
    point_remote_dir = os.path.join(root_remote_dir, point_name)
    # No "exists" check before the creation (it would be race-prone) and the
    # missing parent directories are created as well.
    os.makedirs(context.local_dir, exist_ok=True)
    # export the point to a file
    data_file_name = "idefixdata.csv"
    data_file_path = os.path.join(context.local_dir, data_file_name)
    with open(data_file_path, "w") as f:
      # explicit dict conversion is needed for compatibility between python versions
      f.write(repr(dict(point)))

    # common files are already copied to the root directory, the data file is
    # the only input specific to this point
    salome_parameters.in_files = [data_file_path]
    salome_parameters.out_files = ["idefixresult.txt", "idefixerror.txt"]
    salome_parameters.work_directory = point_remote_dir
    salome_parameters.result_directory = context.local_dir

  def noRunFound(self, idx, point, context):
    """
    Tell if a job has to be run for this point.
    This implementation always returns True.
    NOTE(review): presumably a hook to be redefined when a previous run can be
    reused - to be confirmed. getResult needs context.job_id, which is only
    set by runjob.
    """
    return True

  def runjob(self, idx, point, context):
    """
    Create, launch and wait for the end of the job.

    The identifier of the job is stored in context.job_id.
    """
    import random
    # The polling period is chosen at random between 5 and 15 seconds.
    # NOTE(review): presumably to avoid simultaneous requests from concurrent
    # evaluations - to be confirmed.
    sleep_delay = random.randint(5, 15)
    launcher = context.launcher
    context.job_id = launcher.createJob(context.params.salome_parameters)
    launcher.launchJob(context.job_id)
    while launcher.getJobState(context.job_id) in self._PENDING_STATES:
      time.sleep(sleep_delay)

  def getResult(self, context):
    """
    Check the job state, fetch the result file.

    Return a tuple (error, result). If the state of the job is not "FINISHED",
    error is a fixed message and result is None. Otherwise error is the
    content of "idefixerror.txt" and result is the evaluation of the content
    of "idefixresult.txt", both files being read from context.local_dir.
    """
    launcher = context.launcher
    if launcher.getJobState(context.job_id) != "FINISHED":
      return "Job has not finished correctly.", None

    launcher.getJobResults(context.job_id, "")
    error_file = os.path.join(context.local_dir, "idefixerror.txt")
    result_file = os.path.join(context.local_dir, "idefixresult.txt")
    with open(error_file, "r") as f:
      error = f.read()
    with open(result_file, "r") as f:
      result_str = f.read()
    # NOTE(review): eval executes whatever python expression the fetched file
    # contains. This is acceptable only if the job output is trusted;
    # ast.literal_eval would be safer but rejects values such as nan or inf.
    result = eval(result_str)
    return error, result
127
def createExecutor(config):
  """ Build the executor of this module from the configuration dictionary.
  """
  executor = JobExecutor(config)
  return executor