// Copyright (C) 2007-2014  CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//
// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
//
#include "MPIAccess.hxx"
#include "InterpolationUtils.hxx"

#include <iostream>
#include <limits>

using namespace std;
/*! \defgroup mpi_access MPIAccess
  Class \a MPIAccess is the gateway to the MPI library.
  It is a helper class that centralizes the calls to the MPI
  library made in the ParaMEDMEM library. This centralization
  makes it easier to collect information about the communication
  in the library. With MPIAccess, tags are managed automatically
  and asynchronous operations are easier.

  It is typically created after the MPI_Init() call in a program. It is
  then passed as a parameter to the constructors of ParaMEDMEM objects so
  that they access the MPI library through MPIAccess.

  As an example, the following code initializes a processor group made of the zero processor.
  \verbatim
  #include "MPIAccess.hxx"
  #include "MPIProcessorGroup.hxx"

  #include <set>

  int main(int argc, char** argv)
  {
    //initialization
    MPI_Init(&argc, &argv);
    ParaMEDMEM::CommInterface comm_interface;

    //setting up a processor group with proc 0
    std::set<int> procs;
    procs.insert(0);
    ParaMEDMEM::MPIProcessorGroup group(comm_interface, procs);

    ParaMEDMEM::MPIAccess mpi_access(&group);

    //cleanup
    MPI_Finalize();
    return 0;
  }
  \endverbatim
*/
/*! Creates an MPIAccess that is based on the processors included in \a ProcessorGroup.
  This helper class eases the use of the MPI API.

  \param ProcessorGroup MPIProcessorGroup object giving access to group management
  \param BaseTag and MaxTag define the range of tags to be used.
  Tags are managed by MPIAccess. They are cyclically incremented.
  When there is a Send or a Receive operation, a new RequestId tag is returned
  to the caller. That RequestId may be used to manage the operation: Wait, Check of
  status, etc. The MPITag internally managed by MPIAccess is used as the "tag" argument
  of the MPI calls.
*/
MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
  _comm_interface( ProcessorGroup->getCommInterface() ) ,
  _intra_communicator( ProcessorGroup->getComm() )
{
  void *v ;
  int mpitagub ;
  int flag ;
  //MPI_Comm_get_attr does not run with _IntraCommunicator ???
  //MPI_Comm_get_attr(*_IntraCommunicator,MPID_TAG_UB,&mpitagub,&flag) ;
  MPI_Comm_get_attr(MPI_COMM_WORLD,MPI_TAG_UB,&v,&flag) ;
  mpitagub=*(reinterpret_cast<int*>(v));
  if ( BaseTag != 0 )
    BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
  if ( MaxTag == 0 )
    MaxTag = (mpitagub/MODULO_TAG - 1)*MODULO_TAG ;
  MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
  if ( !flag || (BaseTag < 0) || (BaseTag >= MaxTag) || (MaxTag > mpitagub) )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
  _processor_group = ProcessorGroup ;
  _processor_group_size = _processor_group->size() ;

  _max_request = std::numeric_limits<int>::max() ;
  _request = _base_request ;

  _base_MPI_tag = BaseTag ;
  _max_MPI_tag = MaxTag ;

  _send_request = new int[ _processor_group_size ] ;
  _recv_request = new int[ _processor_group_size ] ;

  _send_requests.resize( _processor_group_size ) ;
  _recv_requests.resize( _processor_group_size ) ;

  _send_MPI_tag = new int[ _processor_group_size ] ;
  _recv_MPI_Tag = new int[ _processor_group_size ] ;
  int i ;
  for (i = 0 ; i < _processor_group_size ; i++ )
    {
      _send_request[ i ] = _max_request ;
      _recv_request[ i ] = _max_request ;
      _send_requests[ i ].resize(0) ;
      _recv_requests[ i ].resize(0) ;
      _send_MPI_tag[ i ] = _max_MPI_tag ;
      _recv_MPI_Tag[ i ] = _max_MPI_tag ;
    }
  MPI_Datatype array_of_types[3] ;
  array_of_types[0] = MPI_DOUBLE ;
  array_of_types[1] = MPI_DOUBLE ;
  array_of_types[2] = MPI_INT ;
  int array_of_blocklengths[3] ;
  array_of_blocklengths[0] = 1 ;
  array_of_blocklengths[1] = 1 ;
  array_of_blocklengths[2] = 1 ;
  MPI_Aint array_of_displacements[3] ;
  array_of_displacements[0] = 0 ;
  array_of_displacements[1] = sizeof(double) ;
  array_of_displacements[2] = 2*sizeof(double) ;
  MPI_Type_struct(3, array_of_blocklengths, array_of_displacements,
                  array_of_types, &_MPI_TIME) ;
  MPI_Type_commit(&_MPI_TIME) ;
}
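
// The struct datatype committed above mirrors the layout of the
// TimeMessage structure declared in MPIAccess.hxx : two doubles
// followed by an int. A minimal sketch of that layout, for
// illustration only :
//
//   struct TimeMessage
//   {
//     double time ;       // displacement 0
//     double deltatime ;  // displacement sizeof(double)
//     int    tag ;        // displacement 2*sizeof(double)
//   } ;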
MPIAccess::~MPIAccess()
{
  delete [] _send_request ;
  delete [] _recv_request ;
  delete [] _send_MPI_tag ;
  delete [] _recv_MPI_Tag ;
  MPI_Type_free(&_MPI_TIME) ;
}
/*
  MPIAccess and "RequestIds" :
  ============================

  . WARNING : In the specification document, the distinction
  between "MPITags" and "RequestIds" is not clear. "MPITags"
  are arguments of calls to MPI. "RequestIds" do not concern
  calls to MPI. "RequestIds" are named "tag" as in/out arguments
  in the MPIAccess API of the specification document.
  But in the implementation we use the correct name, RequestId (or
  RecvRequestId/SendRequestId).

  . When we issue an MPI write/read request via MPIAccess, we get
  an identifier called a "RequestId".
  That identifier matches a RequestStruct structure of
  MPIAccess. That structure is accessed through the map
  "_map_of_request_struct".
  The RequestStruct structure makes it possible to manage the
  MPI_Request and MPI_Status * structures of MPI. It also makes
  it possible to get information about the request :
  target, send/recv, tag, [a]synchronous, type, outcount.

  . That identifier is used to control an asynchronous request
  via MPIAccess : Wait, Test, Probe, etc.

  . In practice a "RequestId" is simply an integer in the interval
  [0 , 2**32-1]. There is a single such cyclic counter for
  [I]Sends and [I]Recvs.

  . Those "RequestIds" and their associated structures give an easy
  way to manage asynchronous communications.
  For example we have mpi_access->wait( int RequestId ) instead of
  MPI_Wait(MPI_Request *request, MPI_Status *status).

  . The API of MPIAccess may give the "SendRequestIds" of a "target",
  the "RecvRequestIds" from a "source", the "SendRequestIds" of
  all "targets" or the "RecvRequestIds" of all "sources".
  That avoids having to manage them in Presentation-ParaMEDMEM.
*/
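
// A usage sketch of the RequestId-based API described above
// (hypothetical caller code, not part of this file; assumes an
// MPIAccess instance "mpi_access" and a remote rank 1) :
//
//   double buffer[8] ;
//   int requestId ;
//   mpi_access.ISend( buffer, 8, MPI_DOUBLE, 1, requestId ) ;
//   ... // computation overlapped with communication
//   mpi_access.wait( requestId ) ;  // instead of MPI_Wait(&request, &status)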
int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
                           bool fromsourcerank , bool asynchronous )
{
  RequestStruct *mpiaccessstruct = new RequestStruct;
  mpiaccessstruct->MPITag = tag ;
  mpiaccessstruct->MPIDatatype = datatype ;
  mpiaccessstruct->MPITarget = destsourcerank ;
  mpiaccessstruct->MPIIsRecv = fromsourcerank ;
  MPI_Status *aStatus = new MPI_Status ;
  mpiaccessstruct->MPIStatus = aStatus ;
  mpiaccessstruct->MPIAsynchronous = asynchronous ;
  mpiaccessstruct->MPICompleted = !asynchronous ;
  mpiaccessstruct->MPIOutCount = -1 ;
  mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
  mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
  mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
  mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
  if ( _request == _max_request )
    _request = _base_request ;
  _request += 1 ;
  _map_of_request_struct[_request] = mpiaccessstruct ;
  if ( fromsourcerank )
    _recv_request[ destsourcerank ] = _request;
  else
    _send_request[ destsourcerank ] = _request;
  if ( _trace )
    cout << "NewRequest" << _my_rank << "( " << _request << " ) "
         << mpiaccessstruct << endl ;
  return _request ;
}
/*
  MPIAccess and "tags" (or "MPITags") :
  =====================================

  . The constructor makes it possible to choose an interval of
  tags to use : [BaseTag , MaxTag].
  The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
  value in an implementation of MPI (at least 32767 = 2**15-1).
  On the machine "awa", with the LAM implementation, the MPI_TAG_UB
  value is 7353944. The MPI standard specifies that this value is
  the same in all processes started by mpirun.
  In the case of the use of the same IntraCommunicator in a process
  for several distinct data flows (or for several IntraCommunicators
  with common processes), that makes it possible to avoid ambiguity.

  . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
  + The last decimal digit corresponds to the MPI_Datatype ( 1 for
  TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE).
  + The value of the other digits corresponds to a cyclic number for
  each message.
  + A TimeMessage and the associated DataMessage have the same number
  (but the types, and thus the tags, are different).

  . For a Send of a message from a process "source" to a process
  "target", we have _send_MPI_tag[target] in the process
  source (it contains the last "tag" used for the Send of a
  message to the process target).
  And in the process "target" which receives that message, we have
  _recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
  of messages from the process source).
  Naturally, per the MPI standard, the values of those tags must match.
*/
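
// A sketch of the tag layout described above (illustrative values,
// with MODULO_TAG == 10) :
//
//   int tag      = 4273 ;              // a full MPITag
//   int methodId = tag % MODULO_TAG ;  // 3 -> MPI_DOUBLE message
//   int msgNum   = tag / MODULO_TAG ;  // 427 -> cyclic message number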
int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _send_MPI_tag[destrank] ) ;
  tag = valTag( tag, method ) ;
  _send_MPI_tag[ destrank ] = tag ;
  RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
  _send_request[ destrank ] = RequestId ;
  _send_requests[ destrank ].push_back( RequestId ) ;
  return tag ;
}
int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
  tag = valTag( tag, method ) ;
  _recv_MPI_Tag[ sourcerank ] = tag ;
  RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
  _recv_request[ sourcerank ] = RequestId ;
  _recv_requests[ sourcerank ].push_back( RequestId ) ;
  return tag ;
}
// Returns the number of all SendRequestIds; it may be used to allocate
// ArrayOfSendRequests for the call to sendRequestIds
int MPIAccess::sendRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _send_requests[ i ].size() ;
  return size ;
}
// Returns in ArrayOfSendRequests, with the dimension "size", all the
// SendRequestIds to all destination ranks
int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
{
  int destrank ;
  int i = 0 ;
  for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
        ArrayOfSendRequests[i++] = *iter ;
    }
  return i ;
}
// Returns the number of all RecvRequestIds; it may be used to allocate
// ArrayOfRecvRequests for the call to recvRequestIds
int MPIAccess::recvRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _recv_requests[ i ].size() ;
  return size ;
}
// Returns in ArrayOfRecvRequests, with the dimension "size", all the
// RecvRequestIds from all source ranks
int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
{
  int sourcerank ;
  int i = 0 ;
  for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
        ArrayOfRecvRequests[i++] = *iter ;
    }
  return i ;
}
// Returns in ArrayOfSendRequests, with the dimension "size", all the
// SendRequestIds to a destination rank
int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
{
  if (size < (int)_send_requests[destrank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
    ArrayOfSendRequests[i++] = *iter ;
  return _send_requests[destrank].size() ;
}
// Returns in ArrayOfRecvRequests, with the dimension "size", all the
// RecvRequestIds from a source rank
int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
{
  if (size < (int)_recv_requests[sourcerank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
    ArrayOfRecvRequests[i++] = *iter ;
  return _recv_requests[sourcerank].size() ;
}
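
// A usage sketch of the size/fill pairs above (hypothetical caller
// code, assuming an MPIAccess instance "mpi_access") :
//
//   int size = mpi_access.recvRequestIdsSize() ;
//   int *ids = new int[ size ] ;
//   int n = mpi_access.recvRequestIds( size, ids ) ;  // all sources
//   // or, for a single source rank :
//   // int n = mpi_access.recvRequestIds( sourcerank, size, ids ) ;
//   delete [] ids ;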
// Sends in synchronous mode count values of type datatype from buffer to target
// (returns a RequestId identifier even if the corresponding structure is deleted :
// it is only in order to have the same signature as the asynchronous mode)
int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      deleteRequest( RequestId ) ;
      sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
                                 *_intra_communicator ) ;
      if ( _trace )
        cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
             << RequestId << " count " << count << " target " << target
             << " MPItag " << MPItag << endl ;
    }
  return sts ;
}
// Receives (reads) in synchronous mode count values of type datatype in buffer from source
// (returns a RequestId identifier even if the corresponding structure is deleted :
// it is only in order to have the same signature as the asynchronous mode)
// The output argument OutCount is optional : *OutCount <= count
int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( OutCount != NULL )
    *OutCount = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
      sts = _comm_interface.recv(buffer, count, datatype, source, MPItag,
                                 *_intra_communicator , MPIStatus( RequestId ) ) ;
      int outcount = 0 ;
      if ( sts == MPI_SUCCESS )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
          setMPIOutCount( RequestId , outcount ) ;
          setMPICompleted( RequestId , true ) ;
          deleteStatus( RequestId ) ;
        }
      if ( OutCount != NULL )
        *OutCount = outcount ;
      if ( _trace )
        cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
             << RequestId << " count " << count << " source " << source
             << " MPItag " << MPItag << endl ;
      deleteRequest( RequestId ) ;
    }
  return sts ;
}
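
// A usage sketch of the blocking pair above (hypothetical caller code,
// assuming two ranks and an MPIAccess instance "mpi_access") :
//
//   double values[10] ;
//   int requestId ;
//   int outcount ;
//   if ( myrank == 0 )
//     mpi_access.send( values, 10, MPI_DOUBLE, 1, requestId ) ;
//   else
//     mpi_access.recv( values, 10, MPI_DOUBLE, 0, requestId, &outcount ) ;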
// Sends in asynchronous mode count values of type datatype from buffer to target
// Returns a RequestId identifier.
int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
               << RequestId << " count " << count << " target " << target
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::ISend" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
                                  *_intra_communicator , aSendRequest) ;
    }
  return sts ;
}
// Receives (reads) in asynchronous mode count values of type datatype in buffer from source
// Returns a RequestId identifier.
int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
      MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
               << RequestId << " count " << count << " source " << source
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::IRecv" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
                                  *_intra_communicator , aRecvRequest) ;
    }
  return sts ;
}
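
// A usage sketch of the asynchronous pair above (hypothetical caller
// code) : post the receive first, then the send, then wait on both.
//
//   int sendId, recvId ;
//   mpi_access.IRecv( inbuf, 10, MPI_DOUBLE, peer, recvId ) ;
//   mpi_access.ISend( outbuf, 10, MPI_DOUBLE, peer, sendId ) ;
//   ... // computation overlapped with communication
//   mpi_access.wait( recvId ) ;
//   mpi_access.wait( sendId ) ;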
// Performs a Send and a Recv in synchronous mode
int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                        int dest, int &SendRequestId,
                        void* recvbuf, int recvcount, MPI_Datatype recvtype,
                        int source, int &RecvRequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  SendRequestId = -1 ;
  RecvRequestId = -1 ;
  if ( recvcount )
    sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  int outcount = -1 ;
  if ( _trace )
    cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
         << RecvRequestId << endl ;
  if ( sts == MPI_SUCCESS )
    {
      if ( sendcount )
        sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
      if ( _trace )
        cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
             << SendRequestId << endl ;
      if ( sts == MPI_SUCCESS && recvcount )
        {
          sts = wait( RecvRequestId ) ;
          outcount = MPIOutCount( RecvRequestId ) ;
          if ( _trace )
            cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
                 << RecvRequestId << " outcount " << outcount << endl ;
        }
    }
  if ( OutCount != NULL )
    {
      *OutCount = outcount ;
      if ( _trace )
        cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
             << endl ;
    }
  deleteRequest( RecvRequestId ) ;
  return sts ;
}
// Performs a Send and a Recv in asynchronous mode
int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                         int dest, int &SendRequestId,
                         void* recvbuf, int recvcount, MPI_Datatype recvtype,
                         int source, int &RecvRequestId)
{
  int sts = MPI_SUCCESS ;
  SendRequestId = -1 ;
  RecvRequestId = -1 ;
  if ( recvcount )
    sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( sts == MPI_SUCCESS )
    if ( sendcount )
      sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
  return sts ;
}
// Performs a wait of a Send or Recv asynchronous Request
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::wait( int RequestId )
{
  int status = MPI_SUCCESS ;
  if ( !MPICompleted( RequestId ) )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
                 << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      setMPICompleted( RequestId , true ) ;
      if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          int outcount = 0 ;
          status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
                                            &outcount ) ;
          if ( status == MPI_SUCCESS )
            {
              setMPIOutCount( RequestId , outcount ) ;
              deleteStatus( RequestId ) ;
              if ( _trace )
                cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
                     << " MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
                     << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
             << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
    }
  if ( _trace )
    cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
         << " Request " << MPIRequest( RequestId )
         << " Status " << MPIStatus( RequestId ) << " MPICompleted "
         << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
         << endl ;
  return status ;
}
// Performs a "test" of a Send or Recv asynchronous Request
// If the request is done, returns true in the flag argument
// If the request is not finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::test(int RequestId, int &flag)
{
  int status = MPI_SUCCESS ;
  flag = MPICompleted( RequestId ) ;
  if ( _trace )
    {
      cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
      if ( MPIIsRecv( RequestId ) )
        cout << " Recv" ;
      else
        cout << " Send" ;
      cout << "Request" << RequestId << " " << MPIRequest( RequestId )
           << " Status " << MPIStatus( RequestId ) << endl ;
    }
  if ( !flag )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId )
                 << " MPIStatus " << MPIStatus( RequestId )
                 << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.test(MPIRequest( RequestId ), &flag,
                                        MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      if ( flag )
        {
          setMPICompleted( RequestId , true ) ;
          if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
            {
              MPI_Datatype datatype = MPIDatatype( RequestId ) ;
              int outcount = 0 ;
              status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
                                                 &outcount ) ;
              if ( status == MPI_SUCCESS )
                {
                  setMPIOutCount( RequestId , outcount ) ;
                  deleteStatus( RequestId ) ;
                  if ( _trace )
                    cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                         << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
                }
              else if ( _trace )
                cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                     << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " MPIOutCount "
                 << MPIOutCount( RequestId ) << endl ;
        }
    }
  if ( _trace )
    cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
         << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
         << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
  return status ;
}
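
// A polling sketch built on test() (hypothetical caller code) :
//
//   int flag = 0 ;
//   while ( !flag )
//     {
//       mpi_access.test( requestId, flag ) ;
//       if ( !flag )
//         ... // do some local work before polling again
//     }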
int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
{
  int status = MPI_ERR_OTHER ;
  RequestId = -1 ;
  cout << "MPIAccess::WaitAny not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
{
  int status = MPI_ERR_OTHER ;
  RequestId = -1 ;
  flag = 0 ;
  cout << "MPIAccess::TestAny not yet implemented" << endl ;
  return status ;
}
// Performs a wait of each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::waitAll(int count, int *array_of_RequestIds)
{
  if ( _trace )
    cout << "WaitAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      if ( _trace )
        cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
             << array_of_RequestIds[i] << " )" << endl ;
      status = wait( array_of_RequestIds[i] ) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  if ( _trace )
    cout << "EndWaitAll" << _my_rank << endl ;
  return retstatus ;
}
// Performs a "test" of each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// If all requests are done, returns true in the flag argument
// If not all requests are finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
{
  if ( _trace )
    cout << "TestAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  bool retflag = true ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      status = test( array_of_RequestIds[i] , flag ) ;
      retflag = retflag && (flag != 0) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  flag = retflag ;
  if ( _trace )
    cout << "EndTestAll" << _my_rank << endl ;
  return retstatus ;
}
int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitSome not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestSome not yet implemented" << endl ;
  return status ;
}
// Probe checks if a message is available for read from FromSource rank.
// Returns the corresponding source, MPITag, datatype and outcount.
// Probe is a blocking call which waits until a message is available.
int MPIAccess::probe(int FromSource, int &source, int &MPITag,
                     MPI_Datatype &myDatatype, int &outcount)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.probe( FromSource, MPI_ANY_TAG,
                                   *_intra_communicator , &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDatatype = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDatatype << " outcount " << outcount
             << endl ;
    }
  else
    {
      source = -1 ;
      MPITag = -1 ;
      myDatatype = 0 ;
      outcount = -1 ;
    }
  return sts ;
}
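
// A usage sketch of probe() (hypothetical caller code) : block until a
// message from rank 1 is available, then receive exactly what was sent
// (here assuming the probed datatype is MPI_DOUBLE) :
//
//   int source, tag, outcount, requestId ;
//   MPI_Datatype dtype ;
//   mpi_access.probe( 1, source, tag, dtype, outcount ) ;
//   double *buf = new double[ outcount ] ;
//   mpi_access.recv( buf, outcount, dtype, source, requestId ) ;
//   delete [] buf ;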
// IProbe checks if a message is available for read from FromSource rank.
// If there is a message available, returns the corresponding source,
// MPITag, datatype and outcount with flag = true.
// If not, returns flag = false.
int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
                      MPI_Datatype &myDataType, int &outcount, int &flag)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
                                    *_intra_communicator , &flag,
                                    &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS && flag )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDataType = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDataType << " outcount " << outcount
             << " flag " << flag << endl ;
    }
  else
    {
      source = -1 ;
      MPITag = -1 ;
      myDataType = 0 ;
      outcount = -1 ;
    }
  return sts ;
}
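
// A non-blocking variant of the probe sketch above (hypothetical) :
//
//   int source, tag, outcount, flag ;
//   MPI_Datatype dtype ;
//   mpi_access.IProbe( 1, source, tag, dtype, outcount, flag ) ;
//   if ( flag )
//     ... // a message is pending; receive it as with probe()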
// Cancel concerns a "posted" asynchronous IRecv
// Returns flag = true if the receiving request was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses cancel, wait and test_cancelled of the MPI API
int MPIAccess::cancel( int RecvRequestId, int &flag )
{
  flag = 0 ;
  int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
  if ( sts == MPI_SUCCESS )
    {
      sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
                                  MPIStatus( RecvRequestId ) ) ;
      if ( sts == MPI_SUCCESS )
        sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
    }
  return sts ;
}
// Cancel concerns a "pending" receiving message (without IRecv "posted")
// Returns flag = true if the message was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses Irecv, cancel, wait and test_cancelled of the MPI API
int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
{
  int sts ;
  MPI_Aint extent ;
  flag = 0 ;
  sts = MPI_Type_extent( datatype , &extent ) ;
  if ( sts == MPI_SUCCESS )
    {
      void * recvbuf = malloc( extent*outcount ) ;
      MPI_Request aRecvRequest ;
      if ( _trace )
        cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
             << " datatype " << datatype << " source " << source << " theMPITag "
             << theMPITag << endl ;
      sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
                                   *_intra_communicator , &aRecvRequest ) ;
      if ( sts == MPI_SUCCESS )
        {
          sts = _comm_interface.cancel( &aRecvRequest ) ;
          if ( _trace )
            cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
                 << " cancel done" << endl ;
          if ( sts == MPI_SUCCESS )
            {
              MPI_Status aStatus ;
              if ( _trace )
                cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
              sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
              if ( sts == MPI_SUCCESS )
                {
                  if ( _trace )
                    cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
                  sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
                }
            }
        }
      if ( _trace && datatype == timeType() )
        cout << "MPIAccess::Cancel" << _my_rank << " time "
             << ((TimeMessage *) recvbuf)->time << " "
             << ((TimeMessage *) recvbuf)->deltatime << endl ;
      free( recvbuf ) ;
    }
  if ( _trace )
    cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
  return sts ;
}
// CancelAll concerns all "pending" receiving messages (without IRecv "posted")
// CancelAll uses IProbe and cancel (see above)
int MPIAccess::cancelAll()
{
  int sts = MPI_SUCCESS ;
  int target ;
  int source ;
  int MPITag ;
  MPI_Datatype datatype ;
  int outcount ;
  int flag ;
  for ( target = 0 ; target < _processor_group_size ; target++ )
    {
      sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
      if ( sts == MPI_SUCCESS && flag )
        {
          sts = cancel(source, MPITag, datatype, outcount, flag) ;
          if ( _trace )
            cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
                 << " MPITag " << MPITag << " datatype " << datatype
                 << " outcount " << outcount << " Cancel flag " << flag << endl ;
          if ( sts != MPI_SUCCESS )
            break ;
        }
      else if ( sts != MPI_SUCCESS )
        break ;
    }
  return sts ;
}
// Same as barrier of the MPI API
int MPIAccess::barrier()
{
  int status = _comm_interface.barrier( *_intra_communicator ) ;
  return status ;
}

// Same as Error_string of the MPI API
int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
{
  return _comm_interface.errorString( errorcode, string, resultlen) ;
}
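
// A usage sketch of errorString() (hypothetical caller code) :
//
//   char msg[ MPI_MAX_ERROR_STRING ] ;
//   int len ;
//   if ( sts != MPI_SUCCESS )
//     {
//       mpi_access.errorString( sts, msg, &len ) ;
//       cerr << "MPI error : " << msg << endl ;
//     }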
// Returns source, tag, error and outcount corresponding to receiving RequestId
// By default the corresponding structure of RequestId is deleted
int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
                      int &outcount, bool keepRequestStruct)
{
  MPI_Status *myStatus = MPIStatus( RequestId ) ;
  if ( _trace )
    cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
         << " status " << myStatus << endl ;
  if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
       MPICompleted( RequestId ) )
    {
      if ( MPIIsRecv( RequestId ) )
        {
          source = myStatus->MPI_SOURCE ;
          tag = myStatus->MPI_TAG ;
          error = myStatus->MPI_ERROR ;
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(myStatus, datatype, &outcount ) ;
          if ( _trace )
            cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
                 << " status " << myStatus << " outcount " << outcount << endl ;
          setMPIOutCount( RequestId , outcount ) ;
        }
      else
        {
          source = MPITarget( RequestId ) ;
          tag = MPITag( RequestId ) ;
          error = 0 ;
          outcount = MPIOutCount( RequestId ) ;
        }
      if ( !keepRequestStruct )
        deleteRequest( RequestId ) ;
      return MPI_SUCCESS ;
    }
  else
    {
      source = MPITarget( RequestId ) ;
      tag = MPITag( RequestId ) ;
      error = 0 ;
      outcount = MPIOutCount( RequestId ) ;
    }
  return MPI_SUCCESS ;
}
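
// A usage sketch of status() (hypothetical caller code), after an
// asynchronous receive identified by "recvId" has completed :
//
//   int source, tag, error, outcount ;
//   mpi_access.status( recvId, source, tag, error, outcount, true ) ;
//   // keepRequestStruct == true : the RequestStruct is kept alive,
//   // so the request can still be queried afterwards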
int MPIAccess::requestFree( MPI_Request *request )
{
  return _comm_interface.requestFree( request ) ;
}
// Prints all information of all known requests for debugging purposes
void MPIAccess::check() const
{
  int i = 0 ;
  map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
  cout << "MPIAccess::Check" << _my_rank << " _map_of_request_struct size "
       << _map_of_request_struct.size() << endl ;
  for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
        MapOfRequestStructiterator != _map_of_request_struct.end() ;
        MapOfRequestStructiterator++ )
    {
      if ( MapOfRequestStructiterator->second != NULL )
        {
          cout << " Check" << _my_rank << " " << i << ". Request"
               << MapOfRequestStructiterator->first << "-->" ;
          if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
            cout << "I" ;
          if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
            cout << "Recv from " ;
          else
            cout << "Send to " ;
          cout << (MapOfRequestStructiterator->second)->MPITarget
               << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
               << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
               << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
               << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
               << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
               << endl ;
        }
      i++ ;
    }
}
// Returns the MPI extent of a TimeMessage
MPI_Aint MPIAccess::timeExtent() const
{
  MPI_Aint aextent ;
  MPI_Type_extent( _MPI_TIME , &aextent ) ;
  return aextent ;
}

// Returns the MPI extent of an MPI_INT
MPI_Aint MPIAccess::intExtent() const
{
  MPI_Aint aextent ;
  MPI_Type_extent( MPI_INT , &aextent ) ;
  return aextent ;
}

// Returns the MPI extent of an MPI_DOUBLE
MPI_Aint MPIAccess::doubleExtent() const
{
  MPI_Aint aextent ;
  MPI_Type_extent( MPI_DOUBLE , &aextent ) ;
  return aextent ;
}
// Outputs the fields of a TimeMessage structure
ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
{
  f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
    << " tag " << aTimeMsg.tag ;
  return f ;
}
// Outputs the DataType coded in a Tag
ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
{
  switch (methodtype)
    {
    case _message_time :
      f << " MethodTime ";
      break ;
    case _message_int :
      f << " MPI_INT ";
      break ;
    case _message_double :
      f << " MPI_DOUBLE ";
      break ;
    default :
      f << " UnknownMethodType ";
      break ;
    }
  return f ;
}