Salome HOME
CCAR: import_hook.py was too strict in ensure_list (ImportError raised)
[modules/kernel.git] / src / MPIContainer / MPIObject_i.cxx
index ff45f5e5ea3d6f9330707abda3a782451af94fb2..fd04587e1e411aa15d5d6d7d1ff6525c46c3c79d 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
+//  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
 //
 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
 //
 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
 //
+
 //  SALOME MPIContainer : implemenation of container based on MPI libraries
 //  File   : MPIObject_i.cxx
 //  Module : SALOME
 //
 #include "MPIObject_i.hxx"
 #include "utilities.h"
-using namespace std;
+#include "Utils_SALOME_Exception.hxx"
+
 #define TIMEOUT 5
 
 MPIObject_i::MPIObject_i()
@@ -35,13 +37,6 @@ MPIObject_i::MPIObject_i()
   _tior=NULL;
 }
 
-MPIObject_i::MPIObject_i(int nbproc, int numproc)
-{
-  _nbproc = nbproc;
-  _numproc = numproc;
-  _tior=NULL;
-}
-
 MPIObject_i::~MPIObject_i()
 {
   if(_tior) delete _tior;
@@ -64,138 +59,195 @@ void MPIObject_i::tior(const Engines::IORTab& ior)
     (*_tior)[ip] = ior[ip];
 }
 
