]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
Test EXPORTS definition with target name as suggested by cmake
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME MPIContainer : implemenation of container based on MPI libraries
23 //  File   : MPIContainer_i.cxx
24 //  Module : SALOME
25 //
26 #include <iostream.h>
27 #include <dlfcn.h>
28 #include <stdio.h>
29 #include "Basics_Utils.hxx"
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <time.h>
37 #include <sys/time.h>
38 #include <pthread.h>  // must be before Python.h !
39 #include <Python.h>
40 #include "Container_init_python.hxx"
41 using namespace std;
42
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
45                                                CORBA::ORB_ptr orb, 
46                                                PortableServer::POA_ptr poa,
47                                                char * containerName,
48                                                int argc, char *argv[]) 
49   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
50 {
51
52   _id = _poa->activate_object(this);
53   CORBA::Object_var obj=_poa->id_to_reference(*_id);
54   Engines::Container_var pCont = Engines::Container::_narrow(obj);
55   _remove_ref();
56
57   if(numproc==0){
58
59     _NS = new SALOME_NamingService();
60     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
61
62     string hostname = Kernel_Utils::GetHostname();
63     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
64     SCRUTE(_containerName);
65     _NS->Register(pCont, _containerName.c_str());
66
67   }
68
69   // Root recupere les ior des container des autre process
70   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71   BCastIOR(_orb,pobj,true);
72 }
73
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
75   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
76 {
77 }
78
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 {
81   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
82 }
83
84 // Load component
85 void Engines_MPIContainer_i::Shutdown()
86 {
87   int ip;
88   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
89   if( _numproc == 0 ){
90     _NS->Destroy_FullDirectory(_containerName.c_str());
91     for(ip= 1;ip<_nbproc;ip++)
92       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
93   }
94
95   std::map<std::string, Engines::Component_var>::iterator itm;
96   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
97     {
98       try
99         {
100           itm->second->destroy();
101         }
102       catch(const CORBA::Exception& e)
103         {
104           // ignore this entry and continue
105         }
106       catch(...)
107         {
108           // ignore this entry and continue
109         }
110     }
111
112   _orb->shutdown(0);
113
114 }
115
116 // Load a component library
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
118 {
119   if( _numproc == 0 ){
120     // Invocation du chargement du composant dans les autres process
121     for(int ip= 1;ip<_nbproc;ip++)
122       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
123   }
124
125   return Lload_component_Library(componentName);
126 }
127
128 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
129 {
130   Lload_component_Library(componentName);
131 }
132
133 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
134 {
135   string aCompName = componentName;
136
137   // --- try dlopen C++ component
138
139   string impl_name = string ("lib") + aCompName + string("Engine.so");
140   
141   _numInstanceMutex.lock(); // lock to be alone 
142   // (see decInstanceCnt, finalize_removal))
143   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
144   if (_library_map[impl_name])
145     {
146       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
147       _numInstanceMutex.unlock();
148       return true;
149     }
150   
151   void* handle;
152   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
153   if ( handle )
154     {
155       _library_map[impl_name] = handle;
156       _numInstanceMutex.unlock();
157       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
158       MPI_Barrier(MPI_COMM_WORLD);
159       return true;
160     }
161   else
162     {
163       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
164       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
165       MPI_Barrier(MPI_COMM_WORLD);
166     }
167   _numInstanceMutex.unlock();
168
169   // --- try import Python component
170
171   INFOS("[" << _numproc << "] try import Python component "<<componentName);
172   if (_isSupervContainer)
173     {
174       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
175       return false;
176     }
177   if (_library_map[aCompName])
178     {
179       return true; // Python Component, already imported
180     }
181   else
182     {
183       Py_ACQUIRE_NEW_THREAD;
184       PyObject *mainmod = PyImport_AddModule("__main__");
185       PyObject *globals = PyModule_GetDict(mainmod);
186       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
187       PyObject *result = PyObject_CallMethod(pyCont,
188                                              "import_component",
189                                              "s",componentName);
190       int ret= PyInt_AsLong(result);
191       SCRUTE(ret);
192       Py_RELEASE_NEW_THREAD;
193   
194       if (ret) // import possible: Python component
195         {
196           _library_map[aCompName] = (void *)pyCont; // any non O value OK
197           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
198           return true;
199         }
200     }
201   return false;
202 }
203
204 // Create an instance of component
205 Engines::Component_ptr
206 Engines_MPIContainer_i::create_component_instance( const char* componentName,
207                                                    CORBA::Long studyId)
208 {
209   if( _numproc == 0 ){
210     // Invocation du chargement du composant dans les autres process
211     for(int ip= 1;ip<_nbproc;ip++)
212       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
213   }
214
215   return Lcreate_component_instance(componentName,studyId);
216 }
217
218 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
219                                                           CORBA::Long studyId)
220 {
221   Lcreate_component_instance(componentName,studyId);
222 }
223
224 Engines::Component_ptr
225 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
226 {
227   if (studyId < 0) {
228     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
229     return Engines::Component::_nil() ;
230   }
231
232   Engines::Component_var iobject = Engines::Component::_nil() ;
233   Engines::MPIObject_var pobj;
234
235   string aCompName = genericRegisterName;
236   if (_library_map[aCompName]) { // Python component
237     if (_isSupervContainer) {
238       INFOS("Supervision Container does not support Python Component Engines");
239       return Engines::Component::_nil();
240     }
241     _numInstanceMutex.lock() ; // lock on the instance number
242     _numInstance++ ;
243     int numInstance = _numInstance ;
244     _numInstanceMutex.unlock() ;
245
246     char aNumI[12];
247     sprintf( aNumI , "%d" , numInstance ) ;
248     string instanceName = aCompName + "_inst_" + aNumI ;
249     string component_registerName =
250       _containerName + "/" + instanceName;
251
252     Py_ACQUIRE_NEW_THREAD;
253     PyObject *mainmod = PyImport_AddModule("__main__");
254     PyObject *globals = PyModule_GetDict(mainmod);
255     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
256     PyObject *result = PyObject_CallMethod(pyCont,
257                                            "create_component_instance",
258                                            "ssl",
259                                            aCompName.c_str(),
260                                            instanceName.c_str(),
261                                            studyId);
262     string iors = PyString_AsString(result);
263     SCRUTE(iors);
264     Py_RELEASE_NEW_THREAD;
265   
266     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
267     iobject = Engines::Component::_narrow( obj ) ;
268     pobj = Engines::MPIObject::_narrow(obj) ;
269     if( _numproc == 0 )
270       _NS->Register(iobject, component_registerName.c_str()) ;
271     // Root recupere les ior des composants des autre process
272     BCastIOR(_orb,pobj,false);
273
274     return iobject._retn();
275   }
276   
277   //--- try C++
278
279   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
280   if (_library_map.count(impl_name) != 0) // C++ component
281     {
282       void* handle = _library_map[impl_name];
283       iobject = createMPIInstance(genericRegisterName,
284                                     handle,
285                                     studyId);
286       return iobject._retn();
287     }
288
289 }
290
291 Engines::Component_ptr
292 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
293                                           void *handle,
294                                           int studyId)
295 {
296   Engines::Component_var iobject;
297   Engines::MPIObject_var pobj;
298   // --- find the factory
299
300   string aGenRegisterName = genericRegisterName;
301   string factory_name = aGenRegisterName + string("Engine_factory");
302
303   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
304     (int,int,
305      CORBA::ORB_ptr,
306      PortableServer::POA_ptr, 
307      PortableServer::ObjectId *, 
308      const char *, 
309      const char *) ;
310
311   dlerror();
312   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
313
314   if ( !MPIComponent_factory )
315     {
316       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
317       SCRUTE( dlerror() );
318       pobj = Engines::MPIObject::_nil();
319       BCastIOR(_orb,pobj,false);
320       return Engines::Component::_nil();
321     }
322
323   // --- create instance
324
325   iobject = Engines::Component::_nil() ;
326
327   try
328     {
329       _numInstanceMutex.lock() ; // lock on the instance number
330       _numInstance++ ;
331       int numInstance = _numInstance ;
332       _numInstanceMutex.unlock() ;
333
334       char aNumI[12];
335       sprintf( aNumI , "%d" , numInstance ) ;
336       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
337       string component_registerName =
338         _containerName + "/" + instanceName;
339
340       // --- Instanciate required CORBA object
341
342       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
343       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
344                                  aGenRegisterName.c_str() ) ;
345
346       // --- get reference & servant from id
347
348       CORBA::Object_var obj = _poa->id_to_reference(*id);
349       iobject = Engines::Component::_narrow( obj ) ;
350       pobj = Engines::MPIObject::_narrow(obj) ;
351
352       Engines_Component_i *servant =
353         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
354       ASSERT(servant);
355       //SCRUTE(servant->pd_refCount);
356       servant->_remove_ref(); // compensate previous id_to_reference 
357       //SCRUTE(servant->pd_refCount);
358       _listInstances_map[instanceName] = iobject;
359       _cntInstances_map[aGenRegisterName] += 1;
360       //SCRUTE(servant->pd_refCount);
361       bool ret_studyId = servant->setStudyId(studyId);
362       ASSERT(ret_studyId);
363
364       // --- register the engine under the name
365       //     containerName(.dir)/instanceName(.object)
366
367       if( _numproc == 0 ){
368         _NS->Register( iobject , component_registerName.c_str() ) ;
369         MESSAGE( component_registerName.c_str() << " bound" ) ;
370       }
371       // Root recupere les ior des composants des autre process
372       BCastIOR(_orb,pobj,false);
373
374     }
375   catch(const POException &ex){
376     INFOS( ex.msg << " on process number " << ex.numproc ) ;
377     return Engines::Component::_nil();
378   }
379   catch (...){
380     INFOS( "Container_i::createInstance exception catched" ) ;
381     return Engines::Component::_nil();
382   }
383   return iobject._retn();
384 }
385
386 // Load component
387 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
388                                                  const char* componentName)
389 {
390   if( _numproc == 0 ){
391     // Invocation du chargement du composant dans les autres process
392     for(int ip= 1;ip<_nbproc;ip++)
393       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
394                                                                 componentName);
395   }
396
397   return Lload_impl(nameToRegister,componentName);
398
399 }
400
401 // Load component
402 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
403                                          const char* componentName)
404 {
405   Lload_impl(nameToRegister,componentName);
406 }
407
408 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
409                                    const char* nameToRegister,
410                                    const char* componentName)
411 {
412   Engines::Component_var iobject;
413   Engines::MPIObject_var pobj;
414   char cproc[4];
415
416   sprintf(cproc,"_%d",_numproc);
417
418   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
419
420   _numInstanceMutex.lock() ; // lock on the instance number
421   _numInstance++ ;
422   char _aNumI[12];
423   sprintf(_aNumI,"%d",_numInstance) ;
424
425   string _impl_name = componentName;
426   string _nameToRegister = nameToRegister;
427   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
428   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
429
430   string absolute_impl_name(_impl_name);
431   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
432   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
433   if(!handle){
434     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
435     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
436     return Engines::Component::_nil() ;
437   }
438
439   string factory_name = _nameToRegister + string("Engine_factory");
440   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
441
442   dlerror();
443   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
444                                                   CORBA::ORB_ptr,
445                                                   PortableServer::POA_ptr,
446                                                   PortableServer::ObjectId *,
447                                                   const char *,
448                                                   const char *) =
449     (PortableServer::ObjectId * (*) (int,int,
450                                      CORBA::ORB_ptr,
451                                      PortableServer::POA_ptr, 
452                                      PortableServer::ObjectId *, 
453                                      const char *, 
454                                      const char *)) 
455     dlsym(handle, factory_name.c_str());
456
457   char *error ;
458   if ((error = dlerror()) != NULL){
459     // Try to load a sequential component
460     MESSAGE("[" << _numproc << "] Try to load a sequential component");
461     _numInstanceMutex.unlock() ;
462     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
463     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
464   }
465   else{
466     // Instanciation du composant parallele
467     MESSAGE("[" << _numproc << "] Try to load a parallel component");
468     PortableServer::ObjectId * id = (MPIComponent_factory)
469       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
470     // get reference from id
471     CORBA::Object_var o = _poa->id_to_reference(*id);
472     pobj = Engines::MPIObject::_narrow(o) ;
473     iobject = Engines::Component::_narrow(o) ;
474   }
475
476   if( _numproc == 0 ){
477     // utiliser + tard le registry ici :
478     // register the engine under the name containerName.dir/nameToRegister.object
479     string component_registerName = _containerName + "/" + _nameToRegister;
480     _NS->Register(iobject, component_registerName.c_str()) ;
481   }
482
483   _numInstanceMutex.unlock() ;
484
485   // Root recupere les ior des composants des autre process
486   BCastIOR(_orb,pobj,false);
487
488   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
489   return Engines::Component::_duplicate(iobject);
490
491 }
492
493 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
494 {
495   int ip;
496   Engines::Component_ptr cptr;
497   Engines::MPIObject_ptr pcptr;
498   Engines::MPIObject_ptr spcptr;
499
500   ASSERT(! CORBA::is_nil(component_i));
501
502   if( _numproc == 0 ){
503     // Invocation de la destruction du composant dans les autres process
504     pcptr = (Engines::MPIObject_ptr)component_i;
505     for(ip= 1;ip<_nbproc;ip++){
506       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
507       cptr = (Engines::Component_ptr)spcptr;
508       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
509     }
510   }
511
512   Lremove_impl(component_i);
513 }
514
515 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
516 {
517   Lremove_impl(component_i);
518 }
519
520 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
521 {
522   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
523
524   ASSERT(! CORBA::is_nil(component_i));
525
526   string instanceName = component_i->instanceName() ;
527   MESSAGE("[" << _numproc << "] unload component " << instanceName);
528   component_i->destroy() ;
529   MESSAGE("[" << _numproc << "] test key handle_map");
530   _numInstanceMutex.lock() ; // lock on the remove on handle_map
531   _numInstanceMutex.unlock() ;
532   MESSAGE("[" << _numproc << "] list handle_map");
533
534   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
535
536 }
537
538 void Engines_MPIContainer_i::finalize_removal()
539 {
540   int ip;
541
542   if( _numproc == 0 ){
543     // Invocation de la destruction du composant dans les autres process
544     for(ip= 1;ip<_nbproc;ip++)
545       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
546   }
547
548   Lfinalize_removal();
549 }
550
551 void Engines_MPIContainer_i::Asfinalize_removal()
552 {
553   Lfinalize_removal();
554 }
555
556 void Engines_MPIContainer_i::Lfinalize_removal()
557 {
558   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
559
560 //   map<string, void *>::iterator im ;
561 //   // lock on the explore remove_map & dlclose
562 //   _numInstanceMutex.lock() ; 
563 //   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
564 //     {
565 //       void * handle = (*im).second ;
566 //       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
567 //       dlclose(handle) ;
568 //     }
569 //   MESSAGE("[" << _numproc << "] remove_map.clear()");
570 //   remove_map.clear() ;  
571 //   _numInstanceMutex.unlock() ;
572
573   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
574 }