Salome HOME
improvement: replace oneway by threads
authorsecher <secher>
Tue, 17 Mar 2009 13:38:14 +0000 (13:38 +0000)
committersecher <secher>
Tue, 17 Mar 2009 13:38:14 +0000 (13:38 +0000)
idl/SALOME_MPIContainer.idl
src/MPIContainer/MPIContainer_i.cxx
src/MPIContainer/MPIContainer_i.hxx

index 2c469754312423368efe14f0272e0ee493d62a8c..e82c416d5fda4d377c376e30c3bcc1b7df8a4454 100644 (file)
@@ -39,12 +39,6 @@ module Engines
 
   interface MPIContainer:Container,MPIObject
   {
-    // asynchronous version to load and remove parallel component in parallel
-    oneway void Asload_component_Library(in string componentName);
-    oneway void Ascreate_component_instance(in string componentName,in long studyId);
-    oneway void Asload_impl(in string nameToRegister, in string componentName);
-    oneway void Asremove_impl(in Component component_i);
-    oneway void Asfinalize_removal();
   };
 } ;
 
index c3fd234058342f057c1ef02fb6942b8531f8adbb..45de94d0ade1b251ddf8b0c6cf4538628a3860d5 100644 (file)
@@ -116,18 +116,26 @@ void Engines_MPIContainer_i::Shutdown()
 // Load a component library
 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
 {
-  if( _numproc == 0 ){
-    // Invocation du chargement du composant dans les autres process
-    for(int ip= 1;ip<_nbproc;ip++)
-      (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
+  pthread_t *th;
+  if(_numproc == 0){
+    th = new pthread_t[_nbproc];
+    for(int ip=1;ip<_nbproc;ip++){
+      thread_st *st = new thread_st;
+      st->ip = ip;
+      st->tior = _tior;
+      st->compoName = componentName;
+      pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
+    }
   }
 
-  return Lload_component_Library(componentName);
-}
+  bool ret = Lload_component_Library(componentName);
 
-void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
-{
-  Lload_component_Library(componentName);
+  if(_numproc == 0){
+    for(int ip=1;ip<_nbproc;ip++)
+      pthread_join(th[ip],NULL);
+    delete th;
+  }
+  return ret;
 }
 
 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
@@ -155,14 +163,12 @@ bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
       _library_map[impl_name] = handle;
       _numInstanceMutex.unlock();
       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
-      MPI_Barrier(MPI_COMM_WORLD);
       return true;
     }
   else
     {
       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
-      MPI_Barrier(MPI_COMM_WORLD);
     }
   _numInstanceMutex.unlock();
 
@@ -206,19 +212,28 @@ Engines::Component_ptr
 Engines_MPIContainer_i::create_component_instance( const char* componentName,
                                                   CORBA::Long studyId)
 {
-  if( _numproc == 0 ){
-    // Invocation du chargement du composant dans les autres process
-    for(int ip= 1;ip<_nbproc;ip++)
-      (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
+  pthread_t *th;
+  if(_numproc == 0){
+    th = new pthread_t[_nbproc];
+    for(int ip=1;ip<_nbproc;ip++){
+      thread_st *st = new thread_st;
+      st->ip = ip;
+      st->tior = _tior;
+      st->compoName = componentName;
+      st->studyId = studyId;
+      pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
+    }
   }
 
-  return Lcreate_component_instance(componentName,studyId);
-}
+  Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
 
-void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
-                                                         CORBA::Long studyId)
-{
-  Lcreate_component_instance(componentName,studyId);
+  if(_numproc == 0){
+    for(int ip=1;ip<_nbproc;ip++)
+      pthread_join(th[ip],NULL);
+    delete th;
+  }
+
+  return cptr;
 }
 
 Engines::Component_ptr
@@ -387,24 +402,31 @@ Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
                                                 const char* componentName)
 {
-  if( _numproc == 0 ){
-    // Invocation du chargement du composant dans les autres process
-    for(int ip= 1;ip<_nbproc;ip++)
-      (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
-                                                               componentName);
+  pthread_t *th;
+  if(_numproc == 0){
+    th = new pthread_t[_nbproc];
+    for(int ip=1;ip<_nbproc;ip++){
+      thread_st *st = new thread_st;
+      st->ip = ip;
+      st->tior = _tior;
+      st->nameToRegister = nameToRegister;
+      st->compoName = componentName;
+      pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
+    }
   }
 
-  return Lload_impl(nameToRegister,componentName);
+  Engines::Component_ptr cptr =  Lload_impl(nameToRegister,componentName);
 
-}
+  if(_numproc == 0){
+    for(int ip=1;ip<_nbproc;ip++)
+      pthread_join(th[ip],NULL);
+    delete th;
+  }
 
-// Load component
-void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
-                                        const char* componentName)
-{
-  Lload_impl(nameToRegister,componentName);
+  return cptr;
 }
 
+// Load component
 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
                                   const char* nameToRegister,
                                   const char* componentName)
