#include <iostream.h>
#include <dlfcn.h>
#include <stdio.h>
+#include "SALOME_Component_i.hxx"
#include "MPIContainer_i.hxx"
#include "SALOME_NamingService.hxx"
#include "Utils_SINGLETON.hxx"
#include "OpUtil.hxx"
#include "utilities.h"
+#include <pthread.h> // must be before Python.h !
+#include <Python.h>
+#include "Container_init_python.hxx"
using namespace std;
// L'appel au registry SALOME ne se fait que pour le process 0
PortableServer::POA_ptr poa,
char * containerName,
int argc, char *argv[])
- : Engines_Container_i(orb,poa,containerName,argc,argv,false,false), MPIObject_i(nbproc,numproc)
+ : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
{
+ long id=0;
+ string IdContainerinNS;
+ char idc[3*sizeof(long)];
+
MESSAGE("[" << numproc << "] activate object");
_id = _poa->activate_object(this);
+ if(argc>1)
+ {
+ for(int i=0;i<argc;i++)
+ {
+ if(strcmp(argv[i],"-id")==NULL)
+ {
+ id = atoi(argv[i+1]);
+ continue;
+ }
+ }
+ }
+ SCRUTE(id);
+
if(numproc==0){
- // _NS = new SALOME_NamingService(_orb);
- _NS = SINGLETON_<SALOME_NamingService>::Instance() ;
- ASSERT(SINGLETON_<SALOME_NamingService>::IsAlreadyExisting()) ;
- _NS->init_orb( orb ) ;
+ _NS = new SALOME_NamingService();
+ _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
- Engines::Container_ptr pCont
- = Engines::Container::_narrow(POA_Engines::MPIContainer::_this());
+ CORBA::Object_var obj=_poa->id_to_reference(*_id);
+ Engines::Container_var pCont = Engines::Container::_narrow(obj);
+
+ string hostname = GetHostname();
+ _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
SCRUTE(_containerName);
_NS->Register(pCont, _containerName.c_str());
+
+ // A parallel container registers in Naming Service
+ // on the machine where is process 0. ContainerManager does'nt know the name
+ // of this machine before the launch of the parallel container. So to get
+ // the IOR of the parallel container in Naming Service, ContainerManager
+ // gives a unique Id. The parallel container registers his name under
+ // /ContainerManager/Id directory in NamingService
+
+ IdContainerinNS = "/ContainerManager/id";
+ sprintf(idc,"%ld",id);
+ IdContainerinNS += idc;
+ SCRUTE(IdContainerinNS);
+ _NS->Register(pCont, IdContainerinNS.c_str());
+
}
// Root recupere les ior des container des autre process
Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
{
MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
- if( !handle_map.empty() ){
- MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i: warning destroy a not empty container");
+}
+
+// Load component
+void Engines_MPIContainer_i::Shutdown()
+{
+ int ip;
+ MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
+ if( _numproc == 0 ){
+ _NS->Destroy_FullDirectory(_containerName.c_str());
+ for(ip= 1;ip<_nbproc;ip++)
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
}
+ _orb->shutdown(0);
+
}
-// Start MPI Container
-Engines::MPIContainer_ptr Engines_MPIContainer_i::start_MPIimpl(
- const char* ContainerName,
- CORBA::Short nbproc )
+// Load a component library
+bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
{
+ if( _numproc == 0 ){
+ // Invocation du chargement du composant dans les autres process
+ for(int ip= 1;ip<_nbproc;ip++)
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
+ }
- char nbp[1024];
+ return Lload_component_Library(componentName);
+}
- MESSAGE("[" << _numproc << "] start_impl argc " << _argc << " ContainerName " << ContainerName
- << hex << this << dec) ;
- _numInstanceMutex.lock() ; // lock on the instance number
+void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
+{
+ Lload_component_Library(componentName);
+}
+
+bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
+{
+ string aCompName = componentName;
+
+ // --- try dlopen C++ component
- CORBA::Object_var obj = Engines::MPIContainer::_nil() ;
- bool nilvar = true ;
- try {
- string cont("/Containers/");
- cont += machineName() ;
- cont += "/" ;
- cont += ContainerName;
- INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container " << cont.c_str()
- << " try to Resolve" );
- obj = _NS->Resolve( cont.c_str() );
- nilvar = CORBA::is_nil( obj ) ;
- if ( nilvar ) {
- INFOS("[" << _numproc << "] " << machineName() << " start_impl unknown container "
- << ContainerName);
+ string impl_name = string ("lib") + aCompName + string("Engine.so");
+ SCRUTE(impl_name);
+
+ _numInstanceMutex.lock(); // lock to be alone
+ // (see decInstanceCnt, finalize_removal))
+ if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
+ if (_library_map[impl_name])
+ {
+ MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
+ _numInstanceMutex.unlock();
+ return true;
}
+
+ void* handle;
+ handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
+ if ( handle )
+ {
+ _library_map[impl_name] = handle;
+ _numInstanceMutex.unlock();
+ return true;
+ }
+ else
+ {
+ INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
+ INFOS("[" << _numproc << "] error dlopen: " << dlerror());
+ }
+ _numInstanceMutex.unlock();
+
+ // --- try import Python component
+
+ INFOS("[" << _numproc << "] try import Python component "<<componentName);
+ if (_isSupervContainer)
+ {
+ INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
+ return false;
+ }
+ if (_library_map[aCompName])
+ {
+ return true; // Python Component, already imported
+ }
+ else
+ {
+ Py_ACQUIRE_NEW_THREAD;
+ PyObject *mainmod = PyImport_AddModule("__main__");
+ PyObject *globals = PyModule_GetDict(mainmod);
+ PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
+ PyObject *result = PyObject_CallMethod(pyCont,
+ "import_component",
+ "s",componentName);
+ int ret= PyInt_AsLong(result);
+ SCRUTE(ret);
+ Py_RELEASE_NEW_THREAD;
+
+ if (ret) // import possible: Python component
+ {
+ _library_map[aCompName] = (void *)pyCont; // any non O value OK
+ MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
+ return true;
+ }
+ }
+ return false;
+}
+
+// Create an instance of component
+Engines::Component_ptr
+Engines_MPIContainer_i::create_component_instance( const char* componentName,
+ CORBA::Long studyId)
+{
+ if( _numproc == 0 ){
+ // Invocation du chargement du composant dans les autres process
+ for(int ip= 1;ip<_nbproc;ip++)
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
}
- catch (ServiceUnreachable&) {
- INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
- }
- catch (...) {
- INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
+
+ return Lcreate_component_instance(componentName,studyId);
+}
+
+void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
+ CORBA::Long studyId)
+{
+ Lcreate_component_instance(componentName,studyId);
+}
+
+Engines::Component_ptr
+Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
+{
+ if (studyId < 0) {
+ INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
+ return Engines::Component::_nil() ;
}
- if ( !nilvar ) {
+
+ Engines::Component_var iobject = Engines::Component::_nil() ;
+ Engines::MPIObject_var pobj;
+
+ string aCompName = genericRegisterName;
+ if (_library_map[aCompName]) { // Python component
+ if (_isSupervContainer) {
+ INFOS("Supervision Container does not support Python Component Engines");
+ return Engines::Component::_nil();
+ }
+ _numInstanceMutex.lock() ; // lock on the instance number
+ _numInstance++ ;
+ int numInstance = _numInstance ;
_numInstanceMutex.unlock() ;
- MESSAGE("[" << _numproc << "] start_impl container found without new launch") ;
- return Engines::MPIContainer::_narrow(obj);
- }
- int i = 0 ;
- while ( _argv[ i ] ) {
- MESSAGE("[" << _numproc << "] argv" << i << " " << _argv[ i ]) ;
- i++ ;
+
+ char aNumI[12];
+ sprintf( aNumI , "%d" , numInstance ) ;
+ string instanceName = aCompName + "_inst_" + aNumI ;
+ string component_registerName =
+ _containerName + "/" + instanceName;
+
+ Py_ACQUIRE_NEW_THREAD;
+ PyObject *mainmod = PyImport_AddModule("__main__");
+ PyObject *globals = PyModule_GetDict(mainmod);
+ PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
+ PyObject *result = PyObject_CallMethod(pyCont,
+ "create_component_instance",
+ "ssl",
+ aCompName.c_str(),
+ instanceName.c_str(),
+ studyId);
+ string iors = PyString_AsString(result);
+ SCRUTE(iors);
+ Py_RELEASE_NEW_THREAD;
+
+ CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
+ iobject = Engines::Component::_narrow( obj ) ;
+ pobj = Engines::MPIObject::_narrow(obj) ;
+ if( _numproc == 0 )
+ _NS->Register(iobject, component_registerName.c_str()) ;
+ // Root recupere les ior des composants des autre process
+ BCastIOR(_orb,pobj,false);
+
+ return iobject._retn();
}
- sprintf(nbp,"mpirun -np %d SALOME_MPIContainer ",nbproc);
- string shstr(nbp);
- shstr += ContainerName ;
- if ( _argc == 4 ) {
- shstr += " " ;
- shstr += _argv[ 2 ] ;
- shstr += " " ;
- shstr += _argv[ 3 ] ;
+
+ //--- try C++
+
+ string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
+ void* handle = _library_map[impl_name];
+ if ( !handle ) {
+ INFOS("shared library " << impl_name <<"must be loaded before instance");
+ return Engines::Component::_nil() ;
}
- shstr += " > /tmp/" ;
- shstr += ContainerName ;
- shstr += ".log 2>&1 &" ;
- MESSAGE("system(" << shstr << ")") ;
- int status = system( shstr.c_str() ) ;
- if (status == -1) {
- INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status -1)") ;
+ else {
+ iobject = createMPIInstance(genericRegisterName,
+ handle,
+ studyId);
+ return iobject._retn();
}
- else if (status == 217) {
- INFOS("[" << _numproc << "] Engines_MPIContainer_i::start_impl SALOME_MPIContainer failed (system command status 217)") ;
+}
+
+Engines::Component_ptr
+Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
+ void *handle,
+ int studyId)
+{
+ Engines::Component_var iobject;
+ Engines::MPIObject_var pobj;
+ // --- find the factory
+
+ string aGenRegisterName = genericRegisterName;
+ string factory_name = aGenRegisterName + string("Engine_factory");
+ SCRUTE(factory_name) ;
+
+ typedef PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
+ (int,int,
+ CORBA::ORB_ptr,
+ PortableServer::POA_ptr,
+ PortableServer::ObjectId *,
+ const char *,
+ const char *) ;
+
+ MPIFACTORY_FUNCTION MPIComponent_factory
+ = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
+
+ char *error ;
+ if ( (error = dlerror() ) != NULL) {
+ // Try to load a sequential component
+ MESSAGE("[" << _numproc << "] Try to load a sequential component");
+ _numInstanceMutex.unlock() ;
+ iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
+ if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
}
- INFOS("[" << _numproc << "] " << machineName() << " Engines_MPIContainer_i::start_impl SALOME_MPIContainer launch done");
-
- obj = Engines::MPIContainer::_nil() ;
- try {
- string cont("/Containers/");
- cont += machineName() ;
- cont += "/" ;
- cont += ContainerName;
- nilvar = true ;
- int count = 20 ;
- while ( nilvar && count >= 0) {
- sleep( 1 ) ;
- obj = _NS->Resolve(cont.c_str());
- nilvar = CORBA::is_nil( obj ) ;
- if ( nilvar ) {
- INFOS("[" << _numproc << "] " << count << ". " << machineName()
- << " start_impl unknown container " << cont.c_str());
- count -= 1 ;
+
+ // --- create instance
+
+ iobject = Engines::Component::_nil() ;
+
+ try
+ {
+ _numInstanceMutex.lock() ; // lock on the instance number
+ _numInstance++ ;
+ int numInstance = _numInstance ;
+ _numInstanceMutex.unlock() ;
+
+ char aNumI[12];
+ sprintf( aNumI , "%d" , numInstance ) ;
+ string instanceName = aGenRegisterName + "_inst_" + aNumI ;
+ string component_registerName =
+ _containerName + "/" + instanceName;
+
+ // --- Instanciate required CORBA object
+
+ PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
+ id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
+ aGenRegisterName.c_str() ) ;
+
+ // --- get reference & servant from id
+
+ CORBA::Object_var obj = _poa->id_to_reference(*id);
+ iobject = Engines::Component::_narrow( obj ) ;
+ pobj = Engines::MPIObject::_narrow(obj) ;
+
+ Engines_Component_i *servant =
+ dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
+ ASSERT(servant);
+ //SCRUTE(servant->pd_refCount);
+ servant->_remove_ref(); // compensate previous id_to_reference
+ //SCRUTE(servant->pd_refCount);
+ _listInstances_map[instanceName] = iobject;
+ _cntInstances_map[aGenRegisterName] += 1;
+ SCRUTE(aGenRegisterName);
+ SCRUTE(_cntInstances_map[aGenRegisterName]);
+ //SCRUTE(servant->pd_refCount);
+ bool ret_studyId = servant->setStudyId(studyId);
+ ASSERT(ret_studyId);
+
+ // --- register the engine under the name
+ // containerName(.dir)/instanceName(.object)
+
+ if( _numproc == 0 ){
+ _NS->Register( iobject , component_registerName.c_str() ) ;
+ MESSAGE( component_registerName.c_str() << " bound" ) ;
}
+ // Root recupere les ior des composants des autre process
+ BCastIOR(_orb,pobj,false);
+
}
- _numInstanceMutex.unlock() ;
- if ( !nilvar ) {
- MESSAGE("[" << _numproc << "] start_impl container found after new launch of SALOME_MPIContainer") ;
+ catch (...)
+ {
+ INFOS( "Container_i::createInstance exception catched" ) ;
}
- return Engines::MPIContainer::_narrow(obj);
- }
- catch (ServiceUnreachable&) {
- INFOS("[" << _numproc << "] " << machineName() << "Caught exception: Naming Service Unreachable");
- }
- catch (...) {
- INFOS("[" << _numproc << "] " << machineName() << "Caught unknown exception.");
- }
- _numInstanceMutex.unlock() ;
- MESSAGE("[" << _numproc << "] start_impl MPI container not found after new launch of SALOME_MPIContainer") ;
- return Engines::MPIContainer::_nil() ;
+ return iobject._retn();
}
// Load component
Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
const char* componentName)
{
- int ip;
-
if( _numproc == 0 ){
// Invocation du chargement du composant dans les autres process
- for(ip= 1;ip<_nbproc;ip++)
- (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPload_impl(nameToRegister,
+ for(int ip= 1;ip<_nbproc;ip++)
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
componentName);
}
}
// Load component
-void Engines_MPIContainer_i::SPload_impl(const char* nameToRegister,
+void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
const char* componentName)
{
Lload_impl(nameToRegister,componentName);
_NS->Register(iobject, component_registerName.c_str()) ;
}
- handle_map[instanceName] = handle;
_numInstanceMutex.unlock() ;
// Root recupere les ior des composants des autre process
for(ip= 1;ip<_nbproc;ip++){
spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
cptr = (Engines::Component_ptr)spcptr;
- (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPremove_impl(cptr);
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
}
}
Lremove_impl(component_i);
}
-void Engines_MPIContainer_i::SPremove_impl(Engines::Component_ptr component_i)
+void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
{
Lremove_impl(component_i);
}
void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
{
- int ip;
- Engines::Component_ptr cptr;
- Engines::MPIObject_ptr pcptr;
- Engines::MPIObject_ptr spcptr;
-
BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
ASSERT(! CORBA::is_nil(component_i));
component_i->destroy() ;
MESSAGE("[" << _numproc << "] test key handle_map");
_numInstanceMutex.lock() ; // lock on the remove on handle_map
- if (handle_map[instanceName]) // if key does not exist, created & initialized null
- {
- remove_map[instanceName] = handle_map[instanceName] ;
- }
- else MESSAGE("[" << _numproc << "] no key handle_map");
- handle_map.erase(instanceName) ;
_numInstanceMutex.unlock() ;
MESSAGE("[" << _numproc << "] list handle_map");
- map<string, void *>::iterator im ;
- for (im = handle_map.begin() ; im != handle_map.end() ; im ++)
- {
- MESSAGE("[" << _numproc << "] stay " << (*im).first);
- }
END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
if( _numproc == 0 ){
// Invocation de la destruction du composant dans les autres process
for(ip= 1;ip<_nbproc;ip++)
- (Engines::MPIContainer::_narrow((*_tior)[ip]))->SPfinalize_removal();
+ (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
}
Lfinalize_removal();
}
-void Engines_MPIContainer_i::SPfinalize_removal()
+void Engines_MPIContainer_i::Asfinalize_removal()
{
Lfinalize_removal();
}
{
BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
- map<string, void *>::iterator im ;
- // lock on the explore remove_map & dlclose
- _numInstanceMutex.lock() ;
- for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
- {
- void * handle = (*im).second ;
- MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
- dlclose(handle) ;
- }
- MESSAGE("[" << _numproc << "] remove_map.clear()");
- remove_map.clear() ;
- _numInstanceMutex.unlock() ;
+// map<string, void *>::iterator im ;
+// // lock on the explore remove_map & dlclose
+// _numInstanceMutex.lock() ;
+// for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
+// {
+// void * handle = (*im).second ;
+// MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
+// dlclose(handle) ;
+// }
+// MESSAGE("[" << _numproc << "] remove_map.clear()");
+// remove_map.clear() ;
+// _numInstanceMutex.unlock() ;
END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
}