1 // Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 // File : param_double_port_provides.cxx
21 // Author : André RIBES (EDF)
28 #include "Param_Double_Port_provides_i.hxx"
30 #include <paco_omni.h>
31 #include <paco_dummy.h>
33 #include <paco_direct_comScheduling.h>
37 Param_Double_Port_provides_i::Param_Double_Port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) :
38 Ports::Param_Double_Port_serv(orb,ior,rank),
39 Ports::Param_Double_Port_base_serv(orb,ior,rank),
40 Ports::Data_Port_serv(orb,ior,rank),
41 Ports::Data_Port_base_serv(orb,ior,rank),
42 Ports::Port_serv(orb,ior,rank),
43 Ports::Port_base_serv(orb,ior,rank),
44 InterfaceParallel_impl(orb,ior,rank)
48 seq_data_termine = false;
49 seq_data_mutex = new pthread_mutex_t();
50 pthread_mutex_init(seq_data_mutex, NULL);
51 seq_data_condition = new pthread_cond_t();
52 pthread_cond_init(seq_data_condition, NULL);
53 seq_data_termine_cp = true;
54 seq_data_mutex_cp = new pthread_mutex_t();
55 pthread_mutex_init(seq_data_mutex_cp, NULL);
56 seq_data_condition_cp = new pthread_cond_t();
57 pthread_cond_init(seq_data_condition_cp, NULL);
61 seq_results_termine = false;
62 seq_results_mutex = new pthread_mutex_t();
63 pthread_mutex_init(seq_results_mutex, NULL);
64 seq_results_condition = new pthread_cond_t();
65 pthread_cond_init(seq_results_condition, NULL);
66 seq_results_termine_cp = true;
67 seq_results_mutex_cp = new pthread_mutex_t();
68 pthread_mutex_init(seq_results_mutex_cp, NULL);
69 seq_results_condition_cp = new pthread_cond_t();
70 pthread_cond_init(seq_results_condition_cp, NULL);
// Destructor: destroys and frees every mutex / condition variable that the
// constructor allocated with new + pthread_*_init.
// NOTE(review): this listing skips embedded line numbers 74-77 and 86-89, so
// the braces and possibly other statements (for instance releasing
// _seq_data / _seq_results) are not visible here -- confirm against the
// original file before changing this block.
73 Param_Double_Port_provides_i::~Param_Double_Port_provides_i()
// Synchronisation objects of the put() -> get_data() channel.
78 pthread_mutex_destroy(seq_data_mutex);
79 delete seq_data_mutex;
80 pthread_cond_destroy(seq_data_condition);
81 delete seq_data_condition;
82 pthread_mutex_destroy(seq_data_mutex_cp);
83 delete seq_data_mutex_cp;
84 pthread_cond_destroy(seq_data_condition_cp);
85 delete seq_data_condition_cp;
// Synchronisation objects of the set_data() -> get_results() channel.
90 pthread_mutex_destroy(seq_results_mutex);
91 delete seq_results_mutex;
92 pthread_cond_destroy(seq_results_condition);
93 delete seq_results_condition;
94 pthread_mutex_destroy(seq_results_mutex_cp);
95 delete seq_results_mutex_cp;
96 pthread_cond_destroy(seq_results_condition_cp);
97 delete seq_results_condition_cp;
// Sets up a Param_Double_Port provides port on a parallel component:
// registers the fabriques it needs, creates and registers the proxy,
// creates the node servant, configures the redistribution libraries of
// put / get_results, registers the node port and signals the proxy start.
//
// par_compo : parallel component the port is attached to (source of rank,
//             node count, proxy IOR and events).
// port_name : name under which proxy and node ports are registered.
//
// NOTE(review): this listing skips several embedded line numbers (103-104,
// 123, 127, 141, 145-146, 148, 177, 181-182, 185, 187, 195-197, 199,
// 202-205). Consequences visible below:
//   - `orb` is used but does not appear in the visible parameter list;
//   - the `new ...proxy_impl(...)` expression has no visible assignment
//     target although proxy_node is dereferenced right after;
//   - node_number is never written before node_number.str() is read;
//   - no return statement is visible although a pointer is returned;
//   - any guards around the proxy / node registration are not visible.
// Confirm all of these against the original file before editing the code.
100 Param_Double_Port_provides_i *
101 Param_Double_Port_provides_i::init_port(Engines_ParallelDSC_i * par_compo,
102 std::string port_name,
105 int rank = par_compo->getMyRank();
106 int totalNode = par_compo->getTotalNode();
// `com` is not used in any line visible in this listing.
107 paco_com * com = par_compo->getCom();
109 MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);
111 // MUST THIS ALREADY HAVE BEEN DONE BEFOREHAND !!!???
// Register the communication, thread, scheduling and distribution
// fabriques under the "pdp_*" names used further down.
112 paco_fabrique_manager* pfm = paco_getFabriqueManager();
113 pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
114 pfm->register_thread("pdp_thread", new paco_omni_fabrique());
115 pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
116 pfm->register_distribution("pdp_GaBro", new GaBro_fab());
117 pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());
119 Param_Double_Port_provides_i * port = NULL;
120 Ports::Param_Double_Port_proxy_impl * proxy_node = NULL;
122 std::cerr << "Creating Proxy" << std::endl;
124 // Start by creating the proxy
125 // It is registered in the component and will be destroyed automatically
126 // when the component is destroyed
128 new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
129 pfm->get_thread("pdp_thread"));
130 proxy_node->setLibCom("pdp_dummy", proxy_node);
131 proxy_node->setLibThread("pdp_thread");
// The proxy topology only carries the total number of nodes.
132 PaCO::PacoTopology_t serveur_topo;
133 serveur_topo.total = totalNode;
134 proxy_node->setTopology(serveur_topo);
136 // Create the property object
137 PortProperties_i * proxy_node_properties = new PortProperties_i();
139 // Register the proxy
140 par_compo->add_parallel_provides_proxy_port(proxy_node->_this(),
142 proxy_node_properties->_this());
// Drop the local references once the objects have been handed over.
143 proxy_node->_remove_ref();
144 proxy_node_properties->_remove_ref();
147 par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
150 std::cerr << "Getting proxy" << std::endl;
// proxy_ior is released with CORBA::string_free at the end of the function.
151 char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
152 std::cerr << "Proxy ior is : " << proxy_ior << std::endl;
154 port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
155 port->copyClientGlobalContext(par_compo);
157 // Now configure the redistribution libraries
158 // of the put function
159 ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
160 method_ptr->setLibComScheduling("pdp_direct");
161 method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
162 BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
163 dislib->setEltSize(sizeof(CORBA::Double));
165 // Now configure the redistribution libraries
166 // of the get_results function
167 method_ptr = port->getParallelMethodContext("get_results");
168 method_ptr->setLibComScheduling("pdp_direct");
169 method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
170 GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
171 dislib_gabro->setEltSize(sizeof(CORBA::Double));
173 // Register the port
// One iteration per node; the event name is "AddNode" followed by
// node_number's text, the tag is the proxy IOR.
174 for (int i = 0; i < totalNode; i++)
176 std::ostringstream node_number;
178 std::string event_name("AddNode");
179 event_name += node_number.str();
180 std::string tag_name = proxy_ior;
183 std::cerr << "Adding node of processor : " << i << std::endl;
184 par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
186 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
189 par_compo->wait_event(event_name.c_str(), tag_name.c_str());
192 // Start the parallel object
193 std::string event_name("StartingProxy");
194 std::string tag_name = proxy_ior;
198 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
201 CORBA::string_free(proxy_ior);
// For every node index, builds a "WaitingNode..." event tagged with the
// proxy IOR of port_name, sends it through the component's proxy and waits
// for it on the component.
//
// par_compo : parallel component providing rank, node count, proxy IOR and
//             the event primitives.
// port_name : name of the port whose proxy IOR is used as event tag.
//
// NOTE(review): this listing skips embedded line numbers 205, 208-209, 214,
// 216, 222 and 225-228. The return type, the end of the parameter list, the
// statement that fills node_number and any guard in front of send_event are
// therefore not visible (`rank` is read but unused in the visible lines) --
// confirm against the original file.
206 Param_Double_Port_provides_i::wait_init_port(Engines_ParallelDSC_i * par_compo,
207 std::string port_name,
210 int rank = par_compo->getMyRank();
211 int totalNode = par_compo->getTotalNode();
212 // Register the port
213 for (int i = 0; i < totalNode; i++)
215 std::ostringstream node_number;
217 std::string event_name("WaitingNode");
218 event_name += node_number.str();
// The IOR string is copied into tag_name and released immediately.
219 char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
220 std::string tag_name(proxy_ior);
221 CORBA::string_free(proxy_ior);
223 par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
224 par_compo->wait_event(event_name.c_str(), tag_name.c_str());
229 Param_Double_Port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data)
232 // On attend que le get soit fait
233 // Au départ seq_data_termine_cp = TRUE
234 pthread_mutex_lock(seq_data_mutex_cp);
235 while (seq_data_termine_cp == false)
237 pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
239 seq_data_termine_cp = false;
240 pthread_mutex_unlock(seq_data_mutex_cp);
242 pthread_mutex_lock(seq_data_mutex);
244 // Création d'une nouvelle séquence
245 // Elle prend le buffer sans le copier
246 Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) ¶m_data;
247 _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);
249 seq_data_termine = true;
250 pthread_cond_signal(seq_data_condition);
251 pthread_mutex_unlock(seq_data_mutex);
255 Param_Double_Port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results)
257 pthread_mutex_lock(seq_results_mutex);
258 while (seq_results_termine == false)
260 pthread_cond_wait(seq_results_condition, seq_results_mutex);
263 // Création d'une nouvelle séquence
264 // Elle prend le buffer sans le copier
265 param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
269 seq_results_termine = false;
270 pthread_mutex_unlock(seq_results_mutex);
272 // On indique que l'on a copié la valeur
273 // Et donc que l'on peut recevoir une nouvelle valeur
274 pthread_mutex_lock(seq_results_mutex_cp);
275 seq_results_termine_cp = true;
276 pthread_cond_signal(seq_results_condition_cp);
277 pthread_mutex_unlock(seq_results_mutex_cp);
280 Ports::Param_Double_Port::seq_double *
281 Param_Double_Port_provides_i::get_data()
283 Ports::Param_Double_Port::seq_double * result = NULL;
285 pthread_mutex_lock(seq_data_mutex);
286 while (seq_data_termine == false)
288 pthread_cond_wait(seq_data_condition, seq_data_mutex);
291 // Création d'une nouvelle séquence
292 // Elle prend le buffer sans le copier
293 result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
297 seq_data_termine = false;
298 pthread_mutex_unlock(seq_data_mutex);
300 // On indique que l'on a copié la valeur
301 // Et donc que l'on peut recevoir une nouvelle valeur
302 pthread_mutex_lock(seq_data_mutex_cp);
303 seq_data_termine_cp = true;
304 pthread_cond_signal(seq_data_condition_cp);
305 pthread_mutex_unlock(seq_data_mutex_cp);
310 Param_Double_Port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results)
312 // On attend que le get soit fait
313 // Au départ seq_results_termine_cp = TRUE
314 pthread_mutex_lock(seq_results_mutex_cp);
315 while (seq_results_termine_cp == false)
317 pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
319 seq_results_termine_cp = false;
320 pthread_mutex_unlock(seq_results_mutex_cp);
322 pthread_mutex_lock(seq_results_mutex);
324 // Création d'une nouvelle séquence
325 // Elle prend le buffer sans le copier
326 _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);
328 seq_results_termine = true;
329 pthread_cond_signal(seq_results_condition);
330 pthread_mutex_unlock(seq_results_mutex);
334 Param_Double_Port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos)
336 // Configuration de la biblothèque de redistribution
337 // pour les données actuelles
338 ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
339 GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
340 dislib->setNodeNbElt(data_length);
341 dislib->setTotalNbElt(totalNbElt);
342 dislib->setNodePos(BeginEltPos);