]> SALOME platform Git repositories - tools/medcoupling.git/commitdiff
Salome HOME
*** empty log message ***
authorageay <ageay>
Tue, 5 Apr 2011 09:38:12 +0000 (09:38 +0000)
committerageay <ageay>
Tue, 5 Apr 2011 09:38:12 +0000 (09:38 +0000)
src/ParaMEDMEM/OverlapDEC.cxx
src/ParaMEDMEM/OverlapElementLocator.cxx
src/ParaMEDMEM/OverlapElementLocator.hxx
src/ParaMEDMEM/OverlapInterpolationMatrix.cxx
src/ParaMEDMEM/OverlapInterpolationMatrix.hxx
src/ParaMEDMEM/OverlapMapping.cxx
src/ParaMEDMEM/OverlapMapping.hxx
src/ParaMEDMEMTest/ParaMEDMEMTest.hxx
src/ParaMEDMEMTest/ParaMEDMEMTest_OverlapDEC.cxx

index 7a62893ca132b833d9daeb7d1c421ff6084c735c..4ac0dfdf0b17a72ea7e55e31b31dcb129f949bc7 100644 (file)
     of proc #m. In this case proc #k computes part of mesh A in the bounding box B of proc #m. It implies that the corresponding cellIds or nodeIds of
     corresponding part are sent to proc #m too.
 
-    Let's consider the couple (k,m) in TODO list. This couple is treated by either k or m as seen \ref ParaMEDMEMOverlapDECAlgoStep2 "here".
+    Let's consider the couple (k,m) in TODO list. This couple is treated by either k or m as seen \ref ParaMEDMEMOverlapDECAlgoStep2 "here in Step2".
 
