From 7003d2d82e500be3b5b1a91cadec862a68b48f48 Mon Sep 17 00:00:00 2001 From: ribes Date: Fri, 27 Feb 2009 11:08:19 +0000 Subject: [PATCH] - Adding a firts parallel DSC port --- .../param_double_port_provides.cxx | 305 ++++++++++++++++++ .../param_double_port_provides.hxx | 74 +++++ .../ParallelDSC/param_double_port_uses.cxx | 110 +++++++ .../ParallelDSC/param_double_port_uses.hxx | 62 ++++ 4 files changed, 551 insertions(+) create mode 100644 src/DSC/ParallelDSC/param_double_port_provides.cxx create mode 100644 src/DSC/ParallelDSC/param_double_port_provides.hxx create mode 100644 src/DSC/ParallelDSC/param_double_port_uses.cxx create mode 100644 src/DSC/ParallelDSC/param_double_port_uses.hxx diff --git a/src/DSC/ParallelDSC/param_double_port_provides.cxx b/src/DSC/ParallelDSC/param_double_port_provides.cxx new file mode 100644 index 000000000..dbad2ae41 --- /dev/null +++ b/src/DSC/ParallelDSC/param_double_port_provides.cxx @@ -0,0 +1,305 @@ +// Copyright (C) 2009 CEA/DEN, EDF R&D, OPEN CASCADE +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +// File : param_double_port_provides.cxx +// Author : André RIBES (EDF) +// Module : KERNEL + +#include +#include + +#include "param_double_port_provides.hxx" + +#include +#include +#include +#include +#include +#include + +param_double_port_provides_i::param_double_port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) : + Ports::Param_Double_Port_serv(orb,ior,rank), + Ports::Param_Double_Port_base_serv(orb,ior,rank), + Ports::Data_Port_serv(orb,ior,rank), + Ports::Data_Port_base_serv(orb,ior,rank), + Ports::Port_serv(orb,ior,rank), + Ports::Port_base_serv(orb,ior,rank), + InterfaceParallel_impl(orb,ior,rank) +{ + _seq_data = NULL; + + seq_data_termine = false; + seq_data_mutex = new pthread_mutex_t(); + pthread_mutex_init(seq_data_mutex, NULL); + seq_data_condition = new pthread_cond_t(); + pthread_cond_init(seq_data_condition, NULL); + seq_data_termine_cp = true; + seq_data_mutex_cp = new pthread_mutex_t(); + pthread_mutex_init(seq_data_mutex_cp, NULL); + seq_data_condition_cp = new pthread_cond_t(); + pthread_cond_init(seq_data_condition_cp, NULL); + + _seq_results = NULL; + + seq_results_termine = false; + seq_results_mutex = new pthread_mutex_t(); + pthread_mutex_init(seq_results_mutex, NULL); + seq_results_condition = new pthread_cond_t(); + pthread_cond_init(seq_results_condition, NULL); + seq_results_termine_cp = true; + seq_results_mutex_cp = new pthread_mutex_t(); + pthread_mutex_init(seq_results_mutex_cp, NULL); + seq_results_condition_cp = new pthread_cond_t(); + pthread_cond_init(seq_results_condition_cp, NULL); +} + +param_double_port_provides_i::~param_double_port_provides_i() +{ + if (_seq_data) + delete _seq_data; + + pthread_mutex_destroy(seq_data_mutex); + delete seq_data_mutex; + pthread_cond_destroy(seq_data_condition); + delete seq_data_condition; + pthread_mutex_destroy(seq_data_mutex_cp); + delete seq_data_mutex_cp; + pthread_cond_destroy(seq_data_condition_cp); + delete seq_data_condition_cp; + + if (_seq_results) + delete _seq_results; + + pthread_mutex_destroy(seq_results_mutex); + delete seq_results_mutex; + pthread_cond_destroy(seq_results_condition); + delete seq_results_condition; + pthread_mutex_destroy(seq_results_mutex_cp); + delete seq_results_mutex_cp; + pthread_cond_destroy(seq_results_condition_cp); + delete seq_results_condition_cp; +} + +param_double_port_provides_i * +param_double_port_provides_i::init_port(Engines_ParallelDSC_i * par_compo, + std::string port_name, + CORBA::ORB_ptr orb) +{ + int rank = par_compo->getMyRank(); + int totalNode = par_compo->getTotalNode(); + paco_com * com = par_compo->getCom(); + + // DOIT ETRE DEJA FAIT AVANT !!!??? + paco_fabrique_manager* pfm = paco_getFabriqueManager(); + pfm->register_com("pdp_dummy", new paco_dummy_fabrique()); + pfm->register_thread("pdp_thread", new paco_omni_fabrique()); + pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique()); + pfm->register_distribution("pdp_GaBro", new GaBro_fab()); + pfm->register_distribution("pdp_BasicBC", new BasicBC_fab()); + + param_double_port_provides_i * port = NULL; + Ports::Param_Double_Port_proxy_impl * proxy_node = NULL; + + std::cerr << "Creating Proxy" << std::endl; + if (rank == 0) { + // On commence par créer le proxy + // Il est enregistré dans le composant et sera détruit automatiquement + // lorsque le composant sera détruit + proxy_node = + new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb), + new paco_omni_fabrique()); + proxy_node->setLibCom("pdp_dummy", proxy_node); + proxy_node->setLibThread("pdp_thread"); + PaCO::PacoTopology_t serveur_topo; + serveur_topo.total = totalNode; + proxy_node->setTopology(serveur_topo); + + // Création de la propriété + PortProperties_i * proxy_node_properties = new PortProperties_i(); + + // Enregistrement du proxy + par_compo->add_parallel_provides_proxy_port(proxy_node->_this(), + port_name.c_str(), + proxy_node_properties->_this()); + proxy_node->_remove_ref(); + proxy_node_properties->_remove_ref(); + } + else { + par_compo->add_parallel_provides_proxy_wait(port_name.c_str()); + } + + std::cerr << "Getting proxy" << std::endl; + char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str()); + std::cerr << "Proxy ior is : " << proxy_ior << std::endl; + + port = new param_double_port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank); + port->copyGlobalContext(par_compo); + + // Il faut maintenant configurer les bibliothèques + // de redistributions de la fonction put + ParallelMethodContext * method_ptr = port->getParallelMethodContext("put"); + method_ptr->setLibComScheduling("pdp_direct"); + method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in"); + BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in"); + dislib->setEltSize(sizeof(CORBA::Double)); + + // Il faut maintenant configurer les bibliothèques + // de redistributions de la fonction get_results + method_ptr = port->getParallelMethodContext("get_results"); + method_ptr->setLibComScheduling("pdp_direct"); + method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out"); + GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out"); + dislib_gabro->setEltSize(sizeof(CORBA::Double)); + + com->paco_barrier(); + + // Enregistement du port + for (int i = 0; i < totalNode; i++) { + if (i == rank) { + std::cerr << "Adding node of processor : " << i << std::endl; + par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str()); + } + com->paco_barrier(); + } + + // On démarre l'objet parallèle + if (rank == 0) { + proxy_node->start(); + com->paco_barrier(); + } + else + com->paco_barrier(); + return port; +} + +void +param_double_port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data) +{ + + // On attend que le get soit fait + // Au départ seq_data_termine_cp = TRUE + pthread_mutex_lock(seq_data_mutex_cp); + while (seq_data_termine_cp == false) + { + pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp); + } + seq_data_termine_cp = false; + pthread_mutex_unlock(seq_data_mutex_cp); + + pthread_mutex_lock(seq_data_mutex); + + // Création d'une nouvelle séquence + // Elle prend le buffer sans le copier + Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) ¶m_data; + _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1); + + seq_data_termine = true; + pthread_cond_signal(seq_data_condition); + pthread_mutex_unlock(seq_data_mutex); +} + +void +param_double_port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results) +{ + pthread_mutex_lock(seq_results_mutex); + while (seq_results_termine == false) + { + pthread_cond_wait(seq_results_condition, seq_results_mutex); + } + + // Création d'une nouvelle séquence + // Elle prend le buffer sans le copier + param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1); + delete _seq_results; + _seq_results = NULL; + + seq_results_termine = false; + pthread_mutex_unlock(seq_results_mutex); + + // On indique que l'on a copié la valeur + // Et donc que l'on peut recevoir une nouvelle valeur + pthread_mutex_lock(seq_results_mutex_cp); + seq_results_termine_cp = true; + pthread_cond_signal(seq_results_condition_cp); + pthread_mutex_unlock(seq_results_mutex_cp); +} + +Ports::Param_Double_Port::seq_double * +param_double_port_provides_i::get_data() +{ + Ports::Param_Double_Port::seq_double * result = NULL; + + pthread_mutex_lock(seq_data_mutex); + while (seq_data_termine == false) + { + pthread_cond_wait(seq_data_condition, seq_data_mutex); + } + + // Création d'une nouvelle séquence + // Elle prend le buffer sans le copier + result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1); + delete _seq_data; + _seq_data = NULL; + + seq_data_termine = false; + pthread_mutex_unlock(seq_data_mutex); + + // On indique que l'on a copié la valeur + // Et donc que l'on peut recevoir une nouvelle valeur + pthread_mutex_lock(seq_data_mutex_cp); + seq_data_termine_cp = true; + pthread_cond_signal(seq_data_condition_cp); + pthread_mutex_unlock(seq_data_mutex_cp); + return result; +} + +void +param_double_port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results) +{ + // On attend que le get soit fait + // Au départ seq_results_termine_cp = TRUE + pthread_mutex_lock(seq_results_mutex_cp); + while (seq_results_termine_cp == false) + { + pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp); + } + seq_results_termine_cp = false; + pthread_mutex_unlock(seq_results_mutex_cp); + + pthread_mutex_lock(seq_results_mutex); + + // Création d'une nouvelle séquence + // Elle prend le buffer sans le copier + _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1); + + seq_results_termine = true; + pthread_cond_signal(seq_results_condition); + pthread_mutex_unlock(seq_results_mutex); +} + +void +param_double_port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos) +{ + // Configuration de la biblothèque de redistribution + // pour les données actuelles + ParallelMethodContext * method_ptr = getParallelMethodContext("get_results"); + GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out"); + dislib->setNodeNbElt(data_length); + dislib->setTotalNbElt(totalNbElt); + dislib->setNodePos(BeginEltPos); +} diff --git a/src/DSC/ParallelDSC/param_double_port_provides.hxx b/src/DSC/ParallelDSC/param_double_port_provides.hxx new file mode 100644 index 000000000..4d0552de3 --- /dev/null +++ b/src/DSC/ParallelDSC/param_double_port_provides.hxx @@ -0,0 +1,74 @@ +// Copyright (C) 2009 CEA/DEN, EDF R&D, OPEN CASCADE +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +// File : param_double_port_provides.hxx +// Author : André RIBES (EDF) +// Module : KERNEL + +#ifndef _PARAM_DOUBLE_PORT_PROVIDES_HXX_ +#define _PARAM_DOUBLE_PORT_PROVIDES_HXX_ + +#include "SALOME_PortsPaCO_Ports_Param_Double_Port_server.hxx" + +#include "ParallelDSC_i.hxx" +#include "PortProperties_i.hxx" + +class param_double_port_provides_i : + public virtual Ports::Param_Double_Port_serv +{ + public : + param_double_port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank); + virtual ~param_double_port_provides_i(); + + void put(const Ports::Param_Double_Port::seq_double & param_data); + void get_results(Ports::Param_Double_Port::seq_double_out param_results); + + // local methods + Ports::Param_Double_Port::seq_double * get_data(); + void set_data(Ports::Param_Double_Port::seq_double * results); + void configure_set_data(int data_length, + int totalNbElt, + int BeginEltPos); + + // Aide à la création du port + static param_double_port_provides_i * init_port(Engines_ParallelDSC_i * par_compo, + std::string port_name, + CORBA::ORB_ptr orb); + + private: + // Buffers pour la réception et l'envoi + Ports::Param_Double_Port::seq_double * _seq_data; + Ports::Param_Double_Port::seq_double * _seq_results; + + // Variable pour la gestion du buffer de réception + pthread_mutex_t * seq_data_mutex; + pthread_cond_t * seq_data_condition; + bool seq_data_termine; + pthread_mutex_t * seq_data_mutex_cp; + pthread_cond_t * seq_data_condition_cp; + bool seq_data_termine_cp; + + // Variable pour la gestion du buffer d'envoi + pthread_mutex_t * seq_results_mutex; + pthread_cond_t * seq_results_condition; + bool seq_results_termine; + pthread_mutex_t * seq_results_mutex_cp; + pthread_cond_t * seq_results_condition_cp; + bool seq_results_termine_cp; +}; +#endif diff --git a/src/DSC/ParallelDSC/param_double_port_uses.cxx b/src/DSC/ParallelDSC/param_double_port_uses.cxx new file mode 100644 index 000000000..fa203a6be --- /dev/null +++ b/src/DSC/ParallelDSC/param_double_port_uses.cxx @@ -0,0 +1,110 @@ +// Copyright (C) 2009 CEA/DEN, EDF R&D, OPEN CASCADE +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webparam_double_port_uses_i.salome@opencascade.com +// +// File : param_double_port_uses.cxx +// Author : André RIBES (EDF) +// Module : KERNEL + +#include "param_double_port_uses.hxx" + +param_double_port_uses_i::param_double_port_uses_i(Engines_ParallelDSC_i * par_compo, + std::string port_name, + CORBA::ORB_ptr orb) +{ + _orb = CORBA::ORB::_duplicate(orb); + _fake_properties = new PortProperties_i(); + _par_compo = par_compo; + _port_name = port_name; + _provides_port = NULL; + + paco_fabrique_manager * pfm = paco_getFabriqueManager(); + pfm->register_comScheduling("param_double_port_uses_i_direct", new paco_direct_fabrique()); + pfm->register_distribution("param_double_port_uses_i_GaBro", new GaBro_fab()); + pfm->register_distribution("param_double_port_uses_i_BasicBC", new BasicBC_fab()); +} + +param_double_port_uses_i::~param_double_port_uses_i() +{ + if (_fake_properties) + delete _fake_properties; + if (_provides_port) + { + _provides_port->stop(); + delete _provides_port; + } +} + +void +param_double_port_uses_i::add_port_to_component() +{ + _par_compo->add_uses_port("IDL:Ports/Param_Double_Port:1.0", + _port_name.c_str(), + _fake_properties->_this()); +} + +void +param_double_port_uses_i::start_port() +{ + Engines::DSC::uses_port * uport = _par_compo->get_uses_port(_port_name.c_str()); + _proxy_port = Ports::Param_Double_Port::_narrow((*uport)[0]); + _provides_port = Ports::PaCO_Param_Double_Port::PaCO_narrow(_proxy_port, _orb); + _provides_port->copyClientGlobalContext(_par_compo); + _provides_port->init(_par_compo->getMyRank(), _par_compo->getTotalNode()); + + // Il faut maintenant configurer les bibliothèques + // de redistributions de la fonction put + ParallelMethodContext * method_ptr; + method_ptr = _provides_port->getParallelMethodContext("put"); + method_ptr->setLibComScheduling("param_double_port_uses_i_direct"); + method_ptr->setDistLibArg("param_data", "param_double_port_uses_i_BasicBC", "in"); + BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in"); + dislib->setEltSize(sizeof(CORBA::Double)); + dislib->setBlocSize(0); // BLOC + dislib->setNodeRank(_par_compo->getMyRank()); + + // Il faut maintenant configurer les bibliothèques + // de redistributions de la fonction get_results + method_ptr = _provides_port->getParallelMethodContext("get_results"); + method_ptr->setLibComScheduling("param_double_port_uses_i_direct"); + method_ptr->setDistLibArg("param_results", "param_double_port_uses_i_GaBro", "out"); + GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out"); + dislib_gabro->setEltSize(sizeof(CORBA::Double)); + + _provides_port->start(); +} + +void +param_double_port_uses_i::configure_port_method_put(int totalNbElt) +{ + ParallelMethodContext * method_ptr; + method_ptr = _provides_port->getParallelMethodContext("put"); + BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in"); + dislib->setTotalNbElt(totalNbElt); +} + +void +param_double_port_uses_i::put(const Ports::Param_Double_Port::seq_double & param_data) +{ + _provides_port->put(param_data); +} + +void +param_double_port_uses_i::get_results(Ports::Param_Double_Port::seq_double_out param_results) +{ + _provides_port->get_results(param_results); +} diff --git a/src/DSC/ParallelDSC/param_double_port_uses.hxx b/src/DSC/ParallelDSC/param_double_port_uses.hxx new file mode 100644 index 000000000..914e68705 --- /dev/null +++ b/src/DSC/ParallelDSC/param_double_port_uses.hxx @@ -0,0 +1,62 @@ +// Copyright (C) 2009 CEA/DEN, EDF R&D, OPEN CASCADE +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +// File : param_double_port_uses.hxx +// Author : André RIBES (EDF) +// Module : KERNEL + +#ifndef _PARAM_DOUBLE_PORT_USES_HXX_ +#define _PARAM_DOUBLE_PORT_USES_HXX_ + +#include "SALOME_PortsPaCO_Ports_Param_Double_Port_client.hxx" + +#include "ParallelDSC_i.hxx" +#include "PortProperties_i.hxx" + +#include +#include +#include + +class param_double_port_uses_i +{ + public : + param_double_port_uses_i(Engines_ParallelDSC_i * par_compo, + std::string port_name, + CORBA::ORB_ptr orb); + virtual ~param_double_port_uses_i(); + + // Port local init methods + virtual void add_port_to_component(); + virtual void start_port(); + void configure_port_method_put(int totalNbElt); + + // Port methods + void put(const Ports::Param_Double_Port::seq_double & param_data); + void get_results(Ports::Param_Double_Port::seq_double_out param_results); + + private : + CORBA::ORB_var _orb; + std::string _port_name; + Engines_ParallelDSC_i * _par_compo; + Ports::Param_Double_Port_var _proxy_port; + PortProperties_i * _fake_properties; + + Ports::PaCO_Param_Double_Port * _provides_port; +}; +#endif + -- 2.39.2