1 // SALOME MPIContainer : implementation of container based on MPI libraries
3 // Copyright (C) 2003 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
24 // File : MPIContainer_i.cxx
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
35 #include "utilities.h"
36 #include <pthread.h> // must be before Python.h !
38 #include "Container_init_python.hxx"
41 // The call to the SALOME registry is made only for process 0
// Constructor: builds the Engines_Container_i base and the MPIObject_i part,
// activates this servant in the POA, registers the container in the naming
// service and finally hands its reference to BCastIOR.
42 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
44 PortableServer::POA_ptr poa,
46 int argc, char *argv[])
47 : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
50 string IdContainerinNS;
// Buffer receiving the decimal text written by the sprintf "%ld" call further down.
51 char idc[3*sizeof(long)];
53 MESSAGE("[" << numproc << "] activate object");
54 _id = _poa->activate_object(this);
// Walk the command-line arguments looking for the "-id" option.
58 for(int i=0;i<argc;i++)
// NOTE(review): strcmp returns an int; the comparison with NULL only works
// because NULL converts to 0 -- "== 0" would state the intent.
60 if(strcmp(argv[i],"-id")==NULL)
// Create the naming-service client and give it a duplicate of the ORB reference.
71 _NS = new SALOME_NamingService();
72 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
// Convert the id returned by activate_object into a Container reference.
74 CORBA::Object_var obj=_poa->id_to_reference(*_id);
75 Engines::Container_var pCont = Engines::Container::_narrow(obj);
// Register the container under the name built by BuildContainerNameForNS
// from containerName and the local host name.
77 string hostname = GetHostname();
78 _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
79 SCRUTE(_containerName);
80 _NS->Register(pCont, _containerName.c_str());
82 // A parallel container registers in the Naming Service
83 // on the machine where process 0 runs. ContainerManager doesn't know the name
84 // of this machine before the launch of the parallel container. So, to get
85 // the IOR of the parallel container in the Naming Service, ContainerManager
86 // gives a unique Id. The parallel container registers its name under the
87 // /ContainerManager/Id directory in the Naming Service
// Second registration, under "/ContainerManager/id" followed by the decimal id.
89 IdContainerinNS = "/ContainerManager/id";
90 sprintf(idc,"%ld",id);
91 IdContainerinNS += idc;
92 SCRUTE(IdContainerinNS);
93 _NS->Register(pCont, IdContainerinNS.c_str());
97 // Root retrieves the IORs of the containers of the other processes
98 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
99 BCastIOR(_orb,pobj,true);
// Constructor that default-constructs the Engines_Container_i base and
// forwards nbproc and numproc to MPIObject_i.
102 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc)
103 : Engines_Container_i(), MPIObject_i(nbproc,numproc)
// Destructor: emits a trace message tagged with _numproc.
107 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
109 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
// Shuts the MPI CORBA server down: removes the container's directory from the
// naming service and calls Shutdown() on the containers referenced by
// (*_tior)[1] .. (*_tior)[_nbproc-1].
113 void Engines_MPIContainer_i::Shutdown()
116 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
118 _NS->Destroy_FullDirectory(_containerName.c_str());
// Index 0 is skipped: the loop only addresses the other processes.
// NOTE(review): the reference returned by _narrow is used as a temporary and
// is not stored in a _var; no release is visible here -- confirm.
119 for(ip= 1;ip<_nbproc;ip++)
120 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
126 // Load a component library
// Asks the containers of the other processes to load the library through
// Asload_component_Library, then loads it locally and returns the local result.
127 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
130 // Invoke the loading of the component in the other processes
131 for(int ip= 1;ip<_nbproc;ip++)
132 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
// Only the outcome of the local load is reported to the caller.
135 return Lload_component_Library(componentName);
// Entry point called on the other processes by load_component_Library:
// performs the local load and discards its boolean result.
138 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
140 Lload_component_Library(componentName);
// Local (this process only) load of a component library.
// Tries dlopen on "lib<componentName>Engine.so"; the code further down tries
// to import a Python component through the "pyCont" object found in the
// Python __main__ module. Loaded entries are remembered in _library_map.
143 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
145 string aCompName = componentName;
147 // --- try dlopen C++ component
149 string impl_name = string ("lib") + aCompName + string("Engine.so");
152 _numInstanceMutex.lock(); // lock to be alone
153 // (see decInstanceCnt, finalize_removal))
// NOTE(review): if these maps are std::map, operator[] inserts a null entry
// for a key that is not present yet -- confirm this is acceptable.
154 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
155 if (_library_map[impl_name])
157 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
158 _numInstanceMutex.unlock();
// Open the shared library; the handle is stored in _library_map under impl_name.
163 handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
166 _library_map[impl_name] = handle;
167 _numInstanceMutex.unlock();
// dlopen failure is reported together with the dlerror() text.
172 INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
173 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
175 _numInstanceMutex.unlock();
177 // --- try import Python component
179 INFOS("[" << _numproc << "] try import Python component "<<componentName);
180 if (_isSupervContainer)
182 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
// Python components are cached under the bare component name, not the .so name.
185 if (_library_map[aCompName])
187 return true; // Python Component, already imported
// Python calls are bracketed by Py_ACQUIRE_NEW_THREAD / Py_RELEASE_NEW_THREAD.
191 Py_ACQUIRE_NEW_THREAD;
192 PyObject *mainmod = PyImport_AddModule("__main__");
193 PyObject *globals = PyModule_GetDict(mainmod);
194 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
// NOTE(review): no NULL check of pyCont or result is visible before they are
// used -- confirm a failed lookup/call cannot reach PyInt_AsLong.
195 PyObject *result = PyObject_CallMethod(pyCont,
198 int ret= PyInt_AsLong(result);
200 Py_RELEASE_NEW_THREAD;
202 if (ret) // import possible: Python component
204 _library_map[aCompName] = (void *)pyCont; // any non-zero value is OK
205 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
212 // Create an instance of component
// Asks the containers of the other processes to create the instance through
// Ascreate_component_instance, then creates it locally and returns the local
// component reference.
213 Engines::Component_ptr
214 Engines_MPIContainer_i::create_component_instance( const char* componentName,
218 // Invoke the creation of the component instance in the other processes
219 for(int ip= 1;ip<_nbproc;ip++)
220 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
// Only the locally created reference is returned to the caller.
223 return Lcreate_component_instance(componentName,studyId);
// Entry point called on the other processes by create_component_instance:
// performs the local creation and ignores the returned reference.
226 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
229 Lcreate_component_instance(componentName,studyId);
// Local (this process only) creation of a component instance.
// A name found directly in _library_map is handled as a Python component and
// created through the Python "pyCont" object; otherwise the handle stored for
// "lib<name>Engine.so" is passed to createMPIInstance.
// Returns a nil reference on the error paths visible below.
232 Engines::Component_ptr
233 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
236 INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
237 return Engines::Component::_nil() ;
240 Engines::Component_var iobject = Engines::Component::_nil() ;
241 Engines::MPIObject_var pobj;
243 string aCompName = genericRegisterName;
244 if (_library_map[aCompName]) { // Python component
245 if (_isSupervContainer) {
246 INFOS("Supervision Container does not support Python Component Engines");
247 return Engines::Component::_nil();
// The instance counter is read into a local copy while the mutex is held.
249 _numInstanceMutex.lock() ; // lock on the instance number
251 int numInstance = _numInstance ;
252 _numInstanceMutex.unlock() ;
// Instance name is "<component>_inst_<n>"; it is registered under the
// container's naming-service path.
255 sprintf( aNumI , "%d" , numInstance ) ;
256 string instanceName = aCompName + "_inst_" + aNumI ;
257 string component_registerName =
258 _containerName + "/" + instanceName;
// Python calls are bracketed by Py_ACQUIRE_NEW_THREAD / Py_RELEASE_NEW_THREAD.
260 Py_ACQUIRE_NEW_THREAD;
261 PyObject *mainmod = PyImport_AddModule("__main__");
262 PyObject *globals = PyModule_GetDict(mainmod);
263 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
264 PyObject *result = PyObject_CallMethod(pyCont,
265 "create_component_instance",
268 instanceName.c_str(),
// The Python call's result is read as a string and used below as an IOR.
// NOTE(review): no NULL check of result is visible before PyString_AsString.
270 string iors = PyString_AsString(result);
272 Py_RELEASE_NEW_THREAD;
// Rebuild the CORBA references from the stringified IOR.
274 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
275 iobject = Engines::Component::_narrow( obj ) ;
276 pobj = Engines::MPIObject::_narrow(obj) ;
278 _NS->Register(iobject, component_registerName.c_str()) ;
279 // Root retrieves the IORs of the components of the other processes
280 BCastIOR(_orb,pobj,false);
282 return iobject._retn();
// C++ component path: the library must already be present in _library_map.
287 string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
288 void* handle = _library_map[impl_name];
290 INFOS("shared library " << impl_name <<"must be loaded before instance");
291 return Engines::Component::_nil() ;
294 iobject = createMPIInstance(genericRegisterName,
297 return iobject._retn();
// Creates a component instance from an already opened shared library.
// Looks up the "<name>Engine_factory" symbol with dlsym; when dlerror()
// reports an error the sequential Engines_Container_i::createInstance is
// tried instead. Otherwise the factory is called, the resulting object is
// recorded in the instance maps, registered in the naming service and its
// reference handed to BCastIOR.
301 Engines::Component_ptr
302 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
306 Engines::Component_var iobject;
307 Engines::MPIObject_var pobj;
308 // --- find the factory
310 string aGenRegisterName = genericRegisterName;
311 string factory_name = aGenRegisterName + string("Engine_factory");
312 SCRUTE(factory_name) ;
// Signature expected from the factory symbol exported by the library.
314 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
317 PortableServer::POA_ptr,
318 PortableServer::ObjectId *,
322 MPIFACTORY_FUNCTION MPIComponent_factory
323 = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
// NOTE(review): no dlerror() call clearing a stale error is visible before
// dlsym, so this test may see an older error -- confirm.
326 if ( (error = dlerror() ) != NULL) {
327 // Try to load a sequential component
328 MESSAGE("[" << _numproc << "] Try to load a sequential component");
// NOTE(review): no matching lock() is visible before this unlock() in this
// function (the lock below comes later) -- confirm.
329 _numInstanceMutex.unlock() ;
330 iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
331 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
334 // --- create instance
336 iobject = Engines::Component::_nil() ;
// The instance counter is read into a local copy while the mutex is held.
340 _numInstanceMutex.lock() ; // lock on the instance number
342 int numInstance = _numInstance ;
343 _numInstanceMutex.unlock() ;
// Instance name is "<component>_inst_<n>", registered under the container path.
346 sprintf( aNumI , "%d" , numInstance ) ;
347 string instanceName = aGenRegisterName + "_inst_" + aNumI ;
348 string component_registerName =
349 _containerName + "/" + instanceName;
351 // --- Instantiate required CORBA object
353 PortableServer::ObjectId *id ; //not owner, do not delete (nor use a _var)
354 id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
355 aGenRegisterName.c_str() ) ;
357 // --- get reference & servant from id
359 CORBA::Object_var obj = _poa->id_to_reference(*id);
360 iobject = Engines::Component::_narrow( obj ) ;
361 pobj = Engines::MPIObject::_narrow(obj) ;
// NOTE(review): the dynamic_cast result is dereferenced below without a
// visible null check -- confirm the servant is always an Engines_Component_i.
363 Engines_Component_i *servant =
364 dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
366 //SCRUTE(servant->pd_refCount);
367 servant->_remove_ref(); // compensate previous id_to_reference
368 //SCRUTE(servant->pd_refCount);
// Book-keeping: remember the instance and count instances per component name.
369 _listInstances_map[instanceName] = iobject;
370 _cntInstances_map[aGenRegisterName] += 1;
371 SCRUTE(aGenRegisterName);
372 SCRUTE(_cntInstances_map[aGenRegisterName]);
373 //SCRUTE(servant->pd_refCount);
374 bool ret_studyId = servant->setStudyId(studyId);
377 // --- register the engine under the name
378 // containerName(.dir)/instanceName(.object)
381 _NS->Register( iobject , component_registerName.c_str() ) ;
382 MESSAGE( component_registerName.c_str() << " bound" ) ;
384 // Root retrieves the IORs of the components of the other processes
385 BCastIOR(_orb,pobj,false);
390 INFOS( "Container_i::createInstance exception catched" ) ;
392 return iobject._retn();
// Asks the containers of the other processes to load the implementation
// through Asload_impl, then loads it locally and returns the local reference.
396 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
397 const char* componentName)
400 // Invoke the loading of the component in the other processes
401 for(int ip= 1;ip<_nbproc;ip++)
402 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
// Only the locally created reference is returned to the caller.
406 return Lload_impl(nameToRegister,componentName);
// Entry point called on the other processes by load_impl: performs the local
// load and ignores the returned reference.
411 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
412 const char* componentName)
414 Lload_impl(nameToRegister,componentName);
// Local (this process only) load of a component implementation.
// Opens componentName with dlopen, looks up "<nameToRegister>Engine_factory"
// and calls it to create the parallel component; when dlerror() reports an
// error after dlsym, the sequential Engines_Container_i::load_impl is tried
// instead. The component is registered in the naming service under
// "<_containerName>/<nameToRegister>" and its reference handed to BCastIOR.
417 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
418 const char* nameToRegister,
419 const char* componentName)
421 Engines::Component_var iobject;
422 Engines::MPIObject_var pobj;
// Suffix "_<numproc>" appended to the instance name below.
425 sprintf(cproc,"_%d",_numproc);
427 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
429 _numInstanceMutex.lock() ; // lock on the instance number
432 sprintf(_aNumI,"%d",_numInstance) ;
434 string _impl_name = componentName;
435 string _nameToRegister = nameToRegister;
// Instance name is "<nameToRegister>_inst_<n>_<numproc>".
436 string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
437 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
439 string absolute_impl_name(_impl_name);
440 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
441 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
443 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
444 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
// NOTE(review): the mutex locked above does not appear to be released before
// this early return -- confirm.
445 return Engines::Component::_nil() ;
448 string factory_name = _nameToRegister + string("Engine_factory");
449 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
// Function pointer for the factory symbol, initialised from dlsym.
451 PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
453 PortableServer::POA_ptr,
454 PortableServer::ObjectId *,
457 (PortableServer::ObjectId * (*) (int,int,
459 PortableServer::POA_ptr,
460 PortableServer::ObjectId *,
463 dlsym(handle, factory_name.c_str());
// NOTE(review): no dlerror() call clearing a stale error is visible before
// dlsym, so this test may see an older error -- confirm.
466 if ((error = dlerror()) != NULL){
467 // Try to load a sequential component
468 MESSAGE("[" << _numproc << "] Try to load a sequential component");
469 _numInstanceMutex.unlock() ;
470 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
471 if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
474 // Instantiation of the parallel component
475 MESSAGE("[" << _numproc << "] Try to load a parallel component");
476 PortableServer::ObjectId * id = (MPIComponent_factory)
477 (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
478 // get reference from id
479 CORBA::Object_var o = _poa->id_to_reference(*id);
480 pobj = Engines::MPIObject::_narrow(o) ;
481 iobject = Engines::Component::_narrow(o) ;
485 // use the registry here later on:
486 // register the engine under the name containerName.dir/nameToRegister.object
487 string component_registerName = _containerName + "/" + _nameToRegister;
488 _NS->Register(iobject, component_registerName.c_str()) ;
491 _numInstanceMutex.unlock() ;
493 // Root retrieves the IORs of the components of the other processes
494 BCastIOR(_orb,pobj,false);
496 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
497 return Engines::Component::_duplicate(iobject);
// Removes a component: for each other process, fetches that process's
// component reference from the component's tior() table and passes it to the
// matching container's Asremove_impl, then removes the local component.
501 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
504 Engines::Component_ptr cptr;
505 Engines::MPIObject_ptr pcptr;
506 Engines::MPIObject_ptr spcptr;
508 ASSERT(! CORBA::is_nil(component_i));
511 // Invoke the destruction of the component in the other processes
// NOTE(review): C-style casts are used between Component_ptr and
// MPIObject_ptr here and below instead of _narrow -- confirm this is valid
// for the ORB in use.
512 pcptr = (Engines::MPIObject_ptr)component_i;
513 for(ip= 1;ip<_nbproc;ip++){
// NOTE(review): the reference returned by _narrow is kept in a plain _ptr and
// no release is visible in this loop -- confirm.
514 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
515 cptr = (Engines::Component_ptr)spcptr;
516 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
// Finally remove the component held by this process.
520 Lremove_impl(component_i);
// Entry point called on the other processes by remove_impl: forwards to the
// local removal.
523 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
525 Lremove_impl(component_i);
// Local (this process only) removal of a component: reads its instance name
// for the trace messages and calls destroy() on it.
528 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
530 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
532 ASSERT(! CORBA::is_nil(component_i));
534 string instanceName = component_i->instanceName() ;
535 MESSAGE("[" << _numproc << "] unload component " << instanceName);
536 component_i->destroy() ;
537 MESSAGE("[" << _numproc << "] test key handle_map");
// The mutex is taken and released immediately: nothing is protected here.
538 _numInstanceMutex.lock() ; // lock on the remove on handle_map
539 _numInstanceMutex.unlock() ;
540 MESSAGE("[" << _numproc << "] list handle_map");
542 END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
// Calls Asfinalize_removal on the containers referenced by
// (*_tior)[1] .. (*_tior)[_nbproc-1], i.e. on the other processes.
546 void Engines_MPIContainer_i::finalize_removal()
551 // Invoke the finalization of the component removal in the other processes
552 for(ip= 1;ip<_nbproc;ip++)
553 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
// Entry point called on the other processes by finalize_removal.
// NOTE(review): presumably forwards to Lfinalize_removal like the other As*
// methods -- confirm against the body.
559 void Engines_MPIContainer_i::Asfinalize_removal()
// Local (this process only) finalization of component removal.
// Only the BEGIN_OF / END_OF traces are active in the code below; the loop
// that walked remove_map and logged each dlclose is commented out.
564 void Engines_MPIContainer_i::Lfinalize_removal()
566 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
568 // map<string, void *>::iterator im ;
569 // // lock on the explore remove_map & dlclose
570 // _numInstanceMutex.lock() ;
571 // for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
573 // void * handle = (*im).second ;
574 // MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
577 // MESSAGE("[" << _numproc << "] remove_map.clear()");
578 // remove_map.clear() ;
579 // _numInstanceMutex.unlock() ;
581 END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");