-    As it will be dealt in Step 6, at the end for final matrix-vector computation the result matrix of the couple (k,m) anywhere it is computed (proc#k or proc#m)
+    As it will be dealt in Step 6, at the end for final matrix-vector computation the result matrix of the couple (k,m) anywhere it is computed (proc #k or proc #m)
     it will be stored in \b proc#m.
 
-    If proc#k is in charge of this couple (k,m) target ids (cells or nodes) of mesh in proc#m are renumbered, because proc#m has stripped 
-    its target mesh to avoid big amount of data. In this case has it is finally proc#m in charge of the matrix, proc#m keeps preciously the
-    target ids sent to proc#k. No problem will appear for source ids because no restriction done.
+    - If proc #k is in charge (performs matrix computation) of this couple (k,m), target ids (cells or nodes) of the mesh in proc #m are renumbered, because proc #m has stripped 
+    its target mesh to avoid transferring a big amount of data. In this case, as it is finally proc #m that is in charge of the matrix, proc #k must carefully keep the
+    source ids needed to be sent to proc #m. No problem will appear for matrix assembling in proc #m concerning source ids, because no restriction was done on them.
+    Concerning source ids to be sent for matrix-vector computation, proc #k will know precisely which source id field values to send to proc #m.
+    This is incarnated by OverlapMapping::keepTracksOfTargetIds in proc #m.
+
+    - If proc #m is in charge (performs matrix computation) of this couple (k,m), source ids (cells or nodes) of the mesh in proc #k are renumbered, because proc #k has stripped
+    its source mesh to avoid transferring a big amount of data. In this case, as it is finally proc #m that is in charge of the matrix, proc #m receives the source ids
+    from remote proc #k so the matrix is directly OK; no renumbering will be needed in \ref ParaMEDMEMOverlapDECAlgoStep5 "Step 5". But proc #k must
+    keep track of the ids sent to proc #m for matrix-vector computation.
+    This is incarnated by OverlapMapping::keepTracksOfSourceIds in proc #k.
 
     This step is performed in ParaMEDMEM::OverlapElementLocator::exchangeMeshes method.
 
     For a proc #k, it is necessary to fetch info of all matrices built in \ref ParaMEDMEMOverlapDECAlgoStep4 "Step4" where the first element in the pair
     is equal to k.
 
+    After this step, the matrix repartition is the following after the call to ParaMEDMEM::OverlapMapping::prepare :
+
+    - proc#0 : (0,0),(1,0),(2,0)
+    - proc#1 : (0,1),(2,1)
+    - proc#2 : (1,2),(2,2)
+
+    Tuple (2,1) computed on proc 2 is stored at the end of "prepare" in proc 1. So it is an example of item 0 in \ref ParaMEDMEMOverlapDECAlgoStep2 "Step2".
+    Tuple (0,1) computed on proc 1 and stored in proc 1 too. So it is an example of item 1 in \ref ParaMEDMEMOverlapDECAlgoStep2 "Step2".
+
+    In ParaMEDMEM::OverlapMapping::_proc_ids_to_send_vector_st will contain :
+
+    - Proc#0 : 0,1
+    - Proc#1 : 0,2
+    - Proc#2 : 0,1,2
+
+    In ParaMEDMEM::OverlapMapping::_proc_ids_to_recv_vector_st will contain :
+
+    - Proc#0 : 0,1,2
+    - Proc#1 : 0,2
+    - Proc#2 : 1,2
+
     The method in charge to perform this is : ParaMEDMEM::OverlapMapping::prepare.
 */
 namespace ParaMEDMEM
@@ -191,7 +220,7 @@ namespace ParaMEDMEM
     _interpolation_matrix=new OverlapInterpolationMatrix(_source_field,_target_field,*_group,*this,*this);
     OverlapElementLocator locator(_source_field,_target_field,*_group);
     locator.copyOptions(*this);
-    locator.exchangeMeshes();
+    locator.exchangeMeshes(*_interpolation_matrix);
     std::vector< std::pair<int,int> > jobs=locator.getToDoList();
     std::string srcMeth=locator.getSourceMethod();
     std::string trgMeth=locator.getTargetMethod();
index af751cdca58dad852740d1322e7fe397c86fb77f..628350f3c080961191a3bc21a09623fada215124 100644 (file)
@@ -27,6 +27,7 @@
 #include "ParaMESH.hxx"
 #include "ProcessorGroup.hxx"
 #include "MPIProcessorGroup.hxx"
+#include "OverlapInterpolationMatrix.hxx"
 #include "MEDCouplingFieldDouble.hxx"
 #include "MEDCouplingFieldDiscretization.hxx"
 #include "DirectedBoundingBox.hxx"
@@ -156,7 +157,7 @@ namespace ParaMEDMEM
    * The aim of this method is to perform the communication to get data corresponding to '_to_do_list' attribute.
   * The principle is the following : if procs n1 and n2 need to perform a cross sending with n1<n2, then n1 will send first and then receive.
    */
-  void OverlapElementLocator::exchangeMeshes()
+  void OverlapElementLocator::exchangeMeshes(OverlapInterpolationMatrix& matrix)
   {
     int myProcId=_group.myRank();
     //starting to receive every procs whose id is lower than myProcId.
@@ -183,7 +184,7 @@ namespace ParaMEDMEM
       }
     //sending source or target mesh to remote procs
     for(std::vector< std::pair<int,bool> >::const_iterator it2=_procs_to_send.begin();it2!=_procs_to_send.end();it2++)
-      sendLocalMeshTo((*it2).first,(*it2).second);
+      sendLocalMeshTo((*it2).first,(*it2).second,matrix);
     //fetching remaining meshes
     for(std::vector< std::pair<int,int> >::const_iterator it=toDoListForFetchRemaining.begin();it!=toDoListForFetchRemaining.end();it++)
       {
@@ -266,8 +267,10 @@ namespace ParaMEDMEM
   /*!
    * This methods sends local source if 'sourceOrTarget'==True to proc 'procId'.
    * This methods sends local target if 'sourceOrTarget'==False to proc 'procId'.
+   *
+   * This method prepares the matrix too, for matrix assembling and future matrix-vector computation.
    */
-  void OverlapElementLocator::sendLocalMeshTo(int procId, bool sourceOrTarget) const
+  void OverlapElementLocator::sendLocalMeshTo(int procId, bool sourceOrTarget, OverlapInterpolationMatrix& matrix) const
   {
    vector<int> elems;
    //int myProcId=_group.myRank();
@@ -288,7 +291,11 @@ namespace ParaMEDMEM
      }
    local_mesh->getCellsInBoundingBox(distant_bb,getBoundingBoxAdjustment(),elems);
    DataArrayInt *idsToSend;
-   MEDCouplingPointSet *send_mesh=(MEDCouplingPointSet *)field->getField()->buildSubMeshData(&elems[0],&elems[elems.size()],idsToSend);
+   MEDCouplingPointSet *send_mesh=static_cast<MEDCouplingPointSet *>(field->getField()->buildSubMeshData(&elems[0],&elems[elems.size()],idsToSend));
+   if(sourceOrTarget)
+     matrix.keepTracksOfSourceIds(procId,idsToSend);//Case#1 in Step2 of main algorithm.
+   else
+     matrix.keepTracksOfTargetIds(procId,idsToSend);//Case#0 in Step2 of main algorithm.
    sendMesh(procId,send_mesh,idsToSend);
    send_mesh->decrRef();
    idsToSend->decrRef();
index 4490be609230d1518641d3223fa8517b7f602a0f..754ab6e7c46e1b566b03280ea6e61d811470c3e4 100644 (file)
@@ -35,15 +35,15 @@ namespace ParaMEDMEM
   class ParaFIELD;
   class ProcessorGroup;
   class ParaSUPPORT;
-  class InterpolationMatrix;
-
+  class OverlapInterpolationMatrix;
+  
   class OverlapElementLocator : public INTERP_KERNEL::InterpolationOptions
   {
   public:
     OverlapElementLocator(const ParaFIELD *sourceField, const ParaFIELD *targetField, const ProcessorGroup& group);
     virtual ~OverlapElementLocator();
     const MPI_Comm *getCommunicator() const;
-    void exchangeMeshes();
+    void exchangeMeshes(OverlapInterpolationMatrix& matrix);
     std::vector< std::pair<int,int> > getToDoList() const { return _to_do_list; }
     std::vector< std::vector< int > > getProcsInInteraction() const { return _proc_pairs; }
     std::string getSourceMethod() const;
@@ -55,7 +55,7 @@ namespace ParaMEDMEM
   private:
     void computeBoundingBoxes();
     bool intersectsBoundingBox(int i, int j) const;
-    void sendLocalMeshTo(int procId, bool sourceOrTarget) const;
+    void sendLocalMeshTo(int procId, bool sourceOrTarget, OverlapInterpolationMatrix& matrix) const;
     void receiveRemoteMesh(int procId, bool sourceOrTarget);
     void sendMesh(int procId, const MEDCouplingPointSet *mesh, const DataArrayInt *idsToSend) const;
     void receiveMesh(int procId, MEDCouplingPointSet* &mesh, DataArrayInt *&ids) const;
index f08c2e6641e195dd2286c8157d6923ba0bc28793..edc3f3e0e313f3be7a111c50a2e842d7652578bc 100644 (file)
@@ -61,6 +61,16 @@ namespace ParaMEDMEM
     _target_volume.resize(nbelems);
   }
 
+  void OverlapInterpolationMatrix::keepTracksOfSourceIds(int procId, DataArrayInt *ids)
+  {
+    _mapping.keepTracksOfSourceIds(procId,ids);
+  }
+
+  void OverlapInterpolationMatrix::keepTracksOfTargetIds(int procId, DataArrayInt *ids)
+  {
+    _mapping.keepTracksOfTargetIds(procId,ids);
+  }
+
   OverlapInterpolationMatrix::~OverlapInterpolationMatrix()
   {
   }
@@ -203,42 +213,7 @@ namespace ParaMEDMEM
                                                          const DataArrayInt *srcIds, int srcProc,
                                                          const DataArrayInt *trgIds, int trgProc)
   {
-    //computing matrix with real ids for target
-    int sz=res.size();
-    const int *trgIds2=0;
-    const int *srcIds2=0;
-    int nbTrgIds=_target_field->getField()->getNumberOfTuplesExpected();
-    int nbSrcIds=_source_field->getField()->getNumberOfTuplesExpected();
-    std::vector< std::map<int,double> > res1(sz);
-    INTERP_KERNEL::AutoPtr<int> tmp2=new int[nbTrgIds];
-    INTERP_KERNEL::AutoPtr<int> tmp3=new int[nbSrcIds];
-    if(trgIds)
-      trgIds2=trgIds->getConstPointer();
-    else
-      {
-        trgIds2=tmp2;
-        for(int i=0;i<nbTrgIds;i++)
-          tmp2[i]=i;
-      }
-    if(srcIds)
-      srcIds2=srcIds->getConstPointer();
-    else
-      {
-        srcIds2=tmp3;
-        for(int i=0;i<nbSrcIds;i++)
-          tmp3[i]=i;
-      }
-    for(int i=0;i<sz;i++)
-      {
-        std::map<int,double>& m=res1[i];
-        const std::map<int,double>& ref=res[i];
-        for(std::map<int,double>::const_iterator it=ref.begin();it!=ref.end();it++)
-          {
-            m[srcIds2[(*it).first]]=(*it).second;
-          } 
-      }
-    //dealing source ids
-    _mapping.addContributionST(res1,srcIds2,trgIds2,nbTrgIds,srcProc,trgProc);
+    _mapping.addContributionST(res,srcIds,srcProc,trgIds,trgProc);
   }
 
   /*!
@@ -255,8 +230,10 @@ namespace ParaMEDMEM
 
   void OverlapInterpolationMatrix::computeDeno()
   {
-    if(_target_field->getField()->getNature()==IntegralGlobConstraint)
-      _mapping.computeDenoGlobConstraint();
+    if(_target_field->getField()->getNature()==ConservativeVolumic)
+      _mapping.computeDenoConservativeVolumic(_target_field->getField()->getNumberOfTuplesExpected());
+    else
+      throw INTERP_KERNEL::Exception("Policy Not implemented yet : only ConservativeVolumic defined !");
   }
 
   void OverlapInterpolationMatrix::multiply()
index 1f84e7ced10804be9832f628fa8ec845519745bb..eca4e8c326d9729e82a606ae7369ce34f633cba2 100644 (file)
@@ -41,6 +41,10 @@ namespace ParaMEDMEM
                                const DECOptions& dec_opt,
                                const InterpolationOptions& i_opt);
 
+    void keepTracksOfSourceIds(int procId, DataArrayInt *ids);
+
+    void keepTracksOfTargetIds(int procId, DataArrayInt *ids);
+
     void addContribution(const MEDCouplingPointSet *src, const DataArrayInt *srcIds, const std::string& srcMeth, int srcProcId,
                          const MEDCouplingPointSet *trg, const DataArrayInt *trgIds, const std::string& trgMeth, int trgProcId);
 
index 830d026b8ee836353574951fcca965bfac04505d..dfc078c0ee8d1b64ae37eaacd4e483a22416079b 100644 (file)
@@ -34,21 +34,53 @@ OverlapMapping::OverlapMapping(const ProcessorGroup& group):_group(group)
 {
 }
 
+/*!
+ * This method keeps track of source ids so as to know, in step 6 of the main algorithm, which tuple ids to send away.
+ * This method incarnates item#1 of step2 algorithm.
+ */
+void OverlapMapping::keepTracksOfSourceIds(int procId, DataArrayInt *ids)
+{
+  ids->incrRef();
+  _src_ids_st2.push_back(ids);
+  _src_proc_st2.push_back(procId);
+}
+
+/*!
+ * This method keeps track of target ids so as to know them in step 6 of the main algorithm.
+ * This method incarnates item#0 of step2 algorithm.
+ */
+void OverlapMapping::keepTracksOfTargetIds(int procId, DataArrayInt *ids)
+{
+  ids->incrRef();
+  _trg_ids_st2.push_back(ids);
+  _trg_proc_st2.push_back(procId);
+}
+
 /*!
  * This method stores from a matrix in format Target(rows)/Source(cols) for a source procId 'srcProcId' and for a target procId 'trgProcId'.
  * All ids (source and target) are in format of local ids. 
  */
-void OverlapMapping::addContributionST(const std::vector< std::map<int,double> >& matrixST, const int *srcIds, const int *trgIds, int trgIdsLgth, int srcProcId, int trgProcId)
+void OverlapMapping::addContributionST(const std::vector< std::map<int,double> >& matrixST, const DataArrayInt *srcIds, int srcProcId, const DataArrayInt *trgIds, int trgProcId)
 {
   int nbOfRows=matrixST.size();
   _matrixes_st.push_back(matrixST);
-  //_source_ids_st.resize(_source_ids_st.size()+1);
-  //_source_ids_st.back().insert(_source_ids_st.back().end(),srcIds,srcIds+nbOfRows);
   _source_proc_id_st.push_back(srcProcId);
-  //
-  _target_ids_st.resize(_target_ids_st.size()+1);
-  _target_ids_st.back().insert(_target_ids_st.back().end(),trgIds,trgIds+trgIdsLgth);
   _target_proc_id_st.push_back(trgProcId);
+  if(srcIds)
+    {//item#1 of step2 algorithm in proc m. Only to know in advance the nb of recv ids [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
+      _nb_of_src_ids_proc_st2.push_back(srcIds->getNumberOfTuples());
+      _src_ids_proc_st2.push_back(srcProcId);
+    }
+  else
+    {//item#0 of step2 algorithm in proc k
+      std::set<int> s;
+      for(std::vector< std::map<int,double> >::const_iterator it1=matrixST.begin();it1!=matrixST.end();it1++)
+        for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+          s.insert((*it2).first);
+      _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
+      _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
+      _src_ids_zip_proc_st2.push_back(trgProcId);
+    }
 }
 
 /*!
@@ -56,8 +88,7 @@ void OverlapMapping::addContributionST(const std::vector< std::map<int,double> >
  * In 'procsInInteraction' for a proc with id i, is in interaction with procs listed in procsInInteraction[i].
  *
  * This method is in charge to send matrixes in AlltoAll mode.
- * After the call of this method 'this' contains the matrixST for all source elements of the current proc and 
- * matrixTS for all target elements of current proc.
+ * After the call of this method 'this' contains the matrixST for all source elements of the current proc
  */
 void OverlapMapping::prepare(const std::vector< std::vector<int> >& procsInInteraction, int nbOfTrgElems)
 {
@@ -70,11 +101,15 @@ void OverlapMapping::prepare(const std::vector< std::vector<int> >& procsInInter
   INTERP_KERNEL::AutoPtr<int> nbsend3=new int[grpSize];
   std::fill<int *>(nbsend,nbsend+grpSize,0);
   int myProcId=_group.myRank();
+  _proc_ids_to_recv_vector_st.clear();
+  int curProc=0;
+  for(std::vector< std::vector<int> >::const_iterator it1=procsInInteraction.begin();it1!=procsInInteraction.end();it1++,curProc++)
+    if(std::find((*it1).begin(),(*it1).end(),myProcId)!=(*it1).end())
+      _proc_ids_to_recv_vector_st.push_back(curProc);
+  _proc_ids_to_send_vector_st=procsInInteraction[myProcId];
   for(std::size_t i=0;i<_matrixes_st.size();i++)
-    {
-      if(_source_proc_id_st[i]!=myProcId)
-        nbsend[_source_proc_id_st[i]]=_matrixes_st[i].size();
-    }
+    if(_source_proc_id_st[i]==myProcId)
+      nbsend[_target_proc_id_st[i]]=_matrixes_st[i].size();
   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
   commInterface.allToAll(nbsend,1,MPI_INT,nbrecv,1,MPI_INT,*comm);
   //exchanging matrix
@@ -90,7 +125,7 @@ void OverlapMapping::prepare(const std::vector< std::vector<int> >& procsInInter
   INTERP_KERNEL::AutoPtr<int> bigArrRecv=new int[nbrecv2[grpSize-1]+nbrecv1[grpSize-1]];
   commInterface.allToAllV(bigArr,nbsend2,nbsend3,MPI_INT,
                           bigArrRecv,nbrecv1,nbrecv2,MPI_INT,
-                          *comm);// sending ids of sparse matrix (n+1 elems) + src ids (n elems)
+                          *comm);// sending ids of sparse matrix (n+1 elems)
   //second phase: exchange target ids
   std::fill<int *>(nbsend2,nbsend2+grpSize,0);
   INTERP_KERNEL::AutoPtr<int> nbrecv3=new int[grpSize];
@@ -112,10 +147,11 @@ void OverlapMapping::prepare(const std::vector< std::vector<int> >& procsInInter
   //finishing
   unserializationST(nbOfTrgElems,nbrecv,bigArrRecv,nbrecv1,nbrecv2,
                     bigArrRecv2,bigArrDRecv2,nbrecv3,nbrecv4);
-  //finish to fill _the_matrix_st and _the_matrix_st_target_proc_id with already in place matrix in _matrixes_st
-  finishToFillFinalMatrixST(nbOfTrgElems);
-  //exchanging target ids for future sending
-  prepareIdsToSendST();
+  //updating _src_ids_zip_st2 and _src_ids_zip_st2 with received matrix.
+  updateZipSourceIdsForFuture();
+  //finish to fill _the_matrix_st with already in place matrix in _matrixes_st
+  finishToFillFinalMatrixST();
+  //printTheMatrix();
 }
 
 /*!
@@ -143,6 +179,72 @@ void OverlapMapping::computeDenoGlobConstraint()
     }
 }
 
+/*!
+ * Compute denominators.
+ */
+void OverlapMapping::computeDenoConservativeVolumic(int nbOfTuplesTrg)
+{
+  CommInterface commInterface=_group.getCommInterface();
+  const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
+  const MPI_Comm *comm=group->getComm();
+  int myProcId=_group.myRank();
+  //
+  _the_deno_st.clear();
+  std::size_t sz1=_the_matrix_st.size();
+  _the_deno_st.resize(sz1);
+  std::vector<double> deno(nbOfTuplesTrg);
+  for(std::size_t i=0;i<sz1;i++)
+    {
+      const std::vector< std::map<int,double> >& mat=_the_matrix_st[i];
+      int curSrcId=_the_matrix_st_source_proc_id[i];
+      std::vector<int>::iterator isItem1=std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),curSrcId);
+      int rowId=0;
+      if(isItem1==_trg_proc_st2.end() || curSrcId==myProcId)//item1 of step2 main algo. Simple, because rowId of mat are directly target ids.
+        {
+          for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
+            for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+              deno[rowId]+=(*it2).second;
+        }
+      else
+        {//item0 of step2 main algo. More complicated.
+          std::vector<int>::iterator fnd=isItem1;//std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),curSrcId);
+          int locId=std::distance(_trg_proc_st2.begin(),fnd);
+          const DataArrayInt *trgIds=_trg_ids_st2[locId];
+          const int *trgIds2=trgIds->getConstPointer();
+          for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
+            for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+              deno[trgIds2[rowId]]+=(*it2).second;
+        }
+    }
+  //
+  for(std::size_t i=0;i<sz1;i++)
+    {
+      int rowId=0;
+      const std::vector< std::map<int,double> >& mat=_the_matrix_st[i];
+      int curSrcId=_the_matrix_st_source_proc_id[i];
+      std::vector<int>::iterator isItem1=std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),curSrcId);
+      std::vector< std::map<int,double> >& denoM=_the_deno_st[i];
+      denoM.resize(mat.size());
+      if(isItem1==_trg_proc_st2.end() || curSrcId==myProcId)//item1 of step2 main algo. Simple, because rowId of mat are directly target ids.
+        {
+          int rowId=0;
+          for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
+            for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+              denoM[rowId][(*it2).first]=deno[rowId];
+        }
+      else
+        {
+          std::vector<int>::iterator fnd=isItem1;
+          int locId=std::distance(_trg_proc_st2.begin(),fnd);
+          const DataArrayInt *trgIds=_trg_ids_st2[locId];
+          const int *trgIds2=trgIds->getConstPointer();
+          for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++,rowId++)
+            for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+              denoM[rowId][(*it2).first]=deno[trgIds2[rowId]];
+        }
+    }
+}
+
 /*!
  * This method performs step #0/3 in serialization process.
  * \param count tells specifies nb of elems to send to corresponding proc id. size equal to _group.size().
@@ -158,10 +260,10 @@ void OverlapMapping::serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigAr
   int myProcId=_group.myRank();
   for(std::size_t i=0;i<_matrixes_st.size();i++)
     {
-      if(_source_proc_id_st[i]!=myProcId)
+      if(_source_proc_id_st[i]==myProcId)// && _target_proc_id_st[i]!=myProcId
         {
-          count[_source_proc_id_st[i]]=2*_matrixes_st[i].size()+1;
-          szz+=2*_matrixes_st[i].size()+1;
+          count[_target_proc_id_st[i]]=_matrixes_st[i].size()+1;
+          szz+=_matrixes_st[i].size()+1;
         }
     }
   bigArr=new int[szz];
@@ -170,15 +272,14 @@ void OverlapMapping::serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigAr
     offsets[i]=offsets[i-1]+count[i-1];
   for(std::size_t i=0;i<_matrixes_st.size();i++)
     {
-      if(_source_proc_id_st[i]!=myProcId)
+      if(_source_proc_id_st[i]==myProcId)
         {
-          int start=offsets[_source_proc_id_st[i]];
+          int start=offsets[_target_proc_id_st[i]];
           int *work=bigArr+start;
           *work=0;
           const std::vector< std::map<int,double> >& mat=_matrixes_st[i];
           for(std::vector< std::map<int,double> >::const_iterator it=mat.begin();it!=mat.end();it++,work++)
             work[1]=work[0]+(*it).size();
-          std::copy(_source_ids_st[i].begin(),_source_ids_st[i].end(),work+1);
         }
     }
   //
@@ -186,7 +287,7 @@ void OverlapMapping::serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigAr
   for(int i=0;i<grpSize;i++)
     {
       if(nbOfElemsSrc[i]>0)
-        countForRecv[i]=2*nbOfElemsSrc[i]+1;
+        countForRecv[i]=nbOfElemsSrc[i]+1;
       else
         countForRecv[i]=0;
       if(i>0)
@@ -221,13 +322,13 @@ int OverlapMapping::serializeMatrixStep1ST(const int *nbOfElemsSrc, const int *r
   int fullLgth=0;
   for(std::size_t i=0;i<_matrixes_st.size();i++)
     {
-      if(_source_proc_id_st[i]!=myProcId)
+      if(_source_proc_id_st[i]==myProcId)
         {
           const std::vector< std::map<int,double> >& mat=_matrixes_st[i];
           int lgthToSend=0;
           for(std::vector< std::map<int,double> >::const_iterator it=mat.begin();it!=mat.end();it++)
             lgthToSend+=(*it).size();
-          count[_source_proc_id_st[i]]=lgthToSend;
+          count[_target_proc_id_st[i]]=lgthToSend;
           fullLgth+=lgthToSend;
         }
     }
@@ -240,7 +341,7 @@ int OverlapMapping::serializeMatrixStep1ST(const int *nbOfElemsSrc, const int *r
   fullLgth=0;
   for(std::size_t i=0;i<_matrixes_st.size();i++)
     {
-      if(_source_proc_id_st[i]!=myProcId)
+      if(_source_proc_id_st[i]==myProcId)
         {
           const std::vector< std::map<int,double> >& mat=_matrixes_st[i];
           for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
@@ -278,35 +379,19 @@ void OverlapMapping::unserializationST(int nbOfTrgElems,
     if(nbOfElemsSrcPerProc[i]!=0)
       _the_matrix_st_source_proc_id.push_back(i);
   int nbOfPseudoProcs=_the_matrix_st_source_proc_id.size();//_the_matrix_st_target_proc_id.size() contains number of matrix fetched remotely whose sourceProcId==myProcId
-  _the_matrix_st_source_ids.resize(nbOfPseudoProcs);
   _the_matrix_st.resize(nbOfPseudoProcs);
-  for(int i=0;i<nbOfPseudoProcs;i++)
-    _the_matrix_st[i].resize(nbOfTrgElems);
   //
   int j=0;
   for(int i=0;i<grpSize;i++)
     if(nbOfElemsSrcPerProc[i]!=0)
       {
-        std::set<int> sourceIdsZip;// this zip is to reduce amount of data to send/rexcv on transposeMultiply target ids transfert
+        _the_matrix_st[j].resize(nbOfElemsSrcPerProc[i]);
         for(int k=0;k<nbOfElemsSrcPerProc[i];k++)
           {
             int offs=bigArrRecv[bigArrRecvOffs[i]+k];
             int lgthOfMap=bigArrRecv[bigArrRecvOffs[i]+k+1]-offs;
             for(int l=0;l<lgthOfMap;l++)
-              sourceIdsZip.insert(bigArrRecv2[bigArrRecv2Offs[i]+offs+l]);
-          }
-        _the_matrix_st_source_ids[j].insert(_the_matrix_st_source_ids[j].end(),sourceIdsZip.begin(),sourceIdsZip.end());
-        std::map<int,int> old2newSrcIds;
-        int newNbTrg=0;
-        for(std::set<int>::const_iterator it=sourceIdsZip.begin();it!=sourceIdsZip.end();it++,newNbTrg++)
-          old2newSrcIds[*it]=newNbTrg;
-        for(int k=0;k<nbOfElemsSrcPerProc[i];k++)
-          {
-            int srcId=bigArrRecv[bigArrRecvOffs[i]+nbOfElemsSrcPerProc[i]+1+k];
-            int offs=bigArrRecv[bigArrRecvOffs[i]+k];
-            int lgthOfMap=bigArrRecv[bigArrRecvOffs[i]+k+1]-offs;
-            for(int l=0;l<lgthOfMap;l++)
-              _the_matrix_st[j][old2newSrcIds[bigArrRecv2[bigArrRecv2Offs[i]+offs+l]]][srcId]=bigArrDRecv2[bigArrRecv2Offs[i]+offs+l];
+              _the_matrix_st[j][k][bigArrRecv2[bigArrRecv2Offs[i]+offs+l]]=bigArrDRecv2[bigArrRecv2Offs[i]+offs+l];
           }
         j++;
       }
@@ -317,41 +402,29 @@ void OverlapMapping::unserializationST(int nbOfTrgElems,
  * and 'this->_the_matrix_st_target_ids'.
  * This method finish the job of filling 'this->_the_matrix_st' and 'this->_the_matrix_st_target_proc_id' by putting candidates in 'this->_matrixes_st' into them.
  */
