# modules/adao.git / src/daSalome/daYacsIntegration/daOptimizerLoop.py
#-*-coding:iso-8859-1-*-

import SALOMERuntime
import pickle
import numpy
import threading

from daCore.AssimilationStudy import AssimilationStudy
from daYacsIntegration import daStudy

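# OptimizerHooks is the proxy between the ADAO AssimilationStudy and the YACS
# OptimizerLoop: each call to Direct, Tangent or Adjoint pickles a request,
# pushes it into the loop's sample pool, blocks in signalMasterAndWait() until
# the master reports that the sample with the matching id has been computed,
# then unpickles and returns the result.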
class OptimizerHooks:

  def __init__(self, optim_algo):
    self.optim_algo = optim_algo

    # Sample counter management
    self.sample_counter = 0
    self.counter_lock = threading.Lock()

  def Direct(self, X, sync = 1):
    print "Call Direct OptimizerHooks"
    if sync == 1:
      # 1: Get a unique sample number
      self.counter_lock.acquire()
      self.sample_counter += 1
      local_counter = self.sample_counter

      # 2: Put the sample in the job pool
      computation = {}
      computation["method"] = "Direct"
      computation["data"] = X
      computation = pickle.dumps(computation)
      self.optim_algo.pool.pushInSample(local_counter, computation)

      # 3: Wait for the result
      while True:
        print "waiting"
        self.optim_algo.signalMasterAndWait()
        print "signal"
        if self.optim_algo.isTerminationRequested():
          self.optim_algo.pool.destroyAll()
          return
        else:
          # Get current Id
          sample_id = self.optim_algo.pool.getCurrentId()
          if sample_id == local_counter:
            # 4: Data is ready
            matrix_from_pool = self.optim_algo.pool.getOutSample(local_counter).getStringValue()

            # 5: Release the lock
            # Should be released earlier, but that would require a new
            # implementation of the optimizer loop
            self.counter_lock.release()

            # 6: Return the result
            Y = pickle.loads(matrix_from_pool)
            return Y
    else:
      print "sync false is not yet implemented"
      raise daStudy.daError("sync == false not yet implemented")

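  # Tangent and Adjoint below follow exactly the same push/wait/collect
  # protocol as Direct; only the "method" tag and the payload differ
  # (X for Tangent, the pair (X, Y) for Adjoint).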
  def Tangent(self, X, sync = 1):
    print "Call Tangent OptimizerHooks"
    if sync == 1:
      # 1: Get a unique sample number
      self.counter_lock.acquire()
      self.sample_counter += 1
      local_counter = self.sample_counter

      # 2: Put the sample in the job pool
      computation = {}
      computation["method"] = "Tangent"
      computation["data"] = X
      computation = pickle.dumps(computation)
      self.optim_algo.pool.pushInSample(local_counter, computation)

      # 3: Wait for the result
      while True:
        self.optim_algo.signalMasterAndWait()
        if self.optim_algo.isTerminationRequested():
          self.optim_algo.pool.destroyAll()
          return
        else:
          # Get current Id
          sample_id = self.optim_algo.pool.getCurrentId()
          if sample_id == local_counter:
            # 4: Data is ready
            matrix_from_pool = self.optim_algo.pool.getOutSample(local_counter).getStringValue()

            # 5: Release the lock
            # Should be released earlier, but that would require a new
            # implementation of the optimizer loop
            self.counter_lock.release()

            # 6: Return the result
            Y = pickle.loads(matrix_from_pool)
            return Y
    else:
      print "sync false is not yet implemented"
      raise daStudy.daError("sync == false not yet implemented")

  def Adjoint(self, (X, Y), sync = 1):
    print "Call Adjoint OptimizerHooks"
    if sync == 1:
      # 1: Get a unique sample number
      self.counter_lock.acquire()
      self.sample_counter += 1
      local_counter = self.sample_counter

      # 2: Put the sample in the job pool
      computation = {}
      computation["method"] = "Adjoint"
      computation["data"] = (X, Y)
      computation = pickle.dumps(computation)
      self.optim_algo.pool.pushInSample(local_counter, computation)

      # 3: Wait for the result
      while True:
        print "waiting"
        self.optim_algo.signalMasterAndWait()
        print "signal"
        if self.optim_algo.isTerminationRequested():
          self.optim_algo.pool.destroyAll()
          return
        else:
          # Get current Id
          sample_id = self.optim_algo.pool.getCurrentId()
          if sample_id == local_counter:
            # 4: Data is ready
            matrix_from_pool = self.optim_algo.pool.getOutSample(local_counter).getStringValue()

            # 5: Release the lock
            # Should be released earlier, but that would require a new
            # implementation of the optimizer loop
            self.counter_lock.release()

            # 6: Return the result
            Z = pickle.loads(matrix_from_pool)
            return Z
    else:
      print "sync false is not yet implemented"
      raise daStudy.daError("sync == false not yet implemented")

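# Note: the requests pushed into the pool above are consumed on the YACS side
# by the node plugged into the OptimizerLoop. A minimal sketch of such a node
# is given below for illustration only; the operator names and port wiring are
# assumptions, not part of this module:
#
#   import pickle
#   request = pickle.loads(computation)            # value of the node's input port
#   if request["method"] == "Direct":
#     result = user_direct_operator(request["data"])
#   elif request["method"] == "Tangent":
#     result = user_tangent_operator(request["data"])
#   else:  # "Adjoint"
#     X, Y = request["data"]
#     result = user_adjoint_operator(X, Y)
#   result = pickle.dumps(result)                  # read back through pool.getOutSample()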
class AssimilationAlgorithm_asynch(SALOMERuntime.OptimizerAlgASync):

  def __init__(self):
    SALOMERuntime.RuntimeSALOME_setRuntime()
    SALOMERuntime.OptimizerAlgASync.__init__(self, None)
    self.runtime = SALOMERuntime.getSALOMERuntime()

    # Definition of the input and output types for the computation code
    self.tin  = self.runtime.getTypeCode("pyobj")
    self.tout = self.runtime.getTypeCode("pyobj")

    self.optim_hooks = OptimizerHooks(self)

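  # Lifecycle driven by YACS: initialize() receives the serialized daStudy from
  # the algoinit port, startToTakeDecision() runs the assimilation with the
  # operator evaluations delegated to YACS through OptimizerHooks, and
  # getAlgoResult() returns the pickled study for the algoresult port.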
  # input comes from the algoinit port; it is a YACS Any object
  def initialize(self, input):
    print "Algorithm initialize"

    # get the daStudy
    #print "[Debug] Input is ", input
    str_da_study = input.getStringValue()
    self.da_study = pickle.loads(str_da_study)
    #print "[Debug] da_study is ", self.da_study
    self.da_study.initAlgorithm()
    self.ADD = self.da_study.getAssimilationStudy()

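  # Only the ObservationOperator components declared with type "Function" are
  # routed through YACS: for those, setObservationOperator installs the
  # OptimizerHooks proxies, and each evaluation during ADD.analyze() blocks
  # until YACS has computed the corresponding sample.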
  def startToTakeDecision(self):
    print "Algorithm startToTakeDecision"

    # Check whether the ObservationOperator has to be evaluated through YACS
    if (self.da_study.getObservationOperatorType("Direct") == "Function"
        or self.da_study.getObservationOperatorType("Tangent") == "Function"
        or self.da_study.getObservationOperatorType("Adjoint") == "Function"):
      # Use proxy functions for YACS
      self.hooks = OptimizerHooks(self)
      direct = tangent = adjoint = None
      if self.da_study.getObservationOperatorType("Direct") == "Function":
        direct = self.hooks.Direct
      if self.da_study.getObservationOperatorType("Tangent") == "Function":
        tangent = self.hooks.Tangent
      if self.da_study.getObservationOperatorType("Adjoint") == "Function":
        adjoint = self.hooks.Adjoint

      # Set ObservationOperator
      self.ADD.setObservationOperator(asFunction = {"Direct":direct, "Tangent":tangent, "Adjoint":adjoint})

    # Start the Assimilation Study
    self.ADD.analyze()

    # The Assimilation Study is finished
    self.pool.destroyAll()

  def getAlgoResult(self):
    print "getAlgoResult"
    self.ADD.prepare_to_pickle()
    result = pickle.dumps(self.da_study)
    return result

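  # Illustrative only: a downstream YACS Python node could recover the study
  # from the algoresult port roughly as follows (variable names are
  # assumptions, and the accessor for the results depends on the ADAO version):
  #
  #   import pickle
  #   da_study = pickle.loads(algo_result)        # pickled string from getAlgoResult
  #   ADD = da_study.getAssimilationStudy()
  #   # ... then query ADD for the quantities of interest (e.g. the "Analysis")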
  # Mandatory ???
  def finish(self):
    print "Algorithm finish"

  def parseFileToInit(self, fileName):
    print "Algorithm parseFileToInit"

  # Functions that do not change
  def setPool(self, pool):
    self.pool = pool
  def getTCForIn(self):
    return self.tin
  def getTCForOut(self):
    return self.tout
  def getTCForAlgoInit(self):
    return self.tin
  def getTCForAlgoResult(self):
    return self.tout