2 #include "SALOME_Comm_i.hxx"
4 #include "omnithread.h"
5 #include "Utils_SINGLETON.hxx"
6 #include "Utils_ORB_INIT.hxx"
9 #include "SenderFactory.hxx"
12 CORBA::ORB_var &getGlobalORB(){
13 ORB_INIT &init = *SINGLETON_<ORB_INIT>::Instance();
14 CORBA::ORB_var &orb = init(0,0);
18 /*! Return the C++ data associated to the array to transmit.
19 Used when sender and receiver are collocalized.
21 const void *SALOME_Sender_i::getData(long &size) const{
26 /*! Return the sizeof() of each component of the generic array
28 int SALOME_Sender_i::getSizeOf() const {
32 /*! Unique constructor */
33 SALOME_Sender_i::SALOME_Sender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):_tabToSend(tabToSend),_lgrTabToSend(lgrTabToSend),_type(type),_sizeOf(sizeOf){
36 /*! Method to establish if the CORBA object refered by pCorba is collocalised.\n
37 If it is, the pointer to the servant that incarnates the CORBA object is returned.
39 SALOME_Sender_i *SALOME_Sender_i::find(SALOME::Sender_ptr pCorba){
40 PortableServer::ServantBase *ret;
42 ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba);
48 return dynamic_cast<SALOME_Sender_i *>(ret);
51 /*! Method for the remote destroy of the current servant. This method is used by the receiver to destroy the sender when the transfert is complete.
53 void SALOME_Sender_i::release()
55 PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
56 _default_POA()->deactivate_object(oid);
60 /*! Return the type of the element that compose the array. Used by receiverfactory to build the correct receiver.
62 SALOME::TypeOfDataTransmitted SALOME_Sender_i::getTypeOfDataTransmitted()
67 /*! Return a new sender of the same array but with an another protocol.
69 SALOME::Sender_ptr SALOME_Sender_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type)
71 return SenderFactory::buildSender(type,this);
74 SALOME_CorbaDoubleNCSender_i::SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
77 SALOME_CorbaDoubleNCSender_i::~SALOME_CorbaDoubleNCSender_i(){
80 CORBA::ULong SALOME_CorbaDoubleNCSender_i::getSize(){
81 CORBA::ULong ret=_lgrTabToSend;
85 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
86 SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(length,length,(CORBA::Double *)((double *)_tabToSend+(long)offset),0);
90 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::send(){
91 SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(_lgrTabToSend,_lgrTabToSend,(CORBA::Double *)_tabToSend,0);
95 SALOME_CorbaDoubleCSender_i::SALOME_CorbaDoubleCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
98 SALOME_CorbaDoubleCSender_i::~SALOME_CorbaDoubleCSender_i(){
101 CORBA::ULong SALOME_CorbaDoubleCSender_i::getSize(){
102 CORBA::ULong ret=_lgrTabToSend;
106 SALOME::vectorOfDouble* SALOME_CorbaDoubleCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
107 SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble;
109 for (long i=0; i<length; i++)
110 c1[i] = ((double *)_tabToSend)[i+offset];
114 ////////////////////////
116 SALOME_CorbaLongNCSender_i::SALOME_CorbaLongNCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
119 SALOME_CorbaLongNCSender_i::~SALOME_CorbaLongNCSender_i(){
122 CORBA::ULong SALOME_CorbaLongNCSender_i::getSize(){
123 CORBA::ULong ret=_lgrTabToSend;
127 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
128 SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(length,length,(CORBA::Long *)((long *)_tabToSend+(long)offset),0);
132 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::send(){
133 SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(_lgrTabToSend,_lgrTabToSend,(CORBA::Long *)_tabToSend,0);
137 SALOME_CorbaLongCSender_i::SALOME_CorbaLongCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
140 SALOME_CorbaLongCSender_i::~SALOME_CorbaLongCSender_i(){
143 CORBA::ULong SALOME_CorbaLongCSender_i::getSize(){
144 CORBA::ULong ret=_lgrTabToSend;
148 SALOME::vectorOfLong* SALOME_CorbaLongCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
149 SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong;
151 for (long i=0; i<length; i++)
152 c1[i] = ((long *)_tabToSend)[i+offset];
158 unsigned long SALOME_MPISender_i::_tag1=0;
160 unsigned long SALOME_MPISender_i::_tag2=1;
162 SALOME_MPISender_i::SALOME_MPISender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
163 _portName=new char[MPI_MAX_PORT_NAME];
166 SALOME_MPISender_i::~SALOME_MPISender_i(){
170 SALOME::MPISender::param* SALOME_MPISender_i::getParam()
175 SALOME::MPISender::param_var p = new SALOME::MPISender::param;
176 MPI_Comm_rank(MPI_COMM_WORLD,&_cproc);
182 std::string service("toto_");
183 sprintf(stag,"%d_",_tag1);
185 sprintf(stag,"%d_",p->tag2);
187 p->service = CORBA::string_dup(service.c_str());
188 MPI_Open_port(MPI_INFO_NULL, _portName);
189 MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_RETURN);
190 while ( i != TIMEOUT && MPI_Publish_name((char*)service.c_str(),MPI_INFO_NULL,_portName) != MPI_SUCCESS) {
193 MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_ARE_FATAL);
194 if ( i == TIMEOUT ) {
195 MPI_Close_port(_portName);
204 void SALOME_MPISender_i::send()
206 _argsForThr=new (void *)[8];
207 _argsForThr[0]=_portName;
208 _argsForThr[1]=&_lgrTabToSend;
209 _argsForThr[2]=(void *)_tabToSend;
210 _argsForThr[3]=&_cproc;
211 _argsForThr[4]=&_tag1Inst;
212 _argsForThr[5]=&_tag2Inst;
213 _argsForThr[6]=&_com;
214 _argsForThr[7]=&_type;
216 _newThr=new omni_thread(SALOME_MPISender_i::myThread,_argsForThr);
220 void* SALOME_MPISender_i::myThread(void *args)
222 void **argsTab=(void **)args;
223 long *lgrTabToSend=(long *)argsTab[1];
224 int *cproc=(int *)argsTab[3];
225 int *tag1=(int *)argsTab[4];
226 int *tag2=(int *)argsTab[5];
227 MPI_Comm *com=(MPI_Comm *)argsTab[6];
228 SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[7];
230 MPI_Comm_accept((char *)argsTab[0],MPI_INFO_NULL,0,MPI_COMM_SELF,com);
231 MPI_Send(lgrTabToSend,1,MPI_LONG,*cproc,*tag1,*com);
234 case SALOME::DOUBLE_:
235 MPI_Send(argsTab[2],*lgrTabToSend,MPI_DOUBLE,*cproc,*tag2,*com);
238 MPI_Send(argsTab[2],*lgrTabToSend,MPI_INT,*cproc,*tag2,*com);
243 void SALOME_MPISender_i::close(const SALOME::MPISender::param& p)
245 std::string service(p.service);
246 const char *st=p.service;
249 MPI_Comm_free(&_com);
250 MPI_Unpublish_name((char *)service.c_str(),MPI_INFO_NULL,_portName);
251 MPI_Close_port(_portName);
252 delete [] _argsForThr;
260 #define _POSIX_PII_SOCKET
261 #define _LIBC_POLLUTION_H_
263 #include <sys/types.h>
264 #include <sys/socket.h>
265 #include <netinet/in.h>
266 #include <arpa/inet.h>
270 SALOME_SocketSender_i::SALOME_SocketSender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
271 _IPAddress = inetAddress();
276 SALOME_SocketSender_i::~SALOME_SocketSender_i(){
279 std::string SALOME_SocketSender_i::inetAddress()
282 char t[INET_ADDRSTRLEN+1];
283 struct hostent *host;
284 struct in_addr saddr;
290 saddr.s_addr = inet_addr(s);
291 if (saddr.s_addr != -1)
292 inet_ntop(AF_INET, &saddr, t, INET_ADDRSTRLEN);
294 host = gethostbyname(s);
296 inet_ntop(AF_INET, (struct in_addr *) *host->h_addr_list,
299 return std::string(t);
302 SALOME::SocketSender::param * SALOME_SocketSender_i::getParam()
305 SALOME::SocketSender::param_var p = new SALOME::SocketSender::param;
308 p->lend = _lgrTabToSend - 1;
310 p->internet_address = CORBA::string_dup(_IPAddress.c_str());
315 void SALOME_SocketSender_i::send()
317 _argsForThr=new void *[6];
318 _argsForThr[0]=&_serverSockfd;
319 _argsForThr[1]=&_clientSockfd;
320 _argsForThr[2]=&_lgrTabToSend;
321 _argsForThr[3]=(void *)_tabToSend;
322 _argsForThr[4]=&_errorFlag;
323 _argsForThr[5]=&_type;
325 _newThr=new omni_thread(SALOME_SocketSender_i::myThread,_argsForThr);
329 void* SALOME_SocketSender_i::myThread(void *args)
332 void **argsTab=(void **)args;
333 int *serverSockfd=(int *)argsTab[0];
334 int *clientSockfd=(int *)argsTab[1];
335 long *lgrTabToSend=(long *)argsTab[2];
336 void *tabToSend=argsTab[3];
337 bool *errorFlag=(bool*)argsTab[4];
338 SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[5];
340 XDR xp; /* pointeur sur le decodeur XDR */
344 case SALOME::DOUBLE_:
345 xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_ENCODE );
346 xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
349 while( n < *lgrTabToSend*sizeof(double) ){
350 m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(double)-n);
352 if( *clientSockfd >= 0 ){
353 ::close(*clientSockfd);
356 if( *serverSockfd >= 0 ){
357 ::close(*serverSockfd);
366 xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_DECODE );
367 xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
371 xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_ENCODE );
372 xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
375 while( n < *lgrTabToSend*sizeof(int) ){
376 m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(int)-n);
378 if( *clientSockfd >= 0 ){
379 ::close(*clientSockfd);
382 if( *serverSockfd >= 0 ){
383 ::close(*serverSockfd);
392 xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_DECODE );
393 xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
398 void SALOME_SocketSender_i::initCom() throw(SALOME::SALOME_Exception)
400 struct sockaddr_in serv_addr;
402 SALOME::ExceptionStruct es;
404 /* Ouverture de la socket */
405 _serverSockfd = socket(AF_INET , SOCK_STREAM , 0);
406 if(_serverSockfd < 0) {
407 es.type = SALOME::COMM;
408 es.text = "error Socket exception";
409 throw SALOME::SALOME_Exception(es);
411 /* Socket structure initialisation*/
412 bzero((char*)&serv_addr,sizeof(serv_addr));
413 serv_addr.sin_family = AF_INET;
414 serv_addr.sin_port = 0; /* asking for a free port */
415 serv_addr.sin_addr.s_addr = INADDR_ANY;
417 /* Association of socket with a port */
418 if( ::bind(_serverSockfd, (struct sockaddr *) & serv_addr,
419 sizeof(struct sockaddr)) < 0 ) {
421 es.type = SALOME::COMM;
422 es.text = "error bind Socket exception";
423 throw SALOME::SALOME_Exception(es);
425 /* Listening to the allocated port */
426 if( listen(_serverSockfd, 10) < 0 ) {
428 es.type = SALOME::COMM;
429 es.text = "error listen Socket exception";
430 throw SALOME::SALOME_Exception(es);
432 /* Retrieving port number*/
433 if( getsockname(_serverSockfd, (struct sockaddr *) & serv_addr, &n) < 0 ){
435 es.type = SALOME::COMM;
436 es.text = "error getName Socket exception";
437 throw SALOME::SALOME_Exception(es);
439 _port = htons(serv_addr.sin_port);
443 void SALOME_SocketSender_i::acceptCom() throw(SALOME::SALOME_Exception)
447 struct sockaddr_in client_addr;
448 SALOME::ExceptionStruct es;
450 sin_size = sizeof(struct sockaddr_in);
452 _clientSockfd = accept(_serverSockfd, (struct sockaddr *)&client_addr, &sin_size);
453 if( _clientSockfd < 0 ){
455 es.type = SALOME::COMM;
456 es.text = "error accept Socket exception";
457 throw SALOME::SALOME_Exception(es);
461 void SALOME_SocketSender_i::closeCom()
463 if( _clientSockfd >= 0 ){
464 ::close(_clientSockfd);
467 if( _serverSockfd >= 0 ){
468 ::close(_serverSockfd);
474 void SALOME_SocketSender_i::endOfCom()
480 SALOME::ExceptionStruct es;
481 es.type = SALOME::COMM;
482 es.text = "error write Socket exception";
483 throw SALOME::SALOME_Exception(es);
485 delete [] _argsForThr;
489 #undef _LIBC_POLLUTION_H_
490 #undef _POSIX_PII_SOCKET