-void OverlapMapping::finishToFillFinalMatrixST(int nbOfTrgElems)
+void OverlapMapping::finishToFillFinalMatrixST()
 {
   int myProcId=_group.myRank();
   int sz=_matrixes_st.size();
   int nbOfEntryToAdd=0;
   for(int i=0;i<sz;i++)
-    if(_source_proc_id_st[i]==myProcId)
-      {
-        nbOfEntryToAdd++;
-        _the_matrix_st_source_proc_id.push_back(_target_proc_id_st[i]);
-      }
+    if(_source_proc_id_st[i]!=myProcId)
+      nbOfEntryToAdd++;
   if(nbOfEntryToAdd==0)
     return ;
   int oldNbOfEntry=_the_matrix_st.size();
   int newNbOfEntry=oldNbOfEntry+nbOfEntryToAdd;
   _the_matrix_st.resize(newNbOfEntry);
-  _the_matrix_st_source_ids.resize(newNbOfEntry);
-  for(int i=oldNbOfEntry;i<newNbOfEntry;i++)
-    _the_matrix_st[i].resize(nbOfTrgElems);
   int j=oldNbOfEntry;
   for(int i=0;i<sz;i++)
-    if(_source_proc_id_st[i]==myProcId)
+    if(_source_proc_id_st[i]!=myProcId)
       {
         const std::vector<std::map<int,double> >& mat=_matrixes_st[i];
-        const std::vector<int>& srcIds=_source_ids_st[i];
-        int sz=srcIds.size();//assert srcIds.size()==mat.size()
-        for(int k=0;k<sz;k++)
-          {
-            const std::map<int,double>& m2=mat[k];
-            for(std::map<int,double>::const_iterator it=m2.begin();it!=m2.end();it++)
-              _the_matrix_st[j][(*it).first][srcIds[k]]=(*it).second;
-          }
-        _the_matrix_st_source_ids[j].insert(_the_matrix_st_source_ids[j].end(),_source_ids_st[i].begin(),_source_ids_st[i].end());
+        _the_matrix_st[j]=mat;
+        _the_matrix_st_source_proc_id.push_back(_source_proc_id_st[i]);
         j++;
       }
+  _matrixes_st.clear();
 }
 
 /*!
@@ -410,111 +483,196 @@ void OverlapMapping::prepareIdsToSendST()
 * This method performs a transpose multiply of 'fieldInput' and puts the result into 'fieldOutput'.
  * 'fieldInput' is expected to be the sourcefield and 'fieldOutput' the targetfield.
  */
