]> SALOME platform Git repositories - modules/kernel.git/blob - src/Communication/Receivers.cxx
Salome HOME
PR: mergefrom_BR_GEAY_05Nov04
[modules/kernel.git] / src / Communication / Receivers.cxx
1 #include "poa.h"
2 #include "utilities.h"
3
4 #define TAILLE_SPLIT 100000
5 #define TIMEOUT 20
6
7 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
8 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
9 }
10
11 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
12 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaNCNoCopyReceiver(){
13   _mySender->release();
14   CORBA::release(_mySender);
15 }
16
17 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
18 void *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size)
19 {
20   TSeqCorba seq=_mySender->send();
21   size=seq->length();
22   return seq->get_buffer(1);
23 }
24
25 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
26 void *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
27 {
28   return Receiver::getValue(size,_mySender);
29 }
30
31 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
32 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
33 }
34
35 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
36 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaNCWithCopyReceiver(){
37   _mySender->release();
38   CORBA::release(_mySender);
39 }
40
41 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
42 void *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
43   size=_mySender->getSize();
44   long n;
45   T *ret=new T[size];
46   T *iter=ret;
47   for(long i=0;i<size;i+=TAILLE_SPLIT)
48     {
49       if(size-i>TAILLE_SPLIT)
50         n=TAILLE_SPLIT;
51       else
52         n=size-i;
53       TSeqCorba seq=_mySender->sendPart(i,n);
54       T *seqd=(T *)seq->get_buffer(0);
55       for(long j=0;j<n;j++)
56         *iter++=*seqd++;
57     }
58   return ret;
59 }
60
61 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
62 void *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
63 {
64   return Receiver::getValue(size,_mySender);
65 }
66
67 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
68 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
69 }
70
71 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
72 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaWCNoCopyReceiver(){
73   _mySender->release();
74   CORBA::release(_mySender);
75 }
76
77 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
78 void *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
79   size=_mySender->getSize();
80   long n;
81   T *ret=new T[size];
82   T *iter=ret;
83   for(long i=0;i<size;i+=TAILLE_SPLIT)
84     {
85       if(size-i>TAILLE_SPLIT)
86         n=TAILLE_SPLIT;
87       else
88         n=size-i;
89       TSeqCorba seq=_mySender->sendPart(i,n);
90       TCorba *seqd=seq->get_buffer(0);
91       for(long j=0;j<n;j++)
92         *iter++=*seqd++;
93     }
94   return ret;
95 }
96
97 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
98 void *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
99 {
100   return Receiver::getValue(size,_mySender);
101 }
102
103 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
104 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
105 }
106
107 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
108 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::~CorbaWCWithCopyReceiver(){
109   _mySender->release();
110   CORBA::release(_mySender);
111 }
112
113 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
114 void *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getDistValue(long &size){
115   size=_mySender->getSize();
116   long n;
117   T *ret=new T[size];
118   T *iter=ret;
119   for(long i=0;i<size;i+=TAILLE_SPLIT)
120     {
121       if(size-i>TAILLE_SPLIT)
122         n=TAILLE_SPLIT;
123       else
124         n=size-i;
125       TSeqCorba seq=_mySender->sendPart(i,n);
126       TCorba *seqd=seq->get_buffer(0);
127       for(long j=0;j<n;j++)
128       *iter++=*seqd++;
129     }
130   return ret;
131 }
132
133 template<class T,class TCorba,class TSeqCorba,class CorbaSender>
134 void *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender>::getValue(long &size)
135 {
136   return Receiver::getValue(size,_mySender);
137 }
138
139 #ifdef HAVE_MPI2
140
141 template<class T,MPI_Datatype T2>
142 MPIReceiver<T,T2>::MPIReceiver(SALOME::MPISender_ptr mySender):_mySender(mySender){
143 }
144
145 template<class T,MPI_Datatype T2>
146 MPIReceiver<T,T2>::~MPIReceiver(){
147   _mySender->release();
148   CORBA::release(_mySender);
149 }
150
151 template<class T,MPI_Datatype T2>
152 void *MPIReceiver<T,T2>::getDistValue(long &size){
153   int i=0;
154   int myproc;
155   int sproc;
156   MPI_Status status;
157   MPI_Comm com; 
158   char   port_name_clt [MPI_MAX_PORT_NAME];
159   float telps, tuser, tsys, tcpu;
160   T *_v;
161   long _n;
162
163   
164   CORBA::Any a; 
165   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
166   SALOME::MPISender::param_var p =_mySender->getParam();
167   _mySender->send();
168   sproc = p->myproc;
169   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
170   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
171     i++;
172   }       
173   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
174   if ( i == TIMEOUT  ) { 
175     MPI_Finalize();
176     exit(-1);
177   }
178   else{
179     //       Connect to service, get the inter-communicator server
180     //      Attention MPI_Comm_connect est un appel collectif :
181     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
182     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
183     
184     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
185     //        a service that has been unpublished !
186     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
187     i = 0;
188     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
189       i++; 
190     } 
191     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
192     if ( i == TIMEOUT ) {
193       MPI_Finalize(); 
194       exit(-1);
195     }
196   }
197   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
198   _v = new T[_n];
199   MPI_Recv( _v, _n, T2, sproc,p->tag2,com,&status);
200   _mySender->close(p);
201   MPI_Comm_disconnect( &com );  
202   size=_n;
203   return _v;
204 }
205
206 template<class T,MPI_Datatype T2>
207 void *MPIReceiver<T,T2>::getValue(long &size)
208 {
209   return Receiver::getValue(size,_mySender);
210 }
211
212 #endif
213
214 #ifdef HAVE_SOCKET
215 #include <sys/types.h>
216 #include <sys/socket.h>
217 #include <netinet/in.h>
218 #include <arpa/inet.h>
219 #include <netdb.h>
220 #include <unistd.h>
221 #include <rpc/xdr.h>
222
223 template<class T,int (*myFunc)(XDR*,T*)>
224 SocketReceiver<T,myFunc>::SocketReceiver(SALOME::SocketSender_ptr mySender) : _mySender(mySender)
225 {
226   _clientSockfd = -1;
227   _senderDestruc=true;
228 }
229
230 template<class T,int (*myFunc)(XDR*,T*)>
231 SocketReceiver<T,myFunc>::~SocketReceiver()
232 {
233   if(_senderDestruc)
234     {
235       _mySender->release();
236       CORBA::release(_mySender);
237     }
238 }
239
240 template<class T,int (*myFunc)(XDR*,T*)>
241 void *SocketReceiver<T,myFunc>::getValue(long &size)
242 {
243   return Receiver::getValue(size,_mySender);
244 }
245
246 template<class T,int (*myFunc)(XDR*,T*)>
247 void* SocketReceiver<T,myFunc>::getDistValue(long &size)
248 {
249   int n=0, m;
250   T *v;
251   XDR xp; /* pointeur sur le decodeur XDR */
252
253   try{
254     initCom();
255
256     SALOME::SocketSender::param_var p = _mySender->getParam();
257
258     size = p->lend - p->lstart + 1;
259     v = new T[size];
260
261     connectCom(p->internet_address, p->myport);
262   
263     _mySender->send();
264
265     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
266     while( n < size*sizeof(T) ){
267       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
268       if( m < 0 ){
269         closeCom();
270         delete [] v;
271         SALOME::ExceptionStruct es;
272         es.type = SALOME::COMM;
273         es.text = "error read Socket exception";
274         throw SALOME::SALOME_Exception(es);
275       }
276       n += m;
277     }
278     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
279     xdr_destroy( &xp );
280     
281     _mySender->endOfCom();
282     closeCom();
283   }
284   catch(SALOME::SALOME_Exception &ex){
285     if( ex.details.type == SALOME::COMM )
286       {
287         _senderDestruc=false;
288         cout << ex.details.text << endl;
289         throw MultiCommException("Unknown sender protocol");
290       }
291     else
292       throw ex;
293   }
294  
295   return v;
296 }
297
298 template<class T,int (*myFunc)(XDR*,T*)>
299 void SocketReceiver<T,myFunc>::initCom()
300 {
301   try{
302     _mySender->initCom();
303
304     /* Ouverture de la socket */
305     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
306     if (_clientSockfd < 0) {
307       closeCom();
308       SALOME::ExceptionStruct es;
309       es.type = SALOME::COMM;
310       es.text = "error Socket exception";
311       throw SALOME::SALOME_Exception(es);
312     }
313   }
314   catch(SALOME::SALOME_Exception &ex){
315     if( ex.details.type == SALOME::COMM )
316       {
317         _senderDestruc=false;
318         cout << ex.details.text << endl;
319         throw MultiCommException("Unknown sender protocol");
320       }
321     else
322       throw ex;
323   }
324
325 }
326
327 template<class T,int (*myFunc)(XDR*,T*)>
328 void SocketReceiver<T,myFunc>::connectCom(const char *dest_address, int port)
329 {
330   struct sockaddr_in serv_addr;
331   struct hostent * server;
332   SALOME::ExceptionStruct es;
333
334   try{
335     /* reception of the host structure on the remote process */
336     server = gethostbyname(dest_address);
337     if( server == NULL ) {
338       closeCom();
339       es.type = SALOME::COMM;
340       es.text = "error unknown host Socket exception";
341       _senderDestruc=false;
342       throw SALOME::SALOME_Exception(es);
343     }
344
345     /* Initialisation of the socket structure */
346     bzero((char*)&serv_addr,sizeof(serv_addr));
347     serv_addr.sin_family = AF_INET;
348     serv_addr.sin_addr.s_addr = INADDR_ANY;
349     bcopy((char *)server->h_addr, 
350           (char *)&serv_addr.sin_addr.s_addr,
351           server->h_length);
352     serv_addr.sin_port = htons(port);
353     
354     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
355       closeCom();
356       es.type = SALOME::COMM;
357       es.text = "error connect Socket exception";
358       _senderDestruc=false;
359       throw SALOME::SALOME_Exception(es);
360     }
361
362     _mySender->acceptCom();
363
364   }
365   catch(SALOME::SALOME_Exception &ex){
366     if( ex.details.type == SALOME::COMM )
367       {
368         _senderDestruc=false;
369         cout << ex.details.text << endl;
370         throw MultiCommException("Unknown sender protocol");
371       }
372     else
373       throw ex;
374   }
375
376 }
377
378
379 template<class T,int (*myFunc)(XDR*,T*)>
380 void SocketReceiver<T,myFunc>::closeCom()
381 {
382   _mySender->closeCom();
383   if( _clientSockfd >= 0 ){
384     close(_clientSockfd);
385     _clientSockfd = -1;
386   }
387  
388 }
389
390 #endif