Salome HOME
Join modifications from branch BR_DEBUG_3_2_0b1
[modules/kernel.git] / src / Communication / Receivers.cxx
1 // Copyright (C) 2005  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
2 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
3 // 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either 
7 // version 2.1 of the License.
8 // 
9 // This library is distributed in the hope that it will be useful 
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
12 // Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public  
15 // License along with this library; if not, write to the Free Software 
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 //
18 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 //
20 #include "poa.h"
21 #include "utilities.h"
22 using namespace std;
23
24 #define TAILLE_SPLIT 100000
25 #define TIMEOUT 20
26
27 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
28 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
29 }
30
31 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
32 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
33   _mySender->release();
34 }
35
36 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
37 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
38 {
39   TSeqCorba seq=_mySender->send();
40   size=seq->length();
41   return (T *)seq->get_buffer(1);
42 }
43
44 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
45 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
46 {
47   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
48 }
49
50 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
51 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
52 }
53
54 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
55 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
56   _mySender->release();
57 }
58
59 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
60 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
61   size=_mySender->getSize();
62   long n;
63   T *ret=new T[size];
64   T *iter=ret;
65   for(long i=0;i<size;i+=TAILLE_SPLIT)
66     {
67       if(size-i>TAILLE_SPLIT)
68         n=TAILLE_SPLIT;
69       else
70         n=size-i;
71       TSeqCorba seq=_mySender->sendPart(i,n);
72       T *seqd=(T *)seq->get_buffer(0);
73       for(long j=0;j<n;j++)
74         *iter++=*seqd++;
75     }
76   return ret;
77 }
78
79 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
80 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
81 {
82   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
83 }
84
85 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
86 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
87 }
88
89 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
90 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
91   _mySender->release();
92 }
93
94 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
95 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
96   size=_mySender->getSize();
97   long n;
98   T *ret=new T[size];
99   T *iter=ret;
100   for(long i=0;i<size;i+=TAILLE_SPLIT)
101     {
102       if(size-i>TAILLE_SPLIT)
103         n=TAILLE_SPLIT;
104       else
105         n=size-i;
106       TSeqCorba seq=_mySender->sendPart(i,n);
107       TCorba *seqd=seq->get_buffer(0);
108       for(long j=0;j<n;j++)
109         *iter++=*seqd++;
110     }
111   return ret;
112 }
113
114 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
115 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
116 {
117   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
118 }
119
120 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
121 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
122 }
123
124 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
125 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
126   _mySender->release();
127 }
128
129 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
130 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
131   size=_mySender->getSize();
132   long n;
133   T *ret=new T[size];
134   T *iter=ret;
135   for(long i=0;i<size;i+=TAILLE_SPLIT)
136     {
137       if(size-i>TAILLE_SPLIT)
138         n=TAILLE_SPLIT;
139       else
140         n=size-i;
141       TSeqCorba seq=_mySender->sendPart(i,n);
142       TCorba *seqd=seq->get_buffer(0);
143       for(long j=0;j<n;j++)
144       *iter++=*seqd++;
145     }
146   return ret;
147 }
148
149 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
150 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
151 {
152   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
153 }
154
155 #ifdef HAVE_MPI2
156
157 template<class T,class CorbaSender,class servForT,class ptrForT>
158 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
159 }
160
161 template<class T,class CorbaSender,class servForT,class ptrForT>
162 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
163   _mySender->release();
164 }
165
166 template<class T,class CorbaSender,class servForT,class ptrForT>
167 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
168   int i=0;
169   int myproc;
170   int sproc;
171   MPI_Status status;
172   MPI_Comm com; 
173   char   port_name_clt [MPI_MAX_PORT_NAME];
174   float telps, tuser, tsys, tcpu;
175   T *_v;
176   long _n;
177
178   
179   CORBA::Any a; 
180   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
181   SALOME::MPISender::param_var p =_mySender->getParam();
182   _mySender->send();
183   sproc = p->myproc;
184   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
185   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
186     i++;
187   }       
188   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
189   if ( i == TIMEOUT  ) { 
190     MPI_Finalize();
191     exit(-1);
192   }
193   else{
194     //       Connect to service, get the inter-communicator server
195     //      Attention MPI_Comm_connect est un appel collectif :
196     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
197     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
198     
199     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
200     //        a service that has been unpublished !
201     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
202     i = 0;
203     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
204       i++; 
205     } 
206     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
207     if ( i == TIMEOUT ) {
208       MPI_Finalize(); 
209       exit(-1);
210     }
211   }
212   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
213   _v = new T[_n];
214   MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
215   _mySender->close(p);
216   MPI_Comm_disconnect( &com );  
217   size=_n;
218   return _v;
219 }
220
221 template<class T,class CorbaSender,class servForT,class ptrForT>
222 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
223 {
224   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
225 }
226
227 #endif
228
229 #ifdef HAVE_SOCKET
230 #include <sys/types.h>
231 #include <sys/socket.h>
232 #include <netinet/in.h>
233 #include <arpa/inet.h>
234 #include <netdb.h>
235 #include <unistd.h>
236 #include <rpc/xdr.h>
237
238 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
239 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
240 {
241   _clientSockfd = -1;
242   _senderDestruc=true;
243 }
244
245 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
246 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
247 {
248   if(_senderDestruc)
249     {
250       _mySender->release();
251     }
252 }
253
254 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
255 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
256 {
257   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
258 }
259
260 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
261 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
262 {
263   int n=0, m;
264   T *v;
265   XDR xp; /* pointeur sur le decodeur XDR */
266
267   try{
268     initCom();
269
270     SALOME::SocketSender::param_var p = _mySender->getParam();
271
272     size = p->lend - p->lstart + 1;
273     v = new T[size];
274
275     connectCom(p->internet_address, p->myport);
276   
277     _mySender->send();
278
279     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
280     while( n < size*sizeof(T) ){
281       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
282       if( m < 0 ){
283         closeCom();
284         delete [] v;
285         SALOME::ExceptionStruct es;
286         es.type = SALOME::COMM;
287         es.text = "error read Socket exception";
288         throw SALOME::SALOME_Exception(es);
289       }
290       n += m;
291     }
292     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
293     xdr_destroy( &xp );
294     
295     _mySender->endOfCom();
296     closeCom();
297   }
298   catch(SALOME::SALOME_Exception &ex){
299     if( ex.details.type == SALOME::COMM )
300       {
301         _senderDestruc=false;
302         cout << ex.details.text << endl;
303         throw MultiCommException("Unknown sender protocol");
304       }
305     else
306       throw ex;
307   }
308  
309   return v;
310 }
311
312 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
313 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
314 {
315   try{
316     _mySender->initCom();
317
318     /* Ouverture de la socket */
319     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
320     if (_clientSockfd < 0) {
321       closeCom();
322       SALOME::ExceptionStruct es;
323       es.type = SALOME::COMM;
324       es.text = "error Socket exception";
325       throw SALOME::SALOME_Exception(es);
326     }
327   }
328   catch(SALOME::SALOME_Exception &ex){
329     if( ex.details.type == SALOME::COMM )
330       {
331         _senderDestruc=false;
332         cout << ex.details.text << endl;
333         throw MultiCommException("Unknown sender protocol");
334       }
335     else
336       throw ex;
337   }
338
339 }
340
341 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
342 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
343 {
344   struct sockaddr_in serv_addr;
345   struct hostent * server;
346   SALOME::ExceptionStruct es;
347
348   try{
349     /* reception of the host structure on the remote process */
350     server = gethostbyname(dest_address);
351     if( server == NULL ) {
352       closeCom();
353       es.type = SALOME::COMM;
354       es.text = "error unknown host Socket exception";
355       _senderDestruc=false;
356       throw SALOME::SALOME_Exception(es);
357     }
358
359     /* Initialisation of the socket structure */
360     bzero((char*)&serv_addr,sizeof(serv_addr));
361     serv_addr.sin_family = AF_INET;
362     serv_addr.sin_addr.s_addr = INADDR_ANY;
363     bcopy((char *)server->h_addr, 
364           (char *)&serv_addr.sin_addr.s_addr,
365           server->h_length);
366     serv_addr.sin_port = htons(port);
367     
368     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
369       closeCom();
370       es.type = SALOME::COMM;
371       es.text = "error connect Socket exception";
372       _senderDestruc=false;
373       throw SALOME::SALOME_Exception(es);
374     }
375
376     _mySender->acceptCom();
377
378   }
379   catch(SALOME::SALOME_Exception &ex){
380     if( ex.details.type == SALOME::COMM )
381       {
382         _senderDestruc=false;
383         cout << ex.details.text << endl;
384         throw MultiCommException("Unknown sender protocol");
385       }
386     else
387       throw ex;
388   }
389
390 }
391
392
393 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
394 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
395 {
396   _mySender->closeCom();
397   if( _clientSockfd >= 0 ){
398     close(_clientSockfd);
399     _clientSockfd = -1;
400   }
401  
402 }
403
404 #endif