Salome HOME
Base implementation of Notebook
[modules/kernel.git] / src / MPIContainer / MPIObject_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME MPIContainer : implemenation of container based on MPI libraries
23 //  File   : MPIObject_i.cxx
24 //  Module : SALOME
25 //
26 #include "MPIObject_i.hxx"
27 #include "utilities.h"
28 using namespace std;
29 #define TIMEOUT 5
30
31 MPIObject_i::MPIObject_i()
32 {
33   MPI_Comm_size( MPI_COMM_WORLD, &_nbproc );
34   MPI_Comm_rank( MPI_COMM_WORLD, &_numproc );
35   _tior=NULL;
36 }
37
38 MPIObject_i::MPIObject_i(int nbproc, int numproc)
39 {
40   _nbproc = nbproc;
41   _numproc = numproc;
42   _tior=NULL;
43 }
44
45 MPIObject_i::~MPIObject_i()
46 {
47   if(_tior) delete _tior;
48 }
49
50 Engines::IORTab* MPIObject_i::tior()
51 {
52   Engines::IORTab_var tior = new Engines::IORTab;
53   tior->length(_tior->length());
54   for(unsigned int ip=0;ip<tior->length();ip++)
55     tior[ip] = (*_tior)[ip];
56   return tior._retn(); 
57 };
58
59 void MPIObject_i::tior(const Engines::IORTab& ior)
60 {
61   _tior = new Engines::IORTab;
62   _tior->length(ior.length());
63   for(unsigned int ip=0;ip<ior.length();ip++)
64     (*_tior)[ip] = ior[ip];
65 }
66
67 void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
68 {
69   int err, ip, n;
70   char *ior;
71   MPI_Status status; /* status de reception de message MPI */
72
73   if( _numproc == 0 ){
74
75     //Allocation du tableau des IOR
76     Engines::IORTab_var iort = new Engines::IORTab;
77     iort->length(_nbproc);
78
79     iort[0] = pobj;
80
81     // Process 0 recupere les ior de l'object sur les autres process
82     for(ip=1;ip<_nbproc;ip++){
83       err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
84       if(err){
85         MESSAGE("[" << _numproc << "] MPI_RECV error");
86         exit(1);
87       }
88       // Allocation de la chaine de longueur n
89       ior = new char[n];
90       err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
91       if(err){
92         MESSAGE("[" << _numproc << "] MPI_RECV error");
93         exit(1);
94       }
95       iort[ip] = orb->string_to_object(ior);
96       delete [] ior;
97       if(CORBA::is_nil(iort[ip]))
98         throw POException(ip,"MPI Component not loaded");
99     }
100     // On donne le tableau des ior a l'objet Corba du process 0
101     if( amiCont )
102       tior(*(iort._retn()));
103     else
104       pobj->tior(*(iort._retn()));
105   }
106   else{
107     // Conversion IOR vers string
108     ior = orb->object_to_string(pobj);
109     n = strlen(ior) + 1;
110     // On envoie l'IOR au process 0
111     err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
112     if(err){
113       MESSAGE("[" << _numproc << "] MPI_SEND error");
114       exit(1);
115     }
116     err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
117     if(err){
118       MESSAGE("[" << _numproc << "] MPI_SEND error");
119       exit(1);
120     }
121     CORBA::string_free(ior);
122   }
123
124 }
125
126 #ifdef HAVE_MPI2
127 void MPIObject_i::remoteMPI2Connect(string service)
128 {
129   int i;
130   char port_name[MPI_MAX_PORT_NAME];
131   char port_name_clt[MPI_MAX_PORT_NAME];
132
133   if( service.size() == 0 ){
134     MESSAGE("[" << _numproc << "] You have to give a service name !");
135     throw POException(_numproc,"You have to give a service name !");
136   }
137
138   if( _srv.find(service) != _srv.end() ){
139     MESSAGE("[" << _numproc << "] service " << service << " already exist !");
140     throw POException(_numproc,"service " + service + " already exist !");
141   }
142
143   _srv[service] = false;
144
145   MPI_Barrier(MPI_COMM_WORLD);
146
147   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
148   if( _numproc == 0 ){ 
149     /* rank 0 try to be a server. If service is already published, try to be a cient */
150     MPI_Open_port(MPI_INFO_NULL, port_name); 
151     if ( MPI_Publish_name((char*)service.c_str(), MPI_INFO_NULL, port_name) == MPI_SUCCESS )  {
152       _srv[service] = true;
153       _port_name[service] = port_name;
154       MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << endl);
155     }      
156     else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){
157       MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << endl);
158       MPI_Close_port( port_name );
159     }
160     else{
161       /* Throw exception */
162       MESSAGE("[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt << endl);
163       throw POException(_numproc,"Error on connection with " + service + " at " + port_name_clt);
164     }
165   }
166   else{
167     i=0;
168     /* Waiting rank 0 publish name and try to be a client */
169     while ( i != TIMEOUT  ) {
170       sleep(1);
171       if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS ){
172         MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << endl);
173         break;
174       }
175       i++;
176     }
177     if(i==TIMEOUT){
178       /* Throw exception */
179       MESSAGE("[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt << endl);
180       throw POException(_numproc,"Error on connection with " + service + " at " + port_name_clt);
181     }
182   }
183   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
184   
185   /* If rank 0 is server, all processes call MPI_Comm_accept */
186   /* If rank 0 is not server, all processes call MPI_Comm_connect */
187   int srv = (int)_srv[service];
188   MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
189   _srv[service] = (bool)srv;
190   if ( _srv[service] )
191     MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
192   else
193     MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
194
195   /* create global communicator: servers have low index in global communicator*/
196   MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));
197
198   /* only rank 0 can be server for unpublish name */
199   if(_numproc != 0) _srv[service] = false;
200
201 }
202
203 void MPIObject_i::remoteMPI2Disconnect(std::string service)
204 {
205
206   if( service.size() == 0 ){
207     MESSAGE("[" << _numproc << "] You have to give a service name !");
208     throw POException(_numproc,"You have to give a service name !");
209   }
210
211   if( _srv.find(service) == _srv.end() ){
212     MESSAGE("[" << _numproc << "] service " << service << " don't exist !");
213     throw POException(_numproc,"service " + service + " don't exist !");
214   }
215
216   MPI_Comm_disconnect( &(_gcom[service]) ); 
217   if ( _srv[service] ) {
218
219     char port_name[MPI_MAX_PORT_NAME];
220     strcpy(port_name,_port_name[service].c_str());
221
222     MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); 
223     MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << endl);
224     MPI_Close_port( port_name ); 
225     _port_name.erase(service);
226   }
227
228   _gcom.erase(service);
229   _icom.erase(service);
230   _srv.erase(service);
231
232 }
233 #endif
234