-void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, 
-                          bool amiCont) throw(POException)
+void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
 {
   int err, ip, n;
   char *ior;
   MPI_Status status; /* status de reception de message MPI */
-
-  if( _numproc == 0 ){
-
-    //Allocation du tableau des IOR
-    Engines::IORTab_var iort = new Engines::IORTab;
-    iort->length(_nbproc);
-
-    iort[0] = pobj;
-
-    // Process 0 recupere les ior de l'object sur les autres process
-    for(ip=1;ip<_nbproc;ip++){
-      err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
-      if(err){
-       MESSAGE("[" << _numproc << "] MPI_RECV error");
-       exit(1);
-      }
-      // Allocation de la chaine de longueur n
-      ior = new char[n];
-      err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
-      if(err){
-       MESSAGE("[" << _numproc << "] MPI_RECV error");
-       exit(1);
-      }
-      iort[ip] = orb->string_to_object(ior);
-      delete [] ior;
-      if(CORBA::is_nil(iort[ip]))
-       throw POException(ip,"MPI Component not loaded");
-    }
-    // On donne le tableau des ior a l'objet Corba du process 0
-    if( amiCont )
-      tior(*(iort._retn()));
-    else
-      pobj->tior(*(iort._retn()));
-  }
-  else{
-    // Conversion IOR vers string
-    ior = orb->object_to_string(pobj);
-    n = strlen(ior) + 1;
-    // On envoie l'IOR au process 0
-    err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
-    if(err){
-      MESSAGE("[" << _numproc << "] MPI_SEND error");
-      exit(1);
+  std::ostringstream msg;
+
+  if( _numproc == 0 )
+    {
+
+      //Allocation du tableau des IOR
+      Engines::IORTab_var iort = new Engines::IORTab;
+      iort->length(_nbproc);
+      
+      iort[0] = pobj;
+
+      // Process 0 recupere les ior de l'object sur les autres process
+      for(ip=1;ip<_nbproc;ip++)
+        {
+          err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
+          if(err)
+            {
+              msg << "[" << _numproc << "] MPI_RECV error";
+              throw SALOME_Exception(msg.str().c_str());
+            }
+          // Allocation de la chaine de longueur n
+          ior = new char[n];
+          err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
+          if(err)
+            {
+              msg << "[" << _numproc << "] MPI_RECV error";
+              throw SALOME_Exception(msg.str().c_str());
+            }
+          iort[ip] = orb->string_to_object(ior);
+          delete [] ior;
+          if(CORBA::is_nil(iort[ip]))
+            {
+              msg << "[" << ip << "] MPI Component not loaded";
+              throw SALOME_Exception(msg.str().c_str());
+            }
+        }
+      // On donne le tableau des ior a l'objet Corba du process 0
+      if( amiCont )
+        tior(*(iort._retn()));
+      else
+        pobj->tior(*(iort._retn()));
     }
-    err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
-    if(err){
-      MESSAGE("[" << _numproc << "] MPI_SEND error");
-      exit(1);
+  else
+    {
+      // Conversion IOR vers string
+      ior = orb->object_to_string(pobj);
+      n = strlen(ior) + 1;
+      // On envoie l'IOR au process 0
+      err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
+      if(err)
+        {
+          msg << "[" << _numproc << "] MPI_SEND error";
+          throw SALOME_Exception(msg.str().c_str());
+        }
+      err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
+      if(err)
+        {
+          msg << "[" << _numproc << "] MPI_SEND error";
+          throw SALOME_Exception(msg.str().c_str());
+        }
+      CORBA::string_free(ior);
     }
-    CORBA::string_free(ior);
-  }
-
 }
 
 #ifdef HAVE_MPI2
-MPI_Comm MPIObject_i::remoteMPI2Connect(string service) throw(POException)
+void MPIObject_i::remoteMPI2Connect(std::string service)
 {
   int i;
-  MPI_Comm gcom;
+  char port_name[MPI_MAX_PORT_NAME];
   char port_name_clt[MPI_MAX_PORT_NAME];
+  std::ostringstream msg;
 
-  _srv = 0;
-  _service = service;
+  if( service.size() == 0 )
+    {
+      msg << "[" << _numproc << "] You have to give a service name !";
+      throw SALOME_Exception(msg.str().c_str());
+    }
+
+  if( _srv.find(service) != _srv.end() )
+    {
+      msg << "[" << _numproc << "] service " << service << " already exist !";
+      throw SALOME_Exception(msg.str().c_str());
+    }
+
+  _srv[service] = false;
 
   MPI_Barrier(MPI_COMM_WORLD);
 
   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
-  if( _numproc == 0 ){ 
-    /* rank 0 try to be a server. If service is already published, try to be a cient */
-    MPI_Open_port(MPI_INFO_NULL, _port_name); 
-    if ( MPI_Publish_name((char*)_service.c_str(), MPI_INFO_NULL, _port_name) == MPI_SUCCESS )  {
-      _srv = 1;
-      MESSAGE("[" << _numproc << "] service " << _service << " available at " << _port_name << "\n");
-    }      
-    else if ( MPI_Lookup_name((char*)_service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){
-      MESSAGE("[" << _numproc << "] I get the connection with " << _service << " at " << port_name_clt << "!\n");
-      MPI_Close_port( _port_name );
-    }
-    else{
-      /* Throw exception */
-      MESSAGE("[" << _numproc << "] Error on connection with " << _service << " at " << port_name_clt << "!\n");
-      throw POException(_numproc,"Error on connection with " + _service + " at " + port_name_clt);
+  if( _numproc == 0 )
+    { 
+      /* rank 0 try to be a server. If service is already published, try to be a cient */
+      MPI_Open_port(MPI_INFO_NULL, port_name); 
+      if ( MPI_Publish_name((char*)service.c_str(), MPI_INFO_NULL, port_name) == MPI_SUCCESS )
+        {
+          _srv[service] = true;
+          _port_name[service] = port_name;
+          MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << std::endl);
+        }      
+      else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
+        {
+          MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
+          MPI_Close_port( port_name );
+        }
+      else
+        {
+          msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
+          throw SALOME_Exception(msg.str().c_str());
+        }
     }
-  }
-  else{
-    i=0;
-    /* Waiting rank 0 publish name and try to be a client */
-    while ( i != TIMEOUT  ) {
-      sleep(1);
-      if ( MPI_Lookup_name((char*)_service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){
-       MESSAGE("[" << _numproc << "] I get the connection with " << _service << " at " << port_name_clt << "!\n");
-       break;
-      }
-      i++;
-    }
-    if(i==TIMEOUT){
-      /* Throw exception */
-      MESSAGE("[" << _numproc << "] Error on connection with " << _service << " at " << port_name_clt << "!\n");
-      throw POException(_numproc,"Error on connection with " + _service + " at " + port_name_clt);
+  else
+    {
+      i=0;
+      /* Waiting rank 0 publish name and try to be a client */
+      while ( i != TIMEOUT  ) 
+        {
+          sleep(1);
+          if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
+            {
+              MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
+              break;
+            }
+          i++;
+        }
+      if(i==TIMEOUT)
+        {
+          msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
+          throw SALOME_Exception(msg.str().c_str());
+        }
     }
-  }
   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
   
   /* If rank 0 is server, all processes call MPI_Comm_accept */
   /* If rank 0 is not server, all processes call MPI_Comm_connect */
-  MPI_Bcast(&_srv,1,MPI_INT,0,MPI_COMM_WORLD);
-  if ( _srv )
-    MPI_Comm_accept( _port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &gcom );
+  int srv = (int)_srv[service];
+  MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
+  _srv[service] = (bool)srv;
+  if ( _srv[service] )
+    MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
   else
-    MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &gcom );
+    MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
 
-  /* only rank 0 can be server for unpublish name */
-  if(_numproc != 0) _srv = 0;
+  /* create global communicator: servers have low index in global communicator*/
+  MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));
 
-  return gcom;
+  /* only rank 0 can be server for unpublish name */
+  if(_numproc != 0) _srv[service] = false;
 
 }
 
-void MPIObject_i::remoteMPI2Disconnect(MPI_Comm gcom)
+void MPIObject_i::remoteMPI2Disconnect(std::string service)
 {
-  MPI_Comm_disconnect( &gcom ); 
-  if ( _srv ) {
-    MPI_Unpublish_name((char*)_service.c_str(), MPI_INFO_NULL, _port_name); 
-    MESSAGE("[" << _numproc << "] " << _service << ": close port " << _port_name << "\n");
-    MPI_Close_port( _port_name ); 
-  }
+  std::ostringstream msg;
+
+  if( service.size() == 0 )
+    {
+      msg << "[" << _numproc << "] You have to give a service name !";
+      throw SALOME_Exception(msg.str().c_str());
+    }
+
+  if( _srv.find(service) == _srv.end() )
+    {
+      msg << "[" << _numproc << "] service " << service << " don't exist !";
+      throw SALOME_Exception(msg.str().c_str());
+    }
+  
+  MPI_Comm_disconnect( &(_gcom[service]) ); 
+  if ( _srv[service] )
+    {
+
+      char port_name[MPI_MAX_PORT_NAME];
+      strcpy(port_name,_port_name[service].c_str());
+
+      MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); 
+      MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << std::endl);
+      MPI_Close_port( port_name ); 
+      _port_name.erase(service);
+    }
+  
+  _gcom.erase(service);
+  _icom.erase(service);
+  _srv.erase(service);
+
 }
 #endif