Salome HOME
6871f1f3514232d7b22633d943386b8b238d5e7e
[modules/med.git] / src / ParaMEDMEM / OverlapMapping.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 // Author : Anthony Geay (CEA/DEN)
20
21 #include "OverlapMapping.hxx"
22 #include "MPIProcessorGroup.hxx"
23
24 #include "MEDCouplingFieldDouble.hxx"
25 #include "MEDCouplingAutoRefCountObjectPtr.hxx"
26
27 #include "InterpKernelAutoPtr.hxx"
28
29 #include <numeric>
30 #include <algorithm>
31
32 using namespace ParaMEDMEM;
33
34 OverlapMapping::OverlapMapping(const ProcessorGroup& group):_group(group)
35 {
36 }
37
38 /*!
39  * Keeps the link between a given a proc holding source mesh data, and the corresponding cell IDs.
40  */
41 void OverlapMapping::keepTracksOfSourceIds(int procId, DataArrayInt *ids)
42 {
43   ids->incrRef();
44   _sent_src_ids_st2.push_back(ids);
45   _sent_src_proc_st2.push_back(procId);
46 }
47
48 /*!
49  * Same as keepTracksOfSourceIds() but for target mesh data.
50 */
51 void OverlapMapping::keepTracksOfTargetIds(int procId, DataArrayInt *ids)
52 {
53   ids->incrRef();
54   _sent_trg_ids_st2.push_back(ids);
55   _sent_trg_proc_st2.push_back(procId);
56 }
57
58 /*!
59  * This method stores in the local members the contribution coming from a matrix in format
60  * Target(rows)/Source(cols) for a source procId 'srcProcId' and for a target procId 'trgProcId'.
61  * All IDs received here (source and target) are in the format of local IDs.
62  *
63  * @param srcIds is null if the source mesh is on the local proc
64  * @param trgIds is null if the source mesh is on the local proc
65  *
66  * One of the 2 is necessarily null (the two can be null together)
67  */
68 void OverlapMapping::addContributionST(const std::vector< SparseDoubleVec >& matrixST, const DataArrayInt *srcIds, int srcProcId, const DataArrayInt *trgIds, int trgProcId)
69 {
70   _matrixes_st.push_back(matrixST);
71   _source_proc_id_st.push_back(srcProcId);
72   _target_proc_id_st.push_back(trgProcId);
73   if(srcIds)  // source mesh part is remote
74     {//item#1 of step2 algorithm in proc m. Only to know in advanced nb of recv ids [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
75       _nb_of_src_ids_proc_st2.push_back(srcIds->getNumberOfTuples());
76       _src_ids_proc_st2.push_back(srcProcId);
77     }
78   else        // source mesh part is local
79     {//item#0 of step2 algorithm in proc k
80       std::set<int> s;
81       // For all source IDs (=col indices) in the sparse matrix:
82       for(std::vector< SparseDoubleVec >::const_iterator it1=matrixST.begin();it1!=matrixST.end();it1++)
83         for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
84           s.insert((*it2).first);
85       _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
86       _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
87       _src_ids_zip_proc_st2.push_back(trgProcId);
88     }
89 }
90
91 /*!
92  * This method is in charge to send matrices in AlltoAll mode.
93  *
94  * 'procsToSendField' gives the list of procs field data has to be sent to.
95  * See OverlapElementLocator::computeBoundingBoxesAndTodoList()
96  *
97  * After the call of this method, 'this' contains the matrixST for all source cells of the current proc
98  */
99 void OverlapMapping::prepare(const std::vector< int >& procsToSendField, int nbOfTrgElems)
100 {
101 //  printMatrixesST();
102
103   CommInterface commInterface=_group.getCommInterface();
104   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
105   const MPI_Comm *comm=group->getComm();
106   int grpSize=_group.size();
107   INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
108   INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
109   INTERP_KERNEL::AutoPtr<int> nbsend3=new int[grpSize];
110   std::fill<int *>(nbsend,nbsend+grpSize,0);
111   int myProcId=_group.myRank();
112   for(std::size_t i=0;i<_matrixes_st.size();i++)
113     if(_source_proc_id_st[i]==myProcId)
114       nbsend[_target_proc_id_st[i]]=_matrixes_st[i].size();
115   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
116   commInterface.allToAll(nbsend,1,MPI_INT,nbrecv,1,MPI_INT,*comm);
117   //exchanging matrix
118   //first exchanging offsets+ids_source
119   INTERP_KERNEL::AutoPtr<int> nbrecv1=new int[grpSize];
120   INTERP_KERNEL::AutoPtr<int> nbrecv2=new int[grpSize];
121   //
122   int *tmp=0;
123   serializeMatrixStep0ST(nbrecv,
124                          tmp,nbsend2,nbsend3,
125                          nbrecv1,nbrecv2);
126   INTERP_KERNEL::AutoPtr<int> bigArr=tmp;
127   INTERP_KERNEL::AutoPtr<int> bigArrRecv=new int[nbrecv2[grpSize-1]+nbrecv1[grpSize-1]];
128   commInterface.allToAllV(bigArr,nbsend2,nbsend3,MPI_INT,
129                           bigArrRecv,nbrecv1,nbrecv2,MPI_INT,
130                           *comm);// sending ids of sparse matrix (n+1 elems)
131   //second phase echange target ids
132   std::fill<int *>(nbsend2,nbsend2+grpSize,0);
133   INTERP_KERNEL::AutoPtr<int> nbrecv3=new int[grpSize];
134   INTERP_KERNEL::AutoPtr<int> nbrecv4=new int[grpSize];
135   double *tmp2=0;
136   int lgthOfArr=serializeMatrixStep1ST(nbrecv,bigArrRecv,nbrecv1,nbrecv2,
137                                        tmp,tmp2,
138                                        nbsend2,nbsend3,nbrecv3,nbrecv4);
139   INTERP_KERNEL::AutoPtr<int> bigArr2=tmp;
140   INTERP_KERNEL::AutoPtr<double> bigArrD2=tmp2;
141   INTERP_KERNEL::AutoPtr<int> bigArrRecv2=new int[lgthOfArr];
142   INTERP_KERNEL::AutoPtr<double> bigArrDRecv2=new double[lgthOfArr];
143   commInterface.allToAllV(bigArr2,nbsend2,nbsend3,MPI_INT,
144                           bigArrRecv2,nbrecv3,nbrecv4,MPI_INT,
145                           *comm);
146   commInterface.allToAllV(bigArrD2,nbsend2,nbsend3,MPI_DOUBLE,
147                           bigArrDRecv2,nbrecv3,nbrecv4,MPI_DOUBLE,
148                           *comm);
149   //finishing
150   unserializationST(nbOfTrgElems,nbrecv,bigArrRecv,nbrecv1,nbrecv2,
151                     bigArrRecv2,bigArrDRecv2,nbrecv3,nbrecv4);
152   //updating _src_ids_zip_st2 and _src_ids_zip_st2 with received matrix.
153   updateZipSourceIdsForFuture();
154   //finish to fill _the_matrix_st with already in place matrix in _matrixes_st (local computation)
155   finishToFillFinalMatrixST();
156   // Prepare proc lists for future field data exchange (mutliply()):
157   fillProcToSendRcvForMultiply(procsToSendField);
158   // Make some space on local proc:
159   _matrixes_st.clear();
160
161 //  std::stringstream scout;
162 //  scout << "\n(" << myProcId << ") to send:";
163 //  for (std::vector< int >::const_iterator itdbg=_proc_ids_to_send_vector_st.begin(); itdbg!=_proc_ids_to_send_vector_st.end(); itdbg++)
164 //    scout << "," << *itdbg;
165 //  scout << "\n(" << myProcId << ") to recv:";
166 //    for (std::vector< int >::const_iterator itdbg=_proc_ids_to_recv_vector_st.begin(); itdbg!=_proc_ids_to_recv_vector_st.end(); itdbg++)
167 //      scout << "," << *itdbg;
168 //  std::cout << scout.str() << "\n";
169 //
170 //  printTheMatrix();
171 }
172
173 /**
174  * Fill the members _proc_ids_to_send_vector_st and _proc_ids_to_recv_vector_st
175  * indicating for each proc to which proc it should send its source field data
176  * and from which proc it should receive source field data.
177  *
178  * Should be called once finishToFillFinalMatrixST() has been executed, i.e. when the
179  * local matrices are completly ready.
180  */
181 void OverlapMapping::fillProcToSendRcvForMultiply(const std::vector< int >& procsToSendField)
182 {
183 //  _proc_ids_to_send_vector_st.clear();
184   _proc_ids_to_recv_vector_st.clear();
185   // Receiving side is easy - just inspect non-void terms in the final matrix
186   int i=0;
187   std::vector< std::vector< SparseDoubleVec > >::const_iterator it;
188   std::vector< SparseDoubleVec >::const_iterator it2;
189   for(it=_the_matrix_st.begin(); it!=_the_matrix_st.end(); it++, i++)
190     _proc_ids_to_recv_vector_st.push_back(_the_matrix_st_source_proc_id[i]);
191
192   // Sending side was computed in OverlapElementLocator::computeBoundingBoxesAndTodoList()
193   _proc_ids_to_send_vector_st = procsToSendField;
194 }
195
196 /*!
197  * Compute denominators.
198  */
199 void OverlapMapping::computeDenoGlobConstraint()
200 {
201   _the_deno_st.clear();
202   std::size_t sz1=_the_matrix_st.size();
203   _the_deno_st.resize(sz1);
204   for(std::size_t i=0;i<sz1;i++)
205     {
206       std::size_t sz2=_the_matrix_st[i].size();
207       _the_deno_st[i].resize(sz2);
208       for(std::size_t j=0;j<sz2;j++)
209         {
210           double sum=0;
211           SparseDoubleVec& mToFill=_the_deno_st[i][j];
212           const SparseDoubleVec& m=_the_matrix_st[i][j];
213           for(SparseDoubleVec::const_iterator it=m.begin();it!=m.end();it++)
214             sum+=(*it).second;
215           for(SparseDoubleVec::const_iterator it=m.begin();it!=m.end();it++)
216             mToFill[(*it).first]=sum;
217         }
218     }
219 }
220
221 /*!
222  * Compute denominators.
223  */
224 void OverlapMapping::computeDenoConservativeVolumic(int nbOfTuplesTrg)
225 {
226   int myProcId=_group.myRank();
227   //
228   _the_deno_st.clear();
229   std::size_t sz1=_the_matrix_st.size();
230   _the_deno_st.resize(sz1);
231   std::vector<double> deno(nbOfTuplesTrg);
232   for(std::size_t i=0;i<sz1;i++)
233     {
234       const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
235       int curSrcId=_the_matrix_st_source_proc_id[i];
236       std::vector<int>::iterator isItem1=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),curSrcId);
237       int rowId=0;
238       if(isItem1==_sent_trg_proc_st2.end() || curSrcId==myProcId)//item1 of step2 main algo. Simple, because rowId of mat are directly target ids.
239         {
240           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
241             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
242               deno[rowId]+=(*it2).second;
243         }
244       else
245         {//item0 of step2 main algo. More complicated.
246           std::vector<int>::iterator fnd=isItem1;//std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),curSrcId);
247           int locId=std::distance(_sent_trg_proc_st2.begin(),fnd);
248           const DataArrayInt *trgIds=_sent_trg_ids_st2[locId];
249           const int *trgIds2=trgIds->getConstPointer();
250           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
251             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
252               deno[trgIds2[rowId]]+=(*it2).second;
253         }
254     }
255   //
256   for(std::size_t i=0;i<sz1;i++)
257     {
258       int rowId=0;
259       const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
260       int curSrcId=_the_matrix_st_source_proc_id[i];
261       std::vector<int>::iterator isItem1=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),curSrcId);
262       std::vector< SparseDoubleVec >& denoM=_the_deno_st[i];
263       denoM.resize(mat.size());
264       if(isItem1==_sent_trg_proc_st2.end() || curSrcId==myProcId)//item1 of step2 main algo. Simple, because rowId of mat are directly target ids.
265         {
266           int rowId=0;
267           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
268             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
269               denoM[rowId][(*it2).first]=deno[rowId];
270         }
271       else
272         {
273           std::vector<int>::iterator fnd=isItem1;
274           int locId=std::distance(_sent_trg_proc_st2.begin(),fnd);
275           const DataArrayInt *trgIds=_sent_trg_ids_st2[locId];
276           const int *trgIds2=trgIds->getConstPointer();
277           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
278             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
279               denoM[rowId][(*it2).first]=deno[trgIds2[rowId]];
280         }
281     }
282 }
283
284 /*!
285  * This method performs step #0/3 in serialization process.
286  * \param count tells specifies nb of elems to send to corresponding proc id. size equal to _group.size().
287  * \param offsets tells for a proc i where to start serialize#0 matrix. size equal to _group.size().
288  * \param nbOfElemsSrc of size _group.size(). Comes from previous all2all call. tells how many srcIds per proc contains matrix for current proc.
289  */
290 void OverlapMapping::serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigArr, int *count, int *offsets,
291                                             int *countForRecv, int *offsetsForRecv) const
292 {
293   int grpSize=_group.size();
294   std::fill<int *>(count,count+grpSize,0);
295   int szz=0;
296   int myProcId=_group.myRank();
297   for(std::size_t i=0;i<_matrixes_st.size();i++)
298     {
299       if(_source_proc_id_st[i]==myProcId)// && _target_proc_id_st[i]!=myProcId
300         {
301           count[_target_proc_id_st[i]]=_matrixes_st[i].size()+1;
302           szz+=_matrixes_st[i].size()+1;
303         }
304     }
305   bigArr=new int[szz];
306   offsets[0]=0;
307   for(int i=1;i<grpSize;i++)
308     offsets[i]=offsets[i-1]+count[i-1];
309   for(std::size_t i=0;i<_matrixes_st.size();i++)
310     {
311       if(_source_proc_id_st[i]==myProcId)
312         {
313           int start=offsets[_target_proc_id_st[i]];
314           int *work=bigArr+start;
315           *work=0;
316           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
317           for(std::vector< SparseDoubleVec >::const_iterator it=mat.begin();it!=mat.end();it++,work++)
318             work[1]=work[0]+(*it).size();
319         }
320     }
321   //
322   offsetsForRecv[0]=0;
323   for(int i=0;i<grpSize;i++)
324     {
325       if(nbOfElemsSrc[i]>0)
326         countForRecv[i]=nbOfElemsSrc[i]+1;
327       else
328         countForRecv[i]=0;
329       if(i>0)
330         offsetsForRecv[i]=offsetsForRecv[i-1]+countForRecv[i-1];
331     }
332 }
333
334 /*!
335  * This method performs step#1 and step#2/3. It returns the size of expected array to get allToAllV.
336  * It is where the locally computed matrices are serialized to be sent to adequate final proc.
337  */
338 int OverlapMapping::serializeMatrixStep1ST(const int *nbOfElemsSrc, const int *recvStep0, const int *countStep0, const int *offsStep0,
339                                            int *&bigArrI, double *&bigArrD, int *count, int *offsets,
340                                            int *countForRecv, int *offsForRecv) const
341 {
342   int grpSize=_group.size();
343   int myProcId=_group.myRank();
344   offsForRecv[0]=0;
345   int szz=0;
346   for(int i=0;i<grpSize;i++)
347     {
348       if(nbOfElemsSrc[i]!=0)
349         countForRecv[i]=recvStep0[offsStep0[i]+nbOfElemsSrc[i]];
350       else
351         countForRecv[i]=0;
352       szz+=countForRecv[i];
353       if(i>0)
354         offsForRecv[i]=offsForRecv[i-1]+countForRecv[i-1];
355     }
356   //
357   std::fill(count,count+grpSize,0);
358   offsets[0]=0;
359   int fullLgth=0;
360   for(std::size_t i=0;i<_matrixes_st.size();i++)
361     {
362       if(_source_proc_id_st[i]==myProcId)
363         {
364           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
365           int lgthToSend=0;
366           for(std::vector< SparseDoubleVec >::const_iterator it=mat.begin();it!=mat.end();it++)
367             lgthToSend+=(*it).size();
368           count[_target_proc_id_st[i]]=lgthToSend;
369           fullLgth+=lgthToSend;
370         }
371     }
372   for(int i=1;i<grpSize;i++)
373     offsets[i]=offsets[i-1]+count[i-1];
374   //
375   bigArrI=new int[fullLgth];
376   bigArrD=new double[fullLgth];
377   // feeding arrays
378   fullLgth=0;
379   for(std::size_t i=0;i<_matrixes_st.size();i++)
380     {
381       if(_source_proc_id_st[i]==myProcId)
382         {
383           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
384           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
385             {
386               int j=0;
387               for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++,j++)
388                 {
389                   bigArrI[fullLgth+j]=(*it2).first;
390                   bigArrD[fullLgth+j]=(*it2).second;
391                 }
392               fullLgth+=(*it1).size();
393             }
394         }
395     }
396   return szz;
397 }
398
399 /*!
400  * This is the last step after all2Alls for matrix exchange.
401  * _the_matrix_st is the final matrix : 
402  *      - The first entry is srcId in current proc.
403  *      - The second is the pseudo id of source proc (correspondance with true id is in attribute _the_matrix_st_source_proc_id and _the_matrix_st_source_ids)
404  *      - the third is the srcId in the pseudo source proc
405  */
406 void OverlapMapping::unserializationST(int nbOfTrgElems,
407                                        const int *nbOfElemsSrcPerProc,//first all2all
408                                        const int *bigArrRecv, const int *bigArrRecvCounts, const int *bigArrRecvOffs,//2nd all2all
409                                        const int *bigArrRecv2, const double *bigArrDRecv2, const int *bigArrRecv2Count, const int *bigArrRecv2Offs)//3rd and 4th all2alls
410 {
411   _the_matrix_st.clear();
412   _the_matrix_st_source_proc_id.clear();
413   //
414   int grpSize=_group.size();
415   for(int i=0;i<grpSize;i++)
416     if(nbOfElemsSrcPerProc[i]!=0)
417       _the_matrix_st_source_proc_id.push_back(i);
418   int nbOfPseudoProcs=_the_matrix_st_source_proc_id.size();//_the_matrix_st_target_proc_id.size() contains number of matrix fetched remotely whose sourceProcId==myProcId
419   _the_matrix_st.resize(nbOfPseudoProcs);
420   //
421   int j=0;
422   for(int i=0;i<grpSize;i++)
423     if(nbOfElemsSrcPerProc[i]!=0)
424       {
425         _the_matrix_st[j].resize(nbOfElemsSrcPerProc[i]);
426         for(int k=0;k<nbOfElemsSrcPerProc[i];k++)
427           {
428             int offs=bigArrRecv[bigArrRecvOffs[i]+k];
429             int lgthOfMap=bigArrRecv[bigArrRecvOffs[i]+k+1]-offs;
430             for(int l=0;l<lgthOfMap;l++)
431               _the_matrix_st[j][k][bigArrRecv2[bigArrRecv2Offs[i]+offs+l]]=bigArrDRecv2[bigArrRecv2Offs[i]+offs+l];
432           }
433         j++;
434       }
435 }
436
437 /*!
438  * This method should be called when all remote matrix with sourceProcId==thisProcId have been retrieved and are
439  * in 'this->_the_matrix_st' and 'this->_the_matrix_st_target_proc_id' and 'this->_the_matrix_st_target_ids'.
440  * This method finish the job of filling 'this->_the_matrix_st' and 'this->_the_matrix_st_target_proc_id'
441  * by putting candidates in 'this->_matrixes_st' into them (i.e. local computation result).
442  */
443 void OverlapMapping::finishToFillFinalMatrixST()
444 {
445   int myProcId=_group.myRank();
446   int sz=_matrixes_st.size();
447   int nbOfEntryToAdd=0;
448   for(int i=0;i<sz;i++)
449     if(_source_proc_id_st[i]!=myProcId)
450       nbOfEntryToAdd++;
451   if(nbOfEntryToAdd==0)
452     return ;
453   int oldNbOfEntry=_the_matrix_st.size();
454   int newNbOfEntry=oldNbOfEntry+nbOfEntryToAdd;
455   _the_matrix_st.resize(newNbOfEntry);
456   int j=oldNbOfEntry;
457   for(int i=0;i<sz;i++)
458     if(_source_proc_id_st[i]!=myProcId)
459       {
460         const std::vector<SparseDoubleVec >& mat=_matrixes_st[i];
461         _the_matrix_st[j]=mat;
462         _the_matrix_st_source_proc_id.push_back(_source_proc_id_st[i]);
463         j++;
464       }
465 }
466
467 /*!
468  * This method performs a transpose multiply of 'fieldInput' and put the result into 'fieldOutput'.
469  * 'fieldInput' is expected to be the sourcefield and 'fieldOutput' the targetfield.
470  */
471 void OverlapMapping::multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput) const
472 {
473   int nbOfCompo=fieldInput->getNumberOfComponents();//to improve same number of components to test
474   CommInterface commInterface=_group.getCommInterface();
475   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
476   const MPI_Comm *comm=group->getComm();
477   int grpSize=_group.size();
478   int myProcId=_group.myRank();
479   //
480   INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
481   INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
482   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
483   INTERP_KERNEL::AutoPtr<int> nbrecv2=new int[grpSize];
484   std::fill<int *>(nbsend,nbsend+grpSize,0);
485   std::fill<int *>(nbrecv,nbrecv+grpSize,0);
486   nbsend2[0]=0;
487   nbrecv2[0]=0;
488   std::vector<double> valsToSend;
489
490   /*
491    * FIELD VALUE XCHGE
492    */
493   for(int i=0;i<grpSize;i++)
494     {
495       // Prepare field data to be sent/received through a MPI_AllToAllV
496       if(std::find(_proc_ids_to_send_vector_st.begin(),_proc_ids_to_send_vector_st.end(),i)!=_proc_ids_to_send_vector_st.end())
497         {
498           std::vector<int>::const_iterator isItem1=std::find(_sent_src_proc_st2.begin(),_sent_src_proc_st2.end(),i);
499           MEDCouplingAutoRefCountObjectPtr<DataArrayDouble> vals;
500           if(isItem1!=_sent_src_proc_st2.end()) // source data was sent to proc 'i'
501             {
502               // Prepare local field data to send to proc i
503               int id=std::distance(_sent_src_proc_st2.begin(),isItem1);
504               vals=fieldInput->getArray()->selectByTupleId(*_sent_src_ids_st2[id]);
505             }
506           else                                  // no source data was sent to proc 'i'
507             {//item0 of step2 main algo
508               std::vector<int>::const_iterator isItem22 = std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i );
509               int id=std::distance(_src_ids_zip_proc_st2.begin(),isItem22);
510               if (isItem22 == _src_ids_zip_proc_st2.end())
511                 std::cout << "(" << myProcId << ") has end iterator!!!!!\n";
512               vals=fieldInput->getArray()->selectByTupleId(&(_src_ids_zip_st2[id])[0],&(_src_ids_zip_st2[id])[0]+_src_ids_zip_st2[id].size());
513             }
514           nbsend[i]=vals->getNbOfElems();
515           valsToSend.insert(valsToSend.end(),vals->getConstPointer(),vals->getConstPointer()+nbsend[i]);
516         }
517       if(std::find(_proc_ids_to_recv_vector_st.begin(),_proc_ids_to_recv_vector_st.end(),i)!=_proc_ids_to_recv_vector_st.end())
518         {
519           std::vector<int>::const_iterator isItem0=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),i);
520           if(isItem0==_sent_trg_proc_st2.end())//item1 of step2 main algo [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
521             {
522               std::vector<int>::const_iterator it1=std::find(_src_ids_proc_st2.begin(),_src_ids_proc_st2.end(),i);
523               if(it1!=_src_ids_proc_st2.end())
524                 {
525                   int id=std::distance(_src_ids_proc_st2.begin(),it1);
526                   nbrecv[i]=_nb_of_src_ids_proc_st2[id]*nbOfCompo;
527                 }
528               else if(i==myProcId)   // diagonal element (i.e. proc #i talking to same proc #i)
529                 {
530                   nbrecv[i] = nbsend[i];
531                 }
532               else
533                 throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): Internal error at field values exchange!");
534             }
535           else
536             {//item0 of step2 main algo [ (2,1) computed on proc2 but Matrix-Vector on proc1 ] [(1,0) computed on proc1 but Matrix-Vector on proc0]
537               int id=std::distance(_src_ids_zip_proc_st2.begin(),std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i));
538               nbrecv[i]=_src_ids_zip_st2[id].size()*nbOfCompo;
539             }
540         }
541     }
542   for(int i=1;i<grpSize;i++)
543     {
544       nbsend2[i]=nbsend2[i-1]+nbsend[i-1];
545       nbrecv2[i]=nbrecv2[i-1]+nbrecv[i-1];
546     }
547   INTERP_KERNEL::AutoPtr<double> bigArr=new double[nbrecv2[grpSize-1]+nbrecv[grpSize-1]];
548
549 //  scout << "("  << myProcId << ") nbsend :" << nbsend[0] << "," << nbsend[1] << "," << nbsend[2] << "\n";
550 //  scout << "("  << myProcId << ") nbrecv :" << nbrecv[0] << "," << nbrecv[1] << "," << nbrecv[2] << "\n";
551 //  std::cout << scout.str() << "\n";
552   /*
553    * *********************** ALL-TO-ALL
554    */
555   commInterface.allToAllV(&valsToSend[0],nbsend,nbsend2,MPI_DOUBLE,
556                           bigArr,nbrecv,nbrecv2,MPI_DOUBLE,*comm);
557   /*
558    *
559    * TARGET FIELD COMPUTATION (matrix-vec computation)
560    */
561   fieldOutput->getArray()->fillWithZero();
562   INTERP_KERNEL::AutoPtr<double> tmp=new double[nbOfCompo];
563   for(int i=0;i<grpSize;i++)
564     {
565       if(nbrecv[i]>0)
566         {
567           double *pt=fieldOutput->getArray()->getPointer();
568           std::vector<int>::const_iterator it=std::find(_the_matrix_st_source_proc_id.begin(),_the_matrix_st_source_proc_id.end(),i);
569           if(it==_the_matrix_st_source_proc_id.end())
570             throw INTERP_KERNEL::Exception("Big problem !");
571           int id=std::distance(_the_matrix_st_source_proc_id.begin(),it);
572           const std::vector< SparseDoubleVec >& mat=_the_matrix_st[id];
573           const std::vector< SparseDoubleVec >& deno=_the_deno_st[id];
574           std::vector<int>::const_iterator isItem0=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),i);
575           if(isItem0==_sent_trg_proc_st2.end())//item1 of step2 main algo [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
576             {
577               int nbOfTrgTuples=mat.size();
578               for(int j=0;j<nbOfTrgTuples;j++,pt+=nbOfCompo)
579                 {
580                   const SparseDoubleVec& mat1=mat[j];
581                   const SparseDoubleVec& deno1=deno[j];
582                   SparseDoubleVec::const_iterator it4=deno1.begin();
583                   for(SparseDoubleVec::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it4++)
584                     {
585                       std::transform(bigArr+nbrecv2[i]+((*it3).first)*nbOfCompo,
586                                      bigArr+nbrecv2[i]+((*it3).first+1)*(nbOfCompo),
587                                      (double *)tmp,
588                                      std::bind2nd(std::multiplies<double>(),(*it3).second/(*it4).second));
589                       std::transform((double *)tmp,(double *)tmp+nbOfCompo,pt,pt,std::plus<double>());
590                     }
591                 }
592             }
593           else
594             {//item0 of step2 main algo [ (2,1) computed on proc2 but Matrix-Vector on proc1 ]
595               double *pt=fieldOutput->getArray()->getPointer();
596               std::map<int,int> zipCor;
597               int id=std::distance(_src_ids_zip_proc_st2.begin(),std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i));
598               const std::vector<int> zipIds=_src_ids_zip_st2[id];
599               int newId=0;
600               for(std::vector<int>::const_iterator it=zipIds.begin();it!=zipIds.end();it++,newId++)
601                 zipCor[*it]=newId;
602               int id2=std::distance(_sent_trg_proc_st2.begin(),std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),i));
603               const DataArrayInt *tgrIds=_sent_trg_ids_st2[id2];
604               const int *tgrIds2=tgrIds->getConstPointer();
605               int nbOfTrgTuples=mat.size();
606               for(int j=0;j<nbOfTrgTuples;j++)
607                 {
608                   const SparseDoubleVec& mat1=mat[j];
609                   const SparseDoubleVec& deno1=deno[j];
610                   SparseDoubleVec::const_iterator it5=deno1.begin();
611                   for(SparseDoubleVec::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it5++)
612                     {
613                       std::map<int,int>::const_iterator it4=zipCor.find((*it3).first);
614                       if(it4==zipCor.end())
615                         throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): Internal error in final value computation!");
616                       std::transform(bigArr+nbrecv2[i]+((*it4).second)*nbOfCompo,
617                                      bigArr+nbrecv2[i]+((*it4).second+1)*(nbOfCompo),
618                                      (double *)tmp,
619                                      std::bind2nd(std::multiplies<double>(),(*it3).second/(*it5).second));
620                       std::transform((double *)tmp,(double *)tmp+nbOfCompo,pt+tgrIds2[j]*nbOfCompo,pt+tgrIds2[j]*nbOfCompo,std::plus<double>());
621                     }
622                 }
623             }
624         }
625     }
626 }
627
628 /*!
629  * This method performs a transpose multiply of 'fieldInput' and put the result into 'fieldOutput'.
630  * 'fieldInput' is expected to be the targetfield and 'fieldOutput' the sourcefield.
631  */
632 void OverlapMapping::transposeMultiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput)
633 {
634 }
635
636 /*!
637  * This method should be called immediately after _the_matrix_st has been filled with remote computed matrix
638  * put in this proc for Matrix-Vector.
639  * This method computes for these matrix the minimal set of source ids corresponding to the source proc id.
640  *
641  */
642 void OverlapMapping::updateZipSourceIdsForFuture()
643 {
644   /* When it is called, only the bits received from other processors (i.e. the remotely executed jobs) are in the
645     big matrix _the_matrix_st. */
646
647   CommInterface commInterface=_group.getCommInterface();
648   int myProcId=_group.myRank();
649   int nbOfMatrixRecveived=_the_matrix_st_source_proc_id.size();
650   for(int i=0;i<nbOfMatrixRecveived;i++)
651     {
652       int curSrcProcId=_the_matrix_st_source_proc_id[i];
653       if(curSrcProcId!=myProcId)
654         {
655           const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
656           _src_ids_zip_proc_st2.push_back(curSrcProcId);
657           _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
658           std::set<int> s;
659           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
660             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
661               s.insert((*it2).first);
662           _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
663
664 //          std::stringstream scout;
665 //          scout << "(" << myProcId << ") pushed " << _src_ids_zip_st2.back().size() << " values for tgt proc " << curSrcProcId;
666 //          std::cout << scout.str() << "\n";
667         }
668     }
669 }
670
671 // void OverlapMapping::printTheMatrix() const
672 // {
673 //   CommInterface commInterface=_group.getCommInterface();
674 //   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
675 //   const MPI_Comm *comm=group->getComm();
676 //   int grpSize=_group.size();
677 //   int myProcId=_group.myRank();
678 //   std::stringstream oscerr;
679 //   int nbOfMat=_the_matrix_st.size();
680 //   oscerr << "(" <<  myProcId <<  ") I hold " << nbOfMat << " matrix(ces) : "<< std::endl;
681 //   for(int i=0;i<nbOfMat;i++)
682 //     {
683 //       oscerr << "   - Matrix #" << i << " coming from source proc #" << _the_matrix_st_source_proc_id[i] << ": ";
684 //       const std::vector< SparseDoubleVec >& locMat=_the_matrix_st[i];
685 //       int j = 0;
686 //       for(std::vector< SparseDoubleVec >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++, j++)
687 //         {
688 //           oscerr << " Target Cell #" << j;
689 //           for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
690 //             oscerr << " (" << (*it2).first << "," << (*it2).second << "), ";
691 //           oscerr << std::endl;
692 //         }
693 //     }
694 //   oscerr << "*********" << std::endl;
695 //
696 //   // Hope this will be flushed in one go:
697 //   std::cerr << oscerr.str() << std::endl;
698 ////   if(myProcId != 0)
699 ////     MPI_Barrier(MPI_COMM_WORLD);
700 // }
701 //
702 // void OverlapMapping::printMatrixesST() const
703 //  {
704 //    CommInterface commInterface=_group.getCommInterface();
705 //    const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
706 //    const MPI_Comm *comm=group->getComm();
707 //    int grpSize=_group.size();
708 //    int myProcId=_group.myRank();
709 //    std::stringstream oscerr;
710 //    int nbOfMat=_matrixes_st.size();
711 //    oscerr << "(" <<  myProcId <<  ") I hold " << nbOfMat << " LOCAL matrix(ces) : "<< std::endl;
712 //    for(int i=0;i<nbOfMat;i++)
713 //      {
714 //        oscerr << "   - Matrix #" << i << ": (source proc #" << _source_proc_id_st[i] << " / tgt proc#" << _target_proc_id_st[i] << "):";
715 //        const std::vector< SparseDoubleVec >& locMat=_matrixes_st[i];
716 //        int j = 0;
717 //        for(std::vector< SparseDoubleVec >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++, j++)
718 //          {
719 //            oscerr << " Target Cell #" << j;
720 //            for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
721 //              oscerr << " (" << (*it2).first << "," << (*it2).second << "), ";
722 //            oscerr << std::endl;
723 //          }
724 //      }
725 //    oscerr << "*********" << std::endl;
726 //
727 //    // Hope this will be flushed in one go:
728 //    std::cerr << oscerr.str() << std::endl;
729 //  }