Salome HOME
PR: merge from branch BR_UnitTests tag mergeto_trunk_17oct05
[modules/kernel.git] / src / Communication / Receivers.cxx
1 #include "poa.h"
2 #include "utilities.h"
3 using namespace std;
4
5 #define TAILLE_SPLIT 100000
6 #define TIMEOUT 20
7
8 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
9 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
10 }
11
12 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
13 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
14   _mySender->release();
15 }
16
17 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
18 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
19 {
20   TSeqCorba seq=_mySender->send();
21   size=seq->length();
22   return (T *)seq->get_buffer(1);
23 }
24
25 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
26 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
27 {
28   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
29 }
30
31 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
32 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
33 }
34
35 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
36 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
37   _mySender->release();
38 }
39
40 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
41 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
42   size=_mySender->getSize();
43   long n;
44   T *ret=new T[size];
45   T *iter=ret;
46   for(long i=0;i<size;i+=TAILLE_SPLIT)
47     {
48       if(size-i>TAILLE_SPLIT)
49         n=TAILLE_SPLIT;
50       else
51         n=size-i;
52       TSeqCorba seq=_mySender->sendPart(i,n);
53       T *seqd=(T *)seq->get_buffer(0);
54       for(long j=0;j<n;j++)
55         *iter++=*seqd++;
56     }
57   return ret;
58 }
59
60 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
61 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
62 {
63   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
64 }
65
66 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
67 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
68 }
69
70 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
71 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
72   _mySender->release();
73 }
74
75 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
76 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
77   size=_mySender->getSize();
78   long n;
79   T *ret=new T[size];
80   T *iter=ret;
81   for(long i=0;i<size;i+=TAILLE_SPLIT)
82     {
83       if(size-i>TAILLE_SPLIT)
84         n=TAILLE_SPLIT;
85       else
86         n=size-i;
87       TSeqCorba seq=_mySender->sendPart(i,n);
88       TCorba *seqd=seq->get_buffer(0);
89       for(long j=0;j<n;j++)
90         *iter++=*seqd++;
91     }
92   return ret;
93 }
94
95 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
96 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
97 {
98   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
99 }
100
101 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
102 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
103 }
104
105 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
106 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
107   _mySender->release();
108 }
109
110 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
111 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
112   size=_mySender->getSize();
113   long n;
114   T *ret=new T[size];
115   T *iter=ret;
116   for(long i=0;i<size;i+=TAILLE_SPLIT)
117     {
118       if(size-i>TAILLE_SPLIT)
119         n=TAILLE_SPLIT;
120       else
121         n=size-i;
122       TSeqCorba seq=_mySender->sendPart(i,n);
123       TCorba *seqd=seq->get_buffer(0);
124       for(long j=0;j<n;j++)
125       *iter++=*seqd++;
126     }
127   return ret;
128 }
129
130 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
131 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
132 {
133   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
134 }
135
136 #ifdef HAVE_MPI2
137
138 template<class T,MPI_Datatype T2,class CorbaSender,class servForT,class ptrForT>
139 MPIReceiver<T,T2,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
140 }
141
142 template<class T,MPI_Datatype T2,class CorbaSender,class servForT,class ptrForT>
143 MPIReceiver<T,T2,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
144   _mySender->release();
145 }
146
147 template<class T,MPI_Datatype T2,class CorbaSender,class servForT,class ptrForT>
148 T *MPIReceiver<T,T2,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
149   int i=0;
150   int myproc;
151   int sproc;
152   MPI_Status status;
153   MPI_Comm com; 
154   char   port_name_clt [MPI_MAX_PORT_NAME];
155   float telps, tuser, tsys, tcpu;
156   T *_v;
157   long _n;
158
159   
160   CORBA::Any a; 
161   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
162   SALOME::MPISender::param_var p =_mySender->getParam();
163   _mySender->send();
164   sproc = p->myproc;
165   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
166   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
167     i++;
168   }       
169   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
170   if ( i == TIMEOUT  ) { 
171     MPI_Finalize();
172     exit(-1);
173   }
174   else{
175     //       Connect to service, get the inter-communicator server
176     //      Attention MPI_Comm_connect est un appel collectif :
177     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
178     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
179     
180     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
181     //        a service that has been unpublished !
182     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
183     i = 0;
184     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
185       i++; 
186     } 
187     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
188     if ( i == TIMEOUT ) {
189       MPI_Finalize(); 
190       exit(-1);
191     }
192   }
193   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
194   _v = new T[_n];
195   MPI_Recv( _v, _n, T2, sproc,p->tag2,com,&status);
196   _mySender->close(p);
197   MPI_Comm_disconnect( &com );  
198   size=_n;
199   return _v;
200 }
201
202 template<class T,MPI_Datatype T2,class CorbaSender,class servForT,class ptrForT>
203 T *MPIReceiver<T,T2,CorbaSender,servForT,ptrForT>::getValue(long &size)
204 {
205   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
206 }
207
208 #endif
209
210 #ifdef HAVE_SOCKET
211 #include <sys/types.h>
212 #include <sys/socket.h>
213 #include <netinet/in.h>
214 #include <arpa/inet.h>
215 #include <netdb.h>
216 #include <unistd.h>
217 #include <rpc/xdr.h>
218
219 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
220 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
221 {
222   _clientSockfd = -1;
223   _senderDestruc=true;
224 }
225
226 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
227 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
228 {
229   if(_senderDestruc)
230     {
231       _mySender->release();
232     }
233 }
234
235 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
236 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
237 {
238   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
239 }
240
241 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
242 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
243 {
244   int n=0, m;
245   T *v;
246   XDR xp; /* pointeur sur le decodeur XDR */
247
248   try{
249     initCom();
250
251     SALOME::SocketSender::param_var p = _mySender->getParam();
252
253     size = p->lend - p->lstart + 1;
254     v = new T[size];
255
256     connectCom(p->internet_address, p->myport);
257   
258     _mySender->send();
259
260     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
261     while( n < size*sizeof(T) ){
262       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
263       if( m < 0 ){
264         closeCom();
265         delete [] v;
266         SALOME::ExceptionStruct es;
267         es.type = SALOME::COMM;
268         es.text = "error read Socket exception";
269         throw SALOME::SALOME_Exception(es);
270       }
271       n += m;
272     }
273     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
274     xdr_destroy( &xp );
275     
276     _mySender->endOfCom();
277     closeCom();
278   }
279   catch(SALOME::SALOME_Exception &ex){
280     if( ex.details.type == SALOME::COMM )
281       {
282         _senderDestruc=false;
283         cout << ex.details.text << endl;
284         throw MultiCommException("Unknown sender protocol");
285       }
286     else
287       throw ex;
288   }
289  
290   return v;
291 }
292
293 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
294 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
295 {
296   try{
297     _mySender->initCom();
298
299     /* Ouverture de la socket */
300     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
301     if (_clientSockfd < 0) {
302       closeCom();
303       SALOME::ExceptionStruct es;
304       es.type = SALOME::COMM;
305       es.text = "error Socket exception";
306       throw SALOME::SALOME_Exception(es);
307     }
308   }
309   catch(SALOME::SALOME_Exception &ex){
310     if( ex.details.type == SALOME::COMM )
311       {
312         _senderDestruc=false;
313         cout << ex.details.text << endl;
314         throw MultiCommException("Unknown sender protocol");
315       }
316     else
317       throw ex;
318   }
319
320 }
321
322 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
323 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
324 {
325   struct sockaddr_in serv_addr;
326   struct hostent * server;
327   SALOME::ExceptionStruct es;
328
329   try{
330     /* reception of the host structure on the remote process */
331     server = gethostbyname(dest_address);
332     if( server == NULL ) {
333       closeCom();
334       es.type = SALOME::COMM;
335       es.text = "error unknown host Socket exception";
336       _senderDestruc=false;
337       throw SALOME::SALOME_Exception(es);
338     }
339
340     /* Initialisation of the socket structure */
341     bzero((char*)&serv_addr,sizeof(serv_addr));
342     serv_addr.sin_family = AF_INET;
343     serv_addr.sin_addr.s_addr = INADDR_ANY;
344     bcopy((char *)server->h_addr, 
345           (char *)&serv_addr.sin_addr.s_addr,
346           server->h_length);
347     serv_addr.sin_port = htons(port);
348     
349     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
350       closeCom();
351       es.type = SALOME::COMM;
352       es.text = "error connect Socket exception";
353       _senderDestruc=false;
354       throw SALOME::SALOME_Exception(es);
355     }
356
357     _mySender->acceptCom();
358
359   }
360   catch(SALOME::SALOME_Exception &ex){
361     if( ex.details.type == SALOME::COMM )
362       {
363         _senderDestruc=false;
364         cout << ex.details.text << endl;
365         throw MultiCommException("Unknown sender protocol");
366       }
367     else
368       throw ex;
369   }
370
371 }
372
373
374 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
375 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
376 {
377   _mySender->closeCom();
378   if( _clientSockfd >= 0 ){
379     close(_clientSockfd);
380     _clientSockfd = -1;
381   }
382  
383 }
384
385 #endif