1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 // SALOME MPIContainer : implementation of container based on MPI libraries
23 // File : MPIContainer_i.cxx
29 #include "Basics_Utils.hxx"
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
35 #include "utilities.h"
38 #include <pthread.h> // must be before Python.h !
40 #include "Container_init_python.hxx"
// Constructor for a container attached to an ORB and a POA.
// Passes orb, poa, containerName, argc and argv (plus a trailing `false`) to
// Engines_Container_i and nbproc/numproc to MPIObject_i, activates this
// servant in the POA, registers the container with the naming service and
// calls BCastIOR with this container's own MPIObject reference.
// NOTE(review): the declarations of `orb` and `containerName`, and the guard
// implied by the comment below (registry access on process 0 only), are not
// visible in this listing - confirm against the complete file.
43 // The call to the SALOME registry is only made for process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
46 PortableServer::POA_ptr poa,
48 int argc, char *argv[])
49 : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
// Activate this servant and obtain a Container reference to it.
52 _id = _poa->activate_object(this);
53 CORBA::Object_var obj=_poa->id_to_reference(*_id);
54 Engines::Container_var pCont = Engines::Container::_narrow(obj);
// Create the naming-service helper and bind it to a duplicate of the ORB.
59 _NS = new SALOME_NamingService();
60 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
// The registration name is built from containerName and the local host name.
62 string hostname = Kernel_Utils::GetHostname();
63 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
64 SCRUTE(_containerName);
65 _NS->Register(pCont, _containerName.c_str());
69 // Root collects the IORs of the containers of the other processes
70 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71 BCastIOR(_orb,pobj,true);
// Constructor taking only nbproc and numproc (presumably the number of MPI
// processes and this process's number - TODO confirm). The base container is
// default-constructed and both values are forwarded to MPIObject_i.
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc)
75 : Engines_Container_i(), MPIObject_i(nbproc,numproc)
// Destructor: emits a trace message tagged with _numproc. No other work is
// visible in this listing.
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
81 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
// Shuts the container down: traces a message, removes the container's
// directory from the naming service, forwards Shutdown() to the containers
// referenced by (*_tior)[1] .. (*_tior)[_nbproc-1], then calls destroy() on
// every component stored in _listInstances_map. Failures on individual
// components are ignored so the remaining entries are still processed.
// NOTE(review): the conditions guarding these steps (for example whether the
// naming-service cleanup and forwarding only run on one process) and the
// declaration of `ip` are not visible in this listing - confirm.
85 void Engines_MPIContainer_i::Shutdown()
88 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
90 _NS->Destroy_FullDirectory(_containerName.c_str());
// Index 0 is skipped: only the other entries of _tior are asked to shut down.
91 for(ip= 1;ip<_nbproc;ip++)
92 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
// Destroy every component instance still registered in this container.
95 std::map<std::string, Engines::Component_var>::iterator itm;
96 for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100 itm->second->destroy();
102 catch(const CORBA::Exception& e)
104 // ignore this entry and continue
108 // ignore this entry and continue
116 // Load a component library
// Starts one thread per index 1 .. _nbproc-1, each running
// th_loadcomponentlibrary with a heap-allocated thread_st that carries the
// component name, loads the library in this process through
// Lload_component_Library, then joins the threads.
// NOTE(review): th_loadcomponentlibrary also reads st->tior and st->ip; their
// assignment, the guard around the thread creation/join, the release of `th`
// and of each thread_st, and the return statement are not visible in this
// listing - confirm (an array allocated with new[] needs delete[]).
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
// Slot 0 of the array is never used; thread handles are stored at index ip.
121 th = new pthread_t[_nbproc];
122 for(int ip=1;ip<_nbproc;ip++){
123 thread_st *st = new thread_st;
126 st->compoName = componentName;
127 pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
// Local load runs while the threads forward the request elsewhere.
131 bool ret = Lload_component_Library(componentName);
134 for(int ip=1;ip<_nbproc;ip++)
135 pthread_join(th[ip],NULL);
// Loads the implementation of `componentName` in this process.
// First tries a C++ component: dlopen() of "lib<componentName>Engine.so",
// recording the handle in _library_map. If that does not succeed it tries a
// Python component by calling pyCont.import_component(componentName) in the
// embedded interpreter; an empty string result is treated as success and the
// component name is recorded in _library_map.
// Returns true when the Python component was already imported (visible
// below); the other return statements are not visible in this listing.
// NOTE(review): if _toRemove_map/_library_map are std::map, the operator[]
// lookups used as tests below insert a null entry when the key is absent,
// which later makes count(key) non-zero - verify this is intended.
141 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
143 string aCompName = componentName;
145 // --- try dlopen C++ component
147 string impl_name = string ("lib") + aCompName + string("Engine.so");
149 _numInstanceMutex.lock(); // lock to be alone
150 // (see decInstanceCnt, finalize_removal))
// A library that is requested again must no longer be scheduled for removal.
151 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
152 if (_library_map[impl_name])
154 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
155 _numInstanceMutex.unlock();
160 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
163 _library_map[impl_name] = handle;
164 _numInstanceMutex.unlock();
165 MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
170 MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
171 MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
173 _numInstanceMutex.unlock();
175 // --- try import Python component
177 INFOS("[" << _numproc << "] try import Python component "<<componentName);
178 if (_isSupervContainer)
180 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
183 if (_library_map[aCompName])
185 return true; // Python Component, already imported
// Call pyCont.import_component(componentName) from the __main__ module.
// NOTE(review): `result` is not visibly checked for NULL before
// PyString_AsString - confirm.
189 Py_ACQUIRE_NEW_THREAD;
190 PyObject *mainmod = PyImport_AddModule((char *)"__main__");
191 PyObject *globals = PyModule_GetDict(mainmod);
192 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
193 PyObject *result = PyObject_CallMethod(pyCont,
194 (char*)"import_component",
195 (char*)"s",componentName);
196 std::string ret= PyString_AsString(result);
198 Py_RELEASE_NEW_THREAD;
200 if (ret=="") // import possible: Python component
202 _library_map[aCompName] = (void *)pyCont; // any non-zero value OK
203 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
210 // Create an instance of component
// Starts one thread per index 1 .. _nbproc-1, each running
// th_createcomponentinstance with a heap-allocated thread_st carrying the
// component name and study id, creates the instance in this process through
// Lcreate_component_instance, then joins the threads.
// NOTE(review): the remaining parameter(s) of the signature, the assignment
// of st->tior / st->ip (read by the thread function), the guard around thread
// creation/join, the release of `th` and thread_st, and the return statement
// are not visible in this listing - confirm.
211 Engines::Component_ptr
212 Engines_MPIContainer_i::create_component_instance( const char* componentName,
// Slot 0 of the array is never used; thread handles are stored at index ip.
217 th = new pthread_t[_nbproc];
218 for(int ip=1;ip<_nbproc;ip++){
219 thread_st *st = new thread_st;
222 st->compoName = componentName;
223 st->studyId = studyId;
224 pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
228 Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
231 for(int ip=1;ip<_nbproc;ip++)
232 pthread_join(th[ip],NULL);
// Creates a component instance in this process.
// genericRegisterName: component name, used both as the Python module key in
//   _library_map and to build "lib<name>Engine.so" for C++ components.
// studyId: per the log message below, > 0 for a mono-study instance and 0 for
//   multi-study; the rejecting test itself is not visible in this listing.
// Returns the new component reference, or a nil reference when the study id
// is rejected, when a supervision container is asked for a Python component,
// or when no library is known for the component.
239 Engines::Component_ptr
240 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
243 INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
244 return Engines::Component::_nil() ;
247 Engines::Component_var iobject = Engines::Component::_nil() ;
248 Engines::MPIObject_var pobj;
250 string aCompName = genericRegisterName;
// A non-null entry keyed by the bare component name marks a Python component.
251 if (_library_map[aCompName]) { // Python component
252 if (_isSupervContainer) {
253 INFOS("Supervision Container does not support Python Component Engines");
254 return Engines::Component::_nil();
// NOTE(review): the update of _numInstance under this lock is not visible
// in this listing - confirm.
256 _numInstanceMutex.lock() ; // lock on the instance number
258 int numInstance = _numInstance ;
259 _numInstanceMutex.unlock() ;
// Instance name is "<component>_inst_<n>", registered under the container name.
262 sprintf( aNumI , "%d" , numInstance ) ;
263 string instanceName = aCompName + "_inst_" + aNumI ;
264 string component_registerName =
265 _containerName + "/" + instanceName;
// Ask the Python-side container object to create the instance; its result is
// used as a stringified object reference below.
// NOTE(review): `result` is not visibly checked for NULL - confirm.
267 Py_ACQUIRE_NEW_THREAD;
268 PyObject *mainmod = PyImport_AddModule((char*)"__main__");
269 PyObject *globals = PyModule_GetDict(mainmod);
270 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
271 PyObject *result = PyObject_CallMethod(pyCont,
272 (char*)"create_component_instance",
275 instanceName.c_str(),
277 string iors = PyString_AsString(result);
279 Py_RELEASE_NEW_THREAD;
281 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
282 iobject = Engines::Component::_narrow( obj ) ;
283 pobj = Engines::MPIObject::_narrow(obj) ;
285 _NS->Register(iobject, component_registerName.c_str()) ;
286 // Root collects the IORs of the components of the other processes
287 BCastIOR(_orb,pobj,false);
289 return iobject._retn();
// NOTE(review): count() is non-zero even for a null entry left behind by an
// operator[] lookup elsewhere, so `handle` may be null here - verify.
294 string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
295 if (_library_map.count(impl_name) != 0) // C++ component
297 void* handle = _library_map[impl_name];
298 iobject = createMPIInstance(genericRegisterName,
301 return iobject._retn();
304 return Engines::Component::_nil() ;
// Creates a C++ component instance from an already loaded library.
// Looks up the factory symbol "<genericRegisterName>Engine_factory" in
// `handle`, calls it to build the servant, stores the instance in
// _listInstances_map, increments its count in _cntInstances_map, sets its
// study id, registers it in the naming service and calls BCastIOR.
// Returns the component reference, or a nil reference when the factory symbol
// is missing or an exception is caught.
// NOTE(review): the remaining parameters of the signature (handle, studyId)
// and the try block delimiters are not visible in this listing.
307 Engines::Component_ptr
308 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
312 Engines::Component_var iobject;
313 Engines::MPIObject_var pobj;
314 // --- find the factory
316 string aGenRegisterName = genericRegisterName;
317 string factory_name = aGenRegisterName + string("Engine_factory");
319 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
322 PortableServer::POA_ptr,
323 PortableServer::ObjectId *,
328 MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
330 if ( !MPIComponent_factory )
332 INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
// BCastIOR is still called with a nil object on this failure path
// (presumably so the other processes are not left waiting - TODO confirm).
334 pobj = Engines::MPIObject::_nil();
335 BCastIOR(_orb,pobj,false);
336 return Engines::Component::_nil();
339 // --- create instance
341 iobject = Engines::Component::_nil() ;
// NOTE(review): the update of _numInstance under this lock is not visible
// in this listing - confirm.
345 _numInstanceMutex.lock() ; // lock on the instance number
347 int numInstance = _numInstance ;
348 _numInstanceMutex.unlock() ;
// Instance name is "<component>_inst_<n>", registered under the container name.
351 sprintf( aNumI , "%d" , numInstance ) ;
352 string instanceName = aGenRegisterName + "_inst_" + aNumI ;
353 string component_registerName =
354 _containerName + "/" + instanceName;
356 // --- Instantiate required CORBA object
358 PortableServer::ObjectId *id ; // not owner, do not delete (nor use var)
359 id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
360 aGenRegisterName.c_str() ) ;
362 // --- get reference & servant from id
364 CORBA::Object_var obj = _poa->id_to_reference(*id);
365 iobject = Engines::Component::_narrow( obj ) ;
366 pobj = Engines::MPIObject::_narrow(obj) ;
// NOTE(review): a null check on the dynamic_cast result is not visible
// before `servant` is dereferenced below - confirm.
368 Engines_Component_i *servant =
369 dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
371 //SCRUTE(servant->pd_refCount);
372 servant->_remove_ref(); // compensate previous id_to_reference
373 //SCRUTE(servant->pd_refCount);
374 _listInstances_map[instanceName] = iobject;
375 _cntInstances_map[aGenRegisterName] += 1;
376 //SCRUTE(servant->pd_refCount);
377 bool ret_studyId = servant->setStudyId(studyId);
380 // --- register the engine under the name
381 // containerName(.dir)/instanceName(.object)
384 _NS->Register( iobject , component_registerName.c_str() ) ;
385 MESSAGE( component_registerName.c_str() << " bound" ) ;
387 // Root collects the IORs of the components of the other processes
388 BCastIOR(_orb,pobj,false);
// Both handlers log and return a nil reference; the second handler's catch
// clause is not visible in this listing.
391 catch(const POException &ex){
392 INFOS( ex.msg << " on process number " << ex.numproc ) ;
393 return Engines::Component::_nil();
396 INFOS( "Container_i::createInstance exception catched" ) ;
397 return Engines::Component::_nil();
399 return iobject._retn();
// Starts one thread per index 1 .. _nbproc-1, each running th_loadimpl with a
// heap-allocated thread_st carrying nameToRegister and componentName, loads
// the implementation in this process through Lload_impl, then joins the
// threads.
// NOTE(review): the assignment of st->tior / st->ip (read by th_loadimpl),
// the guard around thread creation/join, the release of `th` and thread_st,
// and the return statement are not visible in this listing - confirm.
403 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
404 const char* componentName)
// Slot 0 of the array is never used; thread handles are stored at index ip.
408 th = new pthread_t[_nbproc];
409 for(int ip=1;ip<_nbproc;ip++){
410 thread_st *st = new thread_st;
413 st->nameToRegister = nameToRegister;
414 st->compoName = componentName;
415 pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
419 Engines::Component_ptr cptr = Lload_impl(nameToRegister,componentName);
422 for(int ip=1;ip<_nbproc;ip++)
423 pthread_join(th[ip],NULL);
// Loads the shared library `componentName` in this process and instantiates
// the component registered as `nameToRegister`.
// The factory symbol "<nameToRegister>Engine_factory" is looked up with
// dlsym(); when dlerror() reports a failure the call falls back to
// Engines_Container_i::load_impl (sequential component), otherwise the
// factory is called to build the parallel component, which is then registered
// in the naming service as "<_containerName>/<nameToRegister>".
// Returns a duplicated component reference, or a nil reference when dlopen()
// fails or the sequential fallback yields nil.
431 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
432 const char* nameToRegister,
433 const char* componentName)
435 Engines::Component_var iobject;
436 Engines::MPIObject_var pobj;
// Suffix "_<numproc>" appended to the instance name below.
439 sprintf(cproc,"_%d",_numproc);
441 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
// NOTE(review): any update of _numInstance under this lock is not visible
// in this listing.
443 _numInstanceMutex.lock() ; // lock on the instance number
446 sprintf(_aNumI,"%d",_numInstance) ;
448 string _impl_name = componentName;
449 string _nameToRegister = nameToRegister;
450 string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
451 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
453 string absolute_impl_name(_impl_name);
454 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
455 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
// NOTE(review): no unlock of _numInstanceMutex is visible between the lock
// above and this early return - verify the mutex is not left locked.
457 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
458 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
459 return Engines::Component::_nil() ;
462 string factory_name = _nameToRegister + string("Engine_factory");
463 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
// Resolve the factory and cast it to the expected function-pointer type.
466 PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
468 PortableServer::POA_ptr,
469 PortableServer::ObjectId *,
472 (PortableServer::ObjectId * (*) (int,int,
474 PortableServer::POA_ptr,
475 PortableServer::ObjectId *,
478 dlsym(handle, factory_name.c_str());
481 if ((error = dlerror()) != NULL){
482 // Try to load a sequential component
483 MESSAGE("[" << _numproc << "] Try to load a sequential component");
// The mutex is released before delegating to the base-class loader.
484 _numInstanceMutex.unlock() ;
485 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
486 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
489 // Instantiate the parallel component
490 MESSAGE("[" << _numproc << "] Try to load a parallel component");
491 PortableServer::ObjectId * id = (MPIComponent_factory)
492 (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
493 // get reference from id
494 CORBA::Object_var o = _poa->id_to_reference(*id);
495 pobj = Engines::MPIObject::_narrow(o) ;
496 iobject = Engines::Component::_narrow(o) ;
500 // use the registry here later:
501 // register the engine under the name containerName.dir/nameToRegister.object
502 string component_registerName = _containerName + "/" + _nameToRegister;
503 _NS->Register(iobject, component_registerName.c_str()) ;
506 _numInstanceMutex.unlock() ;
508 // Root collects the IORs of the components of the other processes
509 BCastIOR(_orb,pobj,false);
511 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
512 return Engines::Component::_duplicate(iobject);
// Removes a component instance.
// For each index 1 .. _nbproc-1 a thread running th_removeimpl is started
// with the component reference found at that index of component_i's tior();
// the local instance is then erased from _listInstances_map, destroyed and
// unbound from the naming service, and the threads are joined.
// NOTE(review): component_i is cast and dereferenced (pcptr->tior()) before
// the ASSERT that checks it for nil; the assignment of st->tior / st->ip, the
// guard around thread creation/join and the release of `th` and thread_st are
// not visible in this listing - confirm.
516 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
518 Engines::MPIObject_ptr pcptr;
519 Engines::MPIObject_ptr spcptr;
// C-style cast between object reference types (not a CORBA _narrow).
523 pcptr = (Engines::MPIObject_ptr)component_i;
// Slot 0 of the array is never used; thread handles are stored at index ip.
524 th = new pthread_t[_nbproc];
525 for(int ip=1;ip<_nbproc;ip++){
526 thread_st *st = new thread_st;
529 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
530 st->cptr = (Engines::Component_ptr)spcptr;
531 pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
535 ASSERT(! CORBA::is_nil(component_i));
536 string instanceName = component_i->instanceName() ;
537 MESSAGE("[" << _numproc << "] unload component " << instanceName);
538 _numInstanceMutex.lock() ; // lock on the remove on handle_map
539 _listInstances_map.erase(instanceName);
540 _numInstanceMutex.unlock() ;
541 component_i->destroy() ;
543 _NS->Destroy_Name(instanceName.c_str());
546 for(int ip=1;ip<_nbproc;ip++)
547 pthread_join(th[ip],NULL);
// Finalizes pending library removals.
// Starts one thread per index 1 .. _nbproc-1 running th_finalizeremoval, then
// under _numInstanceMutex walks _toRemove_map and clears it, and finally
// joins the threads. The actual dlclose() and the erase from _library_map are
// commented out below, so the visible code does not unload anything.
// NOTE(review): the assignment of st->tior / st->ip (read by the thread
// function), the guard around thread creation/join and the release of `th`
// and thread_st are not visible in this listing - confirm.
553 void Engines_MPIContainer_i::finalize_removal()
// Slot 0 of the array is never used; thread handles are stored at index ip.
557 th = new pthread_t[_nbproc];
558 for(int ip=1;ip<_nbproc;ip++){
559 thread_st *st = new thread_st;
562 pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
566 _numInstanceMutex.lock(); // lock to be alone
567 // (see decInstanceCnt, load_component_Library)
568 map<string, void *>::iterator ith;
569 for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
571 void *handle = (*ith).second;
572 string impl_name= (*ith).first;
577 // dlclose(handle); // SALOME unstable after ...
578 // _library_map.erase(impl_name);
581 _toRemove_map.clear();
582 _numInstanceMutex.unlock();
585 for(int ip=1;ip<_nbproc;ip++)
586 pthread_join(th[ip],NULL);
// pthread entry point: `s` is a thread_st*. Narrows entry st->ip of st->tior
// to an MPIContainer and calls load_component_Library on it with
// st->compoName; the call's result is discarded.
// NOTE(review): the release of `st` and the return value are not visible in
// this listing - confirm the thread_st allocated by the caller is freed.
591 void *th_loadcomponentlibrary(void *s)
593 thread_st *st = (thread_st*)s;
594 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str());
// pthread entry point: `s` is a thread_st*. Narrows entry st->ip of st->tior
// to an MPIContainer and calls create_component_instance on it with
// st->compoName and st->studyId; the returned reference is discarded.
// NOTE(review): the release of `st` and the return value are not visible in
// this listing - confirm the thread_st allocated by the caller is freed.
598 void *th_createcomponentinstance(void *s)
600 thread_st *st = (thread_st*)s;
601 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
// pthread entry point: `s` is a thread_st*. Narrows entry st->ip of st->tior
// to an MPIContainer and calls load_impl on it with st->nameToRegister and
// st->compoName; the returned reference is discarded.
// NOTE(review): the release of `st` and the return value are not visible in
// this listing - confirm the thread_st allocated by the caller is freed.
605 void *th_loadimpl(void *s)
607 thread_st *st = (thread_st*)s;
608 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
// pthread entry point: `s` is a thread_st*. Narrows entry st->ip of st->tior
// to an MPIContainer and calls remove_impl on it with the component reference
// stored in st->cptr.
// NOTE(review): the release of `st` and the return value are not visible in
// this listing - confirm the thread_st allocated by the caller is freed.
612 void *th_removeimpl(void *s)
614 thread_st *st = (thread_st*)s;
615 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
// pthread entry point: `s` is a thread_st*. Narrows entry st->ip of st->tior
// to an MPIContainer and calls finalize_removal on it.
// NOTE(review): the release of `st` and the return value are not visible in
// this listing - confirm the thread_st allocated by the caller is freed.
619 void *th_finalizeremoval(void *s)
621 thread_st *st = (thread_st*)s;
622 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();