From: ageay Date: Thu, 19 Aug 2004 11:09:07 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: BR_GEAY_20041105~5 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=09074e936cba1b2a48ab1029cf8bb58d8cdd4bce;p=modules%2Fkernel.git *** empty log message *** --- diff --git a/src/Communication/Makefile.in b/src/Communication/Makefile.in new file mode 100644 index 000000000..9d2a9be3e --- /dev/null +++ b/src/Communication/Makefile.in @@ -0,0 +1,33 @@ + +top_srcdir=@top_srcdir@ +top_builddir=../.. +srcdir=@srcdir@ +VPATH=.:@srcdir@:@top_srcdir@/idl + + +@COMMENCE@ + +EXPORT_PYSCRIPTS = + +EXPORT_HEADERS = \ + ReceiverFactory.hxx \ + SenderFactory.hxx \ + +# Libraries targets + +LIB = libSalomeCommunication.la +LIB_SRC = SALOME_Comm_i.cxx SenderFactory.cxx Receiver.cxx MultiCommException.cxx SALOMEMultiComm.cxx ReceiverFactory.cxx +LIB_SERVER_IDL = SALOME_Comm.idl SALOME_Exception.idl + +# Executables targets +BIN = +BIN_SRC = +BIN_SERVER_IDL = + +CPPFLAGS+= $(PYTHON_INCLUDES) $(LAM_INCLUDES) + +LDFLAGS+= -lOpUtil -lSALOMELocalTrace + +LIBS += -Xlinker -export-dynamic $(PYTHON_LIBS) $(LAM_LIBS) + +@CONCLUDE@ diff --git a/src/Communication/MultiCommException.cxx b/src/Communication/MultiCommException.cxx new file mode 100644 index 000000000..d649d9652 --- /dev/null +++ b/src/Communication/MultiCommException.cxx @@ -0,0 +1,11 @@ +#include "MultiCommException.hxx" + +MultiCommException::MultiCommException(const char *message) +{ + _message=message; +} + +const char *MultiCommException::what() const +{ + return _message.c_str(); +} diff --git a/src/Communication/MultiCommException.hxx b/src/Communication/MultiCommException.hxx new file mode 100644 index 000000000..0f5466c86 --- /dev/null +++ b/src/Communication/MultiCommException.hxx @@ -0,0 +1,17 @@ +#ifndef _MULTICOMMEXCEPTION_HXX_ +#define _MULTICOMMEXCEPTION_HXX_ + +#include + +using namespace std; + +class MultiCommException { +private: + string _message; +public: + MultiCommException(const char *message); + const char *what() const; +}; + +#endif + diff --git a/src/Communication/Receiver.cxx b/src/Communication/Receiver.cxx new file mode 100644 index 000000000..dc24b200f --- /dev/null +++ b/src/Communication/Receiver.cxx @@ -0,0 +1,25 @@ +#include "Receiver.hxx" +#include + +/*! + return a deep copy of the array contained in the servant. + */ +void *Receiver::getLocalValue(long &size,SALOME_Sender_i* servant) +{ + const void *src=servant->getData(size); + long lgr=size*servant->getSizeOf(); + void *ret=new char[lgr]; + memcpy(ret,src,lgr); + return ret; + //return (void *)servant->getData(size); +} + +void *Receiver::getValue(long &size,SALOME::Sender_ptr sender) +{ + SALOME_Sender_i* data=SALOME_Sender_i::find(sender); + if(data) + return getLocalValue(size,data); + else + return getDistValue(size); +} + diff --git a/src/Communication/Receiver.hxx b/src/Communication/Receiver.hxx new file mode 100644 index 000000000..56c65ad7e --- /dev/null +++ b/src/Communication/Receiver.hxx @@ -0,0 +1,20 @@ +#ifndef _RECEIVER_HXX_ +#define _RECEIVER_HXX_ + +#include "SALOME_Comm_i.hxx" + +/*! Abstract class factorizing common methods of all the receivers. All of the receivers have to inheritate from it. + */ +class Receiver +{ +public: + virtual void *getValue(long &size)=0; + virtual ~Receiver() {} +protected: + virtual void *getValue(long &size,SALOME::Sender_ptr sender); + static inline void *getLocalValue(long &size,SALOME_Sender_i* servant); + virtual void *getDistValue(long &size)=0; +}; + +#endif + diff --git a/src/Communication/ReceiverFactory.cxx b/src/Communication/ReceiverFactory.cxx new file mode 100644 index 000000000..5f448902d --- /dev/null +++ b/src/Communication/ReceiverFactory.cxx @@ -0,0 +1,133 @@ +#include "ReceiverFactory.hxx" +#include "Receivers.hxx" + +#ifdef COMP_CORBA_DOUBLE +#define CorbaDNoCopyReceiver CorbaNCNoCopyReceiver +#define CorbaDWithCopyReceiver CorbaNCWithCopyReceiver +#else +#define CorbaDNoCopyReceiver CorbaWCNoCopyReceiver +#define CorbaDWithCopyReceiver CorbaWCWithCopyReceiver +#endif + +#ifdef COMP_CORBA_LONG +#define CorbaINoCopyReceiver CorbaNCNoCopyReceiver +#define CorbaIWithCopyReceiver CorbaNCWithCopyReceiver +#else +#define CorbaINoCopyReceiver CorbaWCNoCopyReceiver +#define CorbaIWithCopyReceiver CorbaWCWithCopyReceiver +#endif + +/*! + This method performs the transfert with the remote sender given. If it fails with this sender it tries with an another protocol (CORBA by default). + */ +void *ReceiverFactory::getValue(SALOME::Sender_ptr sender,long &size)throw(MultiCommException) +{ + void *ret; + try{ + ret=getValueOneShot(sender,size); + } + catch(MultiCommException&) + { + SALOME::Sender_ptr newSender=sender->buildOtherWithProtocol(SALOME::CORBA_); + sender->release(); + CORBA::release(sender); + ret=getValueOneShot(newSender,size); + } + return ret; +} + +/*! + This method performs the transfert with the remote sender given. If it fails an exception is thrown. + */ +void *ReceiverFactory::getValueOneShot(SALOME::Sender_ptr sender,long &size)throw(MultiCommException) +{ + SALOME::CorbaDoubleNCSender_ptr cncD_ptr; + SALOME::CorbaDoubleCSender_ptr cwcD_ptr; + SALOME::CorbaLongNCSender_ptr cncL_ptr; + SALOME::CorbaLongCSender_ptr cwcL_ptr; +#ifdef HAVE_MPI2 + SALOME::MPISender_ptr mpi_ptr=SALOME::MPISender::_narrow(sender); +#endif +#ifdef HAVE_SOCKET + SALOME::SocketSender_ptr sock_ptr=SALOME::SocketSender::_narrow(sender); +#endif + switch(sender->getTypeOfDataTransmitted()) + { + case SALOME::DOUBLE_: + cncD_ptr=SALOME::CorbaDoubleNCSender::_narrow(sender); + cwcD_ptr=SALOME::CorbaDoubleCSender::_narrow(sender); + if(!CORBA::is_nil(cncD_ptr)) + { + CORBA::release(sender); + CorbaDNoCopyReceiver rec(cncD_ptr); + return rec.getValue(size); + } + else if(!CORBA::is_nil(cwcD_ptr)) + { + CORBA::release(sender); + CorbaDWithCopyReceiver rec(cwcD_ptr); + return rec.getValue(size); + } +#ifdef HAVE_MPI2 + else if(!CORBA::is_nil(mpi_ptr)) + { + CORBA::release(sender); + MPIReceiver rec(mpi_ptr); + return rec.getValue(size); + } +#endif +#ifdef HAVE_SOCKET + else if(!CORBA::is_nil(sock_ptr)) + { + CORBA::release(sender); + SocketReceiver rec(sock_ptr); + return rec.getValue(size); + } +#endif + else + { + throw MultiCommException("Unknown sender protocol"); + return 0; + } + case SALOME::INT_: + cncL_ptr=SALOME::CorbaLongNCSender::_narrow(sender); + cwcL_ptr=SALOME::CorbaLongCSender::_narrow(sender); + if(!CORBA::is_nil(cncL_ptr)) + { + CORBA::release(sender); + CorbaINoCopyReceiver rec(cncL_ptr); + return rec.getValue(size); + } + else if(!CORBA::is_nil(cwcL_ptr)) + { + CORBA::release(sender); + CorbaIWithCopyReceiver rec(cwcL_ptr); + return rec.getValue(size); + } +#ifdef HAVE_MPI2 + else if(!CORBA::is_nil(mpi_ptr)) + { + CORBA::release(sender); + MPIReceiver rec(mpi_ptr); + return rec.getValue(size); + } +#endif +#ifdef HAVE_SOCKET + else if(!CORBA::is_nil(sock_ptr)) + { + CORBA::release(sender); + SocketReceiver rec(sock_ptr); + return rec.getValue(size); + } +#endif + else + { + throw MultiCommException("Unknown sender protocol"); + return 0; + } + default: + throw MultiCommException("unknown type of data transfered"); + return 0; + } +} + diff --git a/src/Communication/ReceiverFactory.hxx b/src/Communication/ReceiverFactory.hxx new file mode 100644 index 000000000..e5072c999 --- /dev/null +++ b/src/Communication/ReceiverFactory.hxx @@ -0,0 +1,21 @@ +#ifndef _RECEIVERFACTORY_HXX_ +#define _RECEIVERFACTORY_HXX_ + +#include +#include CORBA_SERVER_HEADER(SALOME_Comm) +#include "MultiCommException.hxx" + +/*! + This class internally builds a receiver associated with the sender given. It also performs transfert completely and clean up the objects. + This is the only class used client side of an array. + */ +class ReceiverFactory +{ +public: + static void *getValue(SALOME::Sender_ptr sender,long &size)throw(MultiCommException); +private: + static void *getValueOneShot(SALOME::Sender_ptr sender,long &size)throw(MultiCommException); +}; + +#endif + diff --git a/src/Communication/Receivers.cxx b/src/Communication/Receivers.cxx new file mode 100644 index 000000000..04851c461 --- /dev/null +++ b/src/Communication/Receivers.cxx @@ -0,0 +1,385 @@ +#include "poa.h" +#include "utilities.h" + +#define TAILLE_SPLIT 100000 +#define TIMEOUT 20 + +template +CorbaNCNoCopyReceiver::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){ +} + +template +CorbaNCNoCopyReceiver::~CorbaNCNoCopyReceiver(){ + _mySender->release(); + CORBA::release(_mySender); +} + +template +void *CorbaNCNoCopyReceiver::getDistValue(long &size) +{ + TSeqCorba seq=_mySender->send(); + size=seq->length(); + return seq->get_buffer(1); +} + +template +void *CorbaNCNoCopyReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +template +CorbaNCWithCopyReceiver::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){ +} + +template +CorbaNCWithCopyReceiver::~CorbaNCWithCopyReceiver(){ + _mySender->release(); + CORBA::release(_mySender); +} + +template +void *CorbaNCWithCopyReceiver::getDistValue(long &size){ + size=_mySender->getSize(); + long n; + T *ret=new T[size]; + T *iter=ret; + for(long i=0;iTAILLE_SPLIT) + n=TAILLE_SPLIT; + else + n=size-i; + TSeqCorba seq=_mySender->sendPart(i,n); + T *seqd=(T *)seq->get_buffer(0); + for(long j=0;j +void *CorbaNCWithCopyReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +template +CorbaWCNoCopyReceiver::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){ +} + +template +CorbaWCNoCopyReceiver::~CorbaWCNoCopyReceiver(){ + _mySender->release(); + CORBA::release(_mySender); +} + +template +void *CorbaWCNoCopyReceiver::getDistValue(long &size){ + size=_mySender->getSize(); + long n; + T *ret=new T[size]; + T *iter=ret; + for(long i=0;iTAILLE_SPLIT) + n=TAILLE_SPLIT; + else + n=size-i; + TSeqCorba seq=_mySender->sendPart(i,n); + TCorba *seqd=seq->get_buffer(0); + for(long j=0;j +void *CorbaWCNoCopyReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +template +CorbaWCWithCopyReceiver::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){ +} + +template +CorbaWCWithCopyReceiver::~CorbaWCWithCopyReceiver(){ + _mySender->release(); + CORBA::release(_mySender); +} + +template +void *CorbaWCWithCopyReceiver::getDistValue(long &size){ + size=_mySender->getSize(); + long n; + T *ret=new T[size]; + T *iter=ret; + for(long i=0;iTAILLE_SPLIT) + n=TAILLE_SPLIT; + else + n=size-i; + TSeqCorba seq=_mySender->sendPart(i,n); + TCorba *seqd=seq->get_buffer(0); + for(long j=0;j +void *CorbaWCWithCopyReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +#ifdef HAVE_MPI2 + +template +MPIReceiver::MPIReceiver(SALOME::MPISender_ptr mySender):_mySender(mySender){ +} + +template +MPIReceiver::~MPIReceiver(){ + _mySender->release(); + CORBA::release(_mySender); +} + +template +void *MPIReceiver::getDistValue(long &size){ + int i=0; + int myproc; + int sproc; + MPI_Status status; + MPI_Comm com; + char port_name_clt [MPI_MAX_PORT_NAME]; + float telps, tuser, tsys, tcpu; + T *_v; + long _n; + + + CORBA::Any a; + MPI_Comm_rank(MPI_COMM_WORLD, &myproc); + SALOME::MPISender::param_var p =_mySender->getParam(); + _mySender->send(); + sproc = p->myproc; + MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + while ( i != TIMEOUT && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { + i++; + } + MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL); + if ( i == TIMEOUT ) { + MPI_Finalize(); + exit(-1); + } + else{ + // Connect to service, get the inter-communicator server + // Attention MPI_Comm_connect est un appel collectif : + // - Si lancement mpirun -c n -----> uniquement MPI_COMM_SELF fonctionne + // - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne + + // TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for + // a service that has been unpublished ! + MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + i = 0; + while ( i != TIMEOUT && MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { + i++; + } + MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL); + if ( i == TIMEOUT ) { + MPI_Finalize(); + exit(-1); + } + } + MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status); + _v = new T[_n]; + MPI_Recv( _v, _n, T2, sproc,p->tag2,com,&status); + _mySender->close(p); + MPI_Comm_disconnect( &com ); + size=_n; + return _v; +} + +template +void *MPIReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +#endif + +#ifdef HAVE_SOCKET +#include +#include +#include +#include +#include +#include + +template +SocketReceiver::SocketReceiver(SALOME::SocketSender_ptr mySender) : _mySender(mySender) +{ + _clientSockfd = -1; + _senderDestruc=true; +} + +template +SocketReceiver::~SocketReceiver() +{ + if(_senderDestruc) + { + _mySender->release(); + CORBA::release(_mySender); + } +} + +template +void *SocketReceiver::getValue(long &size) +{ + return Receiver::getValue(size,_mySender); +} + +template +void* SocketReceiver::getDistValue(long &size) +{ + int n=0, m; + T *v; + + try{ + initCom(); + + SALOME::SocketSender::param_var p = _mySender->getParam(); + + size = p->lend - p->lstart + 1; + v = new T[size]; + + connectCom(p->internet_address, p->myport); + + _mySender->send(); + + while( n < size*sizeof(T) ){ + m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n); + if( m < 0 ){ + closeCom(); + delete [] v; + SALOME::ExceptionStruct es; + es.type = SALOME::COMM; + es.text = "error read Socket exception"; + throw SALOME::SALOME_Exception(es); + } + n += m; + } + + _mySender->endOfCom(); + closeCom(); + } + catch(SALOME::SALOME_Exception &ex){ + if( ex.details.type == SALOME::COMM ) + { + _senderDestruc=false; + cout << ex.details.text << endl; + throw MultiCommException("Unknown sender protocol"); + } + else + throw ex; + } + + return v; +} + +template +void SocketReceiver::initCom() +{ + try{ + _mySender->initCom(); + + /* Ouverture de la socket */ + _clientSockfd = socket(AF_INET, SOCK_STREAM, 0); + if (_clientSockfd < 0) { + closeCom(); + SALOME::ExceptionStruct es; + es.type = SALOME::COMM; + es.text = "error Socket exception"; + throw SALOME::SALOME_Exception(es); + } + } + catch(SALOME::SALOME_Exception &ex){ + if( ex.details.type == SALOME::COMM ) + { + _senderDestruc=false; + cout << ex.details.text << endl; + throw MultiCommException("Unknown sender protocol"); + } + else + throw ex; + } + +} + +template +void SocketReceiver::connectCom(const char *dest_address, int port) +{ + struct sockaddr_in serv_addr; + struct hostent * server; + SALOME::ExceptionStruct es; + + try{ + /* reception of the host structure on the remote process */ + server = gethostbyname(dest_address); + if( server == NULL ) { + closeCom(); + es.type = SALOME::COMM; + es.text = "error unknown host Socket exception"; + _senderDestruc=false; + throw SALOME::SALOME_Exception(es); + } + + /* Initialisation of the socket structure */ + bzero((char*)&serv_addr,sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + bcopy((char *)server->h_addr, + (char *)&serv_addr.sin_addr.s_addr, + server->h_length); + serv_addr.sin_port = htons(port); + + if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){ + closeCom(); + es.type = SALOME::COMM; + es.text = "error connect Socket exception"; + _senderDestruc=false; + throw SALOME::SALOME_Exception(es); + } + + _mySender->acceptCom(); + + } + catch(SALOME::SALOME_Exception &ex){ + if( ex.details.type == SALOME::COMM ) + { + _senderDestruc=false; + cout << ex.details.text << endl; + throw MultiCommException("Unknown sender protocol"); + } + else + throw ex; + } + +} + + +template +void SocketReceiver::closeCom() +{ + _mySender->closeCom(); + if( _clientSockfd >= 0 ){ + close(_clientSockfd); + _clientSockfd = -1; + } + +} + +#endif diff --git a/src/Communication/Receivers.hxx b/src/Communication/Receivers.hxx new file mode 100644 index 000000000..917a32c1a --- /dev/null +++ b/src/Communication/Receivers.hxx @@ -0,0 +1,118 @@ +#ifndef _RECEIVERS_HXX_ +#define _RECEIVERS_HXX_ + +#include "SALOME_Comm_i.hxx" +#include "Receiver.hxx" +#ifdef HAVE_MPI2 +#include "mpi.h" +#endif + +/*! + Receiver used for transfert with CORBA when no copy is required remotely and locally. + */ +template +class CorbaNCNoCopyReceiver : public Receiver +{ +private: + CorbaSender _mySender; +public: + CorbaNCNoCopyReceiver(CorbaSender mySender); + ~CorbaNCNoCopyReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); +}; + +/*! + Receiver used for transfert with CORBA when copy is not required remotely but required locally. + */ +template +class CorbaNCWithCopyReceiver : public Receiver +{ +private: + CorbaSender _mySender; +public: + CorbaNCWithCopyReceiver(CorbaSender mySender); + ~CorbaNCWithCopyReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); +}; + +/*! + Receiver used for transfert with CORBA when copy is required remotely but not required locally. + */ +template +class CorbaWCNoCopyReceiver : public Receiver +{ +private: + CorbaSender _mySender; +public: + CorbaWCNoCopyReceiver(CorbaSender mySender); + ~CorbaWCNoCopyReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); +}; + +/*! + Receiver used for transfert with CORBA when copy is required both remotely and locally. + */ +template +class CorbaWCWithCopyReceiver : public Receiver +{ +private: + CorbaSender _mySender; +public: + CorbaWCWithCopyReceiver(CorbaSender mySender); + ~CorbaWCWithCopyReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); +}; + +#ifdef HAVE_MPI2 +/*! + Receiver for MPI transfert. + */ +template +class MPIReceiver : public Receiver +{ +private: + SALOME::MPISender_ptr _mySender; +public: + MPIReceiver(SALOME::MPISender_ptr mySender); + ~MPIReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); +}; +#endif + +#ifdef HAVE_SOCKET +/*! + Receiver for transfert with sockets. + */ +template +class SocketReceiver : public Receiver +{ +private: + SALOME::SocketSender_ptr _mySender; + int _clientSockfd; + bool _senderDestruc; +public: + SocketReceiver(SALOME::SocketSender_ptr mySender); + ~SocketReceiver(); + void *getValue(long &size); +private: + void *getDistValue(long &size); + void initCom(); + void connectCom(const char *, int); + void closeCom(); +}; +#endif + +#include "Receivers.cxx" + +#endif + diff --git a/src/Communication/SALOMEMultiComm.cxx b/src/Communication/SALOMEMultiComm.cxx new file mode 100644 index 000000000..71c4135e6 --- /dev/null +++ b/src/Communication/SALOMEMultiComm.cxx @@ -0,0 +1,21 @@ +#include "SALOMEMultiComm.hxx" + +SALOMEMultiComm::SALOMEMultiComm():_type(SALOME::CORBA_) +{ +} + +SALOMEMultiComm::SALOMEMultiComm(SALOME::TypeOfCommunication type):_type(type) +{ +} + +void SALOMEMultiComm::setProtocol(SALOME::TypeOfCommunication type) +{ + _type=type; +} + +SALOME::TypeOfCommunication SALOMEMultiComm::getProtocol() const +{ + return _type; +} + + diff --git a/src/Communication/SALOMEMultiComm.hxx b/src/Communication/SALOMEMultiComm.hxx new file mode 100644 index 000000000..2537dc042 --- /dev/null +++ b/src/Communication/SALOMEMultiComm.hxx @@ -0,0 +1,21 @@ +#ifndef _SALOMEMULTICOMM_HXX_ +#define _SALOMEMULTICOMM_HXX_ + +#include +#include CORBA_SERVER_HEADER(SALOME_Comm) + +/*! + Class is designed to ease the use of multi communication.\n + Simply inherite from it your servant class you want to emit data with senders. + */ +class SALOMEMultiComm : public virtual POA_SALOME::MultiCommClass { +protected: + SALOME::TypeOfCommunication _type; +public: + SALOMEMultiComm(); + SALOMEMultiComm(SALOME::TypeOfCommunication type); + virtual void setProtocol(SALOME::TypeOfCommunication type); + SALOME::TypeOfCommunication getProtocol() const; +}; + +#endif diff --git a/src/Communication/SALOME_Comm_i.cxx b/src/Communication/SALOME_Comm_i.cxx new file mode 100644 index 000000000..74ca7b244 --- /dev/null +++ b/src/Communication/SALOME_Comm_i.cxx @@ -0,0 +1,440 @@ +#include "SALOME_Comm_i.hxx" +#include "poa.h" +#include "omnithread.h" +#include "Utils_SINGLETON.hxx" +#include "Utils_ORB_INIT.hxx" +#include "utilities.h" + +#include "SenderFactory.hxx" + +CORBA::ORB_var &getGlobalORB(){ + ORB_INIT &init = *SINGLETON_::Instance(); + CORBA::ORB_var &orb = init(0,0); + return orb; +} + +/*! Return the C++ data associated to the array to transmit. + Used when sender and receiver are collocalized. + */ +const void *SALOME_Sender_i::getData(long &size) const{ + size=_lgrTabToSend; + return _tabToSend; +} + +/*! Return the sizeof() of each component of the generic array + */ +int SALOME_Sender_i::getSizeOf() const { + return _sizeOf; +} + +/*! Unique constructor */ +SALOME_Sender_i::SALOME_Sender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):_tabToSend(tabToSend),_lgrTabToSend(lgrTabToSend),_type(type),_sizeOf(sizeOf){ +} + +/*! Method to establish if the CORBA object refered by pCorba is collocalised.\n + If it is, the pointer to the servant that incarnates the CORBA object is returned. +*/ +SALOME_Sender_i *SALOME_Sender_i::find(SALOME::Sender_ptr pCorba){ + PortableServer::ServantBase *ret; + try { + ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba); + } + catch(...){ + return 0; + } + ret->_remove_ref(); + return dynamic_cast(ret); +} + +/*! Method for the remote destroy of the current servant. This method is used by the receiver to destroy the sender when the transfert is complete. + */ +void SALOME_Sender_i::release() +{ + PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this); + _default_POA()->deactivate_object(oid); + _remove_ref(); +} + +/*! Return the type of the element that compose the array. Used by receiverfactory to build the correct receiver. + */ +SALOME::TypeOfDataTransmitted SALOME_Sender_i::getTypeOfDataTransmitted() +{ + return _type; +} + +/*! Return a new sender of the same array but with an another protocol. + */ +SALOME::Sender_ptr SALOME_Sender_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type) +{ + return SenderFactory::buildSender(type,this); +} + +SALOME_CorbaDoubleNCSender_i::SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){ +} + +SALOME_CorbaDoubleNCSender_i::~SALOME_CorbaDoubleNCSender_i(){ +} + +CORBA::ULong SALOME_CorbaDoubleNCSender_i::getSize(){ + CORBA::ULong ret=_lgrTabToSend; + return ret; +} + +SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){ + SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(length,length,(CORBA::Double *)((double *)_tabToSend+(long)offset),0); + return c1._retn(); +} + +SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::send(){ + SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(_lgrTabToSend,_lgrTabToSend,(CORBA::Double *)_tabToSend,0); + return c1._retn(); +} + +SALOME_CorbaDoubleCSender_i::SALOME_CorbaDoubleCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){ +} + +SALOME_CorbaDoubleCSender_i::~SALOME_CorbaDoubleCSender_i(){ +} + +CORBA::ULong SALOME_CorbaDoubleCSender_i::getSize(){ + CORBA::ULong ret=_lgrTabToSend; + return ret; +} + +SALOME::vectorOfDouble* SALOME_CorbaDoubleCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){ + SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble; + c1->length(length); + for (long i=0; ilength(length); + for (long i=0; imyproc = _cproc; + p->tag1 = _tag1; + _tag1Inst=_tag1; + p->tag2 =_tag2; + _tag2Inst=_tag2; + std::string service("toto_"); + sprintf(stag,"%d_",_tag1); + service += stag; + sprintf(stag,"%d_",p->tag2); + service += stag; + p->service = CORBA::string_dup(service.c_str()); + MPI_Open_port(MPI_INFO_NULL, _portName); + MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_RETURN); + while ( i != TIMEOUT && MPI_Publish_name((char*)service.c_str(),MPI_INFO_NULL,_portName) != MPI_SUCCESS) { + i++; + } + MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_ARE_FATAL); + if ( i == TIMEOUT ) { + MPI_Close_port(_portName); + MPI_Finalize(); + exit(-1); + } + _tag1 += 2; + _tag2 += 2; + return p._retn(); +} + +void SALOME_MPISender_i::send() +{ + _argsForThr=new (void *)[8]; + _argsForThr[0]=_portName; + _argsForThr[1]=&_lgrTabToSend; + _argsForThr[2]=(void *)_tabToSend; + _argsForThr[3]=&_cproc; + _argsForThr[4]=&_tag1Inst; + _argsForThr[5]=&_tag2Inst; + _argsForThr[6]=&_com; + _argsForThr[7]=&_type; + + _newThr=new omni_thread(SALOME_MPISender_i::myThread,_argsForThr); + _newThr->start(); +} + +void* SALOME_MPISender_i::myThread(void *args) +{ + void **argsTab=(void **)args; + long *lgrTabToSend=(long *)argsTab[1]; + int *cproc=(int *)argsTab[3]; + int *tag1=(int *)argsTab[4]; + int *tag2=(int *)argsTab[5]; + MPI_Comm *com=(MPI_Comm *)argsTab[6]; + SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[7]; + + MPI_Comm_accept((char *)argsTab[0],MPI_INFO_NULL,0,MPI_COMM_SELF,com); + MPI_Send(lgrTabToSend,1,MPI_LONG,*cproc,*tag1,*com); + switch(*type) + { + case SALOME::DOUBLE_: + MPI_Send(argsTab[2],*lgrTabToSend,MPI_DOUBLE,*cproc,*tag2,*com); + break; + case SALOME::INT_: + MPI_Send(argsTab[2],*lgrTabToSend,MPI_INT,*cproc,*tag2,*com); + } + omni_thread::exit(); +} + +void SALOME_MPISender_i::close(const SALOME::MPISender::param& p) +{ + std::string service(p.service); + const char *st=p.service; + void *r; + _newThr->join(&r); + MPI_Comm_free(&_com); + MPI_Unpublish_name((char *)service.c_str(),MPI_INFO_NULL,_portName); + MPI_Close_port(_portName); + delete [] _argsForThr; +} + +#endif + +#ifdef HAVE_SOCKET + +#include +#include +#include +#include +#include +#include + +SALOME_SocketSender_i::SALOME_SocketSender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){ + _IPAddress = inetAddress(); + _serverSockfd = -1; + _clientSockfd = -1; +} + +SALOME_SocketSender_i::~SALOME_SocketSender_i(){ +} + +std::string SALOME_SocketSender_i::inetAddress() +{ + char s[256]; + char t[INET_ADDRSTRLEN+1]; + struct hostent *host; + struct in_addr saddr; + + gethostname(s, 255); + + *t = '\0'; + + saddr.s_addr = inet_addr(s); + if (saddr.s_addr != -1) + inet_ntop(AF_INET, &saddr, t, INET_ADDRSTRLEN); + else { + host = gethostbyname(s); + if (host != NULL) + inet_ntop(AF_INET, (struct in_addr *) *host->h_addr_list, + t, INET_ADDRSTRLEN); + } + return std::string(t); +} + +SALOME::SocketSender::param * SALOME_SocketSender_i::getParam() +{ + + SALOME::SocketSender::param_var p = new SALOME::SocketSender::param; + + p->lstart = 0; + p->lend = _lgrTabToSend - 1; + p->myport = _port; + p->internet_address = CORBA::string_dup(_IPAddress.c_str()); + + return p._retn(); +} + +void SALOME_SocketSender_i::send() +{ + _argsForThr=new (void *)[5]; + _argsForThr[0]=&_serverSockfd; + _argsForThr[1]=&_clientSockfd; + _argsForThr[2]=&_lgrTabToSend; + _argsForThr[3]=(void *)_tabToSend; + _argsForThr[4]=&_errorFlag; + + _newThr=new omni_thread(SALOME_SocketSender_i::myThread,_argsForThr); + _newThr->start(); +} + +void* SALOME_SocketSender_i::myThread(void *args) +{ + int n=0, m; + void **argsTab=(void **)args; + int *serverSockfd=(int *)argsTab[0]; + int *clientSockfd=(int *)argsTab[1]; + long *lgrTabToSend=(long *)argsTab[2]; + void *tabToSend=argsTab[3]; + bool *errorFlag=(bool*)argsTab[4]; + + *errorFlag = false; + while( n < *lgrTabToSend*sizeof(double) ){ + m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(double)-n); + if( m < 0 ){ + if( *clientSockfd >= 0 ){ + ::close(*clientSockfd); + *clientSockfd = -1; + } + if( *serverSockfd >= 0 ){ + ::close(*serverSockfd); + *serverSockfd = -1; + } + *errorFlag = true; + } + n += m; + } +} + +void SALOME_SocketSender_i::initCom() throw(SALOME::SALOME_Exception) +{ + struct sockaddr_in serv_addr; + socklen_t n; + SALOME::ExceptionStruct es; + + /* Ouverture de la socket */ + _serverSockfd = socket(AF_INET , SOCK_STREAM , 0); + if(_serverSockfd < 0) { + es.type = SALOME::COMM; + es.text = "error Socket exception"; + throw SALOME::SALOME_Exception(es); + } + /* Socket structure initialisation*/ + bzero((char*)&serv_addr,sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = 0; /* asking for a free port */ + serv_addr.sin_addr.s_addr = INADDR_ANY; + + /* Association of socket with a port */ + if( ::bind(_serverSockfd, (struct sockaddr *) & serv_addr, + sizeof(struct sockaddr)) < 0 ) { + closeCom(); + es.type = SALOME::COMM; + es.text = "error bind Socket exception"; + throw SALOME::SALOME_Exception(es); + } + /* Listening to the allocated port */ + if( listen(_serverSockfd, 10) < 0 ) { + closeCom(); + es.type = SALOME::COMM; + es.text = "error listen Socket exception"; + throw SALOME::SALOME_Exception(es); + } + /* Retrieving port number*/ + if( getsockname(_serverSockfd, (struct sockaddr *) & serv_addr, &n) < 0 ){ + closeCom(); + es.type = SALOME::COMM; + es.text = "error getName Socket exception"; + throw SALOME::SALOME_Exception(es); + } + _port = htons(serv_addr.sin_port); + SCRUTE(_port); +} + +void SALOME_SocketSender_i::acceptCom() throw(SALOME::SALOME_Exception) +{ + socklen_t sin_size; + int new_fd; + struct sockaddr_in client_addr; + SALOME::ExceptionStruct es; + + sin_size = sizeof(struct sockaddr_in); + + _clientSockfd = accept(_serverSockfd, (struct sockaddr *)&client_addr, &sin_size); + if( _clientSockfd < 0 ){ + closeCom(); + es.type = SALOME::COMM; + es.text = "error accept Socket exception"; + throw SALOME::SALOME_Exception(es); + } +} + +void SALOME_SocketSender_i::closeCom() +{ + if( _clientSockfd >= 0 ){ + ::close(_clientSockfd); + _clientSockfd = -1; + } + if( _serverSockfd >= 0 ){ + ::close(_serverSockfd); + _serverSockfd = -1; + } + +} + +void SALOME_SocketSender_i::endOfCom() +{ + void *r; + _newThr->join(&r); + if(_errorFlag) + { + SALOME::ExceptionStruct es; + es.type = SALOME::COMM; + es.text = "error write Socket exception"; + throw SALOME::SALOME_Exception(es); + } + delete [] _argsForThr; +} + +#endif diff --git a/src/Communication/SALOME_Comm_i.hxx b/src/Communication/SALOME_Comm_i.hxx new file mode 100644 index 000000000..6a8c23ba9 --- /dev/null +++ b/src/Communication/SALOME_Comm_i.hxx @@ -0,0 +1,162 @@ +#ifndef _SALOME_COMM_I_HXX_ +#define _SALOME_COMM_I_HXX_ + +#include +#include +#include +#include CORBA_SERVER_HEADER(SALOME_Comm) +#ifdef HAVE_MPI2 +#include "mpi.h" +#endif + +#define TIMEOUT 20 + +using namespace std; + +/*! + Generic servant class for senders that factorizes all the common methods and attributes necessary to senders. + All servant classes for senders have to inheritate from it. + */ +class SALOME_Sender_i : public virtual POA_SALOME::Sender, + public PortableServer::RefCountServantBase { +protected: + /*! Pointer to the generic array to transmit*/ + const void *_tabToSend; + /*! Length of the generic array to transmit*/ + long _lgrTabToSend; + /*! it represents the sizeof() of each component of the generic array:\n + Practically in terms of bytes the size to be transmitted is _lgrTabToSend*_sizeOf + */ + int _sizeOf; + /*! Type the component of the array*/ + SALOME::TypeOfDataTransmitted _type; + + SALOME_Sender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf); +public: + const void *getData(long &size) const; + int getSizeOf() const; + void release(); + SALOME::TypeOfDataTransmitted getTypeOfDataTransmitted(); + SALOME::Sender_ptr buildOtherWithProtocol(SALOME::TypeOfCommunication type); + static SALOME_Sender_i *find(SALOME::Sender_ptr pCorba); +}; + +/*! Servant class for CORBA sender for double* when no copy of array _tabToSend is required, that is to say double and CORBA::Double are binary equal. + */ +class SALOME_CorbaDoubleNCSender_i : public POA_SALOME::CorbaDoubleNCSender, + public SALOME_Sender_i +{ +public: + SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend); + ~SALOME_CorbaDoubleNCSender_i(); + CORBA::ULong getSize(); + SALOME::vectorOfDouble* sendPart(CORBA::ULong offset, CORBA::ULong length); + SALOME::vectorOfDouble* send(); +}; + +/*! Servant class for CORBA sender for double* when copy of array _tabToSend is required, that is to say double and CORBA::Double are NOT binary equal. + */ +class SALOME_CorbaDoubleCSender_i : public POA_SALOME::CorbaDoubleCSender, + public SALOME_Sender_i +{ +public: + SALOME_CorbaDoubleCSender_i(const double *tabToSend,const long lgrTabToSend); + ~SALOME_CorbaDoubleCSender_i(); + CORBA::ULong getSize(); + SALOME::vectorOfDouble* sendPart(CORBA::ULong offset, CORBA::ULong length); +}; + +/*! Servant class for CORBA sender for int* when no copy of array _tabToSend is required, that is to say int and CORBA::Long are binary equal. + */ +class SALOME_CorbaLongNCSender_i : public POA_SALOME::CorbaLongNCSender, + public SALOME_Sender_i +{ +public: + SALOME_CorbaLongNCSender_i(const int *tabToSend,const long lgrTabToSend); + ~SALOME_CorbaLongNCSender_i(); + CORBA::ULong getSize(); + SALOME::vectorOfLong* sendPart(CORBA::ULong offset, CORBA::ULong length); + SALOME::vectorOfLong* send(); +}; + +/*! Servant class for CORBA sender for int* when copy of array _tabToSend is required, that is to say int and CORBA::Long are NOT binary equal. + */ +class SALOME_CorbaLongCSender_i : public POA_SALOME::CorbaLongCSender, + public SALOME_Sender_i +{ +public: + SALOME_CorbaLongCSender_i(const int *tabToSend,long lgrTabToSend); + ~SALOME_CorbaLongCSender_i(); + CORBA::ULong getSize(); + SALOME::vectorOfLong* sendPart(CORBA::ULong offset, CORBA::ULong length); + SALOME::CorbaLongCSender_ptr _this(); +}; + +#ifdef HAVE_MPI2 + +/*! Servant class of sender using MPI2. + */ +class SALOME_MPISender_i : public POA_SALOME::MPISender, + public SALOME_Sender_i +{ +private: + static unsigned long _tag1; + static unsigned long _tag2; + /*! Name of the port opened*/ + char *_portName; + int _cproc; + /*! Tag 1 that identifies the transfert*/ + int _tag1Inst; + /*! Tag 2 that identifies the transfert*/ + int _tag2Inst; + /*! MPI communicator*/ + MPI_Comm _com; + /*! Array of pointer for asynchronous invocation with omnithread*/ + void **_argsForThr; + /*! Pointer to thread created on asynchronous invocation*/ + omni_thread *_newThr; +public: + SALOME_MPISender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf); + ~SALOME_MPISender_i(); + SALOME::MPISender::param* getParam(); + void send(); + void close(const SALOME::MPISender::param& p); +private: + static void* myThread(void *args); +}; + +#endif + +#ifdef HAVE_SOCKET + +/*! Servant class of sender using Sockets. + */ +class SALOME_SocketSender_i : public POA_SALOME::SocketSender, + public SALOME_Sender_i +{ +private: + int _serverSockfd; + int _clientSockfd; + int _port; + std::string _IPAddress; + void **_argsForThr; + omni_thread *_newThr; + bool _errorFlag; +public: + SALOME_SocketSender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf); + ~SALOME_SocketSender_i(); + SALOME::SocketSender::param* getParam(); + void send(); + void initCom() throw(SALOME::SALOME_Exception); + void acceptCom() throw(SALOME::SALOME_Exception); + void endOfCom(); + void closeCom(); +private: + static void* myThread(void *args); + std::string inetAddress(); +}; + +#endif + +#endif + diff --git a/src/Communication/SenderFactory.cxx b/src/Communication/SenderFactory.cxx new file mode 100644 index 000000000..2e3fea4a0 --- /dev/null +++ b/src/Communication/SenderFactory.cxx @@ -0,0 +1,96 @@ +#include "SenderFactory.hxx" +#include "SALOMEMultiComm.hxx" +#include "SALOME_Comm_i.hxx" + +#ifdef COMP_CORBA_DOUBLE +#define SALOME_CorbaDoubleSender SALOME_CorbaDoubleNCSender_i +#else +#define SALOME_CorbaDoubleSender SALOME_CorbaDoubleCSender_i +#endif + +#ifdef COMP_CORBA_LONG +#define SALOME_CorbaLongSender SALOME_CorbaLongNCSender_i +#else +#define SALOME_CorbaLongSender SALOME_CorbaLongCSender_i +#endif + +SALOME::Sender_ptr SenderFactory::buildSender(SALOMEMultiComm &multiCommunicator,const double *tab,long lgr)throw(MultiCommException){ + switch(multiCommunicator.getProtocol()) + { + case SALOME::CORBA_: + { + SALOME_CorbaDoubleSender * retc=new SALOME_CorbaDoubleSender(tab,lgr); + return retc->_this(); + } +#ifdef HAVE_MPI2 + case SALOME::MPI_: + { + SALOME_MPISender_i* retm=new SALOME_MPISender_i(SALOME::DOUBLE_,tab,lgr,sizeof(double)); + return retm->_this(); + } +#endif +#ifdef HAVE_SOCKET + case SALOME::SOCKET_: + { + SALOME_SocketSender_i* rets=new SALOME_SocketSender_i(SALOME::DOUBLE_,tab,lgr,sizeof(double)); + return rets->_this(); + } +#endif + default: + { + multiCommunicator.setProtocol(SALOME::CORBA_); + SALOME_CorbaDoubleSender * retc=new SALOME_CorbaDoubleSender(tab,lgr); + return retc->_this(); + } +// throw MultiCommException("Communication protocol not implemented"); + } +} + +SALOME::Sender_ptr SenderFactory::buildSender(SALOMEMultiComm &multiCommunicator,const int *tab,long lgr)throw(MultiCommException){ + switch(multiCommunicator.getProtocol()) + { + case SALOME::CORBA_: + { + SALOME_CorbaLongSender * retc=new SALOME_CorbaLongSender(tab,lgr); + return retc->_this(); + } +#ifdef HAVE_MPI2 + case SALOME::MPI_: + { + SALOME_MPISender_i* retm=new SALOME_MPISender_i(SALOME::INT_,tab,lgr,sizeof(int)); + return retm->_this(); + } +#endif +#ifdef HAVE_SOCKET + case SALOME::SOCKET_: + { + SALOME_SocketSender_i* rets=new SALOME_SocketSender_i(SALOME::INT_,tab,lgr,sizeof(int)); + return rets->_this(); + } +#endif + default: + { + multiCommunicator.setProtocol(SALOME::CORBA_); + SALOME_CorbaLongSender * retc=new SALOME_CorbaLongSender(tab,lgr); + return retc->_this(); + } +// throw MultiCommException("Communication protocol not implemented"); + } + } + +SALOME::Sender_ptr SenderFactory::buildSender(SALOME::TypeOfCommunication NewType,SALOME_Sender_i *src) +{ + SALOMEMultiComm mc(NewType); + long n; + const void *data=src->getData(n); + switch(src->getTypeOfDataTransmitted()) + { + case SALOME::DOUBLE_: + return buildSender(mc,(const double *)data,n); + case SALOME::INT_: + return buildSender(mc,(const int *)data,n); + } + +} + + diff --git a/src/Communication/SenderFactory.hxx b/src/Communication/SenderFactory.hxx new file mode 100644 index 000000000..d85a842ba --- /dev/null +++ b/src/Communication/SenderFactory.hxx @@ -0,0 +1,24 @@ +#ifndef _SENDERFACTORY_HXX_ +#define _SENDERFACTORY_HXX_ + +#include "MultiCommException.hxx" +#include +#include CORBA_SERVER_HEADER(SALOME_Comm) + +class SALOMEMultiComm; + +class SALOME_Sender_i; + +/*! + This class implements the factory pattern of GoF by making a sender by giving an array and a communicator.It completely hides the type of sender from the user. + */ +class SenderFactory +{ +public: + static SALOME::Sender_ptr buildSender(SALOMEMultiComm &multiCommunicator,const double *tab,long lgr) throw(MultiCommException); + static SALOME::Sender_ptr buildSender(SALOMEMultiComm &multiCommunicator,const int *tab,long lgr) throw(MultiCommException); + static SALOME::Sender_ptr buildSender(SALOME::TypeOfCommunication NewType,SALOME_Sender_i *src); +}; + +#endif +