]> SALOME platform Git repositories - modules/kernel.git/blob - src/Communication/SALOME_Comm_i.cxx
Salome HOME
Moved some functionality to VTKViewer_Utilities.h
[modules/kernel.git] / src / Communication / SALOME_Comm_i.cxx
1 #include <rpc/xdr.h>
2 #include "SALOME_Comm_i.hxx"
3 #include "poa.h"
4 #include "omnithread.h"
5 #include "Utils_SINGLETON.hxx"
6 #include "Utils_ORB_INIT.hxx"
7 #include "utilities.h"
8
9 #include "SenderFactory.hxx"
10 using namespace std;
11
12 CORBA::ORB_var &getGlobalORB(){
13   ORB_INIT &init = *SINGLETON_<ORB_INIT>::Instance();
14   CORBA::ORB_var &orb = init(0,0);
15   return orb;
16 }
17
18 /*! Return the C++ data associated to the array to transmit.
19   Used when sender and receiver are collocalized.
20  */
21 const void *SALOME_Sender_i::getData(long &size) const{
22   size=_lgrTabToSend;
23   return _tabToSend;
24 }
25
26 /*! Return the sizeof() of each component of the generic array
27  */
28 int SALOME_Sender_i::getSizeOf() const {
29   return _sizeOf;
30 }
31
32 /*! Unique constructor */
33 SALOME_Sender_i::SALOME_Sender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):_tabToSend(tabToSend),_lgrTabToSend(lgrTabToSend),_type(type),_sizeOf(sizeOf){
34 }
35
36 /*! Method to establish if the CORBA object refered by pCorba is collocalised.\n
37   If it is, the pointer to the servant that incarnates the CORBA object is returned.
38 */
39 SALOME_Sender_i *SALOME_Sender_i::find(SALOME::Sender_ptr pCorba){
40   PortableServer::ServantBase *ret;
41   try {
42     ret=PortableServer::POA::_the_root_poa()->reference_to_servant(pCorba);
43   }
44   catch(...){
45     return 0;
46   }
47   ret->_remove_ref();
48   return dynamic_cast<SALOME_Sender_i *>(ret);
49 }
50
51 /*! Method for the remote destroy of the current servant. This method is used by the receiver to destroy the sender when the transfert is complete.
52  */
53 void SALOME_Sender_i::release()
54 {
55   PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
56   _default_POA()->deactivate_object(oid);
57   _remove_ref();
58 }
59
60 /*! Return the type of the element that compose the array. Used by receiverfactory to build the correct receiver.
61  */
62 SALOME::TypeOfDataTransmitted SALOME_Sender_i::getTypeOfDataTransmitted()
63 {
64   return _type;
65 }
66
67 /*! Return a new sender of the same array but with an another protocol.
68  */
69 SALOME::Sender_ptr SALOME_Sender_i::buildOtherWithProtocol(SALOME::TypeOfCommunication type)
70 {
71   return SenderFactory::buildSender(type,this);
72 }
73
74 SALOME_CorbaDoubleNCSender_i::SALOME_CorbaDoubleNCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
75 }
76
77 SALOME_CorbaDoubleNCSender_i::~SALOME_CorbaDoubleNCSender_i(){
78 }
79
80 CORBA::ULong SALOME_CorbaDoubleNCSender_i::getSize(){
81   CORBA::ULong ret=_lgrTabToSend;
82   return ret;
83 }
84
85 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
86   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(length,length,(CORBA::Double *)((double *)_tabToSend+(long)offset),0);
87   return c1._retn();
88 }
89
90 SALOME::vectorOfDouble* SALOME_CorbaDoubleNCSender_i::send(){
91   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble(_lgrTabToSend,_lgrTabToSend,(CORBA::Double *)_tabToSend,0);
92   return c1._retn();
93 }
94
95 SALOME_CorbaDoubleCSender_i::SALOME_CorbaDoubleCSender_i(const double *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::DOUBLE_,tabToSend,lgrTabToSend,sizeof(double)){
96 }
97
98 SALOME_CorbaDoubleCSender_i::~SALOME_CorbaDoubleCSender_i(){
99 }
100
101 CORBA::ULong SALOME_CorbaDoubleCSender_i::getSize(){
102   CORBA::ULong ret=_lgrTabToSend;
103   return ret;
104 }
105
106 SALOME::vectorOfDouble* SALOME_CorbaDoubleCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
107   SALOME::vectorOfDouble_var c1 = new SALOME::vectorOfDouble;
108   c1->length(length);
109   for (long i=0; i<length; i++)
110     c1[i] = ((double *)_tabToSend)[i+offset];
111   return c1._retn();
112 }
113
114 ////////////////////////
115
116 SALOME_CorbaLongNCSender_i::SALOME_CorbaLongNCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
117 }
118
119 SALOME_CorbaLongNCSender_i::~SALOME_CorbaLongNCSender_i(){
120 }
121
122 CORBA::ULong SALOME_CorbaLongNCSender_i::getSize(){
123   CORBA::ULong ret=_lgrTabToSend;
124   return ret;
125 }
126
127 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
128   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(length,length,(CORBA::Long *)((long *)_tabToSend+(long)offset),0);
129   return c1._retn();
130 }
131
132 SALOME::vectorOfLong* SALOME_CorbaLongNCSender_i::send(){
133   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong(_lgrTabToSend,_lgrTabToSend,(CORBA::Long *)_tabToSend,0);
134   return c1._retn();
135 }
136
137 SALOME_CorbaLongCSender_i::SALOME_CorbaLongCSender_i(const int *tabToSend,long lgrTabToSend):SALOME_Sender_i(SALOME::INT_,tabToSend,lgrTabToSend,sizeof(int)){
138 }
139
140 SALOME_CorbaLongCSender_i::~SALOME_CorbaLongCSender_i(){
141 }
142
143 CORBA::ULong SALOME_CorbaLongCSender_i::getSize(){
144   CORBA::ULong ret=_lgrTabToSend;
145   return ret;
146 }
147
148 SALOME::vectorOfLong* SALOME_CorbaLongCSender_i::sendPart(CORBA::ULong offset, CORBA::ULong length){
149   SALOME::vectorOfLong_var c1 = new SALOME::vectorOfLong;
150   c1->length(length);
151   for (long i=0; i<length; i++)
152     c1[i] = ((long *)_tabToSend)[i+offset];
153   return c1._retn();
154 }
155
156 #ifdef HAVE_MPI2
157
158 unsigned long SALOME_MPISender_i::_tag1=0;
159
160 unsigned long SALOME_MPISender_i::_tag2=1;
161
162 SALOME_MPISender_i::SALOME_MPISender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
163   _portName=new char[MPI_MAX_PORT_NAME];
164 }
165
166 SALOME_MPISender_i::~SALOME_MPISender_i(){
167   delete [] _portName;
168 }
169
170 SALOME::MPISender::param* SALOME_MPISender_i::getParam()
171 {
172   char stag[12];
173   int myproc,i=0;
174
175   SALOME::MPISender::param_var p = new SALOME::MPISender::param;
176   MPI_Comm_rank(MPI_COMM_WORLD,&_cproc);
177   p->myproc = _cproc;
178   p->tag1 = _tag1;
179   _tag1Inst=_tag1;
180   p->tag2 =_tag2;
181   _tag2Inst=_tag2;
182   std::string service("toto_");
183   sprintf(stag,"%d_",_tag1);
184   service += stag;
185   sprintf(stag,"%d_",p->tag2);
186   service += stag;
187   p->service = CORBA::string_dup(service.c_str());
188   MPI_Open_port(MPI_INFO_NULL, _portName);
189   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_RETURN);
190   while ( i != TIMEOUT  && MPI_Publish_name((char*)service.c_str(),MPI_INFO_NULL,_portName) != MPI_SUCCESS) {
191     i++;
192   } 
193   MPI_Errhandler_set(MPI_COMM_WORLD,MPI_ERRORS_ARE_FATAL);
194   if ( i == TIMEOUT  ) { 
195     MPI_Close_port(_portName);
196     MPI_Finalize();
197     exit(-1);
198     }
199   _tag1 += 2;
200   _tag2 += 2;
201   return p._retn();
202 }
203
204 void SALOME_MPISender_i::send()
205 {
206   _argsForThr=new (void *)[8];
207   _argsForThr[0]=_portName;
208   _argsForThr[1]=&_lgrTabToSend;
209   _argsForThr[2]=(void *)_tabToSend;
210   _argsForThr[3]=&_cproc;
211   _argsForThr[4]=&_tag1Inst;
212   _argsForThr[5]=&_tag2Inst;
213   _argsForThr[6]=&_com;
214   _argsForThr[7]=&_type;
215
216   _newThr=new omni_thread(SALOME_MPISender_i::myThread,_argsForThr);
217   _newThr->start();
218 }
219
220 void* SALOME_MPISender_i::myThread(void *args)
221 {
222   void **argsTab=(void **)args;
223   long *lgrTabToSend=(long *)argsTab[1];
224   int *cproc=(int *)argsTab[3];
225   int *tag1=(int *)argsTab[4];
226   int *tag2=(int *)argsTab[5];
227   MPI_Comm *com=(MPI_Comm *)argsTab[6];
228   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[7];
229
230   MPI_Comm_accept((char *)argsTab[0],MPI_INFO_NULL,0,MPI_COMM_SELF,com);
231   MPI_Send(lgrTabToSend,1,MPI_LONG,*cproc,*tag1,*com);
232   switch(*type)
233     { 
234     case SALOME::DOUBLE_:
235       MPI_Send(argsTab[2],*lgrTabToSend,MPI_DOUBLE,*cproc,*tag2,*com);
236       break;
237     case SALOME::INT_:
238       MPI_Send(argsTab[2],*lgrTabToSend,MPI_INT,*cproc,*tag2,*com);
239     }
240   omni_thread::exit();
241 }
242
243 void SALOME_MPISender_i::close(const SALOME::MPISender::param& p)
244 {
245   std::string service(p.service);
246   const char *st=p.service;
247   void *r;
248   _newThr->join(&r);
249   MPI_Comm_free(&_com); 
250   MPI_Unpublish_name((char *)service.c_str(),MPI_INFO_NULL,_portName); 
251   MPI_Close_port(_portName);
252   delete [] _argsForThr;
253 }
254
255 #endif
256
257 #ifdef HAVE_SOCKET
258
259 //CCRT porting
260 #define _POSIX_PII_SOCKET
261 #define _LIBC_POLLUTION_H_
262
263 #include <sys/types.h>
264 #include <sys/socket.h>
265 #include <netinet/in.h>
266 #include <arpa/inet.h>
267 #include <netdb.h>
268 #include <unistd.h>
269
270 SALOME_SocketSender_i::SALOME_SocketSender_i(SALOME::TypeOfDataTransmitted type,const void *tabToSend,long lgrTabToSend,int sizeOf):SALOME_Sender_i(type,tabToSend,lgrTabToSend,sizeOf){
271   _IPAddress = inetAddress();
272   _serverSockfd = -1;
273   _clientSockfd = -1;
274 }
275
276 SALOME_SocketSender_i::~SALOME_SocketSender_i(){
277 }
278
279 std::string SALOME_SocketSender_i::inetAddress()
280 {
281    char s[256];
282    char t[INET_ADDRSTRLEN+1];
283    struct hostent *host;
284    struct in_addr saddr;
285
286    gethostname(s, 255);
287
288    *t = '\0';
289
290    saddr.s_addr = inet_addr(s);
291    if (saddr.s_addr != -1)
292       inet_ntop(AF_INET, &saddr, t, INET_ADDRSTRLEN);
293    else {
294       host = gethostbyname(s);
295       if (host != NULL)
296          inet_ntop(AF_INET, (struct in_addr *) *host->h_addr_list, 
297                    t, INET_ADDRSTRLEN);
298    }
299    return std::string(t);
300 }
301
302 SALOME::SocketSender::param * SALOME_SocketSender_i::getParam()
303 {
304
305   SALOME::SocketSender::param_var p = new SALOME::SocketSender::param;
306
307   p->lstart = 0;
308   p->lend = _lgrTabToSend - 1;
309   p->myport = _port;
310   p->internet_address = CORBA::string_dup(_IPAddress.c_str());
311
312   return p._retn();
313 }
314
315 void SALOME_SocketSender_i::send()
316 {
317   _argsForThr=new void *[6];
318   _argsForThr[0]=&_serverSockfd;
319   _argsForThr[1]=&_clientSockfd;
320   _argsForThr[2]=&_lgrTabToSend;
321   _argsForThr[3]=(void *)_tabToSend;
322   _argsForThr[4]=&_errorFlag;
323   _argsForThr[5]=&_type;
324
325   _newThr=new omni_thread(SALOME_SocketSender_i::myThread,_argsForThr);
326   _newThr->start();
327 }
328
329 void* SALOME_SocketSender_i::myThread(void *args)
330 {
331   int n=0, m;
332   void **argsTab=(void **)args;
333   int *serverSockfd=(int *)argsTab[0];
334   int *clientSockfd=(int *)argsTab[1];
335   long *lgrTabToSend=(long *)argsTab[2];
336   void *tabToSend=argsTab[3];
337   bool *errorFlag=(bool*)argsTab[4];
338   SALOME::TypeOfDataTransmitted *type=(SALOME::TypeOfDataTransmitted *)argsTab[5];
339   
340   XDR xp; /* pointeur sur le decodeur XDR */
341   
342   switch(*type)
343     { 
344     case SALOME::DOUBLE_:
345       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_ENCODE );
346       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
347
348       *errorFlag = false;
349       while( n < *lgrTabToSend*sizeof(double) ){
350         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(double)-n);
351         if( m < 0 ){
352           if( *clientSockfd >= 0 ){
353             ::close(*clientSockfd);
354             *clientSockfd = -1;
355           }
356           if( *serverSockfd >= 0 ){
357             ::close(*serverSockfd);
358             *serverSockfd = -1;
359           }
360           *errorFlag = true;
361         }
362         n += m;
363       }
364       xdr_destroy( &xp );
365
366       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(double),XDR_DECODE );
367       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(double), (xdrproc_t)xdr_double );
368       xdr_destroy( &xp );
369       break;
370     case SALOME::INT_:
371       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_ENCODE );
372       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
373
374       *errorFlag = false;
375       while( n < *lgrTabToSend*sizeof(int) ){
376         m = write(*clientSockfd, (char*)tabToSend+n, *lgrTabToSend*sizeof(int)-n);
377         if( m < 0 ){
378           if( *clientSockfd >= 0 ){
379             ::close(*clientSockfd);
380             *clientSockfd = -1;
381           }
382           if( *serverSockfd >= 0 ){
383             ::close(*serverSockfd);
384             *serverSockfd = -1;
385           }
386           *errorFlag = true;
387         }
388         n += m;
389       }
390       xdr_destroy( &xp );
391
392       xdrmem_create(&xp,(char*)tabToSend,(*lgrTabToSend)*sizeof(int),XDR_DECODE );
393       xdr_vector( &xp, (char*)tabToSend, *lgrTabToSend, sizeof(int), (xdrproc_t)xdr_int );
394       xdr_destroy( &xp );
395     }
396 }
397
398 void SALOME_SocketSender_i::initCom() throw(SALOME::SALOME_Exception)
399 {
400   struct sockaddr_in serv_addr;
401   socklen_t n;
402   SALOME::ExceptionStruct es;
403
404   /* Ouverture de la socket */
405   _serverSockfd = socket(AF_INET , SOCK_STREAM , 0);
406   if(_serverSockfd < 0) {
407     es.type = SALOME::COMM;
408     es.text = "error Socket exception";
409     throw SALOME::SALOME_Exception(es);
410   }
411   /* Socket structure initialisation*/
412   bzero((char*)&serv_addr,sizeof(serv_addr));
413   serv_addr.sin_family = AF_INET;
414   serv_addr.sin_port = 0; /* asking for a free port */
415   serv_addr.sin_addr.s_addr = INADDR_ANY;
416
417   /* Association of socket with a port */
418   if( ::bind(_serverSockfd, (struct sockaddr *) & serv_addr, 
419            sizeof(struct sockaddr)) < 0 ) {
420     closeCom();
421     es.type = SALOME::COMM;
422     es.text = "error bind Socket exception";
423     throw SALOME::SALOME_Exception(es);
424   }
425   /* Listening to the allocated port */
426   if( listen(_serverSockfd, 10) < 0 ) {
427     closeCom();
428     es.type = SALOME::COMM;
429     es.text = "error listen Socket exception";
430     throw SALOME::SALOME_Exception(es);
431   }
432   /* Retrieving port number*/
433   if( getsockname(_serverSockfd, (struct sockaddr *) & serv_addr, &n) < 0 ){
434     closeCom();
435     es.type = SALOME::COMM;
436     es.text = "error getName Socket exception";
437     throw SALOME::SALOME_Exception(es);
438   }
439   _port = htons(serv_addr.sin_port);
440   SCRUTE(_port);
441 }
442
443 void SALOME_SocketSender_i::acceptCom() throw(SALOME::SALOME_Exception)
444 {
445   socklen_t sin_size;
446   int new_fd;
447   struct sockaddr_in client_addr;
448   SALOME::ExceptionStruct es;
449
450   sin_size = sizeof(struct sockaddr_in);
451   
452   _clientSockfd = accept(_serverSockfd, (struct sockaddr *)&client_addr, &sin_size);
453   if( _clientSockfd < 0 ){
454     closeCom();
455     es.type = SALOME::COMM;
456     es.text = "error accept Socket exception";
457     throw SALOME::SALOME_Exception(es);
458   }
459 }
460
461 void SALOME_SocketSender_i::closeCom()
462 {
463   if( _clientSockfd >= 0 ){
464     ::close(_clientSockfd);
465     _clientSockfd = -1;
466   }
467   if( _serverSockfd >= 0 ){
468     ::close(_serverSockfd);
469     _serverSockfd = -1;
470   }
471
472 }
473
474 void SALOME_SocketSender_i::endOfCom()
475 {
476   void *r;
477   _newThr->join(&r);
478   if(_errorFlag)
479     {
480       SALOME::ExceptionStruct es;
481       es.type = SALOME::COMM;
482       es.text = "error write Socket exception";
483       throw SALOME::SALOME_Exception(es);
484     }
485   delete [] _argsForThr;
486 }
487
488 //CCRT porting
489 #undef _LIBC_POLLUTION_H_
490 #undef _POSIX_PII_SOCKET
491
492 #endif