1 // Copyright (C) 2007-2010 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 // SALOME MPIContainer : implementation of container based on MPI libraries
24 // File : MPIContainer_i.cxx
30 #include "Basics_Utils.hxx"
31 #include "SALOME_Component_i.hxx"
32 #include "MPIContainer_i.hxx"
33 #include "SALOME_NamingService.hxx"
34 #include "Utils_SINGLETON.hxx"
36 #include "utilities.h"
39 #include <pthread.h> // must be before Python.h !
41 #include "Container_init_python.hxx"
// Constructs an MPI container servant on top of Engines_Container_i.
// Visible steps: activate this servant in the POA, narrow its reference to
// Engines::Container, create and initialise a SALOME_NamingService, register
// the container under a name built from containerName and the host name, and
// finally call BCastIOR with this object's MPIObject reference.
// NOTE(review): this view of the file has gaps (the embedded line numbers
// skip), so the guard around the naming-service section is not visible here;
// the comment below suggests it is restricted to process 0 - confirm.
43 // The call to the SALOME registry is made only for process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb,
45 PortableServer::POA_ptr poa,
47 int argc, char *argv[])
// The trailing `false` is forwarded to the base-class constructor; its meaning
// is defined by Engines_Container_i (not visible here).
48 : Engines_Container_i(orb,poa,containerName,argc,argv,false)
// Activate this servant and obtain its CORBA reference as a Container.
51 _id = _poa->activate_object(this);
52 CORBA::Object_var obj=_poa->id_to_reference(*_id);
53 Engines::Container_var pCont = Engines::Container::_narrow(obj);
// Create the naming-service helper and bind it to a duplicate of the ORB.
58 _NS = new SALOME_NamingService();
59 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
// Build the registration name from the requested name and the host name,
// then register the container reference under it.
61 std::string hostname = Kernel_Utils::GetHostname();
62 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
63 SCRUTE(_containerName);
64 _NS->Register(pCont, _containerName.c_str());
68 // Root collects the IORs of the containers of the other processes
69 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
70 BCastIOR(_orb,pobj,true);
// Default constructor: only delegates to the default Engines_Container_i
// constructor (the body is not visible in this view).
73 Engines_MPIContainer_i::Engines_MPIContainer_i()
74 : Engines_Container_i()
// Destructor: the only visible action is a trace message tagged with the
// process number _numproc.
78 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
// Shuts the MPI container down.
// Visible steps: log the shutdown, remove this container's directory and name
// from the naming service, forward Shutdown() to the containers referenced by
// _tior[1 .. _nbproc-1], then call destroy() on every component stored in
// _listInstances_map, ignoring exceptions so that one failing entry does not
// stop the loop.
// NOTE(review): parts of this function (declaration of ip, guards, try
// blocks, final ORB handling) are missing from this view.
84 void Engines_MPIContainer_i::Shutdown()
87 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
// Unregister the container from the naming service.
89 _NS->Destroy_FullDirectory(_containerName.c_str());
90 _NS->Destroy_Name(_containerName.c_str());
// Propagate the shutdown to the other processes' containers.
// NOTE(review): the reference returned by _narrow is used directly and not
// held in a _var, so it does not appear to be released - confirm.
91 for(ip= 1;ip<_nbproc;ip++)
92 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
// Destroy every component instance still registered in this container.
95 std::map<std::string, Engines::Component_var>::iterator itm;
96 for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100 itm->second->destroy();
102 catch(const CORBA::Exception& e)
104 // ignore this entry and continue
108 // ignore this entry and continue
116 // Load a component library
// Entry point for loading a component library by name.
// Visible steps: set `reason` to an empty string, start one thread per index
// 1 .. _nbproc-1 running th_loadcomponentlibrary with a heap-allocated
// thread_st carrying the component name, load the library locally through
// Lload_component_Library, then join the threads.
// NOTE(review): the declaration of th, the other thread_st fields, any guard
// around the thread section, the release of th and the return statement are
// missing from this view.
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
119 reason=CORBA::string_dup("");
// One pthread_t slot per process; slot 0 is not used by the loops below.
123 th = new pthread_t[_nbproc];
124 for(int ip=1;ip<_nbproc;ip++){
// NOTE(review): no delete of this thread_st is visible in the thread function
// (th_loadcomponentlibrary) - confirm who releases it.
125 thread_st *st = new thread_st;
128 st->compoName = componentName;
129 pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
// Local load, performed while the threads are running.
133 bool ret = Lload_component_Library(componentName);
// Wait for every thread started above.
136 for(int ip=1;ip<_nbproc;ip++)
137 pthread_join(th[ip],NULL);
// Local (per-process) loading of a component.
// First tries to dlopen "lib<componentName>Engine.so" as a C++ component and
// records the handle in _library_map; the visible code then tries to import a
// Python component through the "pyCont" object found in the Python __main__
// module and records it in _library_map under the bare component name.
// NOTE(review): branch structure (if/else, braces, several returns) is missing
// from this view, so the exact fall-through conditions cannot be confirmed.
143 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
145 std::string aCompName = componentName;
147 // --- try dlopen C++ component
149 std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
151 _numInstanceMutex.lock(); // lock to be alone
152 // (see decInstanceCnt, finalize_removal)
// A library that is requested again must no longer be scheduled for removal.
// NOTE(review): _toRemove_map is iterated as std::map<std::string, void *> in
// finalize_removal; operator[] on a std::map inserts a null entry for an
// unknown key. The same applies to _library_map below if it is also a
// std::map, and Lcreate_component_instance later tests
// _library_map.count(impl_name) - confirm a null handle cannot slip through.
153 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
154 if (_library_map[impl_name])
156 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
157 _numInstanceMutex.unlock();
// Lazy symbol resolution: symbols are bound on first use.
162 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
165 _library_map[impl_name] = handle;
166 _numInstanceMutex.unlock();
167 MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
172 MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
173 MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
175 _numInstanceMutex.unlock();
177 // --- try import Python component
179 INFOS("[" << _numproc << "] try import Python component "<<componentName);
180 if (_isSupervContainer)
182 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
185 if (_library_map[aCompName])
187 return true; // Python Component, already imported
// Ask the Python-side container object to import the component.
// NOTE(review): neither pyCont nor result is checked for NULL before use in
// the visible lines - confirm.
191 Py_ACQUIRE_NEW_THREAD;
192 PyObject *mainmod = PyImport_AddModule((char *)"__main__");
193 PyObject *globals = PyModule_GetDict(mainmod);
194 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
195 PyObject *result = PyObject_CallMethod(pyCont,
196 (char*)"import_component",
197 (char*)"s",componentName);
198 std::string ret= PyString_AsString(result);
200 Py_RELEASE_NEW_THREAD;
// An empty string returned by import_component is treated as success.
202 if (ret=="") // import possible: Python component
204 _library_map[aCompName] = (void *)pyCont; // any non-zero value is OK
205 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
212 // Create an instance of component
// Entry point for creating a component instance.
// Visible steps: set `reason` to an empty string, start one thread per index
// 1 .. _nbproc-1 running th_createcomponentinstance with a thread_st carrying
// the component name and study id, create the local instance through
// Lcreate_component_instance, then join the threads.
// NOTE(review): `env` is not used in any visible line; the studyId parameter
// declaration, the declaration of th and the return statement are missing
// from this view.
213 Engines::Component_ptr
214 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
216 const Engines::FieldsDict& env,
217 CORBA::String_out reason)
219 reason=CORBA::string_dup("");
// One pthread_t slot per process; slot 0 is not used by the loops below.
223 th = new pthread_t[_nbproc];
224 for(int ip=1;ip<_nbproc;ip++){
225 thread_st *st = new thread_st;
228 st->compoName = componentName;
229 st->studyId = studyId;
230 pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
// Local creation, performed while the threads are running.
234 Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
// Wait for every thread started above.
237 for(int ip=1;ip<_nbproc;ip++)
238 pthread_join(th[ip],NULL);
// Local (per-process) creation of a component instance.
// If _library_map has a non-null entry under the bare component name, the
// component is created through the Python "pyCont" object, registered in the
// naming service and its IOR passed to BCastIOR. Otherwise, if
// "lib<name>Engine.so" is present in _library_map, the instance is created by
// createMPIInstance. A nil reference is returned in the remaining visible
// paths.
// NOTE(review): the studyId guard, the increment of _numInstance, several
// arguments and braces are missing from this view.
245 Engines::Component_ptr
246 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
249 INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
250 return Engines::Component::_nil() ;
253 Engines::Component_var iobject = Engines::Component::_nil() ;
254 Engines::MPIObject_var pobj;
256 std::string aCompName = genericRegisterName;
257 if (_library_map[aCompName]) { // Python component
258 if (_isSupervContainer) {
259 INFOS("Supervision Container does not support Python Component Engines");
260 return Engines::Component::_nil();
// Take a snapshot of the instance counter under the mutex.
262 _numInstanceMutex.lock() ; // lock on the instance number
264 int numInstance = _numInstance ;
265 _numInstanceMutex.unlock() ;
// Instance name is "<component>_inst_<n>", registered below under
// "<containerName>/<instanceName>".
268 sprintf( aNumI , "%d" , numInstance ) ;
269 std::string instanceName = aCompName + "_inst_" + aNumI ;
270 std::string component_registerName =
271 _containerName + "/" + instanceName;
// Ask the Python-side container object to create the instance; the result is
// parsed as a tuple of two strings (ior, error).
// NOTE(review): result and the return value of PyArg_ParseTuple are not
// checked in the visible lines, so ior may be used uninitialised on failure.
273 Py_ACQUIRE_NEW_THREAD;
274 PyObject *mainmod = PyImport_AddModule((char*)"__main__");
275 PyObject *globals = PyModule_GetDict(mainmod);
276 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
277 PyObject *result = PyObject_CallMethod(pyCont,
278 (char*)"create_component_instance",
281 instanceName.c_str(),
285 PyArg_ParseTuple(result,"ss", &ior, &error);
286 std::string iors = ior;
288 Py_RELEASE_NEW_THREAD;
// Turn the stringified IOR back into typed references.
290 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
291 iobject = Engines::Component::_narrow( obj ) ;
292 pobj = Engines::MPIObject::_narrow(obj) ;
294 _NS->Register(iobject, component_registerName.c_str()) ;
295 // Root collects the IORs of the components of the other processes
296 BCastIOR(_orb,pobj,false);
298 return iobject._retn();
// C++ path: the library must already be present in _library_map.
303 std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
304 if (_library_map.count(impl_name) != 0) // C++ component
306 void* handle = _library_map[impl_name];
307 iobject = createMPIInstance(genericRegisterName,
310 return iobject._retn();
313 return Engines::Component::_nil() ;
// Creates a C++ MPI component instance from an already opened library.
// Visible steps: resolve the "<name>Engine_factory" symbol with dlsym, call it
// to instantiate the CORBA object, narrow the resulting reference, record the
// instance in _listInstances_map and _cntInstances_map, set its study id,
// register it in the naming service under "<containerName>/<instanceName>"
// and pass its MPIObject reference to BCastIOR.
// Returns a nil reference when the factory symbol cannot be resolved or when
// a std::exception is caught.
// NOTE(review): the handle/studyId parameters, the increment of _numInstance,
// the try keyword and other lines are missing from this view.
316 Engines::Component_ptr
317 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
321 Engines::Component_var iobject;
322 Engines::MPIObject_var pobj;
323 // --- find the factory
325 std::string aGenRegisterName = genericRegisterName;
326 std::string factory_name = aGenRegisterName + std::string("Engine_factory");
// Signature expected from the factory exported by the component library.
328 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
330 PortableServer::POA_ptr,
331 PortableServer::ObjectId *,
336 MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
338 if ( !MPIComponent_factory )
340 INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
// BCastIOR is still called, with a nil reference, on the failure path.
342 pobj = Engines::MPIObject::_nil();
343 BCastIOR(_orb,pobj,false);
344 return Engines::Component::_nil();
347 // --- create instance
349 iobject = Engines::Component::_nil() ;
// Take a snapshot of the instance counter under the mutex.
353 _numInstanceMutex.lock() ; // lock on the instance number
355 int numInstance = _numInstance ;
356 _numInstanceMutex.unlock() ;
359 sprintf( aNumI , "%d" , numInstance ) ;
360 std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
361 std::string component_registerName =
362 _containerName + "/" + instanceName;
364 // --- Instantiate required CORBA object
366 PortableServer::ObjectId *id ; // not owner, do not delete (nor use a _var)
367 id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
369 // --- get reference & servant from id
371 CORBA::Object_var obj = _poa->id_to_reference(*id);
372 iobject = Engines::Component::_narrow( obj ) ;
373 pobj = Engines::MPIObject::_narrow(obj) ;
// NOTE(review): the dynamic_cast result is dereferenced below; no null check
// is visible in this view - confirm.
375 Engines_Component_i *servant =
376 dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
378 //SCRUTE(servant->pd_refCount);
379 servant->_remove_ref(); // compensate previous id_to_reference
380 //SCRUTE(servant->pd_refCount);
// Book-keeping: remember the instance and count instances per component.
381 _listInstances_map[instanceName] = iobject;
382 _cntInstances_map[aGenRegisterName] += 1;
383 //SCRUTE(servant->pd_refCount);
384 bool ret_studyId = servant->setStudyId(studyId);
387 // --- register the engine under the name
388 // containerName(.dir)/instanceName(.object)
391 _NS->Register( iobject , component_registerName.c_str() ) ;
392 MESSAGE( component_registerName.c_str() << " bound" ) ;
394 // Root collects the IORs of the components of the other processes
395 BCastIOR(_orb,pobj,false);
398 catch(const std::exception &ex){
400 return Engines::Component::_nil();
402 return iobject._retn();
// Entry point for loading a component implementation.
// Visible steps: start one thread per index 1 .. _nbproc-1 running th_loadimpl
// with a thread_st carrying both names, load the component locally through
// Lload_impl, then join the threads.
// NOTE(review): the declaration of th, the other thread_st fields, any guard
// around the thread section and the return statement are missing from this
// view.
406 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
407 const char* componentName)
// One pthread_t slot per process; slot 0 is not used by the loops below.
411 th = new pthread_t[_nbproc];
412 for(int ip=1;ip<_nbproc;ip++){
413 thread_st *st = new thread_st;
416 st->nameToRegister = nameToRegister;
417 st->compoName = componentName;
418 pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
// Local load, performed while the threads are running.
422 Engines::Component_ptr cptr = Lload_impl(nameToRegister,componentName);
// Wait for every thread started above.
425 for(int ip=1;ip<_nbproc;ip++)
426 pthread_join(th[ip],NULL);
// Local (per-process) loading of a component implementation.
// Visible steps: build an instance name "<nameToRegister>_inst_<n>_<proc>",
// dlopen the library given by componentName, resolve the
// "<nameToRegister>Engine_factory" symbol; when dlerror() reports an error the
// base-class Engines_Container_i::load_impl is used instead (sequential
// component), otherwise the factory is called, the resulting reference is
// registered in the naming service and passed to BCastIOR.
// NOTE(review): braces, the else branch, the counter increment and buffer
// declarations are missing from this view.
434 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
435 const char* nameToRegister,
436 const char* componentName)
438 Engines::Component_var iobject;
439 Engines::MPIObject_var pobj;
// Suffix identifying this process in the instance name.
442 sprintf(cproc,"_%d",_numproc);
444 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
446 _numInstanceMutex.lock() ; // lock on the instance number
449 sprintf(_aNumI,"%d",_numInstance) ;
451 std::string _impl_name = componentName;
452 std::string _nameToRegister = nameToRegister;
453 std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
454 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
456 std::string absolute_impl_name(_impl_name);
457 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
458 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
// NOTE(review): _numInstanceMutex was locked above and no unlock is visible
// before this return - confirm the mutex is not left locked on dlopen failure.
460 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
461 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
462 return Engines::Component::_nil() ;
465 std::string factory_name = _nameToRegister + std::string("Engine_factory");
466 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
// Factory function pointer resolved from the opened library.
469 PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
470 PortableServer::POA_ptr,
471 PortableServer::ObjectId *,
474 (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
475 PortableServer::POA_ptr,
476 PortableServer::ObjectId *,
479 dlsym(handle, factory_name.c_str());
// A non-NULL dlerror() after dlsym means the MPI factory symbol is absent.
482 if ((error = dlerror()) != NULL){
483 // Try to load a sequential component
484 MESSAGE("[" << _numproc << "] Try to load a sequential component");
485 _numInstanceMutex.unlock() ;
486 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
487 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
490 // Instantiation of the parallel component
491 MESSAGE("[" << _numproc << "] Try to load a parallel component");
492 PortableServer::ObjectId * id = (MPIComponent_factory)
493 (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
494 // get reference from id
495 CORBA::Object_var o = _poa->id_to_reference(*id);
496 pobj = Engines::MPIObject::_narrow(o) ;
497 iobject = Engines::Component::_narrow(o) ;
501 // use the registry here later:
502 // register the engine under the name containerName.dir/nameToRegister.object
503 std::string component_registerName = _containerName + "/" + _nameToRegister;
504 _NS->Register(iobject, component_registerName.c_str()) ;
507 _numInstanceMutex.unlock() ;
509 // Root collects the IORs of the components of the other processes
510 BCastIOR(_orb,pobj,false);
512 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
513 return Engines::Component::_duplicate(iobject);
// Removes a component instance from this container.
// Visible steps: start one thread per index 1 .. _nbproc-1 running
// th_removeimpl with the matching remote component taken from the tior()
// sequence of the given component, then locally erase the instance from
// _listInstances_map, call destroy() on it, remove its name from the naming
// service, and join the threads.
// NOTE(review): the declaration of th, the other thread_st fields and any
// guards are missing from this view.
517 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
519 Engines::MPIObject_ptr pcptr;
520 Engines::MPIObject_ptr spcptr;
// NOTE(review): C-style casts between CORBA object pointer types are used
// here and below instead of _narrow - confirm this is valid for the ORB in use.
524 pcptr = (Engines::MPIObject_ptr)component_i;
// One pthread_t slot per process; slot 0 is not used by the loops below.
525 th = new pthread_t[_nbproc];
526 for(int ip=1;ip<_nbproc;ip++){
527 thread_st *st = new thread_st;
// Pick the component reference belonging to process ip.
530 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
531 st->cptr = (Engines::Component_ptr)spcptr;
532 pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
// Local removal, performed while the threads are running.
536 ASSERT(! CORBA::is_nil(component_i));
537 std::string instanceName = component_i->instanceName() ;
538 MESSAGE("[" << _numproc << "] unload component " << instanceName);
539 _numInstanceMutex.lock() ; // lock on the remove on handle_map
540 _listInstances_map.erase(instanceName);
541 _numInstanceMutex.unlock() ;
542 component_i->destroy() ;
544 _NS->Destroy_Name(instanceName.c_str());
// Wait for every thread started above.
547 for(int ip=1;ip<_nbproc;ip++)
548 pthread_join(th[ip],NULL);
// Finalises the removal of libraries scheduled in _toRemove_map.
// Visible steps: start one thread per index 1 .. _nbproc-1 running
// th_finalizeremoval, then, under _numInstanceMutex, walk _toRemove_map and
// clear it, and finally join the threads. The dlclose/erase calls are
// commented out in the original, so in the visible code no library is
// actually unloaded.
// NOTE(review): the declaration of th, the thread_st fields and any guards
// are missing from this view.
554 void Engines_MPIContainer_i::finalize_removal()
// One pthread_t slot per process; slot 0 is not used by the loops below.
558 th = new pthread_t[_nbproc];
559 for(int ip=1;ip<_nbproc;ip++){
560 thread_st *st = new thread_st;
563 pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
567 _numInstanceMutex.lock(); // lock to be alone
568 // (see decInstanceCnt, load_component_Library)
569 std::map<std::string, void *>::iterator ith;
570 for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
572 void *handle = (*ith).second;
573 std::string impl_name= (*ith).first;
// Unloading is deliberately disabled (see the original remark below).
578 // dlclose(handle); // SALOME unstable after ...
579 // _library_map.erase(impl_name);
582 _toRemove_map.clear();
583 _numInstanceMutex.unlock();
// Wait for every thread started above.
586 for(int ip=1;ip<_nbproc;ip++)
587 pthread_join(th[ip],NULL);
// Thread entry point used by load_component_Library.
// Casts `s` to thread_st*, narrows element st->ip of st->tior to
// Engines::MPIContainer and calls load_component_Library on it with
// st->compoName; the reason string passed to that call is then freed.
// NOTE(review): no delete of `st` and no release of the narrowed reference are
// visible in this view - confirm.
592 void *th_loadcomponentlibrary(void *s)
594 thread_st *st = (thread_st*)s;
596 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
597 CORBA::string_free(reason);
// Thread entry point used by create_component_instance_env.
// Casts `s` to thread_st*, narrows element st->ip of st->tior to
// Engines::MPIContainer and calls create_component_instance on it with
// st->compoName and st->studyId.
// NOTE(review): the component reference returned by the call is discarded and
// no delete of `st` is visible in this view - confirm.
601 void *th_createcomponentinstance(void *s)
603 thread_st *st = (thread_st*)s;
604 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
// Thread entry point used by load_impl.
// Casts `s` to thread_st*, narrows element st->ip of st->tior to
// Engines::MPIContainer and calls load_impl on it with st->nameToRegister and
// st->compoName.
// NOTE(review): the component reference returned by the call is discarded and
// no delete of `st` is visible in this view - confirm.
608 void *th_loadimpl(void *s)
610 thread_st *st = (thread_st*)s;
611 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
// Thread entry point used by remove_impl.
// Casts `s` to thread_st*, narrows element st->ip of st->tior to
// Engines::MPIContainer and calls remove_impl on it with st->cptr.
// NOTE(review): no delete of `st` is visible in this view - confirm.
615 void *th_removeimpl(void *s)
617 thread_st *st = (thread_st*)s;
618 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
// Thread entry point used by finalize_removal.
// Casts `s` to thread_st*, narrows element st->ip of st->tior to
// Engines::MPIContainer and calls finalize_removal on it.
// NOTE(review): no delete of `st` is visible in this view - confirm.
622 void *th_finalizeremoval(void *s)
624 thread_st *st = (thread_st*)s;
625 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();