@@ -492,83 +514,106 @@ Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
 
 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
 {
-  int ip;
-  Engines::Component_ptr cptr;
   Engines::MPIObject_ptr pcptr;
   Engines::MPIObject_ptr spcptr;
 
-  ASSERT(! CORBA::is_nil(component_i));
-
-  if( _numproc == 0 ){
-    // Invocation de la destruction du composant dans les autres process
+  pthread_t *th;
+  if(_numproc == 0){
     pcptr = (Engines::MPIObject_ptr)component_i;
-    for(ip= 1;ip<_nbproc;ip++){
+    th = new pthread_t[_nbproc];
+    for(int ip=1;ip<_nbproc;ip++){
+      thread_st *st = new thread_st;
+      st->ip = ip;
+      st->tior = _tior;
       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
-      cptr = (Engines::Component_ptr)spcptr;
-      (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
+      st->cptr = (Engines::Component_ptr)spcptr;
+      pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
     }
   }
 
-  Lremove_impl(component_i);
-}
-
-void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
-{
-  Lremove_impl(component_i);
-}
-
-void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
-{
-  BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
-
   ASSERT(! CORBA::is_nil(component_i));
-
   string instanceName = component_i->instanceName() ;
   MESSAGE("[" << _numproc << "] unload component " << instanceName);
-  component_i->destroy() ;
-  MESSAGE("[" << _numproc << "] test key handle_map");
   _numInstanceMutex.lock() ; // lock on the remove on handle_map
+  _listInstances_map.erase(instanceName);
   _numInstanceMutex.unlock() ;
-  MESSAGE("[" << _numproc << "] list handle_map");
+  component_i->destroy() ;
+  if(_numproc == 0)
+    _NS->Destroy_Name(instanceName.c_str());
 
-  END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
+  if(_numproc == 0){
+    for(int ip=1;ip<_nbproc;ip++)
+      pthread_join(th[ip],NULL);
+    delete th;
+  }
 
 }
 
 void Engines_MPIContainer_i::finalize_removal()
 {
-  int ip;
+  pthread_t *th;
+  if(_numproc == 0){
+    th = new pthread_t[_nbproc];
+    for(int ip=1;ip<_nbproc;ip++){
+      thread_st *st = new thread_st;
+      st->ip = ip;
+      st->tior = _tior;
+      pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
+    }
+  }
 
-  if( _numproc == 0 ){
-    // Invocation de la destruction du composant dans les autres process
-    for(ip= 1;ip<_nbproc;ip++)
-      (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
+  _numInstanceMutex.lock(); // lock to be alone
+  // (see decInstanceCnt, load_component_Library)
+  map<string, void *>::iterator ith;
+  for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
+  {
+    void *handle = (*ith).second;
+    string impl_name= (*ith).first;
+    if (handle)
+    {
+      SCRUTE(handle);
+      SCRUTE(impl_name);
+      //        dlclose(handle);                // SALOME unstable after ...
+      //        _library_map.erase(impl_name);
+    }
   }
+  _toRemove_map.clear();
+  _numInstanceMutex.unlock();
 
-  Lfinalize_removal();
+  if(_numproc == 0){
+    for(int ip=1;ip<_nbproc;ip++)
+      pthread_join(th[ip],NULL);
+    delete th;
+  }
 }
 
-void Engines_MPIContainer_i::Asfinalize_removal()
+void *th_loadcomponentlibrary(void *s)
 {
-  Lfinalize_removal();
+  thread_st *st = (thread_st*)s;
+  (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str());
 }
 
-void Engines_MPIContainer_i::Lfinalize_removal()
+void *th_createcomponentinstance(void *s)
 {
-  BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
-
-//   map<string, void *>::iterator im ;
-//   // lock on the explore remove_map & dlclose
-//   _numInstanceMutex.lock() ; 
-//   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
-//     {
-//       void * handle = (*im).second ;
-//       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
-//       dlclose(handle) ;
-//     }
-//   MESSAGE("[" << _numproc << "] remove_map.clear()");
-//   remove_map.clear() ;  
-//   _numInstanceMutex.unlock() ;
-
-  END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
+  thread_st *st = (thread_st*)s;
+  (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
 }
+
+void *th_loadimpl(void *s)
+{
+  thread_st *st = (thread_st*)s;
+  (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
+}
+
+void *th_removeimpl(void *s)
+{
+  thread_st *st = (thread_st*)s;
+  (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
+}
+
+void *th_finalizeremoval(void *s)
+{
+  thread_st *st = (thread_st*)s;
+  (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
+}
+
index a654485406c39fd41d493dffb406f22d8dfcdfa6..58f92aa3ee7f7e6703b0aed98b456d1f580b3405 100644 (file)
 #include "SALOME_Container_i.hxx"
 #include "MPIObject_i.hxx"
 
+void * th_loadcomponentlibrary(void *st);
+void * th_createcomponentinstance(void *st);
+void * th_loadimpl(void *st);
+void * th_removeimpl(void *st);
+void * th_finalizeremoval(void *st);
+
+typedef struct {
+  int ip;
+  Engines::IORTab* tior;
+  std::string compoName;
+  std::string nameToRegister;
+  long studyId;
+  Engines::Component_ptr cptr;
+} thread_st;
+
 class Engines_MPIContainer_i : public POA_Engines::MPIContainer,
                               public Engines_Container_i,
                               public MPIObject_i
@@ -53,47 +68,32 @@ class Engines_MPIContainer_i : public POA_Engines::MPIContainer,
   // Load a component library
   // synchronous version for process 0
   bool load_component_Library(const char* componentName);
-  // asynchronous version for other process
-  void Asload_component_Library(const char* componentName);
 
   // Create an instance of component
   // synchronous version for process 0
   Engines::Component_ptr
   create_component_instance( const char* componentName,
                             CORBA::Long studyId); // 0 for multiStudy
-  // asynchronous version for other process
-  void Ascreate_component_instance( const char* componentName,
-                                 CORBA::Long studyId); // 0 for multiStudy
 
   // Load a component in current MPI container
   // synchronous version for process 0
   Engines::Component_ptr load_impl(const char* nameToRegister,
                                   const char* componentName);
-  // asynchronous version for other process
-  void Asload_impl(const char* nameToRegister, const char* componentName);
 
   // Unload a component from current MPI container
   // synchronous version for process 0
   void remove_impl(Engines::Component_ptr component_i);
-  // asynchronous version for other process
-  void Asremove_impl(Engines::Component_ptr component_i);
 
   // synchronous version for process 0
   void finalize_removal();
-  // asynchronous version for other process
-  void Asfinalize_removal();
 
  private:
-  // local version to not duplicate code 
-  // called by synchronous and asynchronous version
   bool Lload_component_Library(const char* componentName);
   Engines::Component_ptr
   Lcreate_component_instance( const char* componentName,
                              CORBA::Long studyId); // 0 for multiStudy
   Engines::Component_ptr Lload_impl(const char* nameToRegister,
                                    const char* componentName);
-  void Lremove_impl(Engines::Component_ptr component_i);
-  void Lfinalize_removal();
 
   Engines::Component_ptr
   createMPIInstance(std::string genericRegisterName,