Salome HOME
PR: mergefrom_BR_GEAY_05Nov04
[modules/kernel.git] / src / Communication / SALOME_Comm_i.cxx
1 #include <rpc/xdr.h>
2 #include "SALOME_Comm_i.hxx"
3 #include "poa.h"
4 #include "omnithread.h"
5 #include "Utils_SINGLETON.hxx"
6 #include "Utils_ORB_INIT.hxx"
7 #include "utilities.h"
8
9 #include "SenderFactory.hxx"
10
11 CORBA::ORB_var &getGlobalORB(){
12   ORB_INIT &init = *SINGLETON_<ORB_INIT>::Instance();
13   CORBA::ORB_var &orb = init(0,0);
14   return orb;
15 }
16
17 /*! Return the C++ data associated to the array to transmit.
18   Used when sender and receiver are collocalized.
19  */
20 const void *SALOME_Sender_i::getData(long &size) const{
21   size=_lgrTabToSend;
22   return _tabToSend;
23 }
24
25 /*! Return the sizeof() of each component of the generic array
26  */
27 int SALOME_Sender_i::getSizeOf() const {
28   return _sizeOf;
29 }
30
31 /*! Unique constructor */
32 SALOME_Sender_i::SALOME_Sender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):_tabToSend(tabToSend),_lgrTabToSend(lgrTabToSend),_type(type),_sizeOf(sizeOf){
33 }
34
35 /*! Method to establish if the CORBA object refered by pCorba is collocalised.\n
36   If it is, the pointer to the servant that incarnates the CORBA object is returned.
37 */
38 SALOME_Sender_i *SALOME_Sender_i::find(SALOME::Sender_ptr pCorba){
39   PortableServer::ServantBase *ret;
40   try {
41     ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba);
42   }
43   catch(...){
44     return 0;
45   }
46   ret->_remove_ref();
47   return dynamic_cast<SALOME_Sender_i *>(ret);
48 }
49
50 /*! Method for the remote destroy of the current servant. This method is used by the receiver to destroy the sender when the transfert is complete.
51  */
52 void SALOME_Sender_i::release()
53 {
54   PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
55   _default_POA()->deactivate_object(oid);
56   _remove_ref();
57 }
58
59 /*! Return the type of the element that compose the array. Used by receiverfactory to build the correct receiver.
60  */
61 SALOME::TypeOfDataTransmitted SALOME_Sender_i::getTypeOfDataTransmitted()
62 {
63   return _type;
64 }
65
66 /*! Return a new sender of the same array but with an another protocol.
67  */
68 SALOME::Sender_ptr SALOME_Sender_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type)
69 {
70   return SenderFactory::buildSender(type,this);
71 }
72
73 SALOME_CorbaDoubleNCSender_i::SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
74 }
75
76 SALOME_CorbaDoubleNCSender_i::~SALOME_CorbaDoubleNCSender_i(){
77 }
78
79 CORBA::ULong SALOME_CorbaDoubleNCSender_i::getSize(){
80   CORBA::ULong ret=_lgrTabToSend;
81   return ret;
82 }
83
84 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
85   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(length,length,(CORBA::Double *)((double *)_tabToSend+(long)offset),0);
86   return c1._retn();
87 }
88
89 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::send(){
90   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(_lgrTabToSend,_lgrTabToSend,(CORBA::Double *)_tabToSend,0);
91   return c1._retn();
92 }
93
94 SALOME_CorbaDoubleCSender_i::SALOME_CorbaDoubleCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
95 }
96
97 SALOME_CorbaDoubleCSender_i::~SALOME_CorbaDoubleCSender_i(){
98 }
99
100 CORBA::ULong SALOME_CorbaDoubleCSender_i::getSize(){
101   CORBA::ULong ret=_lgrTabToSend;
102   return ret;
103 }
104
105 SALOME::vectorOfDouble* SALOME_CorbaDoubleCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
106   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble;
107   c1->length(length);
108   for (long i=0; i<length; i++)
109     c1[i] = ((double *)_tabToSend)[i+offset];
110   return c1._retn();
111 }
112
113 ////////////////////////
114
115 SALOME_CorbaLongNCSender_i::SALOME_CorbaLongNCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
116 }
117
118 SALOME_CorbaLongNCSender_i::~SALOME_CorbaLongNCSender_i(){
119 }
120
121 CORBA::ULong SALOME_CorbaLongNCSender_i::getSize(){
122   CORBA::ULong ret=_lgrTabToSend;
123   return ret;
124 }
125
126 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
127   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(length,length,(CORBA::Long *)((long *)_tabToSend+(long)offset),0);
128   return c1._retn();
129 }
130
131 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::send(){
132   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(_lgrTabToSend,_lgrTabToSend,(CORBA::Long *)_tabToSend,0);
133   return c1._retn();
134 }
135
136 SALOME_CorbaLongCSender_i::SALOME_CorbaLongCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
137 }
138
139 SALOME_CorbaLongCSender_i::~SALOME_CorbaLongCSender_i(){
140 }
141
142 CORBA::ULong SALOME_CorbaLongCSender_i::getSize(){
143   CORBA::ULong ret=_lgrTabToSend;
144   return ret;
145 }
146
147 SALOME::vectorOfLong* SALOME_CorbaLongCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
148   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong;
149   c1->length(length);
150   for (long i=0; i<length; i++)
151     c1[i] = ((long *)_tabToSend)[i+offset];
152   return c1._retn();
153 }
154
155 #ifdef HAVE_MPI2
156
157 unsigned long SALOME_MPISender_i::_tag1=0;
158
159 unsigned long SALOME_MPISender_i::_tag2=1;
160
161 SALOME_MPISender_i::SALOME_MPISender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
162   _portName=new char[MPI_MAX_PORT_NAME];
163 }
164
165 SALOME_MPISender_i::~SALOME_MPISender_i(){
166   delete [] _portName;
167 }
168
169 SALOME::MPISender::param* SALOME_MPISender_i::getParam()
170 {
171   char stag[12];
172   int myproc,i=0;
173
174   SALOME::MPISender::param_var p = new SALOME::MPISender::param;
175   MPI_Comm_rank(MPI_COMM_WORLD,&_cproc);
176   p->myproc = _cproc;
177   p->tag1 = _tag1;
178   _tag1Inst=_tag1;
179   p->tag2 =_tag2;
180   _tag2Inst=_tag2;
181   std::string service("toto_");
182   sprintf(stag,"%d_",_tag1);
183   service += stag;
184   sprintf(stag,"%d_",p->tag2);
185   service += stag;
186   p->service = CORBA::string_dup(service.c_str());
187   MPI_Open_port(MPI_INFO_NULL, _portName);
188   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_RETURN);
189   while ( i != TIMEOUT  && MPI_Publish_name((char*)service.c_str(),MPI_INFO_NULL,_portName) != MPI_SUCCESS) {
190     i++;
191   } 
192   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_ARE_FATAL);
193   if ( i == TIMEOUT  ) { 
194     MPI_Close_port(_portName);
195     MPI_Finalize();
196     exit(-1);
197     }
198   _tag1 += 2;
199   _tag2 += 2;
200   return p._retn();
201 }
202
203 void SALOME_MPISender_i::send()
204 {
205   _argsForThr=new (void *)[8];
206   _argsForThr[0]=_portName;
207   _argsForThr[1]=&_lgrTabToSend;
208   _argsForThr[2]=(void *)_tabToSend;
209   _argsForThr[3]=&_cproc;
210   _argsForThr[4]=&_tag1Inst;
211   _argsForThr[5]=&_tag2Inst;
212   _argsForThr[6]=&_com;
213   _argsForThr[7]=&_type;
214
215   _newThr=new omni_thread(SALOME_MPISender_i::myThread,_argsForThr);
216   _newThr->start();
217 }
218
219 void* SALOME_MPISender_i::myThread(void *args)
220 {
221   void **argsTab=(void **)args;
222   long *lgrTabToSend=(long *)argsTab[1];
223   int *cproc=(int *)argsTab[3];
224   int *tag1=(int *)argsTab[4];
225   int *tag2=(int *)argsTab[5];
226   MPI_Comm *com=(MPI_Comm *)argsTab[6];
227   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[7];
228
229   MPI_Comm_accept((char *)argsTab[0],MPI_INFO_NULL,0,MPI_COMM_SELF,com);
230   MPI_Send(lgrTabToSend,1,MPI_LONG,*cproc,*tag1,*com);
231   switch(*type)
232     { 
233     case SALOME::DOUBLE_:
234       MPI_Send(argsTab[2],*lgrTabToSend,MPI_DOUBLE,*cproc,*tag2,*com);
235       break;
236     case SALOME::INT_:
237       MPI_Send(argsTab[2],*lgrTabToSend,MPI_INT,*cproc,*tag2,*com);
238     }
239   omni_thread::exit();
240 }
241
242 void SALOME_MPISender_i::close(const SALOME::MPISender::param& p)
243 {
244   std::string service(p.service);
245   const char *st=p.service;
246   void *r;
247   _newThr->join(&r);
248   MPI_Comm_free(&_com); 
249   MPI_Unpublish_name((char *)service.c_str(),MPI_INFO_NULL,_portName); 
250   MPI_Close_port(_portName);
251   delete [] _argsForThr;
252 }
253
254 #endif
255
256 #ifdef HAVE_SOCKET
257
258 #include <sys/types.h>
259 #include <sys/socket.h>
260 #include <netinet/in.h>
261 #include <arpa/inet.h>
262 #include <netdb.h>
263 #include <unistd.h>
264
265 SALOME_SocketSender_i::SALOME_SocketSender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
266   _IPAddress = inetAddress();
267   _serverSockfd = -1;
268   _clientSockfd = -1;
269 }
270
271 SALOME_SocketSender_i::~SALOME_SocketSender_i(){
272 }
273
274 std::string SALOME_SocketSender_i::inetAddress()
275 {
276    char s[256];
277    char t[INET_ADDRSTRLEN+1];
278    struct hostent *host;
279    struct in_addr saddr;
280
281    gethostname(s, 255);
282
283    *t = '\0';
284
285    saddr.s_addr = inet_addr(s);
286    if (saddr.s_addr != -1)
287       inet_ntop(AF_INET, &saddr, t, INET_ADDRSTRLEN);
288    else {
289       host = gethostbyname(s);
290       if (host != NULL)
291          inet_ntop(AF_INET, (struct in_addr *) *host->h_addr_list, 
292                    t, INET_ADDRSTRLEN);
293    }
294    return std::string(t);
295 }
296
297 SALOME::SocketSender::param * SALOME_SocketSender_i::getParam()
298 {
299
300   SALOME::SocketSender::param_var p = new SALOME::SocketSender::param;
301
302   p->lstart = 0;
303   p->lend = _lgrTabToSend - 1;
304   p->myport = _port;
305   p->internet_address = CORBA::string_dup(_IPAddress.c_str());
306
307   return p._retn();
308 }
309
310 void SALOME_SocketSender_i::send()
311 {
312   _argsForThr=new void *[6];
313   _argsForThr[0]=&_serverSockfd;
314   _argsForThr[1]=&_clientSockfd;
315   _argsForThr[2]=&_lgrTabToSend;
316   _argsForThr[3]=(void *)_tabToSend;
317   _argsForThr[4]=&_errorFlag;
318   _argsForThr[5]=&_type;
319
320   _newThr=new omni_thread(SALOME_SocketSender_i::myThread,_argsForThr);
321   _newThr->start();
322 }
323
324 void* SALOME_SocketSender_i::myThread(void *args)
325 {
326   int n=0, m;
327   void **argsTab=(void **)args;
328   int *serverSockfd=(int *)argsTab[0];
329   int *clientSockfd=(int *)argsTab[1];
330   long *lgrTabToSend=(long *)argsTab[2];
331   void *tabToSend=argsTab[3];
332   bool *errorFlag=(bool*)argsTab[4];
333   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[5];
334   
335   XDR xp; /* pointeur sur le decodeur XDR */
336   
337   switch(*type)
338     { 
339     case SALOME::DOUBLE_:
340       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_ENCODE );
341       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
342
343       *errorFlag = false;
344       while( n < *lgrTabToSend*sizeof(double) ){
345         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(double)-n);
346         if( m < 0 ){
347           if( *clientSockfd >= 0 ){
348             ::close(*clientSockfd);
349             *clientSockfd = -1;
350           }
351           if( *serverSockfd >= 0 ){
352             ::close(*serverSockfd);
353             *serverSockfd = -1;
354           }
355           *errorFlag = true;
356         }
357         n += m;
358       }
359       xdr_destroy( &xp );
360
361       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_DECODE );
362       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
363       xdr_destroy( &xp );
364       break;
365     case SALOME::INT_:
366       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_ENCODE );
367       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
368
369       *errorFlag = false;
370       while( n < *lgrTabToSend*sizeof(int) ){
371         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(int)-n);
372         if( m < 0 ){
373           if( *clientSockfd >= 0 ){
374             ::close(*clientSockfd);
375             *clientSockfd = -1;
376           }
377           if( *serverSockfd >= 0 ){
378             ::close(*serverSockfd);
379             *serverSockfd = -1;
380           }
381           *errorFlag = true;
382         }
383         n += m;
384       }
385       xdr_destroy( &xp );
386
387       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_DECODE );
388       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
389       xdr_destroy( &xp );
390     }
391 }
392
393 void SALOME_SocketSender_i::initCom() throw(SALOME::SALOME_Exception)
394 {
395   struct sockaddr_in serv_addr;
396   socklen_t n;
397   SALOME::ExceptionStruct es;
398
399   /* Ouverture de la socket */
400   _serverSockfd = socket(AF_INET , SOCK_STREAM , 0);
401   if(_serverSockfd < 0) {
402     es.type = SALOME::COMM;
403     es.text = "error Socket exception";
404     throw SALOME::SALOME_Exception(es);
405   }
406   /* Socket structure initialisation*/
407   bzero((char*)&serv_addr,sizeof(serv_addr));
408   serv_addr.sin_family = AF_INET;
409   serv_addr.sin_port = 0; /* asking for a free port */
410   serv_addr.sin_addr.s_addr = INADDR_ANY;
411
412   /* Association of socket with a port */
413   if( ::bind(_serverSockfd, (struct sockaddr *) & serv_addr, 
414            sizeof(struct sockaddr)) < 0 ) {
415     closeCom();
416     es.type = SALOME::COMM;
417     es.text = "error bind Socket exception";
418     throw SALOME::SALOME_Exception(es);
419   }
420   /* Listening to the allocated port */
421   if( listen(_serverSockfd, 10) < 0 ) {
422     closeCom();
423     es.type = SALOME::COMM;
424     es.text = "error listen Socket exception";
425     throw SALOME::SALOME_Exception(es);
426   }
427   /* Retrieving port number*/
428   if( getsockname(_serverSockfd, (struct sockaddr *) & serv_addr, &n) < 0 ){
429     closeCom();
430     es.type = SALOME::COMM;
431     es.text = "error getName Socket exception";
432     throw SALOME::SALOME_Exception(es);
433   }
434   _port = htons(serv_addr.sin_port);
435   SCRUTE(_port);
436 }
437
438 void SALOME_SocketSender_i::acceptCom() throw(SALOME::SALOME_Exception)
439 {
440   socklen_t sin_size;
441   int new_fd;
442   struct sockaddr_in client_addr;
443   SALOME::ExceptionStruct es;
444
445   sin_size = sizeof(struct sockaddr_in);
446   
447   _clientSockfd = accept(_serverSockfd, (struct sockaddr *)&client_addr, &sin_size);
448   if( _clientSockfd < 0 ){
449     closeCom();
450     es.type = SALOME::COMM;
451     es.text = "error accept Socket exception";
452     throw SALOME::SALOME_Exception(es);
453   }
454 }
455
456 void SALOME_SocketSender_i::closeCom()
457 {
458   if( _clientSockfd >= 0 ){
459     ::close(_clientSockfd);
460     _clientSockfd = -1;
461   }
462   if( _serverSockfd >= 0 ){
463     ::close(_serverSockfd);
464     _serverSockfd = -1;
465   }
466
467 }
468
469 void SALOME_SocketSender_i::endOfCom()
470 {
471   void *r;
472   _newThr->join(&r);
473   if(_errorFlag)
474     {
475       SALOME::ExceptionStruct es;
476       es.type = SALOME::COMM;
477       es.text = "error write Socket exception";
478       throw SALOME::SALOME_Exception(es);
479     }
480   delete [] _argsForThr;
481 }
482
483 #endif