Salome HOME
[EDF29150] : Base mechanism to keep track of CPU and Mem info
[modules/kernel.git] / src / MPIContainer / MPIObject_i.cxx
1 // Copyright (C) 2007-2023  CEA, EDF, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include "MPIObject_i.hxx"
24 #include "utilities.h"
25 #include "Utils_SALOME_Exception.hxx"
26 #include "Basics_MpiUtils.hxx"
27
28 #define TIMEOUT 5
29
30 #ifndef WIN32
31 #include <unistd.h>
32 #endif
33
34 MPIObject_i::MPIObject_i()
35 {
36   MPI_Comm_size( MPI_COMM_WORLD, &_nbproc );
37   MPI_Comm_rank( MPI_COMM_WORLD, &_numproc );
38   _tior=NULL;
39 }
40
41 MPIObject_i::~MPIObject_i()
42 {
43   if(_tior) delete _tior;
44 }
45
46 Engines::IORTab* MPIObject_i::tior()
47 {
48   Engines::IORTab_var tior = new Engines::IORTab;
49   tior->length(_tior->length());
50   for(unsigned int ip=0;ip<tior->length();ip++)
51     tior[ip] = (*_tior)[ip];
52   return tior._retn(); 
53 }
54
55 void MPIObject_i::tior(const Engines::IORTab& ior)
56 {
57   _tior = new Engines::IORTab;
58   _tior->length(ior.length());
59   for(unsigned int ip=0;ip<ior.length();ip++)
60     (*_tior)[ip] = ior[ip];
61 }
62
63 void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
64 {
65   int err, ip, n;
66   char *ior;
67   MPI_Status status; /* status de reception de message MPI */
68   std::ostringstream msg;
69
70   if( _numproc == 0 )
71     {
72
73       //Allocation du tableau des IOR
74       Engines::IORTab_var iort = new Engines::IORTab;
75       iort->length(_nbproc);
76       
77       iort[0] = pobj;
78
79       // Process 0 recupere les ior de l'object sur les autres process
80       for(ip=1;ip<_nbproc;ip++)
81         {
82           err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
83           if(err)
84             {
85               msg << "[" << _numproc << "] MPI_RECV error";
86               throw SALOME_Exception(msg.str().c_str());
87             }
88           // Allocation de la chaine de longueur n
89           ior = new char[n];
90           err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
91           if(err)
92             {
93               msg << "[" << _numproc << "] MPI_RECV error";
94               throw SALOME_Exception(msg.str().c_str());
95             }
96           iort[ip] = orb->string_to_object(ior);
97           delete [] ior;
98           if(CORBA::is_nil(iort[ip]))
99             {
100               msg << "[" << ip << "] MPI Component not loaded";
101               throw SALOME_Exception(msg.str().c_str());
102             }
103         }
104       // On donne le tableau des ior a l'objet Corba du process 0
105       if( amiCont )
106         tior(*(iort._retn()));
107       else
108         pobj->tior(*(iort._retn()));
109     }
110   else
111     {
112       // Conversion IOR vers string
113       ior = orb->object_to_string(pobj);
114       n = strlen(ior) + 1;
115       // On envoie l'IOR au process 0
116       err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
117       if(err)
118         {
119           msg << "[" << _numproc << "] MPI_SEND error";
120           throw SALOME_Exception(msg.str().c_str());
121         }
122       err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
123       if(err)
124         {
125           msg << "[" << _numproc << "] MPI_SEND error";
126           throw SALOME_Exception(msg.str().c_str());
127         }
128       CORBA::string_free(ior);
129     }
130  
131 }
132
133 #ifdef HAVE_MPI2
134 void MPIObject_i::remoteMPI2Connect(std::string service)
135 {
136   int i;
137   char port_name[MPI_MAX_PORT_NAME];
138   char port_name_clt[MPI_MAX_PORT_NAME];
139   MPI_Info info;
140   std::ostringstream msg;
141
142   if( service.size() == 0 )
143     {
144       msg << "[" << _numproc << "] You have to give a service name !";
145       throw SALOME_Exception(msg.str().c_str());
146     }
147
148   if( _srv.find(service) != _srv.end() )
149     {
150       msg << "[" << _numproc << "] service " << service << " already exist !";
151       throw SALOME_Exception(msg.str().c_str());
152     }
153
154   _srv[service] = false;
155
156   MPI_Barrier(MPI_COMM_WORLD);
157   MPI_ERROR_HANDLER(MPI_ERRORS_RETURN);
158   MPI_Info_create(&info);
159   MPI_Info_set(info, "ompi_unique", "true");
160   if( _numproc == 0 )
161     { 
162       /* rank 0 try to be a server. If service is already published, try to be a cient */
163       MPI_Open_port(MPI_INFO_NULL, port_name); 
164       if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
165         {
166           MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
167           MPI_Close_port( port_name );
168         }
169       else if ( MPI_Publish_name((char*)service.c_str(), info, port_name) == MPI_SUCCESS )
170         {
171           _srv[service] = true;
172           _port_name[service] = port_name;
173           MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << std::endl);
174         }      
175       else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
176         {
177           MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
178           MPI_Close_port( port_name );
179         }
180       else
181         {
182           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
183           throw SALOME_Exception(msg.str().c_str());
184         }
185     }
186   else
187     {
188       i=0;
189       /* Waiting rank 0 publish name and try to be a client */
190       while ( i != TIMEOUT  ) 
191         {
192           sleep(1);
193           if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
194             {
195               MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
196               break;
197             }
198           i++;
199         }
200       if(i==TIMEOUT)
201         {
202           msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
203           throw SALOME_Exception(msg.str().c_str());
204         }
205     }
206   MPI_ERROR_HANDLER(MPI_ERRORS_ARE_FATAL);
207   
208   /* If rank 0 is server, all processes call MPI_Comm_accept */
209   /* If rank 0 is not server, all processes call MPI_Comm_connect */
210   int srv = (int)_srv[service];
211   MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
212   _srv[service] = (bool)srv;
213   if ( _srv[service] )
214     MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
215   else
216     MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
217
218   /* create global communicator: servers have low index in global communicator*/
219   MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));
220
221   /* only rank 0 can be server for unpublish name */
222   if(_numproc != 0) _srv[service] = false;
223
224 }
225
226 void MPIObject_i::remoteMPI2Disconnect(std::string service)
227 {
228   std::ostringstream msg;
229
230   if( service.size() == 0 )
231     {
232       msg << "[" << _numproc << "] You have to give a service name !";
233       throw SALOME_Exception(msg.str().c_str());
234     }
235
236   if( _srv.find(service) == _srv.end() )
237     {
238       msg << "[" << _numproc << "] service " << service << " don't exist !";
239       throw SALOME_Exception(msg.str().c_str());
240     }
241   
242   MPI_Comm_disconnect( &(_gcom[service]) ); 
243   if ( _srv[service] )
244     {
245
246       char port_name[MPI_MAX_PORT_NAME];
247       strcpy(port_name,_port_name[service].c_str());
248
249       MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name); 
250       MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << std::endl);
251       MPI_Close_port( port_name ); 
252       _port_name.erase(service);
253     }
254   
255   _gcom.erase(service);
256   _icom.erase(service);
257   _srv.erase(service);
258
259 }
260 #endif
261