From: ribes Date: Fri, 13 Mar 2009 14:54:34 +0000 (+0000) Subject: - New Version of Parallel Container X-Git-Tag: new_launcher_alpha_091119~50 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=d641d6e1f2926458e260e907a8b22d1d5b7f771d;p=modules%2Fkernel.git - New Version of Parallel Container --- diff --git a/idl/DSC_Engines.idl b/idl/DSC_Engines.idl index f37911362..ca20b44a2 100644 --- a/idl/DSC_Engines.idl +++ b/idl/DSC_Engines.idl @@ -325,9 +325,9 @@ module Engines { */ void disconnect(in connectionId id, in Engines::DSC::Message message) raises(Engines::ConnectionManager::BadId, - Engines::DSC::PortNotDefined, - Engines::DSC::PortNotConnected, - Engines::DSC::BadPortReference); + Engines::DSC::PortNotDefined, + Engines::DSC::PortNotConnected, + Engines::DSC::BadPortReference); //! Shutdown the ConnectionManager process. oneway void ShutdownWithExit(); diff --git a/idl/Makefile.am b/idl/Makefile.am index d724adcc6..bf2ddb34c 100644 --- a/idl/Makefile.am +++ b/idl/Makefile.am @@ -141,7 +141,7 @@ PAR_INCLUDES = SALOME_Exception.hxx SALOME_GenericObj.hxx SALOMEDS.hxx PAR_LIB = libSalomeParallelIDLKernel.la IDL_PACO = SALOME_ComponentPaCO.idl SALOME_PortsPaCO.idl DSC_EnginesPaCO.idl \ - SALOME_ParamPorts.idl SALOME_PACOExtension.idl + SALOME_ParamPortsPaCO.idl SALOME_PACOExtensionPaCO.idl GEN_PACO = SALOME_ComponentPaCO_Engines_Container_server.cxx \ SALOME_ComponentPaCO_Engines_Container_client.cxx \ diff --git a/idl/SALOME_PACOExtension.idl b/idl/SALOME_PACOExtension.idl index 01cd4f818..6ebecd2d5 100644 --- a/idl/SALOME_PACOExtension.idl +++ b/idl/SALOME_PACOExtension.idl @@ -43,7 +43,7 @@ module Engines // Replicated Method used by the proxy to create // a PACO Component void create_paco_component_node_instance(in string registeredName, - in long studyId); + in long studyId) raises(SALOME::SALOME_Exception); }; /*--------------------------------------------------------------------------------------------*/ diff --git a/src/LifeCycleCORBA/SALOME_LifeCycleCORBA.cxx b/src/LifeCycleCORBA/SALOME_LifeCycleCORBA.cxx index a9ab219c8..6986108ba 100644 --- a/src/LifeCycleCORBA/SALOME_LifeCycleCORBA.cxx +++ b/src/LifeCycleCORBA/SALOME_LifeCycleCORBA.cxx @@ -700,7 +700,7 @@ SALOME_LifeCycleCORBA::Load_ParallelComponent(const Engines::MachineParameters& MESSAGE("Creating component instance"); // @PARALLEL@ permits to identify that the component requested // is a parallel component. - string name = string(componentName) + string("@PARALLEL@"); + string name = string(componentName); Engines::Component_var myInstance = cont->create_component_instance(name.c_str(), studyId); if (CORBA::is_nil(myInstance)) INFOS("create_component_instance returns a NULL component !"); diff --git a/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx b/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx index 165a86de3..55dd65ad4 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx @@ -48,6 +48,8 @@ #include "SALOMETraceCollector.hxx" #include "OpUtil.hxx" +#include "Container_init_python.hxx" + using namespace std; #ifdef _DEBUG_ @@ -120,6 +122,7 @@ int main(int argc, char* argv[]) // Initialise the ORB. CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + KERNEL_PYTHON::init_python(argc,argv); std::string containerName(""); if(argc > 1) { @@ -159,6 +162,7 @@ int main(int argc, char* argv[]) myid, root_poa, (char*) node_name.c_str(), + containerName, argc, argv); // PaCO++ init paco_fabrique_manager * pfm = paco_getFabriqueManager(); @@ -168,9 +172,7 @@ int main(int argc, char* argv[]) servant->setLibThread("omni"); // Activation - PortableServer::ObjectId * _id = root_poa->activate_object(servant); - servant->set_id(_id); - obj = root_poa->id_to_reference(*_id); + obj = servant->_this(); // In the NamingService string hostname = Kernel_Utils::GetHostname(); @@ -183,6 +185,9 @@ int main(int argc, char* argv[]) ns->Register(obj, _containerName.c_str()); pman->activate(); orb->run(); + PyGILState_Ensure(); + //Delete python container that destroy orb from python (pyCont._orb.destroy()) + Py_Finalize(); } catch (PaCO::PACO_Exception& e) { diff --git a/src/ParallelContainer/SALOME_ParallelContainerNodeMpi.cxx b/src/ParallelContainer/SALOME_ParallelContainerNodeMpi.cxx index 882f88e33..72354a0ac 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerNodeMpi.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainerNodeMpi.cxx @@ -51,6 +51,8 @@ #include "SALOMETraceCollector.hxx" #include "OpUtil.hxx" +#include "Container_init_python.hxx" + using namespace std; #ifdef _DEBUG_ @@ -147,6 +149,7 @@ int main(int argc, char* argv[]) cerr << "Level provided : " << provided << endl; // Initialise the ORB. CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + KERNEL_PYTHON::init_python(argc,argv); // Code pour choisir le reseau infiniband ..... /* string hostname_temp = GetHostname(); @@ -196,6 +199,7 @@ int main(int argc, char* argv[]) myid, root_poa, (char*) node_name.c_str(), + containerName, argc, argv); // PaCO++ init paco_fabrique_manager * pfm = paco_getFabriqueManager(); @@ -206,10 +210,7 @@ int main(int argc, char* argv[]) servant->setLibCom("mpi", ¶llel_object_group); servant->setLibThread("omni"); - // Activation - PortableServer::ObjectId * _id = root_poa->activate_object(servant); - servant->set_id(_id); - obj = root_poa->id_to_reference(*_id); + obj = servant->_this(); // In the NamingService string hostname = Kernel_Utils::GetHostname(); @@ -225,6 +226,9 @@ int main(int argc, char* argv[]) ns->Register(obj, _containerName.c_str()); pman->activate(); orb->run(); + PyGILState_Ensure(); + //Delete python container that destroy orb from python (pyCont._orb.destroy()) + Py_Finalize(); } catch (PaCO::PACO_Exception& e) { diff --git a/src/ParallelContainer/SALOME_ParallelContainerProxyDummy.cxx b/src/ParallelContainer/SALOME_ParallelContainerProxyDummy.cxx index 683a6f660..4ff59ce23 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerProxyDummy.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainerProxyDummy.cxx @@ -115,8 +115,9 @@ int main(int argc, char* argv[]) serveur_topo.total = nbnodes; proxy->setTopology(serveur_topo); - PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); - obj = root_poa->id_to_reference(_id); + //PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); + //obj = root_poa->id_to_reference(_id); + obj = proxy->_this(); // In the NamingService string hostname = Kernel_Utils::GetHostname(); diff --git a/src/ParallelContainer/SALOME_ParallelContainerProxyMpi.cxx b/src/ParallelContainer/SALOME_ParallelContainerProxyMpi.cxx index 2ed477903..82b28a8bf 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerProxyMpi.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainerProxyMpi.cxx @@ -173,8 +173,9 @@ int main(int argc, char* argv[]) proxy->setTopology(serveur_topo); // Activation - PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); - obj = root_poa->id_to_reference(_id); + //PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); + //obj = root_poa->id_to_reference(_id); + obj = proxy->_this(); // in the NamingService string hostname = Kernel_Utils::GetHostname(); diff --git a/src/ParallelContainer/SALOME_ParallelContainerProxy_i.cxx b/src/ParallelContainer/SALOME_ParallelContainerProxy_i.cxx index 90b61996b..fe7931499 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerProxy_i.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainerProxy_i.cxx @@ -120,19 +120,26 @@ Container_proxy_impl_final::load_component_Library(const char* componentName) //Test if lib could contain a parallel component - std::string paco_test_fct_signature("isAPACO_Component"); + std::string paco_test_fct_signature = aCompName + std::string("_isAPACO_Component"); + INFOS("SIG is : " << paco_test_fct_signature); PACO_TEST_FUNCTION paco_test_fct = NULL; #ifndef WIN32 paco_test_fct = (PACO_TEST_FUNCTION)dlsym(handle, paco_test_fct_signature.c_str()); #else paco_test_fct = (PACO_TEST_FUNCTION)GetProcAddress((HINSTANCE)handle, paco_test_fct_signature.c_str()); #endif - if (!paco_test_fct) + if (paco_test_fct) { // PaCO Component found MESSAGE("PACO LIB FOUND"); _libtype_map[aCompName] = "par"; - _parlibfct_map[aCompName] = paco_test_fct; + } + else + { + MESSAGE("SEQ LIB FOUND"); +#ifndef WIN32 + MESSAGE("dlerror() result is : " << dlerror()); +#endif } } _numInstanceMutex.unlock(); @@ -166,7 +173,8 @@ Container_proxy_impl_final::load_component_Library(const char* componentName) } // If ret is false -> lib is not loaded ! - _libtype_map.erase(aCompName); + if (!ret) + _libtype_map.erase(aCompName); return ret; } @@ -181,7 +189,7 @@ Container_proxy_impl_final::create_component_instance(const char* componentName, if (_libtype_map.count(aCompName) == 0) { // Component is not loaded ! - INFOS("Proxy: component is not loaded !"); + INFOS("Proxy: component is not loaded ! : " << aCompName); return Engines::Component::_nil(); } @@ -189,12 +197,6 @@ Container_proxy_impl_final::create_component_instance(const char* componentName, if (_libtype_map[aCompName] == "seq") return Engines::Container_proxy_impl::create_component_instance(componentName, studyId); - // Test if the component inside the parallel lib - // is parallel or sequential - bool parallel_component = (_parlibfct_map[aCompName]) (componentName); - if (!parallel_component) - return Engines::Container_proxy_impl::create_component_instance(componentName, studyId); - // Parallel Component ! Engines::Component_var component_proxy = Engines::Component::_nil(); @@ -242,12 +244,21 @@ Container_proxy_impl_final::create_component_instance(const char* componentName, CORBA::Object_var obj = _poa->id_to_reference(*id); component_proxy = Engines::Component::_narrow(obj) ; - _cntInstances_map[impl_name] += 1; + if (!CORBA::is_nil(component_proxy)) + { + _cntInstances_map[impl_name] += 1; - // --- register the engine under the name - // containerName(.dir)/instanceName(.object) - _NS->Register(component_proxy , component_registerName.c_str()) ; - MESSAGE(component_registerName.c_str() << " bound" ) ; + // --- register the engine under the name + // containerName(.dir)/instanceName(.object) + _NS->Register(component_proxy , component_registerName.c_str()) ; + MESSAGE(component_registerName.c_str() << " bound" ) ; + } + else + { + INFOS("The factory returns a nil object !"); + return Engines::Component::_nil(); + } + } catch (...) { @@ -255,5 +266,40 @@ Container_proxy_impl_final::create_component_instance(const char* componentName, return Engines::Component::_nil(); } + // Create on each node a work node + for (CORBA::ULong i = 0; i < _infos.nodes.length(); i++) + { + MESSAGE("Call create_paco_component_node_instance on work node : " << i); + CORBA::Object_var object = _orb->string_to_object(_infos.nodes[i]); + Engines::PACO_Container_var node = Engines::PACO_Container::_narrow(object); + if (!CORBA::is_nil(node)) + { + try + { + node->create_paco_component_node_instance(componentName, studyId); + MESSAGE("Call create_paco_component_node_instance done on node : " << i); + } + catch (SALOME::SALOME_Exception & ex) + { + INFOS("SALOME_EXCEPTION : " << ex.details.text); + return Engines::Component::_nil(); + } + catch (...) + { + INFOS("Unknown Exception catch during create_paco_component_node_instance on node : " << i); + return Engines::Component::_nil(); + } + } + else + { + INFOS("Cannot call create_paco_component_node_instance on node " << i << " ref is nil !"); + return Engines::Component::_nil(); + } + } + + // Start Parallel object + PaCO::InterfaceManager_var paco_proxy = PaCO::InterfaceManager::_narrow(component_proxy); + paco_proxy->start(); + return component_proxy; } diff --git a/src/ParallelContainer/SALOME_ParallelContainerProxy_i.hxx b/src/ParallelContainer/SALOME_ParallelContainerProxy_i.hxx index bef0b12b7..0b2fc3e88 100644 --- a/src/ParallelContainer/SALOME_ParallelContainerProxy_i.hxx +++ b/src/ParallelContainer/SALOME_ParallelContainerProxy_i.hxx @@ -54,14 +54,13 @@ class Container_proxy_impl_final : private: std::map _libtype_map; // libname -> libtype (seq ou par) - typedef bool (*PACO_TEST_FUNCTION) (const char *); + typedef void (*PACO_TEST_FUNCTION) (); typedef PortableServer::ObjectId * (*FACTORY_FUNCTION) (CORBA::ORB_ptr, paco_fabrique_thread *, PortableServer::POA_ptr, PortableServer::ObjectId *, const char *, int); - std::map _parlibfct_map; int _numInstance; std::string _containerName; PortableServer::POA_var _poa; diff --git a/src/ParallelContainer/SALOME_ParallelContainer_i.cxx b/src/ParallelContainer/SALOME_ParallelContainer_i.cxx index af29dbc61..d2dd64ebd 100644 --- a/src/ParallelContainer/SALOME_ParallelContainer_i.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainer_i.cxx @@ -76,7 +76,8 @@ Engines_Parallel_Container_i::Engines_Parallel_Container_i (CORBA::ORB_ptr orb, char * ior, int rank, PortableServer::POA_ptr poa, - char *containerName , + char *containerName, + std::string proxy_containerName, int argc , char* argv[], bool isServantAloneInProcess ) : @@ -104,7 +105,8 @@ Engines_Parallel_Container_i::Engines_Parallel_Container_i (CORBA::ORB_ptr orb, _NS = new SALOME_NamingService(); _NS->init_orb(_orb); _containerName = _NS->BuildContainerNameForNS(containerName, _hostname.c_str()); - _NS->Register(container_node, _containerName.c_str()); + _proxy_containerName = proxy_containerName; + //_NS->Register(container_node, _containerName.c_str()); // Init Python container part CORBA::String_var sior = _orb->object_to_string(container_node); @@ -881,6 +883,87 @@ void Engines_Parallel_Container_i::create_paco_component_node_instance(const char* componentName, CORBA::Long studyId) { + // Init de la méthode + char * proxy_ior; + Engines::Component_PaCO_var work_node; + std::string aCompName = componentName; +#ifndef WIN32 + string impl_name = string ("lib") + aCompName +string("Engine.so"); +#else + string impl_name = aCompName +string("Engine.dll"); +#endif + void* handle = _library_map[impl_name]; + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + _numInstanceMutex.unlock() ; + char aNumI[12]; + sprintf( aNumI , "%d" , _numInstance ) ; + string instanceName = aCompName + "_inst_" + aNumI ; + + // Step 1 : Get proxy ! + string component_registerName = _proxy_containerName + "/" + instanceName; + CORBA::Object_var temp = _NS->Resolve(component_registerName.c_str()); + Engines::Component_var obj_proxy = Engines::Component::_narrow(temp); + if (CORBA::is_nil(obj_proxy)) + { + INFOS("Proxy reference from NamingService is nil !"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "Proxy reference from NamingService is nil !"; + throw SALOME::SALOME_Exception(es); + } + proxy_ior = _orb->object_to_string(obj_proxy); + + // Get factory + string factory_name = aCompName + string("Engine_factory"); + FACTORY_FUNCTION Component_factory = (FACTORY_FUNCTION) dlsym(handle, factory_name.c_str()); + if (!Component_factory) + { + INFOS("Can't resolve symbol : " + factory_name); +#ifndef WIN32 + INFOS("dlerror() result is : " << dlerror()); +#endif + std::string ex_text = "Can't resolve symbol : " + factory_name; + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = CORBA::string_dup(ex_text.c_str()); + throw SALOME::SALOME_Exception(es); + } + + try + { + char aNumI2[12]; + sprintf(aNumI2 , "%d" , getMyRank()) ; + string instanceName = aCompName + "_inst_" + aNumI + "_work_node_" + aNumI2; + string component_registerName = _containerName + aNumI2 + "/" + instanceName; + + // --- Instanciate work node + PortableServer::ObjectId *id ; //not owner, do not delete (nore use var) + id = (Component_factory) (_orb, proxy_ior, getMyRank(), _poa, _id, instanceName.c_str(), componentName); + + // --- get reference & servant from id + CORBA::Object_var obj = _poa->id_to_reference(*id); + work_node = Engines::Component_PaCO::_narrow(obj) ; + if (CORBA::is_nil(work_node)) + { + INFOS("work_node reference from factory is nil !"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "work_node reference from factory is nil !"; + throw SALOME::SALOME_Exception(es); + } + work_node->deploy(); + _NS->Register(work_node, component_registerName.c_str()); + MESSAGE(component_registerName.c_str() << " bound" ); + } + catch (...) + { + INFOS("Container_i::create_paco_component_node_instance exception catched"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "Container_i::create_paco_component_node_instance exception catched"; + throw SALOME::SALOME_Exception(es); + } } Engines::Component_ptr diff --git a/src/ParallelContainer/SALOME_ParallelContainer_i.hxx b/src/ParallelContainer/SALOME_ParallelContainer_i.hxx index 492f28bc1..251941f92 100644 --- a/src/ParallelContainer/SALOME_ParallelContainer_i.hxx +++ b/src/ParallelContainer/SALOME_ParallelContainer_i.hxx @@ -63,6 +63,7 @@ public: Engines_Parallel_Container_i(CORBA::ORB_ptr orb, char * ior, int rank, PortableServer::POA_ptr poa, char * containerName , + std::string proxy_containerName, int argc, char* argv[], bool isServantAloneInProcess = true); virtual ~Engines_Parallel_Container_i(); @@ -142,6 +143,7 @@ protected: std::string _hostname; std::string _library_path; std::string _containerName; + std::string _proxy_containerName; std::string _logfilename; CORBA::ORB_var _orb; PortableServer::POA_var _poa; @@ -160,6 +162,11 @@ protected: _fileRef_map_t _fileRef_map; _Salome_file_map_t _Salome_file_map; + typedef PortableServer::ObjectId * (*FACTORY_FUNCTION) (CORBA::ORB_ptr, char *, int, + PortableServer::POA_ptr, + PortableServer::ObjectId *, + const char *, + const char *); }; #endif