Salome HOME
correct previous integration (Porting to Python 2.6)
[modules/kernel.git] / src / DSC / ParallelDSC / Param_Double_Port_provides_i.cxx
1 //  Copyright (C) 2009  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  This library is free software; you can redistribute it and/or
4 //  modify it under the terms of the GNU Lesser General Public
5 //  License as published by the Free Software Foundation; either
6 //  version 2.1 of the License.
7 //
8 //  This library is distributed in the hope that it will be useful,
9 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 //  Lesser General Public License for more details.
12 //
13 //  You should have received a copy of the GNU Lesser General Public
14 //  License along with this library; if not, write to the Free Software
15 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 //  File   : param_double_port_provides.cxx
20 //  Author : André RIBES (EDF)
21 //  Module : KERNEL
22
23 #include <iostream>
24 #include <string>
25 #include <sstream>
26
27 #include "Param_Double_Port_provides_i.hxx"
28
29 #include <paco_omni.h>
30 #include <paco_dummy.h>
31 #include <paco_mpi.h>
32 #include <paco_direct_comScheduling.h>
33 #include <GaBro.h>
34 #include <BasicBC.h>
35
36 Param_Double_Port_provides_i::Param_Double_Port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) :
37   Ports::Param_Double_Port_serv(orb,ior,rank),
38   Ports::Param_Double_Port_base_serv(orb,ior,rank),
39   Ports::Data_Port_serv(orb,ior,rank),
40   Ports::Data_Port_base_serv(orb,ior,rank),
41   Ports::Port_serv(orb,ior,rank),
42   Ports::Port_base_serv(orb,ior,rank),
43   InterfaceParallel_impl(orb,ior,rank)
44 {
45   _seq_data = NULL;
46
47   seq_data_termine = false;                 
48   seq_data_mutex = new pthread_mutex_t();
49   pthread_mutex_init(seq_data_mutex, NULL);
50   seq_data_condition = new pthread_cond_t();
51   pthread_cond_init(seq_data_condition, NULL);
52   seq_data_termine_cp = true;               
53   seq_data_mutex_cp = new pthread_mutex_t();
54   pthread_mutex_init(seq_data_mutex_cp, NULL);
55   seq_data_condition_cp = new pthread_cond_t();
56   pthread_cond_init(seq_data_condition_cp, NULL);
57
58   _seq_results = NULL;
59
60   seq_results_termine = false;              
61   seq_results_mutex = new pthread_mutex_t();
62   pthread_mutex_init(seq_results_mutex, NULL);
63   seq_results_condition = new pthread_cond_t();
64   pthread_cond_init(seq_results_condition, NULL);
65   seq_results_termine_cp = true;                    
66   seq_results_mutex_cp = new pthread_mutex_t();
67   pthread_mutex_init(seq_results_mutex_cp, NULL);
68   seq_results_condition_cp = new pthread_cond_t();
69   pthread_cond_init(seq_results_condition_cp, NULL);
70 }
71
72 Param_Double_Port_provides_i::~Param_Double_Port_provides_i() 
73 {
74   if (_seq_data)
75     delete _seq_data;
76
77   pthread_mutex_destroy(seq_data_mutex);
78   delete seq_data_mutex;
79   pthread_cond_destroy(seq_data_condition);
80   delete seq_data_condition;
81   pthread_mutex_destroy(seq_data_mutex_cp);
82   delete seq_data_mutex_cp;
83   pthread_cond_destroy(seq_data_condition_cp);
84   delete seq_data_condition_cp;
85
86   if (_seq_results)
87     delete _seq_results;
88
89   pthread_mutex_destroy(seq_results_mutex);
90   delete seq_results_mutex;
91   pthread_cond_destroy(seq_results_condition);
92   delete seq_results_condition;
93   pthread_mutex_destroy(seq_results_mutex_cp);
94   delete seq_results_mutex_cp;
95   pthread_cond_destroy(seq_results_condition_cp);
96   delete seq_results_condition_cp;
97 }
98
99 Param_Double_Port_provides_i *
100 Param_Double_Port_provides_i::init_port(Engines_ParallelDSC_i * par_compo, 
101                                         std::string port_name,
102                                         CORBA::ORB_ptr orb)
103 {
104   int rank = par_compo->getMyRank();
105   int totalNode = par_compo->getTotalNode();
106   paco_com * com = par_compo->getCom();
107
108   MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);
109
110   // DOIT ETRE DEJA FAIT AVANT !!!???
111   paco_fabrique_manager* pfm = paco_getFabriqueManager();
112   pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
113   pfm->register_thread("pdp_thread", new paco_omni_fabrique());
114   pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
115   pfm->register_distribution("pdp_GaBro", new GaBro_fab());
116   pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());
117
118   Param_Double_Port_provides_i * port = NULL; 
119   Ports::Param_Double_Port_proxy_impl * proxy_node = NULL; 
120
121   std::cerr << "Creating Proxy" << std::endl;
122   if (rank == 0) {
123     // On commence par créer le proxy
124     // Il est enregistré dans le composant et sera détruit automatiquement
125     // lorsque le composant sera détruit
126     proxy_node = 
127       new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
128                                               pfm->get_thread("pdp_thread"));
129     proxy_node->setLibCom("pdp_dummy", proxy_node);
130     proxy_node->setLibThread("pdp_thread");
131     PaCO::PacoTopology_t serveur_topo;
132     serveur_topo.total = totalNode;
133     proxy_node->setTopology(serveur_topo);
134
135     // Création de la propriété
136     PortProperties_i * proxy_node_properties = new PortProperties_i();
137
138     // Enregistrement du proxy
139     par_compo->add_parallel_provides_proxy_port(proxy_node->_this(), 
140                                                 port_name.c_str(),
141                                                 proxy_node_properties->_this());
142     proxy_node->_remove_ref();
143     proxy_node_properties->_remove_ref();
144   }
145   else {
146     par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
147   }
148
149   std::cerr << "Getting proxy" << std::endl;
150   char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
151   std::cerr << "Proxy ior is : " << proxy_ior << std::endl;
152
153   port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
154   port->copyClientGlobalContext(par_compo);
155
156   // Il faut maintenant configurer les bibliothèques
157   // de redistributions de la fonction put
158   ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
159   method_ptr->setLibComScheduling("pdp_direct"); 
160   method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
161   BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
162   dislib->setEltSize(sizeof(CORBA::Double));
163
164   // Il faut maintenant configurer les bibliothèques
165   // de redistributions de la fonction get_results
166   method_ptr = port->getParallelMethodContext("get_results");
167   method_ptr->setLibComScheduling("pdp_direct"); 
168   method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
169   GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
170   dislib_gabro->setEltSize(sizeof(CORBA::Double));
171
172   // Enregistement du port 
173   for (int i = 0; i < totalNode; i++) 
174   {
175     std::ostringstream node_number;
176     node_number << i;
177     std::string event_name("AddNode");
178     event_name += node_number.str();
179     std::string tag_name = proxy_ior;
180
181     if (i == rank) {
182       std::cerr << "Adding node of processor : " << i << std::endl;
183       par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
184       port->_remove_ref();
185       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
186     }
187
188     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
189   }
190
191   // On démarre l'objet parallèle
192   std::string event_name("StartingProxy");
193   std::string tag_name = proxy_ior;
194   if (rank == 0) 
195   {
196     proxy_node->start();
197     par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
198   }
199
200   CORBA::string_free(proxy_ior);
201   return port;
202 }
203
204 void
205 Param_Double_Port_provides_i::wait_init_port(Engines_ParallelDSC_i * par_compo, 
206                                              std::string port_name,
207                                              CORBA::ORB_ptr orb)
208 {
209   int rank = par_compo->getMyRank();
210   int totalNode = par_compo->getTotalNode();
211   // Enregistement du port 
212   for (int i = 0; i < totalNode; i++) 
213   {
214     std::ostringstream node_number;
215     node_number << i;
216     std::string event_name("WaitingNode");
217     event_name += node_number.str();
218     char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
219     std::string tag_name(proxy_ior);
220     CORBA::string_free(proxy_ior);
221     if (i == rank) 
222       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
223     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
224   }
225 }
226
227 void 
228 Param_Double_Port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data)
229 {
230
231   // On attend que le get soit fait
232   // Au départ seq_data_termine_cp = TRUE
233   pthread_mutex_lock(seq_data_mutex_cp);
234   while (seq_data_termine_cp == false)
235   {
236      pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
237   }
238   seq_data_termine_cp = false;
239   pthread_mutex_unlock(seq_data_mutex_cp);
240
241   pthread_mutex_lock(seq_data_mutex);
242
243   // Création d'une nouvelle séquence
244   // Elle prend le buffer sans le copier
245   Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) &param_data;
246   _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);
247
248   seq_data_termine = true;
249   pthread_cond_signal(seq_data_condition);
250   pthread_mutex_unlock(seq_data_mutex);
251 }
252     
253 void 
254 Param_Double_Port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results)
255 {
256   pthread_mutex_lock(seq_results_mutex);
257   while (seq_results_termine == false)
258   {
259      pthread_cond_wait(seq_results_condition, seq_results_mutex);
260   }
261
262   // Création d'une nouvelle séquence
263   // Elle prend le buffer sans le copier
264   param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
265   delete _seq_results;
266   _seq_results = NULL;
267
268   seq_results_termine = false;
269   pthread_mutex_unlock(seq_results_mutex);
270
271   // On indique que l'on a copié la valeur
272   // Et donc que l'on peut recevoir une nouvelle valeur
273   pthread_mutex_lock(seq_results_mutex_cp);
274   seq_results_termine_cp = true;
275   pthread_cond_signal(seq_results_condition_cp);
276   pthread_mutex_unlock(seq_results_mutex_cp);
277 }
278
279 Ports::Param_Double_Port::seq_double *
280 Param_Double_Port_provides_i::get_data()
281 {
282   Ports::Param_Double_Port::seq_double * result = NULL;
283
284   pthread_mutex_lock(seq_data_mutex);
285   while (seq_data_termine == false)
286   {
287      pthread_cond_wait(seq_data_condition, seq_data_mutex);
288   }
289
290   // Création d'une nouvelle séquence
291   // Elle prend le buffer sans le copier
292   result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
293   delete _seq_data;
294   _seq_data = NULL;
295
296   seq_data_termine = false;
297   pthread_mutex_unlock(seq_data_mutex);
298
299   // On indique que l'on a copié la valeur
300   // Et donc que l'on peut recevoir une nouvelle valeur
301   pthread_mutex_lock(seq_data_mutex_cp);
302   seq_data_termine_cp = true;
303   pthread_cond_signal(seq_data_condition_cp);
304   pthread_mutex_unlock(seq_data_mutex_cp);
305   return result;
306 }
307
308 void
309 Param_Double_Port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results)
310 {
311   // On attend que le get soit fait
312   // Au départ seq_results_termine_cp = TRUE
313   pthread_mutex_lock(seq_results_mutex_cp);
314   while (seq_results_termine_cp == false)
315   {
316      pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
317   }
318   seq_results_termine_cp = false;
319   pthread_mutex_unlock(seq_results_mutex_cp);
320
321   pthread_mutex_lock(seq_results_mutex);
322
323   // Création d'une nouvelle séquence
324   // Elle prend le buffer sans le copier
325   _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);
326
327   seq_results_termine = true;
328   pthread_cond_signal(seq_results_condition);
329   pthread_mutex_unlock(seq_results_mutex);
330 }
331
332 void 
333 Param_Double_Port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos)
334 {
335   // Configuration de la biblothèque de redistribution
336   // pour les données actuelles
337   ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
338   GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
339   dislib->setNodeNbElt(data_length);
340   dislib->setTotalNbElt(totalNbElt);
341   dislib->setNodePos(BeginEltPos);
342 }