1 // Copyright (C) 2009 CEA/DEN, EDF R&D, OPEN CASCADE
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 // File : param_double_port_provides.cxx
20 // Author : André RIBES (EDF)
27 #include "Param_Double_Port_provides_i.hxx"
29 #include <paco_omni.h>
30 #include <paco_dummy.h>
32 #include <paco_direct_comScheduling.h>
36 Param_Double_Port_provides_i::Param_Double_Port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) :
37 Ports::Param_Double_Port_serv(orb,ior,rank),
38 Ports::Param_Double_Port_base_serv(orb,ior,rank),
39 Ports::Data_Port_serv(orb,ior,rank),
40 Ports::Data_Port_base_serv(orb,ior,rank),
41 Ports::Port_serv(orb,ior,rank),
42 Ports::Port_base_serv(orb,ior,rank),
43 InterfaceParallel_impl(orb,ior,rank)
47 seq_data_termine = false;
48 seq_data_mutex = new pthread_mutex_t();
49 pthread_mutex_init(seq_data_mutex, NULL);
50 seq_data_condition = new pthread_cond_t();
51 pthread_cond_init(seq_data_condition, NULL);
52 seq_data_termine_cp = true;
53 seq_data_mutex_cp = new pthread_mutex_t();
54 pthread_mutex_init(seq_data_mutex_cp, NULL);
55 seq_data_condition_cp = new pthread_cond_t();
56 pthread_cond_init(seq_data_condition_cp, NULL);
60 seq_results_termine = false;
61 seq_results_mutex = new pthread_mutex_t();
62 pthread_mutex_init(seq_results_mutex, NULL);
63 seq_results_condition = new pthread_cond_t();
64 pthread_cond_init(seq_results_condition, NULL);
65 seq_results_termine_cp = true;
66 seq_results_mutex_cp = new pthread_mutex_t();
67 pthread_mutex_init(seq_results_mutex_cp, NULL);
68 seq_results_condition_cp = new pthread_cond_t();
69 pthread_cond_init(seq_results_condition_cp, NULL);
72 Param_Double_Port_provides_i::~Param_Double_Port_provides_i()
77 pthread_mutex_destroy(seq_data_mutex);
78 delete seq_data_mutex;
79 pthread_cond_destroy(seq_data_condition);
80 delete seq_data_condition;
81 pthread_mutex_destroy(seq_data_mutex_cp);
82 delete seq_data_mutex_cp;
83 pthread_cond_destroy(seq_data_condition_cp);
84 delete seq_data_condition_cp;
89 pthread_mutex_destroy(seq_results_mutex);
90 delete seq_results_mutex;
91 pthread_cond_destroy(seq_results_condition);
92 delete seq_results_condition;
93 pthread_mutex_destroy(seq_results_mutex_cp);
94 delete seq_results_mutex_cp;
95 pthread_cond_destroy(seq_results_condition_cp);
96 delete seq_results_condition_cp;
99 Param_Double_Port_provides_i *
100 Param_Double_Port_provides_i::init_port(Engines_ParallelDSC_i * par_compo,
101 std::string port_name,
104 int rank = par_compo->getMyRank();
105 int totalNode = par_compo->getTotalNode();
106 paco_com * com = par_compo->getCom();
108 MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);
110 // DOIT ETRE DEJA FAIT AVANT !!!???
111 paco_fabrique_manager* pfm = paco_getFabriqueManager();
112 pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
113 pfm->register_thread("pdp_thread", new paco_omni_fabrique());
114 pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
115 pfm->register_distribution("pdp_GaBro", new GaBro_fab());
116 pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());
118 Param_Double_Port_provides_i * port = NULL;
119 Ports::Param_Double_Port_proxy_impl * proxy_node = NULL;
121 std::cerr << "Creating Proxy" << std::endl;
123 // On commence par créer le proxy
124 // Il est enregistré dans le composant et sera détruit automatiquement
125 // lorsque le composant sera détruit
127 new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
128 pfm->get_thread("pdp_thread"));
129 proxy_node->setLibCom("pdp_dummy", proxy_node);
130 proxy_node->setLibThread("pdp_thread");
131 PaCO::PacoTopology_t serveur_topo;
132 serveur_topo.total = totalNode;
133 proxy_node->setTopology(serveur_topo);
135 // Création de la propriété
136 PortProperties_i * proxy_node_properties = new PortProperties_i();
138 // Enregistrement du proxy
139 par_compo->add_parallel_provides_proxy_port(proxy_node->_this(),
141 proxy_node_properties->_this());
142 proxy_node->_remove_ref();
143 proxy_node_properties->_remove_ref();
146 par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
149 std::cerr << "Getting proxy" << std::endl;
150 char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
151 std::cerr << "Proxy ior is : " << proxy_ior << std::endl;
153 port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
154 port->copyClientGlobalContext(par_compo);
156 // Il faut maintenant configurer les bibliothèques
157 // de redistributions de la fonction put
158 ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
159 method_ptr->setLibComScheduling("pdp_direct");
160 method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
161 BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
162 dislib->setEltSize(sizeof(CORBA::Double));
164 // Il faut maintenant configurer les bibliothèques
165 // de redistributions de la fonction get_results
166 method_ptr = port->getParallelMethodContext("get_results");
167 method_ptr->setLibComScheduling("pdp_direct");
168 method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
169 GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
170 dislib_gabro->setEltSize(sizeof(CORBA::Double));
172 // Enregistement du port
173 for (int i = 0; i < totalNode; i++)
175 std::ostringstream node_number;
177 std::string event_name("AddNode");
178 event_name += node_number.str();
179 std::string tag_name = proxy_ior;
182 std::cerr << "Adding node of processor : " << i << std::endl;
183 par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
185 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
188 par_compo->wait_event(event_name.c_str(), tag_name.c_str());
191 // On démarre l'objet parallèle
192 std::string event_name("StartingProxy");
193 std::string tag_name = proxy_ior;
197 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
200 CORBA::string_free(proxy_ior);
205 Param_Double_Port_provides_i::wait_init_port(Engines_ParallelDSC_i * par_compo,
206 std::string port_name,
209 int rank = par_compo->getMyRank();
210 int totalNode = par_compo->getTotalNode();
211 // Enregistement du port
212 for (int i = 0; i < totalNode; i++)
214 std::ostringstream node_number;
216 std::string event_name("WaitingNode");
217 event_name += node_number.str();
218 char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
219 std::string tag_name(proxy_ior);
220 CORBA::string_free(proxy_ior);
222 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
223 par_compo->wait_event(event_name.c_str(), tag_name.c_str());
228 Param_Double_Port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data)
231 // On attend que le get soit fait
232 // Au départ seq_data_termine_cp = TRUE
233 pthread_mutex_lock(seq_data_mutex_cp);
234 while (seq_data_termine_cp == false)
236 pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
238 seq_data_termine_cp = false;
239 pthread_mutex_unlock(seq_data_mutex_cp);
241 pthread_mutex_lock(seq_data_mutex);
243 // Création d'une nouvelle séquence
244 // Elle prend le buffer sans le copier
245 Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) ¶m_data;
246 _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);
248 seq_data_termine = true;
249 pthread_cond_signal(seq_data_condition);
250 pthread_mutex_unlock(seq_data_mutex);
254 Param_Double_Port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results)
256 pthread_mutex_lock(seq_results_mutex);
257 while (seq_results_termine == false)
259 pthread_cond_wait(seq_results_condition, seq_results_mutex);
262 // Création d'une nouvelle séquence
263 // Elle prend le buffer sans le copier
264 param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
268 seq_results_termine = false;
269 pthread_mutex_unlock(seq_results_mutex);
271 // On indique que l'on a copié la valeur
272 // Et donc que l'on peut recevoir une nouvelle valeur
273 pthread_mutex_lock(seq_results_mutex_cp);
274 seq_results_termine_cp = true;
275 pthread_cond_signal(seq_results_condition_cp);
276 pthread_mutex_unlock(seq_results_mutex_cp);
279 Ports::Param_Double_Port::seq_double *
280 Param_Double_Port_provides_i::get_data()
282 Ports::Param_Double_Port::seq_double * result = NULL;
284 pthread_mutex_lock(seq_data_mutex);
285 while (seq_data_termine == false)
287 pthread_cond_wait(seq_data_condition, seq_data_mutex);
290 // Création d'une nouvelle séquence
291 // Elle prend le buffer sans le copier
292 result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
296 seq_data_termine = false;
297 pthread_mutex_unlock(seq_data_mutex);
299 // On indique que l'on a copié la valeur
300 // Et donc que l'on peut recevoir une nouvelle valeur
301 pthread_mutex_lock(seq_data_mutex_cp);
302 seq_data_termine_cp = true;
303 pthread_cond_signal(seq_data_condition_cp);
304 pthread_mutex_unlock(seq_data_mutex_cp);
309 Param_Double_Port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results)
311 // On attend que le get soit fait
312 // Au départ seq_results_termine_cp = TRUE
313 pthread_mutex_lock(seq_results_mutex_cp);
314 while (seq_results_termine_cp == false)
316 pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
318 seq_results_termine_cp = false;
319 pthread_mutex_unlock(seq_results_mutex_cp);
321 pthread_mutex_lock(seq_results_mutex);
323 // Création d'une nouvelle séquence
324 // Elle prend le buffer sans le copier
325 _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);
327 seq_results_termine = true;
328 pthread_cond_signal(seq_results_condition);
329 pthread_mutex_unlock(seq_results_mutex);
333 Param_Double_Port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos)
335 // Configuration de la biblothèque de redistribution
336 // pour les données actuelles
337 ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
338 GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
339 dislib->setNodeNbElt(data_length);
340 dislib->setTotalNbElt(totalNbElt);
341 dislib->setNodePos(BeginEltPos);