Salome HOME
updated copyright message
[modules/kernel.git] / src / Communication / Receivers.cxx
1 // Copyright (C) 2007-2023  CEA, EDF, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include "omniORB4/poa.h"
24 #include "utilities.h"
25 #include "Basics_MpiUtils.hxx"
26
27 #define TAILLE_SPLIT 100000
28 #define TIMEOUT 20
29
30 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
31 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
32 }
33
34 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
35 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
36   _mySender->release();
37 }
38
39 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
40 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
41 {
42   TSeqCorba seq=_mySender->send();
43   size=seq->length();
44   return (T *)seq->get_buffer(1);
45 }
46
47 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
48 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
49 {
50   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
51 }
52
53 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
54 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
55 }
56
57 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
58 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
59   _mySender->release();
60 }
61
62 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
63 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
64   size=_mySender->getSize();
65   long n;
66   T *ret=new T[size];
67   T *iter=ret;
68   for(long i=0;i<size;i+=TAILLE_SPLIT)
69     {
70       if(size-i>TAILLE_SPLIT)
71         n=TAILLE_SPLIT;
72       else
73         n=size-i;
74       TSeqCorba seq=_mySender->sendPart(i,n);
75       T *seqd=(T *)seq->get_buffer(0);
76       for(long j=0;j<n;j++)
77         *iter++=*seqd++;
78     }
79   return ret;
80 }
81
82 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
83 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
84 {
85   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
86 }
87
88 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
89 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
90 }
91
92 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
93 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
94   _mySender->release();
95 }
96
97 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
98 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
99   size=_mySender->getSize();
100   long n;
101   T *ret=new T[size];
102   T *iter=ret;
103   for(long i=0;i<size;i+=TAILLE_SPLIT)
104     {
105       if(size-i>TAILLE_SPLIT)
106         n=TAILLE_SPLIT;
107       else
108         n=size-i;
109       TSeqCorba seq=_mySender->sendPart(i,n);
110       TCorba *seqd=seq->get_buffer(0);
111       for(long j=0;j<n;j++)
112         *iter++=*seqd++;
113     }
114   return ret;
115 }
116
117 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
118 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
119 {
120   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
121 }
122
123 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
124 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
125 }
126
127 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
128 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
129   _mySender->release();
130 }
131
132 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
133 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
134   size=_mySender->getSize();
135   long n;
136   T *ret=new T[size];
137   T *iter=ret;
138   for(long i=0;i<size;i+=TAILLE_SPLIT)
139     {
140       if(size-i>TAILLE_SPLIT)
141         n=TAILLE_SPLIT;
142       else
143         n=size-i;
144       TSeqCorba seq=_mySender->sendPart(i,n);
145       TCorba *seqd=seq->get_buffer(0);
146       for(long j=0;j<n;j++)
147       *iter++=*seqd++;
148     }
149   return ret;
150 }
151
152 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
153 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
154 {
155   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
156 }
157
158 #ifdef HAVE_MPI2
159
160 template<class T,class CorbaSender,class servForT,class ptrForT>
161 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
162 }
163
164 template<class T,class CorbaSender,class servForT,class ptrForT>
165 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
166   _mySender->release();
167 }
168
169 template<class T,class CorbaSender,class servForT,class ptrForT>
170 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
171   int i=0;
172   int myproc;
173   int sproc;
174   MPI_Status status;
175   MPI_Comm com; 
176   char   port_name_clt [MPI_MAX_PORT_NAME];
177   T *_v;
178   long _n;
179
180   
181   CORBA::Any a; 
182   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
183   SALOME::MPISender::param_var p =_mySender->getParam();
184   _mySender->send();
185   sproc = p->myproc;
186   MPI_ERROR_HANDLER(MPI_ERRORS_RETURN);
187   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
188     i++;
189   }
190   MPI_ERROR_HANDLER(MPI_ERRORS_ARE_FATAL);
191   if ( i == TIMEOUT  ) { 
192     MPI_Finalize();
193     exit(-1);
194   }
195   else{
196     //       Connect to service, get the inter-communicator server
197     //      Attention MPI_Comm_connect est un appel collectif :
198     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
199     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
200     
201     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
202     //        a service that has been unpublished !
203     MPI_ERROR_HANDLER(MPI_ERRORS_RETURN);
204     i = 0;
205     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
206       i++; 
207     }
208     MPI_ERROR_HANDLER(MPI_ERRORS_ARE_FATAL);
209     if ( i == TIMEOUT ) {
210       MPI_Finalize(); 
211       exit(-1);
212     }
213   }
214   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
215   _v = new T[_n];
216   MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
217   _mySender->close(p);
218   MPI_Comm_disconnect( &com );  
219   size=_n;
220   return _v;
221 }
222
223 template<class T,class CorbaSender,class servForT,class ptrForT>
224 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
225 {
226   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
227 }
228
229 #endif
230
231 #ifdef HAVE_SOCKET
232 #include <sys/types.h>
233 #include <sys/socket.h>
234 #include <netinet/in.h>
235 #include <arpa/inet.h>
236 #include <netdb.h>
237 #include <unistd.h>
238 #include <rpc/xdr.h>
239
240 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
241 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
242 {
243   _clientSockfd = -1;
244   _senderDestruc=true;
245 }
246
247 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
248 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
249 {
250   if(_senderDestruc)
251     {
252       _mySender->release();
253     }
254 }
255
256 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
257 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
258 {
259   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
260 }
261
262 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
263 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
264 {
265   int n=0, m;
266   T *v;
267   XDR xp; /* pointeur sur le decodeur XDR */
268
269   try{
270     initCom();
271
272     SALOME::SocketSender::param_var p = _mySender->getParam();
273
274     size = p->lend - p->lstart + 1;
275     v = new T[size];
276
277     connectCom(p->internet_address, p->myport);
278   
279     _mySender->send();
280
281     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
282     while( n < size*sizeof(T) ){
283       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
284       if( m < 0 ){
285         closeCom();
286         delete [] v;
287         SALOME::ExceptionStruct es;
288         es.type = SALOME::COMM;
289         es.text = "error read Socket exception";
290         throw SALOME::SALOME_Exception(es);
291       }
292       n += m;
293     }
294     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
295     xdr_destroy( &xp );
296     
297     _mySender->endOfCom();
298     closeCom();
299   }
300   catch(SALOME::SALOME_Exception &ex){
301     if( ex.details.type == SALOME::COMM )
302       {
303         _senderDestruc=false;
304                 std::cout << ex.details.text << std::endl;
305         throw MultiCommException("Unknown sender protocol");
306       }
307     else
308       throw ex;
309   }
310  
311   return v;
312 }
313
314 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
315 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
316 {
317   try{
318     _mySender->initCom();
319
320     /* Ouverture de la socket */
321     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
322     if (_clientSockfd < 0) {
323       closeCom();
324       SALOME::ExceptionStruct es;
325       es.type = SALOME::COMM;
326       es.text = "error Socket exception";
327       throw SALOME::SALOME_Exception(es);
328     }
329   }
330   catch(SALOME::SALOME_Exception &ex){
331     if( ex.details.type == SALOME::COMM )
332       {
333         _senderDestruc=false;
334                 std::cout << ex.details.text << std::endl;
335         throw MultiCommException("Unknown sender protocol");
336       }
337     else
338       throw ex;
339   }
340
341 }
342
343 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
344 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
345 {
346   struct sockaddr_in serv_addr;
347   struct hostent * server;
348   SALOME::ExceptionStruct es;
349
350   try{
351     /* reception of the host structure on the remote process */
352     server = gethostbyname(dest_address);
353     if( server == NULL ) {
354       closeCom();
355       es.type = SALOME::COMM;
356       es.text = "error unknown host Socket exception";
357       _senderDestruc=false;
358       throw SALOME::SALOME_Exception(es);
359     }
360
361     /* Initialisation of the socket structure */
362     bzero((char*)&serv_addr,sizeof(serv_addr));
363     serv_addr.sin_family = AF_INET;
364     serv_addr.sin_addr.s_addr = INADDR_ANY;
365     bcopy((char *)server->h_addr, 
366           (char *)&serv_addr.sin_addr.s_addr,
367           server->h_length);
368     serv_addr.sin_port = htons(port);
369     
370     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
371       closeCom();
372       es.type = SALOME::COMM;
373       es.text = "error connect Socket exception";
374       _senderDestruc=false;
375       throw SALOME::SALOME_Exception(es);
376     }
377
378     _mySender->acceptCom();
379
380   }
381   catch(SALOME::SALOME_Exception &ex){
382     if( ex.details.type == SALOME::COMM )
383       {
384         _senderDestruc=false;
385         std::cout << ex.details.text << std::endl;
386         throw MultiCommException("Unknown sender protocol");
387       }
388     else
389       throw ex;
390   }
391
392 }
393
394
395 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
396 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
397 {
398   _mySender->closeCom();
399   if( _clientSockfd >= 0 ){
400     close(_clientSockfd);
401     _clientSockfd = -1;
402   }
403  
404 }
405
406 #endif