-void OverlapMapping::multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput)
-{
-  
-}
-
-/*!
- * This method performs a transpose multiply of 'fieldInput' and put the result into 'fieldOutput'.
- * 'fieldInput' is expected to be the targetfield and 'fieldOutput' the sourcefield.
- */
-void OverlapMapping::transposeMultiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput)
+void OverlapMapping::multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput) const
 {
-#if 0
-  CommInterface commInterface=_group.getCommInterface();
-  const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
-  const MPI_Comm *comm=group->getComm();
-  int grpSize=_group.size();
-  INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
-  std::fill<int *>(nbsend,nbsend+grpSize,0);
-  for(std::size_t i=0;i<_the_matrix_st.size();i++)
-    nbsend[_the_matrix_st_target_proc_id[i]]=_the_matrix_st_target_ids[i].size();
-  INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
-  commInterface.allToAll(nbsend,1,MPI_INT,nbrecv,1,MPI_INT,*comm);
-  int nbOfCompo=fieldInput->getNumberOfComponents();//to improve same number of components
-  std::transform((int *)nbsend,(int *)nbsend+grpSize,(int *)nbsend,std::bind2nd(std::multiplies<int>(),nbOfCompo));
-  std::transform((int *)nbrecv,(int *)nbrecv+grpSize,(int *)nbrecv,std::bind2nd(std::multiplies<int>(),nbOfCompo));
-  INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
-  nbsend2[0]=0;
-  for(int i=1;i<grpSize;i++)
-    nbsend2[i]=nbsend2[i-1]+nbsend[i-1];
-  int szToSend=nbsend2[grpSize-1]+nbsend[grpSize-1];
-  INTERP_KERNEL::AutoPtr<double> nbsend3=new double[szToSend];
-  double *work=nbsend3;
-  for(std::size_t i=0;i<_the_matrix_st.size();i++)
-    {
-      MEDCouplingAutoRefCountObjectPtr<DataArrayDouble> ptr=fieldInput->getArray()->selectByTupleId(&_target_ids_st[i][0],&(_target_ids_st[i][0])+_target_ids_st[i].size());
-      std::copy(ptr->getConstPointer(),ptr->getConstPointer()+nbsend[_target_proc_id_st[i]],work+nbsend2[_target_proc_id_st[i]]);
-    }
-  INTERP_KERNEL::AutoPtr<int> nbrecv3=new int[grpSize];
-  nbrecv3[0]=0;
-  for(int i=1;i<grpSize;i++)
-    nbrecv3[i]=nbrecv3[i-1]+nbrecv[i-1];
-  int szToFetch=nbsend2[grpSize-1]+nbrecv[grpSize-1];
-  INTERP_KERNEL::AutoPtr<double> nbrecv2=new double[szToFetch];
-#endif
-#if 0
-  //
   int nbOfCompo=fieldInput->getNumberOfComponents();//to improve same number of components to test
   CommInterface commInterface=_group.getCommInterface();
   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
   const MPI_Comm *comm=group->getComm();
   int grpSize=_group.size();
+  int myProcId=_group.myRank();
   //
   INTERP_KERNEL::AutoPtr<int> nbsend=new int[grpSize];
-  std::fill<int *>(nbsend,nbsend+grpSize,0);
   INTERP_KERNEL::AutoPtr<int> nbsend2=new int[grpSize];
-  for(int i=0;i<grpSize;i++)
-    nbsend[i]=nbOfCompo*_target_ids_to_send_st[i].size();
-  nbsend2[0];
-  for(int i=1;i<grpSize;i++)
-    nbsend2[i]=nbsend2[i-1]+nbsend[i-1];
-  int szToSend=nbsend2[grpSize-1]+nbsend[grpSize-1];
-  INTERP_KERNEL::AutoPtr<double> nbsend3=new double[szToSend];
   INTERP_KERNEL::AutoPtr<int> nbrecv=new int[grpSize];
-  INTERP_KERNEL::AutoPtr<int> nbrecv3=new int[grpSize];
+  INTERP_KERNEL::AutoPtr<int> nbrecv2=new int[grpSize];
+  std::fill<int *>(nbsend,nbsend+grpSize,0);
   std::fill<int *>(nbrecv,nbrecv+grpSize,0);
-  for(std::size_t i=0;i<_the_matrix_st_source_proc_id.size();i++)
-    nbrecv[_the_matrix_st_source_proc_id[i]]=_the_matrix_st_target_ids[i].size()*nbOfCompo;
-  nbrecv3[0]=0;
-  for(int i=1;i<grpSize;i++)
-    nbrecv3[i]=nbrecv3[i-1]+nbrecv[i-1];
-  int szToRecv=nbrecv3[grpSize-1]+nbrecv[grpSize-1];
-  double *work=nbsend3;
+  nbsend2[0]=0;
+  nbrecv2[0]=0;
+  std::vector<double> valsToSend;
   for(int i=0;i<grpSize;i++)
     {
-      MEDCouplingAutoRefCountObjectPtr<DataArrayDouble> ptr=fieldInput->getArray()->selectByTupleId(&(_target_ids_to_send_st[i][0]),
-                                                                                                    &(_target_ids_to_send_st[i][0])+_target_ids_to_send_st[i].size());
-      std::copy(ptr->getConstPointer(),ptr->getConstPointer()+nbsend[i],work+nbsend2[i]);
+      if(std::find(_proc_ids_to_send_vector_st.begin(),_proc_ids_to_send_vector_st.end(),i)!=_proc_ids_to_send_vector_st.end())
+        {
+          std::vector<int>::const_iterator isItem1=std::find(_src_proc_st2.begin(),_src_proc_st2.end(),i);
+          MEDCouplingAutoRefCountObjectPtr<DataArrayDouble> vals;
+          if(isItem1!=_src_proc_st2.end())//item1 of step2 main algo
+            {
+              int id=std::distance(_src_proc_st2.begin(),isItem1);
+              vals=fieldInput->getArray()->selectByTupleId(_src_ids_st2[id]->getConstPointer(),_src_ids_st2[id]->getConstPointer()+_src_ids_st2[id]->getNumberOfTuples());
+            }
+          else
+            {//item0 of step2 main algo
+              int id=std::distance(_src_ids_zip_proc_st2.begin(),std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i));
+              vals=fieldInput->getArray()->selectByTupleId(&(_src_ids_zip_st2[id])[0],&(_src_ids_zip_st2[id])[0]+_src_ids_zip_st2[id].size());
+            }
+          nbsend[i]=vals->getNbOfElems();
+          valsToSend.insert(valsToSend.end(),vals->getConstPointer(),vals->getConstPointer()+nbsend[i]);
+        }
+      if(std::find(_proc_ids_to_recv_vector_st.begin(),_proc_ids_to_recv_vector_st.end(),i)!=_proc_ids_to_recv_vector_st.end())
+        {
+          std::vector<int>::const_iterator isItem0=std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),i);
+          if(isItem0==_trg_proc_st2.end())//item1 of step2 main algo [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
+            {
+              std::vector<int>::const_iterator it1=std::find(_src_ids_proc_st2.begin(),_src_ids_proc_st2.end(),i);
+              if(it1!=_src_ids_proc_st2.end())
+                {
+                  int id=std::distance(_src_ids_proc_st2.begin(),it1);
+                  nbrecv[i]=_nb_of_src_ids_proc_st2[id]*nbOfCompo;
+                }
+              else if(i==myProcId)
+                {
+                  nbrecv[i]=fieldInput->getNumberOfTuplesExpected()*nbOfCompo;
+                }
+              else
+                throw INTERP_KERNEL::Exception("Plouff ! send email to anthony.geay@cea.fr ! ");
+            }
+          else
+            {//item0 of step2 main algo [ (2,1) computed on proc2 but Matrix-Vector on proc1 ] [(1,0) computed on proc1 but Matrix-Vector on proc0]
+              int id=std::distance(_src_ids_zip_proc_st2.begin(),std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i));
+              nbrecv[i]=_src_ids_zip_st2[id].size()*nbOfCompo;
+            }
+        }
     }
