Salome HOME
Porting to Mandrake 10.1 and new products:
[modules/kernel.git] / src / Communication / SALOME_Comm_i.cxx
1 #include <rpc/xdr.h>
2 #include "SALOME_Comm_i.hxx"
3 #include "poa.h"
4 #include "omnithread.h"
5 #include "Utils_SINGLETON.hxx"
6 #include "Utils_ORB_INIT.hxx"
7 #include "utilities.h"
8
9 #include "SenderFactory.hxx"
10 using namespace std;
11
12 CORBA::ORB_var &getGlobalORB(){
13   ORB_INIT &init = *SINGLETON_<ORB_INIT>::Instance();
14   CORBA::ORB_var &orb = init(0,0);
15   return orb;
16 }
17
18 /*! Return the C++ data associated to the array to transmit.
19   Used when sender and receiver are collocalized.
20  */
21 const void *SALOME_Sender_i::getData(long &size) const{
22   size=_lgrTabToSend;
23   return _tabToSend;
24 }
25
26 /*! Return the sizeof() of each component of the generic array
27  */
28 int SALOME_Sender_i::getSizeOf() const {
29   return _sizeOf;
30 }
31
32 /*! Unique constructor */
33 SALOME_Sender_i::SALOME_Sender_i(const void *tabToSend,long lgrTabToSend,int sizeOf,bool ownTabToSend):_tabToSend(tabToSend),_lgrTabToSend(lgrTabToSend),_sizeOf(sizeOf),_ownTabToSend(ownTabToSend){
34 }
35
36 /*! To force ownerShip status */
37 void SALOME_Sender_i::setOwnerShip(bool own)
38 {
39   _ownTabToSend=own;
40 }
41
42 /*! Method for the remote destroy of the current servant. This method is used by the receiver to destroy the sender when the transfert is complete.
43  */
44 void SALOME_Sender_i::release()
45 {
46   PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
47   _default_POA()->deactivate_object(oid);
48   _remove_ref();
49 }
50
51 SALOME_SenderDouble_i::SALOME_SenderDouble_i(const double *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend)
52 {
53 }
54
55 /*! Destructor.
56  */
57 SALOME_SenderDouble_i::~SALOME_SenderDouble_i()
58 {
59   if(_ownTabToSend)
60     delete [] (double *)_tabToSend;
61 }
62
63 /*! Return a new sender of the same array but with an another protocol and delegates to the returned sender the ownership of array.
64  */
65 SALOME::SenderDouble_ptr SALOME_SenderDouble_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type)
66 {
67   return SenderFactory::buildSender(type,this);
68 }
69
70 /*! Method to establish if the CORBA object refered by pCorba is collocalised.\n
71   If it is, the pointer to the servant that incarnates the CORBA object is returned.
72 */
73 SALOME_SenderDouble_i *SALOME_SenderDouble_i::find(SALOME::SenderDouble_ptr pCorba){
74   PortableServer::ServantBase *ret;
75   try {
76     ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba);
77   }
78   catch(...){
79     return 0;
80   }
81   ret->_remove_ref();
82   return dynamic_cast<SALOME_SenderDouble_i *>(ret);
83 }
84
85 SALOME_SenderInt_i::SALOME_SenderInt_i(const int *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend)
86 {
87 }
88
89 /*! Destructor.
90  */
91 SALOME_SenderInt_i::~SALOME_SenderInt_i()
92 {
93   if(_ownTabToSend)
94     delete [] (int *)_tabToSend;
95 }
96
97 /*! Return a new sender of the same array but with an another protocol.
98  */
99 SALOME::SenderInt_ptr SALOME_SenderInt_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type)
100 {
101   return SenderFactory::buildSender(type,this);
102 }
103
104 /*! Method to establish if the CORBA object refered by pCorba is collocalised.\n
105   If it is, the pointer to the servant that incarnates the CORBA object is returned.
106 */
107 SALOME_SenderInt_i *SALOME_SenderInt_i::find(SALOME::SenderInt_ptr pCorba){
108   PortableServer::ServantBase *ret;
109   try {
110     ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba);
111   }
112   catch(...){
113     return 0;
114   }
115   ret->_remove_ref();
116   return dynamic_cast<SALOME_SenderInt_i *>(ret);
117 }
118
119 SALOME_CorbaDoubleNCSender_i::SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_SenderDouble_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend){
120 }
121
122 SALOME_CorbaDoubleNCSender_i::~SALOME_CorbaDoubleNCSender_i(){
123 }
124
125 CORBA::ULong SALOME_CorbaDoubleNCSender_i::getSize(){
126   CORBA::ULong ret=_lgrTabToSend;
127   return ret;
128 }
129
130 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
131   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(length,length,(CORBA::Double *)((double *)_tabToSend+(long)offset),0);
132   return c1._retn();
133 }
134
135 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::send(){
136   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(_lgrTabToSend,_lgrTabToSend,(CORBA::Double *)_tabToSend,0);
137   return c1._retn();
138 }
139
140 SALOME_CorbaDoubleCSender_i::SALOME_CorbaDoubleCSender_i(const double *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_SenderDouble_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend){
141 }
142
143 SALOME_CorbaDoubleCSender_i::~SALOME_CorbaDoubleCSender_i(){
144 }
145
146 CORBA::ULong SALOME_CorbaDoubleCSender_i::getSize(){
147   CORBA::ULong ret=_lgrTabToSend;
148   return ret;
149 }
150
151 SALOME::vectorOfDouble* SALOME_CorbaDoubleCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
152   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble;
153   c1->length(length);
154   for (long i=0; i<length; i++)
155     c1[i] = ((double *)_tabToSend)[i+offset];
156   return c1._retn();
157 }
158
159 ////////////////////////
160
161 SALOME_CorbaLongNCSender_i::SALOME_CorbaLongNCSender_i(const int *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_SenderInt_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend){
162 }
163
164 SALOME_CorbaLongNCSender_i::~SALOME_CorbaLongNCSender_i(){
165 }
166
167 CORBA::ULong SALOME_CorbaLongNCSender_i::getSize(){
168   CORBA::ULong ret=_lgrTabToSend;
169   return ret;
170 }
171
172 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
173   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(length,length,(CORBA::Long *)((long *)_tabToSend+(long)offset),0);
174   return c1._retn();
175 }
176
177 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::send(){
178   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(_lgrTabToSend,_lgrTabToSend,(CORBA::Long *)_tabToSend,0);
179   return c1._retn();
180 }
181
182 SALOME_CorbaLongCSender_i::SALOME_CorbaLongCSender_i(const int *tabToSend,long lgrTabToSend,bool ownTabToSend):SALOME_SenderInt_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend){
183 }
184
185 SALOME_CorbaLongCSender_i::~SALOME_CorbaLongCSender_i(){
186 }
187
188 CORBA::ULong SALOME_CorbaLongCSender_i::getSize(){
189   CORBA::ULong ret=_lgrTabToSend;
190   return ret;
191 }
192
193 SALOME::vectorOfLong* SALOME_CorbaLongCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
194   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong;
195   c1->length(length);
196   for (long i=0; i<length; i++)
197     c1[i] = ((long *)_tabToSend)[i+offset];
198   return c1._retn();
199 }
200
201 #ifdef HAVE_MPI2
202
203 unsigned long SALOME_MPISender_i::_tag1=0;
204
205 unsigned long SALOME_MPISender_i::_tag2=1;
206
207 SALOME_MPISender_i::SALOME_MPISender_i(const void *tabToSend,long lgrTabToSend,int sizeOf,bool ownTabToSend):SALOME_Sender_i(tabToSend,lgrTabToSend,sizeOf,ownTabToSend){
208   _portName=new char[MPI_MAX_PORT_NAME];
209 }
210
211 SALOME_MPISender_i::~SALOME_MPISender_i(){
212   delete [] _portName;
213 }
214
215 SALOME::MPISender::param* SALOME_MPISender_i::getParam()
216 {
217   char stag[12];
218   int myproc,i=0;
219
220   SALOME::MPISender::param_var p = new SALOME::MPISender::param;
221   MPI_Comm_rank(MPI_COMM_WORLD,&_cproc);
222   p->myproc = _cproc;
223   p->tag1 = _tag1;
224   _tag1Inst=_tag1;
225   p->tag2 =_tag2;
226   _tag2Inst=_tag2;
227   std::string service("toto_");
228   sprintf(stag,"%d_",_tag1);
229   service += stag;
230   sprintf(stag,"%d_",p->tag2);
231   service += stag;
232   p->service = CORBA::string_dup(service.c_str());
233   MPI_Open_port(MPI_INFO_NULL, _portName);
234   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_RETURN);
235   while ( i != TIMEOUT  && MPI_Publish_name((char*)service.c_str(),MPI_INFO_NULL,_portName) != MPI_SUCCESS) {
236     i++;
237   } 
238   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_ARE_FATAL);
239   if ( i == TIMEOUT  ) { 
240     MPI_Close_port(_portName);
241     MPI_Finalize();
242     exit(-1);
243     }
244   _tag1 += 2;
245   _tag2 += 2;
246   return p._retn();
247 }
248
249 void SALOME_MPISender_i::send()
250 {
251   _type=getTypeOfDataTransmitted();
252   _argsForThr=new (void *)[8];
253   _argsForThr[0]=_portName;
254   _argsForThr[1]=&_lgrTabToSend;
255   _argsForThr[2]=(void *)_tabToSend;
256   _argsForThr[3]=&_cproc;
257   _argsForThr[4]=&_tag1Inst;
258   _argsForThr[5]=&_tag2Inst;
259   _argsForThr[6]=&_com;
260   _argsForThr[7]=&_type;
261
262   _newThr=new omni_thread(SALOME_MPISender_i::myThread,_argsForThr);
263   _newThr->start();
264 }
265
266 void* SALOME_MPISender_i::myThread(void *args)
267 {
268   void **argsTab=(void **)args;
269   long *lgrTabToSend=(long *)argsTab[1];
270   int *cproc=(int *)argsTab[3];
271   int *tag1=(int *)argsTab[4];
272   int *tag2=(int *)argsTab[5];
273   MPI_Comm *com=(MPI_Comm *)argsTab[6];
274   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[7];
275
276   MPI_Comm_accept((char *)argsTab[0],MPI_INFO_NULL,0,MPI_COMM_SELF,com);
277   MPI_Send(lgrTabToSend,1,MPI_LONG,*cproc,*tag1,*com);
278   switch(*type)
279     { 
280     case SALOME::DOUBLE_:
281       MPI_Send(argsTab[2],*lgrTabToSend,MPI_DOUBLE,*cproc,*tag2,*com);
282       break;
283     case SALOME::INT_:
284       MPI_Send(argsTab[2],*lgrTabToSend,MPI_INT,*cproc,*tag2,*com);
285     }
286   omni_thread::exit();
287   return args;
288 }
289
290 void SALOME_MPISender_i::close(const SALOME::MPISender::param& p)
291 {
292   std::string service(p.service);
293   const char *st=p.service;
294   void *r;
295   _newThr->join(&r);
296   MPI_Comm_free(&_com); 
297   MPI_Unpublish_name((char *)service.c_str(),MPI_INFO_NULL,_portName); 
298   MPI_Close_port(_portName);
299   delete [] _argsForThr;
300 }
301
302 SALOME_MPISenderDouble_i::SALOME_MPISenderDouble_i(const double *tabToSend,long lgrTabToSend,bool ownTabToSend)
303   :SALOME_SenderDouble_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_MPISender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend)
304   ,SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend)
305 {
306 }
307
308 SALOME_MPISenderInt_i::SALOME_MPISenderInt_i(const int *tabToSend,long lgrTabToSend,bool ownTabToSend)
309   :SALOME_SenderInt_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_MPISender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend)
310   ,SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend)
311 {
312 }
313
314 #endif
315
316 #ifdef HAVE_SOCKET
317
318 //CCRT porting
319 #define _POSIX_PII_SOCKET
320 #define _LIBC_POLLUTION_H_
321
322 #include <sys/types.h>
323 #include <sys/socket.h>
324 #include <netinet/in.h>
325 #include <arpa/inet.h>
326 #include <netdb.h>
327 #include <unistd.h>
328
329 SALOME_SocketSender_i::SALOME_SocketSender_i(const void *tabToSend,long lgrTabToSend,int sizeOf,bool ownTabToSend):SALOME_Sender_i(tabToSend,lgrTabToSend,sizeOf,ownTabToSend){
330   _IPAddress = inetAddress();
331   _serverSockfd = -1;
332   _clientSockfd = -1;
333 }
334
335 SALOME_SocketSender_i::~SALOME_SocketSender_i(){
336 }
337
338 std::string SALOME_SocketSender_i::inetAddress()
339 {
340    char s[256];
341    char t[INET_ADDRSTRLEN+1];
342    struct hostent *host;
343    struct in_addr saddr;
344
345    gethostname(s, 255);
346
347    *t = '\0';
348
349    saddr.s_addr = inet_addr(s);
350    if (saddr.s_addr != -1)
351       inet_ntop(AF_INET, &saddr, t, INET_ADDRSTRLEN);
352    else {
353       host = gethostbyname(s);
354       if (host != NULL)
355          inet_ntop(AF_INET, (struct in_addr *) *host->h_addr_list, 
356                    t, INET_ADDRSTRLEN);
357    }
358    return std::string(t);
359 }
360
361 SALOME::SocketSender::param * SALOME_SocketSender_i::getParam()
362 {
363
364   SALOME::SocketSender::param_var p = new SALOME::SocketSender::param;
365
366   p->lstart = 0;
367   p->lend = _lgrTabToSend - 1;
368   p->myport = _port;
369   p->internet_address = CORBA::string_dup(_IPAddress.c_str());
370
371   return p._retn();
372 }
373
374 void SALOME_SocketSender_i::send()
375 {
376   _type=getTypeOfDataTransmitted();
377   _argsForThr=new void *[6];
378   _argsForThr[0]=&_serverSockfd;
379   _argsForThr[1]=&_clientSockfd;
380   _argsForThr[2]=&_lgrTabToSend;
381   _argsForThr[3]=(void *)_tabToSend;
382   _argsForThr[4]=&_errorFlag;
383   _argsForThr[5]=&_type;
384
385   _newThr=new omni_thread(SALOME_SocketSender_i::myThread,_argsForThr);
386   _newThr->start();
387 }
388
389 void* SALOME_SocketSender_i::myThread(void *args)
390 {
391   int n=0, m;
392   void **argsTab=(void **)args;
393   int *serverSockfd=(int *)argsTab[0];
394   int *clientSockfd=(int *)argsTab[1];
395   long *lgrTabToSend=(long *)argsTab[2];
396   void *tabToSend=argsTab[3];
397   bool *errorFlag=(bool*)argsTab[4];
398   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[5];
399   
400   XDR xp; /* pointeur sur le decodeur XDR */
401   
402   switch(*type)
403     { 
404     case SALOME::DOUBLE_:
405       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_ENCODE );
406       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
407
408       *errorFlag = false;
409       while( n < *lgrTabToSend*sizeof(double) ){
410         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(double)-n);
411         if( m < 0 ){
412           if( *clientSockfd >= 0 ){
413             ::close(*clientSockfd);
414             *clientSockfd = -1;
415           }
416           if( *serverSockfd >= 0 ){
417             ::close(*serverSockfd);
418             *serverSockfd = -1;
419           }
420           *errorFlag = true;
421         }
422         n += m;
423       }
424       xdr_destroy( &xp );
425
426       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_DECODE );
427       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
428       xdr_destroy( &xp );
429       break;
430     case SALOME::INT_:
431       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_ENCODE );
432       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
433
434       *errorFlag = false;
435       while( n < *lgrTabToSend*sizeof(int) ){
436         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(int)-n);
437         if( m < 0 ){
438           if( *clientSockfd >= 0 ){
439             ::close(*clientSockfd);
440             *clientSockfd = -1;
441           }
442           if( *serverSockfd >= 0 ){
443             ::close(*serverSockfd);
444             *serverSockfd = -1;
445           }
446           *errorFlag = true;
447         }
448         n += m;
449       }
450       xdr_destroy( &xp );
451
452       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_DECODE );
453       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
454       xdr_destroy( &xp );
455     }
456   return args;
457 }
458
459 void SALOME_SocketSender_i::initCom() throw(SALOME::SALOME_Exception)
460 {
461   struct sockaddr_in serv_addr;
462   socklen_t n;
463   SALOME::ExceptionStruct es;
464
465   /* Ouverture de la socket */
466   _serverSockfd = socket(AF_INET , SOCK_STREAM , 0);
467   if(_serverSockfd < 0) {
468     es.type = SALOME::COMM;
469     es.text = "error Socket exception";
470     throw SALOME::SALOME_Exception(es);
471   }
472   /* Socket structure initialisation*/
473   bzero((char*)&serv_addr,sizeof(serv_addr));
474   serv_addr.sin_family = AF_INET;
475   serv_addr.sin_port = 0; /* asking for a free port */
476   serv_addr.sin_addr.s_addr = INADDR_ANY;
477
478   /* Association of socket with a port */
479   if( ::bind(_serverSockfd, (struct sockaddr *) & serv_addr, 
480            sizeof(struct sockaddr)) < 0 ) {
481     closeCom();
482     es.type = SALOME::COMM;
483     es.text = "error bind Socket exception";
484     throw SALOME::SALOME_Exception(es);
485   }
486   /* Listening to the allocated port */
487   if( listen(_serverSockfd, 10) < 0 ) {
488     closeCom();
489     es.type = SALOME::COMM;
490     es.text = "error listen Socket exception";
491     throw SALOME::SALOME_Exception(es);
492   }
493   /* Retrieving port number*/
494   if( getsockname(_serverSockfd, (struct sockaddr *) & serv_addr, &n) < 0 ){
495     closeCom();
496     es.type = SALOME::COMM;
497     es.text = "error getName Socket exception";
498     throw SALOME::SALOME_Exception(es);
499   }
500   _port = htons(serv_addr.sin_port);
501   SCRUTE(_port);
502 }
503
504 void SALOME_SocketSender_i::acceptCom() throw(SALOME::SALOME_Exception)
505 {
506   socklen_t sin_size;
507   struct sockaddr_in client_addr;
508   SALOME::ExceptionStruct es;
509
510   sin_size = sizeof(struct sockaddr_in);
511   
512   _clientSockfd = accept(_serverSockfd, (struct sockaddr *)&client_addr, &sin_size);
513   if( _clientSockfd < 0 ){
514     closeCom();
515     es.type = SALOME::COMM;
516     es.text = "error accept Socket exception";
517     throw SALOME::SALOME_Exception(es);
518   }
519 }
520
521 void SALOME_SocketSender_i::closeCom()
522 {
523   if( _clientSockfd >= 0 ){
524     ::close(_clientSockfd);
525     _clientSockfd = -1;
526   }
527   if( _serverSockfd >= 0 ){
528     ::close(_serverSockfd);
529     _serverSockfd = -1;
530   }
531
532 }
533
534 void SALOME_SocketSender_i::endOfCom()
535 {
536   void *r;
537   _newThr->join(&r);
538   if(_errorFlag)
539     {
540       SALOME::ExceptionStruct es;
541       es.type = SALOME::COMM;
542       es.text = "error write Socket exception";
543       throw SALOME::SALOME_Exception(es);
544     }
545   delete [] _argsForThr;
546 }
547
548 SALOME_SocketSenderDouble_i::SALOME_SocketSenderDouble_i(const double *tabToSend,long lgrTabToSend,bool ownTabToSend)
549   :SALOME_SenderDouble_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_SocketSender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend)
550   ,SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(double),ownTabToSend)
551 {
552 }
553
554 SALOME_SocketSenderInt_i::SALOME_SocketSenderInt_i(const int *tabToSend,long lgrTabToSend,bool ownTabToSend)
555   :SALOME_SenderInt_i(tabToSend,lgrTabToSend,ownTabToSend),SALOME_SocketSender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend)
556   ,SALOME_Sender_i(tabToSend,lgrTabToSend,sizeof(int),ownTabToSend)
557 {
558 }
559
560 //CCRT porting
561 #undef _LIBC_POLLUTION_H_
562 #undef _POSIX_PII_SOCKET
563
564 #endif