]> SALOME platform Git repositories - modules/med.git/blob - src/ParaMEDMEM/OverlapMapping.cxx
Salome HOME
1ef168ff443190be0f8df4a0ceb0763f9b491629
[modules/med.git] / src / ParaMEDMEM / OverlapMapping.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 // Author : Anthony Geay (CEA/DEN)
20
21 #include "OverlapMapping.hxx"
22 #include "MPIProcessorGroup.hxx"
23
24 #include "MEDCouplingFieldDouble.hxx"
25 #include "MEDCouplingAutoRefCountObjectPtr.hxx"
26
27 #include "InterpKernelAutoPtr.hxx"
28
29 #include <numeric>
30 #include <algorithm>
31
32 using namespace ParaMEDMEM;
33
34 OverlapMapping::OverlapMapping(const ProcessorGroup& group, const OverlapElementLocator & loc):
35     _group(group),_locator(loc)
36 {
37 }
38
39 /*!
40  * Keeps the link between a given a proc holding source mesh data, and the corresponding cell IDs.
41  */
42 void OverlapMapping::keepTracksOfSourceIds(int procId, DataArrayInt *ids)
43 {
44   ids->incrRef();
45   _sent_src_ids_st2.push_back(ids);
46   _sent_src_proc_st2.push_back(procId);
47 }
48
49 /*!
50  * Same as keepTracksOfSourceIds() but for target mesh data.
51 */
52 void OverlapMapping::keepTracksOfTargetIds(int procId, DataArrayInt *ids)
53 {
54   ids->incrRef();
55   _sent_trg_ids_st2.push_back(ids);
56   _sent_trg_proc_st2.push_back(procId);
57 }
58
59 /*!
60  * This method stores in the local members the contribution coming from a matrix in format
61  * Target(rows)/Source(cols) for a source procId 'srcProcId' and for a target procId 'trgProcId'.
62  * All IDs received here (source and target) are in the format of local IDs.
63  *
64  * @param srcIds is null if the source mesh is on the local proc
65  * @param trgIds is null if the source mesh is on the local proc
66  *
67  * One of the 2 is necessarily null (the two can be null together)
68  */
69 void OverlapMapping::addContributionST(const std::vector< SparseDoubleVec >& matrixST, const DataArrayInt *srcIds, int srcProcId, const DataArrayInt *trgIds, int trgProcId)
70 {
71   _matrixes_st.push_back(matrixST);
72   _source_proc_id_st.push_back(srcProcId);
73   _target_proc_id_st.push_back(trgProcId);
74   if(srcIds)  // source mesh part is remote <=> srcProcId != myRank
75     {
76       _rcv_src_ids_proc_st2.push_back(srcProcId);
77       _nb_of_rcv_src_ids_proc_st2.push_back(srcIds->getNumberOfTuples());
78     }
79   else        // source mesh part is local
80     {
81       std::set<int> s;
82       // For all source IDs (=col indices) in the sparse matrix:
83       for(std::vector< SparseDoubleVec >::const_iterator it1=matrixST.begin();it1!=matrixST.end();it1++)
84         for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
85           s.insert((*it2).first);
86       _src_ids_zip_proc_st2.push_back(trgProcId);
87       _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
88       _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
89     }
90 }
91
92 /*!
93  * This method is in charge to send matrices in AlltoAll mode.
94  *
95  * 'procsToSendField' gives the list of procs field data has to be sent to.
96  * See OverlapElementLocator::computeBoundingBoxesAndTodoList()
97  *
98  * After the call of this method, 'this' contains the matrixST for all source cells of the current proc
99  */
100 void OverlapMapping::prepare(const std::vector< int >& procsToSendField, int nbOfTrgElems)
101 {
102 #ifdef DEC_DEBUG
103   printMatrixesST();
104 #endif
105
106   CommInterface commInterface=_group.getCommInterface();
107   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
108   const MPI_Comm *comm=group->getComm();
109   int grpSize=_group.size();
110   INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
111   INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
112   INTERP_KERNEL::AutoPtr<int> nbsend3=new int[grpSize];
113   std::fill<int *>(nbsend,nbsend+grpSize,0);
114   int myProcId=_group.myRank();
115   for(std::size_t i=0;i<_matrixes_st.size();i++)
116     if(_source_proc_id_st[i]==myProcId)
117       nbsend[_target_proc_id_st[i]]=_matrixes_st[i].size();
118   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
119   commInterface.allToAll(nbsend,1,MPI_INT,nbrecv,1,MPI_INT,*comm);
120   //exchanging matrix
121   //first exchanging offsets+ids_source
122   INTERP_KERNEL::AutoPtr<int> nbrecv1=new int[grpSize];
123   INTERP_KERNEL::AutoPtr<int> nbrecv2=new int[grpSize];
124   //
125   int *tmp=0;
126   serializeMatrixStep0ST(nbrecv,
127                          tmp,nbsend2,nbsend3,
128                          nbrecv1,nbrecv2);
129   INTERP_KERNEL::AutoPtr<int> bigArr=tmp;
130   INTERP_KERNEL::AutoPtr<int> bigArrRecv=new int[nbrecv2[grpSize-1]+nbrecv1[grpSize-1]];
131   commInterface.allToAllV(bigArr,nbsend2,nbsend3,MPI_INT,
132                           bigArrRecv,nbrecv1,nbrecv2,MPI_INT,
133                           *comm);// sending ids of sparse matrix (n+1 elems)
134   //second phase echange target ids
135   std::fill<int *>(nbsend2,nbsend2+grpSize,0);
136   INTERP_KERNEL::AutoPtr<int> nbrecv3=new int[grpSize];
137   INTERP_KERNEL::AutoPtr<int> nbrecv4=new int[grpSize];
138   double *tmp2=0;
139   int lgthOfArr=serializeMatrixStep1ST(nbrecv,bigArrRecv,nbrecv1,nbrecv2,
140                                        tmp,tmp2,
141                                        nbsend2,nbsend3,nbrecv3,nbrecv4);
142   INTERP_KERNEL::AutoPtr<int> bigArr2=tmp;
143   INTERP_KERNEL::AutoPtr<double> bigArrD2=tmp2;
144   INTERP_KERNEL::AutoPtr<int> bigArrRecv2=new int[lgthOfArr];
145   INTERP_KERNEL::AutoPtr<double> bigArrDRecv2=new double[lgthOfArr];
146   commInterface.allToAllV(bigArr2,nbsend2,nbsend3,MPI_INT,
147                           bigArrRecv2,nbrecv3,nbrecv4,MPI_INT,
148                           *comm);
149   commInterface.allToAllV(bigArrD2,nbsend2,nbsend3,MPI_DOUBLE,
150                           bigArrDRecv2,nbrecv3,nbrecv4,MPI_DOUBLE,
151                           *comm);
152   //finishing
153   unserializationST(nbOfTrgElems,nbrecv,bigArrRecv,nbrecv1,nbrecv2,
154                     bigArrRecv2,bigArrDRecv2,nbrecv3,nbrecv4);
155   //updating _src_ids_zip_st2 and _src_ids_zip_st2 with received matrix.
156   updateZipSourceIdsForMultiply();
157   //finish to fill _the_matrix_st with already in place matrix in _matrixes_st (local computation)
158   finishToFillFinalMatrixST();
159   // Prepare proc list for future field data exchange (mutliply()):
160   _proc_ids_to_send_vector_st = procsToSendField;
161   // Make some space on local proc:
162   _matrixes_st.clear();
163
164 #ifdef DEC_DEBUG
165   printTheMatrix();
166 #endif
167 }
168
169 ///*!
170 // * Compute denominators for IntegralGlobConstraint interp.
171 // * TO BE REVISED: needs another communication since some bits are held non locally
172 // */
173 //void OverlapMapping::computeDenoGlobConstraint()
174 //{
175 //  _the_deno_st.clear();
176 //  std::size_t sz1=_the_matrix_st.size();
177 //  _the_deno_st.resize(sz1);
178 //  for(std::size_t i=0;i<sz1;i++)
179 //    {
180 //      std::size_t sz2=_the_matrix_st[i].size();
181 //      _the_deno_st[i].resize(sz2);
182 //      for(std::size_t j=0;j<sz2;j++)
183 //        {
184 //          double sum=0;
185 //          SparseDoubleVec& mToFill=_the_deno_st[i][j];
186 //          const SparseDoubleVec& m=_the_matrix_st[i][j];
187 //          for(SparseDoubleVec::const_iterator it=m.begin();it!=m.end();it++)
188 //            sum+=(*it).second;
189 //          for(SparseDoubleVec::const_iterator it=m.begin();it!=m.end();it++)
190 //            mToFill[(*it).first]=sum;
191 //        }
192 //    }
193 //    printDenoMatrix();
194 //}
195
196 ///*! Compute integral denominators
197 // * TO BE REVISED: needs another communication since some source areas are held non locally
198 // */
199 //void OverlapMapping::computeDenoIntegral()
200 //{
201 //  _the_deno_st.clear();
202 //  std::size_t sz1=_the_matrix_st.size();
203 //  _the_deno_st.resize(sz1);
204 //  for(std::size_t i=0;i<sz1;i++)
205 //    {
206 //      std::size_t sz2=_the_matrix_st[i].size();
207 //      _the_deno_st[i].resize(sz2);
208 //      for(std::size_t j=0;j<sz2;j++)
209 //        {
210 //          SparseDoubleVec& mToFill=_the_deno_st[i][j];
211 //          for(SparseDoubleVec::const_iterator it=mToFill.begin();it!=mToFill.end();it++)
212 //            mToFill[(*it).first] = sourceAreas;
213 //        }
214 //    }
215 //    printDenoMatrix();
216 //}
217
218 /*! Compute rev integral denominators
219   */
220 void OverlapMapping::computeDenoRevIntegral(const DataArrayDouble & targetAreas)
221 {
222   _the_deno_st.clear();
223   std::size_t sz1=_the_matrix_st.size();
224   _the_deno_st.resize(sz1);
225   const double * targetAreasP = targetAreas.getConstPointer();
226   for(std::size_t i=0;i<sz1;i++)
227     {
228       std::size_t sz2=_the_matrix_st[i].size();
229       _the_deno_st[i].resize(sz2);
230       for(std::size_t j=0;j<sz2;j++)
231         {
232           SparseDoubleVec& mToFill=_the_deno_st[i][j];
233           SparseDoubleVec& mToIterate=_the_matrix_st[i][j];
234           for(SparseDoubleVec::const_iterator it=mToIterate.begin();it!=mToIterate.end();it++)
235             mToFill[(*it).first] = targetAreasP[j];
236         }
237     }
238 //    printDenoMatrix();
239 }
240
241
242 /*!
243  * Compute denominators for ConvervativeVolumic interp.
244  */
245 void OverlapMapping::computeDenoConservativeVolumic(int nbOfTuplesTrg)
246 {
247   int myProcId=_group.myRank();
248   //
249   _the_deno_st.clear();
250   std::size_t sz1=_the_matrix_st.size();
251   _the_deno_st.resize(sz1);
252   std::vector<double> deno(nbOfTuplesTrg);
253   // Fills in the vector indexed by target cell ID:
254   for(std::size_t i=0;i<sz1;i++)
255     {
256       const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
257       int curSrcId=_the_matrix_st_source_proc_id[i];
258       std::vector<int>::iterator isItem1=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),curSrcId);
259       int rowId=0;
260       if(isItem1==_sent_trg_proc_st2.end() || curSrcId==myProcId) // Local computation: simple, because rowId of mat are directly target cell ids.
261         {
262           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
263             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
264               deno[rowId]+=(*it2).second;
265         }
266       else  // matrix was received, remote computation
267         {
268           std::vector<int>::iterator fnd=isItem1;//std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),curSrcId);
269           int locId=std::distance(_sent_trg_proc_st2.begin(),fnd);
270           const DataArrayInt *trgIds=_sent_trg_ids_st2[locId];
271           const int *trgIds2=trgIds->getConstPointer();
272           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
273             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
274               deno[trgIds2[rowId]]+=(*it2).second;
275         }
276     }
277   // Broadcast the vector into a structure similar to the initial sparse matrix of numerators:
278   for(std::size_t i=0;i<sz1;i++)
279     {
280       int rowId=0;
281       const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
282       int curSrcId=_the_matrix_st_source_proc_id[i];
283       std::vector<int>::iterator isItem1=std::find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),curSrcId);
284       std::vector< SparseDoubleVec >& denoM=_the_deno_st[i];
285       denoM.resize(mat.size());
286       if(isItem1==_sent_trg_proc_st2.end() || curSrcId==myProcId)//item1 of step2 main algo. Simple, because rowId of mat are directly target ids.
287         {
288           int rowId=0;
289           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
290             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
291               denoM[rowId][(*it2).first]=deno[rowId];
292         }
293       else
294         {
295           std::vector<int>::iterator fnd=isItem1;
296           int locId=std::distance(_sent_trg_proc_st2.begin(),fnd);
297           const DataArrayInt *trgIds=_sent_trg_ids_st2[locId];
298           const int *trgIds2=trgIds->getConstPointer();
299           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
300             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
301               denoM[rowId][(*it2).first]=deno[trgIds2[rowId]];
302         }
303     }
304 //  printDenoMatrix();
305 }
306
307 /*!
308  * This method performs step #0/3 in serialization process.
309  * \param count tells specifies nb of elems to send to corresponding proc id. size equal to _group.size().
310  * \param offsets tells for a proc i where to start serialize#0 matrix. size equal to _group.size().
311  * \param nbOfElemsSrc of size _group.size(). Comes from previous all2all call. tells how many srcIds per proc contains matrix for current proc.
312  */
313 void OverlapMapping::serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigArr, int *count, int *offsets,
314                                             int *countForRecv, int *offsetsForRecv) const
315 {
316   int grpSize=_group.size();
317   std::fill<int *>(count,count+grpSize,0);
318   int szz=0;
319   int myProcId=_group.myRank();
320   for(std::size_t i=0;i<_matrixes_st.size();i++)
321     {
322       if(_source_proc_id_st[i]==myProcId)// && _target_proc_id_st[i]!=myProcId
323         {
324           count[_target_proc_id_st[i]]=_matrixes_st[i].size()+1;
325           szz+=_matrixes_st[i].size()+1;
326         }
327     }
328   bigArr=new int[szz];
329   offsets[0]=0;
330   for(int i=1;i<grpSize;i++)
331     offsets[i]=offsets[i-1]+count[i-1];
332   for(std::size_t i=0;i<_matrixes_st.size();i++)
333     {
334       if(_source_proc_id_st[i]==myProcId)
335         {
336           int start=offsets[_target_proc_id_st[i]];
337           int *work=bigArr+start;
338           *work=0;
339           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
340           for(std::vector< SparseDoubleVec >::const_iterator it=mat.begin();it!=mat.end();it++,work++)
341             work[1]=work[0]+(*it).size();
342         }
343     }
344   //
345   offsetsForRecv[0]=0;
346   for(int i=0;i<grpSize;i++)
347     {
348       if(nbOfElemsSrc[i]>0)
349         countForRecv[i]=nbOfElemsSrc[i]+1;
350       else
351         countForRecv[i]=0;
352       if(i>0)
353         offsetsForRecv[i]=offsetsForRecv[i-1]+countForRecv[i-1];
354     }
355 }
356
357 /*!
358  * This method performs step#1 and step#2/3. It returns the size of expected array to get allToAllV.
359  * It is where the locally computed matrices are serialized to be sent to adequate final proc.
360  */
361 int OverlapMapping::serializeMatrixStep1ST(const int *nbOfElemsSrc, const int *recvStep0, const int *countStep0, const int *offsStep0,
362                                            int *&bigArrI, double *&bigArrD, int *count, int *offsets,
363                                            int *countForRecv, int *offsForRecv) const
364 {
365   int grpSize=_group.size();
366   int myProcId=_group.myRank();
367   offsForRecv[0]=0;
368   int szz=0;
369   for(int i=0;i<grpSize;i++)
370     {
371       if(nbOfElemsSrc[i]!=0)
372         countForRecv[i]=recvStep0[offsStep0[i]+nbOfElemsSrc[i]];
373       else
374         countForRecv[i]=0;
375       szz+=countForRecv[i];
376       if(i>0)
377         offsForRecv[i]=offsForRecv[i-1]+countForRecv[i-1];
378     }
379   //
380   std::fill(count,count+grpSize,0);
381   offsets[0]=0;
382   int fullLgth=0;
383   for(std::size_t i=0;i<_matrixes_st.size();i++)
384     {
385       if(_source_proc_id_st[i]==myProcId)
386         {
387           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
388           int lgthToSend=0;
389           for(std::vector< SparseDoubleVec >::const_iterator it=mat.begin();it!=mat.end();it++)
390             lgthToSend+=(*it).size();
391           count[_target_proc_id_st[i]]=lgthToSend;
392           fullLgth+=lgthToSend;
393         }
394     }
395   for(int i=1;i<grpSize;i++)
396     offsets[i]=offsets[i-1]+count[i-1];
397   //
398   bigArrI=new int[fullLgth];
399   bigArrD=new double[fullLgth];
400   // feeding arrays
401   fullLgth=0;
402   for(std::size_t i=0;i<_matrixes_st.size();i++)
403     {
404       if(_source_proc_id_st[i]==myProcId)
405         {
406           const std::vector< SparseDoubleVec >& mat=_matrixes_st[i];
407           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
408             {
409               int j=0;
410               for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++,j++)
411                 {
412                   bigArrI[fullLgth+j]=(*it2).first;
413                   bigArrD[fullLgth+j]=(*it2).second;
414                 }
415               fullLgth+=(*it1).size();
416             }
417         }
418     }
419   return szz;
420 }
421
422 /*!
423  * This is the last step after all2Alls for matrix exchange.
424  * _the_matrix_st is the final matrix : 
425  *      - The first entry is srcId in current proc.
426  *      - The second is the pseudo id of source proc (correspondance with true id is in attribute _the_matrix_st_source_proc_id and _the_matrix_st_source_ids)
427  *      - the third is the srcId in the pseudo source proc
428  */
429 void OverlapMapping::unserializationST(int nbOfTrgElems,
430                                        const int *nbOfElemsSrcPerProc,//first all2all
431                                        const int *bigArrRecv, const int *bigArrRecvCounts, const int *bigArrRecvOffs,//2nd all2all
432                                        const int *bigArrRecv2, const double *bigArrDRecv2, const int *bigArrRecv2Count, const int *bigArrRecv2Offs)//3rd and 4th all2alls
433 {
434   _the_matrix_st.clear();
435   _the_matrix_st_source_proc_id.clear();
436   //
437   int grpSize=_group.size();
438   for(int i=0;i<grpSize;i++)
439     if(nbOfElemsSrcPerProc[i]!=0)
440       _the_matrix_st_source_proc_id.push_back(i);
441   int nbOfPseudoProcs=_the_matrix_st_source_proc_id.size();//_the_matrix_st_target_proc_id.size() contains number of matrix fetched remotely whose sourceProcId==myProcId
442   _the_matrix_st.resize(nbOfPseudoProcs);
443   //
444   int j=0;
445   for(int i=0;i<grpSize;i++)
446     if(nbOfElemsSrcPerProc[i]!=0)
447       {
448         _the_matrix_st[j].resize(nbOfElemsSrcPerProc[i]);
449         for(int k=0;k<nbOfElemsSrcPerProc[i];k++)
450           {
451             int offs=bigArrRecv[bigArrRecvOffs[i]+k];
452             int lgthOfMap=bigArrRecv[bigArrRecvOffs[i]+k+1]-offs;
453             for(int l=0;l<lgthOfMap;l++)
454               _the_matrix_st[j][k][bigArrRecv2[bigArrRecv2Offs[i]+offs+l]]=bigArrDRecv2[bigArrRecv2Offs[i]+offs+l];
455           }
456         j++;
457       }
458 }
459
460 /*!
461  * This method should be called when all remote matrix with sourceProcId==thisProcId have been retrieved and are
462  * in 'this->_the_matrix_st' and 'this->_the_matrix_st_target_proc_id' and 'this->_the_matrix_st_target_ids'.
463  * This method finish the job of filling 'this->_the_matrix_st' and 'this->_the_matrix_st_target_proc_id'
464  * by putting candidates in 'this->_matrixes_st' into them (i.e. local computation result).
465  */
466 void OverlapMapping::finishToFillFinalMatrixST()
467 {
468   int myProcId=_group.myRank();
469   int sz=_matrixes_st.size();
470   int nbOfEntryToAdd=0;
471   for(int i=0;i<sz;i++)
472     if(_source_proc_id_st[i]!=myProcId)
473       nbOfEntryToAdd++;
474   if(nbOfEntryToAdd==0)
475     return ;
476   int oldNbOfEntry=_the_matrix_st.size();
477   int newNbOfEntry=oldNbOfEntry+nbOfEntryToAdd;
478   _the_matrix_st.resize(newNbOfEntry);
479   int j=oldNbOfEntry;
480   for(int i=0;i<sz;i++)
481     if(_source_proc_id_st[i]!=myProcId)
482       {
483         const std::vector<SparseDoubleVec >& mat=_matrixes_st[i];
484         _the_matrix_st[j]=mat;
485         _the_matrix_st_source_proc_id.push_back(_source_proc_id_st[i]);
486         j++;
487       }
488 }
489
490
491 /*!
492  * This method performs a transpose multiply of 'fieldInput' and put the result into 'fieldOutput'.
493  * 'fieldInput' is expected to be the sourcefield and 'fieldOutput' the targetfield.
494  */
495 void OverlapMapping::multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput, double default_val) const
496 {
497   using namespace std;
498
499   int nbOfCompo=fieldInput->getNumberOfComponents();//to improve same number of components to test
500   CommInterface commInterface=_group.getCommInterface();
501   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
502   const MPI_Comm *comm=group->getComm();
503   int grpSize=_group.size();
504   int myProcID=_group.myRank();
505   //
506   INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
507   INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
508   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
509   INTERP_KERNEL::AutoPtr<int> nbrecv2=new int[grpSize];
510   fill<int *>(nbsend,nbsend+grpSize,0);
511   fill<int *>(nbrecv,nbrecv+grpSize,0);
512   nbsend2[0]=0;
513   nbrecv2[0]=0;
514   vector<double> valsToSend;
515
516   /*
517    * FIELD VALUE XCHGE:
518    * We call the 'BB source IDs' (bounding box source IDs) the set of source cell IDs transmitted just based on the bounding box information.
519    * This is potentially bigger than what is finally in the interp matrix and this is stored in _sent_src_ids_st2.
520    * We call 'interp source IDs' the set of source cell IDs with non null entries in the interp matrix. This is a sub-set of the above.
521    */
522   for(int procID=0;procID<grpSize;procID++)
523     {
524       /* SENDING part: compute field values to be SENT (and how many of them)
525        *   - for all proc 'procID' in group
526        *      * if procID == myProcID, send nothing
527        *      * elif 'procID' in _proc_ids_to_send_vector_st (computed from the BB intersection)
528        *        % if myProcID computed the job (myProcID, procID)
529        *           => send only 'interp source IDs' field values (i.e. IDs stored in _src_ids_zip_proc_st2)
530        *        % else (=we just sent mesh data to procID, but have never seen the matrix, i.e. matrix was computed remotely by procID)
531        *           => send 'BB source IDs' set of field values (i.e. IDs stored in _sent_src_ids_st2)
532        */
533       if (procID == myProcID)
534         nbsend[procID] = 0;
535       else
536         if(find(_proc_ids_to_send_vector_st.begin(),_proc_ids_to_send_vector_st.end(),procID)!=_proc_ids_to_send_vector_st.end())
537           {
538             MEDCouplingAutoRefCountObjectPtr<DataArrayDouble> vals;
539             if(_locator.isInMyTodoList(myProcID, procID))
540               {
541                 vector<int>::const_iterator isItem11 = find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),procID);
542                 if (isItem11 == _src_ids_zip_proc_st2.end())
543                   throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: SEND: unexpected end iterator in _src_ids_zip_proc_st2!");
544                 int id=distance(_src_ids_zip_proc_st2.begin(),isItem11);
545                 int sz=_src_ids_zip_st2[id].size();
546                 vals=fieldInput->getArray()->selectByTupleId(&(_src_ids_zip_st2[id])[0],&(_src_ids_zip_st2[id])[0]+sz);
547               }
548             else
549               {
550                 vector<int>::const_iterator isItem11 = find(_sent_src_proc_st2.begin(),_sent_src_proc_st2.end(),procID );
551                 if (isItem11 == _sent_src_proc_st2.end())
552                   throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: SEND: unexpected end iterator in _sent_src_proc_st2!");
553                 int id=distance(_sent_src_proc_st2.begin(),isItem11);
554                 vals=fieldInput->getArray()->selectByTupleId(*_sent_src_ids_st2[id]);
555               }
556             nbsend[procID] = vals->getNbOfElems();
557             valsToSend.insert(valsToSend.end(),vals->getConstPointer(),vals->getConstPointer()+nbsend[procID]);
558           }
559
560       /* RECEIVE: compute number of field values to be RECEIVED
561        *   - for all proc 'procID' in group
562        *      * if procID == myProcID, rcv nothing
563        *      * elif 'procID' in _proc_ids_to_recv_vector_st (computed from BB intersec)
564        *        % if myProcID computed the job (procID, myProcID)
565        *          => receive full set ('BB source IDs') of field data from proc #procID which has never seen the matrix
566        *             i.e. prepare to receive the numb in _nb_of_rcv_src_ids_proc_st2
567        *        % else (=we did NOT compute the job, hence procID has, and knows the matrix)
568        *          => receive 'interp source IDs' set of field values
569        */
570       const std::vector< int > & _proc_ids_to_recv_vector_st = _the_matrix_st_source_proc_id;
571       if (procID == myProcID)
572         nbrecv[procID] = 0;
573       else
574         if(find(_proc_ids_to_recv_vector_st.begin(),_proc_ids_to_recv_vector_st.end(),procID)!=_proc_ids_to_recv_vector_st.end())
575           {
576             if(_locator.isInMyTodoList(procID, myProcID))
577               {
578                 vector<int>::const_iterator isItem11 = find(_rcv_src_ids_proc_st2.begin(),_rcv_src_ids_proc_st2.end(),procID);
579                 if (isItem11 == _rcv_src_ids_proc_st2.end())
580                   throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: RCV: unexpected end iterator in _rcv_src_ids_proc_st2!");
581                 int id=distance(_rcv_src_ids_proc_st2.begin(),isItem11);
582                 nbrecv[procID] = _nb_of_rcv_src_ids_proc_st2[id];
583               }
584             else
585               {
586                 vector<int>::const_iterator isItem11 = find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),procID);
587                 if (isItem11 == _src_ids_zip_proc_st2.end())
588                   throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: RCV: unexpected end iterator in _src_ids_zip_proc_st2!");
589                 int id=distance(_src_ids_zip_proc_st2.begin(),isItem11);
590                 nbrecv[procID] = _src_ids_zip_st2[id].size()*nbOfCompo;
591               }
592           }
593     }
594   // Compute offsets in the sending/receiving array.
595   for(int i=1;i<grpSize;i++)
596     {
597       nbsend2[i]=nbsend2[i-1]+nbsend[i-1];
598       nbrecv2[i]=nbrecv2[i-1]+nbrecv[i-1];
599     }
600   INTERP_KERNEL::AutoPtr<double> bigArr=new double[nbrecv2[grpSize-1]+nbrecv[grpSize-1]];
601
602 #ifdef DEC_DEBUG
603   stringstream scout;
604   scout << "("  << myProcID << ") nbsend :" << nbsend[0] << "," << nbsend[1] << "," << nbsend[2] << "\n";
605   scout << "("  << myProcID << ") nbrecv :" << nbrecv[0] << "," << nbrecv[1] << "," << nbrecv[2] << "\n";
606   scout << "("  << myProcID << ") valsToSend: ";
607   for (int iii=0; iii<valsToSend.size(); iii++)
608     scout << ", " << valsToSend[iii];
609 #endif
610
611   /*
612    * *********************** ALL-TO-ALL
613    */
614   commInterface.allToAllV(&valsToSend[0],nbsend,nbsend2,MPI_DOUBLE,
615                           bigArr,nbrecv,nbrecv2,MPI_DOUBLE,*comm);
616 #ifdef DEC_DEBUG
617   MPI_Barrier(MPI_COMM_WORLD);
618   scout << "("  << myProcID << ") bigArray: ";
619     for (int iii=0; iii<nbrecv2[grpSize-1]+nbrecv[grpSize-1]; iii++)
620       scout << ", " << bigArr[iii];
621   cout << scout.str() << "\n";
622 #endif
623
624   /*
625    * TARGET FIELD COMPUTATION (matrix-vec computation)
626    */
627   fieldOutput->getArray()->fillWithZero();
628   INTERP_KERNEL::AutoPtr<double> tmp=new double[nbOfCompo];
629
630   // By default field value set to default value - so mark which cells are hit
631   INTERP_KERNEL::AutoPtr<bool> hit_cells = new bool[fieldOutput->getNumberOfTuples()];
632
633   for(vector<int>::const_iterator itProc=_the_matrix_st_source_proc_id.begin(); itProc != _the_matrix_st_source_proc_id.end();itProc++)
634   // For each source processor corresponding to a locally held matrix:
635     {
636       int srcProcID = *itProc;
637       int id = distance(_the_matrix_st_source_proc_id.begin(),itProc);
638       const vector< SparseDoubleVec >& mat =_the_matrix_st[id];
639       const vector< SparseDoubleVec >& deno = _the_deno_st[id];
640
641       /*   FINAL MULTIPLICATION
642        *      * if srcProcID == myProcID, local multiplication without any mapping
643        *         => for all target cell ID 'tgtCellID'
644        *           => for all src cell ID 'srcCellID' in the sparse vector
645        *             => tgtFieldLocal[tgtCellID] += srcFieldLocal[srcCellID] * matrix[tgtCellID][srcCellID] / deno[tgtCellID][srcCellID]
646        */
647       if (srcProcID == myProcID)
648         {
649           int nbOfTrgTuples=mat.size();
650           double * targetBase = fieldOutput->getArray()->getPointer();
651           for(int j=0; j<nbOfTrgTuples; j++)
652             {
653               const SparseDoubleVec& mat1=mat[j];
654               const SparseDoubleVec& deno1=deno[j];
655               SparseDoubleVec::const_iterator it5=deno1.begin();
656               const double * localSrcField = fieldInput->getArray()->getConstPointer();
657               double * targetPt = targetBase+j*nbOfCompo;
658               for(SparseDoubleVec::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it5++)
659                 {
660                   // Apply the multiplication for all components:
661                   double ratio = (*it3).second/(*it5).second;
662                   transform(localSrcField+((*it3).first)*nbOfCompo,
663                             localSrcField+((*it3).first+1)*nbOfCompo,
664                             (double *)tmp,
665                             bind2nd(multiplies<double>(),ratio) );
666                   // Accumulate with current value:
667                   transform((double *)tmp,(double *)tmp+nbOfCompo,targetPt,targetPt,plus<double>());
668                   hit_cells[j] = true;
669                 }
670             }
671         }
672
673       if(nbrecv[srcProcID]<=0)  // also covers the preceding 'if'
674         continue;
675
676       /*      * if something was received
677        *         %  if received matrix (=we didn't compute the job), this means that :
678        *            1. we sent part of our targetIDs to srcProcID before, so that srcProcId can do the computation.
679        *            2. srcProcID has sent us only the 'interp source IDs' field values
680        *            => invert _src_ids_zip_st2 -> 'revert_zip'
681        *            => for all target cell ID 'tgtCellID'
682        *              => mappedTgtID = _sent_trg_ids_st2[srcProcID][tgtCellID]
683        *              => for all src cell ID 'srcCellID' in the sparse vector
684        *                 => idx = revert_zip[srcCellID]
685        *                 => tgtFieldLocal[mappedTgtID] += rcvValue[srcProcID][idx] * matrix[tgtCellID][srcCellID] / deno[tgtCellID][srcCellID]
686        */
687       if(!_locator.isInMyTodoList(srcProcID, myProcID))
688         {
689           // invert _src_ids_zip_proc_st2
690           map<int,int> revert_zip;
691           vector< int >::const_iterator it11= find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),srcProcID);
692           if (it11 == _src_ids_zip_proc_st2.end())
693             throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: MULTIPLY: unexpected end iterator in _src_ids_zip_proc_st2!");
694           int id1=distance(_src_ids_zip_proc_st2.begin(),it11);
695           int newId=0;
696           for(vector<int>::const_iterator it=_src_ids_zip_st2[id1].begin();it!=_src_ids_zip_st2[id1].end();it++,newId++)
697             revert_zip[*it]=newId;
698           vector<int>::const_iterator isItem24 = find(_sent_trg_proc_st2.begin(),_sent_trg_proc_st2.end(),srcProcID);
699           if (isItem24 == _sent_trg_proc_st2.end())
700             throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: MULTIPLY: unexpected end iterator in _sent_trg_proc_st2!");
701           int id2 = distance(_sent_trg_proc_st2.begin(),isItem24);
702           const DataArrayInt *tgrIdsDA=_sent_trg_ids_st2[id2];
703           const int *tgrIds = tgrIdsDA->getConstPointer();
704
705           int nbOfTrgTuples=mat.size();
706           double * targetBase = fieldOutput->getArray()->getPointer();
707           for(int j=0;j<nbOfTrgTuples;j++)
708             {
709               const SparseDoubleVec& mat1=mat[j];
710               const SparseDoubleVec& deno1=deno[j];
711               SparseDoubleVec::const_iterator it5=deno1.begin();
712               double * targetPt = targetBase+tgrIds[j]*nbOfCompo;
713               for(SparseDoubleVec::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it5++)
714                 {
715                   map<int,int>::const_iterator it4=revert_zip.find((*it3).first);
716                   if(it4==revert_zip.end())
717                     throw INTERP_KERNEL::Exception("OverlapMapping::multiply(): internal error: MULTIPLY: unexpected end iterator in revert_zip!");
718                   double ratio = (*it3).second/(*it5).second;
719                   transform(bigArr+nbrecv2[srcProcID]+((*it4).second)*nbOfCompo,
720                             bigArr+nbrecv2[srcProcID]+((*it4).second+1)*nbOfCompo,
721                             (double *)tmp,
722                             bind2nd(multiplies<double>(),ratio) );
723                   transform((double *)tmp,(double *)tmp+nbOfCompo,targetPt,targetPt,plus<double>());
724                   hit_cells[tgrIds[j]] = true;
725                 }
726             }
727         }
728       else
729         /*         % else (=we computed the job and we received the 'BB source IDs' set of source field values)
730          *            => for all target cell ID 'tgtCellID'
731          *              => for all src cell ID 'srcCellID' in the sparse vector
732          *                => tgtFieldLocal[tgtCellID] += rcvValue[srcProcID][srcCellID] * matrix[tgtCellID][srcCellID] / deno[tgtCellID][srcCellID]
733          */
734         {
735           // Same loop as in the case srcProcID == myProcID, except that instead of working on local field data, we work on bigArr
736           int nbOfTrgTuples=mat.size();
737           double * targetBase = fieldOutput->getArray()->getPointer();
738           for(int j=0;j<nbOfTrgTuples;j++)
739             {
740               const SparseDoubleVec& mat1=mat[j];
741               const SparseDoubleVec& deno1=deno[j];
742               SparseDoubleVec::const_iterator it5=deno1.begin();
743               double * targetPt = targetBase+j*nbOfCompo;
744               for(SparseDoubleVec::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it5++)
745                 {
746                   // Apply the multiplication for all components:
747                   double ratio = (*it3).second/(*it5).second;
748                   transform(bigArr+nbrecv2[srcProcID]+((*it3).first)*nbOfCompo,
749                             bigArr+nbrecv2[srcProcID]+((*it3).first+1)*nbOfCompo,
750                             (double *)tmp,
751                             bind2nd(multiplies<double>(),ratio));
752                   // Accumulate with current value:
753                   transform((double *)tmp,(double *)tmp+nbOfCompo,targetPt,targetPt,plus<double>());
754                   hit_cells[j] = true;
755                 }
756             }
757         }
758     }
759
760   // Fill in default values for cells which haven't been hit:
761   int i = 0;
762   for(bool * hit_cells_ptr=hit_cells; i< fieldOutput->getNumberOfTuples(); hit_cells_ptr++,i++)
763     if (!(*hit_cells_ptr))
764       {
765         double * targetPt=fieldOutput->getArray()->getPointer();
766         fill(targetPt+i*nbOfCompo, targetPt+(i+1)*nbOfCompo, default_val);
767       }
768 }
769
770 /*!
771  * This method performs a transpose multiply of 'fieldInput' and put the result into 'fieldOutput'.
772  * 'fieldInput' is expected to be the targetfield and 'fieldOutput' the sourcefield.
773  */
774 void OverlapMapping::transposeMultiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput)
775 {
776 }
777
778 /*!
779  * This method should be called immediately after _the_matrix_st has been filled with remote computed matrix
780  * put in this proc for Matrix-Vector.
781  * It finishes the filling _src_ids_zip_st2 and _src_ids_zip_proc_st2 (see member doc)
782  */
783 void OverlapMapping::updateZipSourceIdsForMultiply()
784 {
785   /* When it is called, only the bits received from other processors (i.e. the remotely executed jobs) are in the
786     big matrix _the_matrix_st. */
787
788   CommInterface commInterface=_group.getCommInterface();
789   int myProcId=_group.myRank();
790   int nbOfMatrixRecveived=_the_matrix_st_source_proc_id.size();
791   for(int i=0;i<nbOfMatrixRecveived;i++)
792     {
793       int curSrcProcId=_the_matrix_st_source_proc_id[i];
794       if(curSrcProcId!=myProcId)  // if =, data has been populated by addContributionST()
795         {
796           const std::vector< SparseDoubleVec >& mat=_the_matrix_st[i];
797           _src_ids_zip_proc_st2.push_back(curSrcProcId);
798           _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
799           std::set<int> s;
800           for(std::vector< SparseDoubleVec >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
801             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
802               s.insert((*it2).first);
803           _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
804         }
805     }
806 }
807
808 #ifdef DEC_DEBUG
809  void OverlapMapping::printTheMatrix() const
810  {
811    CommInterface commInterface=_group.getCommInterface();
812    const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
813    const MPI_Comm *comm=group->getComm();
814    int grpSize=_group.size();
815    int myProcId=_group.myRank();
816    std::stringstream oscerr;
817    int nbOfMat=_the_matrix_st.size();
818    oscerr << "(" <<  myProcId <<  ") I hold " << nbOfMat << " matrix(ces) : "<< std::endl;
819    for(int i=0;i<nbOfMat;i++)
820      {
821        oscerr << "   - Matrix #" << i << " coming from source proc #" << _the_matrix_st_source_proc_id[i] << ":\n ";
822        const std::vector< SparseDoubleVec >& locMat=_the_matrix_st[i];
823        int j = 0;
824        for(std::vector< SparseDoubleVec >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++, j++)
825          {
826            oscerr << " Target Cell #" << j;
827            for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
828              oscerr << " (" << (*it2).first << "," << (*it2).second << "), ";
829            oscerr << std::endl;
830          }
831      }
832    oscerr << "*********" << std::endl;
833
834    // Hope this will be flushed in one go:
835    std::cerr << oscerr.str() << std::endl;
836 //   if(myProcId != 0)
837 //     MPI_Barrier(MPI_COMM_WORLD);
838  }
839
840  void OverlapMapping::printMatrixesST() const
841   {
842     CommInterface commInterface=_group.getCommInterface();
843     const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
844     const MPI_Comm *comm=group->getComm();
845     int grpSize=_group.size();
846     int myProcId=_group.myRank();
847     std::stringstream oscerr;
848     int nbOfMat=_matrixes_st.size();
849     oscerr << "(" <<  myProcId <<  ") I hold " << nbOfMat << " LOCAL matrix(ces) : "<< std::endl;
850     for(int i=0;i<nbOfMat;i++)
851       {
852         oscerr << "   - Matrix #" << i << ": (source proc #" << _source_proc_id_st[i] << " / tgt proc#" << _target_proc_id_st[i] << "): \n";
853         const std::vector< SparseDoubleVec >& locMat=_matrixes_st[i];
854         int j = 0;
855         for(std::vector< SparseDoubleVec >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++, j++)
856           {
857             oscerr << " Target Cell #" << j;
858             for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
859               oscerr << " (" << (*it2).first << "," << (*it2).second << "), ";
860             oscerr << std::endl;
861           }
862       }
863     oscerr << "*********" << std::endl;
864
865     // Hope this will be flushed in one go:
866     std::cerr << oscerr.str() << std::endl;
867   }
868
869  void OverlapMapping::printDenoMatrix() const
870    {
871      CommInterface commInterface=_group.getCommInterface();
872      const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
873      const MPI_Comm *comm=group->getComm();
874      int grpSize=_group.size();
875      int myProcId=_group.myRank();
876      std::stringstream oscerr;
877      int nbOfMat=_the_deno_st.size();
878      oscerr << "(" <<  myProcId <<  ") I hold " << nbOfMat << " DENOMINATOR matrix(ces) : "<< std::endl;
879      for(int i=0;i<nbOfMat;i++)
880        {
881          oscerr << "   - Matrix #" << i << " coming from source proc #" << _the_matrix_st_source_proc_id[i] << ": \n";
882          const std::vector< SparseDoubleVec >& locMat=_the_deno_st[i];
883          int j = 0;
884          for(std::vector< SparseDoubleVec >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++, j++)
885            {
886              oscerr << " Target Cell #" << j;
887              for(SparseDoubleVec::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
888                oscerr << " (" << (*it2).first << "," << (*it2).second << "), ";
889              oscerr << std::endl;
890            }
891        }
892      oscerr << "*********" << std::endl;
893
894      // Hope this will be flushed in one go:
895      std::cerr << oscerr.str() << std::endl;
896    }
897 #endif