-  INTERP_KERNEL::AutoPtr<double> nbrecv2=new double[szToRecv];
-  commInterface.allToAllV(nbsend3,nbsend,nbsend2,MPI_DOUBLE,
-                          nbrecv2,nbrecv,nbrecv3,MPI_DOUBLE,
-                          *comm);
-  // deserialization
+  for(int i=1;i<grpSize;i++)
+    {
+      nbsend2[i]=nbsend2[i-1]+nbsend[i-1];
+      nbrecv2[i]=nbrecv2[i-1]+nbrecv[i-1];
+    }
+  INTERP_KERNEL::AutoPtr<double> bigArr=new double[nbrecv2[grpSize-1]+nbrecv[grpSize-1]];
+  commInterface.allToAllV(&valsToSend[0],nbsend,nbsend2,MPI_DOUBLE,
+                          bigArr,nbrecv,nbrecv2,MPI_DOUBLE,*comm);
   fieldOutput->getArray()->fillWithZero();
-  INTERP_KERNEL::AutoPtr<double> workZone=new double[nbOfCompo];
-  for(std::size_t i=0;i<_the_matrix_st.size();i++)
+  INTERP_KERNEL::AutoPtr<double> tmp=new double[nbOfCompo];
+  for(int i=0;i<grpSize;i++)
     {
-      double *res=fieldOutput->getArray()->getPointer();
-      int sourceProcId=_the_matrix_st_source_proc_id[i];
-      const std::vector<std::map<int,double> >& m=_the_matrix_st[i];
-      const std::vector<std::map<int,double> >& deno=_the_deno_st[i];
-      const double *vecOnTargetProcId=((const double *)nbrecv2)+nbrecv3[sourceProcId];
-      std::size_t nbOfIds=m.size();
-      for(std::size_t j=0;j<nbOfIds;j++,res+=nbOfCompo)
+      if(nbrecv[i]>0)
         {
-          const std::map<int,double>& m2=m[j];
-          const std::map<int,double>& deno2=deno[j];
-          for(std::map<int,double>::const_iterator it=m2.begin();it!=m2.end();it++)
+          double *pt=fieldOutput->getArray()->getPointer();
+          std::vector<int>::const_iterator it=std::find(_the_matrix_st_source_proc_id.begin(),_the_matrix_st_source_proc_id.end(),i);
+          if(it==_the_matrix_st_source_proc_id.end())
+            throw INTERP_KERNEL::Exception("Big problem !");
+          int id=std::distance(_the_matrix_st_source_proc_id.begin(),it);
+          const std::vector< std::map<int,double> >& mat=_the_matrix_st[id];
+          const std::vector< std::map<int,double> >& deno=_the_deno_st[id];
+          std::vector<int>::const_iterator isItem0=std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),i);
+          if(isItem0==_trg_proc_st2.end())//item1 of step2 main algo [ (0,1) computed on proc1 and Matrix-Vector on proc1 ]
             {
-              std::map<int,double>::const_iterator it2=deno2.find((*it).first);
-              std::transform(vecOnTargetProcId+((*it).first*nbOfCompo),vecOnTargetProcId+((*it).first+1)*nbOfCompo,(double *)workZone,std::bind2nd(std::multiplies<double>(),(*it).second));
-              std::transform((double *)workZone,(double *)workZone+nbOfCompo,(double *)workZone,std::bind2nd(std::multiplies<double>(),1./(*it2).second));
-              std::transform((double *)workZone,(double *)workZone+nbOfCompo,res,res,std::plus<double>());
+              int nbOfTrgTuples=mat.size();
+              for(int j=0;j<nbOfTrgTuples;j++,pt+=nbOfCompo)
+                {
+                  const std::map<int,double>& mat1=mat[j];
+                  const std::map<int,double>& deno1=deno[j];
+                  std::map<int,double>::const_iterator it4=deno1.begin();
+                  for(std::map<int,double>::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it4++)
+                    {
+                      std::transform(bigArr+nbrecv2[i]+((*it3).first)*nbOfCompo,bigArr+nbrecv2[i]+((*it3).first+1)*(nbOfCompo),(double *)tmp,std::bind2nd(std::multiplies<double>(),(*it3).second/(*it4).second));
+                      std::transform((double *)tmp,(double *)tmp+nbOfCompo,pt,pt,std::plus<double>());
+                    }
+                }
+            }
+          else
+            {//item0 of step2 main algo [ (2,1) computed on proc2 but Matrix-Vector on proc1 ]
+              double *pt=fieldOutput->getArray()->getPointer();
+              std::map<int,int> zipCor;
+              int id=std::distance(_src_ids_zip_proc_st2.begin(),std::find(_src_ids_zip_proc_st2.begin(),_src_ids_zip_proc_st2.end(),i));
+              const std::vector<int> zipIds=_src_ids_zip_st2[id];
+              int newId=0;
+              for(std::vector<int>::const_iterator it=zipIds.begin();it!=zipIds.end();it++,newId++)
+                zipCor[*it]=newId;
+              int id2=std::distance(_trg_proc_st2.begin(),std::find(_trg_proc_st2.begin(),_trg_proc_st2.end(),i));
+              const DataArrayInt *tgrIds=_trg_ids_st2[id2];
+              const int *tgrIds2=tgrIds->getConstPointer();
+              int nbOfTrgTuples=mat.size();
+              for(int j=0;j<nbOfTrgTuples;j++)
+                {
+                  const std::map<int,double>& mat1=mat[j];
+                  const std::map<int,double>& deno1=deno[j];
+                  std::map<int,double>::const_iterator it5=deno1.begin();
+                  for(std::map<int,double>::const_iterator it3=mat1.begin();it3!=mat1.end();it3++,it5++)
+                    {
+                      std::map<int,int>::const_iterator it4=zipCor.find((*it3).first);
+                      if(it4==zipCor.end())
+                        throw INTERP_KERNEL::Exception("Hmmmmm send e mail to anthony.geay@cea.fr !");
+                      std::transform(bigArr+nbrecv2[i]+((*it4).second)*nbOfCompo,bigArr+nbrecv2[i]+((*it4).second+1)*(nbOfCompo),(double *)tmp,std::bind2nd(std::multiplies<double>(),(*it3).second/(*it5).second));
+                      std::transform((double *)tmp,(double *)tmp+nbOfCompo,pt+tgrIds2[j]*nbOfCompo,pt+tgrIds2[j]*nbOfCompo,std::plus<double>());
+                    }
+                }
             }
         }
     }
