Salome HOME
Copyright update 2020
[tools/medcoupling.git] / src / ParaMEDMEM / OverlapElementLocator.cxx
1 // Copyright (C) 2007-2020  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 // Author : Anthony Geay (CEA/DEN)
20
21 #include "OverlapElementLocator.hxx"
22
23 #include "CommInterface.hxx"
24 #include "Topology.hxx"
25 #include "BlockTopology.hxx"
26 #include "ParaFIELD.hxx"
27 #include "ParaMESH.hxx"
28 #include "ProcessorGroup.hxx"
29 #include "MPIProcessorGroup.hxx"
30 #include "OverlapInterpolationMatrix.hxx"
31 #include "MEDCouplingFieldDouble.hxx"
32 #include "MEDCouplingFieldDiscretization.hxx"
33 #include "DirectedBoundingBox.hxx"
34 #include "InterpKernelAutoPtr.hxx"
35
36 #include <limits>
37
38 using namespace std;
39
40 namespace MEDCoupling 
41
42   const int OverlapElementLocator::START_TAG_MESH_XCH = 1140;
43
44   OverlapElementLocator::OverlapElementLocator(const ParaFIELD *sourceField, const ParaFIELD *targetField,
45                                                const ProcessorGroup& group, double epsAbs, int workSharingAlgo)
46     : _local_source_field(sourceField),
47       _local_target_field(targetField),
48       _local_source_mesh(0),
49       _local_target_mesh(0),
50       _domain_bounding_boxes(0),
51       _epsAbs(epsAbs),
52       _group(group)
53   { 
54     if(_local_source_field)
55       _local_source_mesh=_local_source_field->getSupport()->getCellMesh();
56     if(_local_target_field)
57       _local_target_mesh=_local_target_field->getSupport()->getCellMesh();
58     _comm=getCommunicator();
59
60     computeBoundingBoxesAndInteractionList();
61     switch(workSharingAlgo)
62     {
63       case 0:
64         computeTodoList_original();   break;
65       case 1:
66         computeTodoList_new(false);   break;
67       case 2:
68         computeTodoList_new(true);    break;
69       default:
70         throw INTERP_KERNEL::Exception("OverlapElementLocator::OverlapElementLocator(): invalid algorithm selected!");
71     }
72
73     fillProcToSend();
74   }
75
76   OverlapElementLocator::~OverlapElementLocator()
77   {
78     delete [] _domain_bounding_boxes;
79   }
80
81   const MPI_Comm *OverlapElementLocator::getCommunicator() const
82   {
83     const MPIProcessorGroup* group=static_cast<const MPIProcessorGroup*>(&_group);
84     return group->getComm();
85   }
86
87   void OverlapElementLocator::computeBoundingBoxesAndInteractionList()
88   {
89     CommInterface comm_interface=_group.getCommInterface();
90     const MPIProcessorGroup* group=static_cast<const MPIProcessorGroup*> (&_group);
91     _local_space_dim=0;
92     if(_local_source_mesh)
93       _local_space_dim=_local_source_mesh->getSpaceDimension();
94     else
95       _local_space_dim=_local_target_mesh->getSpaceDimension();
96     //
97     const MPI_Comm* comm = group->getComm();
98     int bbSize=2*2*_local_space_dim;//2 (for source/target) 2 (min/max)
99     _domain_bounding_boxes=new double[bbSize*_group.size()];
100     INTERP_KERNEL::AutoPtr<double> minmax=new double[bbSize];
101     //Format minmax : Xmin_src,Xmax_src,Ymin_src,Ymax_src,Zmin_src,Zmax_src,Xmin_trg,Xmax_trg,Ymin_trg,Ymax_trg,Zmin_trg,Zmax_trg
102     if(_local_source_mesh)
103       _local_source_mesh->getBoundingBox(minmax);
104     else
105       {
106         for(int i=0;i<_local_space_dim;i++)
107           {
108             minmax[i*2]=std::numeric_limits<double>::max();
109             minmax[i*2+1]=-std::numeric_limits<double>::max();
110           }
111       }
112     if(_local_target_mesh)
113       _local_target_mesh->getBoundingBox(minmax+2*_local_space_dim);
114     else
115       {
116         for(int i=0;i<_local_space_dim;i++)
117           {
118             minmax[i*2+2*_local_space_dim]=std::numeric_limits<double>::max();
119             minmax[i*2+1+2*_local_space_dim]=-std::numeric_limits<double>::max();
120           }
121       }
122     comm_interface.allGather(minmax, bbSize, MPI_DOUBLE,
123                              _domain_bounding_boxes,bbSize, MPI_DOUBLE, 
124                              *comm);
125   
126     // Computation of all pairs needing an interpolation pairs are duplicated now !
127     
128     _proc_pairs.clear();//first is source second is target
129     _proc_pairs.resize(_group.size());
130     for(int i=0;i<_group.size();i++)
131       for(int j=0;j<_group.size();j++)
132         if(intersectsBoundingBox(i,j))
133           _proc_pairs[i].push_back(j);
134   }
135
136   void OverlapElementLocator::computeTodoList_original()
137   {
138     // OK now let's assigning as balanced as possible, job to each proc of group
139     _all_todo_lists.resize(_group.size());
140     int i=0;
141     for(std::vector< std::vector< int > >::const_iterator it1=_proc_pairs.begin();it1!=_proc_pairs.end();it1++,i++)
142       for(std::vector< int >::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
143         {
144           if(_all_todo_lists[i].size()<=_all_todo_lists[*it2].size())//it includes the fact that i==*it2
145             _all_todo_lists[i].push_back(ProcCouple(i,*it2));
146           else
147             _all_todo_lists[*it2].push_back(ProcCouple(i,*it2));
148         }
149     //Keeping todo list of current proc. _to_do_list contains a set of pair where at least _group.myRank() appears once.
150     //This proc will be in charge to perform interpolation of any of element of '_to_do_list'
151     //If _group.myRank()==myPair.first, current proc should fetch target mesh of myPair.second (if different from _group.myRank()).
152     //If _group.myRank()==myPair.second, current proc should fetch source mesh of myPair.second.
153
154     int myProcId=_group.myRank();
155     _to_do_list=_all_todo_lists[myProcId];
156
157 #ifdef DEC_DEBUG
158     std::stringstream scout;
159     scout << "(" << myProcId << ") my TODO list is: ";
160     for (std::vector< ProcCouple >::const_iterator itdbg=_to_do_list.begin(); itdbg!=_to_do_list.end(); itdbg++)
161       scout << "(" << (*itdbg).first << "," << (*itdbg).second << ")";
162     std::cout << scout.str() << "\n";
163 #endif
164   }
165
166   /* More efficient (?) work sharing algorithm: a job (i,j) is initially assigned twice: to proc#i and to proc#j.
167    * Then try to reduce as much as possible the variance of the num of jobs per proc by selecting the right duplicate
168    * to remove:
169    *  - take the most loaded proc i,
170    *    + select the job (i,j) for which proc#j is the less loaded
171    *    + remove this job from proc#i, and mark it as 'unremovable' from proc#j
172    *  - repeat until no more duplicates are found
173    */
174   void OverlapElementLocator::computeTodoList_new(bool revertIter)
175   {
176     using namespace std;
177     int infinity = std::numeric_limits<int>::max();
178     // Initialisation
179     int grp_size = _group.size();
180     vector < map<ProcCouple, int> > full_set(grp_size );
181     int srcProcID = 0;
182     for(vector< vector< int > >::const_iterator it = _proc_pairs.begin(); it != _proc_pairs.end(); it++, srcProcID++)
183       for (vector< int >::const_iterator it2=(*it).begin(); it2 != (*it).end(); it2++)
184       {
185         // Here a pair of the form (i,i) is added only once!
186         int tgtProcID = *it2;
187         ProcCouple cpl = make_pair(srcProcID, tgtProcID);
188         full_set[srcProcID][cpl] = -1;
189         full_set[tgtProcID][cpl] = -1;
190       }
191     int procID = 0;
192     vector < map<ProcCouple, int> > ::iterator itVector;
193     map<ProcCouple, int>::iterator itMap;
194     for(itVector = full_set.begin(); itVector != full_set.end(); itVector++, procID++)
195       for (itMap=(*itVector).begin(); itMap != (*itVector).end(); itMap++)
196         {
197           const ProcCouple & cpl = (*itMap).first;
198           if (cpl.first == cpl.second)
199             // special case - this couple can not be removed in the future
200             (*itMap).second = infinity;
201           else
202             {
203             if(cpl.first == procID)
204               (*itMap).second = (int)full_set[cpl.second].size();
205             else // cpl.second == srcProcID
206               (*itMap).second = (int)full_set[cpl.first].size();
207             }
208         }
209     INTERP_KERNEL::AutoPtr<bool> proc_valid = new bool[grp_size];
210     fill((bool *)proc_valid, proc_valid+grp_size, true);
211
212     // Now the algo:
213     while (find((bool *)proc_valid, proc_valid+grp_size, true) != proc_valid+grp_size)
214       {
215         // Most loaded proc:
216         int max_sz = -1, max_id = -1;
217         for(itVector = full_set.begin(), procID=0; itVector != full_set.end(); itVector++, procID++)
218           {
219             int sz = (int)(*itVector).size();
220             if (proc_valid[procID] && sz > max_sz)
221               {
222                 max_sz = sz;
223                 max_id = procID;
224               }
225           }
226
227         // Nothing more to do:
228         if (max_sz == -1)
229           break;
230         // For this proc, job with less loaded second proc:
231         int min_sz = infinity;
232         map<ProcCouple, int> & max_map = full_set[max_id];
233         ProcCouple hit_cpl = make_pair(-1,-1);
234         if(revertIter)
235           {
236           // Use a reverse iterator here increases our chances to hit a couple of the form (i, myProcId)
237           // meaning that the final matrix computed won't have to be sent: save some comm.
238           map<ProcCouple, int> ::const_reverse_iterator ritMap;
239           for(ritMap=max_map.rbegin(); ritMap != max_map.rend(); ritMap++)
240             if ((*ritMap).second < min_sz)
241               hit_cpl = (*ritMap).first;
242           }
243         else
244           {
245             for(itMap=max_map.begin(); itMap != max_map.end(); itMap++)
246               if ((*itMap).second < min_sz)
247                 hit_cpl = (*itMap).first;
248           }
249         if (hit_cpl.first == -1)
250           {
251             // Plouf. Current proc 'max_id' can not be reduced. Invalid it:
252             proc_valid[max_id] = false;
253             continue;
254           }
255         // Remove item from proc 'max_id'
256         full_set[max_id].erase(hit_cpl);
257         // And mark it as not removable on the other side:
258         if (hit_cpl.first == max_id)
259           full_set[hit_cpl.second][hit_cpl] = infinity;
260         else  // hit_cpl.second == max_id
261           full_set[hit_cpl.first][hit_cpl] = infinity;
262
263         // Now update all counts of valid maps:
264         procID = 0;
265         for(itVector = full_set.begin(); itVector != full_set.end(); itVector++, procID++)
266           if(proc_valid[procID] && procID != max_id)
267             for(itMap = (*itVector).begin(); itMap != (*itVector).end(); itMap++)
268               {
269                 const ProcCouple & cpl = (*itMap).first;
270                 // Unremovable item:
271                 if ((*itMap).second == infinity)
272                   continue;
273                 if (cpl.first == max_id || cpl.second == max_id)
274                   (*itMap).second--;
275               }
276       }
277     // Final formatting - extract remaining keys in each map:
278     int myProcId=_group.myRank();
279     _all_todo_lists.resize(grp_size);
280     procID = 0;
281     for(itVector = full_set.begin(); itVector != full_set.end(); itVector++, procID++)
282       for(itMap = (*itVector).begin(); itMap != (*itVector).end(); itMap++)
283           _all_todo_lists[procID].push_back((*itMap).first);
284     _to_do_list=_all_todo_lists[myProcId];
285
286 #ifdef DEC_DEBUG
287     std::stringstream scout;
288     scout << "(" << myProcId << ") my TODO list is: ";
289     for (std::vector< ProcCouple >::const_iterator itdbg=_to_do_list.begin(); itdbg!=_to_do_list.end(); itdbg++)
290       scout << "(" << (*itdbg).first << "," << (*itdbg).second << ")";
291     std::cout << scout.str() << "\n";
292 #endif
293   }
294
295   void OverlapElementLocator::debugPrintWorkSharing(std::ostream & ostr) const
296   {
297     std::vector< std::vector< ProcCouple > >::const_iterator it = _all_todo_lists.begin();
298     ostr << "TODO list lengths: ";
299     for(; it != _all_todo_lists.end(); ++it)
300       ostr << (*it).size() << " ";
301     ostr << "\n";
302   }
303
304   void OverlapElementLocator::fillProcToSend()
305   {
306     // Feeding now '_procs_to_send*'. A same id can appears twice. The second parameter in pair means what
307     // to send true=source, false=target
308     int myProcId=_group.myRank();
309     _procs_to_send_mesh.clear();
310     _procs_to_send_field.clear();
311     for(int i=_group.size()-1;i>=0;i--)
312       {
313         const std::vector< ProcCouple >& anRemoteProcToDoList=_all_todo_lists[i];
314         for(std::vector< ProcCouple >::const_iterator it=anRemoteProcToDoList.begin();it!=anRemoteProcToDoList.end();it++)
315           {
316             if((*it).first==myProcId)
317               {
318                 if(i!=myProcId)
319                   _procs_to_send_mesh.push_back(Proc_SrcOrTgt(i,true));
320                 _procs_to_send_field.push_back((*it).second);
321               }
322             if((*it).second==myProcId)
323               if(i!=myProcId)
324                 _procs_to_send_mesh.push_back(Proc_SrcOrTgt(i,false));
325           }
326       }
327   }
328
329
330   /*!
331    * The aim of this method is to perform the communication to get data corresponding to '_to_do_list' attribute.
332    * The principle is the following : if proc n1 and n2 need to perform a cross sending with n1<n2, then n1 will send first and receive then.
333    */
334   void OverlapElementLocator::exchangeMeshes(OverlapInterpolationMatrix& matrix)
335   {
336     int myProcId=_group.myRank();
337     //starting to receive every procs whose id is lower than myProcId.
338     std::vector< ProcCouple > toDoListForFetchRemaining;
339     for(std::vector< ProcCouple >::const_iterator it=_to_do_list.begin();it!=_to_do_list.end();it++)
340       {
341         int first = (*it).first, second = (*it).second;
342         if(first!=second)
343           {
344             if(first==myProcId)
345               {
346                 if(second<myProcId)
347                   receiveRemoteMeshFrom(second,false);
348                 else
349                   toDoListForFetchRemaining.push_back(ProcCouple(first,second));
350               }
351             else
352               {//(*it).second==myProcId
353                 if(first<myProcId)
354                   receiveRemoteMeshFrom(first,true);
355                 else
356                   toDoListForFetchRemaining.push_back(ProcCouple(first,second));
357               }
358           }
359       }
360     //sending source or target mesh to remote procs
361     for(std::vector< Proc_SrcOrTgt >::const_iterator it2=_procs_to_send_mesh.begin();it2!=_procs_to_send_mesh.end();it2++)
362       sendLocalMeshTo((*it2).first,(*it2).second,matrix);
363     //fetching remaining meshes
364     for(std::vector< ProcCouple >::const_iterator it=toDoListForFetchRemaining.begin();it!=toDoListForFetchRemaining.end();it++)
365       {
366         if((*it).first!=(*it).second)
367           {
368             if((*it).first==myProcId)
369               receiveRemoteMeshFrom((*it).second,false);
370             else//(*it).second==myProcId
371               receiveRemoteMeshFrom((*it).first,true);
372           }
373       }
374   }
375   
376   std::string OverlapElementLocator::getSourceMethod() const
377   {
378     return _local_source_field->getField()->getDiscretization()->getStringRepr();
379   }
380
381   std::string OverlapElementLocator::getTargetMethod() const
382   {
383     return _local_target_field->getField()->getDiscretization()->getStringRepr();
384   }
385
386   const MEDCouplingPointSet *OverlapElementLocator::getSourceMesh(int procId) const
387   {
388     int myProcId=_group.myRank();
389     if(myProcId==procId)
390       return _local_source_mesh;
391     Proc_SrcOrTgt p(procId,true);
392     std::map<Proc_SrcOrTgt, AutoMCPointSet >::const_iterator it=_remote_meshes.find(p);
393     return (*it).second;
394   }
395
396   const DataArrayIdType *OverlapElementLocator::getSourceIds(int procId) const
397   {
398     int myProcId=_group.myRank();
399     if(myProcId==procId)
400       return 0;
401     Proc_SrcOrTgt p(procId,true);
402     std::map<Proc_SrcOrTgt, AutoDAInt >::const_iterator it=_remote_elems.find(p);
403     return (*it).second;
404   }
405
406   const MEDCouplingPointSet *OverlapElementLocator::getTargetMesh(int procId) const
407   {
408     int myProcId=_group.myRank();
409     if(myProcId==procId)
410       return _local_target_mesh;
411     Proc_SrcOrTgt p(procId,false);
412     std::map<Proc_SrcOrTgt, AutoMCPointSet >::const_iterator it=_remote_meshes.find(p);
413     return (*it).second;
414   }
415
416   const DataArrayIdType *OverlapElementLocator::getTargetIds(int procId) const
417   {
418     int myProcId=_group.myRank();
419     if(myProcId==procId)
420       return 0;
421     Proc_SrcOrTgt p(procId,false);
422     std::map<Proc_SrcOrTgt, AutoDAInt >::const_iterator it=_remote_elems.find(p);
423     return (*it).second;
424   }
425
426   bool OverlapElementLocator::isInMyTodoList(int i, int j) const
427   {
428     ProcCouple cpl = std::make_pair(i, j);
429     return std::find(_to_do_list.begin(), _to_do_list.end(), cpl)!=_to_do_list.end();
430   }
431
432   bool OverlapElementLocator::intersectsBoundingBox(int isource, int itarget) const
433   {
434     const double *source_bb=_domain_bounding_boxes+isource*2*2*_local_space_dim;
435     const double *target_bb=_domain_bounding_boxes+itarget*2*2*_local_space_dim+2*_local_space_dim;
436
437     for (int idim=0; idim < _local_space_dim; idim++)
438       {
439         bool intersects = (target_bb[idim*2]<source_bb[idim*2+1]+_epsAbs)
440           && (source_bb[idim*2]<target_bb[idim*2+1]+_epsAbs);
441         if (!intersects)
442           return false; 
443       }
444     return true;
445   }
446
447   /*!
448    * This methods sends (part of) local source if 'sourceOrTarget'==True to proc 'procId'.
449    * This methods sends (part of) local target if 'sourceOrTarget'==False to proc 'procId'.
450    *
451    * This method prepares the matrix too, for matrix assembling and future matrix-vector computation.
452    */
453   void OverlapElementLocator::sendLocalMeshTo(int procId, bool sourceOrTarget, OverlapInterpolationMatrix& matrix) const
454   {
455    //int myProcId=_group.myRank();
456    const double *distant_bb=0;
457    MEDCouplingPointSet *local_mesh=0;
458    const ParaFIELD *field=0;
459    if(sourceOrTarget)//source for local mesh but target for distant mesh
460      {
461        distant_bb=_domain_bounding_boxes+procId*2*2*_local_space_dim+2*_local_space_dim;
462        local_mesh=_local_source_mesh;
463        field=_local_source_field;
464      }
465    else//target for local but source for distant
466      {
467        distant_bb=_domain_bounding_boxes+procId*2*2*_local_space_dim;
468        local_mesh=_local_target_mesh;
469        field=_local_target_field;
470      }
471    AutoDAInt elems=local_mesh->getCellsInBoundingBox(distant_bb,getBoundingBoxAdjustment());
472    DataArrayIdType *old2new_map;
473    MEDCouplingPointSet *send_mesh=static_cast<MEDCouplingPointSet *>(field->getField()->buildSubMeshData(elems->begin(),elems->end(),old2new_map));
474    if(sourceOrTarget)
475      matrix.keepTracksOfSourceIds(procId,old2new_map);
476    else
477      matrix.keepTracksOfTargetIds(procId,old2new_map);
478    sendMesh(procId,send_mesh,old2new_map);
479    send_mesh->decrRef();
480    old2new_map->decrRef();
481   }
482
483   /*!
484    * This method receives source remote mesh on proc 'procId' if sourceOrTarget==True
485    * This method receives target remote mesh on proc 'procId' if sourceOrTarget==False
486    */
487   void OverlapElementLocator::receiveRemoteMeshFrom(int procId, bool sourceOrTarget)
488   {
489     DataArrayIdType *old2new_map=0;
490     MEDCouplingPointSet *m=0;
491     receiveMesh(procId,m,old2new_map);
492     Proc_SrcOrTgt p(procId,sourceOrTarget);
493     _remote_meshes[p]=m;
494     _remote_elems[p]=old2new_map;
495   }
496
497   void OverlapElementLocator::sendMesh(int procId, const MEDCouplingPointSet *mesh, const DataArrayIdType *idsToSend) const
498   {
499     CommInterface comInterface=_group.getCommInterface();
500
501     // First stage : exchanging sizes
502     vector<double> tinyInfoLocalD;//tinyInfoLocalD not used for the moment
503     vector<mcIdType> tinyInfoLocal;
504     vector<string> tinyInfoLocalS;
505     mesh->getTinySerializationInformation(tinyInfoLocalD,tinyInfoLocal,tinyInfoLocalS);
506     const MPI_Comm *comm=getCommunicator();
507     //
508     mcIdType lgth[2];
509     lgth[0]=ToIdType(tinyInfoLocal.size());
510     lgth[1]=idsToSend->getNbOfElems();
511     comInterface.send(&lgth,2,MPI_ID_TYPE,procId,START_TAG_MESH_XCH,*_comm);
512     comInterface.send(&tinyInfoLocal[0],(int)tinyInfoLocal.size(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+1,*comm);
513     //
514     DataArrayIdType *v1Local=0;
515     DataArrayDouble *v2Local=0;
516     mesh->serialize(v1Local,v2Local);
517     comInterface.send(v1Local->getPointer(),(int)v1Local->getNbOfElems(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+2,*comm);
518     comInterface.send(v2Local->getPointer(),(int)v2Local->getNbOfElems(),MPI_DOUBLE,procId,START_TAG_MESH_XCH+3,*comm);
519     //finished for mesh, ids now
520     comInterface.send(const_cast<mcIdType *>(idsToSend->getConstPointer()),(int)lgth[1],MPI_ID_TYPE,procId,START_TAG_MESH_XCH+4,*comm);
521     //
522     v1Local->decrRef();
523     v2Local->decrRef();
524   }
525
526   void OverlapElementLocator::receiveMesh(int procId, MEDCouplingPointSet* &mesh, DataArrayIdType *&ids) const
527   {
528     mcIdType lgth[2];
529     MPI_Status status;
530     const MPI_Comm *comm=getCommunicator();
531     CommInterface comInterface=_group.getCommInterface();
532     comInterface.recv(lgth,2,MPI_ID_TYPE,procId,START_TAG_MESH_XCH,*_comm,&status);
533     std::vector<mcIdType> tinyInfoDistant(lgth[0]);
534     ids=DataArrayIdType::New();
535     ids->alloc(lgth[1],1);
536     comInterface.recv(&tinyInfoDistant[0],(int)lgth[0],MPI_ID_TYPE,procId,START_TAG_MESH_XCH+1,*comm,&status);
537     mesh=MEDCouplingPointSet::BuildInstanceFromMeshType((MEDCouplingMeshType)tinyInfoDistant[0]);
538     std::vector<std::string> unusedTinyDistantSts;
539     vector<double> tinyInfoDistantD(1);//tinyInfoDistantD not used for the moment
540     DataArrayIdType *v1Distant=DataArrayIdType::New();
541     DataArrayDouble *v2Distant=DataArrayDouble::New();
542     mesh->resizeForUnserialization(tinyInfoDistant,v1Distant,v2Distant,unusedTinyDistantSts);
543     comInterface.recv(v1Distant->getPointer(),(int)v1Distant->getNbOfElems(),MPI_ID_TYPE,procId,START_TAG_MESH_XCH+2,*comm,&status);
544     comInterface.recv(v2Distant->getPointer(),(int)v2Distant->getNbOfElems(),MPI_DOUBLE,procId,START_TAG_MESH_XCH+3,*comm,&status);
545     mesh->unserialization(tinyInfoDistantD,tinyInfoDistant,v1Distant,v2Distant,unusedTinyDistantSts);
546     //finished for mesh, ids now
547     comInterface.recv(ids->getPointer(),(int)lgth[1],MPI_ID_TYPE,procId,1144,*comm,&status);
548     //
549     v1Distant->decrRef();
550     v2Distant->decrRef();
551   }
552 }