Salome HOME
08d001c83cd96d523921397363ebc1b916a4fe6c
[modules/kernel.git] / src / DSC / ParallelDSC / Param_Double_Port_provides_i.cxx
1 // Copyright (C) 2007-2024  CEA, EDF, OPEN CASCADE
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 //  File   : param_double_port_provides.cxx
21 //  Author : André RIBES (EDF)
22 //  Module : KERNEL
23 //
24 #include <iostream>
25 #include <string>
26 #include <sstream>
27
28 #include "Param_Double_Port_provides_i.hxx"
29
30 #include <paco_omni.h>
31 #include <paco_dummy.h>
32 #include <paco_mpi.h>
33 #include <paco_direct_comScheduling.h>
34 #include <GaBro.h>
35 #include <BasicBC.h>
36
37 Param_Double_Port_provides_i::Param_Double_Port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) :
38   Ports::Param_Double_Port_serv(orb,ior,rank),
39   Ports::Param_Double_Port_base_serv(orb,ior,rank),
40   Ports::Data_Port_serv(orb,ior,rank),
41   Ports::Data_Port_base_serv(orb,ior,rank),
42   Ports::Port_serv(orb,ior,rank),
43   Ports::Port_base_serv(orb,ior,rank),
44   InterfaceParallel_impl(orb,ior,rank)
45 {
46   _seq_data = NULL;
47
48   seq_data_termine = false;                 
49   seq_data_mutex = new pthread_mutex_t();
50   pthread_mutex_init(seq_data_mutex, NULL);
51   seq_data_condition = new pthread_cond_t();
52   pthread_cond_init(seq_data_condition, NULL);
53   seq_data_termine_cp = true;               
54   seq_data_mutex_cp = new pthread_mutex_t();
55   pthread_mutex_init(seq_data_mutex_cp, NULL);
56   seq_data_condition_cp = new pthread_cond_t();
57   pthread_cond_init(seq_data_condition_cp, NULL);
58
59   _seq_results = NULL;
60
61   seq_results_termine = false;              
62   seq_results_mutex = new pthread_mutex_t();
63   pthread_mutex_init(seq_results_mutex, NULL);
64   seq_results_condition = new pthread_cond_t();
65   pthread_cond_init(seq_results_condition, NULL);
66   seq_results_termine_cp = true;                    
67   seq_results_mutex_cp = new pthread_mutex_t();
68   pthread_mutex_init(seq_results_mutex_cp, NULL);
69   seq_results_condition_cp = new pthread_cond_t();
70   pthread_cond_init(seq_results_condition_cp, NULL);
71 }
72
73 Param_Double_Port_provides_i::~Param_Double_Port_provides_i() 
74 {
75   if (_seq_data)
76     delete _seq_data;
77
78   pthread_mutex_destroy(seq_data_mutex);
79   delete seq_data_mutex;
80   pthread_cond_destroy(seq_data_condition);
81   delete seq_data_condition;
82   pthread_mutex_destroy(seq_data_mutex_cp);
83   delete seq_data_mutex_cp;
84   pthread_cond_destroy(seq_data_condition_cp);
85   delete seq_data_condition_cp;
86
87   if (_seq_results)
88     delete _seq_results;
89
90   pthread_mutex_destroy(seq_results_mutex);
91   delete seq_results_mutex;
92   pthread_cond_destroy(seq_results_condition);
93   delete seq_results_condition;
94   pthread_mutex_destroy(seq_results_mutex_cp);
95   delete seq_results_mutex_cp;
96   pthread_cond_destroy(seq_results_condition_cp);
97   delete seq_results_condition_cp;
98 }
99
100 Param_Double_Port_provides_i *
101 Param_Double_Port_provides_i::init_port(Engines_ParallelDSC_i * par_compo, 
102                                         std::string port_name,
103                                         CORBA::ORB_ptr orb)
104 {
105   int rank = par_compo->getMyRank();
106   int totalNode = par_compo->getTotalNode();
107   paco_com * com = par_compo->getCom();
108
109   MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);
110
111   // DOIT ETRE DEJA FAIT AVANT !!!???
112   paco_fabrique_manager* pfm = paco_getFabriqueManager();
113   pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
114   pfm->register_thread("pdp_thread", new paco_omni_fabrique());
115   pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
116   pfm->register_distribution("pdp_GaBro", new GaBro_fab());
117   pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());
118
119   Param_Double_Port_provides_i * port = NULL; 
120   Ports::Param_Double_Port_proxy_impl * proxy_node = NULL; 
121
122   std::cerr << "Creating Proxy" << std::endl;
123   if (rank == 0) {
124     // On commence par créer le proxy
125     // Il est enregistré dans le composant et sera détruit automatiquement
126     // lorsque le composant sera détruit
127     proxy_node = 
128       new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
129                                               pfm->get_thread("pdp_thread"));
130     proxy_node->setLibCom("pdp_dummy", proxy_node);
131     proxy_node->setLibThread("pdp_thread");
132     PaCO::PacoTopology_t serveur_topo;
133     serveur_topo.total = totalNode;
134     proxy_node->setTopology(serveur_topo);
135
136     // Création de la propriété
137     PortProperties_i * proxy_node_properties = new PortProperties_i();
138
139     // Enregistrement du proxy
140     par_compo->add_parallel_provides_proxy_port(proxy_node->_this(), 
141                                                 port_name.c_str(),
142                                                 proxy_node_properties->_this());
143     proxy_node->_remove_ref();
144     proxy_node_properties->_remove_ref();
145   }
146   else {
147     par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
148   }
149
150   std::cerr << "Getting proxy" << std::endl;
151   char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
152   std::cerr << "Proxy ior is : " << proxy_ior << std::endl;
153
154   port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
155   port->copyClientGlobalContext(par_compo);
156
157   // Il faut maintenant configurer les bibliothèques
158   // de redistributions de la fonction put
159   ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
160   method_ptr->setLibComScheduling("pdp_direct"); 
161   method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
162   BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
163   dislib->setEltSize(sizeof(CORBA::Double));
164
165   // Il faut maintenant configurer les bibliothèques
166   // de redistributions de la fonction get_results
167   method_ptr = port->getParallelMethodContext("get_results");
168   method_ptr->setLibComScheduling("pdp_direct"); 
169   method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
170   GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
171   dislib_gabro->setEltSize(sizeof(CORBA::Double));
172
173   // Enregistement du port 
174   for (int i = 0; i < totalNode; i++) 
175   {
176     std::ostringstream node_number;
177     node_number << i;
178     std::string event_name("AddNode");
179     event_name += node_number.str();
180     std::string tag_name = proxy_ior;
181
182     if (i == rank) {
183       std::cerr << "Adding node of processor : " << i << std::endl;
184       par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
185       port->_remove_ref();
186       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
187     }
188
189     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
190   }
191
192   // On démarre l'objet parallèle
193   std::string event_name("StartingProxy");
194   std::string tag_name = proxy_ior;
195   if (rank == 0) 
196   {
197     proxy_node->start();
198     par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
199   }
200
201   CORBA::string_free(proxy_ior);
202   return port;
203 }
204
205 void
206 Param_Double_Port_provides_i::wait_init_port(Engines_ParallelDSC_i * par_compo, 
207                                              std::string port_name,
208                                              CORBA::ORB_ptr orb)
209 {
210   int rank = par_compo->getMyRank();
211   int totalNode = par_compo->getTotalNode();
212   // Enregistement du port 
213   for (int i = 0; i < totalNode; i++) 
214   {
215     std::ostringstream node_number;
216     node_number << i;
217     std::string event_name("WaitingNode");
218     event_name += node_number.str();
219     char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
220     std::string tag_name(proxy_ior);
221     CORBA::string_free(proxy_ior);
222     if (i == rank) 
223       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
224     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
225   }
226 }
227
228 void 
229 Param_Double_Port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data)
230 {
231
232   // On attend que le get soit fait
233   // Au départ seq_data_termine_cp = TRUE
234   pthread_mutex_lock(seq_data_mutex_cp);
235   while (seq_data_termine_cp == false)
236   {
237      pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
238   }
239   seq_data_termine_cp = false;
240   pthread_mutex_unlock(seq_data_mutex_cp);
241
242   pthread_mutex_lock(seq_data_mutex);
243
244   // Création d'une nouvelle séquence
245   // Elle prend le buffer sans le copier
246   Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) &param_data;
247   _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);
248
249   seq_data_termine = true;
250   pthread_cond_signal(seq_data_condition);
251   pthread_mutex_unlock(seq_data_mutex);
252 }
253     
254 void 
255 Param_Double_Port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results)
256 {
257   pthread_mutex_lock(seq_results_mutex);
258   while (seq_results_termine == false)
259   {
260      pthread_cond_wait(seq_results_condition, seq_results_mutex);
261   }
262
263   // Création d'une nouvelle séquence
264   // Elle prend le buffer sans le copier
265   param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
266   delete _seq_results;
267   _seq_results = NULL;
268
269   seq_results_termine = false;
270   pthread_mutex_unlock(seq_results_mutex);
271
272   // On indique que l'on a copié la valeur
273   // Et donc que l'on peut recevoir une nouvelle valeur
274   pthread_mutex_lock(seq_results_mutex_cp);
275   seq_results_termine_cp = true;
276   pthread_cond_signal(seq_results_condition_cp);
277   pthread_mutex_unlock(seq_results_mutex_cp);
278 }
279
280 Ports::Param_Double_Port::seq_double *
281 Param_Double_Port_provides_i::get_data()
282 {
283   Ports::Param_Double_Port::seq_double * result = NULL;
284
285   pthread_mutex_lock(seq_data_mutex);
286   while (seq_data_termine == false)
287   {
288      pthread_cond_wait(seq_data_condition, seq_data_mutex);
289   }
290
291   // Création d'une nouvelle séquence
292   // Elle prend le buffer sans le copier
293   result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
294   delete _seq_data;
295   _seq_data = NULL;
296
297   seq_data_termine = false;
298   pthread_mutex_unlock(seq_data_mutex);
299
300   // On indique que l'on a copié la valeur
301   // Et donc que l'on peut recevoir une nouvelle valeur
302   pthread_mutex_lock(seq_data_mutex_cp);
303   seq_data_termine_cp = true;
304   pthread_cond_signal(seq_data_condition_cp);
305   pthread_mutex_unlock(seq_data_mutex_cp);
306   return result;
307 }
308
309 void
310 Param_Double_Port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results)
311 {
312   // On attend que le get soit fait
313   // Au départ seq_results_termine_cp = TRUE
314   pthread_mutex_lock(seq_results_mutex_cp);
315   while (seq_results_termine_cp == false)
316   {
317      pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
318   }
319   seq_results_termine_cp = false;
320   pthread_mutex_unlock(seq_results_mutex_cp);
321
322   pthread_mutex_lock(seq_results_mutex);
323
324   // Création d'une nouvelle séquence
325   // Elle prend le buffer sans le copier
326   _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);
327
328   seq_results_termine = true;
329   pthread_cond_signal(seq_results_condition);
330   pthread_mutex_unlock(seq_results_mutex);
331 }
332
333 void 
334 Param_Double_Port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos)
335 {
336   // Configuration de la biblothèque de redistribution
337   // pour les données actuelles
338   ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
339   GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
340   dislib->setNodeNbElt(data_length);
341   dislib->setTotalNbElt(totalNbElt);
342   dislib->setNodePos(BeginEltPos);
343 }