// Copyright (C) 2007-2024  CEA, EDF
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#include "MPIAccess/MPIAccess.hxx"
#include "InterpolationUtils.hxx"
The class \a MPIAccess is the gateway to the MPI library.
It is a helper class that gathers the calls to the MPI
library that are made in the ParaMEDMEM library. This
centralization makes it easier to collect information about
the communications performed by the library. With MPIAccess,
tags are managed automatically and asynchronous operations
are easier.

It is typically created after the MPI_Init() call in a program and is afterwards passed as a parameter to the constructors of ParaMEDMEM objects so that they access the MPI library through MPIAccess.

As an example, the following code creates a processor group made of processor 0 only.
#include "MPIAccess.hxx"
#include "ProcessorGroup.hxx"

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  MEDCoupling::CommInterface comm_interface;

  //setting up a processor group with proc 0
  std::set<int> procs;
  procs.insert(0);
  MEDCoupling::ProcessorGroup group(procs, comm_interface);

  MEDCoupling::MPIAccess mpi_access(group);
/*! Creates an MPIAccess object based on the processors included in \a ProcessorGroup.
This class simplifies the use of the MPI API.

\param ProcessorGroup MPIProcessorGroup object giving access to group management
\param BaseTag and MaxTag define the range of tags to be used.
Tags are managed by MPIAccess. They are incremented cyclically.
Each Send or Receive operation returns a new RequestId tag to
the caller. That RequestId may be used to manage the operation : Wait, Test,
status checks, etc. The MPITag internally managed by MPIAccess is used as the "tag" argument
MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
  _comm_interface( ProcessorGroup->getCommInterface() ) ,
  _intra_communicator( ProcessorGroup->getComm() )
{
  int mpitagub ;
  int flag ;
  void *v ;
  //MPI_Comm_get_attr does not work with _IntraCommunicator ???
  //MPI_Comm_get_attr(*_IntraCommunicator,MPID_TAG_UB,&mpitagub,&flag) ;
  MPI_Comm_get_attr(MPI_COMM_WORLD,MPI_TAG_UB,&v,&flag) ;
  mpitagub=*(reinterpret_cast<int*>(v));
  BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
  MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
  MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
  if ( !flag || (BaseTag < 0) || (BaseTag >= MaxTag) || (MaxTag > mpitagub) )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
  _processor_group = ProcessorGroup ;
  _processor_group_size = _processor_group->size() ;

  _max_request = std::numeric_limits<int>::max() ;
  _request = _base_request ;

  _base_MPI_tag = BaseTag ;
  _max_MPI_tag = MaxTag ;

  _send_request = new int[ _processor_group_size ] ;
  _recv_request = new int[ _processor_group_size ] ;

  _send_requests.resize( _processor_group_size ) ;
  _recv_requests.resize( _processor_group_size ) ;

  _send_MPI_tag = new int[ _processor_group_size ] ;
  _recv_MPI_Tag = new int[ _processor_group_size ] ;

  int i ;
  for (i = 0 ; i < _processor_group_size ; i++ )
    {
      _send_request[ i ] = _max_request ;
      _recv_request[ i ] = _max_request ;
      _send_requests[ i ].resize(0) ;
      _recv_requests[ i ].resize(0) ;
      _send_MPI_tag[ i ] = _max_MPI_tag ;
      _recv_MPI_Tag[ i ] = _max_MPI_tag ;
    }
  MPI_Datatype array_of_types[3] ;
  array_of_types[0] = MPI_DOUBLE ;
  array_of_types[1] = MPI_DOUBLE ;
  array_of_types[2] = MPI_INT ;
  int array_of_blocklengths[3] ;
  array_of_blocklengths[0] = 1 ;
  array_of_blocklengths[1] = 1 ;
  array_of_blocklengths[2] = 1 ;
  MPI_Aint array_of_displacements[3] ;
  array_of_displacements[0] = 0 ;
  array_of_displacements[1] = sizeof(double) ;
  array_of_displacements[2] = 2*sizeof(double) ;
  MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements,
                         array_of_types, &_MPI_TIME) ;
  MPI_Type_commit(&_MPI_TIME) ;
MPIAccess::~MPIAccess()
{
  delete [] _send_request ;
  delete [] _recv_request ;
  delete [] _send_MPI_tag ;
  delete [] _recv_MPI_Tag ;
  MPI_Type_free(&_MPI_TIME) ;
}
MPIAccess and "RequestIds" :
============================

. WARNING : In the specification document, the distinction
between "MPITags" and "RequestIds" is not clear. "MPITags"
are arguments of calls to MPI. "RequestIds" do not concern
calls to MPI. "RequestIds" are named "tag" as in/out
arguments in the MPIAccess API of the specification document.
But the implementation uses the proper name RequestId (or
RecvRequestId/SendRequestId).

. When we issue an MPI write/read request via MPIAccess, we get
an identifier "RequestId".
That identifier matches a RequestStruct structure of
MPIAccess. That structure is accessed through the map
"_MapOfRequestStruct".
The RequestStruct structure makes it possible to manage
the MPI_Request and MPI_Status * structures of MPI. It also
gives access to information about that request :
target, send/recv, tag, [a]synchronous, type, outcount.

. That identifier is used to control an asynchronous request
via MPIAccess : Wait, Test, Probe, etc.

. In practice a "RequestId" is simply an integer in the interval
[0 , 2**31-1]. There is only one such cyclic counter for
[I]Sends and [I]Recvs.

. These "RequestIds" and their associated structures provide an
easy way to manage asynchronous communications.
For example we have mpi_access->Wait( int RequestId ) instead of
MPI_Wait(MPI_Request *request, MPI_Status *status).

. The API of MPIAccess may return the "SendRequestIds" of a "target",
the "RecvRequestIds" from a "source", the "SendRequestIds" of
all "targets" or the "RecvRequestIds" of all "sources".
This avoids having to manage them in Presentation-ParaMEDMEM.
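
/* A minimal usage sketch of the RequestId mechanism (illustrative only ;
   'mpi_access' and the destination rank 'target' are assumptions, not
   defined in this file) :

     int RequestId ;
     int buffer[10] ;
     mpi_access.ISend( buffer, 10, MPI_INT, target, RequestId ) ;
     // ... computation overlapping the communication ...
     mpi_access.wait( RequestId ) ;  // instead of MPI_Wait(&request, &status)
*/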
int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
                           bool fromsourcerank , bool asynchronous )
{
  RequestStruct *mpiaccessstruct = new RequestStruct;
  mpiaccessstruct->MPITag = tag ;
  mpiaccessstruct->MPIDatatype = datatype ;
  mpiaccessstruct->MPITarget = destsourcerank ;
  mpiaccessstruct->MPIIsRecv = fromsourcerank ;
  MPI_Status *aStatus = new MPI_Status ;
  mpiaccessstruct->MPIStatus = aStatus ;
  mpiaccessstruct->MPIAsynchronous = asynchronous ;
  mpiaccessstruct->MPICompleted = !asynchronous ;
  mpiaccessstruct->MPIOutCount = -1 ;
  mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
  mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
  mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
  mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
  if ( _request == _max_request )
    _request = _base_request ;
  _map_of_request_struct[_request] = mpiaccessstruct ;
  if ( fromsourcerank )
    _recv_request[ destsourcerank ] = _request;
  else
    _send_request[ destsourcerank ] = _request;
  cout << "NewRequest" << _my_rank << "( " << _request << " ) "
       << mpiaccessstruct << endl ;
MPIAccess and "tags" (or "MPITags") :
=====================================

. The constructor makes it possible to choose the interval of
tags to use : [BaseTag , MaxTag].
The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
value in an implementation of MPI (at least 32767 = 2**15-1).
On the machine "awa" with the LAM implementation, the MPI_TAG_UB
value is 7353944. The MPI standard specifies that the value is
the same in all processes started by mpirun.
When the same IntraCommunicator is used in a process
for several distinct data flows (or when several IntraCommunicators
share common processes), this helps avoid ambiguity.

. In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
+ The last decimal digit corresponds to the MPI_Datatype ( 1 for
TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE)
+ The value of the other digits corresponds to a circular number
for each message.
+ A TimeMessage and the associated DataMessage have the same number
(but the types differ and so do the tags).

. For a Send of a message from a process "source" to a process
"target", we have _send_MPI_tag[target] in the process
"source" (it contains the last "tag" used for the Send of a
message to the process "target").
And in the process "target" which receives that message, we have
_recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
of messages from the process "source").
Naturally, per the MPI standard, those tag values must match.
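
/* A sketch of how a tag decomposes with MODULO_TAG == 10 (the value 1232 is
   a hypothetical example) :

     int tag = 1232 ;
     int methodId = tag % MODULO_TAG ;      // 2 -> an MPI_INT message
     int cyclicNumber = tag / MODULO_TAG ;  // 123 -> circular message number
*/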
int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _send_MPI_tag[destrank] ) ;
  tag = valTag( tag, method ) ;
  _send_MPI_tag[ destrank ] = tag ;
  RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
  _send_request[ destrank ] = RequestId ;
  _send_requests[ destrank ].push_back( RequestId ) ;

int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
  tag = valTag( tag, method ) ;
  _recv_MPI_Tag[ sourcerank ] = tag ;
  RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
  _recv_request[ sourcerank ] = RequestId ;
  _recv_requests[ sourcerank ].push_back( RequestId ) ;
// Returns the number of all SendRequestIds ; it may be used to allocate
// ArrayOfSendRequests for the call to sendRequestIds
int MPIAccess::sendRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += (int)_send_requests[ i ].size() ;
  return size ;
}

// Returns in ArrayOfSendRequests with the dimension "size" all the
// SendRequestIds
int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
{
  int i = 0 ;
  int destrank ;
  for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
        ArrayOfSendRequests[i++] = *iter ;
    }
// Returns the number of all RecvRequestIds ; it may be used to allocate
// ArrayOfRecvRequests for the call to recvRequestIds
int MPIAccess::recvRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += (int)_recv_requests[ i ].size() ;
  return size ;
}

// Returns in ArrayOfRecvRequests with the dimension "size" all the
// RecvRequestIds
int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
{
  int i = 0 ;
  int sourcerank ;
  for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
        ArrayOfRecvRequests[i++] = *iter ;
    }
// Returns in ArrayOfSendRequests with the dimension "size" all the
// SendRequestIds to a destination rank
int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
{
  if (size < (int)_send_requests[destrank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
    ArrayOfSendRequests[i++] = *iter ;
  return (int)_send_requests[destrank].size() ;
}

// Returns in ArrayOfRecvRequests with the dimension "size" all the
// RecvRequestIds from a source rank
int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
{
  if (size < (int)_recv_requests[sourcerank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
    ArrayOfRecvRequests[i++] = *iter ;
  return (int)_recv_requests[sourcerank].size() ;
}
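
/* A usage sketch of the RequestIds query API (illustrative only ;
   'mpi_access' is an assumed MPIAccess instance) :

     int size = mpi_access.recvRequestIdsSize() ;
     int *ids = new int[ size ] ;
     mpi_access.recvRequestIds( size, ids ) ;  // all pending RecvRequestIds
     // ... inspect or wait on ids[0..size-1] ...
     delete [] ids ;
*/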
// Send in synchronous mode count values of type datatype from buffer to target
// (returns a RequestId identifier even though the corresponding structure is
// deleted : this is only to keep the same signature as the asynchronous mode)
int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  _MessageIdent aMethodIdent = methodId( datatype ) ;
  int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
  if ( aMethodIdent == _message_time )
    {
      TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
      aTimeMsg->tag = MPItag ;
    }
  deleteRequest( RequestId ) ;
  sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
                             *_intra_communicator ) ;
  cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
       << RequestId << " count " << count << " target " << target
       << " MPItag " << MPItag << endl ;
  return sts ;
// Receive (read) in synchronous mode count values of type datatype in buffer from source
// (returns a RequestId identifier even though the corresponding structure is
// deleted : this is only to keep the same signature as the asynchronous mode)
// The output argument OutCount is optional : *OutCount <= count
int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  int outcount = 0 ;
  if ( OutCount != NULL )
    *OutCount = -1 ;
  _MessageIdent aMethodIdent = methodId( datatype ) ;
  int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
  sts = _comm_interface.recv(buffer, count, datatype, source, MPItag,
                             *_intra_communicator , MPIStatus( RequestId ) ) ;
  if ( sts == MPI_SUCCESS )
    {
      MPI_Datatype datatype2 = MPIDatatype( RequestId ) ;
      _comm_interface.getCount(MPIStatus( RequestId ), datatype2, &outcount ) ;
      setMPIOutCount( RequestId , outcount ) ;
      setMPICompleted( RequestId , true ) ;
      deleteStatus( RequestId ) ;
    }
  if ( OutCount != NULL )
    *OutCount = outcount ;
  cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
       << RequestId << " count " << count << " source " << source
       << " MPItag " << MPItag << endl ;
  deleteRequest( RequestId ) ;
  return sts ;
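
/* A sketch of a blocking receive with the optional OutCount argument
   (illustrative only ; 'mpi_access' and the rank 'source' are assumptions) :

     double values[100] ;
     int RequestId, OutCount ;
     mpi_access.recv( values, 100, MPI_DOUBLE, source, RequestId, &OutCount ) ;
     // OutCount (<= 100) now holds the number of values actually received
*/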
// Send in asynchronous mode count values of type datatype from buffer to target
// Returns a RequestId identifier.
int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  _MessageIdent aMethodIdent = methodId( datatype ) ;
  int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
  if ( aMethodIdent == _message_time )
    {
      TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
      aTimeMsg->tag = MPItag ;
    }
  MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
  cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
       << RequestId << " count " << count << " target " << target
       << " MPItag " << MPItag << endl ;
  cout << "MPIAccess::ISend" << _my_rank << " time "
       << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
       << endl ;
  sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
                              *_intra_communicator , aSendRequest) ;
  return sts ;
// Receive (read) in asynchronous mode count values of type datatype in buffer from source
// Returns a RequestId identifier.
int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  _MessageIdent aMethodIdent = methodId( datatype ) ;
  int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
  MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
  cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
       << RequestId << " count " << count << " source " << source
       << " MPItag " << MPItag << endl ;
  cout << "MPIAccess::IRecv" << _my_rank << " time "
       << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
       << endl ;
  sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
                              *_intra_communicator , aRecvRequest) ;
  return sts ;
// Perform a Send and a Recv in synchronous mode
int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                        int dest, int &SendRequestId,
                        void* recvbuf, int recvcount, MPI_Datatype recvtype,
                        int source, int &RecvRequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  int outcount = 0 ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
       << RecvRequestId << endl ;
  if ( sts == MPI_SUCCESS )
    {
      sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
      cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
           << SendRequestId << endl ;
      if ( sts == MPI_SUCCESS && recvcount )
        {
          sts = wait( RecvRequestId ) ;
          outcount = MPIOutCount( RecvRequestId ) ;
          cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
               << RecvRequestId << " outcount " << outcount << endl ;
        }
    }
  if ( OutCount != NULL )
    {
      *OutCount = outcount ;
      cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
           << endl ;
    }
  deleteRequest( RecvRequestId ) ;
  return sts ;
// Perform a Send and a Recv in asynchronous mode
int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                         int dest, int &SendRequestId,
                         void* recvbuf, int recvcount, MPI_Datatype recvtype,
                         int source, int &RecvRequestId)
{
  int sts = MPI_SUCCESS ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( sts == MPI_SUCCESS )
    sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
  return sts ;
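
/* A sketch of an asynchronous exchange with a peer rank (illustrative only ;
   'mpi_access' and the rank 'peer' are assumptions) :

     double sendbuf[10], recvbuf[10] ;
     int SendId, RecvId ;
     mpi_access.ISendRecv( sendbuf, 10, MPI_DOUBLE, peer, SendId,
                           recvbuf, 10, MPI_DOUBLE, peer, RecvId ) ;
     mpi_access.wait( SendId ) ;
     mpi_access.wait( RecvId ) ;
*/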
// Performs a wait on an asynchronous Send or Recv Request
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::wait( int RequestId )
{
  int status = MPI_SUCCESS ;
  if ( !MPICompleted( RequestId ) )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
               << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
               << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
               << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
        }
      else
        cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      setMPICompleted( RequestId , true ) ;
      if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          int outcount = 0 ;
          status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
                                            &outcount ) ;
          if ( status == MPI_SUCCESS )
            {
              setMPIOutCount( RequestId , outcount ) ;
              deleteStatus( RequestId ) ;
              cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
                   << " MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount "
                   << outcount << endl ;
            }
          else
            cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
        }
      else
        cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
             << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
    }
  cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
       << " Request " << MPIRequest( RequestId )
       << " Status " << MPIStatus( RequestId ) << " MPICompleted "
       << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
       << endl ;
  return status ;
// Performs a "test" of an asynchronous Send or Recv Request
// If the request is complete, returns true in the flag argument
// If the request is not complete, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::test(int RequestId, int &flag)
{
  int status = MPI_SUCCESS ;
  flag = MPICompleted( RequestId ) ;
  cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
  if ( MPIIsRecv( RequestId ) )
    cout << " Recv from " << MPITarget( RequestId ) ;
  else
    cout << " Send to " << MPITarget( RequestId ) ;
  cout << "Request" << RequestId << " " << MPIRequest( RequestId )
       << " Status " << MPIStatus( RequestId ) << endl ;
  if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
    {
      cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
           << " ) MPIRequest " << MPIRequest( RequestId )
           << " MPIStatus " << MPIStatus( RequestId )
           << " MPITag " << MPITag( RequestId )
           << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
      status = _comm_interface.test(MPIRequest( RequestId ), &flag,
                                    MPIStatus( RequestId )) ;
    }
  else
    cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
         << endl ;
  if ( flag )
    {
      setMPICompleted( RequestId , true ) ;
      if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          int outcount = 0 ;
          status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
                                             &outcount ) ;
          if ( status == MPI_SUCCESS )
            {
              setMPIOutCount( RequestId , outcount ) ;
              deleteStatus( RequestId ) ;
              cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                   << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
            }
          else
            cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
        }
      else
        cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
             << MPIIsRecv( RequestId ) << " MPIOutCount "
             << MPIOutCount( RequestId ) << endl ;
    }
  cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
       << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
       << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
  return status ;
int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitAny not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestAny not yet implemented" << endl ;
  return status ;
}
// Performs a wait on each asynchronous Send or Recv Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// (sized via sendRequestIdsSize or recvRequestIdsSize) ; see the sketch
// after this function.
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::waitAll(int count, int *array_of_RequestIds)
{
  cout << "WaitAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
           << array_of_RequestIds[i] << " )" << endl ;
      status = wait( array_of_RequestIds[i] ) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  cout << "EndWaitAll" << _my_rank << endl ;
  return retstatus ;
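
/* A sketch of draining all pending sends with waitAll (illustrative only ;
   'mpi_access' is an assumed MPIAccess instance) :

     int size = mpi_access.sendRequestIdsSize() ;
     int *ids = new int[ size ] ;
     mpi_access.sendRequestIds( size, ids ) ;
     mpi_access.waitAll( size, ids ) ;
     delete [] ids ;
*/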
// Performs a "test" of each asynchronous Send or Recv Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to sendRequestIds or recvRequestIds
// (sized via sendRequestIdsSize or recvRequestIdsSize)
// If all requests are complete, returns true in the flag argument ;
// otherwise returns false
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
{
  cout << "TestAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  bool retflag = true ;
  int i ;
  for ( i = 0 ; i < count ; i++ )
    {
      status = test( array_of_RequestIds[i] , flag ) ;
      retflag = retflag && (flag != 0) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  flag = retflag ;
  cout << "EndTestAll" << _my_rank << endl ;
  return retstatus ;
int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitSome not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestSome not yet implemented" << endl ;
  return status ;
}
// Probe checks whether a message is available for read from the FromSource rank.
// Returns the corresponding source, MPITag, datatype and outcount.
// Probe is a blocking call which waits until a message is available
int MPIAccess::probe(int FromSource, int &source, int &MPITag,
                     MPI_Datatype &myDatatype, int &outcount)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.probe( FromSource, MPI_ANY_TAG,
                                   *_intra_communicator , &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDatatype = datatype( (MEDCoupling::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
      cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
           << " source " << source << " MPITag " << MPITag << " MethodId "
           << MethodId << " datatype " << myDatatype << " outcount " << outcount
           << endl ;
    }
  return sts ;
// IProbe checks whether a message is available for read from the FromSource rank.
// If a message is available, returns the corresponding source,
// MPITag, datatype and outcount with flag = true
// If not, returns flag = false
int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
                      MPI_Datatype &myDataType, int &outcount, int &flag)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
                                    *_intra_communicator , &flag,
                                    &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS && flag )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDataType = datatype( (MEDCoupling::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
      cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
           << " source " << source << " MPITag " << MPITag << " MethodId "
           << MethodId << " datatype " << myDataType << " outcount " << outcount
           << " flag " << flag << endl ;
    }
  return sts ;
// Cancel concerns a "posted" asynchronous IRecv
// Returns flag = true if the receiving request was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses the cancel, wait and test_cancelled calls of the MPI API
int MPIAccess::cancel( int RecvRequestId, int &flag )
{
  flag = 0 ;
  int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
  if ( sts == MPI_SUCCESS )
    {
      sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
                                  MPIStatus( RecvRequestId ) ) ;
      if ( sts == MPI_SUCCESS )
        sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
    }
  return sts ;
// Cancel concerns a "pending" receiving message (without an IRecv "posted")
// Returns flag = true if the message was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses the Irecv, cancel, wait and test_cancelled calls of the MPI API
int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
{
  int sts ;
  MPI_Aint extent, lbound ;
  flag = 0 ;
  sts = MPI_Type_get_extent( datatype , &lbound, &extent ) ;
  if ( sts == MPI_SUCCESS )
    {
      void * recvbuf = malloc( extent*outcount ) ;
      MPI_Request aRecvRequest ;
      cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
           << " datatype " << datatype << " source " << source << " theMPITag "
           << theMPITag << endl ;
      sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
                                   *_intra_communicator , &aRecvRequest ) ;
      if ( sts == MPI_SUCCESS )
        {
          sts = _comm_interface.cancel( &aRecvRequest ) ;
          cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
               << " cancel done" << endl ;
          if ( sts == MPI_SUCCESS )
            {
              MPI_Status aStatus ;
              cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
              sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
              if ( sts == MPI_SUCCESS )
                {
                  cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
                  sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
                }
            }
        }
      if ( _trace && datatype == timeType() )
        cout << "MPIAccess::Cancel" << _my_rank << " time "
             << ((TimeMessage *) recvbuf)->time << " "
             << ((TimeMessage *) recvbuf)->deltatime << endl ;
      free( recvbuf ) ;
    }
  cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
  return sts ;
// CancelAll concerns all "pending" receiving messages (without an IRecv "posted")
// CancelAll uses IProbe and cancel (see above)
int MPIAccess::cancelAll()
{
  int sts = MPI_SUCCESS ;
  int target ;
  int source ;
  int MPITag ;
  MPI_Datatype datatype ;
  int outcount ;
  int flag ;
  for ( target = 0 ; target < _processor_group_size ; target++ )
    {
      sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
      if ( sts == MPI_SUCCESS && flag )
        {
          sts = cancel(source, MPITag, datatype, outcount, flag) ;
          cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
               << " MPITag " << MPITag << " datatype " << datatype
               << " outcount " << outcount << " Cancel flag " << flag << endl ;
          if ( sts != MPI_SUCCESS )
            break ;
        }
      else if ( sts != MPI_SUCCESS )
        break ;
    }
  return sts ;
// Same as the barrier of the MPI API
int MPIAccess::barrier()
{
  int status = _comm_interface.barrier( *_intra_communicator ) ;
  return status ;
}

// Same as Error_string of the MPI API
int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
{
  return _comm_interface.errorString( errorcode, string, resultlen) ;
}
// Returns the source, tag, error and outcount corresponding to a receiving RequestId
// By default, the corresponding structure of RequestId is deleted
int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
                      int &outcount, bool keepRequestStruct)
{
  MPI_Status *myStatus = MPIStatus( RequestId ) ;
  cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
       << " status " << myStatus << endl ;
  if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
       MPICompleted( RequestId ) )
    {
      if ( MPIIsRecv( RequestId ) )
        {
          source = myStatus->MPI_SOURCE ;
          tag = myStatus->MPI_TAG ;
          error = myStatus->MPI_ERROR ;
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(myStatus, datatype, &outcount ) ;
          cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
               << " status " << myStatus << " outcount " << outcount << endl ;
          setMPIOutCount( RequestId , outcount ) ;
        }
      else
        {
          source = MPITarget( RequestId ) ;
          tag = MPITag( RequestId ) ;
          outcount = MPIOutCount( RequestId ) ;
        }
      if ( !keepRequestStruct )
        deleteRequest( RequestId ) ;
      return MPI_SUCCESS ;
    }
  else
    {
      source = MPITarget( RequestId ) ;
      tag = MPITag( RequestId ) ;
      outcount = MPIOutCount( RequestId ) ;
    }
int MPIAccess::requestFree( MPI_Request *request )
{
  return _comm_interface.requestFree( request ) ;
}
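
/* A sketch of retrieving the status of a completed receive (illustrative
   only ; 'RecvRequestId' comes from a previous receive on the assumed
   instance 'mpi_access') :

     int source, tag, error, outcount ;
     mpi_access.status( RecvRequestId, source, tag, error, outcount, true ) ;
     // with keepRequestStruct == false the RequestStruct of RecvRequestId
     // would be deleted by this call
*/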
// Prints all information about all known requests, for debugging purposes
void MPIAccess::check() const
{
  int i = 0 ;
  map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
  cout << "MPIAccess::Check" << _my_rank << " _map_of_request_struct_size "
       << _map_of_request_struct.size() << endl ;
  for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
        MapOfRequestStructiterator != _map_of_request_struct.end() ;
        MapOfRequestStructiterator++ )
    {
      if ( MapOfRequestStructiterator->second != NULL )
        {
          cout << " Check" << _my_rank << " " << i << ". Request"
               << MapOfRequestStructiterator->first << "-->" ;
          if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
            cout << "I" ;
          if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
            cout << "Recv from " ;
          else
            cout << "Send to " ;
          cout << (MapOfRequestStructiterator->second)->MPITarget
               << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
               << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
               << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
               << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
               << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
               << endl ;
        }
      i++ ;
    }
// Returns the MPI size of a TimeMessage
MPI_Aint MPIAccess::timeExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( _MPI_TIME , &lbound, &aextent ) ;
  return aextent ;
}

// Returns the MPI size of an MPI_INT
MPI_Aint MPIAccess::intExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( MPI_INT , &lbound, &aextent ) ;
  return aextent ;
}

// Returns the MPI size of an MPI_LONG
MPI_Aint MPIAccess::longExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( MPI_LONG , &lbound, &aextent ) ;
  return aextent ;
}

// Returns the MPI size of an MPI_DOUBLE
MPI_Aint MPIAccess::doubleExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( MPI_DOUBLE , &lbound, &aextent ) ;
  return aextent ;
}
// Outputs the fields of a TimeMessage structure
ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
{
  f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
    << " tag " << aTimeMsg.tag ;
  return f ;
}

// Outputs the DataType coded in a Tag
ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
{
  switch (methodtype)
    {
    case _message_time :
      f << " MethodTime ";
      break ;
    case _message_int :
      f << " MPI_INT ";
      break ;
    case _message_double :
      f << " MPI_DOUBLE ";
      break ;
    default :
      f << " UnknownMethodType ";
      break ;
    }
  return f ;