X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=src%2FMPIContainer%2FMPIContainer_i.cxx;h=dfeb178e13579d792ccfb0b10b30e9d79103abae;hb=36b4d0d5e82155abafd70bd5e61660744da41a4d;hp=ad729c45ae07eaf880699114ef8356a2c294fb27;hpb=89d8cdd476c5f4d65bc3fd1089f092af42e2841c;p=modules%2Fkernel.git diff --git a/src/MPIContainer/MPIContainer_i.cxx b/src/MPIContainer/MPIContainer_i.cxx index ad729c45a..dfeb178e1 100644 --- a/src/MPIContainer/MPIContainer_i.cxx +++ b/src/MPIContainer/MPIContainer_i.cxx @@ -27,11 +27,15 @@ #include #include #include +#include "SALOME_Component_i.hxx" #include "MPIContainer_i.hxx" #include "SALOME_NamingService.hxx" #include "Utils_SINGLETON.hxx" #include "OpUtil.hxx" #include "utilities.h" +#include // must be before Python.h ! +#include +#include "Container_init_python.hxx" using namespace std; // L'appel au registry SALOME ne se fait que pour le process 0 @@ -40,22 +44,54 @@ Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc, PortableServer::POA_ptr poa, char * containerName, int argc, char *argv[]) - : Engines_Container_i(orb,poa,containerName,argc,argv,false,false), MPIObject_i(nbproc,numproc) + : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc) { + long id=0; + string IdContainerinNS; + char idc[3*sizeof(long)]; + MESSAGE("[" << numproc << "] activate object"); _id = _poa->activate_object(this); + if(argc>1) + { + for(int i=0;i::Instance() ; - ASSERT(SINGLETON_::IsAlreadyExisting()) ; - _NS->init_orb( orb ) ; + _NS = new SALOME_NamingService(); + _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ; - Engines::Container_ptr pCont - = Engines::Container::_narrow(POA_Engines::MPIContainer::_this()); + CORBA::Object_var obj=_poa->id_to_reference(*_id); + Engines::Container_var pCont = Engines::Container::_narrow(obj); + + string hostname = GetHostname(); + _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str()); SCRUTE(_containerName); _NS->Register(pCont, _containerName.c_str()); + + // A parallel container registers in Naming Service + // on the machine where is process 0. ContainerManager does'nt know the name + // of this machine before the launch of the parallel container. So to get + // the IOR of the parallel container in Naming Service, ContainerManager + // gives a unique Id. The parallel container registers his name under + // /ContainerManager/Id directory in NamingService + + IdContainerinNS = "/ContainerManager/id"; + sprintf(idc,"%ld",id); + IdContainerinNS += idc; + SCRUTE(IdContainerinNS); + _NS->Register(pCont, IdContainerinNS.c_str()); + } // Root recupere les ior des container des autre process @@ -71,122 +107,299 @@ Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) Engines_MPIContainer_i::~Engines_MPIContainer_i(void) { MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()"); - if( !handle_map.empty() ){ - MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i: warning destroy a not empty container"); +} + +// Load component +void Engines_MPIContainer_i::Shutdown() +{ + int ip; + MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server"); + if( _numproc == 0 ){ + _NS->Destroy_FullDirectory(_containerName.c_str()); + for(ip= 1;ip<_nbproc;ip++) + (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown(); } + _orb->shutdown(0); + } -// Start MPI Container -Engines::MPIContainer_ptr Engines_MPIContainer_i::start_MPIimpl( - const char* ContainerName, - CORBA::Short nbproc ) +// Load a component library +bool Engines_MPIContainer_i::load_component_Library(const char* componentName) { + if( _numproc == 0 ){ + // Invocation du chargement du composant dans les autres process + for(int ip= 1;ip<_nbproc;ip++) + (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName); + } - char nbp[1024]; + return Lload_component_Library(componentName); +} - MESSAGE("[" << _numproc << "] start_impl argc " << _argc << " ContainerName " << ContainerName - << hex << this << dec) ; - _numInstanceMutex.lock() ; // lock on the instance number +void Engines_MPIContainer_i::Asload_component_Library(const char* componentName) +{ + Lload_component_Library(componentName); +} + +bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName) +{ + string aCompName = componentName; + + // --- try dlopen C++ component - CORBA::Object_var obj = Engines::MPIContainer::_nil() ; - bool nilvar = true ; - try { - string cont("/Containers/"); - cont += machineName() ; - cont += "/" ; - cont += ContainerName; - INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container " << cont.c_str() - << " try to Resolve" ); - obj = _NS->Resolve( cont.c_str() ); - nilvar = CORBA::is_nil( obj ) ; - if ( nilvar ) { - INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container " - << ContainerName); + string impl_name = string ("lib") + aCompName + string("Engine.so"); + SCRUTE(impl_name); + + _numInstanceMutex.lock(); // lock to be alone + // (see decInstanceCnt, finalize_removal)) + if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name); + if (_library_map[impl_name]) + { + MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded"); + _numInstanceMutex.unlock(); + return true; } + + void* handle; + handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ; + if ( handle ) + { + _library_map[impl_name] = handle; + _numInstanceMutex.unlock(); + return true; + } + else + { + INFOS("[" << _numproc << "] Can't load shared library : " << impl_name); + INFOS("[" << _numproc << "] error dlopen: " << dlerror()); + } + _numInstanceMutex.unlock(); + + // --- try import Python component + + INFOS("[" << _numproc << "] try import Python component "<Ascreate_component_instance(componentName,studyId); } - catch (ServiceUnreachable&) { - INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable"); - } - catch (...) { - INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception."); + + return Lcreate_component_instance(componentName,studyId); +} + +void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName, + CORBA::Long studyId) +{ + Lcreate_component_instance(componentName,studyId); +} + +Engines::Component_ptr +Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId) +{ + if (studyId < 0) { + INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy"); + return Engines::Component::_nil() ; } - if ( !nilvar ) { + + Engines::Component_var iobject = Engines::Component::_nil() ; + Engines::MPIObject_var pobj; + + string aCompName = genericRegisterName; + if (_library_map[aCompName]) { // Python component + if (_isSupervContainer) { + INFOS("Supervision Container does not support Python Component Engines"); + return Engines::Component::_nil(); + } + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + int numInstance = _numInstance ; _numInstanceMutex.unlock() ; - MESSAGE("[" << _numproc << "] start_impl container found without new launch") ; - return Engines::MPIContainer::_narrow(obj); - } - int i = 0 ; - while ( _argv[ i ] ) { - MESSAGE("[" << _numproc << "] argv" << i << " " << _argv[ i ]) ; - i++ ; + + char aNumI[12]; + sprintf( aNumI , "%d" , numInstance ) ; + string instanceName = aCompName + "_inst_" + aNumI ; + string component_registerName = + _containerName + "/" + instanceName; + + Py_ACQUIRE_NEW_THREAD; + PyObject *mainmod = PyImport_AddModule("__main__"); + PyObject *globals = PyModule_GetDict(mainmod); + PyObject *pyCont = PyDict_GetItemString(globals, "pyCont"); + PyObject *result = PyObject_CallMethod(pyCont, + "create_component_instance", + "ssl", + aCompName.c_str(), + instanceName.c_str(), + studyId); + string iors = PyString_AsString(result); + SCRUTE(iors); + Py_RELEASE_NEW_THREAD; + + CORBA::Object_var obj = _orb->string_to_object(iors.c_str()); + iobject = Engines::Component::_narrow( obj ) ; + pobj = Engines::MPIObject::_narrow(obj) ; + if( _numproc == 0 ) + _NS->Register(iobject, component_registerName.c_str()) ; + // Root recupere les ior des composants des autre process + BCastIOR(_orb,pobj,false); + + return iobject._retn(); } - sprintf(nbp,"mpirun -np %d SALOME_MPIContainer ",nbproc); - string shstr(nbp); - shstr += ContainerName ; - if ( _argc == 4 ) { - shstr += " " ; - shstr += _argv[ 2 ] ; - shstr += " " ; - shstr += _argv[ 3 ] ; + + //--- try C++ + + string impl_name = string ("lib") + genericRegisterName +string("Engine.so"); + void* handle = _library_map[impl_name]; + if ( !handle ) { + INFOS("shared library " << impl_name <<"must be loaded before instance"); + return Engines::Component::_nil() ; } - shstr += " > /tmp/" ; - shstr += ContainerName ; - shstr += ".log 2>&1 &" ; - MESSAGE("system(" << shstr << ")") ; - int status = system( shstr.c_str() ) ; - if (status == -1) { - INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status -1)") ; + else { + iobject = createMPIInstance(genericRegisterName, + handle, + studyId); + return iobject._retn(); } - else if (status == 217) { - INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status 217)") ; +} + +Engines::Component_ptr +Engines_MPIContainer_i::createMPIInstance(string genericRegisterName, + void *handle, + int studyId) +{ + Engines::Component_var iobject; + Engines::MPIObject_var pobj; + // --- find the factory + + string aGenRegisterName = genericRegisterName; + string factory_name = aGenRegisterName + string("Engine_factory"); + SCRUTE(factory_name) ; + + typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION) + (int,int, + CORBA::ORB_ptr, + PortableServer::POA_ptr, + PortableServer::ObjectId *, + const char *, + const char *) ; + + MPIFACTORY_FUNCTION MPIComponent_factory + = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str()); + + char *error ; + if ( (error = dlerror() ) != NULL) { + // Try to load a sequential component + MESSAGE("[" << _numproc << "] Try to load a sequential component"); + _numInstanceMutex.unlock() ; + iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId); + if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject); } - INFOS("[" << _numproc << "] " << machineName() << " Engines_MPIContainer_i::start_impl SALOME_MPIContainer launch done"); - - obj = Engines::MPIContainer::_nil() ; - try { - string cont("/Containers/"); - cont += machineName() ; - cont += "/" ; - cont += ContainerName; - nilvar = true ; - int count = 20 ; - while ( nilvar && count >= 0) { - sleep( 1 ) ; - obj = _NS->Resolve(cont.c_str()); - nilvar = CORBA::is_nil( obj ) ; - if ( nilvar ) { - INFOS("[" << _numproc << "] " << count << ". " << machineName() - << " start_impl unknown container " << cont.c_str()); - count -= 1 ; + + // --- create instance + + iobject = Engines::Component::_nil() ; + + try + { + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + int numInstance = _numInstance ; + _numInstanceMutex.unlock() ; + + char aNumI[12]; + sprintf( aNumI , "%d" , numInstance ) ; + string instanceName = aGenRegisterName + "_inst_" + aNumI ; + string component_registerName = + _containerName + "/" + instanceName; + + // --- Instanciate required CORBA object + + PortableServer::ObjectId *id ; //not owner, do not delete (nore use var) + id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), + aGenRegisterName.c_str() ) ; + + // --- get reference & servant from id + + CORBA::Object_var obj = _poa->id_to_reference(*id); + iobject = Engines::Component::_narrow( obj ) ; + pobj = Engines::MPIObject::_narrow(obj) ; + + Engines_Component_i *servant = + dynamic_cast(_poa->reference_to_servant(iobject)); + ASSERT(servant); + //SCRUTE(servant->pd_refCount); + servant->_remove_ref(); // compensate previous id_to_reference + //SCRUTE(servant->pd_refCount); + _listInstances_map[instanceName] = iobject; + _cntInstances_map[aGenRegisterName] += 1; + SCRUTE(aGenRegisterName); + SCRUTE(_cntInstances_map[aGenRegisterName]); + //SCRUTE(servant->pd_refCount); + bool ret_studyId = servant->setStudyId(studyId); + ASSERT(ret_studyId); + + // --- register the engine under the name + // containerName(.dir)/instanceName(.object) + + if( _numproc == 0 ){ + _NS->Register( iobject , component_registerName.c_str() ) ; + MESSAGE( component_registerName.c_str() << " bound" ) ; } + // Root recupere les ior des composants des autre process + BCastIOR(_orb,pobj,false); + } - _numInstanceMutex.unlock() ; - if ( !nilvar ) { - MESSAGE("[" << _numproc << "] start_impl container found after new launch of SALOME_MPIContainer") ; + catch (...) + { + INFOS( "Container_i::createInstance exception catched" ) ; } - return Engines::MPIContainer::_narrow(obj); - } - catch (ServiceUnreachable&) { - INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable"); - } - catch (...) { - INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception."); - } - _numInstanceMutex.unlock() ; - MESSAGE("[" << _numproc << "] start_impl MPI container not found after new launch of SALOME_MPIContainer") ; - return Engines::MPIContainer::_nil() ; + return iobject._retn(); } // Load component Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister, const char* componentName) { - int ip; - if( _numproc == 0 ){ // Invocation du chargement du composant dans les autres process - for(ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPload_impl(nameToRegister, + for(int ip= 1;ip<_nbproc;ip++) + (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister, componentName); } @@ -195,7 +408,7 @@ Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegis } // Load component -void Engines_MPIContainer_i::SPload_impl(const char* nameToRegister, +void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister, const char* componentName) { Lload_impl(nameToRegister,componentName); @@ -275,7 +488,6 @@ Engines::Component_ptr Engines_MPIContainer_i::Lload_impl( _NS->Register(iobject, component_registerName.c_str()) ; } - handle_map[instanceName] = handle; _numInstanceMutex.unlock() ; // Root recupere les ior des composants des autre process @@ -301,25 +513,20 @@ void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i) for(ip= 1;ip<_nbproc;ip++){ spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]); cptr = (Engines::Component_ptr)spcptr; - (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPremove_impl(cptr); + (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr); } } Lremove_impl(component_i); } -void Engines_MPIContainer_i::SPremove_impl(Engines::Component_ptr component_i) +void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i) { Lremove_impl(component_i); } void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i) { - int ip; - Engines::Component_ptr cptr; - Engines::MPIObject_ptr pcptr; - Engines::MPIObject_ptr spcptr; - BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl"); ASSERT(! CORBA::is_nil(component_i)); @@ -329,19 +536,8 @@ void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i) component_i->destroy() ; MESSAGE("[" << _numproc << "] test key handle_map"); _numInstanceMutex.lock() ; // lock on the remove on handle_map - if (handle_map[instanceName]) // if key does not exist, created & initialized null - { - remove_map[instanceName] = handle_map[instanceName] ; - } - else MESSAGE("[" << _numproc << "] no key handle_map"); - handle_map.erase(instanceName) ; _numInstanceMutex.unlock() ; MESSAGE("[" << _numproc << "] list handle_map"); - map::iterator im ; - for (im = handle_map.begin() ; im != handle_map.end() ; im ++) - { - MESSAGE("[" << _numproc << "] stay " << (*im).first); - } END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl"); @@ -354,13 +550,13 @@ void Engines_MPIContainer_i::finalize_removal() if( _numproc == 0 ){ // Invocation de la destruction du composant dans les autres process for(ip= 1;ip<_nbproc;ip++) - (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPfinalize_removal(); + (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal(); } Lfinalize_removal(); } -void Engines_MPIContainer_i::SPfinalize_removal() +void Engines_MPIContainer_i::Asfinalize_removal() { Lfinalize_removal(); } @@ -369,18 +565,18 @@ void Engines_MPIContainer_i::Lfinalize_removal() { BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal"); - map::iterator im ; - // lock on the explore remove_map & dlclose - _numInstanceMutex.lock() ; - for (im = remove_map.begin() ; im != remove_map.end() ; im ++) - { - void * handle = (*im).second ; - MESSAGE("[" << _numproc << "] dlclose " << (*im).first); - dlclose(handle) ; - } - MESSAGE("[" << _numproc << "] remove_map.clear()"); - remove_map.clear() ; - _numInstanceMutex.unlock() ; +// map::iterator im ; +// // lock on the explore remove_map & dlclose +// _numInstanceMutex.lock() ; +// for (im = remove_map.begin() ; im != remove_map.end() ; im ++) +// { +// void * handle = (*im).second ; +// MESSAGE("[" << _numproc << "] dlclose " << (*im).first); +// dlclose(handle) ; +// } +// MESSAGE("[" << _numproc << "] remove_map.clear()"); +// remove_map.clear() ; +// _numInstanceMutex.unlock() ; END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal"); }