Salome HOME
PR: merge some files (MPI) from debug_V2_1_0a2
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 using namespace std;
36
37 // L'appel au registry SALOME ne se fait que pour le process 0
38 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
39                                                CORBA::ORB_ptr orb, 
40                                                PortableServer::POA_ptr poa,
41                                                char * containerName,
42                                                int argc, char *argv[]) 
43   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
44 {
45   MESSAGE("[" << numproc << "] activate object");
46   _id = _poa->activate_object(this);
47   this->_add_ref();
48
49   if(numproc==0){
50
51     //   _NS = new SALOME_NamingService(_orb);
52     _NS = SINGLETON_<SALOME_NamingService>::Instance() ;
53     ASSERT(SINGLETON_<SALOME_NamingService>::IsAlreadyExisting()) ;
54     _NS->init_orb( orb ) ;
55
56 //     Engines::Container_ptr pCont 
57 //       = Engines::Container::_narrow(POA_Engines::MPIContainer::_this());
58     Engines::Container_ptr pCont = Engines::Container::_narrow(_poa->id_to_reference(*_id));
59     SCRUTE(_containerName);
60     _NS->Register(pCont, _containerName.c_str());
61   }
62
63   // Root recupere les ior des container des autre process
64   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
65   BCastIOR(_orb,pobj,true);
66 }
67
68 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
69   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
70 {
71 }
72
73 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
74 {
75   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
76   if( !handle_map.empty() ){
77     MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i: warning destroy a not empty container");
78   }
79 }
80
81 // Start MPI Container
82 Engines::MPIContainer_ptr Engines_MPIContainer_i::start_MPIimpl(
83                                          const char* ContainerName,
84                                          CORBA::Short nbproc )
85 {
86
87   char nbp[1024];
88
89   MESSAGE("[" << _numproc << "] start_impl argc " << _argc << " ContainerName " << ContainerName
90           << hex << this << dec) ;
91   _numInstanceMutex.lock() ; // lock on the instance number
92
93   CORBA::Object_var obj = Engines::MPIContainer::_nil() ;
94   bool nilvar = true ;
95   try {
96     string cont("/Containers/");
97     cont += machineName() ;
98     cont += "/" ;
99     cont += ContainerName;
100     INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container " << cont.c_str()
101           << " try to Resolve" );
102     obj = _NS->Resolve( cont.c_str() );
103     nilvar = CORBA::is_nil( obj ) ;
104     if ( nilvar ) {
105       INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container "
106             << ContainerName);
107     }
108   }
109   catch (ServiceUnreachable&) {
110     INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
111   }
112   catch (...) {
113     INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
114   }
115   if ( !nilvar ) {
116     _numInstanceMutex.unlock() ;
117     MESSAGE("[" << _numproc << "] start_impl container found without new launch") ;
118     return Engines::MPIContainer::_narrow(obj);
119   }
120   int i = 0 ;
121   while ( _argv[ i ] ) {
122     MESSAGE("[" << _numproc << "]            argv" << i << " " << _argv[ i ]) ;
123     i++ ;
124   }
125   sprintf(nbp,"mpirun -np %d SALOME_MPIContainer ",nbproc);
126   string shstr(nbp);
127   shstr += ContainerName ;
128   if ( _argc == 4 ) {
129     shstr += " " ;
130     shstr += _argv[ 2 ] ;
131     shstr += " " ;
132     shstr += _argv[ 3 ] ;
133   }
134   shstr += " > /tmp/" ;
135   shstr += ContainerName ;
136   shstr += ".log 2>&1 &" ;
137   MESSAGE("system(" << shstr << ")") ;
138   int status = system( shstr.c_str() ) ;
139   if (status == -1) {
140     INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status -1)") ;
141   }
142   else if (status == 217) {
143     INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status 217)") ;
144   }
145   INFOS("[" << _numproc << "] " << machineName() << " Engines_MPIContainer_i::start_impl SALOME_MPIContainer launch done");
146
147   obj = Engines::MPIContainer::_nil() ;
148   try {
149     string cont("/Containers/");
150     cont += machineName() ;
151     cont += "/" ;
152     cont += ContainerName;
153     nilvar = true ;
154     int count = 20 ;
155     while ( nilvar && count >= 0) {
156       sleep( 1 ) ;
157       obj = _NS->Resolve(cont.c_str());
158       nilvar = CORBA::is_nil( obj ) ;
159       if ( nilvar ) {
160         INFOS("[" << _numproc << "] " << count << ". " << machineName()
161               << " start_impl unknown container " << cont.c_str());
162         count -= 1 ;
163       }
164     }
165     _numInstanceMutex.unlock() ;
166     if ( !nilvar ) {
167       MESSAGE("[" << _numproc << "] start_impl container found after new launch of SALOME_MPIContainer") ;
168     }
169     return Engines::MPIContainer::_narrow(obj);
170   }
171   catch (ServiceUnreachable&) {
172     INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
173   }
174   catch (...) {
175     INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
176   }
177   _numInstanceMutex.unlock() ;
178   MESSAGE("[" << _numproc << "] start_impl MPI container not found after new launch of SALOME_MPIContainer") ;
179   return Engines::MPIContainer::_nil() ;
180 }
181
182 // Load component
183 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
184                                                  const char* componentName)
185 {
186   int ip;
187
188   if( _numproc == 0 ){
189     // Invocation du chargement du composant dans les autres process
190     for(ip= 1;ip<_nbproc;ip++)
191       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPload_impl(nameToRegister,
192                                                                 componentName);
193   }
194
195   return Lload_impl(nameToRegister,componentName);
196
197 }
198
199 // Load component
200 void Engines_MPIContainer_i::SPload_impl(const char* nameToRegister,
201                                          const char* componentName)
202 {
203   Lload_impl(nameToRegister,componentName);
204 }
205
206 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
207                                    const char* nameToRegister,
208                                    const char* componentName)
209 {
210   Engines::Component_var iobject;
211   Engines::MPIObject_var pobj;
212   char cproc[4];
213
214   sprintf(cproc,"_%d",_numproc);
215
216   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
217
218   _numInstanceMutex.lock() ; // lock on the instance number
219   _numInstance++ ;
220   char _aNumI[12];
221   sprintf(_aNumI,"%d",_numInstance) ;
222
223   string _impl_name = componentName;
224   string _nameToRegister = nameToRegister;
225   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
226   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
227
228   string absolute_impl_name(_impl_name);
229   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
230   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
231   if(!handle){
232     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
233     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
234     return Engines::Component::_nil() ;
235   }
236
237   string factory_name = _nameToRegister + string("Engine_factory");
238   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
239
240   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
241                                                   CORBA::ORB_ptr,
242                                                   PortableServer::POA_ptr,
243                                                   PortableServer::ObjectId *,
244                                                   const char *,
245                                                   const char *) =
246     (PortableServer::ObjectId * (*) (int,int,
247                                      CORBA::ORB_ptr,
248                                      PortableServer::POA_ptr, 
249                                      PortableServer::ObjectId *, 
250                                      const char *, 
251                                      const char *)) 
252     dlsym(handle, factory_name.c_str());
253
254   char *error ;
255   if ((error = dlerror()) != NULL){
256     // Try to load a sequential component
257     MESSAGE("[" << _numproc << "] Try to load a sequential component");
258     _numInstanceMutex.unlock() ;
259     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
260     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
261   }
262   else{
263     // Instanciation du composant parallele
264     MESSAGE("[" << _numproc << "] Try to load a parallel component");
265     PortableServer::ObjectId * id = (MPIComponent_factory)
266       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
267     // get reference from id
268     CORBA::Object_var o = _poa->id_to_reference(*id);
269     pobj = Engines::MPIObject::_narrow(o) ;
270     iobject = Engines::Component::_narrow(o) ;
271   }
272
273   if( _numproc == 0 ){
274     // utiliser + tard le registry ici :
275     // register the engine under the name containerName.dir/nameToRegister.object
276     string component_registerName = _containerName + "/" + _nameToRegister;
277     _NS->Register(iobject, component_registerName.c_str()) ;
278   }
279
280   handle_map[instanceName] = handle;
281   _numInstanceMutex.unlock() ;
282
283   // Root recupere les ior des composants des autre process
284   BCastIOR(_orb,pobj,false);
285
286   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
287   return Engines::Component::_duplicate(iobject);
288
289 }
290
291 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
292 {
293   int ip;
294   Engines::Component_ptr cptr;
295   Engines::MPIObject_ptr pcptr;
296   Engines::MPIObject_ptr spcptr;
297
298   ASSERT(! CORBA::is_nil(component_i));
299
300   if( _numproc == 0 ){
301     // Invocation de la destruction du composant dans les autres process
302     pcptr = (Engines::MPIObject_ptr)component_i;
303     for(ip= 1;ip<_nbproc;ip++){
304       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
305       cptr = (Engines::Component_ptr)spcptr;
306       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPremove_impl(cptr);
307     }
308   }
309
310   Lremove_impl(component_i);
311 }
312
313 void Engines_MPIContainer_i::SPremove_impl(Engines::Component_ptr component_i)
314 {
315   Lremove_impl(component_i);
316 }
317
318 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
319 {
320   int ip;
321   Engines::Component_ptr cptr;
322   Engines::MPIObject_ptr pcptr;
323   Engines::MPIObject_ptr spcptr;
324
325   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
326
327   ASSERT(! CORBA::is_nil(component_i));
328
329   string instanceName = component_i->instanceName() ;
330   MESSAGE("[" << _numproc << "] unload component " << instanceName);
331   component_i->destroy() ;
332   MESSAGE("[" << _numproc << "] test key handle_map");
333   _numInstanceMutex.lock() ; // lock on the remove on handle_map
334   if (handle_map[instanceName]) // if key does not exist, created & initialized null
335     {
336       remove_map[instanceName] = handle_map[instanceName] ;
337     }
338   else MESSAGE("[" << _numproc << "] no key handle_map");
339   handle_map.erase(instanceName) ;   
340   _numInstanceMutex.unlock() ;
341   MESSAGE("[" << _numproc << "] list handle_map");
342   map<string, void *>::iterator im ;
343   for (im = handle_map.begin() ; im != handle_map.end() ; im ++)
344     {
345       MESSAGE("[" << _numproc << "] stay " << (*im).first);
346     }
347
348   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
349
350 }
351
352 void Engines_MPIContainer_i::finalize_removal()
353 {
354   int ip;
355
356   if( _numproc == 0 ){
357     // Invocation de la destruction du composant dans les autres process
358     for(ip= 1;ip<_nbproc;ip++)
359       (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPfinalize_removal();
360   }
361
362   Lfinalize_removal();
363 }
364
365 void Engines_MPIContainer_i::SPfinalize_removal()
366 {
367   Lfinalize_removal();
368 }
369
370 void Engines_MPIContainer_i::Lfinalize_removal()
371 {
372   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
373
374   map<string, void *>::iterator im ;
375   // lock on the explore remove_map & dlclose
376   _numInstanceMutex.lock() ; 
377   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
378     {
379       void * handle = (*im).second ;
380       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
381       dlclose(handle) ;
382     }
383   MESSAGE("[" << _numproc << "] remove_map.clear()");
384   remove_map.clear() ;  
385   _numInstanceMutex.unlock() ;
386
387   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
388 }
389
390 // Load component
391 void Engines_MPIContainer_i::MPIShutdown()
392 {
393   int ip;
394   MESSAGE("[" << _numproc << "] shutdown of Corba Server");
395   if( _numproc == 0 ){
396     for(ip= 1;ip<_nbproc;ip++)
397       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
398   }
399
400   Shutdown();
401
402 }
403