-#endif
 }
+
+/*!
+ * This method performs a transpose multiply of 'fieldInput' and puts the result into 'fieldOutput'.
+ * 'fieldInput' is expected to be the targetfield and 'fieldOutput' the sourcefield.
+ */
+void OverlapMapping::transposeMultiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput)
+{
+}
+
+/*!
+ * This method should be called immediately after _the_matrix_st has been filled with the remotely computed matrices stored on this proc for the Matrix-Vector operation.
+ * For each of these matrices it computes the minimal set of source ids corresponding to the source proc id. 
+ */
+void OverlapMapping::updateZipSourceIdsForFuture()
+{
+  CommInterface commInterface=_group.getCommInterface();
+  const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
+  const MPI_Comm *comm=group->getComm();
+  int grpSize=_group.size();
+  int myProcId=_group.myRank();
+  int nbOfMatrixRecveived=_the_matrix_st_source_proc_id.size();
+  for(int i=0;i<nbOfMatrixRecveived;i++)
+    {
+      int curSrcProcId=_the_matrix_st_source_proc_id[i];
+      if(curSrcProcId!=myProcId)
+        {
+          const std::vector< std::map<int,double> >& mat=_the_matrix_st[i];
+          _src_ids_zip_proc_st2.push_back(curSrcProcId);
+          _src_ids_zip_st2.resize(_src_ids_zip_st2.size()+1);
+          std::set<int> s;
+          for(std::vector< std::map<int,double> >::const_iterator it1=mat.begin();it1!=mat.end();it1++)
+            for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+              s.insert((*it2).first);
+          _src_ids_zip_st2.back().insert(_src_ids_zip_st2.back().end(),s.begin(),s.end());
+        }
+    }
+}
+
+// #include <iostream>
+
+// void OverlapMapping::printTheMatrix() const
+// {
+//   CommInterface commInterface=_group.getCommInterface();
+//   const MPIProcessorGroup *group=static_cast<const MPIProcessorGroup*>(&_group);
+//   const MPI_Comm *comm=group->getComm();
+//   int grpSize=_group.size();
+//   int myProcId=_group.myRank();
+//   std::cerr << "I am proc #" << myProcId << std::endl;
+//   int nbOfMat=_the_matrix_st.size();
+//   std::cerr << "I do manage " << nbOfMat << "matrix : "<< std::endl;
+//   for(int i=0;i<nbOfMat;i++)
+//     {
+//       std::cerr << "   - Matrix #" << i << " on source proc #" << _the_matrix_st_source_proc_id[i];
+//       const std::vector< std::map<int,double> >& locMat=_the_matrix_st[i];
+//       for(std::vector< std::map<int,double> >::const_iterator it1=locMat.begin();it1!=locMat.end();it1++)
+//         {
+//           for(std::map<int,double>::const_iterator it2=(*it1).begin();it2!=(*it1).end();it2++)
+//             std::cerr << "(" << (*it2).first << "," << (*it2).second << "), ";
+//           std::cerr << std::endl;
+//         }
+//     }
+//   std::cerr << "*********" << std::endl;
+// }
index 0bcd76f12c79c00a8b6736679107d50a1c5af88a..3ac4abdb68e7b78579ececd851ed05f8f8679d77 100644 (file)
 #ifndef __OVERLAPMAPPING_HXX__
 #define __OVERLAPMAPPING_HXX__
 
