1 // Copyright (C) 2007-2020 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 #include "omniORB4/poa.h"
24 #include "utilities.h"
// Step by which the chunked getDistValue() loops in this file advance their
// offset; the remaining count (size-i) is compared against it before each
// sendPart() request. ("taille" is French for "size".)
26 #define TAILLE_SPLIT 100000
// Constructor: stores the given sender reference in _mySender.
29 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
30 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
// Destructor.
// NOTE(review): other receivers in this file call _mySender->release() in
// their destructor; confirm this one does the same.
33 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
34 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
// Fetches the remote data with a single _mySender->send() call and returns
// the sequence's underlying buffer reinterpreted as T*.
// size is passed by reference as an output parameter.
// NOTE(review): presumably size is set to the sequence length - confirm.
38 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
39 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
41 TSeqCorba seq=_mySender->send();
// get_buffer(1) is the "orphan" form of the CORBA C++ sequence accessor: the
// buffer is detached from the sequence rather than copied.
// NOTE(review): confirm how callers are expected to free the returned buffer.
43 return (T *)seq->get_buffer(1);
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
46 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
47 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
49 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
// Constructor: stores the given sender reference in _mySender.
52 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
53 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
// Destructor.
// NOTE(review): other receivers in this file call _mySender->release() in
// their destructor; confirm this one does the same.
56 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
57 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
// Retrieves the remote data piece by piece instead of in one call.
// size receives the total count reported by the sender (getSize()).
// NOTE(review): presumably each chunk is copied into a freshly allocated T
// array that is returned - confirm the allocation, the value of n and the copy.
61 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
62 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
63 size=_mySender->getSize();
// Walk the offsets 0, TAILLE_SPLIT, 2*TAILLE_SPLIT, ... below size.
67 for(long i=0;i<size;i+=TAILLE_SPLIT)
// More than one full chunk remains vs. the final (possibly shorter) chunk.
69 if(size-i>TAILLE_SPLIT)
// Ask the sender for the part starting at offset i (n: requested count).
73 TSeqCorba seq=_mySender->sendPart(i,n);
// get_buffer(0): access the chunk's storage without detaching it from the
// sequence; the buffer is reinterpreted as T* (no per-element conversion).
74 T *seqd=(T *)seq->get_buffer(0);
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
81 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
82 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
84 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
// Constructor: stores the given sender reference in _mySender.
87 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
88 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
// Destructor.
// NOTE(review): other receivers in this file call _mySender->release() in
// their destructor; confirm this one does the same.
91 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
92 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
// Retrieves the remote data piece by piece; each chunk is accessed as TCorba
// elements and processed one element at a time.
// size receives the total count reported by the sender (getSize()).
// NOTE(review): presumably each TCorba element is converted to T and stored
// in a freshly allocated result array - confirm.
96 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
97 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
98 size=_mySender->getSize();
// Walk the offsets 0, TAILLE_SPLIT, 2*TAILLE_SPLIT, ... below size.
102 for(long i=0;i<size;i+=TAILLE_SPLIT)
// More than one full chunk remains vs. the final (possibly shorter) chunk.
104 if(size-i>TAILLE_SPLIT)
// Ask the sender for the part starting at offset i (n: requested count).
108 TSeqCorba seq=_mySender->sendPart(i,n);
// get_buffer(0): access the chunk's storage without detaching it from the
// sequence; elements keep their CORBA type here.
109 TCorba *seqd=seq->get_buffer(0);
// Visit the n elements of this chunk individually.
110 for(long j=0;j<n;j++)
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
116 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
117 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
119 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
// Constructor: stores the given sender reference in _mySender.
122 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
123 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
// Destructor: calls release() on the sender held by this receiver.
126 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
127 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
128 _mySender->release();
// Retrieves the remote data piece by piece; each chunk is accessed as TCorba
// elements and processed one element at a time.
// size receives the total count reported by the sender (getSize()).
// NOTE(review): presumably each TCorba element is converted to T and stored
// in a freshly allocated result array - confirm.
131 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
132 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
133 size=_mySender->getSize();
// Walk the offsets 0, TAILLE_SPLIT, 2*TAILLE_SPLIT, ... below size.
137 for(long i=0;i<size;i+=TAILLE_SPLIT)
// More than one full chunk remains vs. the final (possibly shorter) chunk.
139 if(size-i>TAILLE_SPLIT)
// Ask the sender for the part starting at offset i (n: requested count).
143 TSeqCorba seq=_mySender->sendPart(i,n);
// get_buffer(0): access the chunk's storage without detaching it from the
// sequence; elements keep their CORBA type here.
144 TCorba *seqd=seq->get_buffer(0);
// Visit the n elements of this chunk individually.
145 for(long j=0;j<n;j++)
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
151 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
152 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
154 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
// Constructor: stores the given sender reference in _mySender.
159 template<class T,class CorbaSender,class servForT,class ptrForT>
160 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
// Destructor: calls release() on the sender held by this receiver.
163 template<class T,class CorbaSender,class servForT,class ptrForT>
164 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
165 _mySender->release();
// Receives the remote array over MPI:
//   1. obtain the transfer parameters (service name, tags) from the sender,
//   2. look up the MPI port published under that service name,
//   3. connect to it to obtain an inter-communicator,
//   4. receive the element count, then the elements themselves,
//   5. disconnect.
// Steps 2 and 3 are retried until they succeed or the counter i reaches
// TIMEOUT.
// NOTE(review): confirm what is returned / thrown on timeout and how size is
// assigned.
168 template<class T,class CorbaSender,class servForT,class ptrForT>
169 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
// Receives the port name resolved by MPI_Lookup_name.
175 char port_name_clt [MPI_MAX_PORT_NAME];
181 MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
// Transfer parameters supplied by the sender; service, tag1 and tag2 are used below.
182 SALOME::MPISender::param_var p =_mySender->getParam();
// Make MPI errors on MPI_COMM_WORLD return an error code, so a failed lookup
// can be retried by the loop below. Two spellings of the call are selected on
// the Open MPI major version; MPI_Errhandler_set is the older MPI-1 name.
185 #if OMPI_MAJOR_VERSION >= 4
186 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
188 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
// Retry the name lookup until it succeeds or the retry counter hits TIMEOUT.
190 while ( i != TIMEOUT && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) {
// Restore the fatal error handler once the lookup phase is over.
193 #if OMPI_MAJOR_VERSION >= 4
194 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
196 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
// The lookup never succeeded within the allowed number of attempts.
198 if ( i == TIMEOUT ) {
203 // Connect to service, get the inter-communicator server
204 // Caution: MPI_Comm_connect is a collective call:
205 // - when launched with mpirun -c n -----> only MPI_COMM_SELF works
206 // - when launched as client_server&client_server ----> MPI_COMM_WORLD works
208 // TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
209 // a service that has been unpublished !
// Same error-handler switch as above, this time around the connect attempts.
210 #if OMPI_MAJOR_VERSION >= 4
211 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
213 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
// Retry the connection (root rank 0 over MPI_COMM_WORLD) until it succeeds or
// the retry counter hits TIMEOUT; com receives the inter-communicator.
216 while ( i != TIMEOUT && MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) {
// Restore the fatal error handler once the connect phase is over.
219 #if OMPI_MAJOR_VERSION >= 4
220 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
222 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
// The connection never succeeded within the allowed number of attempts.
224 if ( i == TIMEOUT ) {
// First message (tag1): the number of elements, stored in _n.
229 MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
// Second message (tag2): _n elements of the MPI datatype mapped to T, into _v.
231 MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
// Tear down the inter-communicator obtained from MPI_Comm_connect.
233 MPI_Comm_disconnect( &com );
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
238 template<class T,class CorbaSender,class servForT,class ptrForT>
239 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
241 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
247 #include <sys/types.h>
248 #include <sys/socket.h>
249 #include <netinet/in.h>
250 #include <arpa/inet.h>
// Constructor: stores the given sender reference in _mySender.
// myFunc is the XDR conversion routine used for elements of type T.
// NOTE(review): confirm initial values of _clientSockfd and _senderDestruc.
255 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
256 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
// Destructor: calls release() on the sender held by this receiver.
// NOTE(review): _senderDestruc is cleared on communication errors elsewhere
// in this class; presumably it guards this release - confirm.
262 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
263 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
267 _mySender->release();
// Forwards to the base-class Receiver<T,servForT,ptrForT>::getValue, handing
// it the size output reference and the stored sender.
271 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
272 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
274 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
// Receives the remote array over a TCP socket and decodes it with XDR:
//   1. obtain the transfer parameters from the sender,
//   2. connect to the sender's address/port,
//   3. read size*sizeof(T) raw bytes into the buffer v,
//   4. XDR-decode the buffer in place, element by element, using myFunc,
//   5. notify the sender that the communication is finished.
// size is set to the element count derived from the sender's parameters.
// A SALOME_Exception of type COMM is reported on stdout and rethrown as a
// MultiCommException.
// NOTE(review): confirm how v is allocated and what is returned.
277 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
278 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
282 XDR xp; /* XDR decoder stream (opened in XDR_DECODE mode below) */
287 SALOME::SocketSender::param_var p = _mySender->getParam();
// Element count of the inclusive index range [lstart, lend].
289 size = p->lend - p->lstart + 1;
292 connectCom(p->internet_address, p->myport);
// Attach the XDR memory stream to v so the received bytes can be decoded
// where they are.
296 xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
// read() may deliver fewer bytes than requested, so keep reading at offset n
// until the whole payload has arrived.
297 while( n < size*sizeof(T) ){
298 m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
// Report a socket read failure as a communication-type SALOME exception.
302 SALOME::ExceptionStruct es;
303 es.type = SALOME::COMM;
304 es.text = "error read Socket exception";
305 throw SALOME::SALOME_Exception(es);
// Decode size elements of sizeof(T) bytes each through myFunc.
309 xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
312 _mySender->endOfCom();
315 catch(SALOME::SALOME_Exception &ex){
316 if( ex.details.type == SALOME::COMM )
// Communication failure: clear the _senderDestruc flag, print the detail text
// and surface the problem to the caller as a MultiCommException.
318 _senderDestruc=false;
319 std::cout << ex.details.text << std::endl;
320 throw MultiCommException("Unknown sender protocol");
// Prepares the socket transfer: asks the sender to initialise its side, then
// creates the client TCP socket (AF_INET / SOCK_STREAM) in _clientSockfd.
// A SALOME_Exception of type COMM is reported on stdout and rethrown as a
// MultiCommException.
329 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
330 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
333 _mySender->initCom();
335 /* Open the socket */
336 _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
// socket() returns a negative value on failure.
337 if (_clientSockfd < 0) {
339 SALOME::ExceptionStruct es;
340 es.type = SALOME::COMM;
341 es.text = "error Socket exception";
342 throw SALOME::SALOME_Exception(es);
345 catch(SALOME::SALOME_Exception &ex){
346 if( ex.details.type == SALOME::COMM )
// Communication failure: clear the _senderDestruc flag, print the detail text
// and surface the problem to the caller as a MultiCommException.
348 _senderDestruc=false;
349 std::cout << ex.details.text << std::endl;
350 throw MultiCommException("Unknown sender protocol");
// Connects the client socket (_clientSockfd) to the sender.
//   dest_address: host name of the sender, resolved with gethostbyname()
//   port:         TCP port of the sender (converted to network byte order)
// After a successful connect() the sender is asked to accept the connection.
// Resolution or connection failures raise a SALOME_Exception of type COMM
// after clearing _senderDestruc; a COMM exception caught here is printed on
// stdout and rethrown as a MultiCommException.
358 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
359 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
361 struct sockaddr_in serv_addr;
362 struct hostent * server;
363 SALOME::ExceptionStruct es;
366 /* reception of the host structure on the remote process */
367 server = gethostbyname(dest_address);
// gethostbyname() returns NULL when the host cannot be resolved.
368 if( server == NULL ) {
370 es.type = SALOME::COMM;
371 es.text = "error unknown host Socket exception";
372 _senderDestruc=false;
373 throw SALOME::SALOME_Exception(es);
376 /* Initialisation of the socket structure */
377 bzero((char*)&serv_addr,sizeof(serv_addr));
378 serv_addr.sin_family = AF_INET;
// Placeholder value; the bcopy below overwrites it with the resolved address.
379 serv_addr.sin_addr.s_addr = INADDR_ANY;
380 bcopy((char *)server->h_addr,
381 (char *)&serv_addr.sin_addr.s_addr,
383 serv_addr.sin_port = htons(port);
// connect() returns a negative value on failure.
385 if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
387 es.type = SALOME::COMM;
388 es.text = "error connect Socket exception";
389 _senderDestruc=false;
390 throw SALOME::SALOME_Exception(es);
// Tell the sender to accept the connection that was just opened.
393 _mySender->acceptCom();
396 catch(SALOME::SALOME_Exception &ex){
397 if( ex.details.type == SALOME::COMM )
// Communication failure: clear the _senderDestruc flag, print the detail text
// and surface the problem to the caller as a MultiCommException.
399 _senderDestruc=false;
400 std::cout << ex.details.text << std::endl;
401 throw MultiCommException("Unknown sender protocol");
410 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
411 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
413 _mySender->closeCom();
414 if( _clientSockfd >= 0 ){
415 close(_clientSockfd);