1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 #include "MPIAccess.hxx"
20 #include "InterpolationUtils.hxx"
28 /*! \defgroup mpi_access MPIAccess
29 Class \a MPIAccess is the gateway to the MPI library.
30 It is a helper class that centralizes the calls to the MPI
31 library made in the ParaMEDMEM library. This centralization
32 makes it easier to collect information about the communications
33 performed by the library. With MPIAccess, tags are managed automatically
34 and asynchronous operations are easier to handle.
36 An MPIAccess object is typically created after the MPI_Init() call in a program. It is afterwards passed as a parameter to the constructors of ParaMEDMEM objects so that they access the MPI library through the MPIAccess instance.
38 As an example, the following code initializes a processor group made of process 0 only.
41 #include "MPIAccess.hxx"
42 #include "MPIProcessorGroup.hxx"
44 int main(int argc, char** argv)
47 MPI_Init(&argc, &argv);
48 ParaMEDMEM::CommInterface comm_interface;
50 // setting up a processor group containing process 0 only
std::set<int> procs;
procs.insert(0);
53 ParaMEDMEM::MPIProcessorGroup group(comm_interface, procs);
55 ParaMEDMEM::MPIAccess mpi_access(&group);
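A data exchange can then be driven through mpi_access. A minimal sketch, assuming
the group contains more than one process ("target", "source" and the buffer size
below are illustrative) :

  double values[10];
  int sendId, recvId, outcount;
  // on the sending process :
  mpi_access.send(values, 10, MPI_DOUBLE, target, sendId);
  // on the receiving process :
  mpi_access.recv(values, 10, MPI_DOUBLE, source, recvId, &outcount);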
64 /*! Creates an MPIAccess object based on the processors included in \a ProcessorGroup.
65 This class makes it easier to use the MPI API.
67 \param ProcessorGroup MPIProcessorGroup object giving access to group management
68 \param BaseTag and MaxTag define the range of tags to be used.
69 Tags are managed by MPIAccess. They are cyclically incremented.
70 Each Send or Receive operation returns a new RequestId to
71 the caller. That RequestId may be used to manage the operation (Wait, check of
72 status, etc.). The MPITag internally managed by MPIAccess is used as the "tag"
argument of the MPI calls.
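For example (a sketch; \a group is an MPIProcessorGroup as in the example of the
\ref mpi_access section, and the tag bounds are illustrative values kept below the
guaranteed minimum of MPI_TAG_UB) :

  ParaMEDMEM::MPIAccess mpi_access( &group, 100, 30000 ) ;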
76 MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
77 _comm_interface( ProcessorGroup->getCommInterface() ) ,
78 _intra_communicator( ProcessorGroup->getComm() )
82 //MPI_Attr_get does not run with _IntraCommunicator ???
83 //MPI_Attr_get(*_IntraCommunicator,MPI_TAG_UB,&mpitagub,&flag) ;
84 MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB,&mpitagub,&flag) ;
85 mpitagub=abs(mpitagub);
87 BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
89 MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
90 MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
91 cout << "MPIAccess::MPIAccess" << _my_rank << " this " << this << " BaseTag " << BaseTag
92 << " MaxTag " << MaxTag << " mpitagub " << mpitagub << " (minimum 32767) "
93 << " flag " << flag << endl ;
94 if ( !flag || (BaseTag < 0) || (BaseTag >= MaxTag) || (MaxTag > mpitagub) )
95 throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
97 _processor_group = ProcessorGroup ;
98 _processor_group_size = _processor_group->size() ;
101 cout << "MPIAccess::MPIAccess" << _my_rank << " _processor_group_size "
102 << _processor_group_size << endl ;
105 _max_request = std::numeric_limits<int>::max() ;
106 _request = _base_request ;
108 _base_MPI_tag = BaseTag ;
109 _max_MPI_tag = MaxTag ;
111 _send_request = new int[ _processor_group_size ] ;
112 _recv_request = new int[ _processor_group_size ] ;
114 _send_requests.resize( _processor_group_size ) ;
115 _recv_requests.resize( _processor_group_size ) ;
117 _send_MPI_tag = new int[ _processor_group_size ] ;
118 _recv_MPI_Tag = new int[ _processor_group_size ] ;
120 for (i = 0 ; i < _processor_group_size ; i++ )
122 _send_request[ i ] = _max_request ;
123 _recv_request[ i ] = _max_request ;
124 _send_requests[ i ].resize(0) ;
125 _recv_requests[ i ].resize(0) ;
126 _send_MPI_tag[ i ] = _max_MPI_tag ;
127 _recv_MPI_Tag[ i ] = _max_MPI_tag ;
129 MPI_Datatype array_of_types[3] ;
130 array_of_types[0] = MPI_DOUBLE ;
131 array_of_types[1] = MPI_DOUBLE ;
132 array_of_types[2] = MPI_INT ;
133 int array_of_blocklengths[3] ;
134 array_of_blocklengths[0] = 1 ;
135 array_of_blocklengths[1] = 1 ;
136 array_of_blocklengths[2] = 1 ;
137 MPI_Aint array_of_displacements[3] ;
138 array_of_displacements[0] = 0 ;
139 array_of_displacements[1] = sizeof(double) ;
140 array_of_displacements[2] = 2*sizeof(double) ;
141 MPI_Type_struct(3, array_of_blocklengths, array_of_displacements,
142 array_of_types, &_MPI_TIME) ;
143 MPI_Type_commit(&_MPI_TIME) ;
146 MPIAccess::~MPIAccess()
148 cout << "MPIAccess::~MPIAccess" << _my_rank << " this " << this << endl ;
149 delete [] _send_request ;
150 delete [] _recv_request ;
151 delete [] _send_MPI_tag ;
152 delete [] _recv_MPI_Tag ;
153 MPI_Type_free(&_MPI_TIME) ;
154 cout << "End of MPIAccess::~MPIAccess" << _my_rank << " this " << this << endl ;
158 MPIAccess and "RequestIds" :
159 ============================
161 . WARNING : In the specification document, the distinction
162 between "MPITags" and "RequestIds" is not clear. "MPITags"
163 are arguments of the MPI calls. "RequestIds" do not concern
164 the MPI calls at all. "RequestIds" are named "tag" as in/out
165 arguments in the MPIAccess API of the specification documentation.
166 But in the implementation we use the proper name RequestId (or
167 RecvRequestId/SendRequestId).
169 . When an MPI write/read request is issued via MPIAccess, we get
170 back an identifier "RequestId".
171 That identifier corresponds to a RequestStruct structure of
172 MPIAccess. The access to that structure is done through the map
173 "_MapOfRequestStruct".
174 The RequestStruct structure makes it possible to manage
175 the MPI_Request and MPI_Status * structures of MPI. It also
176 gives access to information about that request :
177 target, send/recv, tag, [a]synchronous, type, outcount.
179 . That identifier is used to control an asynchronous request
180 via MPIAccess : Wait, Test, Probe, etc...
182 . In practice a "RequestId" is simply an integer in the interval
183 [0 , 2**31-1]. There is only one such cyclic counter, shared by
184 [I]Sends and [I]Recvs.
186 . Those "RequestIds" and their associated structures give an easy
187 way to manage asynchronous communications.
188 For example we have mpi_access->Wait( int RequestId ) instead of
189 MPI_Wait(MPI_Request *request, MPI_Status *status).
191 . The API of MPIAccess may give the "SendRequestIds" of a "target",
192 the "RecvRequestIds" from a "source" or the "SendRequestIds" of
193 all "targets" or the "RecvRequestIds" of all "sources".
194 That avoids having to manage them in Presentation-ParaMEDMEM.
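. A sketch of a typical use of a "RequestId" for an asynchronous receive
("buffer", "count" and "source" are illustrative) :

    int RecvRequestId ;
    mpi_access.IRecv( buffer, count, MPI_DOUBLE, source, RecvRequestId ) ;
    // ... computations overlapping the communication ...
    mpi_access.wait( RecvRequestId ) ;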
197 int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
198 bool fromsourcerank , bool asynchronous )
200 RequestStruct *mpiaccessstruct = new RequestStruct;
201 mpiaccessstruct->MPITag = tag ;
202 mpiaccessstruct->MPIDatatype = datatype ;
203 mpiaccessstruct->MPITarget = destsourcerank ;
204 mpiaccessstruct->MPIIsRecv = fromsourcerank ;
205 MPI_Status *aStatus = new MPI_Status ;
206 mpiaccessstruct->MPIStatus = aStatus ;
207 mpiaccessstruct->MPIAsynchronous = asynchronous ;
208 mpiaccessstruct->MPICompleted = !asynchronous ;
209 mpiaccessstruct->MPIOutCount = -1 ;
212 mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
213 mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
214 mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
215 mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
217 if ( _request == _max_request )
218 _request = _base_request ;
220 _map_of_request_struct[_request] = mpiaccessstruct ;
221 if ( fromsourcerank )
222 _recv_request[ destsourcerank ] = _request;
224 _send_request[ destsourcerank ] = _request;
226 cout << "NewRequest" << _my_rank << "( " << _request << " ) "
227 << mpiaccessstruct << endl ;
232 MPIAccess and "tags" (or "MPITags") :
233 =====================================
235 . The constructor gives the possibility to choose an interval of
236 tags to use : [BaseTag , MaxTag].
237 The default is [ 0 , MPI_TAG_UB ], MPI_TAG_UB being the maximum
238 value allowed by the MPI implementation (at least 32767 = 2**15-1).
239 On "awa" with the LAM implementation, the MPI_TAG_UB value is
240 7353944. The MPI standard specifies that this value is the same in all
241 processes started by mpirun.
242 When the same IntraCommunicator is used in a process
243 for several distinct data flows (or when several IntraCommunicators
244 share common processes), that makes it possible to avoid ambiguity
247 . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
248 + The last decimal digit corresponds to the MPI_Datatype ( 1 for
249 TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE ).
250 + The value of the other digits corresponds to a circular counter for each
message.
252 + A TimeMessage and the associated DataMessage have the same counter value
253 (but the types are different, and so are the tags).
255 . For a Send of a message from a process "source" to a process
256 "target", we have _send_MPI_tag[target] in the process
257 "source" (it contains the last "tag" used for the Send of a
258 message to the process "target").
259 And in the process "target" which receives that message, we have
260 _recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
261 of messages from the process "source").
262 Naturally, in the MPI standard, the values of those tags must be the same.
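. For example (illustrative value) : with MODULO_TAG = 10, the tag 12343
decomposes as 1234*MODULO_TAG + 3, i.e. the message with circular counter 1234
carrying MPI_DOUBLE data; the TimeMessage associated with the same counter
would use the tag 12341.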
264 int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
265 bool asynchronous, int &RequestId )
268 tag = incrTag( _send_MPI_tag[destrank] ) ;
269 tag = valTag( tag, method ) ;
270 _send_MPI_tag[ destrank ] = tag ;
271 RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
272 _send_request[ destrank ] = RequestId ;
273 _send_requests[ destrank ].push_back( RequestId ) ;
277 int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
278 bool asynchronous, int &RequestId )
281 tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
282 tag = valTag( tag, method ) ;
283 _recv_MPI_Tag[ sourcerank ] = tag ;
284 RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
285 _recv_request[ sourcerank ] = RequestId ;
286 _recv_requests[ sourcerank ].push_back( RequestId ) ;
290 // Returns the number of all SendRequestIds; that number may be used to
291 // allocate ArrayOfSendRequests for the call to SendRequestIds
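// A sketch of the intended use (illustrative) :
//   int size = mpi_access.sendRequestIdsSize() ;
//   int *ids = new int[ size ] ;
//   mpi_access.sendRequestIds( size, ids ) ;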
292 int MPIAccess::sendRequestIdsSize()
295 for (int i = 0 ; i < _processor_group_size ; i++ )
296 size += _send_requests[ i ].size() ;
300 // Returns in ArrayOfSendRequests (of dimension "size") all the SendRequestIds
302 int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
306 for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
308 list< int >::const_iterator iter ;
309 for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
310 ArrayOfSendRequests[i++] = *iter ;
315 // Returns the number of all RecvRequestIds; that number may be used to
316 // allocate ArrayOfRecvRequests for the call to RecvRequestIds
317 int MPIAccess::recvRequestIdsSize()
320 for (int i = 0 ; i < _processor_group_size ; i++ )
321 size += _recv_requests[ i ].size() ;
325 // Returns in ArrayOfRecvRequests (of dimension "size") all the RecvRequestIds
327 int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
331 for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
333 list< int >::const_iterator iter ;
334 for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
335 ArrayOfRecvRequests[i++] = *iter ;
340 // Returns in ArrayOfSendRequests with the dimension "size" all the
341 // SendRequestIds to a destination rank
342 int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
344 if (size < _send_requests[destrank].size() )
345 throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
347 list< int >::const_iterator iter ;
348 for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
349 ArrayOfSendRequests[i++] = *iter ;
350 return _send_requests[destrank].size() ;
353 // Returns in ArrayOfRecvRequests with the dimension "size" all the
354 // RecvRequestIds from a sourcerank
355 int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
357 if (size < _recv_requests[sourcerank].size() )
358 throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
360 list< int >::const_iterator iter ;
362 for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
363 ArrayOfRecvRequests[i++] = *iter ;
364 return _recv_requests[sourcerank].size() ;
367 // Sends, in synchronous mode, count values of type datatype from buffer to target
368 // (returns a RequestId identifier even though the corresponding structure is deleted :
369 // it is only there to keep the same signature as the asynchronous mode)
370 int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
372 int sts = MPI_SUCCESS ;
376 _MessageIdent aMethodIdent = methodId( datatype ) ;
377 int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
378 if ( aMethodIdent == _message_time )
380 TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
381 aTimeMsg->tag = MPItag ;
383 deleteRequest( RequestId ) ;
384 sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
385 *_intra_communicator ) ;
387 cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
388 << RequestId << " count " << count << " target " << target
389 << " MPItag " << MPItag << endl ;
394 // Receives (reads), in synchronous mode, count values of type datatype into buffer from source
395 // (returns a RequestId identifier even though the corresponding structure is deleted :
396 // it is only there to keep the same signature as the asynchronous mode)
397 // The output argument OutCount is optional : *OutCount <= count
398 int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
400 int sts = MPI_SUCCESS ;
402 if ( OutCount != NULL )
406 _MessageIdent aMethodIdent = methodId( datatype ) ;
407 int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
408 sts = _comm_interface.recv(buffer, count, datatype, source, MPItag,
409 *_intra_communicator , MPIStatus( RequestId ) ) ;
411 if ( sts == MPI_SUCCESS )
413 MPI_Datatype datatype = MPIDatatype( RequestId ) ;
414 _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
415 setMPIOutCount( RequestId , outcount ) ;
416 setMPICompleted( RequestId , true ) ;
417 deleteStatus( RequestId ) ;
419 if ( OutCount != NULL )
420 *OutCount = outcount ;
422 cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
423 << RequestId << " count " << count << " source " << source
424 << " MPItag " << MPItag << endl ;
425 deleteRequest( RequestId ) ;
430 // Send in asynchronous mode count values of type datatype from buffer to target
431 // Returns RequestId identifier.
432 int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
434 int sts = MPI_SUCCESS ;
438 _MessageIdent aMethodIdent = methodId( datatype ) ;
439 int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
440 if ( aMethodIdent == _message_time )
442 TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
443 aTimeMsg->tag = MPItag ;
445 MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
448 cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
449 << RequestId << " count " << count << " target " << target
450 << " MPItag " << MPItag << endl ;
452 cout << "MPIAccess::ISend" << _my_rank << " time "
453 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
456 sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
457 *_intra_communicator , aSendRequest) ;
462 // Receive (read) in asynchronous mode count values of type datatype in buffer from source
463 // returns RequestId identifier.
464 int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
466 int sts = MPI_SUCCESS ;
470 _MessageIdent aMethodIdent = methodId( datatype ) ;
471 int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
472 MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
475 cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
476 << RequestId << " count " << count << " source " << source
477 << " MPItag " << MPItag << endl ;
479 cout << "MPIAccess::IRecv" << _my_rank << " time "
480 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
483 sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
484 *_intra_communicator , aRecvRequest) ;
489 // Perform a Send and a Recv in synchronous mode
490 int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
491 int dest, int &SendRequestId,
492 void* recvbuf, int recvcount, MPI_Datatype recvtype,
493 int source, int &RecvRequestId, int *OutCount)
495 int sts = MPI_SUCCESS ;
499 sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
502 cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
503 << RecvRequestId << endl ;
504 if ( sts == MPI_SUCCESS )
507 sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
509 cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
510 << SendRequestId << endl ;
511 if ( sts == MPI_SUCCESS && recvcount )
513 sts = wait( RecvRequestId ) ;
514 outcount = MPIOutCount( RecvRequestId ) ;
516 cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
517 << RecvRequestId << " outcount " << outcount << endl ;
520 if ( OutCount != NULL )
522 *OutCount = outcount ;
524 cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
527 deleteRequest( RecvRequestId ) ;
531 // Perform a Send and a Recv in asynchronous mode
532 int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
533 int dest, int &SendRequestId,
534 void* recvbuf, int recvcount, MPI_Datatype recvtype,
535 int source, int &RecvRequestId)
537 int sts = MPI_SUCCESS ;
541 sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
542 if ( sts == MPI_SUCCESS )
544 sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
548 // Perform a wait of a Send or Recv asynchronous Request
549 // Do nothing for a synchronous Request
550 // Manage MPI_Request * and MPI_Status * structure
551 int MPIAccess::wait( int RequestId )
553 int status = MPI_SUCCESS ;
554 if ( !MPICompleted( RequestId ) )
556 if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
559 cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
560 << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
561 << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
562 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
563 status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
568 cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
571 setMPICompleted( RequestId , true ) ;
572 if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
574 MPI_Datatype datatype = MPIDatatype( RequestId ) ;
576 status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
578 if ( status == MPI_SUCCESS )
580 setMPIOutCount( RequestId , outcount ) ;
581 deleteStatus( RequestId ) ;
583 cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
584 << " MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
590 cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
591 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
597 cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
598 << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
602 cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
603 << " Request " << MPIRequest( RequestId )
604 << " Status " << MPIStatus( RequestId ) << " MPICompleted "
605 << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
610 // Perform a "test" of a Send or Recv asynchronous Request
611 // If the request is done, returns true in the flag argument
612 // If the request is not finished, returns false in the flag argument
613 // Do nothing for a synchronous Request
614 // Manages the MPI_Request * and MPI_Status * structures
615 int MPIAccess::test(int RequestId, int &flag)
617 int status = MPI_SUCCESS ;
618 flag = MPICompleted( RequestId ) ;
620 cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
621 if ( MPIIsRecv( RequestId ) )
632 cout << "Request" << RequestId << " " << MPIRequest( RequestId )
633 << " Status " << MPIStatus( RequestId ) << endl ;
636 if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
639 cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
640 << " ) MPIRequest " << MPIRequest( RequestId )
641 << " MPIStatus " << MPIStatus( RequestId )
642 << " MPITag " << MPITag( RequestId )
643 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
644 status = _comm_interface.test(MPIRequest( RequestId ), &flag,
645 MPIStatus( RequestId )) ;
650 cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
655 setMPICompleted( RequestId , true ) ;
656 if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
659 MPI_Datatype datatype = MPIDatatype( RequestId ) ;
660 status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
662 if ( status == MPI_SUCCESS )
664 setMPIOutCount( RequestId , outcount ) ;
665 deleteStatus( RequestId ) ;
667 cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
668 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
673 cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
674 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
680 cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
681 << MPIIsRecv( RequestId ) << " MPIOutCount "
682 << MPIOutCount( RequestId ) << endl ;
687 cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
688 << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
689 << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
693 int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
695 int status = MPI_ERR_OTHER ;
697 cout << "MPIAccess::WaitAny not yet implemented" << endl ;
701 int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
703 int status = MPI_ERR_OTHER ;
706 cout << "MPIAccess::TestAny not yet implemented" << endl ;
710 // Perform a wait of each Send or Recv asynchronous Request of the array
711 // array_of_RequestIds of size "count".
712 // That array may be filled with a call to SendRequestIds or RecvRequestIds
// (sized via SendRequestIdsSize or RecvRequestIdsSize), as in the sketch below
713 // Do nothing for a synchronous Request
714 // Manage MPI_Request * and MPI_Status * structure
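// A sketch of the intended use for pending receives (illustrative) :
//   int n = mpi_access.recvRequestIdsSize() ;
//   int *ids = new int[ n ] ;
//   mpi_access.recvRequestIds( n, ids ) ;
//   mpi_access.waitAll( n, ids ) ;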
715 int MPIAccess::waitAll(int count, int *array_of_RequestIds)
718 cout << "WaitAll" << _my_rank << " : count " << count << endl ;
720 int retstatus = MPI_SUCCESS ;
722 for ( i = 0 ; i < count ; i++ )
725 cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
726 << array_of_RequestIds[i] << " )" << endl ;
727 status = wait( array_of_RequestIds[i] ) ;
728 if ( status != MPI_SUCCESS )
732 cout << "EndWaitAll" << _my_rank << endl ;
736 // Perform a "test" of each Send or Recv asynchronous Request of the array
737 // array_of_RequestIds of size "count".
738 // That array may be filled with a call to SendRequestIds or RecvRequestIds
739 // If all requests are done, returns true in the flag argument
740 // If some requests are not finished, returns false in the flag argument
741 // Do nothing for a synchronous Request
742 // Manage MPI_Request * and MPI_Status * structure
743 int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
746 cout << "TestAll" << _my_rank << " : count " << count << endl ;
748 int retstatus = MPI_SUCCESS ;
749 bool retflag = true ;
751 for ( i = 0 ; i < count ; i++ )
753 status = test( array_of_RequestIds[i] , flag ) ;
754 retflag = retflag && (flag != 0) ;
755 if ( status != MPI_SUCCESS )
760 cout << "EndTestAll" << _my_rank << endl ;
764 int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
765 int *outarray_of_RequestIds)
767 int status = MPI_ERR_OTHER ;
768 cout << "MPIAccess::WaitSome not yet implemented" << endl ;
772 int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
773 int *outarray_of_RequestIds)
775 int status = MPI_ERR_OTHER ;
776 cout << "MPIAccess::TestSome not yet implemented" << endl ;
780 // Probe checks if a message is available for read from FromSource rank.
781 // Returns the corresponding source, MPITag, datatype and outcount
782 // Probe is a blocking call which waits until a message is available
783 int MPIAccess::probe(int FromSource, int &source, int &MPITag,
784 MPI_Datatype &myDatatype, int &outcount)
786 MPI_Status aMPIStatus ;
787 int sts = _comm_interface.probe( FromSource, MPI_ANY_TAG,
788 *_intra_communicator , &aMPIStatus ) ;
789 if ( sts == MPI_SUCCESS )
791 source = aMPIStatus.MPI_SOURCE ;
792 MPITag = aMPIStatus.MPI_TAG ;
793 int MethodId = (MPITag % MODULO_TAG) ;
794 myDatatype = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
795 _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
797 cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
798 << " source " << source << " MPITag " << MPITag << " MethodId "
799 << MethodId << " datatype " << myDatatype << " outcount " << outcount
812 // IProbe checks if a message is available for read from FromSource rank.
813 // If there is a message available, returns the corresponding source,
814 // MPITag, datatype and outcount with flag = true
815 // If not, returns flag = false
816 int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
817 MPI_Datatype &myDataType, int &outcount, int &flag)
819 MPI_Status aMPIStatus ;
820 int sts = _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
821 *_intra_communicator , &flag,
823 if ( sts == MPI_SUCCESS && flag )
825 source = aMPIStatus.MPI_SOURCE ;
826 MPITag = aMPIStatus.MPI_TAG ;
827 int MethodId = (MPITag % MODULO_TAG) ;
828 myDataType = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
829 _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
831 cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
832 << " source " << source << " MPITag " << MPITag << " MethodId "
833 << MethodId << " datatype " << myDataType << " outcount " << outcount
834 << " flag " << flag << endl ;
846 // Cancel concerns a "posted" asynchronous IRecv
847 // Returns flag = true if the receiving request was successfully canceled
848 // Returns flag = false if the receiving request had already completed and could not be canceled
849 // Uses cancel, wait and test_cancelled of the MPI API
850 int MPIAccess::cancel( int RecvRequestId, int &flag )
853 int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
854 if ( sts == MPI_SUCCESS )
856 sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
857 MPIStatus( RecvRequestId ) ) ;
858 if ( sts == MPI_SUCCESS )
859 sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
864 // Cancel concerns a "pending" receiving message (without IRecv "posted")
865 // Returns flag = true if the message was successfully canceled
866 // Returns flag = false if the receiving request had already completed and could not be canceled
867 // Uses Irecv, cancel, wait and test_cancelled of the MPI API
868 int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
873 sts = MPI_Type_extent( datatype , &extent ) ;
874 if ( sts == MPI_SUCCESS )
876 void * recvbuf = malloc( extent*outcount ) ;
877 MPI_Request aRecvRequest ;
879 cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
880 << " datatype " << datatype << " source " << source << " theMPITag "
881 << theMPITag << endl ;
882 sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
883 *_intra_communicator , &aRecvRequest ) ;
884 if ( sts == MPI_SUCCESS )
886 sts = _comm_interface.cancel( &aRecvRequest ) ;
888 cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
889 << " cancel done" << endl ;
890 if ( sts == MPI_SUCCESS )
894 cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
895 sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
896 if ( sts == MPI_SUCCESS )
899 cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
900 sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
904 if ( _trace && datatype == timeType() )
905 cout << "MPIAccess::Cancel" << _my_rank << " time "
906 << ((TimeMessage *) recvbuf)->time << " "
907 << ((TimeMessage *) recvbuf)->deltatime << endl ;
911 cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
916 // CancelAll concerns all "pending" receiving messages (without a "posted" IRecv)
917 // CancelAll uses IProbe and Cancel (see above)
918 int MPIAccess::cancelAll()
920 int sts = MPI_SUCCESS ;
924 MPI_Datatype datatype ;
927 for ( target = 0 ; target < _processor_group_size ; target++ )
929 sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
930 if ( sts == MPI_SUCCESS && flag )
932 sts = cancel(source, MPITag, datatype, outcount, flag) ;
934 cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
935 << " MPITag " << MPITag << " datatype " << datatype
936 << " outcount " << outcount << " Cancel flag " << flag << endl ;
937 if ( sts != MPI_SUCCESS )
940 else if ( sts != MPI_SUCCESS )
946 // Same as barrier of MPI API
947 int MPIAccess::barrier()
949 int status = _comm_interface.barrier( *_intra_communicator ) ;
953 // Same as Error_string of MPI API
954 int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
956 return _comm_interface.errorString( errorcode, string, resultlen) ;
959 // Returns source, tag, error and outcount corresponding to receiving RequestId
960 // By default the corresponding structure of RequestId is deleted
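// A sketch of the intended use ("RecvRequestId" is a previously returned receive
// identifier; the last argument keeps the request structure alive) :
//   int source, tag, error, outcount ;
//   mpi_access.status( RecvRequestId, source, tag, error, outcount, true ) ;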
961 int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
962 int &outcount, bool keepRequestStruct)
964 MPI_Status *myStatus = MPIStatus( RequestId ) ;
966 cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
967 << " status " << myStatus << endl ;
968 if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
969 MPICompleted( RequestId ) )
971 if ( MPIIsRecv( RequestId ) )
973 source = myStatus->MPI_SOURCE ;
974 tag = myStatus->MPI_TAG ;
975 error = myStatus->MPI_ERROR ;
976 MPI_Datatype datatype = MPIDatatype( RequestId ) ;
977 _comm_interface.getCount(myStatus, datatype, &outcount ) ;
979 cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
980 << " status " << myStatus << " outcount " << outcount << endl ;
981 setMPIOutCount( RequestId , outcount ) ;
985 source = MPITarget( RequestId ) ;
986 tag = MPITag( RequestId ) ;
988 outcount = MPIOutCount( RequestId ) ;
990 if ( !keepRequestStruct )
991 deleteRequest( RequestId ) ;
996 source = MPITarget( RequestId ) ;
997 tag = MPITag( RequestId ) ;
999 outcount = MPIOutCount( RequestId ) ;
1001 return MPI_SUCCESS ;
1004 int MPIAccess::requestFree( MPI_Request *request )
1006 return _comm_interface.requestFree( request ) ;
1009 // Prints all information about all known requests, for debugging purposes
1010 void MPIAccess::check() const
1013 map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
1014 cout << "MPIAccess::Check" << _my_rank << " _map_of_request_struct_size "
1015 << _map_of_request_struct.size() << endl ;
1016 for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
1017 MapOfRequestStructiterator != _map_of_request_struct.end() ;
1018 MapOfRequestStructiterator++ )
1020 if ( MapOfRequestStructiterator->second != NULL )
1022 cout << " Check" << _my_rank << " " << i << ". Request"
1023 << MapOfRequestStructiterator->first << "-->" ;
1024 if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
1026 if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
1027 cout << "Recv from " ;
1029 cout << "Send to " ;
1030 cout << (MapOfRequestStructiterator->second)->MPITarget
1031 << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
1032 << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
1033 << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
1034 << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
1035 << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
1042 // Outputs fields of a TimeMessage structure
1043 ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
1045 f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
1046 << " tag " << aTimeMsg.tag ;
1050 // Outputs the DataType coded in a Tag
1051 ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
1055 case _message_time :
1056 f << " MethodTime ";
1061 case _message_double :
1062 f << " MPI_DOUBLE ";
1065 f << " UnknownMethodType ";