Salome HOME
refactor!: remove adm_local/ directory
[tools/medcoupling.git] / src / ParaMEDMEM / OverlapElementLocator.cxx
1 // Copyright (C) 2007-2024  CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 // Author : Anthony Geay (CEA/DEN)
20
21 #include "OverlapElementLocator.hxx"
22
23 #include "CommInterface.hxx"
24 #include "Topology.hxx"
25 #include "BlockTopology.hxx"
26 #include "ParaFIELD.hxx"
27 #include "ParaMESH.hxx"
28 #include "ProcessorGroup.hxx"
29 #include "MPIProcessorGroup.hxx"
30 #include "OverlapInterpolationMatrix.hxx"
31 #include "MEDCouplingFieldDouble.hxx"
32 #include "MEDCouplingFieldDiscretization.hxx"
33 #include "DirectedBoundingBox.hxx"
34 #include "InterpKernelAutoPtr.hxx"
35
36 #include <limits>
37
38 using namespace std;
39
40 namespace MEDCoupling 
41
42   const int OverlapElementLocator::START_TAG_MESH_XCH = 1140;
43
44   OverlapElementLocator::OverlapElementLocator(const ParaFIELD *sourceField, const ParaFIELD *targetField,
45                                                const ProcessorGroup& group, double epsAbs, int workSharingAlgo)
46     : _local_source_field(sourceField),
47       _local_target_field(targetField),
48       _local_source_mesh(0),
49       _local_target_mesh(0),
50       _domain_bounding_boxes(0),
51       _epsAbs(epsAbs),
52       _group(group)
53   { 
54     if(_local_source_field)
55       _local_source_mesh=_local_source_field->getSupport()->getCellMesh();
56     if(_local_target_field)
57       _local_target_mesh=_local_target_field->getSupport()->getCellMesh();
58     _comm=getCommunicator();
59
60     computeBoundingBoxesAndInteractionList();
61     switch(workSharingAlgo)
62     {
63       case 0:
64         computeTodoList_original();   break;
65       case 1:
66         computeTodoList_new(false);   break;
67       case 2:
68         computeTodoList_new(true);    break;
69       default:
70         throw INTERP_KERNEL::Exception("OverlapElementLocator::OverlapElementLocator(): invalid algorithm selected!");
71     }
72
73     fillProcToSend();
74   }
75
76   OverlapElementLocator::~OverlapElementLocator()
77   {
78     delete [] _domain_bounding_boxes;
79   }
80
81   const MPI_Comm *OverlapElementLocator::getCommunicator() const
82   {
83     const MPIProcessorGroup* group=static_cast<const MPIProcessorGroup*>(&_group);
84     return group->getComm();
85   }
86
87   void OverlapElementLocator::computeBoundingBoxesAndInteractionList()
88   {
89     CommInterface comm_interface=_group.getCommInterface();
90     const MPIProcessorGroup* group=static_cast<const MPIProcessorGroup*> (&_group);
91     _local_space_dim=0;
92     if(_local_source_mesh)
93       _local_space_dim=_local_source_mesh->getSpaceDimension();
94     else
95       _local_space_dim=_local_target_mesh->getSpaceDimension();
96     //
97     const MPI_Comm* comm = group->getComm();
98     int bbSize=2*2*_local_space_dim;//2 (for source/target) 2 (min/max)
99     _domain_bounding_boxes=new double[bbSize*_group.size()];
100     INTERP_KERNEL::AutoPtr<double> minmax=new double[bbSize];
101     //Format minmax : Xmin_src,Xmax_src,Ymin_src,Ymax_src,Zmin_src,Zmax_src,Xmin_trg,Xmax_trg,Ymin_trg,Ymax_trg,Zmin_trg,Zmax_trg
102     if(_local_source_mesh)
103       _local_source_mesh->getBoundingBox(minmax);
104     else
105       {
106         for(int i=0;i<_local_space_dim;i++)
107           {
108             minmax[i*2]=std::numeric_limits<double>::max();
109             minmax[i*2+1]=-std::numeric_limits<double>::max();
110           }
111       }
112     if(_local_target_mesh)
113       _local_target_mesh->getBoundingBox(minmax+2*_local_space_dim);
114     else
115       {
116         for(int i=0;i<_local_space_dim;i++)
117           {
118             minmax[i*2+2*_local_space_dim]=std::numeric_limits<double>::max();
119             minmax[i*2+1+2*_local_space_dim]=-std::numeric_limits<double>::max();
120           }
121       }
122     comm_interface.allGather(minmax, bbSize, MPI_DOUBLE,
123                              _domain_bounding_boxes,bbSize, MPI_DOUBLE, 
124                              *comm);
125   
126     // Computation of all pairs needing an interpolation - pairs are duplicated now !
127     _proc_pairs.clear();//first is source second is target
128     _proc_pairs.resize(_group.size());
129     for(int i=0;i<_group.size();i++)
130       for(int j=0;j<_group.size();j++)
131         if(intersectsBoundingBox(i,j))
132           _proc_pairs[i].push_back(j);
133   }
134
135   /*! See main OverlapDEC documentation for an explanation on this one. This is the original work sharing algorithm.
136    */
137   void OverlapElementLocator::computeTodoList_original()
138   {
139     // OK now let's assigning as balanced as possible, job to each proc of group
140     _all_todo_lists.resize(_group.size());
141     for(int i = 0; i < _group.size(); i++)
142       for(const int j: _proc_pairs[i])
143         {
144           if(_all_todo_lists[i].size()<=_all_todo_lists[j].size())//it includes the fact that i==j
145             _all_todo_lists[i].push_back(ProcCouple(i,j));
146           else
147             _all_todo_lists[j].push_back(ProcCouple(i,j));
148         }
149     //Keeping todo list of current proc. _to_do_list contains a set of pair where at least _group.myRank() appears once.
150     //This proc will be in charge to perform interpolation of any of element of '_to_do_list'
151     //If _group.myRank()==myPair.first, current proc should fetch target mesh of myPair.second (if different from _group.myRank()).
152     //If _group.myRank()==myPair.second, current proc should fetch source mesh of myPair.second.
153
154     int myProcId=_group.myRank();
155     _to_do_list=_all_todo_lists[myProcId];
156
157 #ifdef DEC_DEBUG
158     std::stringstream scout;
159     scout << "(" << myProcId << ") my TODO list is: ";
160     for (const ProcCouple& pc: _to_do_list)
161       scout << "(" << pc.first << "," << pc.second << ")";
162     std::cout << scout.str() << "\n";
163 #endif
164   }
165
166   /*! More efficient (?) work sharing algorithm: a job (i,j) is initially assigned twice: to proc#i and to proc#j.
167    * Then try to reduce as much as possible the variance of the num of jobs per proc by selecting the right duplicate
168    * to remove:
169    *  - take the most loaded proc i,
170    *    + select the job (i,j) for which proc#j is the less loaded
171    *    + remove this job from proc#i, and mark it as 'unremovable' from proc#j
172    *  - repeat until no more duplicates are found
173    */
174   void OverlapElementLocator::computeTodoList_new(bool revertIter)
175   {
176     using namespace std;
177     int infinity = std::numeric_limits<int>::max();
178
179     //
180     // Initialisation
181     //
182     int grp_size = _group.size();
183     //    For each proc, a map giving for a job (=an interaction (i,j)) its 'load', i.e. the amount of work found on proc #j
184     vector < map<ProcCouple, int> > full_set(grp_size );
185     for(int srcProcID = 0; srcProcID < _group.size(); srcProcID++)
186       for(const int tgtProcID: _proc_pairs[srcProcID])
187         {
188           // Here a pair of the form (i,i) is added only once!
189           ProcCouple cpl = make_pair(srcProcID, tgtProcID);
190           // The interaction (i,j) is initially given to both procs #i and #j - load is initialised at -1:
191           full_set[srcProcID][cpl] = -1;
192           full_set[tgtProcID][cpl] = -1;
193         }
194     //    Compute load:
195     int procID = 0;
196     for(auto& itVector : full_set)
197       {
198         for (auto &mapIt : itVector)
199           {
200             const ProcCouple & cpl = mapIt.first;
201             if (cpl.first == cpl.second)   // interaction (i,i) : can not be removed:
202               // special case - this couple can not be removed in the future
203               mapIt.second = infinity;
204             else
205               {
206                 if(cpl.first == procID)
207                   mapIt.second = (int)full_set[cpl.second].size();
208                 else // cpl.second == srcProcID
209                   mapIt.second = (int)full_set[cpl.first].size();
210               }
211           }
212         procID++;
213       }
214     INTERP_KERNEL::AutoPtr<bool> proc_valid = new bool[grp_size];
215     fill((bool *)proc_valid, proc_valid+grp_size, true);
216
217     //
218     // Now the algo:
219     //
220     while (find((bool *)proc_valid, proc_valid+grp_size, true) != proc_valid+grp_size) // as long as proc_valid is not full of 'false'
221       {
222         // Most loaded proc:
223         int max_sz = -1, max_id = -1;
224         int procID = 0;
225         for(const auto& a_set: full_set)
226           {
227             int sz = (int)a_set.size();
228             if (proc_valid[procID] && sz > max_sz)
229               {
230                 max_sz = sz;
231                 max_id = procID;
232               }
233             procID++;
234           }
235
236         // Nothing more to do:
237         if (max_sz == -1) break;
238         // For this proc, job with less loaded second proc:
239         int min_sz = infinity;
240         map<ProcCouple, int> & max_map = full_set[max_id];
241         ProcCouple hit_cpl = make_pair(-1,-1);
242         if(revertIter)
243           {
244           // Use a reverse iterator here increases our chances to hit a couple of the form (i, myProcId)
245           // meaning that the final matrix computed won't have to be sent: save some comm.
246           for(auto ritMap=max_map.rbegin(); ritMap != max_map.rend(); ritMap++)
247             if ((*ritMap).second < min_sz)
248               {
249                 hit_cpl = (*ritMap).first;
250                 min_sz = (*ritMap).second;
251               }
252           }
253         else
254           {
255             for(const auto& mapIt : max_map)
256               if (mapIt.second < min_sz)
257                 {
258                   hit_cpl = mapIt.first;
259                   min_sz = mapIt.second;
260                 }
261           }
262         if (hit_cpl.first == -1)
263           {
264             // Plouf. Current proc 'max_id' can not be reduced. Invalid it and move next:
265             proc_valid[max_id] = false;
266             continue;
267           }
268         // Remove item from proc 'max_id'
269         full_set[max_id].erase(hit_cpl);
270         // And mark it as not removable on the other side:
271         if (hit_cpl.first == max_id)
272           full_set[hit_cpl.second][hit_cpl] = infinity;
273         else  // hit_cpl.second == max_id
274           full_set[hit_cpl.first][hit_cpl] = infinity;
275
276         // Now update all counts of valid maps:
277         procID = 0;
278         for(auto& itVector: full_set)
279           {
280             if(proc_valid[procID] && procID != max_id)
281               for(auto& mapIt: itVector)
282                 {
283                   const ProcCouple & cpl = mapIt.first;
284                   if (mapIt.second == infinity)  // Unremovable item:
285                     continue;
286                   if (cpl.first == max_id || cpl.second == max_id)
287                     mapIt.second--;
288                 }
289             procID++;
290           }
291       }
292
293     //
294     // Final formatting - extract remaining keys in each map:
295     //
296     int myProcId = _group.myRank();
297     _all_todo_lists.resize(grp_size);
298     procID = 0;
299     for(const auto& itVector: full_set)
300       {
301         for(const auto& mapIt: itVector)
302           _all_todo_lists[procID].push_back(mapIt.first);
303         procID++;
304       }
305     _to_do_list=_all_todo_lists[myProcId];
306
307 #ifdef DEC_DEBUG
308     std::stringstream scout;
309     scout << "(" << myProcId << ") my TODO list is: ";
310     for (const ProcCouple& pc: _to_do_list)
311       scout << "(" << pc.first << "," << pc.second << ")";
312     std::cout << scout.str() << "\n";
313 #endif
314   }
315
316   void OverlapElementLocator::debugPrintWorkSharing(std::ostream & ostr) const
317   {
318     std::vector< std::vector< ProcCouple > >::const_iterator it = _all_todo_lists.begin();
319     ostr << "TODO list lengths: ";
320     for(; it != _all_todo_lists.end(); ++it)
321       ostr << (*it).size() << " ";
322     ostr << "\n";
323   }
324
325   void OverlapElementLocator::fillProcToSend()
326   {
327     // Feeding now '_procs_to_send*'. A same id can appears twice. The second parameter in pair means what
328     // to send true=source, false=target
329     int myProcId=_group.myRank();
330     _procs_to_send_mesh.clear();
331     _procs_to_send_field.clear();
332     for(int i=0;i<_group.size();i++)
333       {
334         for(const ProcCouple& pc: _all_todo_lists[i])
335           {
336             if(pc.first==myProcId)
337               {
338                 if(i!=myProcId)
339                   _procs_to_send_mesh.push_back(Proc_SrcOrTgt(i,true));
340                 _procs_to_send_field.push_back(pc.second);
341               }
342             if(pc.second==myProcId)
343               if(i!=myProcId)
344                 _procs_to_send_mesh.push_back(Proc_SrcOrTgt(i,false));
345           }
346       }
347     // Sort to avoid deadlocks!!
348     std::sort(_procs_to_send_mesh.begin(), _procs_to_send_mesh.end());
349 #ifdef DEC_DEBUG
350     std::stringstream scout;
351     scout << "(" << _group.myRank() << ") PROC TO SEND list is: ";
352     for (const auto& pc: _procs_to_send_mesh)
353       scout << "(" << pc.first << "," << (pc.second ? "src":"tgt") << ") ";
354     std::cout << scout.str() << "\n";
355 #endif
356   }
357
358
359   /*!
360    * The aim of this method is to perform the communication to get data corresponding to '_to_do_list' attribute.
361    * The principle is the following : if proc n1 and n2 need to perform a cross sending with n1<n2, then n1 will send first and receive then.
362    */
363   void OverlapElementLocator::exchangeMeshes(OverlapInterpolationMatrix& matrix)
364   {
365     int myProcId=_group.myRank();
366     //starting to receive every procs whose id is lower than myProcId.
367     std::vector<Proc_SrcOrTgt> firstRcv, secondRcv;
368     for (const ProcCouple& pc: _to_do_list)
369       {
370         if(pc.first == pc.second) continue; // no xchg needed
371
372         if(pc.first==myProcId)
373           {
374             if(pc.second<myProcId)      firstRcv.push_back(Proc_SrcOrTgt(pc.second,false));
375             else                        secondRcv.push_back(Proc_SrcOrTgt(pc.second, false));
376           }
377         else
378           {//pc.second==myProcId
379             if(pc.first<myProcId)       firstRcv.push_back(Proc_SrcOrTgt(pc.first,true));
380             else                        secondRcv.push_back(Proc_SrcOrTgt(pc.first,true));
381           }
382       }
383     // Actual receiving, in order please to avoid deadlocks!
384     std::sort(firstRcv.begin(), firstRcv.end());
385     for (const Proc_SrcOrTgt& pst: firstRcv)
386       receiveRemoteMeshFrom(pst.first, pst.second);
387
388     // Sending source or target mesh to remote procs (_procs_to_send_mesh is sorted too!)
389     for (const Proc_SrcOrTgt& pst: _procs_to_send_mesh)
390       sendLocalMeshTo(pst.first, pst.second,matrix);
391
392     // Actual 2nd receiving, in order again please to avoid deadlocks!
393     std::sort(secondRcv.begin(), secondRcv.end());
394     for (const Proc_SrcOrTgt& pst: secondRcv)
395       receiveRemoteMeshFrom(pst.first, pst.second);
396   }
397   
398   std::string OverlapElementLocator::getSourceMethod() const
399   {
400     return _local_source_field->getField()->getDiscretization()->getStringRepr();
401   }
402
403   std::string OverlapElementLocator::getTargetMethod() const
404   {
405     return _local_target_field->getField()->getDiscretization()->getStringRepr();
406   }
407
408   const MEDCouplingPointSet *OverlapElementLocator::getSourceMesh(int procId) const
409   {
410     int myProcId=_group.myRank();
411     if(myProcId==procId)
412       return _local_source_mesh;
413     Proc_SrcOrTgt p(procId,true);
414     std::map<Proc_SrcOrTgt, AutoMCPointSet >::const_iterator it=_remote_meshes.find(p);
415     return (*it).second;
416   }
417
418   const DataArrayIdType *OverlapElementLocator::getSourceIds(int procId) const
419   {
420     int myProcId=_group.myRank();
421     if(myProcId==procId)
422       return 0;
423     Proc_SrcOrTgt p(procId,true);
424     std::map<Proc_SrcOrTgt, AutoDAInt >::const_iterator it=_remote_elems.find(p);
425     return (*it).second;
426   }
427
428   const MEDCouplingPointSet *OverlapElementLocator::getTargetMesh(int procId) const
429   {
430     int myProcId=_group.myRank();
431     if(myProcId==procId)
432       return _local_target_mesh;
433     Proc_SrcOrTgt p(procId,false);
434     std::map<Proc_SrcOrTgt, AutoMCPointSet >::const_iterator it=_remote_meshes.find(p);
435     return (*it).second;
436   }
437
438   const DataArrayIdType *OverlapElementLocator::getTargetIds(int procId) const
439   {
440     int myProcId=_group.myRank();
441     if(myProcId==procId)
442       return 0;
443     Proc_SrcOrTgt p(procId,false);
444     std::map<Proc_SrcOrTgt, AutoDAInt >::const_iterator it=_remote_elems.find(p);
445     return (*it).second;
446   }
447
448   bool OverlapElementLocator::isInMyTodoList(int i, int j) const
449   {
450     ProcCouple cpl = std::make_pair(i, j);
451     return std::find(_to_do_list.begin(), _to_do_list.end(), cpl)!=_to_do_list.end();
452   }
453
454   bool OverlapElementLocator::intersectsBoundingBox(int isource, int itarget) const
455   {
456     const double *source_bb=_domain_bounding_boxes+isource*2*2*_local_space_dim;
457     const double *target_bb=_domain_bounding_boxes+itarget*2*2*_local_space_dim+2*_local_space_dim;
458
459     for (int idim=0; idim < _local_space_dim; idim++)
460       {
461         bool intersects = (target_bb[idim*2]<source_bb[idim*2+1]+_epsAbs)
462           && (source_bb[idim*2]<target_bb[idim*2+1]+_epsAbs);
463         if (!intersects)
464           return false; 
465       }
466     return true;
467   }
468
469   /*!
470    * This methods sends (part of) local source if 'sourceOrTarget'==True to proc 'procId'.
471    * This methods sends (part of) local target if 'sourceOrTarget'==False to proc 'procId'.
472    *
473    * This method prepares the matrix too, for matrix assembling and future matrix-vector computation.
474    */
475   void OverlapElementLocator::sendLocalMeshTo(int procId, bool sourceOrTarget, OverlapInterpolationMatrix& matrix) const
476   {
477 #ifdef DEC_DEBUG
478     int rank = _group.myRank();
479     std::string st = sourceOrTarget ? "src" : "tgt";
480     std::stringstream scout;
481     scout << "(" << rank << ") SEND part of " << st << " TO: " << procId;
482     std::cout << scout.str() << "\n";
483 #endif
484
485    //int myProcId=_group.myRank();
486    const double *distant_bb=0;
487    MEDCouplingPointSet *local_mesh=0;
488    const ParaFIELD *field=0;
489    if(sourceOrTarget)//source for local mesh but target for distant mesh
490      {
491        distant_bb=_domain_bounding_boxes+procId*2*2*_local_space_dim+2*_local_space_dim;
492        local_mesh=_local_source_mesh;
493        field=_local_source_field;
494      }
495    else//target for local but source for distant
496      {
497        distant_bb=_domain_bounding_boxes+procId*2*2*_local_space_dim;
498        local_mesh=_local_target_mesh;
499        field=_local_target_field;
500      }
501    AutoDAInt elems=local_mesh->getCellsInBoundingBox(distant_bb,getBoundingBoxAdjustment());
502    DataArrayIdType *old2new_map;
503    MEDCouplingPointSet *send_mesh=static_cast<MEDCouplingPointSet *>(field->getField()->buildSubMeshData(elems->begin(),elems->end(),old2new_map));
504    if(sourceOrTarget)
505      matrix.keepTracksOfSourceIds(procId,old2new_map);
506    else
507      matrix.keepTracksOfTargetIds(procId,old2new_map);
508    sendMesh(procId,send_mesh,old2new_map);
509    send_mesh->decrRef();
510    old2new_map->decrRef();
511   }
512
513   /*!
514    * This method receives source remote mesh on proc 'procId' if sourceOrTarget==True
515    * This method receives target remote mesh on proc 'procId' if sourceOrTarget==False
516    */
517   void OverlapElementLocator::receiveRemoteMeshFrom(int procId, bool sourceOrTarget)
518   {
519 #ifdef DEC_DEBUG
520     int rank = _group.myRank();
521     std::string st = sourceOrTarget ? "src" : "tgt";
522     std::stringstream scout;
523     scout << "(" << rank << ") RCV part of " << st << " FROM: " << procId;
524     std::cout << scout.str() << "\n";
525 #endif
526     DataArrayIdType *old2new_map=0;
527     MEDCouplingPointSet *m=0;
528     receiveMesh(procId,m,old2new_map);
529     Proc_SrcOrTgt p(procId,sourceOrTarget);
530     _remote_meshes[p]=m;
531     _remote_elems[p]=old2new_map;
532   }
533
534   void OverlapElementLocator::sendMesh(int procId, const MEDCouplingPointSet *mesh, const DataArrayIdType *idsToSend) const
535   {
536     CommInterface comInterface=_group.getCommInterface();
537
538     // First stage : exchanging sizes
539     vector<double> tinyInfoLocalD;//tinyInfoLocalD not used for the moment
540     vector<mcIdType> tinyInfoLocal;
541     vector<string> tinyInfoLocalS;
542     mesh->getTinySerializationInformation(tinyInfoLocalD,tinyInfoLocal,tinyInfoLocalS);
543     const MPI_Comm *comm=getCommunicator();
544     //
545     mcIdType lgth[2];
546     lgth[0]=ToIdType(tinyInfoLocal.size());
547     lgth[1]=idsToSend->getNbOfElems();
548     comInterface.send(&lgth,2,MPI_ID_TYPE,procId,START_TAG_MESH_XCH,*_comm);
549     comInterface.send(&tinyInfoLocal[0],(int)tinyInfoLocal.size(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+1,*comm);
550     //
551     DataArrayIdType *v1Local=0;
552     DataArrayDouble *v2Local=0;
553     mesh->serialize(v1Local,v2Local);
554     comInterface.send(v1Local->getPointer(),(int)v1Local->getNbOfElems(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+2,*comm);
555     comInterface.send(v2Local->getPointer(),(int)v2Local->getNbOfElems(),MPI_DOUBLE,procId,START_TAG_MESH_XCH+3,*comm);
556     //finished for mesh, ids now
557     comInterface.send(const_cast<mcIdType *>(idsToSend->getConstPointer()),(int)lgth[1],MPI_ID_TYPE,procId,START_TAG_MESH_XCH+4,*comm);
558     //
559     v1Local->decrRef();
560     v2Local->decrRef();
561   }
562
563   void OverlapElementLocator::receiveMesh(int procId, MEDCouplingPointSet* &mesh, DataArrayIdType *&ids) const
564   {
565     mcIdType lgth[2];
566     MPI_Status status;
567     const MPI_Comm *comm=getCommunicator();
568     CommInterface comInterface=_group.getCommInterface();
569     comInterface.recv(lgth,2,MPI_ID_TYPE,procId,START_TAG_MESH_XCH,*_comm,&status);
570     std::vector<mcIdType> tinyInfoDistant(lgth[0]);
571     ids=DataArrayIdType::New();
572     ids->alloc(lgth[1],1);
573     comInterface.recv(&tinyInfoDistant[0],(int)lgth[0],MPI_ID_TYPE,procId,START_TAG_MESH_XCH+1,*comm,&status);
574     mesh=MEDCouplingPointSet::BuildInstanceFromMeshType((MEDCouplingMeshType)tinyInfoDistant[0]);
575     std::vector<std::string> unusedTinyDistantSts;
576     vector<double> tinyInfoDistantD(1);//tinyInfoDistantD not used for the moment
577     DataArrayIdType *v1Distant=DataArrayIdType::New();
578     DataArrayDouble *v2Distant=DataArrayDouble::New();
579     mesh->resizeForUnserialization(tinyInfoDistant,v1Distant,v2Distant,unusedTinyDistantSts);
580     comInterface.recv(v1Distant->getPointer(),(int)v1Distant->getNbOfElems(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+2,*comm,&status);
581     comInterface.recv(v2Distant->getPointer(),(int)v2Distant->getNbOfElems(),MPI_DOUBLE,procId,START_TAG_MESH_XCH+3,*comm,&status);
582     mesh->unserialization(tinyInfoDistantD,tinyInfoDistant,v1Distant,v2Distant,unusedTinyDistantSts);
583     //finished for mesh, ids now
584     comInterface.recv(ids->getPointer(),(int)lgth[1],MPI_ID_TYPE,procId,1144,*comm,&status);
585     //
586     v1Distant->decrRef();
587     v2Distant->decrRef();
588   }
589 }