1 #-*-coding:iso-8859-1-*-
8 from daCore.AssimilationStudy import AssimilationStudy
9 from daYacsIntegration import daStudy
def __init__(self, optim_algo):
    """Bind the hooks to the optimizer algorithm they will talk to.

    optim_algo -- stored as-is; the hook methods reach the job pool and
                  the master signalling calls through this object.
    """
    self.optim_algo = optim_algo

    # Sample numbering shared by every hook call, and the lock that the
    # hook methods take before touching it.
    self.sample_counter = 0
    self.counter_lock = threading.Lock()
def Direct(self, X, sync=1):
    """Evaluate the "Direct" operator on X through the optimizer job pool.

    The request (method name and data) is pickled, pushed into
    ``self.optim_algo.pool`` under a fresh sample number, and the call
    then blocks on the master until the pool exposes the output sample
    carrying the same number.

    X    -- data of the computation; it has to be picklable.
    sync -- only the synchronous mode (``sync == 1``) is implemented.

    Returns the unpickled output sample, or None when termination was
    requested while waiting (the pool is destroyed in that case).

    Raises daStudy.daError when ``sync`` is not 1.
    """
    print("Call Direct OptimizerHooks")
    if sync != 1:
        print("sync false is not yet implemented")
        raise daStudy.daError("sync == false not yet implemented")

    # The lock stays held for the whole exchange: releasing it earlier
    # needs a new implementation of the optimizer loop.  The "with" block
    # guarantees that it is released on every exit path, termination and
    # errors included.
    with self.counter_lock:
        # 1: Get a unique sample number
        self.sample_counter += 1
        local_counter = self.sample_counter

        # 2: Put sample in the job pool
        computation = pickle.dumps({"method": "Direct", "data": X})
        self.optim_algo.pool.pushInSample(local_counter, computation)

        # 3: Wait until the master hands back the result of this sample
        while True:
            self.optim_algo.signalMasterAndWait()
            if self.optim_algo.isTerminationRequested():
                self.optim_algo.pool.destroyAll()
                return None
            # NOTE(review): presumably getCurrentId() names the sample
            # whose output is ready - confirm against the pool API.
            sample_id = self.optim_algo.pool.getCurrentId()
            if sample_id == local_counter:
                out_sample = self.optim_algo.pool.getOutSample(local_counter)
                matrix_from_pool = out_sample.getStringValue()
                break

    return pickle.loads(matrix_from_pool)
def Tangent(self, X, sync=1):
    """Evaluate the "Tangent" operator on X through the optimizer job pool.

    The request (method name and data) is pickled, pushed into
    ``self.optim_algo.pool`` under a fresh sample number, and the call
    then blocks on the master until the pool exposes the output sample
    carrying the same number.

    X    -- data of the computation; it has to be picklable.
    sync -- only the synchronous mode (``sync == 1``) is implemented.

    Returns the unpickled output sample, or None when termination was
    requested while waiting (the pool is destroyed in that case).

    Raises daStudy.daError when ``sync`` is not 1.
    """
    print("Call Tangent OptimizerHooks")
    if sync != 1:
        print("sync false is not yet implemented")
        raise daStudy.daError("sync == false not yet implemented")

    # The lock stays held for the whole exchange: releasing it earlier
    # needs a new implementation of the optimizer loop.  The "with" block
    # guarantees that it is released on every exit path, termination and
    # errors included.
    with self.counter_lock:
        # 1: Get a unique sample number
        self.sample_counter += 1
        local_counter = self.sample_counter

        # 2: Put sample in the job pool
        computation = pickle.dumps({"method": "Tangent", "data": X})
        self.optim_algo.pool.pushInSample(local_counter, computation)

        # 3: Wait until the master hands back the result of this sample
        while True:
            self.optim_algo.signalMasterAndWait()
            if self.optim_algo.isTerminationRequested():
                self.optim_algo.pool.destroyAll()
                return None
            # NOTE(review): presumably getCurrentId() names the sample
            # whose output is ready - confirm against the pool API.
            sample_id = self.optim_algo.pool.getCurrentId()
            if sample_id == local_counter:
                out_sample = self.optim_algo.pool.getOutSample(local_counter)
                matrix_from_pool = out_sample.getStringValue()
                break

    return pickle.loads(matrix_from_pool)
