From: secher Date: Wed, 29 Apr 2009 10:05:44 +0000 (+0000) Subject: improvement of MPIObject X-Git-Tag: CCAR_cm3_start~3 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=4c27b3226a92637b5694fae1a34f14f9779dc83e;p=modules%2Fkernel.git improvement of MPIObject --- diff --git a/src/MPIContainer/MPIObject_i.cxx b/src/MPIContainer/MPIObject_i.cxx index ff45f5e5e..af78c0f2c 100644 --- a/src/MPIContainer/MPIObject_i.cxx +++ b/src/MPIContainer/MPIObject_i.cxx @@ -64,8 +64,7 @@ void MPIObject_i::tior(const Engines::IORTab& ior) (*_tior)[ip] = ior[ip]; } -void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, - bool amiCont) throw(POException) +void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont) { int err, ip, n; char *ior; @@ -125,33 +124,43 @@ void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, } #ifdef HAVE_MPI2 -MPI_Comm MPIObject_i::remoteMPI2Connect(string service) throw(POException) +void MPIObject_i::remoteMPI2Connect(bool high, string service) { int i; - MPI_Comm gcom; + char port_name[MPI_MAX_PORT_NAME]; char port_name_clt[MPI_MAX_PORT_NAME]; - _srv = 0; - _service = service; + if( service.size() == 0 ){ + MESSAGE("[" << _numproc << "] You have to give a service name !"); + throw POException(_numproc,"You have to give a service name !"); + } + + if( _srv.find(service) != _srv.end() ){ + MESSAGE("[" << _numproc << "] service " << service << " already exist !"); + throw POException(_numproc,"service " + service + " already exist !"); + } + + _srv[service] = false; MPI_Barrier(MPI_COMM_WORLD); MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); if( _numproc == 0 ){ /* rank 0 try to be a server. If service is already published, try to be a cient */ - MPI_Open_port(MPI_INFO_NULL, _port_name); - if ( MPI_Publish_name((char*)_service.c_str(), MPI_INFO_NULL, _port_name) == MPI_SUCCESS ) { - _srv = 1; - MESSAGE("[" << _numproc << "] service " << _service << " available at " << _port_name << "\n"); + MPI_Open_port(MPI_INFO_NULL, port_name); + if ( MPI_Publish_name((char*)service.c_str(), MPI_INFO_NULL, port_name) == MPI_SUCCESS ) { + _srv[service] = true; + _port_name[service] = port_name; + MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << endl); } - else if ( MPI_Lookup_name((char*)_service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){ - MESSAGE("[" << _numproc << "] I get the connection with " << _service << " at " << port_name_clt << "!\n"); - MPI_Close_port( _port_name ); + else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){ + MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << endl); + MPI_Close_port( port_name ); } else{ /* Throw exception */ - MESSAGE("[" << _numproc << "] Error on connection with " << _service << " at " << port_name_clt << "!\n"); - throw POException(_numproc,"Error on connection with " + _service + " at " + port_name_clt); + MESSAGE("[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt << endl); + throw POException(_numproc,"Error on connection with " + service + " at " + port_name_clt); } } else{ @@ -159,43 +168,67 @@ MPI_Comm MPIObject_i::remoteMPI2Connect(string service) throw(POException) /* Waiting rank 0 publish name and try to be a client */ while ( i != TIMEOUT ) { sleep(1); - if ( MPI_Lookup_name((char*)_service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){ - MESSAGE("[" << _numproc << "] I get the connection with " << _service << " at " << port_name_clt << "!\n"); + if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){ + MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << endl); break; } i++; } if(i==TIMEOUT){ /* Throw exception */ - MESSAGE("[" << _numproc << "] Error on connection with " << _service << " at " << port_name_clt << "!\n"); - throw POException(_numproc,"Error on connection with " + _service + " at " + port_name_clt); + MESSAGE("[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt << endl); + throw POException(_numproc,"Error on connection with " + service + " at " + port_name_clt); } } MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL); /* If rank 0 is server, all processes call MPI_Comm_accept */ /* If rank 0 is not server, all processes call MPI_Comm_connect */ - MPI_Bcast(&_srv,1,MPI_INT,0,MPI_COMM_WORLD); - if ( _srv ) - MPI_Comm_accept( _port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &gcom ); + int srv = (int)_srv[service]; + MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD); + _srv[service] = (bool)srv; + if ( _srv[service] ) + MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) ); else - MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &gcom ); + MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) ); - /* only rank 0 can be server for unpublish name */ - if(_numproc != 0) _srv = 0; + /* create global communicator */ + MPI_Intercomm_merge(_icom[service],high,&(_gcom[service])); - return gcom; + /* only rank 0 can be server for unpublish name */ + if(_numproc != 0) _srv[service] = false; } -void MPIObject_i::remoteMPI2Disconnect(MPI_Comm gcom) +void MPIObject_i::remoteMPI2Disconnect(std::string service) { - MPI_Comm_disconnect( &gcom ); - if ( _srv ) { - MPI_Unpublish_name((char*)_service.c_str(), MPI_INFO_NULL, _port_name); - MESSAGE("[" << _numproc << "] " << _service << ": close port " << _port_name << "\n"); - MPI_Close_port( _port_name ); + + if( service.size() == 0 ){ + MESSAGE("[" << _numproc << "] You have to give a service name !"); + throw POException(_numproc,"You have to give a service name !"); } + + if( _srv.find(service) == _srv.end() ){ + MESSAGE("[" << _numproc << "] service " << service << " don't exist !"); + throw POException(_numproc,"service " + service + " don't exist !"); + } + + MPI_Comm_disconnect( &(_gcom[service]) ); + if ( _srv[service] ) { + + char port_name[MPI_MAX_PORT_NAME]; + strcpy(port_name,_port_name[service].c_str()); + + MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); + MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << endl); + MPI_Close_port( port_name ); + _port_name.erase(service); + } + + _gcom.erase(service); + _icom.erase(service); + _srv.erase(service); + } #endif diff --git a/src/MPIContainer/MPIObject_i.hxx b/src/MPIContainer/MPIObject_i.hxx index 641cb1138..66d24f58d 100644 --- a/src/MPIContainer/MPIObject_i.hxx +++ b/src/MPIContainer/MPIObject_i.hxx @@ -30,7 +30,6 @@ #include #include #include CORBA_SERVER_HEADER(SALOME_MPIObject) -#define defaultService "SERVER" class POException { @@ -58,18 +57,21 @@ class MPIObject_i: public POA_Engines::MPIObject // IOR des objets paralleles sur tous les process mpi Engines::IORTab* _tior; // Echange des IOR de l'objet entre process - void BCastIOR(CORBA::ORB_ptr orb,Engines::MPIObject_ptr pobj,bool amiCont) throw(POException); + void BCastIOR(CORBA::ORB_ptr orb,Engines::MPIObject_ptr pobj,bool amiCont); #ifdef HAVE_MPI2 // MPI2 connection - MPI_Comm remoteMPI2Connect(std::string service=defaultService) throw(POException); + void remoteMPI2Connect(bool high, std::string service); // MPI2 disconnection - void remoteMPI2Disconnect(MPI_Comm gcom); + void remoteMPI2Disconnect(std::string service); #endif +protected: + std::map _gcom; + private: - int _srv; - char _port_name[MPI_MAX_PORT_NAME]; - std::string _service; + std::map _icom; + std::map _srv; + std::map _port_name; } ;