]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIObject_i.cxx
Salome HOME
Merge from V5_1_main 14/05/2010
[modules/kernel.git] / src / MPIContainer / MPIObject_i.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME MPIContainer : implemenation of container based on MPI libraries
24 //  File   : MPIObject_i.cxx
25 //  Module : SALOME
26 //
27 #include "MPIObject_i.hxx"
28 #include "utilities.h"
29 #include "Utils_SALOME_Exception.hxx"
30
31 #define TIMEOUT 5
32
33 MPIObject_i::MPIObject_i()
34 {
35   MPI_Comm_size( MPI_COMM_WORLD, &_nbproc );
36   MPI_Comm_rank( MPI_COMM_WORLD, &_numproc );
37   _tior=NULL;
38 }
39
40 MPIObject_i::MPIObject_i(int nbproc, int numproc)
41 {
42   _nbproc = nbproc;
43   _numproc = numproc;
44   _tior=NULL;
45 }
46
47 MPIObject_i::~MPIObject_i()
48 {
49   if(_tior) delete _tior;
50 }
51
52 Engines::IORTab* MPIObject_i::tior()
53 {
54   Engines::IORTab_var tior = new Engines::IORTab;
55   tior->length(_tior->length());
56   for(unsigned int ip=0;ip<tior->length();ip++)
57     tior[ip] = (*_tior)[ip];
58   return tior._retn(); 
59 };
60
61 void MPIObject_i::tior(const Engines::IORTab& ior)
62 {
63   _tior = new Engines::IORTab;
64   _tior->length(ior.length());
65   for(unsigned int ip=0;ip<ior.length();ip++)
66     (*_tior)[ip] = ior[ip];
67 }
68
69 void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
70 {
71   int err, ip, n;
72   char *ior;
73   MPI_Status status; /* status de reception de message MPI */
74   std::ostringstream msg;
75
76   if( _numproc == 0 )
77     {
78
79       //Allocation du tableau des IOR
80       Engines::IORTab_var iort = new Engines::IORTab;
81       iort->length(_nbproc);
82       
83       iort[0] = pobj;
84
85       // Process 0 recupere les ior de l'object sur les autres process
86       for(ip=1;ip<_nbproc;ip++)
87         {
88           err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
89           if(err)
90             {
91               msg << "[" << _numproc << "] MPI_RECV error";
92               throw SALOME_Exception(msg.str().c_str());
93             }
94           // Allocation de la chaine de longueur n
95           ior = new char[n];
96           err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
97           if(err)
98             {
99               msg << "[" << _numproc << "] MPI_RECV error";
100               throw SALOME_Exception(msg.str().c_str());
101             }
102           iort[ip] = orb->string_to_object(ior);
103           delete [] ior;
104           if(CORBA::is_nil(iort[ip]))
105             {
106               msg << "[" << ip << "] MPI Component not loaded";
107               throw SALOME_Exception(msg.str().c_str());
108             }
109         }
110       // On donne le tableau des ior a l'objet Corba du process 0
111       if( amiCont )
112         tior(*(iort._retn()));
113       else
114         pobj->tior(*(iort._retn()));
115     }
116   else
117     {
118       // Conversion IOR vers string
119       ior = orb->object_to_string(pobj);
120       n = strlen(ior) + 1;
121       // On envoie l'IOR au process 0
122       err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
123       if(err)
124         {
125           msg << "[" << _numproc << "] MPI_SEND error";
126           throw SALOME_Exception(msg.str().c_str());
127         }
128       err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
129       if(err)
130         {
131           msg << "[" << _numproc << "] MPI_SEND error";
132           throw SALOME_Exception(msg.str().c_str());
133         }
134       CORBA::string_free(ior);
135     }
136  
137 }
138
139 #ifdef HAVE_MPI2
140 void MPIObject_i::remoteMPI2Connect(std::string service)
141 {
142   int i;
143   char port_name[MPI_MAX_PORT_NAME];
144   char port_name_clt[MPI_MAX_PORT_NAME];
145   std::ostringstream msg;
146
147   if( service.size() == 0 )
148     {
149       msg << "[" << _numproc << "] You have to give a service name !";
150       throw SALOME_Exception(msg.str().c_str());
151     }
152
153   if( _srv.find(service) != _srv.end() )
154     {
155       msg << "[" << _numproc << "] service " << service << " already exist !";
156       throw SALOME_Exception(msg.str().c_str());
157     }
158
159   _srv[service] = false;
160
161   MPI_Barrier(MPI_COMM_WORLD);
162
163   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
164   if( _numproc == 0 )
165     { 
166       /* rank 0 try to be a server. If service is already published, try to be a cient */
167       MPI_Open_port(MPI_INFO_NULL, port_name); 
168       if ( MPI_Publish_name((char*)service.c_str(), MPI_INFO_NULL, port_name) == MPI_SUCCESS )
169         {
170           _srv[service] = true;
171           _port_name[service] = port_name;
172           MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << std::endl);
173         }      
174       else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
175         {
176           MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
177           MPI_Close_port( port_name );
178         }
179       else
180         {
181           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
182           throw SALOME_Exception(msg.str().c_str());
183         }
184     }
185   else
186     {
187       i=0;
188       /* Waiting rank 0 publish name and try to be a client */
189       while ( i != TIMEOUT  ) 
190         {
191           sleep(1);
192           if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
193             {
194               MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
195               break;
196             }
197           i++;
198         }
199       if(i==TIMEOUT)
200         {
201           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
202           throw SALOME_Exception(msg.str().c_str());
203         }
204     }
205   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
206   
207   /* If rank 0 is server, all processes call MPI_Comm_accept */
208   /* If rank 0 is not server, all processes call MPI_Comm_connect */
209   int srv = (int)_srv[service];
210   MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
211   _srv[service] = (bool)srv;
212   if ( _srv[service] )
213     MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
214   else
215     MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
216
217   /* create global communicator: servers have low index in global communicator*/
218   MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));
219
220   /* only rank 0 can be server for unpublish name */
221   if(_numproc != 0) _srv[service] = false;
222
223 }
224
225 void MPIObject_i::remoteMPI2Disconnect(std::string service)
226 {
227   std::ostringstream msg;
228
229   if( service.size() == 0 )
230     {
231       msg << "[" << _numproc << "] You have to give a service name !";
232       throw SALOME_Exception(msg.str().c_str());
233     }
234
235   if( _srv.find(service) == _srv.end() )
236     {
237       msg << "[" << _numproc << "] service " << service << " don't exist !";
238       throw SALOME_Exception(msg.str().c_str());
239     }
240   
241   MPI_Comm_disconnect( &(_gcom[service]) ); 
242   if ( _srv[service] )
243     {
244
245       char port_name[MPI_MAX_PORT_NAME];
246       strcpy(port_name,_port_name[service].c_str());
247
248       MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); 
249       MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << std::endl);
250       MPI_Close_port( port_name ); 
251       _port_name.erase(service);
252     }
253   
254   _gcom.erase(service);
255   _icom.erase(service);
256   _srv.erase(service);
257
258 }
259 #endif
260