Salome HOME
9c33dc35201586405c2bfc8599ab78bfe268bba3
[tools/medcoupling.git] / src / ParaMEDMEM / MPIAccess.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D
2 //
3 //  This library is free software; you can redistribute it and/or
4 //  modify it under the terms of the GNU Lesser General Public
5 //  License as published by the Free Software Foundation; either
6 //  version 2.1 of the License.
7 //
8 //  This library is distributed in the hope that it will be useful,
9 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 //  Lesser General Public License for more details.
12 //
13 //  You should have received a copy of the GNU Lesser General Public
14 //  License along with this library; if not, write to the Free Software
15 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 #include "MPIAccess.hxx"
20 #include "InterpolationUtils.hxx"
21
22 #include <iostream>
23
24 using namespace std;
25
26 namespace ParaMEDMEM
27 {
28   /*! \defgroup mpi_access MPIAccess
29     Class \a MPIAccess is the gateway to the MPI library.
30     It is a helper class that gathers the calls to the MPI
31     library that are made in the ParaMEDMEM library. This gathering
32     allows easier gathering of information about the communication
33     in the library. With MPIAccess, tags are managed automatically
34     and asynchronous operations are easier.
35
36     It is typically called after the MPI_Init() call in a program. It is afterwards passed as a parameter to the constructors of ParaMEDMEM objects so that they access the MPI library via the MPIAccess.
37
38     As an example, the following code initializes a processor group made of the zero processor.
39
40     \verbatim
41     #include "MPIAccess.hxx"
42     #include "ProcessorGroup.hxx"
43
44     int main(int argc, char** argv)
45     {
46     //initialization
47     MPI_Init(&argc, &argv);
48     ParaMEDMEM::CommInterface comm_interface;
49
50     //setting up a processor group with proc 0
51     set<int> procs;
52     procs.insert(0);
53     ParaMEDMEM::ProcessorGroup group(procs, comm_interface);
54
55     ParaMEDMEM::MPIAccess mpi_access(group);
56
57     //cleanup
58     MPI_Finalize();
59     }
60     \endverbatim
61   */
62
63
64   /*! Creates a MPIAccess that is based on the processors included in \a ProcessorGroup.
65     This class may be called for easier use of MPI API.
66
67     \param ProcessorGroup MPIProcessorGroup object giving access to group management
68     \param BaseTag and MaxTag define the range of tags to be used.
69     Tags are managed by MPIAccess. They are cyclically incremented.
70     When there is a Send or a Receive operation there is a new RequestId tag returned
71     to the caller. That RequestId may be used to manage the operation Wait, Check of
72     status etc... The MPITag internally managed by MPIAccess is used as "tag" argument
73     in MPI call.
74   */
75
76   MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
77     _comm_interface( ProcessorGroup->getCommInterface() ) ,
78     _intra_communicator( ProcessorGroup->getComm() )
79   {
80     int mpitagub ;
81     int flag ;
82     //MPI_Attr_get does not run with _IntraCommunicator ???
83     //MPI_Attr_get(*_IntraCommunicator,MPI_TAG_UB,&mpitagub,&flag) ;
84     MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB,&mpitagub,&flag) ;
85     mpitagub=abs(mpitagub);
86     if ( BaseTag != 0 )
87       BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
88     if ( MaxTag == 0 )
89       MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
90     MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
91     cout << "MPIAccess::MPIAccess" << _my_rank << " this " << this << " BaseTag " << BaseTag
92          << " MaxTag " << MaxTag << " mpitagub " << mpitagub << " (minimum 32767) "
93          << " flag " << flag << endl ;
94     if ( !flag | (BaseTag < 0) | (BaseTag >= MaxTag) | (MaxTag > mpitagub) )
95       throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
96
97     _processor_group = ProcessorGroup ;
98     _processor_group_size = _processor_group->size() ;
99     _trace = false ;
100     
101     cout << "MPIAccess::MPIAccess" << _my_rank << " _processor_group_size "
102          << _processor_group_size << endl ;
103     
104     _base_request = -1 ;
105     _max_request = std::numeric_limits<int>::max() ;
106     _request = _base_request ;
107     
108     _base_MPI_tag = BaseTag ;
109     _max_MPI_tag = MaxTag ;
110     
111     _send_request = new int[ _processor_group_size ] ;
112     _recv_request = new int[ _processor_group_size ] ;
113
114     _send_requests.resize( _processor_group_size ) ;
115     _recv_requests.resize( _processor_group_size ) ;
116
117     _send_MPI_tag = new int[ _processor_group_size ] ;
118     _recv_MPI_Tag = new int[ _processor_group_size ] ;
119     int i ;
120     for (i = 0 ; i < _processor_group_size ; i++ )
121       {
122         _send_request[ i ] = _max_request ;
123         _recv_request[ i ] = _max_request ;
124         _send_requests[ i ].resize(0) ;
125         _recv_requests[ i ].resize(0) ;
126         _send_MPI_tag[ i ] = _max_MPI_tag ;
127         _recv_MPI_Tag[ i ] = _max_MPI_tag ;
128       }
129     MPI_Datatype array_of_types[3] ;
130     array_of_types[0] = MPI_DOUBLE ;
131     array_of_types[1] = MPI_DOUBLE ;
132     array_of_types[2] = MPI_INT ;
133     int array_of_blocklengths[3] ;
134     array_of_blocklengths[0] = 1 ;
135     array_of_blocklengths[1] = 1 ;
136     array_of_blocklengths[2] = 1 ;
137     MPI_Aint array_of_displacements[3] ;
138     array_of_displacements[0] = 0 ;
139     array_of_displacements[1] = sizeof(double) ;
140     array_of_displacements[2] = 2*sizeof(double) ;
141     MPI_Type_struct(3, array_of_blocklengths, array_of_displacements,
142                     array_of_types, &_MPI_TIME) ;
143     MPI_Type_commit(&_MPI_TIME) ;
144   }
145
146   MPIAccess::~MPIAccess()
147   {
148     cout << "MPIAccess::~MPIAccess" << _my_rank << " this " << this << endl ;
149     delete [] _send_request ;
150     delete [] _recv_request ;
151     delete [] _send_MPI_tag ;
152     delete [] _recv_MPI_Tag ;
153     MPI_Type_free(&_MPI_TIME) ;
154     cout << "End of MPIAccess::~MPIAccess" << _my_rank << " this " << this << endl ;
155   }
156
157   /*
158     MPIAccess and "RequestIds" :
159     ============================
160
161     . WARNING : In the specification document, the distinction
162     between "MPITags" and "RequestIds" is not clear. "MPITags"
163     are arguments of calls to MPI. "RequestIds" does not concern
164     calls to MPI. "RequestIds" are named "tag"as arguments in/out
165     in the MPIAccess API in the specification documentation.
166     But in the implementation we have the right name RequestId (or
167     RecvRequestId/SendRequestId).
168
169     . When we have a MPI write/read request via MPIAccess, we get
170     an identifier "RequestId".
171     That identifier matches a  structure RequestStruct of
172     MPIAccess. The access to that structure is done with the map
173     "_MapOfRequestStruct".
174     That structure RequestStruct give the possibility to manage
175     the structures MPI_Request and MPI_Status * of MPI. It give
176     also the possibility to get informations about that request :
177     target, send/recv, tag, [a]synchronous, type, outcount.
178
179     . That identifier is used to control an asynchronous request
180     via MPIAccess : Wait, Test, Probe, etc...
181
182     . In practise "RequestId" is simply an integer fo the interval
183     [0 , 2**32-1]. There is only one such a cyclic for
184     [I]Sends and [I]Recvs.
185
186     . That "RequestIds" and their associated structures give an easy
187     way to manage asynchronous communications.
188     For example we have mpi_access->Wait( int RequestId ) instead of
189     MPI_Wait(MPI_Request *request, MPI_Status *status).
190
191     . The API of MPIAccess may give the "SendRequestIds" of a "target",
192     the "RecvRequestIds" from a "source" or the "SendRequestIds" of
193     all "targets" or the "RecvRequestIds" of all "sources".
194     That avoid to manage them in Presentation-ParaMEDMEM.
195   */
196
197   int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
198                              bool fromsourcerank , bool asynchronous )
199   {
200     RequestStruct *mpiaccessstruct = new RequestStruct;
201     mpiaccessstruct->MPITag = tag ;
202     mpiaccessstruct->MPIDatatype = datatype ;
203     mpiaccessstruct->MPITarget = destsourcerank ;
204     mpiaccessstruct->MPIIsRecv = fromsourcerank ;
205     MPI_Status *aStatus = new MPI_Status ;
206     mpiaccessstruct->MPIStatus = aStatus ;
207     mpiaccessstruct->MPIAsynchronous = asynchronous ;
208     mpiaccessstruct->MPICompleted = !asynchronous ;
209     mpiaccessstruct->MPIOutCount = -1 ;
210     if ( !asynchronous )
211       {
212         mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
213         mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
214         mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
215         mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
216       }
217     if ( _request == _max_request )
218       _request = _base_request ;
219     _request += 1 ;
220     _map_of_request_struct[_request] = mpiaccessstruct ;
221     if ( fromsourcerank )
222       _recv_request[ destsourcerank ] = _request;
223     else
224       _send_request[ destsourcerank ] = _request;
225     if ( _trace )
226       cout << "NewRequest" << _my_rank << "( " << _request << " ) "
227            << mpiaccessstruct << endl ;
228     return _request ;
229   }
230
231   /*
232     MPIAccess and "tags" (or "MPITags") :
233     =====================================
234
235     . The constructor give the possibility to choose an interval of
236     tags to use : [BaseTag , MaxTag].
237     The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
238     value in an implementation of MPI (minimum 32767 = 2**15-1).
239     On awa with the implementation lam MPI_TAG_UB value is
240     7353944. The norma MPI specify that value is the same in all
241     processes started by mpirun.
242     In the case of the use of the same IntraCommunicator in a process
243     for several distinct data flows (or for several IntraCommunicators
244     with common processes), that permits to avoid ambibuity
245     and may help debug.
246
247     . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
248     + The last decimal digit decimal correspond to MPI_DataType ( 1 for
249     TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE)
250     + The value of other digits correspond to a circular numero for each
251     message.
252     + A TimeMessage and the associated DataMessage have the same numero
253     (but the types are different and the tags also).
254
255     . For a Send of a message from a process "source" to a process
256     "target", we have _send_MPI_tag[target] in the process
257     source (it contains the last "tag" used for the Send of a pour l'envoi de
258     message to the process target).
259     And in the process "target" which receive that message, we have
260     _recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
261     of messages from the process source).
262     Naturally in the MPI norma the values of that tags must be the same.
263   */
264   int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
265                              bool asynchronous, int &RequestId )
266   {
267     int tag ;
268     tag = incrTag( _send_MPI_tag[destrank] ) ;
269     tag = valTag( tag, method ) ;
270     _send_MPI_tag[ destrank ] = tag ;
271     RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
272     _send_request[ destrank ] = RequestId ;
273     _send_requests[ destrank ].push_back( RequestId ) ;
274     return tag ;
275   }
276
277   int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
278                              bool asynchronous, int &RequestId )
279   {
280     int tag ;
281     tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
282     tag = valTag( tag, method ) ;
283     _recv_MPI_Tag[ sourcerank ] = tag ;
284     RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
285     _recv_request[ sourcerank ] = RequestId ;
286     _recv_requests[ sourcerank ].push_back( RequestId ) ;
287     return tag ;
288   }
289
290   // Returns the number of all SendRequestIds that may be used to allocate
291   // ArrayOfSendRequests for the call to SendRequestIds
292   int MPIAccess::sendRequestIdsSize()
293   {
294     int size = 0;
295     for (int i = 0 ; i < _processor_group_size ; i++ )
296       size += _send_requests[ i ].size() ;
297     return size ;
298   }
299
300   // Returns in ArrayOfSendRequests with the dimension "size" all the
301   // SendRequestIds
302   int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
303   {
304     int destrank ;
305     int i = 0 ;
306     for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
307       {
308         list< int >::const_iterator iter ;
309         for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
310           ArrayOfSendRequests[i++] = *iter ;
311       }
312     return i ;
313   }
314
315   // Returns the number of all RecvRequestIds that may be used to allocate
316   // ArrayOfRecvRequests for the call to RecvRequestIds
317   int MPIAccess::recvRequestIdsSize()
318   {
319     int size = 0 ;
320     for (int i = 0 ; i < _processor_group_size ; i++ )
321       size += _recv_requests[ i ].size() ;
322     return size ;
323   }
324
325   // Returns in ArrayOfRecvRequests with the dimension "size" all the
326   // RecvRequestIds
327   int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
328   {
329     int sourcerank ;
330     int i = 0 ;
331     for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
332       {
333         list< int >::const_iterator iter ;
334         for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
335           ArrayOfRecvRequests[i++] = *iter ;
336       }
337     return i ;
338   }
339
340   // Returns in ArrayOfSendRequests with the dimension "size" all the
341   // SendRequestIds to a destination rank
342   int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
343   {
344     if (size < _send_requests[destrank].size() )
345       throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
346     int i = 0 ;
347     list< int >::const_iterator iter ;
348     for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
349       ArrayOfSendRequests[i++] = *iter ;
350     return _send_requests[destrank].size() ;
351   }
352
353   // Returns in ArrayOfRecvRequests with the dimension "size" all the
354   // RecvRequestIds from a sourcerank
355   int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
356   {
357     if (size < _recv_requests[sourcerank].size() )
358       throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
359     int i = 0 ;
360     list< int >::const_iterator iter ;
361     _recv_requests[ sourcerank ] ;
362     for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
363       ArrayOfRecvRequests[i++] = *iter ;
364     return _recv_requests[sourcerank].size() ;
365   }
366
367   // Send in synchronous mode count values of type datatype from buffer to target
368   // (returns RequestId identifier even if the corresponding structure is deleted :
369   // it is only in order to have the same signature as the asynchronous mode)
370   int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
371   {
372     int sts = MPI_SUCCESS ;
373     RequestId = -1 ;
374     if ( count )
375       {
376         _MessageIdent aMethodIdent = methodId( datatype ) ;
377         int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
378         if ( aMethodIdent == _message_time )
379           {
380             TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
381             aTimeMsg->tag = MPItag ;
382           }
383         deleteRequest( RequestId ) ;
384         sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
385                                   *_intra_communicator ) ;
386         if ( _trace )
387           cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
388                << RequestId << " count " << count << " target " << target
389                << " MPItag " << MPItag << endl ;
390       }
391     return sts ;
392   }
393
394   // Receive (read) in synchronous mode count values of type datatype in buffer from source
395   // (returns RequestId identifier even if the corresponding structure is deleted :
396   // it is only in order to have the same signature as the asynchronous mode)
397   // The output argument OutCount is optionnal : *OutCount <= count
398   int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
399   {
400     int sts = MPI_SUCCESS ;
401     RequestId = -1 ;
402     if ( OutCount != NULL )
403       *OutCount = -1 ;
404     if ( count )
405       {
406         _MessageIdent aMethodIdent = methodId( datatype ) ;
407         int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
408         sts =  _comm_interface.recv(buffer, count, datatype, source, MPItag,
409                                    *_intra_communicator , MPIStatus( RequestId ) ) ;
410         int outcount = 0 ;
411         if ( sts == MPI_SUCCESS )
412           {
413             MPI_Datatype datatype = MPIDatatype( RequestId ) ;
414             _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
415             setMPIOutCount( RequestId , outcount ) ;
416             setMPICompleted( RequestId , true ) ;
417             deleteStatus( RequestId ) ;
418           }
419         if ( OutCount != NULL )
420           *OutCount = outcount ;
421         if ( _trace )
422           cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
423                << RequestId << " count " << count << " source " << source
424                << " MPItag " << MPItag << endl ;
425         deleteRequest( RequestId ) ;
426       }
427     return sts ;
428   }
429
430   // Send in asynchronous mode count values of type datatype from buffer to target
431   // Returns RequestId identifier.
432   int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
433   {
434     int sts = MPI_SUCCESS ;
435     RequestId = -1 ;
436     if ( count )
437       {
438         _MessageIdent aMethodIdent = methodId( datatype ) ;
439         int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
440         if ( aMethodIdent == _message_time )
441           {
442             TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
443             aTimeMsg->tag = MPItag ;
444           }
445         MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
446         if ( _trace )
447           {
448             cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
449                  << RequestId << " count " << count << " target " << target
450                  << " MPItag " << MPItag << endl ;
451             if ( MPItag == 1 )
452               cout << "MPIAccess::ISend" << _my_rank << " time "
453                    << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
454                    << endl ;
455           }
456         sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
457                                    *_intra_communicator , aSendRequest) ;
458       }
459     return sts ;
460   }
461
462   // Receive (read) in asynchronous mode count values of type datatype in buffer from source
463   // returns RequestId identifier.
464   int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
465   {
466     int sts = MPI_SUCCESS ;
467     RequestId = -1 ;
468     if ( count )
469       {
470         _MessageIdent aMethodIdent = methodId( datatype ) ;
471         int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
472         MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
473         if ( _trace )
474           {
475             cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
476                  << RequestId << " count " << count << " source " << source
477                  << " MPItag " << MPItag << endl ;
478             if ( MPItag == 1 )
479               cout << "MPIAccess::ISend" << _my_rank << " time "
480                    << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
481                    << endl ;
482           }
483         sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
484                                    *_intra_communicator , aRecvRequest) ;
485       }
486     return sts ;
487   }
488
489   // Perform a Send and a Recv in synchronous mode
490   int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
491                           int dest, int &SendRequestId,
492                           void* recvbuf, int recvcount, MPI_Datatype recvtype,
493                           int source, int &RecvRequestId, int *OutCount)
494   {
495     int sts = MPI_SUCCESS ;
496     SendRequestId = -1 ;
497     RecvRequestId = -1 ;
498     if ( recvcount )
499       sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
500     int outcount = -1 ;
501     if ( _trace )
502       cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
503            << RecvRequestId << endl ;
504     if ( sts == MPI_SUCCESS )
505       {
506         if ( sendcount )
507           sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
508         if ( _trace )
509           cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
510                << SendRequestId << endl ;
511         if ( sts == MPI_SUCCESS && recvcount )
512           {
513             sts = wait( RecvRequestId ) ;
514             outcount = MPIOutCount( RecvRequestId ) ;
515             if ( _trace )
516               cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
517                    << RecvRequestId << " outcount " << outcount << endl ;
518           }
519       }
520     if ( OutCount != NULL )
521       {
522         *OutCount = outcount ;
523         if ( _trace )
524           cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
525                << endl ;
526       }
527     deleteRequest( RecvRequestId ) ;
528     return sts ;
529   }
530
531   // Perform a Send and a Recv in asynchronous mode
532   int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
533                            int dest, int &SendRequestId,
534                            void* recvbuf, int recvcount, MPI_Datatype recvtype,
535                            int source, int &RecvRequestId)
536   {
537     int sts = MPI_SUCCESS ;
538     SendRequestId = -1 ;
539     RecvRequestId = -1 ;
540     if ( recvcount )
541       sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
542     if ( sts == MPI_SUCCESS )
543       if ( sendcount )
544         sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
545     return sts ;
546   }
547
548   // Perform a wait of a Send or Recv asynchronous Request
549   // Do nothing for a synchronous Request
550   // Manage MPI_Request * and MPI_Status * structure
551   int MPIAccess::wait( int RequestId )
552   {
553     int status = MPI_SUCCESS ;
554     if ( !MPICompleted( RequestId ) )
555       {
556         if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
557           {
558             if ( _trace )
559               cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
560                    << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
561                    << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
562                    << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
563             status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
564           }
565         else
566           {
567             if ( _trace )
568               cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
569                    << endl ;
570           }
571         setMPICompleted( RequestId , true ) ;
572         if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
573           {
574             MPI_Datatype datatype = MPIDatatype( RequestId ) ;
575             int outcount ;
576             status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
577                                              &outcount ) ;
578             if ( status == MPI_SUCCESS )
579               {
580                 setMPIOutCount( RequestId , outcount ) ;
581                 deleteStatus( RequestId ) ;
582                 if ( _trace )
583                   cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
584                        << "MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
585                        << endl ;
586               }
587             else
588               {
589                 if ( _trace )
590                   cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
591                        << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
592               }
593           }
594         else
595           {
596             if ( _trace )
597               cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
598                    << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
599           }
600       }
601     if ( _trace )
602       cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
603            << " Request " << MPIRequest( RequestId )
604            << " Status " << MPIStatus( RequestId ) << " MPICompleted "
605            << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
606            << endl ;
607     return status ;
608   }
609
610   // Perform a "test" of a Send or Recv asynchronous Request
611   // If the request is done, returns true in the flag argument
612   // If the request is not finished, returns false in the flag argument
613   // Do nothing for a synchronous Request
614   // Manage MPI_request * and MPI_status * structure
615   int MPIAccess::test(int RequestId, int &flag)
616   {
617     int status = MPI_SUCCESS ;
618     flag = MPICompleted( RequestId ) ;
619     if ( _trace )
620       cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
621     if ( MPIIsRecv( RequestId ) )
622       {
623         if ( _trace )
624           cout << " Recv" ;
625       }
626     else
627       {
628         if ( _trace )
629           cout << " Send" ;
630       }
631     if( _trace )
632       cout << "Request" << RequestId << " " << MPIRequest( RequestId )
633            << " Status " << MPIStatus( RequestId ) << endl ;
634     if ( !flag )
635       {
636         if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
637           {
638             if ( _trace )
639               cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
640                    << " ) MPIRequest " << MPIRequest( RequestId )
641                    << " MPIStatus " << MPIStatus( RequestId )
642                    << " MPITag " << MPITag( RequestId )
643                    << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
644             status = _comm_interface.test(MPIRequest( RequestId ), &flag,
645                                          MPIStatus( RequestId )) ;
646           }
647         else
648           {
649             if ( _trace )
650               cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
651                    << endl ;
652           }
653         if ( flag )
654           {
655             setMPICompleted( RequestId , true ) ;
656             if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
657               {
658                 int outcount ;
659                 MPI_Datatype datatype = MPIDatatype( RequestId ) ;
660                 status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
661                                                   &outcount ) ;
662                 if ( status == MPI_SUCCESS )
663                   {
664                     setMPIOutCount( RequestId , outcount ) ;
665                     deleteStatus( RequestId ) ;
666                     if ( _trace )
667                       cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
668                            << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
669                   }
670                 else
671                   {
672                     if ( _trace )
673                       cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
674                            << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
675                   }
676               }
677             else
678               {
679                 if ( _trace )
680                   cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
681                        << MPIIsRecv( RequestId ) << " MPIOutCount "
682                        << MPIOutCount( RequestId ) << endl ;
683               }
684           }
685       }
686     if ( _trace )
687       cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
688            << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
689            << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
690     return status ;
691   }
692
693   int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
694   {
695     int status = MPI_ERR_OTHER ;
696     RequestId = -1 ;
697     cout << "MPIAccess::WaitAny not yet implemented" << endl ;
698     return status ;
699   }
700
701   int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
702   {
703     int status = MPI_ERR_OTHER ;
704     RequestId = -1 ;
705     flag = 0 ;
706     cout << "MPIAccess::TestAny not yet implemented" << endl ;
707     return status ;
708   }
709   
710   // Perform a wait of each Send or Recv asynchronous Request of the array 
711   // array_of_RequestIds of size "count".
712   // That array may be filled with a call to SendRequestIdsSize or RecvRequestIdsSize
713   // Do nothing for a synchronous Request
714   // Manage MPI_Request * and MPI_Status * structure
715   int MPIAccess::waitAll(int count, int *array_of_RequestIds)
716   {
717     if ( _trace )
718       cout << "WaitAll" << _my_rank << " : count " << count << endl ;
719     int status ;
720     int retstatus = MPI_SUCCESS ;
721     int i ;
722     for ( i = 0 ; i < count ; i++ )
723       {
724         if ( _trace )
725           cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
726                << array_of_RequestIds[i] << " )" << endl ;
727         status = wait( array_of_RequestIds[i] ) ;
728         if ( status != MPI_SUCCESS )
729           retstatus = status ;
730       }
731     if ( _trace )
732       cout << "EndWaitAll" << _my_rank << endl ;
733     return retstatus ;
734   }
735
736   // Perform a "test" of each Send or Recv asynchronous Request of the array 
737   // array_of_RequestIds of size "count".
738   // That array may be filled with a call to SendRequestIdsSize or RecvRequestIdsSize
739   // If all requests are done, returns true in the flag argument
740   // If all requests are not finished, returns false in the flag argument
741   // Do nothing for a synchronous Request
742   // Manage MPI_Request * and MPI_Status * structure
743   int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
744   {
745     if ( _trace )
746       cout << "TestAll" << _my_rank << " : count " << count << endl ;
747     int status ;
748     int retstatus = MPI_SUCCESS ;
749     bool retflag = true ;
750     int i ;
751     for ( i = 0 ; i < count ; i++ )
752       {
753         status = test( array_of_RequestIds[i] , flag ) ;
754         retflag = retflag && (flag != 0) ;
755         if ( status != MPI_SUCCESS )
756           retstatus = status ;
757       }
758     flag = retflag ;
759     if ( _trace )
760       cout << "EndTestAll" << _my_rank << endl ;
761     return retstatus ;
762   }
763
764   int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
765                           int *outarray_of_RequestIds)
766   {
767     int status = MPI_ERR_OTHER ;
768     cout << "MPIAccess::WaitSome not yet implemented" << endl ;
769     return status ;
770   }
771
772   int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
773                           int *outarray_of_RequestIds)
774   {
775     int status = MPI_ERR_OTHER ;
776     cout << "MPIAccess::TestSome not yet implemented" << endl ;
777     return status ;
778   }
779   
780   // Probe checks if a message is available for read from FromSource rank.
781   // Returns the corresponding source, MPITag, datatype and outcount
782   // Probe is a blocking call which wait until a message is available
783   int MPIAccess::probe(int FromSource, int &source, int &MPITag,
784                        MPI_Datatype &myDatatype, int &outcount)
785   {
786     MPI_Status aMPIStatus ;
787     int sts =  _comm_interface.probe( FromSource, MPI_ANY_TAG,
788                                      *_intra_communicator , &aMPIStatus ) ;
789     if ( sts == MPI_SUCCESS )
790       {
791         source = aMPIStatus.MPI_SOURCE ;
792         MPITag = aMPIStatus.MPI_TAG ;
793         int MethodId = (MPITag % MODULO_TAG) ;
794         myDatatype = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
795         _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
796         if ( _trace )
797           cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
798                << " source " << source << " MPITag " << MPITag << " MethodId "
799                << MethodId << " datatype " << myDatatype << " outcount " << outcount
800                << endl ;
801       }
802     else
803       {
804         source = -1 ;
805         MPITag = -1 ;
806         myDatatype = 0 ;
807         outcount = -1 ;
808       }
809     return sts ;
810   }
811
812   // IProbe checks if a message is available for read from FromSource rank.
813   // If there is a message available, returns the corresponding source,
814   // MPITag, datatype and outcount with flag = true
815   // If not, returns flag = false
816   int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
817                         MPI_Datatype &myDataType, int &outcount, int &flag)
818   {
819     MPI_Status aMPIStatus ;
820     int sts =  _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
821                                       *_intra_communicator , &flag,
822                                       &aMPIStatus ) ;
823     if ( sts == MPI_SUCCESS && flag )
824       {
825         source = aMPIStatus.MPI_SOURCE ;
826         MPITag = aMPIStatus.MPI_TAG ;
827         int MethodId = (MPITag % MODULO_TAG) ;
828         myDataType = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
829         _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
830         if ( _trace )
831           cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
832                << " source " << source << " MPITag " << MPITag << " MethodId "
833                << MethodId << " datatype " << myDataType << " outcount " << outcount
834                << " flag " << flag << endl ;
835       }
836     else
837       {
838         source = -1 ;
839         MPITag = -1 ;
840         myDataType = 0 ;
841         outcount = -1 ;
842       }
843     return sts ;
844   }
845
846   // Cancel concerns a "posted" asynchronous IRecv
847   // Returns flag = true if the receiving request was successfully canceled
848   // Returns flag = false if the receiving request was finished but not canceled
849   // Use cancel, wait and test_cancelled of the MPI API
850   int MPIAccess::cancel( int RecvRequestId, int &flag )
851   {
852     flag = 0 ;
853     int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
854     if ( sts == MPI_SUCCESS )
855       {
856         sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
857                                    MPIStatus( RecvRequestId ) ) ;
858         if ( sts == MPI_SUCCESS )
859           sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
860       }
861     return sts ;
862   }
863
864   // Cancel concerns a "pending" receiving message (without IRecv "posted")
865   // Returns flag = true if the message was successfully canceled
866   // Returns flag = false if the receiving request was finished but not canceled
867   // Use Irecv, cancel, wait and test_cancelled of the MPI API
868   int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
869   {
870     int sts ;
871     MPI_Aint extent ;
872     flag = 0 ;
873     sts =  MPI_Type_extent( datatype , &extent ) ;
874     if ( sts == MPI_SUCCESS )
875       {
876         void * recvbuf = malloc( extent*outcount ) ;
877         MPI_Request aRecvRequest ;
878         if ( _trace )
879           cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
880                << " datatype " << datatype << " source " << source << " theMPITag "
881                << theMPITag << endl ;
882         sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
883                                     *_intra_communicator , &aRecvRequest ) ;
884         if ( sts == MPI_SUCCESS )
885           {
886             sts = _comm_interface.cancel( &aRecvRequest ) ;
887             if ( _trace )
888               cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
889                    << " cancel done" << endl ;
890             if ( sts == MPI_SUCCESS )
891               {
892                 MPI_Status aStatus ;
893                 if ( _trace )
894                   cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
895                 sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
896                 if ( sts == MPI_SUCCESS )
897                   {
898                     if ( _trace )
899                       cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
900                     sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
901                   }
902               }
903           }
904         if ( _trace && datatype == timeType() )
905           cout << "MPIAccess::Cancel" << _my_rank << " time "
906                << ((TimeMessage *) recvbuf)->time << " "
907                << ((TimeMessage *) recvbuf)->deltatime << endl ;
908         free( recvbuf ) ;
909       }
910     if ( _trace )
911       cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
912     return sts ;
913   }
914
915
916   // CancelAll concerns all "pending" receiving message (without IRecv "posted")
917   // CancelAll use IProbe and Cancel (see obove)
918   int MPIAccess::cancelAll()
919   {
920     int sts = MPI_SUCCESS ;
921     int target ;
922     int source ;
923     int MPITag ;
924     MPI_Datatype datatype ;
925     int outcount ;
926     int flag ;
927     for ( target = 0 ; target < _processor_group_size ; target++ )
928       {
929         sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
930         if ( sts == MPI_SUCCESS && flag )
931           {
932             sts = cancel(source, MPITag, datatype, outcount, flag) ;
933             if ( _trace )
934               cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
935                    << " MPITag " << MPITag << " datatype " << datatype
936                    << " outcount " << outcount << " Cancel flag " << flag << endl ;
937             if ( sts != MPI_SUCCESS )
938               break ;
939           }
940         else if ( sts != MPI_SUCCESS )
941           break ;
942       }
943     return sts ;
944   }
945
946   // Same as barrier of MPI API
947   int MPIAccess::barrier()
948   {
949     int status = _comm_interface.barrier( *_intra_communicator ) ;
950     return status ;
951   }
952
953   // Same as Error_string of MPI API
954   int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
955   {
956     return _comm_interface.errorString( errorcode, string, resultlen) ;
957   }
958   
959   // Returns source, tag, error and outcount corresponding to receiving RequestId
960   // By default the corresponding structure of RequestId is deleted
961   int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
962                         int &outcount, bool keepRequestStruct)
963   {
964     MPI_Status *myStatus = MPIStatus( RequestId ) ;
965     if ( _trace )
966       cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
967            << " status " << myStatus << endl ;
968     if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
969          MPICompleted( RequestId ) )
970       {
971         if ( MPIIsRecv( RequestId ) )
972           {
973             source = myStatus->MPI_SOURCE ;
974             tag = myStatus->MPI_TAG ;
975             error = myStatus->MPI_ERROR ;
976             MPI_Datatype datatype = MPIDatatype( RequestId ) ;
977             _comm_interface.getCount(myStatus, datatype, &outcount ) ;
978             if ( _trace )
979               cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
980                    << " status " << myStatus << " outcount " << outcount << endl ;
981             setMPIOutCount( RequestId , outcount ) ;
982           }
983         else
984           {
985             source = MPITarget( RequestId ) ;
986             tag = MPITag( RequestId ) ;
987             error = 0 ;
988             outcount = MPIOutCount( RequestId ) ;
989           }
990         if ( !keepRequestStruct )
991           deleteRequest( RequestId ) ;
992         return MPI_SUCCESS ;
993       }
994     else
995       {
996         source = MPITarget( RequestId ) ;
997         tag = MPITag( RequestId ) ;
998         error = 0 ;
999         outcount = MPIOutCount( RequestId ) ;
1000       }
1001     return MPI_SUCCESS ;
1002   }
1003   
1004   int MPIAccess::requestFree( MPI_Request *request )
1005   {
1006     return _comm_interface.requestFree( request ) ;
1007   }
1008   
1009   // Print all informations of all known requests for debugging purpose
1010   void MPIAccess::check() const
1011   {
1012     int i = 0 ;
1013     map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
1014     cout << "MPIAccess::Check" << _my_rank << "_map_of_request_struct_size "
1015          << _map_of_request_struct.size() << endl ;
1016     for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
1017           MapOfRequestStructiterator != _map_of_request_struct.end() ;
1018           MapOfRequestStructiterator++ )
1019       {
1020         if ( MapOfRequestStructiterator->second != NULL )
1021           {
1022             cout << "    Check" << _my_rank << " " << i << ". Request"
1023                  << MapOfRequestStructiterator->first << "-->" ;
1024             if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
1025               cout << "I" ;
1026             if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
1027               cout << "Recv from " ;
1028             else
1029               cout << "Send to " ;
1030             cout << (MapOfRequestStructiterator->second)->MPITarget
1031                  << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
1032                  << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
1033                  << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
1034                  << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
1035                  << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
1036                  << endl ;
1037           }
1038         i++ ;
1039       }
1040   }
1041
1042   // Outputs fields of a TimeMessage structure
1043   ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
1044   {
1045     f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
1046       << " tag " << aTimeMsg.tag ;
1047     return f;
1048   }
1049
1050   // Outputs the DataType coded in a Tag
1051   ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
1052   {
1053     switch (methodtype)
1054       {
1055       case _message_time :
1056         f << " MethodTime ";
1057         break;
1058       case _message_int :
1059         f << " MPI_INT ";
1060         break;
1061       case _message_double :
1062         f << " MPI_DOUBLE ";
1063         break;
1064       default :
1065         f << " UnknownMethodType ";
1066         break;
1067       }
1068     return f;
1069   }
1070 }