]> SALOME platform Git repositories - modules/kernel.git/blob - src/Communication/Receivers.cxx
Salome HOME
Build KERNEL with native openmpi on CentOS-8
[modules/kernel.git] / src / Communication / Receivers.cxx
1 // Copyright (C) 2007-2020  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include "omniORB4/poa.h"
24 #include "utilities.h"
25
26 #define TAILLE_SPLIT 100000
27 #define TIMEOUT 20
28
29 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
30 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
31 }
32
33 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
34 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
35   _mySender->release();
36 }
37
38 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
39 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
40 {
41   TSeqCorba seq=_mySender->send();
42   size=seq->length();
43   return (T *)seq->get_buffer(1);
44 }
45
46 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
47 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
48 {
49   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
50 }
51
52 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
53 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
54 }
55
56 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
57 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
58   _mySender->release();
59 }
60
61 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
62 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
63   size=_mySender->getSize();
64   long n;
65   T *ret=new T[size];
66   T *iter=ret;
67   for(long i=0;i<size;i+=TAILLE_SPLIT)
68     {
69       if(size-i>TAILLE_SPLIT)
70         n=TAILLE_SPLIT;
71       else
72         n=size-i;
73       TSeqCorba seq=_mySender->sendPart(i,n);
74       T *seqd=(T *)seq->get_buffer(0);
75       for(long j=0;j<n;j++)
76         *iter++=*seqd++;
77     }
78   return ret;
79 }
80
81 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
82 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
83 {
84   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
85 }
86
87 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
88 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
89 }
90
91 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
92 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
93   _mySender->release();
94 }
95
96 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
97 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
98   size=_mySender->getSize();
99   long n;
100   T *ret=new T[size];
101   T *iter=ret;
102   for(long i=0;i<size;i+=TAILLE_SPLIT)
103     {
104       if(size-i>TAILLE_SPLIT)
105         n=TAILLE_SPLIT;
106       else
107         n=size-i;
108       TSeqCorba seq=_mySender->sendPart(i,n);
109       TCorba *seqd=seq->get_buffer(0);
110       for(long j=0;j<n;j++)
111         *iter++=*seqd++;
112     }
113   return ret;
114 }
115
116 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
117 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
118 {
119   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
120 }
121
122 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
123 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
124 }
125
126 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
127 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
128   _mySender->release();
129 }
130
131 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
132 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
133   size=_mySender->getSize();
134   long n;
135   T *ret=new T[size];
136   T *iter=ret;
137   for(long i=0;i<size;i+=TAILLE_SPLIT)
138     {
139       if(size-i>TAILLE_SPLIT)
140         n=TAILLE_SPLIT;
141       else
142         n=size-i;
143       TSeqCorba seq=_mySender->sendPart(i,n);
144       TCorba *seqd=seq->get_buffer(0);
145       for(long j=0;j<n;j++)
146       *iter++=*seqd++;
147     }
148   return ret;
149 }
150
151 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
152 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
153 {
154   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
155 }
156
157 #ifdef HAVE_MPI2
158
159 template<class T,class CorbaSender,class servForT,class ptrForT>
160 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
161 }
162
163 template<class T,class CorbaSender,class servForT,class ptrForT>
164 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
165   _mySender->release();
166 }
167
168 template<class T,class CorbaSender,class servForT,class ptrForT>
169 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
170   int i=0;
171   int myproc;
172   int sproc;
173   MPI_Status status;
174   MPI_Comm com; 
175   char   port_name_clt [MPI_MAX_PORT_NAME];
176   T *_v;
177   long _n;
178
179   
180   CORBA::Any a; 
181   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
182   SALOME::MPISender::param_var p =_mySender->getParam();
183   _mySender->send();
184   sproc = p->myproc;
185 #if OMPI_MAJOR_VERSION >= 4
186   MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
187 #else
188   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
189 #endif
190   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
191     i++;
192   }
193 #if OMPI_MAJOR_VERSION >= 4
194   MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
195 #else
196   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
197 #endif
198   if ( i == TIMEOUT  ) { 
199     MPI_Finalize();
200     exit(-1);
201   }
202   else{
203     //       Connect to service, get the inter-communicator server
204     //      Attention MPI_Comm_connect est un appel collectif :
205     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
206     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
207     
208     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
209     //        a service that has been unpublished !
210 #if OMPI_MAJOR_VERSION >= 4
211     MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
212 #else
213     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
214 #endif
215     i = 0;
216     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
217       i++; 
218     }
219 #if OMPI_MAJOR_VERSION >= 4
220     MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
221 #else 
222     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
223 #endif
224     if ( i == TIMEOUT ) {
225       MPI_Finalize(); 
226       exit(-1);
227     }
228   }
229   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
230   _v = new T[_n];
231   MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
232   _mySender->close(p);
233   MPI_Comm_disconnect( &com );  
234   size=_n;
235   return _v;
236 }
237
238 template<class T,class CorbaSender,class servForT,class ptrForT>
239 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
240 {
241   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
242 }
243
244 #endif
245
246 #ifdef HAVE_SOCKET
247 #include <sys/types.h>
248 #include <sys/socket.h>
249 #include <netinet/in.h>
250 #include <arpa/inet.h>
251 #include <netdb.h>
252 #include <unistd.h>
253 #include <rpc/xdr.h>
254
255 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
256 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
257 {
258   _clientSockfd = -1;
259   _senderDestruc=true;
260 }
261
262 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
263 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
264 {
265   if(_senderDestruc)
266     {
267       _mySender->release();
268     }
269 }
270
271 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
272 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
273 {
274   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
275 }
276
277 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
278 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
279 {
280   int n=0, m;
281   T *v;
282   XDR xp; /* pointeur sur le decodeur XDR */
283
284   try{
285     initCom();
286
287     SALOME::SocketSender::param_var p = _mySender->getParam();
288
289     size = p->lend - p->lstart + 1;
290     v = new T[size];
291
292     connectCom(p->internet_address, p->myport);
293   
294     _mySender->send();
295
296     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
297     while( n < size*sizeof(T) ){
298       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
299       if( m < 0 ){
300         closeCom();
301         delete [] v;
302         SALOME::ExceptionStruct es;
303         es.type = SALOME::COMM;
304         es.text = "error read Socket exception";
305         throw SALOME::SALOME_Exception(es);
306       }
307       n += m;
308     }
309     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
310     xdr_destroy( &xp );
311     
312     _mySender->endOfCom();
313     closeCom();
314   }
315   catch(SALOME::SALOME_Exception &ex){
316     if( ex.details.type == SALOME::COMM )
317       {
318         _senderDestruc=false;
319                 std::cout << ex.details.text << std::endl;
320         throw MultiCommException("Unknown sender protocol");
321       }
322     else
323       throw ex;
324   }
325  
326   return v;
327 }
328
329 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
330 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
331 {
332   try{
333     _mySender->initCom();
334
335     /* Ouverture de la socket */
336     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
337     if (_clientSockfd < 0) {
338       closeCom();
339       SALOME::ExceptionStruct es;
340       es.type = SALOME::COMM;
341       es.text = "error Socket exception";
342       throw SALOME::SALOME_Exception(es);
343     }
344   }
345   catch(SALOME::SALOME_Exception &ex){
346     if( ex.details.type == SALOME::COMM )
347       {
348         _senderDestruc=false;
349                 std::cout << ex.details.text << std::endl;
350         throw MultiCommException("Unknown sender protocol");
351       }
352     else
353       throw ex;
354   }
355
356 }
357
358 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
359 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
360 {
361   struct sockaddr_in serv_addr;
362   struct hostent * server;
363   SALOME::ExceptionStruct es;
364
365   try{
366     /* reception of the host structure on the remote process */
367     server = gethostbyname(dest_address);
368     if( server == NULL ) {
369       closeCom();
370       es.type = SALOME::COMM;
371       es.text = "error unknown host Socket exception";
372       _senderDestruc=false;
373       throw SALOME::SALOME_Exception(es);
374     }
375
376     /* Initialisation of the socket structure */
377     bzero((char*)&serv_addr,sizeof(serv_addr));
378     serv_addr.sin_family = AF_INET;
379     serv_addr.sin_addr.s_addr = INADDR_ANY;
380     bcopy((char *)server->h_addr, 
381           (char *)&serv_addr.sin_addr.s_addr,
382           server->h_length);
383     serv_addr.sin_port = htons(port);
384     
385     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
386       closeCom();
387       es.type = SALOME::COMM;
388       es.text = "error connect Socket exception";
389       _senderDestruc=false;
390       throw SALOME::SALOME_Exception(es);
391     }
392
393     _mySender->acceptCom();
394
395   }
396   catch(SALOME::SALOME_Exception &ex){
397     if( ex.details.type == SALOME::COMM )
398       {
399         _senderDestruc=false;
400         std::cout << ex.details.text << std::endl;
401         throw MultiCommException("Unknown sender protocol");
402       }
403     else
404       throw ex;
405   }
406
407 }
408
409
410 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
411 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
412 {
413   _mySender->closeCom();
414   if( _clientSockfd >= 0 ){
415     close(_clientSockfd);
416     _clientSockfd = -1;
417   }
418  
419 }
420
421 #endif