def Adjoint(self, pair, sync=1):
    """Evaluate the "Adjoint" operator on (X, Y) through the job pool.

    The request (method name and data) is pickled, pushed into
    ``self.optim_algo.pool`` under a fresh sample number, and the call
    then blocks on the master until the pool exposes the output sample
    carrying the same number.

    pair -- two-item sequence (X, Y), passed positionally exactly as
            with the former tuple parameter; both items have to be
            picklable.
    sync -- only the synchronous mode (``sync == 1``) is implemented.

    Returns the unpickled output sample, or None when termination was
    requested while waiting (the pool is destroyed in that case).

    Raises daStudy.daError when ``sync`` is not 1.
    """
    # Tuple parameters in a signature are Python-2-only syntax; unpacking
    # here keeps the call convention and fails the same way on a bad pair.
    X, Y = pair

    print("Call Adjoint OptimizerHooks")
    if sync != 1:
        print("sync false is not yet implemented")
        raise daStudy.daError("sync == false not yet implemented")

    # The lock stays held for the whole exchange: releasing it earlier
    # needs a new implementation of the optimizer loop.  The "with" block
    # guarantees that it is released on every exit path, termination and
    # errors included.
    with self.counter_lock:
        # 1: Get a unique sample number
        self.sample_counter += 1
        local_counter = self.sample_counter

        # 2: Put sample in the job pool
        computation = pickle.dumps({"method": "Adjoint", "data": (X, Y)})
        self.optim_algo.pool.pushInSample(local_counter, computation)

        # 3: Wait until the master hands back the result of this sample
        while True:
            self.optim_algo.signalMasterAndWait()
            if self.optim_algo.isTerminationRequested():
                self.optim_algo.pool.destroyAll()
                return None
            # NOTE(review): presumably getCurrentId() names the sample
            # whose output is ready - confirm against the pool API.
            sample_id = self.optim_algo.pool.getCurrentId()
            if sample_id == local_counter:
                out_sample = self.optim_algo.pool.getOutSample(local_counter)
                matrix_from_pool = out_sample.getStringValue()
                break

    return pickle.loads(matrix_from_pool)
144 class AssimilationAlgorithm_asynch(SALOMERuntime.OptimizerAlgASync):
147 SALOMERuntime.RuntimeSALOME_setRuntime()
148 SALOMERuntime.OptimizerAlgASync.__init__(self, None)
149 self.runtime = SALOMERuntime.getSALOMERuntime()
# Definition of the input and output types for the computation code
152 self.tin = self.runtime.getTypeCode("pyobj")
153 self.tout = self.runtime.getTypeCode("pyobj")
155 self.optim_hooks = OptimizerHooks(self)
# input comes from the algoinit port; input is a YACS Any!
def initialize(self, input):
    """Rebuild the assimilation study from the pickled payload of *input*.

    input -- value received from the ``algoinit`` port (a YACS Any); its
             string value is the pickled study object.

    Stores the unpickled study in ``self.da_study``, calls its
    ``initAlgorithm()`` and keeps the object returned by
    ``getAssimilationStudy()`` in ``self.ADD``.
    """
    print("Algorithme initialize")

    # NOTE: pickle.loads executes whatever the payload describes; this is
    # only safe while the port is fed by a trusted producer.
    str_da_study = input.getStringValue()
    self.da_study = pickle.loads(str_da_study)
    self.da_study.initAlgorithm()
    self.ADD = self.da_study.getAssimilationStudy()
def startToTakeDecision(self):
    """Plug the YACS proxy operators into the study, then release the pool.

    Each of the "Direct", "Tangent" and "Adjoint" observation operators
    declared with the type "Function" is replaced by the matching
    OptimizerHooks method; the other entries are passed as None.  When no
    operator has that type, the study's operator is left untouched.
    """
    print("Algorithme startToTakeDecision")

    # Check if ObservationOperator is already set (each type is queried once)
    operator_names = ("Direct", "Tangent", "Adjoint")
    proxied = [
        name for name in operator_names
        if self.da_study.getObservationOperatorType(name) == "Function"
    ]
    if proxied:
        # Use proxy function for YACS
        self.hooks = OptimizerHooks(self)
        as_function = dict.fromkeys(operator_names)
        for name in proxied:
            as_function[name] = getattr(self.hooks, name)

        # Set ObservationOperator
        self.ADD.setObservationOperator(asFunction=as_function)

    # Start Assimilation Study
    # NOTE(review): no statement launching the study is visible at this
    # point - confirm that the study really runs before the pool is
    # destroyed below.

    # Assimilation Study is finished
    self.pool.destroyAll()
def getAlgoResult(self):
    """Return the study object ``self.da_study`` serialised with pickle.

    ``self.ADD.prepare_to_pickle()`` is called before the serialisation
    (presumably to drop members that cannot be pickled - confirm).
    """
    print("getAlgoResult")
    self.ADD.prepare_to_pickle()
    # The serialised study is the result of the algorithm: hand it back.
    return pickle.dumps(self.da_study)
202 print "Algorithme finish"
def parseFileToInit(self, fileName):
    """Trace the call; *fileName* is accepted but not used."""
    print("Algorithme parseFileToInit")
# Functions that do not change
207 def setPool(self,pool):
209 def getTCForIn(self):
211 def getTCForOut(self):
213 def getTCForAlgoInit(self):
215 def getTCForAlgoResult(self):