1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 // SALOME MPIContainer : implemenation of container based on MPI libraries
23 // File : MPIContainer_i.cxx
29 #include "Basics_Utils.hxx"
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
35 #include "utilities.h"
38 #include <pthread.h> // must be before Python.h !
40 #include "Container_init_python.hxx"
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
46 PortableServer::POA_ptr poa,
48 int argc, char *argv[])
49 : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
52 _id = _poa->activate_object(this);
53 CORBA::Object_var obj=_poa->id_to_reference(*_id);
54 Engines::Container_var pCont = Engines::Container::_narrow(obj);
59 _NS = new SALOME_NamingService();
60 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
62 string hostname = Kernel_Utils::GetHostname();
63 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
64 SCRUTE(_containerName);
65 _NS->Register(pCont, _containerName.c_str());
69 // Root recupere les ior des container des autre process
70 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71 BCastIOR(_orb,pobj,true);
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc)
75 : Engines_Container_i(), MPIObject_i(nbproc,numproc)
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
81 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
85 void Engines_MPIContainer_i::Shutdown()
88 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
90 _NS->Destroy_FullDirectory(_containerName.c_str());
91 for(ip= 1;ip<_nbproc;ip++)
92 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
95 std::map<std::string, Engines::Component_var>::iterator itm;
96 for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100 itm->second->destroy();
102 catch(const CORBA::Exception& e)
104 // ignore this entry and continue
108 // ignore this entry and continue
116 // Load a component library
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
120 // Invocation du chargement du composant dans les autres process
121 for(int ip= 1;ip<_nbproc;ip++)
122 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
125 return Lload_component_Library(componentName);
128 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
130 Lload_component_Library(componentName);
133 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
135 string aCompName = componentName;
137 // --- try dlopen C++ component
139 string impl_name = string ("lib") + aCompName + string("Engine.so");
141 _numInstanceMutex.lock(); // lock to be alone
142 // (see decInstanceCnt, finalize_removal))
143 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
144 if (_library_map[impl_name])
146 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
147 _numInstanceMutex.unlock();
152 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
155 _library_map[impl_name] = handle;
156 _numInstanceMutex.unlock();
157 MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
158 MPI_Barrier(MPI_COMM_WORLD);
163 MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
164 MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
165 MPI_Barrier(MPI_COMM_WORLD);
167 _numInstanceMutex.unlock();
169 // --- try import Python component
171 INFOS("[" << _numproc << "] try import Python component "<<componentName);
172 if (_isSupervContainer)
174 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
177 if (_library_map[aCompName])
179 return true; // Python Component, already imported
183 Py_ACQUIRE_NEW_THREAD;
184 PyObject *mainmod = PyImport_AddModule("__main__");
185 PyObject *globals = PyModule_GetDict(mainmod);
186 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
187 PyObject *result = PyObject_CallMethod(pyCont,
190 int ret= PyInt_AsLong(result);
192 Py_RELEASE_NEW_THREAD;
194 if (ret) // import possible: Python component
196 _library_map[aCompName] = (void *)pyCont; // any non O value OK
197 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
204 // Create an instance of component
205 Engines::Component_ptr
206 Engines_MPIContainer_i::create_component_instance( const char* componentName,
210 // Invocation du chargement du composant dans les autres process
211 for(int ip= 1;ip<_nbproc;ip++)
212 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
215 return Lcreate_component_instance(componentName,studyId);
218 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
221 Lcreate_component_instance(componentName,studyId);
224 Engines::Component_ptr
225 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
228 INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
229 return Engines::Component::_nil() ;
232 Engines::Component_var iobject = Engines::Component::_nil() ;
233 Engines::MPIObject_var pobj;
235 string aCompName = genericRegisterName;
236 if (_library_map[aCompName]) { // Python component
237 if (_isSupervContainer) {
238 INFOS("Supervision Container does not support Python Component Engines");
239 return Engines::Component::_nil();
241 _numInstanceMutex.lock() ; // lock on the instance number
243 int numInstance = _numInstance ;
244 _numInstanceMutex.unlock() ;
247 sprintf( aNumI , "%d" , numInstance ) ;
248 string instanceName = aCompName + "_inst_" + aNumI ;
249 string component_registerName =
250 _containerName + "/" + instanceName;
252 Py_ACQUIRE_NEW_THREAD;
253 PyObject *mainmod = PyImport_AddModule("__main__");
254 PyObject *globals = PyModule_GetDict(mainmod);
255 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
256 PyObject *result = PyObject_CallMethod(pyCont,
257 "create_component_instance",
260 instanceName.c_str(),
262 string iors = PyString_AsString(result);
264 Py_RELEASE_NEW_THREAD;
266 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
267 iobject = Engines::Component::_narrow( obj ) ;
268 pobj = Engines::MPIObject::_narrow(obj) ;
270 _NS->Register(iobject, component_registerName.c_str()) ;
271 // Root recupere les ior des composants des autre process
272 BCastIOR(_orb,pobj,false);
274 return iobject._retn();
279 string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
280 if (_library_map.count(impl_name) != 0) // C++ component
282 void* handle = _library_map[impl_name];
283 iobject = createMPIInstance(genericRegisterName,
286 return iobject._retn();
291 Engines::Component_ptr
292 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
296 Engines::Component_var iobject;
297 Engines::MPIObject_var pobj;
298 // --- find the factory
300 string aGenRegisterName = genericRegisterName;
301 string factory_name = aGenRegisterName + string("Engine_factory");
303 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
306 PortableServer::POA_ptr,
307 PortableServer::ObjectId *,
312 MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
314 if ( !MPIComponent_factory )
316 INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
318 pobj = Engines::MPIObject::_nil();
319 BCastIOR(_orb,pobj,false);
320 return Engines::Component::_nil();
323 // --- create instance
325 iobject = Engines::Component::_nil() ;
329 _numInstanceMutex.lock() ; // lock on the instance number
331 int numInstance = _numInstance ;
332 _numInstanceMutex.unlock() ;
335 sprintf( aNumI , "%d" , numInstance ) ;
336 string instanceName = aGenRegisterName + "_inst_" + aNumI ;
337 string component_registerName =
338 _containerName + "/" + instanceName;
340 // --- Instanciate required CORBA object
342 PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
343 id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
344 aGenRegisterName.c_str() ) ;
346 // --- get reference & servant from id
348 CORBA::Object_var obj = _poa->id_to_reference(*id);
349 iobject = Engines::Component::_narrow( obj ) ;
350 pobj = Engines::MPIObject::_narrow(obj) ;
352 Engines_Component_i *servant =
353 dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
355 //SCRUTE(servant->pd_refCount);
356 servant->_remove_ref(); // compensate previous id_to_reference
357 //SCRUTE(servant->pd_refCount);
358 _listInstances_map[instanceName] = iobject;
359 _cntInstances_map[aGenRegisterName] += 1;
360 //SCRUTE(servant->pd_refCount);
361 bool ret_studyId = servant->setStudyId(studyId);
364 // --- register the engine under the name
365 // containerName(.dir)/instanceName(.object)
368 _NS->Register( iobject , component_registerName.c_str() ) ;
369 MESSAGE( component_registerName.c_str() << " bound" ) ;
371 // Root recupere les ior des composants des autre process
372 BCastIOR(_orb,pobj,false);
375 catch(const POException &ex){
376 INFOS( ex.msg << " on process number " << ex.numproc ) ;
377 return Engines::Component::_nil();
380 INFOS( "Container_i::createInstance exception catched" ) ;
381 return Engines::Component::_nil();
383 return iobject._retn();
387 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
388 const char* componentName)
391 // Invocation du chargement du composant dans les autres process
392 for(int ip= 1;ip<_nbproc;ip++)
393 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
397 return Lload_impl(nameToRegister,componentName);
402 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
403 const char* componentName)
405 Lload_impl(nameToRegister,componentName);
408 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
409 const char* nameToRegister,
410 const char* componentName)
412 Engines::Component_var iobject;
413 Engines::MPIObject_var pobj;
416 sprintf(cproc,"_%d",_numproc);
418 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
420 _numInstanceMutex.lock() ; // lock on the instance number
423 sprintf(_aNumI,"%d",_numInstance) ;
425 string _impl_name = componentName;
426 string _nameToRegister = nameToRegister;
427 string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
428 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
430 string absolute_impl_name(_impl_name);
431 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
432 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
434 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
435 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
436 return Engines::Component::_nil() ;
439 string factory_name = _nameToRegister + string("Engine_factory");
440 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
443 PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
445 PortableServer::POA_ptr,
446 PortableServer::ObjectId *,
449 (PortableServer::ObjectId * (*) (int,int,
451 PortableServer::POA_ptr,
452 PortableServer::ObjectId *,
455 dlsym(handle, factory_name.c_str());
458 if ((error = dlerror()) != NULL){
459 // Try to load a sequential component
460 MESSAGE("[" << _numproc << "] Try to load a sequential component");
461 _numInstanceMutex.unlock() ;
462 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
463 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
466 // Instanciation du composant parallele
467 MESSAGE("[" << _numproc << "] Try to load a parallel component");
468 PortableServer::ObjectId * id = (MPIComponent_factory)
469 (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
470 // get reference from id
471 CORBA::Object_var o = _poa->id_to_reference(*id);
472 pobj = Engines::MPIObject::_narrow(o) ;
473 iobject = Engines::Component::_narrow(o) ;
477 // utiliser + tard le registry ici :
478 // register the engine under the name containerName.dir/nameToRegister.object
479 string component_registerName = _containerName + "/" + _nameToRegister;
480 _NS->Register(iobject, component_registerName.c_str()) ;
483 _numInstanceMutex.unlock() ;
485 // Root recupere les ior des composants des autre process
486 BCastIOR(_orb,pobj,false);
488 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
489 return Engines::Component::_duplicate(iobject);
493 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
496 Engines::Component_ptr cptr;
497 Engines::MPIObject_ptr pcptr;
498 Engines::MPIObject_ptr spcptr;
500 ASSERT(! CORBA::is_nil(component_i));
503 // Invocation de la destruction du composant dans les autres process
504 pcptr = (Engines::MPIObject_ptr)component_i;
505 for(ip= 1;ip<_nbproc;ip++){
506 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
507 cptr = (Engines::Component_ptr)spcptr;
508 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
512 Lremove_impl(component_i);
515 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
517 Lremove_impl(component_i);
520 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
522 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
524 ASSERT(! CORBA::is_nil(component_i));
526 string instanceName = component_i->instanceName() ;
527 MESSAGE("[" << _numproc << "] unload component " << instanceName);
528 component_i->destroy() ;
529 MESSAGE("[" << _numproc << "] test key handle_map");
530 _numInstanceMutex.lock() ; // lock on the remove on handle_map
531 _numInstanceMutex.unlock() ;
532 MESSAGE("[" << _numproc << "] list handle_map");
534 END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
538 void Engines_MPIContainer_i::finalize_removal()
543 // Invocation de la destruction du composant dans les autres process
544 for(ip= 1;ip<_nbproc;ip++)
545 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
551 void Engines_MPIContainer_i::Asfinalize_removal()
556 void Engines_MPIContainer_i::Lfinalize_removal()
558 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
560 // map<string, void *>::iterator im ;
561 // // lock on the explore remove_map & dlclose
562 // _numInstanceMutex.lock() ;
563 // for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
565 // void * handle = (*im).second ;
566 // MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
569 // MESSAGE("[" << _numproc << "] remove_map.clear()");
570 // remove_map.clear() ;
571 // _numInstanceMutex.unlock() ;
573 END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");