+#include "MEDCouplingAutoRefCountObjectPtr.hxx"
+
 #include <vector>
 #include <map>
 
 namespace ParaMEDMEM
 {
   class ProcessorGroup;
+  class DataArrayInt;
   class MEDCouplingFieldDouble;
 
   class OverlapMapping
   {
   public:
     OverlapMapping(const ProcessorGroup& group);
-    void addContributionST(const std::vector< std::map<int,double> >& matrixST, const int *srcIds, const int *trgIds, int trgIdsLgth, int srcProcId, int trgProcId);
+    void keepTracksOfSourceIds(int procId, DataArrayInt *ids);
+    void keepTracksOfTargetIds(int procId, DataArrayInt *ids);
+    void addContributionST(const std::vector< std::map<int,double> >& matrixST, const DataArrayInt *srcIds, int srcProcId, const DataArrayInt *trgIds, int trgProcId);
     void prepare(const std::vector< std::vector<int> >& procsInInteraction, int nbOfTrgElems);
+    void computeDenoConservativeVolumic(int nbOfTuplesTrg);
     void computeDenoGlobConstraint();
     //
-    void multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput);
+    void multiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput) const;
     void transposeMultiply(const MEDCouplingFieldDouble *fieldInput, MEDCouplingFieldDouble *fieldOutput);
   private:
     void serializeMatrixStep0ST(const int *nbOfElemsSrc, int *&bigArr, int *count, int *offsets,
@@ -46,10 +52,21 @@ namespace ParaMEDMEM
                                int *countForRecv, int *offsForRecv) const;
     void unserializationST(int nbOfTrgElems, const int *nbOfElemsSrcPerProc, const int *bigArrRecv, const int *bigArrRecvCounts, const int *bigArrRecvOffs,
                            const int *bigArrRecv2, const double *bigArrDRecv2, const int *bigArrRecv2Count, const int *bigArrRecv2Offs);
