// Copyright (C) 2007-2013  CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
//
// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
//
#include "MPIAccess.hxx"
#include "InterpolationUtils.hxx"

using namespace std;

namespace ParaMEDMEM
{
/*! \defgroup mpi_access MPIAccess
  Class \a MPIAccess is the gateway to the MPI library.
  It is a helper class that gathers the calls to the MPI
  library that are made in the ParaMEDMEM library. Centralizing
  the calls makes it easier to collect information about the
  communications performed by the library. With MPIAccess, tags are
  managed automatically and asynchronous operations are easier to handle.

  It is typically instantiated after the MPI_Init() call in a program and is
  afterwards passed as a parameter to the constructors of ParaMEDMEM objects
  so that they access the MPI library via MPIAccess.
  As an example, the following code initializes a processor group made of processor 0 only.

  \verbatim
  #include "MPIAccess.hxx"
  #include "MPIProcessorGroup.hxx"

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    ParaMEDMEM::CommInterface comm_interface;
    //setting up a processor group with proc 0
    std::set<int> procs;
    procs.insert(0);
    ParaMEDMEM::MPIProcessorGroup group(comm_interface, procs);
    ParaMEDMEM::MPIAccess mpi_access(&group);
    ...
  }
  \endverbatim
*/
/*! Creates an MPIAccess that is based on the processors included in \a ProcessorGroup.
  This class makes the MPI API easier to use.

  \param ProcessorGroup MPIProcessorGroup object giving access to group management
  \param BaseTag and MaxTag define the range of tags to be used.
  Tags are managed by MPIAccess. They are incremented cyclically.
  Each Send or Receive operation returns a new RequestId to the caller.
  That RequestId may be used to manage the operation (Wait, Test, check of
  status, etc...). The MPITag internally managed by MPIAccess is used as the
  "tag" argument of the actual MPI call.
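  A short usage sketch (an illustration, not taken from the original docs) :
  constructing an MPIAccess that reserves the tag interval [1000 , 10000[.

  \verbatim
  // group is a ParaMEDMEM::MPIProcessorGroup*
  ParaMEDMEM::MPIAccess mpi_access( group, 1000, 10000 ) ;
  \endverbatim
*/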
MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
  _comm_interface( ProcessorGroup->getCommInterface() ) ,
  _intra_communicator( ProcessorGroup->getComm() )
{
  int mpitagub ;
  int flag ;
  //MPI_Attr_get does not run with _intra_communicator ???
  //MPI_Attr_get(*_intra_communicator,MPI_TAG_UB,&mpitagub,&flag) ;
  MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB,&mpitagub,&flag) ;
  mpitagub=abs(mpitagub);
  if ( BaseTag != 0 )
    BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
  if ( MaxTag == 0 )
    MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
  MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
  if ( !flag || (BaseTag < 0) || (BaseTag >= MaxTag) || (MaxTag > mpitagub) )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
  _processor_group = ProcessorGroup ;
  _processor_group_size = _processor_group->size() ;
  _trace = false ;

  _base_request = -1 ;
  _max_request = std::numeric_limits<int>::max() ;
  _request = _base_request ;

  _base_MPI_tag = BaseTag ;
  _max_MPI_tag = MaxTag ;
  _send_request = new int[ _processor_group_size ] ;
  _recv_request = new int[ _processor_group_size ] ;

  _send_requests.resize( _processor_group_size ) ;
  _recv_requests.resize( _processor_group_size ) ;

  _send_MPI_tag = new int[ _processor_group_size ] ;
  _recv_MPI_Tag = new int[ _processor_group_size ] ;
  int i ;
  for ( i = 0 ; i < _processor_group_size ; i++ )
    {
      _send_request[ i ] = _max_request ;
      _recv_request[ i ] = _max_request ;
      _send_requests[ i ].resize(0) ;
      _recv_requests[ i ].resize(0) ;
      _send_MPI_tag[ i ] = _max_MPI_tag ;
      _recv_MPI_Tag[ i ] = _max_MPI_tag ;
    }
  // Build the MPI datatype used for TimeMessages :
  // two doubles (time, deltatime) followed by an int (tag)
  MPI_Datatype array_of_types[3] ;
  array_of_types[0] = MPI_DOUBLE ;
  array_of_types[1] = MPI_DOUBLE ;
  array_of_types[2] = MPI_INT ;
  int array_of_blocklengths[3] ;
  array_of_blocklengths[0] = 1 ;
  array_of_blocklengths[1] = 1 ;
  array_of_blocklengths[2] = 1 ;
  MPI_Aint array_of_displacements[3] ;
  array_of_displacements[0] = 0 ;
  array_of_displacements[1] = sizeof(double) ;
  array_of_displacements[2] = 2*sizeof(double) ;
  MPI_Type_struct(3, array_of_blocklengths, array_of_displacements,
                  array_of_types, &_MPI_TIME) ;
  MPI_Type_commit(&_MPI_TIME) ;
}
MPIAccess::~MPIAccess()
{
  delete [] _send_request ;
  delete [] _recv_request ;
  delete [] _send_MPI_tag ;
  delete [] _recv_MPI_Tag ;
  MPI_Type_free(&_MPI_TIME) ;
}
/*
  MPIAccess and "RequestIds" :
  ============================

  . WARNING : In the specification document, the distinction
  between "MPITags" and "RequestIds" is not clear. "MPITags"
  are arguments of calls to MPI. "RequestIds" are never passed
  to MPI ; they are the values named "tag" as in/out arguments
  in the MPIAccess API of the specification document.
  In the implementation they have the clearer name RequestId
  (or RecvRequestId/SendRequestId).

  . When an MPI write/read request is issued via MPIAccess, the
  caller gets back an identifier, the "RequestId".
  That identifier maps to a RequestStruct structure of
  MPIAccess. The structure is reached through the map
  "_map_of_request_struct".
  A RequestStruct makes it possible to manage the MPI_Request
  and MPI_Status * structures of MPI, and to query information
  about the request : target, send/recv, tag, [a]synchronous,
  type, outcount.

  . That identifier is used to control an asynchronous request
  via MPIAccess : Wait, Test, Probe, etc...

  . In practice a "RequestId" is simply an integer in the interval
  [0 , 2**31-1]. A single cyclic counter is shared by
  [I]Sends and [I]Recvs.

  . The "RequestIds" and their associated structures give an easy
  way to manage asynchronous communications.
  For example we have mpi_access->wait( int RequestId ) instead of
  MPI_Wait(MPI_Request *request, MPI_Status *status).

  . The MPIAccess API can return the "SendRequestIds" of a "target",
  the "RecvRequestIds" from a "source", the "SendRequestIds" of
  all "targets" or the "RecvRequestIds" of all "sources".
  That avoids having to manage them in Presentation-ParaMEDMEM.
*/
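/* A minimal sketch (not from the library itself) of what RequestIds buy us ;
   mpi_access, the peer rank 1 and the buffer are assumptions :

     int sendId ;
     int buffer[10] ;
     mpi_access.ISend( buffer, 10, MPI_INT, 1, sendId ) ; // returns a RequestId
     // ... computation overlapping the communication ...
     mpi_access.wait( sendId ) ; // wait on the RequestId : no MPI_Request
                                 // or MPI_Status to manage by hand
*/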
int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
                           bool fromsourcerank , bool asynchronous )
{
  RequestStruct *mpiaccessstruct = new RequestStruct;
  mpiaccessstruct->MPITag = tag ;
  mpiaccessstruct->MPIDatatype = datatype ;
  mpiaccessstruct->MPITarget = destsourcerank ;
  mpiaccessstruct->MPIIsRecv = fromsourcerank ;
  MPI_Status *aStatus = new MPI_Status ;
  mpiaccessstruct->MPIStatus = aStatus ;
  mpiaccessstruct->MPIAsynchronous = asynchronous ;
  mpiaccessstruct->MPICompleted = !asynchronous ;
  mpiaccessstruct->MPIOutCount = -1 ;
  mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
  mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
  mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
  mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
  if ( _request == _max_request )
    _request = _base_request ;
  _request += 1 ;
  _map_of_request_struct[_request] = mpiaccessstruct ;
  if ( fromsourcerank )
    _recv_request[ destsourcerank ] = _request;
  else
    _send_request[ destsourcerank ] = _request;
  if ( _trace )
    cout << "NewRequest" << _my_rank << "( " << _request << " ) "
         << mpiaccessstruct << endl ;
  return _request ;
}
/*
  MPIAccess and "tags" (or "MPITags") :
  =====================================

  . The constructor lets the caller choose the interval of tags
  to use : [BaseTag , MaxTag].
  The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
  value allowed by the MPI implementation (at least 32767 = 2**15-1).
  On the machine "awa" with the LAM implementation, the MPI_TAG_UB value
  is 7353944. The MPI standard specifies that the value is the same in
  all processes started by mpirun.
  When the same IntraCommunicator is used in a process for several
  distinct data flows (or when several IntraCommunicators share
  processes), this tag management avoids ambiguity.

  . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
  + The last decimal digit encodes the MPI_Datatype ( 1 for
  TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE).
  + The remaining digits form a circular counter for each message.
  + A TimeMessage and the associated DataMessage have the same counter
  value (but the types, and therefore the tags, are different).

  . For a Send of a message from a process "source" to a process
  "target", the process source keeps _send_MPI_tag[target] (the
  last "tag" used for a Send of a message to the process target).
  And the process "target" which receives that message keeps
  _recv_MPI_Tag[source] (the last "tag" used for a Recv of a
  message from the process source).
  Naturally, following the MPI standard, those tag values must match.
*/
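/* Sketch of the tag layout (with MODULO_TAG == 10, as above) ; the concrete
   numbers are illustrative assumptions :

     int counter  = 12 ;                               // per-message circular counter
     int methodId = 2 ;                                // 1 TimeMessage, 2 MPI_INT, 3 MPI_DOUBLE
     int tag      = counter * MODULO_TAG + methodId ;  // == 122
     int method   = tag % MODULO_TAG ;                 // == 2 : recovers the datatype id
*/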
int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _send_MPI_tag[destrank] ) ;
  tag = valTag( tag, method ) ;
  _send_MPI_tag[ destrank ] = tag ;
  RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
  _send_request[ destrank ] = RequestId ;
  _send_requests[ destrank ].push_back( RequestId ) ;
  return tag ;
}

int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
  tag = valTag( tag, method ) ;
  _recv_MPI_Tag[ sourcerank ] = tag ;
  RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
  _recv_request[ sourcerank ] = RequestId ;
  _recv_requests[ sourcerank ].push_back( RequestId ) ;
  return tag ;
}
// Returns the number of all SendRequestIds ; it may be used to allocate
// ArrayOfSendRequests for the call to sendRequestIds
int MPIAccess::sendRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _send_requests[ i ].size() ;
  return size ;
}

// Returns in ArrayOfSendRequests, of dimension "size", all the
// SendRequestIds
int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
{
  int i = 0 ;
  int destrank ;
  for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
        ArrayOfSendRequests[i++] = *iter ;
    }
  return i ;
}
// Returns the number of all RecvRequestIds ; it may be used to allocate
// ArrayOfRecvRequests for the call to recvRequestIds
int MPIAccess::recvRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _recv_requests[ i ].size() ;
  return size ;
}

// Returns in ArrayOfRecvRequests, of dimension "size", all the
// RecvRequestIds
int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
{
  int i = 0 ;
  int sourcerank ;
  for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
        ArrayOfRecvRequests[i++] = *iter ;
    }
  return i ;
}
// Returns in ArrayOfSendRequests, of dimension "size", all the
// SendRequestIds to a destination rank
int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
{
  if (size < (int)_send_requests[destrank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
    ArrayOfSendRequests[i++] = *iter ;
  return _send_requests[destrank].size() ;
}
// Returns in ArrayOfRecvRequests, of dimension "size", all the
// RecvRequestIds from a source rank
int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
{
  if (size < (int)_recv_requests[sourcerank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
    ArrayOfRecvRequests[i++] = *iter ;
  return _recv_requests[sourcerank].size() ;
}
// Sends in synchronous mode count values of type datatype from buffer to target
// (returns a RequestId identifier even if the corresponding structure is deleted :
// this is only in order to have the same signature as the asynchronous mode)
int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      deleteRequest( RequestId ) ;
      sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
                                 *_intra_communicator ) ;
      if ( _trace )
        cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
             << RequestId << " count " << count << " target " << target
             << " MPItag " << MPItag << endl ;
    }
  return sts ;
}
// Receives (reads) in synchronous mode count values of type datatype in buffer from source
// (returns a RequestId identifier even if the corresponding structure is deleted :
// this is only in order to have the same signature as the asynchronous mode)
// The output argument OutCount is optional : *OutCount <= count
int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( OutCount != NULL )
    *OutCount = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
      sts = _comm_interface.recv(buffer, count, datatype, source, MPItag,
                                 *_intra_communicator , MPIStatus( RequestId ) ) ;
      int outcount = 0 ;
      if ( sts == MPI_SUCCESS )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
          setMPIOutCount( RequestId , outcount ) ;
          setMPICompleted( RequestId , true ) ;
          deleteStatus( RequestId ) ;
        }
      if ( OutCount != NULL )
        *OutCount = outcount ;
      if ( _trace )
        cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
             << RequestId << " count " << count << " source " << source
             << " MPItag " << MPItag << endl ;
      deleteRequest( RequestId ) ;
    }
  return sts ;
}
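/* Sketch of a synchronous exchange between ranks 0 and 1 (an illustration,
   not library code ; mpi_access, the myRank() accessor and the buffer are
   assumptions) :

     int reqId ;
     double values[4] ;
     if ( mpi_access.myRank() == 0 )
       mpi_access.send( values, 4, MPI_DOUBLE, 1, reqId ) ;
     else
       {
         int outcount ;
         mpi_access.recv( values, 4, MPI_DOUBLE, 0, reqId, &outcount ) ;
       }
*/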
// Sends in asynchronous mode count values of type datatype from buffer to target
// Returns a RequestId identifier.
int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
               << RequestId << " count " << count << " target " << target
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::ISend" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
                                  *_intra_communicator , aSendRequest) ;
    }
  return sts ;
}
// Receives (reads) in asynchronous mode count values of type datatype in buffer from source
// Returns a RequestId identifier.
int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
      MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
               << RequestId << " count " << count << " source " << source
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::IRecv" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
                                  *_intra_communicator , aRecvRequest) ;
    }
  return sts ;
}
// Performs a Send and a Recv in synchronous mode
int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                        int dest, int &SendRequestId,
                        void* recvbuf, int recvcount, MPI_Datatype recvtype,
                        int source, int &RecvRequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  SendRequestId = -1 ;
  RecvRequestId = -1 ;
  int outcount = -1 ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( _trace )
    cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
         << RecvRequestId << endl ;
  if ( sts == MPI_SUCCESS )
    {
      sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
      if ( _trace )
        cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
             << SendRequestId << endl ;
      if ( sts == MPI_SUCCESS && recvcount )
        {
          sts = wait( RecvRequestId ) ;
          outcount = MPIOutCount( RecvRequestId ) ;
          if ( _trace )
            cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
                 << RecvRequestId << " outcount " << outcount << endl ;
        }
    }
  if ( OutCount != NULL )
    {
      *OutCount = outcount ;
      if ( _trace )
        cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
             << endl ;
    }
  deleteRequest( RecvRequestId ) ;
  return sts ;
}
// Performs a Send and a Recv in asynchronous mode
int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                         int dest, int &SendRequestId,
                         void* recvbuf, int recvcount, MPI_Datatype recvtype,
                         int source, int &RecvRequestId)
{
  int sts = MPI_SUCCESS ;
  SendRequestId = -1 ;
  RecvRequestId = -1 ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( sts == MPI_SUCCESS )
    sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
  return sts ;
}
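/* Sketch of a symmetric asynchronous exchange with ISendRecv followed by
   waits on both RequestIds (an illustration ; mpi_access, the peer rank and
   the buffers are assumptions) :

     int sendId , recvId ;
     double out[8] , in[8] ;
     mpi_access.ISendRecv( out, 8, MPI_DOUBLE, peer, sendId,
                           in,  8, MPI_DOUBLE, peer, recvId ) ;
     // ... overlapped work ...
     mpi_access.wait( sendId ) ;
     mpi_access.wait( recvId ) ;
*/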
// Performs a wait on a Send or Recv asynchronous Request
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::wait( int RequestId )
{
  int status = MPI_SUCCESS ;
  if ( !MPICompleted( RequestId ) )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
                 << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      setMPICompleted( RequestId , true ) ;
      if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          int outcount = 0 ;
          status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
                                            &outcount ) ;
          if ( status == MPI_SUCCESS )
            {
              setMPIOutCount( RequestId , outcount ) ;
              deleteStatus( RequestId ) ;
              if ( _trace )
                cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
                     << " MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
                     << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
             << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
    }
  if ( _trace )
    cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
         << " Request " << MPIRequest( RequestId )
         << " Status " << MPIStatus( RequestId ) << " MPICompleted "
         << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
         << endl ;
  return status ;
}
// Performs a "test" of a Send or Recv asynchronous Request
// If the request is done, returns true in the flag argument
// If the request is not finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::test(int RequestId, int &flag)
{
  int status = MPI_SUCCESS ;
  flag = MPICompleted( RequestId ) ;
  if ( _trace )
    {
      cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
      if ( MPIIsRecv( RequestId ) )
        cout << " Recv" ;
      else
        cout << " Send" ;
      cout << " Request" << RequestId << " " << MPIRequest( RequestId )
           << " Status " << MPIStatus( RequestId ) << endl ;
    }
  if ( !flag )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId )
                 << " MPIStatus " << MPIStatus( RequestId )
                 << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.test(MPIRequest( RequestId ), &flag,
                                        MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      if ( flag )
        {
          setMPICompleted( RequestId , true ) ;
          if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
            {
              MPI_Datatype datatype = MPIDatatype( RequestId ) ;
              int outcount = 0 ;
              status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
                                                 &outcount ) ;
              if ( status == MPI_SUCCESS )
                {
                  setMPIOutCount( RequestId , outcount ) ;
                  deleteStatus( RequestId ) ;
                  if ( _trace )
                    cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                         << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
                }
              else if ( _trace )
                cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                     << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " MPIOutCount "
                 << MPIOutCount( RequestId ) << endl ;
        }
    }
  if ( _trace )
    cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
         << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
         << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
  return status ;
}
int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitAny not yet implemented" << endl ;
  return status ;
}
int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestAny not yet implemented" << endl ;
  return status ;
}
// Performs a wait on each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// (its size may be obtained from sendRequestIdsSize or recvRequestIdsSize).
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::waitAll(int count, int *array_of_RequestIds)
{
  if ( _trace )
    cout << "WaitAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      if ( _trace )
        cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
             << array_of_RequestIds[i] << " )" << endl ;
      status = wait( array_of_RequestIds[i] ) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  if ( _trace )
    cout << "EndWaitAll" << _my_rank << endl ;
  return retstatus ;
}
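/* Sketch of draining every pending send with the RequestId arrays (an
   illustration ; mpi_access is an assumption) :

     int size = mpi_access.sendRequestIdsSize() ;
     int *ids = new int[ size ] ;
     mpi_access.sendRequestIds( size, ids ) ; // fills ids with all SendRequestIds
     mpi_access.waitAll( size, ids ) ;        // waits on each of them
     delete [] ids ;
*/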
// Performs a "test" of each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// (its size may be obtained from sendRequestIdsSize or recvRequestIdsSize).
// If all requests are done, returns true in the flag argument
// If not all requests are finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
{
  if ( _trace )
    cout << "TestAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  bool retflag = true ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      status = test( array_of_RequestIds[i] , flag ) ;
      retflag = retflag && (flag != 0) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  flag = retflag ;
  if ( _trace )
    cout << "EndTestAll" << _my_rank << endl ;
  return retstatus ;
}
int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitSome not yet implemented" << endl ;
  return status ;
}
int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestSome not yet implemented" << endl ;
  return status ;
}
// Probe checks whether a message is available for read from FromSource rank.
// Returns the corresponding source, MPITag, datatype and outcount.
// Probe is a blocking call which waits until a message is available.
int MPIAccess::probe(int FromSource, int &source, int &MPITag,
                     MPI_Datatype &myDatatype, int &outcount)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.probe( FromSource, MPI_ANY_TAG,
                                   *_intra_communicator , &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDatatype = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDatatype << " outcount " << outcount
             << endl ;
    }
  else
    {
      source = -1 ;
      MPITag = -1 ;
      myDatatype = 0 ;
      outcount = -1 ;
    }
  return sts ;
}
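/* Sketch of probing before receiving, so the buffer can be sized from
   outcount (an illustration ; mpi_access and the peer rank are assumptions) :

     int source , tag , outcount , reqId ;
     MPI_Datatype dtype ;
     mpi_access.probe( peer, source, tag, dtype, outcount ) ; // blocks until a message shows up
     int *buffer = new int[ outcount ] ;
     mpi_access.recv( buffer, outcount, dtype, source, reqId, NULL ) ;
     delete [] buffer ;
*/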
// IProbe checks whether a message is available for read from FromSource rank.
// If a message is available, returns the corresponding source,
// MPITag, datatype and outcount with flag = true.
// If not, returns flag = false.
int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
                      MPI_Datatype &myDataType, int &outcount, int &flag)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
                                    *_intra_communicator , &flag,
                                    &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS && flag )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDataType = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDataType << " outcount " << outcount
             << " flag " << flag << endl ;
    }
  else
    {
      source = -1 ;
      MPITag = -1 ;
      myDataType = 0 ;
      outcount = -1 ;
    }
  return sts ;
}
// Cancel concerns a "posted" asynchronous IRecv
// Returns flag = true if the receiving request was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses MPI_Cancel, MPI_Wait and MPI_Test_cancelled of the MPI API
int MPIAccess::cancel( int RecvRequestId, int &flag )
{
  flag = 0 ;
  int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
  if ( sts == MPI_SUCCESS )
    {
      sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
                                  MPIStatus( RecvRequestId ) ) ;
      if ( sts == MPI_SUCCESS )
        sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
    }
  return sts ;
}
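/* Sketch of canceling a posted IRecv that is no longer wanted (an
   illustration ; mpi_access, the source rank and the buffer are assumptions) :

     int recvId , cancelled ;
     int buffer[10] ;
     mpi_access.IRecv( buffer, 10, MPI_INT, source, recvId ) ;
     mpi_access.cancel( recvId, cancelled ) ; // cancelled is true if nothing was received
*/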
// Cancel concerns a "pending" receiving message (without any IRecv "posted")
// Returns flag = true if the message was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses MPI_Irecv, MPI_Cancel, MPI_Wait and MPI_Test_cancelled of the MPI API
int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
{
  int sts ;
  MPI_Aint extent ;
  flag = 0 ;
  sts = MPI_Type_extent( datatype , &extent ) ;
  if ( sts == MPI_SUCCESS )
    {
      void * recvbuf = malloc( extent*outcount ) ;
      MPI_Request aRecvRequest ;
      if ( _trace )
        cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
             << " datatype " << datatype << " source " << source << " theMPITag "
             << theMPITag << endl ;
      sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
                                   *_intra_communicator , &aRecvRequest ) ;
      if ( sts == MPI_SUCCESS )
        {
          sts = _comm_interface.cancel( &aRecvRequest ) ;
          if ( _trace )
            cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
                 << " cancel done" << endl ;
          if ( sts == MPI_SUCCESS )
            {
              MPI_Status aStatus ;
              if ( _trace )
                cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
              sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
              if ( sts == MPI_SUCCESS )
                {
                  if ( _trace )
                    cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
                  sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
                }
            }
        }
      if ( _trace && datatype == timeType() )
        cout << "MPIAccess::Cancel" << _my_rank << " time "
             << ((TimeMessage *) recvbuf)->time << " "
             << ((TimeMessage *) recvbuf)->deltatime << endl ;
      free( recvbuf ) ;
    }
  if ( _trace )
    cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
  return sts ;
}
// CancelAll concerns all "pending" receiving messages (without any IRecv "posted")
// CancelAll uses IProbe and Cancel (see above)
int MPIAccess::cancelAll()
{
  int sts = MPI_SUCCESS ;
  int target ;
  int source ;
  int MPITag ;
  MPI_Datatype datatype ;
  int outcount ;
  int flag ;
  for ( target = 0 ; target < _processor_group_size ; target++ )
    {
      sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
      if ( sts == MPI_SUCCESS && flag )
        {
          sts = cancel(source, MPITag, datatype, outcount, flag) ;
          if ( _trace )
            cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
                 << " MPITag " << MPITag << " datatype " << datatype
                 << " outcount " << outcount << " Cancel flag " << flag << endl ;
          if ( sts != MPI_SUCCESS )
            break ;
        }
      else if ( sts != MPI_SUCCESS )
        break ;
    }
  return sts ;
}
// Same as the barrier of the MPI API
int MPIAccess::barrier()
{
  int status = _comm_interface.barrier( *_intra_communicator ) ;
  return status ;
}
// Same as Error_string of the MPI API
int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
{
  return _comm_interface.errorString( errorcode, string, resultlen) ;
}
// Returns the source, tag, error and outcount corresponding to a receiving RequestId
// By default the corresponding structure of RequestId is deleted
int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
                      int &outcount, bool keepRequestStruct)
{
  MPI_Status *myStatus = MPIStatus( RequestId ) ;
  if ( _trace )
    cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
         << " status " << myStatus << endl ;
  if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
       MPICompleted( RequestId ) )
    {
      if ( MPIIsRecv( RequestId ) )
        {
          source = myStatus->MPI_SOURCE ;
          tag = myStatus->MPI_TAG ;
          error = myStatus->MPI_ERROR ;
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(myStatus, datatype, &outcount ) ;
          if ( _trace )
            cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
                 << " status " << myStatus << " outcount " << outcount << endl ;
          setMPIOutCount( RequestId , outcount ) ;
        }
      else
        {
          source = MPITarget( RequestId ) ;
          tag = MPITag( RequestId ) ;
          error = 0 ;
          outcount = MPIOutCount( RequestId ) ;
        }
      if ( !keepRequestStruct )
        deleteRequest( RequestId ) ;
    }
  else
    {
      source = MPITarget( RequestId ) ;
      tag = MPITag( RequestId ) ;
      error = 0 ;
      outcount = MPIOutCount( RequestId ) ;
    }
  return MPI_SUCCESS ;
}
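/* Sketch of querying a completed receive through status() ; the RequestId is
   kept alive with keepRequestStruct = true (an illustration ; mpi_access and
   recvId are assumptions) :

     int source , tag , error , outcount ;
     mpi_access.status( recvId, source, tag, error, outcount, true ) ;
     // outcount now holds the number of received items ; the RequestStruct
     // is preserved and should be deleted later with deleteRequest( recvId )
*/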
int MPIAccess::requestFree( MPI_Request *request )
{
  return _comm_interface.requestFree( request ) ;
}
// Prints all information about all known requests, for debugging purposes
void MPIAccess::check() const
{
  int i = 0 ;
  map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
  cout << "MPIAccess::Check" << _my_rank << " _map_of_request_struct_size "
       << _map_of_request_struct.size() << endl ;
  for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
        MapOfRequestStructiterator != _map_of_request_struct.end() ;
        MapOfRequestStructiterator++ )
    {
      if ( MapOfRequestStructiterator->second != NULL )
        {
          cout << "  Check" << _my_rank << " " << i << ". Request"
               << MapOfRequestStructiterator->first << "-->" ;
          if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
            cout << "I" ;
          if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
            cout << "Recv from " ;
          else
            cout << "Send to " ;
          cout << (MapOfRequestStructiterator->second)->MPITarget
               << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
               << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
               << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
               << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
               << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
               << endl ;
        }
      i++ ;
    }
}
// Outputs the fields of a TimeMessage structure
ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
{
  f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
    << " tag " << aTimeMsg.tag ;
  return f;
}
// Outputs the DataType coded in a Tag
ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
{
  switch (methodtype)
    {
    case _message_time :   f << " MethodTime ";        break ;
    case _message_int :    f << " MPI_INT ";           break ;
    case _message_double : f << " MPI_DOUBLE ";        break ;
    default :              f << " UnknownMethodType "; break ;
    }
  return f;
}