1 // Copyright (C) 2005 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
2 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License.
9 // This library is distributed in the hope that it will be useful
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "omniORB4/poa.h"
21 #include "utilities.h"
24 #define TAILLE_SPLIT 100000
27 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
28 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
31 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
32 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
36 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
37 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
39 TSeqCorba seq=_mySender->send();
41 return (T *)seq->get_buffer(1);
44 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
45 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
47 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
50 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
51 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
54 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
55 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
59 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
60 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
61 size=_mySender->getSize();
65 for(long i=0;i<size;i+=TAILLE_SPLIT)
67 if(size-i>TAILLE_SPLIT)
71 TSeqCorba seq=_mySender->sendPart(i,n);
72 T *seqd=(T *)seq->get_buffer(0);
79 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
80 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
82 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
85 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
86 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
89 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
90 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
94 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
95 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
96 size=_mySender->getSize();
100 for(long i=0;i<size;i+=TAILLE_SPLIT)
102 if(size-i>TAILLE_SPLIT)
106 TSeqCorba seq=_mySender->sendPart(i,n);
107 TCorba *seqd=seq->get_buffer(0);
108 for(long j=0;j<n;j++)
114 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
115 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
117 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
120 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
121 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
124 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
125 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
126 _mySender->release();
129 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
130 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
131 size=_mySender->getSize();
135 for(long i=0;i<size;i+=TAILLE_SPLIT)
137 if(size-i>TAILLE_SPLIT)
141 TSeqCorba seq=_mySender->sendPart(i,n);
142 TCorba *seqd=seq->get_buffer(0);
143 for(long j=0;j<n;j++)
149 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
150 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
152 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
157 template<class T,class CorbaSender,class servForT,class ptrForT>
158 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
161 template<class T,class CorbaSender,class servForT,class ptrForT>
162 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
163 _mySender->release();
166 template<class T,class CorbaSender,class servForT,class ptrForT>
167 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
173 char port_name_clt [MPI_MAX_PORT_NAME];
174 float telps, tuser, tsys, tcpu;
180 MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
181 SALOME::MPISender::param_var p =_mySender->getParam();
184 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
185 while ( i != TIMEOUT && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) {
188 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
189 if ( i == TIMEOUT ) {
194 // Connect to service, get the inter-communicator server
195 // Attention MPI_Comm_connect est un appel collectif :
196 // - Si lancement mpirun -c n -----> uniquement MPI_COMM_SELF fonctionne
197 // - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
199 // TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
200 // a service that has been unpublished !
201 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
203 while ( i != TIMEOUT && MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) {
206 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
207 if ( i == TIMEOUT ) {
212 MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
214 MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
216 MPI_Comm_disconnect( &com );
221 template<class T,class CorbaSender,class servForT,class ptrForT>
222 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
224 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
230 #include <sys/types.h>
231 #include <sys/socket.h>
232 #include <netinet/in.h>
233 #include <arpa/inet.h>
238 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
239 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
245 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
246 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
250 _mySender->release();
254 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
255 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
257 return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
260 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
261 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
265 XDR xp; /* pointeur sur le decodeur XDR */
270 SALOME::SocketSender::param_var p = _mySender->getParam();
272 size = p->lend - p->lstart + 1;
275 connectCom(p->internet_address, p->myport);
279 xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
280 while( n < size*sizeof(T) ){
281 m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
285 SALOME::ExceptionStruct es;
286 es.type = SALOME::COMM;
287 es.text = "error read Socket exception";
288 throw SALOME::SALOME_Exception(es);
292 xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
295 _mySender->endOfCom();
298 catch(SALOME::SALOME_Exception &ex){
299 if( ex.details.type == SALOME::COMM )
301 _senderDestruc=false;
302 cout << ex.details.text << endl;
303 throw MultiCommException("Unknown sender protocol");
312 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
313 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
316 _mySender->initCom();
318 /* Ouverture de la socket */
319 _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
320 if (_clientSockfd < 0) {
322 SALOME::ExceptionStruct es;
323 es.type = SALOME::COMM;
324 es.text = "error Socket exception";
325 throw SALOME::SALOME_Exception(es);
328 catch(SALOME::SALOME_Exception &ex){
329 if( ex.details.type == SALOME::COMM )
331 _senderDestruc=false;
332 cout << ex.details.text << endl;
333 throw MultiCommException("Unknown sender protocol");
341 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
342 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
344 struct sockaddr_in serv_addr;
345 struct hostent * server;
346 SALOME::ExceptionStruct es;
349 /* reception of the host structure on the remote process */
350 server = gethostbyname(dest_address);
351 if( server == NULL ) {
353 es.type = SALOME::COMM;
354 es.text = "error unknown host Socket exception";
355 _senderDestruc=false;
356 throw SALOME::SALOME_Exception(es);
359 /* Initialisation of the socket structure */
360 bzero((char*)&serv_addr,sizeof(serv_addr));
361 serv_addr.sin_family = AF_INET;
362 serv_addr.sin_addr.s_addr = INADDR_ANY;
363 bcopy((char *)server->h_addr,
364 (char *)&serv_addr.sin_addr.s_addr,
366 serv_addr.sin_port = htons(port);
368 if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
370 es.type = SALOME::COMM;
371 es.text = "error connect Socket exception";
372 _senderDestruc=false;
373 throw SALOME::SALOME_Exception(es);
376 _mySender->acceptCom();
379 catch(SALOME::SALOME_Exception &ex){
380 if( ex.details.type == SALOME::COMM )
382 _senderDestruc=false;
383 cout << ex.details.text << endl;
384 throw MultiCommException("Unknown sender protocol");
393 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
394 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
396 _mySender->closeCom();
397 if( _clientSockfd >= 0 ){
398 close(_clientSockfd);