1 // Copyright (C) 2007-2023 CEA, EDF, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
26 #include "Basics_Utils.hxx"
27 #include "SALOME_Component_i.hxx"
28 #include "MPIContainer_i.hxx"
29 #include "SALOME_NamingService.hxx"
30 #include "Utils_SINGLETON.hxx"
32 #include "PythonCppUtils.hxx"
33 #include "utilities.h"
36 #include <pthread.h> // must be before Python.h !
43 #include "Container_init_python.hxx"
45 // L'appel au registry SALOME ne se fait que pour le process 0
46 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb,
47 PortableServer::POA_ptr poa,
49 int argc, char *argv[])
50 : Engines_Container_i(orb,poa,containerName,argc,argv,nullptr,false)
53 _id = _poa->activate_object(this);
54 CORBA::Object_var obj=_poa->id_to_reference(*_id);
55 Engines::Container_var pCont = Engines::Container::_narrow(obj);
60 _NS = new SALOME_NamingService();
61 _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
63 std::string hostname = Kernel_Utils::GetHostname();
64 _containerName = SALOME_NamingService_Abstract::BuildContainerNameForNS(containerName,hostname.c_str());
65 SCRUTE(_containerName);
66 _NS->Register(pCont, _containerName.c_str());
70 // Root recupere les ior des container des autre process
71 Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
72 BCastIOR(_orb,pobj,true);
75 Engines_MPIContainer_i::Engines_MPIContainer_i()
76 : Engines_Container_i()
80 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
82 MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
86 void Engines_MPIContainer_i::Shutdown()
89 MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
91 _NS->Destroy_FullDirectory(_containerName.c_str());
92 _NS->Destroy_Name(_containerName.c_str());
93 for(ip= 1;ip<_nbproc;ip++)
94 (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
97 std::map<std::string, Engines::EngineComponent_var>::iterator itm;
98 for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
102 itm->second->destroy();
104 catch(const CORBA::Exception& e)
106 // ignore this entry and continue
110 // ignore this entry and continue
118 // Load a component library
119 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
121 reason=CORBA::string_dup("");
125 th = new pthread_t[_nbproc];
126 for(int ip=1;ip<_nbproc;ip++){
127 thread_st *st = new thread_st;
130 st->compoName = componentName;
131 pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
135 bool ret = Lload_component_Library(componentName);
138 for(int ip=1;ip<_nbproc;ip++)
139 pthread_join(th[ip],NULL);
145 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
147 std::string aCompName = componentName;
149 // --- try dlopen C++ component
152 std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.dylib");
154 std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
157 _numInstanceMutex.lock(); // lock to be alone
158 // (see decInstanceCnt, finalize_removal))
159 if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
160 if (_library_map[impl_name])
162 MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
163 _numInstanceMutex.unlock();
168 handle = dlopen( impl_name.c_str() , RTLD_LAZY | RTLD_GLOBAL ) ;
171 _library_map[impl_name] = handle;
172 _numInstanceMutex.unlock();
173 MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
178 MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
179 MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
181 _numInstanceMutex.unlock();
183 // --- try import Python component
185 INFOS("[" << _numproc << "] try import Python component "<<componentName);
186 if (_isSupervContainer)
188 INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
191 if (_library_map[aCompName])
193 return true; // Python Component, already imported
197 PyObject *pyCont = nullptr;
201 PyObject *mainmod = PyImport_AddModule((char *)"__main__");
202 PyObject *globals = PyModule_GetDict(mainmod);
203 pyCont = PyDict_GetItemString(globals, "pyCont");
204 PyObject *result = PyObject_CallMethod(pyCont,
205 (char*)"import_component",
206 (char*)"s",componentName);
207 ret= PyUnicode_AsUTF8(result);
211 if (ret=="") // import possible: Python component
213 _library_map[aCompName] = (void *)pyCont; // any non O value OK
214 MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
221 // Create an instance of component
222 Engines::EngineComponent_ptr
223 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
224 const Engines::FieldsDict& /*env*/,
225 CORBA::String_out reason)
227 reason=CORBA::string_dup("");
231 th = new pthread_t[_nbproc];
232 for(int ip=1;ip<_nbproc;ip++){
233 thread_st *st = new thread_st;
236 st->compoName = componentName;
237 pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
241 Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
244 for(int ip=1;ip<_nbproc;ip++)
245 pthread_join(th[ip],NULL);
252 Engines::EngineComponent_ptr
253 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
255 Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
256 Engines::MPIObject_var pobj;
258 std::string aCompName = genericRegisterName;
259 if (_library_map[aCompName]) { // Python component
260 if (_isSupervContainer) {
261 INFOS("Supervision Container does not support Python Component Engines");
262 return Engines::EngineComponent::_nil();
264 _numInstanceMutex.lock() ; // lock on the instance number
266 int numInstance = _numInstance ;
267 _numInstanceMutex.unlock() ;
270 sprintf( aNumI , "%d" , numInstance ) ;
271 std::string instanceName = aCompName + "_inst_" + aNumI ;
272 std::string component_registerName =
273 _containerName + "/" + instanceName;
278 PyObject *mainmod = PyImport_AddModule((char*)"__main__");
279 PyObject *globals = PyModule_GetDict(mainmod);
280 PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
281 PyObject *result = PyObject_CallMethod(pyCont,
282 (char*)"create_component_instance",
285 instanceName.c_str());
288 PyArg_ParseTuple(result,"ss", &ior, &error);
293 CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
294 iobject = Engines::EngineComponent::_narrow( obj ) ;
295 pobj = Engines::MPIObject::_narrow(obj) ;
297 _NS->Register(iobject, component_registerName.c_str()) ;
298 // Root recupere les ior des composants des autre process
299 BCastIOR(_orb,pobj,false);
301 return iobject._retn();
307 std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.dylib");
309 std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
311 if (_library_map.count(impl_name) != 0) // C++ component
313 void* handle = _library_map[impl_name];
314 iobject = createMPIInstance(genericRegisterName,
316 return iobject._retn();
319 return Engines::EngineComponent::_nil() ;
322 Engines::EngineComponent_ptr
323 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
326 Engines::EngineComponent_var iobject;
327 Engines::MPIObject_var pobj;
328 // --- find the factory
330 std::string aGenRegisterName = genericRegisterName;
331 std::string factory_name = aGenRegisterName + std::string("Engine_factory");
333 typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
335 PortableServer::POA_ptr,
336 PortableServer::ObjectId *,
341 MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
343 if ( !MPIComponent_factory )
345 INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
347 pobj = Engines::MPIObject::_nil();
348 BCastIOR(_orb,pobj,false);
349 return Engines::EngineComponent::_nil();
352 // --- create instance
354 iobject = Engines::EngineComponent::_nil() ;
358 _numInstanceMutex.lock() ; // lock on the instance number
360 int numInstance = _numInstance ;
361 _numInstanceMutex.unlock() ;
364 sprintf( aNumI , "%d" , numInstance ) ;
365 std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
366 std::string component_registerName =
367 _containerName + "/" + instanceName;
369 // --- Instantiate required CORBA object
371 PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
372 id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
374 // --- get reference from id
376 CORBA::Object_var obj = _poa->id_to_reference(*id);
377 iobject = Engines::EngineComponent::_narrow( obj ) ;
378 pobj = Engines::MPIObject::_narrow(obj) ;
380 _listInstances_map[instanceName] = iobject;
381 _cntInstances_map[aGenRegisterName] += 1;
383 // --- register the engine under the name
384 // containerName(.dir)/instanceName(.object)
387 _NS->Register( iobject , component_registerName.c_str() ) ;
388 MESSAGE( component_registerName.c_str() << " bound" ) ;
390 // Root recupere les ior des composants des autre process
391 BCastIOR(_orb,pobj,false);
394 catch(const std::exception &ex){
396 return Engines::EngineComponent::_nil();
398 return iobject._retn();
402 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
403 const char* componentName)
407 th = new pthread_t[_nbproc];
408 for(int ip=1;ip<_nbproc;ip++){
409 thread_st *st = new thread_st;
412 st->nameToRegister = nameToRegister;
413 st->compoName = componentName;
414 pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
418 Engines::EngineComponent_ptr cptr = Lload_impl(nameToRegister,componentName);
421 for(int ip=1;ip<_nbproc;ip++)
422 pthread_join(th[ip],NULL);
430 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
431 const char* nameToRegister,
432 const char* componentName)
434 Engines::EngineComponent_var iobject;
435 Engines::MPIObject_var pobj;
438 sprintf(cproc,"_%d",_numproc);
440 BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
442 _numInstanceMutex.lock() ; // lock on the instance number
445 sprintf(_aNumI,"%d",_numInstance) ;
447 std::string _impl_name = componentName;
448 std::string _nameToRegister = nameToRegister;
449 std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
450 MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
452 std::string absolute_impl_name(_impl_name);
453 MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
454 void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY | RTLD_GLOBAL);
456 INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
457 INFOS("[" << _numproc << "] error dlopen: " << dlerror());
458 return Engines::EngineComponent::_nil() ;
461 std::string factory_name = _nameToRegister + std::string("Engine_factory");
462 MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
465 PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
466 PortableServer::POA_ptr,
467 PortableServer::ObjectId *,
470 (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
471 PortableServer::POA_ptr,
472 PortableServer::ObjectId *,
475 dlsym(handle, factory_name.c_str());
478 if ((error = dlerror()) != NULL){
479 // Try to load a sequential component
480 MESSAGE("[" << _numproc << "] Try to load a sequential component");
481 _numInstanceMutex.unlock() ;
482 iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
483 if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
486 // Instanciation du composant parallele
487 MESSAGE("[" << _numproc << "] Try to load a parallel component");
488 PortableServer::ObjectId * id = (MPIComponent_factory)
489 (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
490 // get reference from id
491 CORBA::Object_var o = _poa->id_to_reference(*id);
492 pobj = Engines::MPIObject::_narrow(o) ;
493 iobject = Engines::EngineComponent::_narrow(o) ;
497 // utiliser + tard le registry ici :
498 // register the engine under the name containerName.dir/nameToRegister.object
499 std::string component_registerName = _containerName + "/" + _nameToRegister;
500 _NS->Register(iobject, component_registerName.c_str()) ;
503 _numInstanceMutex.unlock() ;
505 // Root recupere les ior des composants des autre process
506 BCastIOR(_orb,pobj,false);
508 END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
509 return Engines::EngineComponent::_duplicate(iobject);
513 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
515 Engines::MPIObject_ptr pcptr;
516 Engines::MPIObject_ptr spcptr;
520 pcptr = (Engines::MPIObject_ptr)component_i;
521 th = new pthread_t[_nbproc];
522 for(int ip=1;ip<_nbproc;ip++){
523 thread_st *st = new thread_st;
526 spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
527 st->cptr = (Engines::EngineComponent_ptr)spcptr;
528 pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
532 ASSERT(! CORBA::is_nil(component_i));
533 std::string instanceName = component_i->instanceName() ;
534 MESSAGE("[" << _numproc << "] unload component " << instanceName);
535 _numInstanceMutex.lock() ; // lock on the remove on handle_map
536 _listInstances_map.erase(instanceName);
537 _numInstanceMutex.unlock() ;
538 component_i->destroy() ;
540 _NS->Destroy_Name(instanceName.c_str());
543 for(int ip=1;ip<_nbproc;ip++)
544 pthread_join(th[ip],NULL);
550 void Engines_MPIContainer_i::finalize_removal()
554 th = new pthread_t[_nbproc];
555 for(int ip=1;ip<_nbproc;ip++){
556 thread_st *st = new thread_st;
559 pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
563 _numInstanceMutex.lock(); // lock to be alone
564 // (see decInstanceCnt, load_component_Library)
565 std::map<std::string, void *>::iterator ith;
566 for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
568 void *handle = (*ith).second;
569 std::string impl_name= (*ith).first;
574 // dlclose(handle); // SALOME unstable after ...
575 // _library_map.erase(impl_name);
578 _toRemove_map.clear();
579 _numInstanceMutex.unlock();
582 for(int ip=1;ip<_nbproc;ip++)
583 pthread_join(th[ip],NULL);
588 void *th_loadcomponentlibrary(void *s)
590 thread_st *st = (thread_st*)s;
592 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
593 CORBA::string_free(reason);
597 void *th_createcomponentinstance(void *s)
599 thread_st *st = (thread_st*)s;
600 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
604 void *th_loadimpl(void *s)
606 thread_st *st = (thread_st*)s;
607 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
611 void *th_removeimpl(void *s)
613 thread_st *st = (thread_st*)s;
614 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
618 void *th_finalizeremoval(void *s)
620 thread_st *st = (thread_st*)s;
621 (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();