1 // Copyright (C) 2007-2016 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 // File : MPIContainer_i.cxx
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
34 #include "utilities.h"
37 #include <pthread.h> // must be before Python.h !
44 #include "Container_init_python.hxx"
46 // L'appel au registry SALOME ne se fait que pour le process 0
47 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb,
48 PortableServer::POA_ptr poa,
50 int argc, char *argv[])
51 : Engines_Container_i(orb,poa,containerName,argc,argv,false)
54 _id = _poa->activate_object(this);
55 CORBA::Object_var obj=_poa->id_to_reference(*_id);
56 Engines::Container_var pCont = Engines::Container::_narrow(obj);
61 _NS = new SALOME_NamingService();
62 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
64 std::string hostname = Kernel_Utils::GetHostname();
65 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
66 SCRUTE(_containerName);
67 _NS->Register(pCont, _containerName.c_str());
71 // Root recupere les ior des container des autre process
72 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
73 BCastIOR(_orb,pobj,true);
76 Engines_MPIContainer_i::Engines_MPIContainer_i()
77 : Engines_Container_i()
81 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
83 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
87 void Engines_MPIContainer_i::Shutdown()
90 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
92 _NS->Destroy_FullDirectory(_containerName.c_str());
93 _NS->Destroy_Name(_containerName.c_str());
94 for(ip= 1;ip<_nbproc;ip++)
95 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
98 std::map<std::string, Engines::EngineComponent_var>::iterator itm;
99 for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
103 itm->second->destroy();
105 catch(const CORBA::Exception& e)
107 // ignore this entry and continue
111 // ignore this entry and continue
119 // Load a component library
120 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
122 reason=CORBA::string_dup("");
126 th = new pthread_t[_nbproc];
127 for(int ip=1;ip<_nbproc;ip++){
128 thread_st *st = new thread_st;
131 st->compoName = componentName;
132 pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
136 bool ret = Lload_component_Library(componentName);
139 for(int ip=1;ip<_nbproc;ip++)
140 pthread_join(th[ip],NULL);
146 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
148 std::string aCompName = componentName;
150 // --- try dlopen C++ component
152 std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
154 _numInstanceMutex.lock(); // lock to be alone
155 // (see decInstanceCnt, finalize_removal))
156 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
157 if (_library_map[impl_name])
159 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
160 _numInstanceMutex.unlock();
165 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
168 _library_map[impl_name] = handle;
169 _numInstanceMutex.unlock();
170 MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
175 MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
176 MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
178 _numInstanceMutex.unlock();
180 // --- try import Python component
182 INFOS("[" << _numproc << "] try import Python component "<<componentName);
183 if (_isSupervContainer)
185 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
188 if (_library_map[aCompName])
190 return true; // Python Component, already imported
194 Py_ACQUIRE_NEW_THREAD;
195 PyObject *mainmod = PyImport_AddModule((char *)"__main__");
196 PyObject *globals = PyModule_GetDict(mainmod);
197 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
198 PyObject *result = PyObject_CallMethod(pyCont,
199 (char*)"import_component",
200 (char*)"s",componentName);
201 std::string ret= PyString_AsString(result);
203 Py_RELEASE_NEW_THREAD;
205 if (ret=="") // import possible: Python component
207 _library_map[aCompName] = (void *)pyCont; // any non O value OK
208 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
215 // Create an instance of component
216 Engines::EngineComponent_ptr
217 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
218 const Engines::FieldsDict& env,
219 CORBA::String_out reason)
221 reason=CORBA::string_dup("");
225 th = new pthread_t[_nbproc];
226 for(int ip=1;ip<_nbproc;ip++){
227 thread_st *st = new thread_st;
230 st->compoName = componentName;
231 pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
235 Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
238 for(int ip=1;ip<_nbproc;ip++)
239 pthread_join(th[ip],NULL);
246 Engines::EngineComponent_ptr
247 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
249 Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
250 Engines::MPIObject_var pobj;
252 std::string aCompName = genericRegisterName;
253 if (_library_map[aCompName]) { // Python component
254 if (_isSupervContainer) {
255 INFOS("Supervision Container does not support Python Component Engines");
256 return Engines::EngineComponent::_nil();
258 _numInstanceMutex.lock() ; // lock on the instance number
260 int numInstance = _numInstance ;
261 _numInstanceMutex.unlock() ;
264 sprintf( aNumI , "%d" , numInstance ) ;
265 std::string instanceName = aCompName + "_inst_" + aNumI ;
266 std::string component_registerName =
267 _containerName + "/" + instanceName;
269 Py_ACQUIRE_NEW_THREAD;
270 PyObject *mainmod = PyImport_AddModule((char*)"__main__");
271 PyObject *globals = PyModule_GetDict(mainmod);
272 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
273 PyObject *result = PyObject_CallMethod(pyCont,
274 (char*)"create_component_instance",
277 instanceName.c_str());
280 PyArg_ParseTuple(result,"ss", &ior, &error);
281 std::string iors = ior;
283 Py_RELEASE_NEW_THREAD;
285 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
286 iobject = Engines::EngineComponent::_narrow( obj ) ;
287 pobj = Engines::MPIObject::_narrow(obj) ;
289 _NS->Register(iobject, component_registerName.c_str()) ;
290 // Root recupere les ior des composants des autre process
291 BCastIOR(_orb,pobj,false);
293 return iobject._retn();
298 std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
299 if (_library_map.count(impl_name) != 0) // C++ component
301 void* handle = _library_map[impl_name];
302 iobject = createMPIInstance(genericRegisterName,
304 return iobject._retn();
307 return Engines::EngineComponent::_nil() ;
310 Engines::EngineComponent_ptr
311 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
314 Engines::EngineComponent_var iobject;
315 Engines::MPIObject_var pobj;
316 // --- find the factory
318 std::string aGenRegisterName = genericRegisterName;
319 std::string factory_name = aGenRegisterName + std::string("Engine_factory");
321 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
323 PortableServer::POA_ptr,
324 PortableServer::ObjectId *,
329 MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
331 if ( !MPIComponent_factory )
333 INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
335 pobj = Engines::MPIObject::_nil();
336 BCastIOR(_orb,pobj,false);
337 return Engines::EngineComponent::_nil();
340 // --- create instance
342 iobject = Engines::EngineComponent::_nil() ;
346 _numInstanceMutex.lock() ; // lock on the instance number
348 int numInstance = _numInstance ;
349 _numInstanceMutex.unlock() ;
352 sprintf( aNumI , "%d" , numInstance ) ;
353 std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
354 std::string component_registerName =
355 _containerName + "/" + instanceName;
357 // --- Instanciate required CORBA object
359 PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
360 id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
362 // --- get reference from id
364 CORBA::Object_var obj = _poa->id_to_reference(*id);
365 iobject = Engines::EngineComponent::_narrow( obj ) ;
366 pobj = Engines::MPIObject::_narrow(obj) ;
368 _listInstances_map[instanceName] = iobject;
369 _cntInstances_map[aGenRegisterName] += 1;
371 // --- register the engine under the name
372 // containerName(.dir)/instanceName(.object)
375 _NS->Register( iobject , component_registerName.c_str() ) ;
376 MESSAGE( component_registerName.c_str() << " bound" ) ;
378 // Root recupere les ior des composants des autre process
379 BCastIOR(_orb,pobj,false);
382 catch(const std::exception &ex){
384 return Engines::EngineComponent::_nil();
386 return iobject._retn();
390 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
391 const char* componentName)
395 th = new pthread_t[_nbproc];
396 for(int ip=1;ip<_nbproc;ip++){
397 thread_st *st = new thread_st;
400 st->nameToRegister = nameToRegister;
401 st->compoName = componentName;
402 pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
406 Engines::EngineComponent_ptr cptr = Lload_impl(nameToRegister,componentName);
409 for(int ip=1;ip<_nbproc;ip++)
410 pthread_join(th[ip],NULL);
418 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
419 const char* nameToRegister,
420 const char* componentName)
422 Engines::EngineComponent_var iobject;
423 Engines::MPIObject_var pobj;
426 sprintf(cproc,"_%d",_numproc);
428 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
430 _numInstanceMutex.lock() ; // lock on the instance number
433 sprintf(_aNumI,"%d",_numInstance) ;
435 std::string _impl_name = componentName;
436 std::string _nameToRegister = nameToRegister;
437 std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
438 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
440 std::string absolute_impl_name(_impl_name);
441 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
442 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
444 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
445 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
446 return Engines::EngineComponent::_nil() ;
449 std::string factory_name = _nameToRegister + std::string("Engine_factory");
450 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
453 PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
454 PortableServer::POA_ptr,
455 PortableServer::ObjectId *,
458 (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
459 PortableServer::POA_ptr,
460 PortableServer::ObjectId *,
463 dlsym(handle, factory_name.c_str());
466 if ((error = dlerror()) != NULL){
467 // Try to load a sequential component
468 MESSAGE("[" << _numproc << "] Try to load a sequential component");
469 _numInstanceMutex.unlock() ;
470 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
471 if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
474 // Instanciation du composant parallele
475 MESSAGE("[" << _numproc << "] Try to load a parallel component");
476 PortableServer::ObjectId * id = (MPIComponent_factory)
477 (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
478 // get reference from id
479 CORBA::Object_var o = _poa->id_to_reference(*id);
480 pobj = Engines::MPIObject::_narrow(o) ;
481 iobject = Engines::EngineComponent::_narrow(o) ;
485 // utiliser + tard le registry ici :
486 // register the engine under the name containerName.dir/nameToRegister.object
487 std::string component_registerName = _containerName + "/" + _nameToRegister;
488 _NS->Register(iobject, component_registerName.c_str()) ;
491 _numInstanceMutex.unlock() ;
493 // Root recupere les ior des composants des autre process
494 BCastIOR(_orb,pobj,false);
496 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
497 return Engines::EngineComponent::_duplicate(iobject);
501 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
503 Engines::MPIObject_ptr pcptr;
504 Engines::MPIObject_ptr spcptr;
508 pcptr = (Engines::MPIObject_ptr)component_i;
509 th = new pthread_t[_nbproc];
510 for(int ip=1;ip<_nbproc;ip++){
511 thread_st *st = new thread_st;
514 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
515 st->cptr = (Engines::EngineComponent_ptr)spcptr;
516 pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
520 ASSERT(! CORBA::is_nil(component_i));
521 std::string instanceName = component_i->instanceName() ;
522 MESSAGE("[" << _numproc << "] unload component " << instanceName);
523 _numInstanceMutex.lock() ; // lock on the remove on handle_map
524 _listInstances_map.erase(instanceName);
525 _numInstanceMutex.unlock() ;
526 component_i->destroy() ;
528 _NS->Destroy_Name(instanceName.c_str());
531 for(int ip=1;ip<_nbproc;ip++)
532 pthread_join(th[ip],NULL);
538 void Engines_MPIContainer_i::finalize_removal()
542 th = new pthread_t[_nbproc];
543 for(int ip=1;ip<_nbproc;ip++){
544 thread_st *st = new thread_st;
547 pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
551 _numInstanceMutex.lock(); // lock to be alone
552 // (see decInstanceCnt, load_component_Library)
553 std::map<std::string, void *>::iterator ith;
554 for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
556 void *handle = (*ith).second;
557 std::string impl_name= (*ith).first;
562 // dlclose(handle); // SALOME unstable after ...
563 // _library_map.erase(impl_name);
566 _toRemove_map.clear();
567 _numInstanceMutex.unlock();
570 for(int ip=1;ip<_nbproc;ip++)
571 pthread_join(th[ip],NULL);
576 void *th_loadcomponentlibrary(void *s)
578 thread_st *st = (thread_st*)s;
580 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
581 CORBA::string_free(reason);
585 void *th_createcomponentinstance(void *s)
587 thread_st *st = (thread_st*)s;
588 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
592 void *th_loadimpl(void *s)
594 thread_st *st = (thread_st*)s;
595 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
599 void *th_removeimpl(void *s)
601 thread_st *st = (thread_st*)s;
602 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
606 void *th_finalizeremoval(void *s)
608 thread_st *st = (thread_st*)s;
609 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();