1 // SALOME MPIContainer : implemenation of container based on MPI libraries
3 // Copyright (C) 2003 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
24 // File : MPIContainer_i.cxx
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
35 #include "utilities.h"
37 #include "Container_init_python.hxx"
40 // L'appel au registry SALOME ne se fait que pour le process 0
41 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
43 PortableServer::POA_ptr poa,
45 int argc, char *argv[])
46 : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
48 MESSAGE("[" << numproc << "] activate object");
49 _id = _poa->activate_object(this);
54 _NS = new SALOME_NamingService();
55 // _NS = SINGLETON_<SALOME_NamingService>::Instance() ;
56 // ASSERT(SINGLETON_<SALOME_NamingService>::IsAlreadyExisting()) ;
57 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
59 // Engines::Container_ptr pCont
60 // = Engines::Container::_narrow(POA_Engines::MPIContainer::_this());
61 CORBA::Object_var obj=_poa->id_to_reference(*_id);
62 Engines::Container_var pCont = Engines::Container::_narrow(obj);
63 string hostname = GetHostname();
64 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
65 SCRUTE(_containerName);
66 _NS->Register(pCont, _containerName.c_str());
69 // Root recupere les ior des container des autre process
70 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71 BCastIOR(_orb,pobj,true);
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc)
75 : Engines_Container_i(), MPIObject_i(nbproc,numproc)
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
81 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
85 void Engines_MPIContainer_i::Shutdown()
88 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
90 _NS->Destroy_FullDirectory(_containerName.c_str());
91 for(ip= 1;ip<_nbproc;ip++)
92 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
98 // Load a component library
99 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
102 // Invocation du chargement du composant dans les autres process
103 for(int ip= 1;ip<_nbproc;ip++)
104 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
107 return Lload_component_Library(componentName);
110 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
112 Lload_component_Library(componentName);
115 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
117 string aCompName = componentName;
119 // --- try dlopen C++ component
121 string impl_name = string ("lib") + aCompName + string("Engine.so");
124 _numInstanceMutex.lock(); // lock to be alone
125 // (see decInstanceCnt, finalize_removal))
126 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
127 if (_library_map[impl_name])
129 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
130 _numInstanceMutex.unlock();
135 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
138 _library_map[impl_name] = handle;
139 _numInstanceMutex.unlock();
144 INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
145 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
147 _numInstanceMutex.unlock();
149 // --- try import Python component
151 INFOS("[" << _numproc << "] try import Python component "<<componentName);
152 if (_isSupervContainer)
154 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
157 if (_library_map[aCompName])
159 return true; // Python Component, already imported
163 Py_ACQUIRE_NEW_THREAD;
164 PyObject *mainmod = PyImport_AddModule("__main__");
165 PyObject *globals = PyModule_GetDict(mainmod);
166 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
167 PyObject *result = PyObject_CallMethod(pyCont,
170 int ret= PyInt_AsLong(result);
172 Py_RELEASE_NEW_THREAD;
174 if (ret) // import possible: Python component
176 _library_map[aCompName] = (void *)pyCont; // any non O value OK
177 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
184 // Create an instance of component
185 Engines::Component_ptr
186 Engines_MPIContainer_i::create_component_instance( const char* componentName,
190 // Invocation du chargement du composant dans les autres process
191 for(int ip= 1;ip<_nbproc;ip++)
192 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
195 return Lcreate_component_instance(componentName,studyId);
198 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
201 Lcreate_component_instance(componentName,studyId);
204 Engines::Component_ptr
205 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
208 INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
209 return Engines::Component::_nil() ;
212 Engines::Component_var iobject = Engines::Component::_nil() ;
213 Engines::MPIObject_var pobj;
215 string aCompName = genericRegisterName;
216 if (_library_map[aCompName]) { // Python component
217 if (_isSupervContainer) {
218 INFOS("Supervision Container does not support Python Component Engines");
219 return Engines::Component::_nil();
221 _numInstanceMutex.lock() ; // lock on the instance number
223 int numInstance = _numInstance ;
224 _numInstanceMutex.unlock() ;
227 sprintf( aNumI , "%d" , numInstance ) ;
228 string instanceName = aCompName + "_inst_" + aNumI ;
229 string component_registerName =
230 _containerName + "/" + instanceName;
232 Py_ACQUIRE_NEW_THREAD;
233 PyObject *mainmod = PyImport_AddModule("__main__");
234 PyObject *globals = PyModule_GetDict(mainmod);
235 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
236 PyObject *result = PyObject_CallMethod(pyCont,
237 "create_component_instance",
240 instanceName.c_str(),
242 string iors = PyString_AsString(result);
244 Py_RELEASE_NEW_THREAD;
246 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
247 iobject = Engines::Component::_narrow( obj ) ;
248 pobj = Engines::MPIObject::_narrow(obj) ;
250 _NS->Register(iobject, component_registerName.c_str()) ;
251 // Root recupere les ior des composants des autre process
252 BCastIOR(_orb,pobj,false);
254 return iobject._retn();
259 string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
260 void* handle = _library_map[impl_name];
262 INFOS("shared library " << impl_name <<"must be loaded before instance");
263 return Engines::Component::_nil() ;
266 iobject = createMPIInstance(genericRegisterName,
269 return iobject._retn();
273 Engines::Component_ptr
274 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
278 Engines::Component_var iobject;
279 Engines::MPIObject_var pobj;
280 // --- find the factory
282 string aGenRegisterName = genericRegisterName;
283 string factory_name = aGenRegisterName + string("Engine_factory");
284 SCRUTE(factory_name) ;
286 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
289 PortableServer::POA_ptr,
290 PortableServer::ObjectId *,
294 MPIFACTORY_FUNCTION MPIComponent_factory
295 = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
298 if ( (error = dlerror() ) != NULL) {
299 // Try to load a sequential component
300 MESSAGE("[" << _numproc << "] Try to load a sequential component");
301 _numInstanceMutex.unlock() ;
302 iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
303 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
306 // --- create instance
308 iobject = Engines::Component::_nil() ;
312 _numInstanceMutex.lock() ; // lock on the instance number
314 int numInstance = _numInstance ;
315 _numInstanceMutex.unlock() ;
318 sprintf( aNumI , "%d" , numInstance ) ;
319 string instanceName = aGenRegisterName + "_inst_" + aNumI ;
320 string component_registerName =
321 _containerName + "/" + instanceName;
323 // --- Instanciate required CORBA object
325 PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
326 id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
327 aGenRegisterName.c_str() ) ;
329 // --- get reference & servant from id
331 CORBA::Object_var obj = _poa->id_to_reference(*id);
332 iobject = Engines::Component::_narrow( obj ) ;
333 pobj = Engines::MPIObject::_narrow(obj) ;
335 Engines_Component_i *servant =
336 dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
338 //SCRUTE(servant->pd_refCount);
339 servant->_remove_ref(); // compensate previous id_to_reference
340 //SCRUTE(servant->pd_refCount);
341 _listInstances_map[instanceName] = iobject;
342 _cntInstances_map[aGenRegisterName] += 1;
343 SCRUTE(aGenRegisterName);
344 SCRUTE(_cntInstances_map[aGenRegisterName]);
345 //SCRUTE(servant->pd_refCount);
346 bool ret_studyId = servant->setStudyId(studyId);
349 // --- register the engine under the name
350 // containerName(.dir)/instanceName(.object)
353 _NS->Register( iobject , component_registerName.c_str() ) ;
354 MESSAGE( component_registerName.c_str() << " bound" ) ;
356 // Root recupere les ior des composants des autre process
357 BCastIOR(_orb,pobj,false);
362 INFOS( "Container_i::createInstance exception catched" ) ;
364 return iobject._retn();
368 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
369 const char* componentName)
372 // Invocation du chargement du composant dans les autres process
373 for(int ip= 1;ip<_nbproc;ip++)
374 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
378 return Lload_impl(nameToRegister,componentName);
383 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
384 const char* componentName)
386 Lload_impl(nameToRegister,componentName);
389 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
390 const char* nameToRegister,
391 const char* componentName)
393 Engines::Component_var iobject;
394 Engines::MPIObject_var pobj;
397 sprintf(cproc,"_%d",_numproc);
399 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
401 _numInstanceMutex.lock() ; // lock on the instance number
404 sprintf(_aNumI,"%d",_numInstance) ;
406 string _impl_name = componentName;
407 string _nameToRegister = nameToRegister;
408 string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
409 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
411 string absolute_impl_name(_impl_name);
412 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
413 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
415 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
416 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
417 return Engines::Component::_nil() ;
420 string factory_name = _nameToRegister + string("Engine_factory");
421 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
423 PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
425 PortableServer::POA_ptr,
426 PortableServer::ObjectId *,
429 (PortableServer::ObjectId * (*) (int,int,
431 PortableServer::POA_ptr,
432 PortableServer::ObjectId *,
435 dlsym(handle, factory_name.c_str());
438 if ((error = dlerror()) != NULL){
439 // Try to load a sequential component
440 MESSAGE("[" << _numproc << "] Try to load a sequential component");
441 _numInstanceMutex.unlock() ;
442 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
443 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
446 // Instanciation du composant parallele
447 MESSAGE("[" << _numproc << "] Try to load a parallel component");
448 PortableServer::ObjectId * id = (MPIComponent_factory)
449 (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
450 // get reference from id
451 CORBA::Object_var o = _poa->id_to_reference(*id);
452 pobj = Engines::MPIObject::_narrow(o) ;
453 iobject = Engines::Component::_narrow(o) ;
457 // utiliser + tard le registry ici :
458 // register the engine under the name containerName.dir/nameToRegister.object
459 string component_registerName = _containerName + "/" + _nameToRegister;
460 _NS->Register(iobject, component_registerName.c_str()) ;
463 _numInstanceMutex.unlock() ;
465 // Root recupere les ior des composants des autre process
466 BCastIOR(_orb,pobj,false);
468 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
469 return Engines::Component::_duplicate(iobject);
473 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
476 Engines::Component_ptr cptr;
477 Engines::MPIObject_ptr pcptr;
478 Engines::MPIObject_ptr spcptr;
480 ASSERT(! CORBA::is_nil(component_i));
483 // Invocation de la destruction du composant dans les autres process
484 pcptr = (Engines::MPIObject_ptr)component_i;
485 for(ip= 1;ip<_nbproc;ip++){
486 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
487 cptr = (Engines::Component_ptr)spcptr;
488 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
492 Lremove_impl(component_i);
495 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
497 Lremove_impl(component_i);
500 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
502 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
504 ASSERT(! CORBA::is_nil(component_i));
506 string instanceName = component_i->instanceName() ;
507 MESSAGE("[" << _numproc << "] unload component " << instanceName);
508 component_i->destroy() ;
509 MESSAGE("[" << _numproc << "] test key handle_map");
510 _numInstanceMutex.lock() ; // lock on the remove on handle_map
511 _numInstanceMutex.unlock() ;
512 MESSAGE("[" << _numproc << "] list handle_map");
514 END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
518 void Engines_MPIContainer_i::finalize_removal()
523 // Invocation de la destruction du composant dans les autres process
524 for(ip= 1;ip<_nbproc;ip++)
525 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
531 void Engines_MPIContainer_i::Asfinalize_removal()
536 void Engines_MPIContainer_i::Lfinalize_removal()
538 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
540 // map<string, void *>::iterator im ;
541 // // lock on the explore remove_map & dlclose
542 // _numInstanceMutex.lock() ;
543 // for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
545 // void * handle = (*im).second ;
546 // MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
549 // MESSAGE("[" << _numproc << "] remove_map.clear()");
550 // remove_map.clear() ;
551 // _numInstanceMutex.unlock() ;
553 END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");