Salome HOME
Typo-fix by Kunda
[modules/kernel.git] / src / MPIContainer / MPIObject_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME MPIContainer : implementation of container based on MPI libraries
24 //  File   : MPIObject_i.cxx
25 //  Module : SALOME
26 //
27 #include "MPIObject_i.hxx"
28 #include "utilities.h"
29 #include "Utils_SALOME_Exception.hxx"
30
31 #define TIMEOUT 5
32
33 #ifndef WIN32
34 #include <unistd.h>
35 #endif
36
37 MPIObject_i::MPIObject_i()
38 {
39   MPI_Comm_size( MPI_COMM_WORLD, &_nbproc );
40   MPI_Comm_rank( MPI_COMM_WORLD, &_numproc );
41   _tior=NULL;
42 }
43
44 MPIObject_i::~MPIObject_i()
45 {
46   if(_tior) delete _tior;
47 }
48
49 Engines::IORTab* MPIObject_i::tior()
50 {
51   Engines::IORTab_var tior = new Engines::IORTab;
52   tior->length(_tior->length());
53   for(unsigned int ip=0;ip<tior->length();ip++)
54     tior[ip] = (*_tior)[ip];
55   return tior._retn(); 
56 };
57
58 void MPIObject_i::tior(const Engines::IORTab& ior)
59 {
60   _tior = new Engines::IORTab;
61   _tior->length(ior.length());
62   for(unsigned int ip=0;ip<ior.length();ip++)
63     (*_tior)[ip] = ior[ip];
64 }
65
66 void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
67 {
68   int err, ip, n;
69   char *ior;
70   MPI_Status status; /* status de reception de message MPI */
71   std::ostringstream msg;
72
73   if( _numproc == 0 )
74     {
75
76       //Allocation du tableau des IOR
77       Engines::IORTab_var iort = new Engines::IORTab;
78       iort->length(_nbproc);
79       
80       iort[0] = pobj;
81
82       // Process 0 recupere les ior de l'object sur les autres process
83       for(ip=1;ip<_nbproc;ip++)
84         {
85           err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
86           if(err)
87             {
88               msg << "[" << _numproc << "] MPI_RECV error";
89               throw SALOME_Exception(msg.str().c_str());
90             }
91           // Allocation de la chaine de longueur n
92           ior = new char[n];
93           err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
94           if(err)
95             {
96               msg << "[" << _numproc << "] MPI_RECV error";
97               throw SALOME_Exception(msg.str().c_str());
98             }
99           iort[ip] = orb->string_to_object(ior);
100           delete [] ior;
101           if(CORBA::is_nil(iort[ip]))
102             {
103               msg << "[" << ip << "] MPI Component not loaded";
104               throw SALOME_Exception(msg.str().c_str());
105             }
106         }
107       // On donne le tableau des ior a l'objet Corba du process 0
108       if( amiCont )
109         tior(*(iort._retn()));
110       else
111         pobj->tior(*(iort._retn()));
112     }
113   else
114     {
115       // Conversion IOR vers string
116       ior = orb->object_to_string(pobj);
117       n = strlen(ior) + 1;
118       // On envoie l'IOR au process 0
119       err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
120       if(err)
121         {
122           msg << "[" << _numproc << "] MPI_SEND error";
123           throw SALOME_Exception(msg.str().c_str());
124         }
125       err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
126       if(err)
127         {
128           msg << "[" << _numproc << "] MPI_SEND error";
129           throw SALOME_Exception(msg.str().c_str());
130         }
131       CORBA::string_free(ior);
132     }
133  
134 }
135
136 #ifdef HAVE_MPI2
137 void MPIObject_i::remoteMPI2Connect(std::string service)
138 {
139   int i;
140   char port_name[MPI_MAX_PORT_NAME];
141   char port_name_clt[MPI_MAX_PORT_NAME];
142   MPI_Info info;
143   std::ostringstream msg;
144
145   if( service.size() == 0 )
146     {
147       msg << "[" << _numproc << "] You have to give a service name !";
148       throw SALOME_Exception(msg.str().c_str());
149     }
150
151   if( _srv.find(service) != _srv.end() )
152     {
153       msg << "[" << _numproc << "] service " << service << " already exist !";
154       throw SALOME_Exception(msg.str().c_str());
155     }
156
157   _srv[service] = false;
158
159   MPI_Barrier(MPI_COMM_WORLD);
160
161   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
162   MPI_Info_create(&info);
163   MPI_Info_set(info, "ompi_unique", "true");
164   if( _numproc == 0 )
165     { 
166       /* rank 0 try to be a server. If service is already published, try to be a cient */
167       MPI_Open_port(MPI_INFO_NULL, port_name); 
168       if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
169         {
170           MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
171           MPI_Close_port( port_name );
172         }
173       else if ( MPI_Publish_name((char*)service.c_str(), info, port_name) == MPI_SUCCESS )
174         {
175           _srv[service] = true;
176           _port_name[service] = port_name;
177           MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << std::endl);
178         }      
179       else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
180         {
181           MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
182           MPI_Close_port( port_name );
183         }
184       else
185         {
186           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
187           throw SALOME_Exception(msg.str().c_str());
188         }
189     }
190   else
191     {
192       i=0;
193       /* Waiting rank 0 publish name and try to be a client */
194       while ( i != TIMEOUT  ) 
195         {
196           sleep(1);
197           if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
198             {
199               MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
200               break;
201             }
202           i++;
203         }
204       if(i==TIMEOUT)
205         {
206           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
207           throw SALOME_Exception(msg.str().c_str());
208         }
209     }
210   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
211   
212   /* If rank 0 is server, all processes call MPI_Comm_accept */
213   /* If rank 0 is not server, all processes call MPI_Comm_connect */
214   int srv = (int)_srv[service];
215   MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
216   _srv[service] = (bool)srv;
217   if ( _srv[service] )
218     MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
219   else
220     MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
221
222   /* create global communicator: servers have low index in global communicator*/
223   MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));
224
225   /* only rank 0 can be server for unpublish name */
226   if(_numproc != 0) _srv[service] = false;
227
228 }
229
230 void MPIObject_i::remoteMPI2Disconnect(std::string service)
231 {
232   std::ostringstream msg;
233
234   if( service.size() == 0 )
235     {
236       msg << "[" << _numproc << "] You have to give a service name !";
237       throw SALOME_Exception(msg.str().c_str());
238     }
239
240   if( _srv.find(service) == _srv.end() )
241     {
242       msg << "[" << _numproc << "] service " << service << " don't exist !";
243       throw SALOME_Exception(msg.str().c_str());
244     }
245   
246   MPI_Comm_disconnect( &(_gcom[service]) ); 
247   if ( _srv[service] )
248     {
249
250       char port_name[MPI_MAX_PORT_NAME];
251       strcpy(port_name,_port_name[service].c_str());
252
253       MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); 
254       MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << std::endl);
255       MPI_Close_port( port_name ); 
256       _port_name.erase(service);
257     }
258   
259   _gcom.erase(service);
260   _icom.erase(service);
261   _srv.erase(service);
262
263 }
264 #endif
265