-    void finishToFillFinalMatrixST(int nbOfTrgElems);
+    void finishToFillFinalMatrixST();
     void prepareIdsToSendST();
+    void updateZipSourceIdsForFuture();
+    //void printTheMatrix() const;
   private:
     const ProcessorGroup &_group;
+    //! vector of ids
+    std::vector< MEDCouplingAutoRefCountObjectPtr<DataArrayInt> > _src_ids_st2;//item #1
+    std::vector< int > _src_proc_st2;//item #1
+    std::vector< MEDCouplingAutoRefCountObjectPtr<DataArrayInt> > _trg_ids_st2;//item #0
+    std::vector< int > _trg_proc_st2;//item #0
+    std::vector< int > _nb_of_src_ids_proc_st2;//item #1
+    std::vector< int > _src_ids_proc_st2;//item #1
+    std::vector< std::vector<int> > _src_ids_zip_st2;//same size as _src_ids_zip_proc_st2. Sorted. specifies for each id the corresponding ids to send. This is for item0 of Step2 of main algorithm
+    std::vector< int > _src_ids_zip_proc_st2;
     //! vector of matrices; the first entry corresponds to the source proc id in _source_ids_st
     std::vector< std::vector< std::map<int,double> > > _matrixes_st;
     std::vector< std::vector<int> > _source_ids_st;
@@ -61,7 +78,10 @@ namespace ParaMEDMEM
     std::vector< int > _the_matrix_st_source_proc_id;
     std::vector< std::vector<int> > _the_matrix_st_source_ids;
     std::vector< std::vector< std::map<int,double> > > _the_deno_st;
-    //! this attribute is of size _group.size(); for each procId in _group _target_ids_to_send_st[procId] contains tupleId to send abroad
+    //! this attribute stores the proc ids that wait for data from this proc for the matrix-vector computation
+    std::vector< int > _proc_ids_to_send_vector_st;
+    std::vector< int > _proc_ids_to_recv_vector_st;
+    //! this attribute is of size _group.size(); for each procId in _group _source_ids_to_send_st[procId] contains tupleId to send abroad
     std::vector< std::vector<int> > _source_ids_to_send_st;
   };
 }
index 05a0b32757a60f21388aa511d2011744c0368c1a..ea2128b5fa1322d41c1d84e483ea1e299dfb317f 100644 (file)
@@ -47,7 +47,7 @@ class ParaMEDMEMTest : public CppUnit::TestFixture
   CPPUNIT_TEST(testInterpKernelDEC2DM1D_P0P0);
   CPPUNIT_TEST(testInterpKernelDECPartialProcs);
   CPPUNIT_TEST(testInterpKernelDEC3DSurfEmptyBBox);
-  //CPPUNIT_TEST(testOverlapDEC1);
+  CPPUNIT_TEST(testOverlapDEC1);
 
   CPPUNIT_TEST(testSynchronousEqualInterpKernelWithoutInterpNativeDEC_2D);
   CPPUNIT_TEST(testSynchronousEqualInterpKernelWithoutInterpDEC_2D);
index e4391a4da19d3bc83b7ae96fd4cf76fc0b6a09ad..5a41497be98f5bd74c43bfa800e524d4013ed8e1 100644 (file)
@@ -81,7 +81,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       ParaMEDMEM::ComponentTopology comptopo;
       parameshS=new ParaMEDMEM::ParaMESH(meshS,*dec.getGrp(),"source mesh");
       parafieldS=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshS,comptopo);
-      parafieldS->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//ConservativeVolumic IntegralGlobConstraint
+      parafieldS->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsS=parafieldS->getField()->getArray()->getPointer();
       valsS[0]=7.; valsS[1]=8.;
       //
@@ -98,7 +98,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       meshT->finishInsertingCells();
       parameshT=new ParaMEDMEM::ParaMESH(meshT,*dec.getGrp(),"target mesh");
       parafieldT=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshT,comptopo);
-      parafieldT->getField()->setNature(ParaMEDMEM::IntegralGlobConstraint);//ConservativeVolumic
+      parafieldT->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsT=parafieldT->getField()->getArray()->getPointer();
       valsT[0]=7.;
     }
@@ -122,7 +122,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       ParaMEDMEM::ComponentTopology comptopo;
       parameshS=new ParaMEDMEM::ParaMESH(meshS,*dec.getGrp(),"source mesh");
       parafieldS=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshS,comptopo);
-      parafieldS->getField()->setNature(ParaMEDMEM::IntegralGlobConstraint);//ConservativeVolumic
+      parafieldS->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsS=parafieldS->getField()->getArray()->getPointer();
       valsS[0]=9.; valsS[1]=11.;
       //
@@ -139,7 +139,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       meshT->finishInsertingCells();
       parameshT=new ParaMEDMEM::ParaMESH(meshT,*dec.getGrp(),"target mesh");
       parafieldT=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshT,comptopo);
-      parafieldT->getField()->setNature(ParaMEDMEM::IntegralGlobConstraint);
+      parafieldT->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsT=parafieldT->getField()->getArray()->getPointer();
       valsT[0]=8.;
     }
@@ -162,7 +162,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       ParaMEDMEM::ComponentTopology comptopo;
       parameshS=new ParaMEDMEM::ParaMESH(meshS,*dec.getGrp(),"source mesh");
       parafieldS=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshS,comptopo);
-      parafieldS->getField()->setNature(ParaMEDMEM::IntegralGlobConstraint);//ConservativeVolumic
+      parafieldS->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsS=parafieldS->getField()->getArray()->getPointer();
       valsS[0]=10.;
       //
@@ -179,7 +179,7 @@ void ParaMEDMEMTest::testOverlapDEC1()
       meshT->finishInsertingCells();
       parameshT=new ParaMEDMEM::ParaMESH(meshT,*dec.getGrp(),"target mesh");
       parafieldT=new ParaMEDMEM::ParaFIELD(ParaMEDMEM::ON_CELLS,ParaMEDMEM::NO_TIME,parameshT,comptopo);
-      parafieldT->getField()->setNature(ParaMEDMEM::IntegralGlobConstraint);
+      parafieldT->getField()->setNature(ParaMEDMEM::ConservativeVolumic);//IntegralGlobConstraint
       double *valsT=parafieldT->getField()->getArray()->getPointer();
       valsT[0]=9.;
     }
@@ -188,20 +188,18 @@ void ParaMEDMEMTest::testOverlapDEC1()
   dec.synchronize();
   dec.sendRecvData(true);
   //
-  /*if(rank==0)
+  if(rank==0)
     {
-      CPPUNIT_ASSERT_DOUBLES_EQUAL(7.5,parafieldS->getField()->getArray()->getIJ(0,0),1e-12);
-      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.,parafieldS->getField()->getArray()->getIJ(0,1),1e-12);
+      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.75,parafieldT->getField()->getArray()->getIJ(0,0),1e-12);
     }
   if(rank==1)
     {
-      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.,parafieldS->getField()->getArray()->getIJ(0,0),1e-12);
-      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.,parafieldS->getField()->getArray()->getIJ(0,1),1e-12);
+      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.5,parafieldT->getField()->getArray()->getIJ(0,0),1e-12);
     }
   if(rank==2)
     {
-      CPPUNIT_ASSERT_DOUBLES_EQUAL(8.5,parafieldS->getField()->getArray()->getIJ(0,0),1e-12);
-      }*/
+      CPPUNIT_ASSERT_DOUBLES_EQUAL(10.5,parafieldT->getField()->getArray()->getIJ(0,0),1e-12);
+    }
   delete parafieldS;
   delete parafieldT;
   delete parameshS;