Salome HOME
Moved some functionality to VTKViewer_Utilities.h
[modules/kernel.git] / src / Communication / Receivers.cxx
1 #include "poa.h"
2 #include "utilities.h"
3 using namespace std;
4
5 #define TAILLE_SPLIT 100000
6 #define TIMEOUT 20
7
8 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
9 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
10 }
11
12 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
13 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaNCNoCopyReceiver(){
14   _mySender->release();
15   CORBA::release(_mySender);
16 }
17
18 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
19 void *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size)
20 {
21   TSeqCorba seq=_mySender->send();
22   size=seq->length();
23   return seq->get_buffer(1);
24 }
25
26 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
27 void *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
28 {
29   return Receiver::getValue(size,_mySender);
30 }
31
32 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
33 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
34 }
35
36 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
37 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaNCWithCopyReceiver(){
38   _mySender->release();
39   CORBA::release(_mySender);
40 }
41
42 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
43 void *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
44   size=_mySender->getSize();
45   long n;
46   T *ret=new T[size];
47   T *iter=ret;
48   for(long i=0;i<size;i+=TAILLE_SPLIT)
49     {
50       if(size-i>TAILLE_SPLIT)
51         n=TAILLE_SPLIT;
52       else
53         n=size-i;
54       TSeqCorba seq=_mySender->sendPart(i,n);
55       T *seqd=(T *)seq->get_buffer(0);
56       for(long j=0;j<n;j++)
57         *iter++=*seqd++;
58     }
59   return ret;
60 }
61
62 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
63 void *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
64 {
65   return Receiver::getValue(size,_mySender);
66 }
67
68 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
69 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
70 }
71
72 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
73 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaWCNoCopyReceiver(){
74   _mySender->release();
75   CORBA::release(_mySender);
76 }
77
78 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
79 void *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
80   size=_mySender->getSize();
81   long n;
82   T *ret=new T[size];
83   T *iter=ret;
84   for(long i=0;i<size;i+=TAILLE_SPLIT)
85     {
86       if(size-i>TAILLE_SPLIT)
87         n=TAILLE_SPLIT;
88       else
89         n=size-i;
90       TSeqCorba seq=_mySender->sendPart(i,n);
91       TCorba *seqd=seq->get_buffer(0);
92       for(long j=0;j<n;j++)
93         *iter++=*seqd++;
94     }
95   return ret;
96 }
97
98 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
99 void *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
100 {
101   return Receiver::getValue(size,_mySender);
102 }
103
104 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
105 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
106 }
107
108 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
109 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaWCWithCopyReceiver(){
110   _mySender->release();
111   CORBA::release(_mySender);
112 }
113
114 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
115 void *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
116   size=_mySender->getSize();
117   long n;
118   T *ret=new T[size];
119   T *iter=ret;
120   for(long i=0;i<size;i+=TAILLE_SPLIT)
121     {
122       if(size-i>TAILLE_SPLIT)
123         n=TAILLE_SPLIT;
124       else
125         n=size-i;
126       TSeqCorba seq=_mySender->sendPart(i,n);
127       TCorba *seqd=seq->get_buffer(0);
128       for(long j=0;j<n;j++)
129       *iter++=*seqd++;
130     }
131   return ret;
132 }
133
134 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
135 void *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
136 {
137   return Receiver::getValue(size,_mySender);
138 }
139
140 #ifdef HAVE_MPI2
141
142 template<class T,MPI_Datatype T2>
143 MPIReceiver<T,T2>::MPIReceiver(SALOME::MPISender_ptr mySender):_mySender(mySender){
144 }
145
146 template<class T,MPI_Datatype T2>
147 MPIReceiver<T,T2>::~MPIReceiver(){
148   _mySender->release();
149   CORBA::release(_mySender);
150 }
151
152 template<class T,MPI_Datatype T2>
153 void *MPIReceiver<T,T2>::getDistValue(long &size){
154   int i=0;
155   int myproc;
156   int sproc;
157   MPI_Status status;
158   MPI_Comm com; 
159   char   port_name_clt [MPI_MAX_PORT_NAME];
160   float telps, tuser, tsys, tcpu;
161   T *_v;
162   long _n;
163
164   
165   CORBA::Any a; 
166   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
167   SALOME::MPISender::param_var p =_mySender->getParam();
168   _mySender->send();
169   sproc = p->myproc;
170   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
171   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
172     i++;
173   }       
174   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
175   if ( i == TIMEOUT  ) { 
176     MPI_Finalize();
177     exit(-1);
178   }
179   else{
180     //       Connect to service, get the inter-communicator server
181     //      Attention MPI_Comm_connect est un appel collectif :
182     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
183     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
184     
185     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
186     //        a service that has been unpublished !
187     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
188     i = 0;
189     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
190       i++; 
191     } 
192     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
193     if ( i == TIMEOUT ) {
194       MPI_Finalize(); 
195       exit(-1);
196     }
197   }
198   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
199   _v = new T[_n];
200   MPI_Recv( _v, _n, T2, sproc,p->tag2,com,&status);
201   _mySender->close(p);
202   MPI_Comm_disconnect( &com );  
203   size=_n;
204   return _v;
205 }
206
207 template<class T,MPI_Datatype T2>
208 void *MPIReceiver<T,T2>::getValue(long &size)
209 {
210   return Receiver::getValue(size,_mySender);
211 }
212
213 #endif
214
215 #ifdef HAVE_SOCKET
216 #include <sys/types.h>
217 #include <sys/socket.h>
218 #include <netinet/in.h>
219 #include <arpa/inet.h>
220 #include <netdb.h>
221 #include <unistd.h>
222 #include <rpc/xdr.h>
223
224 template<class T,int (*myFunc)(XDR*,T*)>
225 SocketReceiver<T,myFunc>::SocketReceiver(SALOME::SocketSender_ptr mySender) : _mySender(mySender)
226 {
227   _clientSockfd = -1;
228   _senderDestruc=true;
229 }
230
231 template<class T,int (*myFunc)(XDR*,T*)>
232 SocketReceiver<T,myFunc>::~SocketReceiver()
233 {
234   if(_senderDestruc)
235     {
236       _mySender->release();
237       CORBA::release(_mySender);
238     }
239 }
240
241 template<class T,int (*myFunc)(XDR*,T*)>
242 void *SocketReceiver<T,myFunc>::getValue(long &size)
243 {
244   return Receiver::getValue(size,_mySender);
245 }
246
247 template<class T,int (*myFunc)(XDR*,T*)>
248 void* SocketReceiver<T,myFunc>::getDistValue(long &size)
249 {
250   int n=0, m;
251   T *v;
252   XDR xp; /* pointeur sur le decodeur XDR */
253
254   try{
255     initCom();
256
257     SALOME::SocketSender::param_var p = _mySender->getParam();
258
259     size = p->lend - p->lstart + 1;
260     v = new T[size];
261
262     connectCom(p->internet_address, p->myport);
263   
264     _mySender->send();
265
266     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
267     while( n < size*sizeof(T) ){
268       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
269       if( m < 0 ){
270         closeCom();
271         delete [] v;
272         SALOME::ExceptionStruct es;
273         es.type = SALOME::COMM;
274         es.text = "error read Socket exception";
275         throw SALOME::SALOME_Exception(es);
276       }
277       n += m;
278     }
279     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
280     xdr_destroy( &xp );
281     
282     _mySender->endOfCom();
283     closeCom();
284   }
285   catch(SALOME::SALOME_Exception &ex){
286     if( ex.details.type == SALOME::COMM )
287       {
288         _senderDestruc=false;
289         cout << ex.details.text << endl;
290         throw MultiCommException("Unknown sender protocol");
291       }
292     else
293       throw ex;
294   }
295  
296   return v;
297 }
298
299 template<class T,int (*myFunc)(XDR*,T*)>
300 void SocketReceiver<T,myFunc>::initCom()
301 {
302   try{
303     _mySender->initCom();
304
305     /* Ouverture de la socket */
306     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
307     if (_clientSockfd < 0) {
308       closeCom();
309       SALOME::ExceptionStruct es;
310       es.type = SALOME::COMM;
311       es.text = "error Socket exception";
312       throw SALOME::SALOME_Exception(es);
313     }
314   }
315   catch(SALOME::SALOME_Exception &ex){
316     if( ex.details.type == SALOME::COMM )
317       {
318         _senderDestruc=false;
319         cout << ex.details.text << endl;
320         throw MultiCommException("Unknown sender protocol");
321       }
322     else
323       throw ex;
324   }
325
326 }
327
328 template<class T,int (*myFunc)(XDR*,T*)>
329 void SocketReceiver<T,myFunc>::connectCom(const char *dest_address, int port)
330 {
331   struct sockaddr_in serv_addr;
332   struct hostent * server;
333   SALOME::ExceptionStruct es;
334
335   try{
336     /* reception of the host structure on the remote process */
337     server = gethostbyname(dest_address);
338     if( server == NULL ) {
339       closeCom();
340       es.type = SALOME::COMM;
341       es.text = "error unknown host Socket exception";
342       _senderDestruc=false;
343       throw SALOME::SALOME_Exception(es);
344     }
345
346     /* Initialisation of the socket structure */
347     bzero((char*)&serv_addr,sizeof(serv_addr));
348     serv_addr.sin_family = AF_INET;
349     serv_addr.sin_addr.s_addr = INADDR_ANY;
350     bcopy((char *)server->h_addr, 
351           (char *)&serv_addr.sin_addr.s_addr,
352           server->h_length);
353     serv_addr.sin_port = htons(port);
354     
355     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
356       closeCom();
357       es.type = SALOME::COMM;
358       es.text = "error connect Socket exception";
359       _senderDestruc=false;
360       throw SALOME::SALOME_Exception(es);
361     }
362
363     _mySender->acceptCom();
364
365   }
366   catch(SALOME::SALOME_Exception &ex){
367     if( ex.details.type == SALOME::COMM )
368       {
369         _senderDestruc=false;
370         cout << ex.details.text << endl;
371         throw MultiCommException("Unknown sender protocol");
372       }
373     else
374       throw ex;
375   }
376
377 }
378
379
380 template<class T,int (*myFunc)(XDR*,T*)>
381 void SocketReceiver<T,myFunc>::closeCom()
382 {
383   _mySender->closeCom();
384   if( _clientSockfd >= 0 ){
385     close(_clientSockfd);
386     _clientSockfd = -1;
387   }
388  
389 }
390
391 #endif