]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
PR: merge from tag mergeto_trunk_20Jan05
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 using namespace std;
36
37 // L'appel au registry SALOME ne se fait que pour le process 0
38 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
39                                                CORBA::ORB_ptr orb, 
40                                                PortableServer::POA_ptr poa,
41                                                char * containerName,
42                                                int argc, char *argv[]) 
43   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
44 {
45   MESSAGE("[" << numproc << "] activate object");
46   _id = _poa->activate_object(this);
47
48   if(numproc==0){
49
50     //   _NS = new SALOME_NamingService(_orb);
51     _NS = SINGLETON_<SALOME_NamingService>::Instance() ;
52     ASSERT(SINGLETON_<SALOME_NamingService>::IsAlreadyExisting()) ;
53     _NS->init_orb( orb ) ;
54
55     Engines::Container_ptr pCont 
56       = Engines::Container::_narrow(POA_Engines::MPIContainer::_this());
57     SCRUTE(_containerName);
58     _NS->Register(pCont, _containerName.c_str());
59   }
60
61   // Root recupere les ior des container des autre process
62   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
63   BCastIOR(_orb,pobj,true);
64 }
65
66 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
67   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
68 {
69 }
70
71 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
72 {
73   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
74   if( !handle_map.empty() ){
75     MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i: warning destroy a not empty container");
76   }
77 }
78
79 // Start MPI Container
80 Engines::MPIContainer_ptr Engines_MPIContainer_i::start_MPIimpl(
81                                          const char* ContainerName,
82                                          CORBA::Short nbproc )
83 {
84
85   char nbp[1024];
86
87   MESSAGE("[" << _numproc << "] start_impl argc " << _argc << " ContainerName " << ContainerName
88           << hex << this << dec) ;
89   _numInstanceMutex.lock() ; // lock on the instance number
90
91   CORBA::Object_var obj = Engines::MPIContainer::_nil() ;
92   bool nilvar = true ;
93   try {
94     string cont("/Containers/");
95     cont += machineName() ;
96     cont += "/" ;
97     cont += ContainerName;
98     INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container " << cont.c_str()
99           << " try to Resolve" );
100     obj = _NS->Resolve( cont.c_str() );
101     nilvar = CORBA::is_nil( obj ) ;
102     if ( nilvar ) {
103       INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container "
104             << ContainerName);
105     }
106   }
107   catch (ServiceUnreachable&) {
108     INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
109   }
110   catch (...) {
111     INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
112   }
113   if ( !nilvar ) {
114     _numInstanceMutex.unlock() ;
115     MESSAGE("[" << _numproc << "] start_impl container found without new launch") ;
116     return Engines::MPIContainer::_narrow(obj);
117   }
118   int i = 0 ;
119   while ( _argv[ i ] ) {
120     MESSAGE("[" << _numproc << "]            argv" << i << " " << _argv[ i ]) ;
121     i++ ;
122   }
123   sprintf(nbp,"mpirun -np %d SALOME_MPIContainer ",nbproc);
124   string shstr(nbp);
125   shstr += ContainerName ;
126   if ( _argc == 4 ) {
127     shstr += " " ;
128     shstr += _argv[ 2 ] ;
129     shstr += " " ;
130     shstr += _argv[ 3 ] ;
131   }
132   shstr += " > /tmp/" ;
133   shstr += ContainerName ;
134   shstr += ".log 2>&1 &" ;
135   MESSAGE("system(" << shstr << ")") ;
136   int status = system( shstr.c_str() ) ;
137   if (status == -1) {
138     INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status -1)") ;
139   }
140   else if (status == 217) {
141     INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status 217)") ;
142   }
143   INFOS("[" << _numproc << "] " << machineName() << " Engines_MPIContainer_i::start_impl SALOME_MPIContainer launch done");
144
145   obj = Engines::MPIContainer::_nil() ;
146   try {
147     string cont("/Containers/");
148     cont += machineName() ;
149     cont += "/" ;
150     cont += ContainerName;
151     nilvar = true ;
152     int count = 20 ;
153     while ( nilvar && count >= 0) {
154       sleep( 1 ) ;
155       obj = _NS->Resolve(cont.c_str());
156       nilvar = CORBA::is_nil( obj ) ;
157       if ( nilvar ) {
158         INFOS("[" << _numproc << "] " << count << ". " << machineName()
159               << " start_impl unknown container " << cont.c_str());
160         count -= 1 ;
161       }
162     }
163     _numInstanceMutex.unlock() ;
164     if ( !nilvar ) {
165       MESSAGE("[" << _numproc << "] start_impl container found after new launch of SALOME_MPIContainer") ;
166     }
167     return Engines::MPIContainer::_narrow(obj);
168   }
169   catch (ServiceUnreachable&) {
170     INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
171   }
172   catch (...) {
173     INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
174   }
175   _numInstanceMutex.unlock() ;
176   MESSAGE("[" << _numproc << "] start_impl MPI container not found after new launch of SALOME_MPIContainer") ;
177   return Engines::MPIContainer::_nil() ;
178 }
179
180 // Load component
181 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
182                                                  const char* componentName)
183 {
184   int ip;
185
186   if( _numproc == 0 ){
187     // Invocation du chargement du composant dans les autres process
188     for(ip= 1;ip<_nbproc;ip++)
189       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPload_impl(nameToRegister,
190                                                                 componentName);
191   }
192
193   return Lload_impl(nameToRegister,componentName);
194
195 }
196
197 // Load component
198 void Engines_MPIContainer_i::SPload_impl(const char* nameToRegister,
199                                          const char* componentName)
200 {
201   Lload_impl(nameToRegister,componentName);
202 }
203
204 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
205                                    const char* nameToRegister,
206                                    const char* componentName)
207 {
208   Engines::Component_var iobject;
209   Engines::MPIObject_var pobj;
210   char cproc[4];
211
212   sprintf(cproc,"_%d",_numproc);
213
214   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
215
216   _numInstanceMutex.lock() ; // lock on the instance number
217   _numInstance++ ;
218   char _aNumI[12];
219   sprintf(_aNumI,"%d",_numInstance) ;
220
221   string _impl_name = componentName;
222   string _nameToRegister = nameToRegister;
223   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
224   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
225
226   string absolute_impl_name(_impl_name);
227   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
228   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
229   if(!handle){
230     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
231     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
232     return Engines::Component::_nil() ;
233   }
234
235   string factory_name = _nameToRegister + string("Engine_factory");
236   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
237
238   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
239                                                   CORBA::ORB_ptr,
240                                                   PortableServer::POA_ptr,
241                                                   PortableServer::ObjectId *,
242                                                   const char *,
243                                                   const char *) =
244     (PortableServer::ObjectId * (*) (int,int,
245                                      CORBA::ORB_ptr,
246                                      PortableServer::POA_ptr, 
247                                      PortableServer::ObjectId *, 
248                                      const char *, 
249                                      const char *)) 
250     dlsym(handle, factory_name.c_str());
251
252   char *error ;
253   if ((error = dlerror()) != NULL){
254     // Try to load a sequential component
255     MESSAGE("[" << _numproc << "] Try to load a sequential component");
256     _numInstanceMutex.unlock() ;
257     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
258     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
259   }
260   else{
261     // Instanciation du composant parallele
262     MESSAGE("[" << _numproc << "] Try to load a parallel component");
263     PortableServer::ObjectId * id = (MPIComponent_factory)
264       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
265     // get reference from id
266     CORBA::Object_var o = _poa->id_to_reference(*id);
267     pobj = Engines::MPIObject::_narrow(o) ;
268     iobject = Engines::Component::_narrow(o) ;
269   }
270
271   if( _numproc == 0 ){
272     // utiliser + tard le registry ici :
273     // register the engine under the name containerName.dir/nameToRegister.object
274     string component_registerName = _containerName + "/" + _nameToRegister;
275     _NS->Register(iobject, component_registerName.c_str()) ;
276   }
277
278   handle_map[instanceName] = handle;
279   _numInstanceMutex.unlock() ;
280
281   // Root recupere les ior des composants des autre process
282   BCastIOR(_orb,pobj,false);
283
284   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
285   return Engines::Component::_duplicate(iobject);
286
287 }
288
289 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
290 {
291   int ip;
292   Engines::Component_ptr cptr;
293   Engines::MPIObject_ptr pcptr;
294   Engines::MPIObject_ptr spcptr;
295
296   ASSERT(! CORBA::is_nil(component_i));
297
298   if( _numproc == 0 ){
299     // Invocation de la destruction du composant dans les autres process
300     pcptr = (Engines::MPIObject_ptr)component_i;
301     for(ip= 1;ip<_nbproc;ip++){
302       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
303       cptr = (Engines::Component_ptr)spcptr;
304       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPremove_impl(cptr);
305     }
306   }
307
308   Lremove_impl(component_i);
309 }
310
311 void Engines_MPIContainer_i::SPremove_impl(Engines::Component_ptr component_i)
312 {
313   Lremove_impl(component_i);
314 }
315
316 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
317 {
318   int ip;
319   Engines::Component_ptr cptr;
320   Engines::MPIObject_ptr pcptr;
321   Engines::MPIObject_ptr spcptr;
322
323   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
324
325   ASSERT(! CORBA::is_nil(component_i));
326
327   string instanceName = component_i->instanceName() ;
328   MESSAGE("[" << _numproc << "] unload component " << instanceName);
329   component_i->destroy() ;
330   MESSAGE("[" << _numproc << "] test key handle_map");
331   _numInstanceMutex.lock() ; // lock on the remove on handle_map
332   if (handle_map[instanceName]) // if key does not exist, created & initialized null
333     {
334       remove_map[instanceName] = handle_map[instanceName] ;
335     }
336   else MESSAGE("[" << _numproc << "] no key handle_map");
337   handle_map.erase(instanceName) ;   
338   _numInstanceMutex.unlock() ;
339   MESSAGE("[" << _numproc << "] list handle_map");
340   map<string, void *>::iterator im ;
341   for (im = handle_map.begin() ; im != handle_map.end() ; im ++)
342     {
343       MESSAGE("[" << _numproc << "] stay " << (*im).first);
344     }
345
346   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
347
348 }
349
350 void Engines_MPIContainer_i::finalize_removal()
351 {
352   int ip;
353
354   if( _numproc == 0 ){
355     // Invocation de la destruction du composant dans les autres process
356     for(ip= 1;ip<_nbproc;ip++)
357       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPfinalize_removal();
358   }
359
360   Lfinalize_removal();
361 }
362
363 void Engines_MPIContainer_i::SPfinalize_removal()
364 {
365   Lfinalize_removal();
366 }
367
368 void Engines_MPIContainer_i::Lfinalize_removal()
369 {
370   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
371
372   map<string, void *>::iterator im ;
373   // lock on the explore remove_map & dlclose
374   _numInstanceMutex.lock() ; 
375   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
376     {
377       void * handle = (*im).second ;
378       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
379       dlclose(handle) ;
380     }
381   MESSAGE("[" << _numproc << "] remove_map.clear()");
382   remove_map.clear() ;  
383   _numInstanceMutex.unlock() ;
384
385   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
386 }