// Copyright (C) 2007-2016  CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//
// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
//
#include "MPIAccess.hxx"
#include "InterpolationUtils.hxx"

using namespace std;

namespace MEDCoupling
{
/*!
The class \a MPIAccess is the gateway to the MPI library.
It is a helper class that gathers the calls to the MPI
library that are made in the ParaMEDMEM library. Centralizing
these calls in one place makes it easier to collect information
about the communication in the library. With MPIAccess, tags
are managed automatically and asynchronous operations are easier.

It is typically called after the MPI_Init() call in a program. It is
afterwards passed as a parameter to the constructors of ParaMEDMEM objects
so that they access the MPI library via MPIAccess.

As an example, the following code initializes a processor group made of the zero processor.

\verbatim
#include "MPIAccess.hxx"
#include "ProcessorGroup.hxx"

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  MEDCoupling::CommInterface comm_interface;

  //setting up a processor group with proc 0
  std::set<int> procs;
  procs.insert(0);
  MEDCoupling::ProcessorGroup group(procs, comm_interface);

  MEDCoupling::MPIAccess mpi_access(group);
\endverbatim
*/
/*! Creates an MPIAccess that is based on the processors included in \a ProcessorGroup.
  This class may be used for easier handling of the MPI API.

  \param ProcessorGroup MPIProcessorGroup object giving access to group management
  \param BaseTag and MaxTag define the range of tags to be used.
  Tags are managed by MPIAccess. They are cyclically incremented.
  When there is a Send or a Receive operation, a new RequestId tag is returned
  to the caller. That RequestId may be used to manage the operation (Wait, check of
  status, etc.). The MPITag internally managed by MPIAccess is used as the "tag" argument
  of MPI messages.
*/
MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
  _comm_interface( ProcessorGroup->getCommInterface() ) ,
  _intra_communicator( ProcessorGroup->getComm() )
{
  void *v ;
  int mpitagub ;
  int flag ;
  //MPI_Comm_get_attr does not run with _IntraCommunicator ???
  //MPI_Comm_get_attr(*_IntraCommunicator,MPID_TAG_UB,&mpitagub,&flag) ;
  MPI_Comm_get_attr(MPI_COMM_WORLD,MPI_TAG_UB,&v,&flag) ;
  mpitagub=*(reinterpret_cast<int*>(v));
  if ( BaseTag != 0 )
    BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
  if ( MaxTag == 0 )
    MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
  MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
  if ( !flag || (BaseTag < 0) || (BaseTag >= MaxTag) || (MaxTag > mpitagub) )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
  _processor_group = ProcessorGroup ;
  _processor_group_size = _processor_group->size() ;

  _max_request = std::numeric_limits<int>::max() ;
  _request = _base_request ;

  _base_MPI_tag = BaseTag ;
  _max_MPI_tag = MaxTag ;

  _send_request = new int[ _processor_group_size ] ;
  _recv_request = new int[ _processor_group_size ] ;

  _send_requests.resize( _processor_group_size ) ;
  _recv_requests.resize( _processor_group_size ) ;

  _send_MPI_tag = new int[ _processor_group_size ] ;
  _recv_MPI_Tag = new int[ _processor_group_size ] ;

  for ( int i = 0 ; i < _processor_group_size ; i++ )
    {
      _send_request[ i ] = _max_request ;
      _recv_request[ i ] = _max_request ;
      _send_requests[ i ].resize(0) ;
      _recv_requests[ i ].resize(0) ;
      _send_MPI_tag[ i ] = _max_MPI_tag ;
      _recv_MPI_Tag[ i ] = _max_MPI_tag ;
    }
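  // Build the derived MPI datatype used for TimeMessage exchanges:
  // a struct of { double time ; double deltatime ; int tag }, whose
  // layout matches the displacements computed below.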
  MPI_Datatype array_of_types[3] ;
  array_of_types[0] = MPI_DOUBLE ;
  array_of_types[1] = MPI_DOUBLE ;
  array_of_types[2] = MPI_INT ;
  int array_of_blocklengths[3] ;
  array_of_blocklengths[0] = 1 ;
  array_of_blocklengths[1] = 1 ;
  array_of_blocklengths[2] = 1 ;
  MPI_Aint array_of_displacements[3] ;
  array_of_displacements[0] = 0 ;
  array_of_displacements[1] = sizeof(double) ;
  array_of_displacements[2] = 2*sizeof(double) ;
  MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements,
                         array_of_types, &_MPI_TIME) ;
  MPI_Type_commit(&_MPI_TIME) ;
}
MPIAccess::~MPIAccess()
{
  delete [] _send_request ;
  delete [] _recv_request ;
  delete [] _send_MPI_tag ;
  delete [] _recv_MPI_Tag ;
  MPI_Type_free(&_MPI_TIME) ;
}
/*
  MPIAccess and "RequestIds" :
  ============================

  . WARNING : In the specification document, the distinction
    between "MPITags" and "RequestIds" is not clear. "MPITags"
    are arguments of calls to MPI. "RequestIds" do not concern
    calls to MPI. "RequestIds" are named "tag" as in/out arguments
    in the MPIAccess API of the specification document.
    But in the implementation we use the proper name RequestId (or
    RecvRequestId/SendRequestId).

  . When we issue an MPI write/read request via MPIAccess, we get
    an identifier "RequestId".
    That identifier matches a structure RequestStruct of
    MPIAccess. That structure is accessed through the map
    "_MapOfRequestStruct".
    The structure RequestStruct makes it possible to manage the
    MPI_Request and MPI_Status * structures of MPI. It also gives
    access to information about that request :
    target, send/recv, tag, [a]synchronous, type, outcount.

  . That identifier is used to control an asynchronous request
    via MPIAccess : Wait, Test, Probe, etc.

  . In practice "RequestId" is simply an integer in the interval
    [0 , 2**31-1]. There is only one such cyclic counter for
    [I]Sends and [I]Recvs.

  . Those "RequestIds" and their associated structures give an easy
    way to manage asynchronous communications.
    For example we have mpi_access->Wait( int RequestId ) instead of
    MPI_Wait(MPI_Request *request, MPI_Status *status).

  . The API of MPIAccess may give the "SendRequestIds" of a "target",
    the "RecvRequestIds" from a "source", the "SendRequestIds" of
    all "targets" or the "RecvRequestIds" of all "sources".
    That avoids having to manage them in Presentation-ParaMEDMEM.
*/
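/*
  A minimal usage sketch (a hedged illustration, assuming an initialized
  MEDCoupling::MPIAccess "mpi_access" and a peer rank "target") : the
  RequestId returned by ISend replaces the raw MPI_Request/MPI_Status pair.

    int RequestId ;
    double buffer[10] ;
    mpi_access.ISend( buffer, 10, MPI_DOUBLE, target, RequestId ) ;
    // ... computation overlapping the communication ...
    mpi_access.wait( RequestId ) ; // instead of MPI_Wait(&request, &status)
*/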
int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
                           bool fromsourcerank , bool asynchronous )
{
  RequestStruct *mpiaccessstruct = new RequestStruct;
  mpiaccessstruct->MPITag = tag ;
  mpiaccessstruct->MPIDatatype = datatype ;
  mpiaccessstruct->MPITarget = destsourcerank ;
  mpiaccessstruct->MPIIsRecv = fromsourcerank ;
  MPI_Status *aStatus = new MPI_Status ;
  mpiaccessstruct->MPIStatus = aStatus ;
  mpiaccessstruct->MPIAsynchronous = asynchronous ;
  mpiaccessstruct->MPICompleted = !asynchronous ;
  mpiaccessstruct->MPIOutCount = -1 ;

  mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
  mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
  mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
  mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;

  if ( _request == _max_request )
    _request = _base_request ;
  _request += 1 ;
  _map_of_request_struct[_request] = mpiaccessstruct ;
  if ( fromsourcerank )
    _recv_request[ destsourcerank ] = _request;
  else
    _send_request[ destsourcerank ] = _request;
  if ( _trace )
    cout << "NewRequest" << _my_rank << "( " << _request << " ) "
         << mpiaccessstruct << endl ;
  return _request ;
}
/*
  MPIAccess and "tags" (or "MPITags") :
  =====================================

  . The constructor gives the possibility to choose an interval of
    tags to use : [BaseTag , MaxTag].
    The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
    value in an implementation of MPI (minimum 32767 = 2**15-1).
    On the machine "awa" with the LAM implementation, the MPI_TAG_UB
    value is 7353944. The MPI standard specifies that this value is
    the same in all processes started by mpirun.
    In the case of the use of the same IntraCommunicator in a process
    for several distinct data flows (or for several IntraCommunicators
    with common processes), that makes it possible to avoid ambiguity.

  . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
    + The last decimal digit corresponds to the MPI_Datatype ( 1 for
      TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE).
    + The value of the other digits corresponds to a cyclic number for
      each message.
    + A TimeMessage and the associated DataMessage have the same number
      (but the types are different, and so are the tags).

  . For a Send of a message from a process "source" to a process
    "target", we have _send_MPI_tag[target] in the process
    source (it contains the last "tag" used for the Send of a
    message to the process target).
    And in the process "target" which receives that message, we have
    _recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
    of messages from the process source).
    Naturally, per the MPI standard, those tag values must match.
*/
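/*
  Illustrative sketch of the tag layout (with MODULO_TAG == 10) : for a
  cyclic counter n and a method id m (1 for TimeMessage, 2 for MPI_INT,
  3 for MPI_DOUBLE), the MPI tag is

    tag = n*MODULO_TAG + m

  so, for example, two consecutive MPI_DOUBLE sends to the same target
  could carry tags 13 and 23.
*/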
int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _send_MPI_tag[destrank] ) ;
  tag = valTag( tag, method ) ;
  _send_MPI_tag[ destrank ] = tag ;
  RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
  _send_request[ destrank ] = RequestId ;
  _send_requests[ destrank ].push_back( RequestId ) ;
  return tag ;
}
int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
                           bool asynchronous, int &RequestId )
{
  int tag ;
  tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
  tag = valTag( tag, method ) ;
  _recv_MPI_Tag[ sourcerank ] = tag ;
  RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
  _recv_request[ sourcerank ] = RequestId ;
  _recv_requests[ sourcerank ].push_back( RequestId ) ;
  return tag ;
}
// Returns the number of all SendRequestIds that may be used to allocate
// ArrayOfSendRequests for the call to SendRequestIds
int MPIAccess::sendRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _send_requests[ i ].size() ;
  return size ;
}
// Returns in ArrayOfSendRequests with the dimension "size" all the
// SendRequestIds
int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
{
  int i = 0 ;
  for ( int destrank = 0 ; destrank < _processor_group_size ; destrank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
        ArrayOfSendRequests[i++] = *iter ;
    }
  return i ;
}
// Returns the number of all RecvRequestIds that may be used to allocate
// ArrayOfRecvRequests for the call to RecvRequestIds
int MPIAccess::recvRequestIdsSize()
{
  int size = 0 ;
  for (int i = 0 ; i < _processor_group_size ; i++ )
    size += _recv_requests[ i ].size() ;
  return size ;
}
// Returns in ArrayOfRecvRequests with the dimension "size" all the
// RecvRequestIds
int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
{
  int i = 0 ;
  for ( int sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
    {
      list< int >::const_iterator iter ;
      for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
        ArrayOfRecvRequests[i++] = *iter ;
    }
  return i ;
}
// Returns in ArrayOfSendRequests with the dimension "size" all the
// SendRequestIds to a destination rank
int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
{
  if (size < (int)_send_requests[destrank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
    ArrayOfSendRequests[i++] = *iter ;
  return _send_requests[destrank].size() ;
}
// Returns in ArrayOfRecvRequests with the dimension "size" all the
// RecvRequestIds from a sourcerank
int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
{
  if (size < (int)_recv_requests[sourcerank].size() )
    throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
  int i = 0 ;
  list< int >::const_iterator iter ;
  for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
    ArrayOfRecvRequests[i++] = *iter ;
  return _recv_requests[sourcerank].size() ;
}
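/*
  Usage sketch (a hedged illustration, assuming an initialized MPIAccess
  "mpi_access") : draining all pending receive requests with the query
  methods above and waitAll.

    int n = mpi_access.recvRequestIdsSize() ;
    int *ids = new int[ n ] ;
    n = mpi_access.recvRequestIds( n, ids ) ;
    mpi_access.waitAll( n, ids ) ;
    delete [] ids ;
*/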
// Send in synchronous mode count values of type datatype from buffer to target
// (returns a RequestId identifier even if the corresponding structure is deleted :
// it is only in order to have the same signature as the asynchronous mode)
int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      deleteRequest( RequestId ) ;
      sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
                                 *_intra_communicator ) ;
      if ( _trace )
        cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
             << RequestId << " count " << count << " target " << target
             << " MPItag " << MPItag << endl ;
    }
  return sts ;
}
// Receive (read) in synchronous mode count values of type datatype in buffer from source
// (returns a RequestId identifier even if the corresponding structure is deleted :
// it is only in order to have the same signature as the asynchronous mode)
// The output argument OutCount is optional : *OutCount <= count
int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( OutCount != NULL )
    *OutCount = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
      sts = _comm_interface.recv(buffer, count, datatype, source, MPItag,
                                 *_intra_communicator , MPIStatus( RequestId ) ) ;
      int outcount = 0 ;
      if ( sts == MPI_SUCCESS )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
          setMPIOutCount( RequestId , outcount ) ;
          setMPICompleted( RequestId , true ) ;
          deleteStatus( RequestId ) ;
        }
      if ( OutCount != NULL )
        *OutCount = outcount ;
      if ( _trace )
        cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
             << RequestId << " count " << count << " source " << source
             << " MPItag " << MPItag << endl ;
      deleteRequest( RequestId ) ;
    }
  return sts ;
}
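/*
  Pairing sketch (a hedged illustration, assuming an initialized MPIAccess
  "mpi_access" shared by ranks 0 and 1; "myRank" stands for the caller's
  rank in the group) : a synchronous exchange with the optional OutCount.

    int RequestId ;
    int outcount = 0 ;
    int values[5] ;
    if ( myRank == 0 )      // sender side
      mpi_access.send( values, 5, MPI_INT, 1, RequestId ) ;
    else                    // receiver side
      mpi_access.recv( values, 5, MPI_INT, 0, RequestId, &outcount ) ;
*/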
// Send in asynchronous mode count values of type datatype from buffer to target
// Returns a RequestId identifier.
int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
      if ( aMethodIdent == _message_time )
        {
          TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
          aTimeMsg->tag = MPItag ;
        }
      MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
               << RequestId << " count " << count << " target " << target
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::ISend" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
                                  *_intra_communicator , aSendRequest) ;
    }
  return sts ;
}
// Receive (read) in asynchronous mode count values of type datatype in buffer from source
// Returns a RequestId identifier.
int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
{
  int sts = MPI_SUCCESS ;
  RequestId = -1 ;
  if ( count )
    {
      _MessageIdent aMethodIdent = methodId( datatype ) ;
      int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
      MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
      if ( _trace )
        {
          cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
               << RequestId << " count " << count << " source " << source
               << " MPItag " << MPItag << endl ;
          if ( aMethodIdent == _message_time )
            cout << "MPIAccess::IRecv" << _my_rank << " time "
                 << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
                 << endl ;
        }
      sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
                                  *_intra_communicator , aRecvRequest) ;
    }
  return sts ;
}
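/*
  Polling sketch (a hedged illustration, assuming an initialized MPIAccess
  "mpi_access" and a peer rank "source") : overlap computation with an
  asynchronous receive, completing it with test().

    int RequestId ;
    int flag = 0 ;
    double buffer[100] ;
    mpi_access.IRecv( buffer, 100, MPI_DOUBLE, source, RequestId ) ;
    while ( !flag )
      {
        // ... do useful work ...
        mpi_access.test( RequestId, flag ) ;
      }
*/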
// Perform a Send and a Recv in synchronous mode
int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                        int dest, int &SendRequestId,
                        void* recvbuf, int recvcount, MPI_Datatype recvtype,
                        int source, int &RecvRequestId, int *OutCount)
{
  int sts = MPI_SUCCESS ;
  int outcount = 0 ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( _trace )
    cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
         << RecvRequestId << endl ;
  if ( sts == MPI_SUCCESS )
    {
      sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
      if ( _trace )
        cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
             << SendRequestId << endl ;
      if ( sts == MPI_SUCCESS && recvcount )
        {
          sts = wait( RecvRequestId ) ;
          outcount = MPIOutCount( RecvRequestId ) ;
          if ( _trace )
            cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
                 << RecvRequestId << " outcount " << outcount << endl ;
        }
      if ( OutCount != NULL )
        {
          *OutCount = outcount ;
          if ( _trace )
            cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
                 << endl ;
        }
      deleteRequest( RecvRequestId ) ;
    }
  return sts ;
}
// Perform a Send and a Recv in asynchronous mode
int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                         int dest, int &SendRequestId,
                         void* recvbuf, int recvcount, MPI_Datatype recvtype,
                         int source, int &RecvRequestId)
{
  int sts = MPI_SUCCESS ;
  sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
  if ( sts == MPI_SUCCESS )
    sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
  return sts ;
}
// Perform a wait of a Send or Recv asynchronous Request
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::wait( int RequestId )
{
  int status = MPI_SUCCESS ;
  if ( !MPICompleted( RequestId ) )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
                 << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      setMPICompleted( RequestId , true ) ;
      if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
        {
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          int outcount = 0 ;
          status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
                                            &outcount ) ;
          if ( status == MPI_SUCCESS )
            {
              setMPIOutCount( RequestId , outcount ) ;
              deleteStatus( RequestId ) ;
              if ( _trace )
                cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
                     << " MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
                     << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
        }
      else if ( _trace )
        cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
             << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
    }
  if ( _trace )
    cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
         << " Request " << MPIRequest( RequestId )
         << " Status " << MPIStatus( RequestId ) << " MPICompleted "
         << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
         << endl ;
  return status ;
}
// Perform a "test" of a Send or Recv asynchronous Request
// If the request is done, returns true in the flag argument
// If the request is not finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::test(int RequestId, int &flag)
{
  int status = MPI_SUCCESS ;
  flag = MPICompleted( RequestId ) ;
  if ( _trace )
    {
      cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
      if ( MPIIsRecv( RequestId ) )
        cout << " Recv from " << MPITarget( RequestId ) ;
      else
        cout << " Send to " << MPITarget( RequestId ) ;
      cout << " Request" << RequestId << " " << MPIRequest( RequestId )
           << " Status " << MPIStatus( RequestId ) << endl ;
    }
  if ( !flag )
    {
      if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
        {
          if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
                 << " ) MPIRequest " << MPIRequest( RequestId )
                 << " MPIStatus " << MPIStatus( RequestId )
                 << " MPITag " << MPITag( RequestId )
                 << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
          status = _comm_interface.test(MPIRequest( RequestId ), &flag,
                                        MPIStatus( RequestId )) ;
        }
      else if ( _trace )
        cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
             << endl ;
      if ( flag )
        {
          setMPICompleted( RequestId , true ) ;
          if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
            {
              MPI_Datatype datatype = MPIDatatype( RequestId ) ;
              int outcount = 0 ;
              status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
                                                 &outcount ) ;
              if ( status == MPI_SUCCESS )
                {
                  setMPIOutCount( RequestId , outcount ) ;
                  deleteStatus( RequestId ) ;
                  if ( _trace )
                    cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                         << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
                }
              else if ( _trace )
                cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                     << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
            }
          else if ( _trace )
            cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                 << MPIIsRecv( RequestId ) << " MPIOutCount "
                 << MPIOutCount( RequestId ) << endl ;
        }
    }
  if ( _trace )
    cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
         << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
         << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
  return status ;
}
int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
{
  int status = MPI_ERR_OTHER ;
  RequestId = -1 ;
  cout << "MPIAccess::WaitAny not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
{
  int status = MPI_ERR_OTHER ;
  RequestId = -1 ;
  flag = 0 ;
  cout << "MPIAccess::TestAny not yet implemented" << endl ;
  return status ;
}
// Perform a wait of each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to SendRequestIds or RecvRequestIds
// (sized via SendRequestIdsSize or RecvRequestIdsSize)
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::waitAll(int count, int *array_of_RequestIds)
{
  if ( _trace )
    cout << "WaitAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  for ( int i = 0 ; i < count ; i++ )
    {
      if ( _trace )
        cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
             << array_of_RequestIds[i] << " )" << endl ;
      status = wait( array_of_RequestIds[i] ) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  if ( _trace )
    cout << "EndWaitAll" << _my_rank << endl ;
  return retstatus ;
}
// Perform a "test" of each Send or Recv asynchronous Request of the array
// array_of_RequestIds of size "count".
// That array may be filled with a call to SendRequestIds or RecvRequestIds
// (sized via SendRequestIdsSize or RecvRequestIdsSize)
// If all requests are done, returns true in the flag argument
// If any request is not finished, returns false in the flag argument
// Does nothing for a synchronous Request
// Manages the MPI_Request * and MPI_Status * structures
int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
{
  if ( _trace )
    cout << "TestAll" << _my_rank << " : count " << count << endl ;
  int status ;
  int retstatus = MPI_SUCCESS ;
  bool retflag = true ;
  for ( int i = 0 ; i < count ; i++ )
    {
      status = test( array_of_RequestIds[i] , flag ) ;
      retflag = retflag && (flag != 0) ;
      if ( status != MPI_SUCCESS )
        retstatus = status ;
    }
  flag = retflag ;
  if ( _trace )
    cout << "EndTestAll" << _my_rank << endl ;
  return retstatus ;
}
int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::WaitSome not yet implemented" << endl ;
  return status ;
}

int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
                        int *outarray_of_RequestIds)
{
  int status = MPI_ERR_OTHER ;
  cout << "MPIAccess::TestSome not yet implemented" << endl ;
  return status ;
}
// Probe checks if a message is available for read from FromSource rank.
// Returns the corresponding source, MPITag, datatype and outcount.
// Probe is a blocking call which waits until a message is available.
int MPIAccess::probe(int FromSource, int &source, int &MPITag,
                     MPI_Datatype &myDatatype, int &outcount)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.probe( FromSource, MPI_ANY_TAG,
                                   *_intra_communicator , &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDatatype = datatype( (MEDCoupling::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDatatype << " outcount " << outcount
             << endl ;
    }
  return sts ;
}
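/*
  Probe-then-receive sketch (a hedged illustration, assuming an initialized
  MPIAccess "mpi_access" and that the incoming data are MPI_DOUBLE) : size
  an unknown incoming message before posting the matching recv.

    int source, MPITag, outcount, RequestId ;
    MPI_Datatype datatype ;
    mpi_access.probe( MPI_ANY_SOURCE, source, MPITag, datatype, outcount ) ;
    double *buffer = new double[ outcount ] ;
    mpi_access.recv( buffer, outcount, datatype, source, RequestId, NULL ) ;
    delete [] buffer ;
*/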
// IProbe checks if a message is available for read from FromSource rank.
// If there is a message available, returns the corresponding source,
// MPITag, datatype and outcount with flag = true
// If not, returns flag = false
int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
                      MPI_Datatype &myDataType, int &outcount, int &flag)
{
  MPI_Status aMPIStatus ;
  int sts = _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
                                    *_intra_communicator , &flag,
                                    &aMPIStatus ) ;
  if ( sts == MPI_SUCCESS && flag )
    {
      source = aMPIStatus.MPI_SOURCE ;
      MPITag = aMPIStatus.MPI_TAG ;
      int MethodId = (MPITag % MODULO_TAG) ;
      myDataType = datatype( (MEDCoupling::_MessageIdent) MethodId ) ;
      _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
      if ( _trace )
        cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
             << " source " << source << " MPITag " << MPITag << " MethodId "
             << MethodId << " datatype " << myDataType << " outcount " << outcount
             << " flag " << flag << endl ;
    }
  return sts ;
}
// Cancel concerns a "posted" asynchronous IRecv
// Returns flag = true if the receiving request was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses cancel, wait and test_cancelled of the MPI API
int MPIAccess::cancel( int RecvRequestId, int &flag )
{
  flag = 0 ;
  int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
  if ( sts == MPI_SUCCESS )
    {
      sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
                                  MPIStatus( RecvRequestId ) ) ;
      if ( sts == MPI_SUCCESS )
        sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
    }
  return sts ;
}
// Cancel concerns a "pending" receiving message (without IRecv "posted")
// Returns flag = true if the message was successfully canceled
// Returns flag = false if the receiving request was finished but not canceled
// Uses Irecv, cancel, wait and test_cancelled of the MPI API
int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
{
  int sts ;
  MPI_Aint extent, lbound ;
  flag = 0 ;
  sts = MPI_Type_get_extent( datatype , &lbound, &extent ) ;
  if ( sts == MPI_SUCCESS )
    {
      void * recvbuf = malloc( extent*outcount ) ;
      MPI_Request aRecvRequest ;
      if ( _trace )
        cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
             << " datatype " << datatype << " source " << source << " theMPITag "
             << theMPITag << endl ;
      sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
                                   *_intra_communicator , &aRecvRequest ) ;
      if ( sts == MPI_SUCCESS )
        {
          sts = _comm_interface.cancel( &aRecvRequest ) ;
          if ( _trace )
            cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
                 << " cancel done" << endl ;
          if ( sts == MPI_SUCCESS )
            {
              MPI_Status aStatus ;
              if ( _trace )
                cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
              sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
              if ( sts == MPI_SUCCESS )
                {
                  if ( _trace )
                    cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
                  sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
                }
            }
        }
      if ( _trace && datatype == timeType() )
        cout << "MPIAccess::Cancel" << _my_rank << " time "
             << ((TimeMessage *) recvbuf)->time << " "
             << ((TimeMessage *) recvbuf)->deltatime << endl ;
      free( recvbuf ) ;
    }
  if ( _trace )
    cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
  return sts ;
}
// CancelAll concerns all "pending" receiving messages (without IRecv "posted")
// CancelAll uses IProbe and cancel (see above)
int MPIAccess::cancelAll()
{
  int sts = MPI_SUCCESS ;
  int source ;
  int MPITag ;
  int outcount ;
  int flag ;
  MPI_Datatype datatype ;
  for ( int target = 0 ; target < _processor_group_size ; target++ )
    {
      sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
      if ( sts == MPI_SUCCESS && flag )
        {
          sts = cancel(source, MPITag, datatype, outcount, flag) ;
          if ( _trace )
            cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
                 << " MPITag " << MPITag << " datatype " << datatype
                 << " outcount " << outcount << " Cancel flag " << flag << endl ;
          if ( sts != MPI_SUCCESS )
            break ;
        }
      else if ( sts != MPI_SUCCESS )
        break ;
    }
  return sts ;
}
// Same as MPI_Barrier of the MPI API
int MPIAccess::barrier()
{
  int status = _comm_interface.barrier( *_intra_communicator ) ;
  return status ;
}
// Same as MPI_Error_string of the MPI API
int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
{
  return _comm_interface.errorString( errorcode, string, resultlen) ;
}
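/*
  Usage sketch (a hedged illustration) : turning a status code "sts"
  returned by the methods above into readable text.

    char msg[ MPI_MAX_ERROR_STRING ] ;
    int len = 0 ;
    mpi_access.errorString( sts, msg, &len ) ;
    cout << "MPI error : " << string( msg, len ) << endl ;
*/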
// Returns source, tag, error and outcount corresponding to receiving RequestId
// By default the corresponding structure of RequestId is deleted
int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
                      int &outcount, bool keepRequestStruct)
{
  MPI_Status *myStatus = MPIStatus( RequestId ) ;
  if ( _trace )
    cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
         << " status " << myStatus << endl ;
  if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
       MPICompleted( RequestId ) )
    {
      if ( MPIIsRecv( RequestId ) )
        {
          source = myStatus->MPI_SOURCE ;
          tag = myStatus->MPI_TAG ;
          error = myStatus->MPI_ERROR ;
          MPI_Datatype datatype = MPIDatatype( RequestId ) ;
          _comm_interface.getCount(myStatus, datatype, &outcount ) ;
          if ( _trace )
            cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
                 << " status " << myStatus << " outcount " << outcount << endl ;
          setMPIOutCount( RequestId , outcount ) ;
        }
      else
        {
          source = MPITarget( RequestId ) ;
          tag = MPITag( RequestId ) ;
          error = 0 ;
          outcount = MPIOutCount( RequestId ) ;
        }
      if ( !keepRequestStruct )
        deleteRequest( RequestId ) ;
      return MPI_SUCCESS ;
    }
  else
    {
      source = MPITarget( RequestId ) ;
      tag = MPITag( RequestId ) ;
      error = 0 ;
      outcount = MPIOutCount( RequestId ) ;
    }
  return MPI_SUCCESS ;
}
int MPIAccess::requestFree( MPI_Request *request )
{
  return _comm_interface.requestFree( request ) ;
}
// Prints all information about all known requests, for debugging purposes
void MPIAccess::check() const
{
  int i = 0 ;
  map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
  cout << "MPIAccess::Check" << _my_rank << " _map_of_request_struct_size "
       << _map_of_request_struct.size() << endl ;
  for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
        MapOfRequestStructiterator != _map_of_request_struct.end() ;
        MapOfRequestStructiterator++ )
    {
      if ( MapOfRequestStructiterator->second != NULL )
        {
          cout << " Check" << _my_rank << " " << i << ". Request"
               << MapOfRequestStructiterator->first << "-->" ;
          if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
            cout << "I" ;
          if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
            cout << "Recv from " ;
          else
            cout << "Send to " ;
          cout << (MapOfRequestStructiterator->second)->MPITarget
               << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
               << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
               << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
               << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
               << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
               << endl ;
        }
      i++ ;
    }
}
// Returns the MPI extent of a TimeMessage
MPI_Aint MPIAccess::timeExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( _MPI_TIME , &lbound, &aextent ) ;
  return aextent ;
}
// Returns the MPI extent of an MPI_INT
MPI_Aint MPIAccess::intExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( MPI_INT , &lbound, &aextent ) ;
  return aextent ;
}
// Returns the MPI extent of an MPI_DOUBLE
MPI_Aint MPIAccess::doubleExtent() const
{
  MPI_Aint aextent, lbound ;
  MPI_Type_get_extent( MPI_DOUBLE , &lbound, &aextent ) ;
  return aextent ;
}
// Outputs the fields of a TimeMessage structure
ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
{
  f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
    << " tag " << aTimeMsg.tag ;
  return f ;
}
// Outputs the DataType coded in a Tag
ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
{
  switch (methodtype)
    {
    case _message_time :
      f << " MethodTime ";
      break ;
    case _message_int :
      f << " MPI_INT ";
      break ;
    case _message_double :
      f << " MPI_DOUBLE ";
      break ;
    default :
      f << " UnknownMethodType ";
      break ;
    }
  return f ;
}

}