From a12898396c5225755c4a7fa1086c9747dcaac1a2 Mon Sep 17 00:00:00 2001 From: secher Date: Fri, 27 Feb 2009 15:19:19 +0000 Subject: [PATCH] replacement oneway by threads --- idl/SALOME_MPIContainer.idl | 6 - src/MPIContainer/MPIContainer_i.cxx | 211 +++++++++++++++++----------- src/MPIContainer/MPIContainer_i.hxx | 30 ++-- 3 files changed, 144 insertions(+), 103 deletions(-) diff --git a/idl/SALOME_MPIContainer.idl b/idl/SALOME_MPIContainer.idl index 6bcf09a43..fd40354f4 100644 --- a/idl/SALOME_MPIContainer.idl +++ b/idl/SALOME_MPIContainer.idl @@ -31,12 +31,6 @@ module Engines { interface MPIContainer:Container,MPIObject { - // asynchronous version to load and remove parallel component in parallel - oneway void Asload_component_Library(in string componentName); - oneway void Ascreate_component_instance(in string componentName,in long studyId); - oneway void Asload_impl(in string nameToRegister, in string componentName); - oneway void Asremove_impl(in Component component_i); - oneway void Asfinalize_removal(); }; } ; diff --git a/src/MPIContainer/MPIContainer_i.cxx b/src/MPIContainer/MPIContainer_i.cxx index c3fd23405..a9d35cf3f 100644 --- a/src/MPIContainer/MPIContainer_i.cxx +++ b/src/MPIContainer/MPIContainer_i.cxx @@ -116,18 +116,26 @@ void Engines_MPIContainer_i::Shutdown() // Load a component library bool Engines_MPIContainer_i::load_component_Library(const char* componentName) { - if( _numproc == 0 ){ - // Invocation du chargement du composant dans les autres process - for(int ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName); + pthread_t *th; + if(_numproc == 0){ + th = new pthread_t[_nbproc]; + for(int ip=1;ip<_nbproc;ip++){ + thread_st *st = new thread_st; + st->ip = ip; + st->tior = _tior; + st->compoName = componentName; + pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st); + } } - return Lload_component_Library(componentName); -} + bool ret = Lload_component_Library(componentName); -void Engines_MPIContainer_i::Asload_component_Library(const char* componentName) -{ - Lload_component_Library(componentName); + if(_numproc == 0){ + for(int ip=1;ip<_nbproc;ip++) + pthread_join(th[ip],NULL); + delete th; + } + return ret; } bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName) @@ -206,19 +214,28 @@ Engines::Component_ptr Engines_MPIContainer_i::create_component_instance( const char* componentName, CORBA::Long studyId) { - if( _numproc == 0 ){ - // Invocation du chargement du composant dans les autres process - for(int ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId); + pthread_t *th; + if(_numproc == 0){ + th = new pthread_t[_nbproc]; + for(int ip=1;ip<_nbproc;ip++){ + thread_st *st = new thread_st; + st->ip = ip; + st->tior = _tior; + st->compoName = componentName; + st->studyId = studyId; + pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st); + } } - return Lcreate_component_instance(componentName,studyId); -} + Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId); -void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName, - CORBA::Long studyId) -{ - Lcreate_component_instance(componentName,studyId); + if(_numproc == 0){ + for(int ip=1;ip<_nbproc;ip++) + pthread_join(th[ip],NULL); + delete th; + } + + return cptr; } Engines::Component_ptr @@ -387,24 +404,31 @@ Engines_MPIContainer_i::createMPIInstance(string genericRegisterName, Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister, const char* componentName) { - if( _numproc == 0 ){ - // Invocation du chargement du composant dans les autres process - for(int ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister, - componentName); + pthread_t *th; + if(_numproc == 0){ + th = new pthread_t[_nbproc]; + for(int ip=1;ip<_nbproc;ip++){ + thread_st *st = new thread_st; + st->ip = ip; + st->tior = _tior; + st->nameToRegister = nameToRegister; + st->compoName = componentName; + pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st); + } } - return Lload_impl(nameToRegister,componentName); + Engines::Component_ptr cptr = Lload_impl(nameToRegister,componentName); -} + if(_numproc == 0){ + for(int ip=1;ip<_nbproc;ip++) + pthread_join(th[ip],NULL); + delete th; + } -// Load component -void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister, - const char* componentName) -{ - Lload_impl(nameToRegister,componentName); + return cptr; } +// Load component Engines::Component_ptr Engines_MPIContainer_i::Lload_impl( const char* nameToRegister, const char* componentName) @@ -492,83 +516,106 @@ Engines::Component_ptr Engines_MPIContainer_i::Lload_impl( void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i) { - int ip; - Engines::Component_ptr cptr; Engines::MPIObject_ptr pcptr; Engines::MPIObject_ptr spcptr; - ASSERT(! CORBA::is_nil(component_i)); - - if( _numproc == 0 ){ - // Invocation de la destruction du composant dans les autres process + pthread_t *th; + if(_numproc == 0){ pcptr = (Engines::MPIObject_ptr)component_i; - for(ip= 1;ip<_nbproc;ip++){ + th = new pthread_t[_nbproc]; + for(int ip=1;ip<_nbproc;ip++){ + thread_st *st = new thread_st; + st->ip = ip; + st->tior = _tior; spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]); - cptr = (Engines::Component_ptr)spcptr; - (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr); + st->cptr = (Engines::Component_ptr)spcptr; + pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st); } } - Lremove_impl(component_i); -} - -void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i) -{ - Lremove_impl(component_i); -} - -void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i) -{ - BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl"); - ASSERT(! CORBA::is_nil(component_i)); - string instanceName = component_i->instanceName() ; MESSAGE("[" << _numproc << "] unload component " << instanceName); - component_i->destroy() ; - MESSAGE("[" << _numproc << "] test key handle_map"); _numInstanceMutex.lock() ; // lock on the remove on handle_map + _listInstances_map.erase(instanceName); _numInstanceMutex.unlock() ; - MESSAGE("[" << _numproc << "] list handle_map"); + component_i->destroy() ; + if(_numproc == 0) + _NS->Destroy_Name(instanceName.c_str()); - END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl"); + if(_numproc == 0){ + for(int ip=1;ip<_nbproc;ip++) + pthread_join(th[ip],NULL); + delete th; + } } void Engines_MPIContainer_i::finalize_removal() { - int ip; + pthread_t *th; + if(_numproc == 0){ + th = new pthread_t[_nbproc]; + for(int ip=1;ip<_nbproc;ip++){ + thread_st *st = new thread_st; + st->ip = ip; + st->tior = _tior; + pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st); + } + } - if( _numproc == 0 ){ - // Invocation de la destruction du composant dans les autres process - for(ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal(); + _numInstanceMutex.lock(); // lock to be alone + // (see decInstanceCnt, load_component_Library) + map::iterator ith; + for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++) + { + void *handle = (*ith).second; + string impl_name= (*ith).first; + if (handle) + { + SCRUTE(handle); + SCRUTE(impl_name); + // dlclose(handle); // SALOME unstable after ... + // _library_map.erase(impl_name); + } } + _toRemove_map.clear(); + _numInstanceMutex.unlock(); - Lfinalize_removal(); + if(_numproc == 0){ + for(int ip=1;ip<_nbproc;ip++) + pthread_join(th[ip],NULL); + delete th; + } } -void Engines_MPIContainer_i::Asfinalize_removal() +void *th_loadcomponentlibrary(void *s) { - Lfinalize_removal(); + thread_st *st = (thread_st*)s; + (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str()); } -void Engines_MPIContainer_i::Lfinalize_removal() +void *th_createcomponentinstance(void *s) { - BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal"); - -// map::iterator im ; -// // lock on the explore remove_map & dlclose -// _numInstanceMutex.lock() ; -// for (im = remove_map.begin() ; im != remove_map.end() ; im ++) -// { -// void * handle = (*im).second ; -// MESSAGE("[" << _numproc << "] dlclose " << (*im).first); -// dlclose(handle) ; -// } -// MESSAGE("[" << _numproc << "] remove_map.clear()"); -// remove_map.clear() ; -// _numInstanceMutex.unlock() ; - - END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal"); + thread_st *st = (thread_st*)s; + (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId); } + +void *th_loadimpl(void *s) +{ + thread_st *st = (thread_st*)s; + (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str()); +} + +void *th_removeimpl(void *s) +{ + thread_st *st = (thread_st*)s; + (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr); +} + +void *th_finalizeremoval(void *s) +{ + thread_st *st = (thread_st*)s; + (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal(); +} + diff --git a/src/MPIContainer/MPIContainer_i.hxx b/src/MPIContainer/MPIContainer_i.hxx index a65448540..58f92aa3e 100644 --- a/src/MPIContainer/MPIContainer_i.hxx +++ b/src/MPIContainer/MPIContainer_i.hxx @@ -31,6 +31,21 @@ #include "SALOME_Container_i.hxx" #include "MPIObject_i.hxx" +void * th_loadcomponentlibrary(void *st); +void * th_createcomponentinstance(void *st); +void * th_loadimpl(void *st); +void * th_removeimpl(void *st); +void * th_finalizeremoval(void *st); + +typedef struct { + int ip; + Engines::IORTab* tior; + std::string compoName; + std::string nameToRegister; + long studyId; + Engines::Component_ptr cptr; +} thread_st; + class Engines_MPIContainer_i : public POA_Engines::MPIContainer, public Engines_Container_i, public MPIObject_i @@ -53,47 +68,32 @@ class Engines_MPIContainer_i : public POA_Engines::MPIContainer, // Load a component library // synchronous version for process 0 bool load_component_Library(const char* componentName); - // asynchronous version for other process - void Asload_component_Library(const char* componentName); // Create an instance of component // synchronous version for process 0 Engines::Component_ptr create_component_instance( const char* componentName, CORBA::Long studyId); // 0 for multiStudy - // asynchronous version for other process - void Ascreate_component_instance( const char* componentName, - CORBA::Long studyId); // 0 for multiStudy // Load a component in current MPI container // synchronous version for process 0 Engines::Component_ptr load_impl(const char* nameToRegister, const char* componentName); - // asynchronous version for other process - void Asload_impl(const char* nameToRegister, const char* componentName); // Unload a component from current MPI container // synchronous version for process 0 void remove_impl(Engines::Component_ptr component_i); - // asynchronous version for other process - void Asremove_impl(Engines::Component_ptr component_i); // synchronous version for process 0 void finalize_removal(); - // asynchronous version for other process - void Asfinalize_removal(); private: - // local version to not duplicate code - // called by synchronous and asynchronous version bool Lload_component_Library(const char* componentName); Engines::Component_ptr Lcreate_component_instance( const char* componentName, CORBA::Long studyId); // 0 for multiStudy Engines::Component_ptr Lload_impl(const char* nameToRegister, const char* componentName); - void Lremove_impl(Engines::Component_ptr component_i); - void Lfinalize_removal(); Engines::Component_ptr createMPIInstance(std::string genericRegisterName, -- 2.39.2