Salome HOME
Fix compilation pb.
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
index ddcd47d80eb0574a2ace94433cabc637856369fe..db60ae5e414d2774d8bd8d531c83db3f4029a5c1 100644 (file)
@@ -43,6 +43,7 @@
 int SIGUSR11 = 1000;
 #endif
 
+#include <paco_dummy.h>
 
 using namespace std;
 
@@ -59,7 +60,7 @@ bool Engines_Parallel_Component_i::_isMultiInstance = false;
 //=============================================================================
 
 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) : 
-  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior)
+  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior)
 {
   //ASSERT(0);
   INFOS("Default Constructor...");
@@ -84,7 +85,9 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c
                                         const char *instanceName,
                                         const char *interfaceName,
                                          bool notif) :
-  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior),
+  InterfaceParallel_impl(orb,ior), 
+  Engines::Component_serv(orb,ior),
+  Engines::Parallel_Component_serv(orb,ior),
   _instanceName(instanceName),
   _interfaceName(interfaceName),
   _myConnexionToRegistry(0),
@@ -106,7 +109,11 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c
                                                 _instanceName.c_str());
 
   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
-  //SCRUTE(pd_refCount);
+
+  deploy_mutex = new pthread_mutex_t();
+  pthread_mutex_init(deploy_mutex, NULL);
+  _proxy = NULL;
+ //SCRUTE(pd_refCount);
 }
 
 //=============================================================================
@@ -121,6 +128,11 @@ Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
 {
   MESSAGE("Component destructor");
   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
+  pthread_mutex_destroy(deploy_mutex);
+  delete deploy_mutex;
+
+  if (_proxy)
+    delete _proxy;
 }
 
 //=============================================================================
@@ -567,7 +579,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName)
          (*it).second >>= value;
          // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
 #if defined __GNUC__
-         int ret = setenv(cle.c_str(), value, overwrite);
+         //int ret = setenv(cle.c_str(), value, overwrite);
+         setenv(cle.c_str(), value, overwrite);
 #else
          //CCRT porting : setenv not defined in stdlib.h
          std::string s(cle);
@@ -575,7 +588,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName)
          s+=value;
          // char* cast because 1st arg of linux putenv function
          // is not a const char* !
-         int ret=putenv((char *)s.c_str());
+         //int ret=putenv((char *)s.c_str());
+         putenv((char *)s.c_str());
          //End of CCRT porting
 #endif
          MESSAGE("--- setenv: "<<cle<<" = "<< value);
@@ -758,10 +772,10 @@ string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName
 //=============================================================================
 
 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
-                                                 CORBA::Boolean isPublished, 
-                                                 CORBA::Boolean& isValidScript)
+                                                          CORBA::Boolean isPublished, 
+                                                          CORBA::Boolean& isValidScript)
 {
-  char* aScript = "def RebuildData(theStudy): pass";
+  const char* aScript = "def RebuildData(theStudy): pass";
   char* aBuffer = new char[strlen(aScript)+1];
   strcpy(aBuffer, aScript);
   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
@@ -770,3 +784,372 @@ Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr the
   isValidScript = true;
   return aStreamFile._retn(); 
 }
+
+
+Engines::Salome_file_ptr 
+Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
+                                                   const char* Salome_file_name) 
+{
+  // Try to find the service, if it doesn't exist, we add it.
+  _Service_file_map_it = _Input_Service_file_map.find(service_name);
+  if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
+    _t_Salome_file_map * _map = new _t_Salome_file_map();
+    _Input_Service_file_map[service_name] = _map;
+    _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
+    _Proxy_Input_Service_file_map[service_name] = _proxy_map;
+    _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
+    _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
+  }
+  _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
+  _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
+  _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
+  
+  pthread_mutex_lock(deploy_mutex);
+  std::string proxy_ior;
+
+  // Try to find the Salome_file ...
+  _Salome_file_map_it = _map->find(Salome_file_name);
+  if (_Salome_file_map_it ==  _map->end()) {
+
+    // We create a new PaCO++ object.
+    // He has the same configuration than
+    // his component
+
+    // Firstly, we have to create the proxy object
+    // of the Salome_file and transmit his
+    // reference to the other nodes.
+    if (getMyRank() == 0) {
+      Engines::Parallel_Salome_file_proxy_impl * proxy = 
+       new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
+      PaCO_operation * proxy_global_ptr =  proxy->getContext("global_paco_context");
+      // We initialize the object with the context of the Parallel component
+      PaCO_operation * compo_global_ptr =  getContext("global_paco_context");
+      //compo_global_ptr->init_context(proxy_global_ptr);
+      proxy_global_ptr->init_context(compo_global_ptr);
+      
+      paco_fabrique_manager* pfm = paco_getFabriqueManager();
+      pfm->register_com("dummy", new paco_dummy_fabrique());
+      proxy_global_ptr->setComFab(NULL);
+      proxy_global_ptr->setLibCom("dummy",NULL);
+      
+      proxy_global_ptr->setTypeClient(true);
+      PaCO::PacoTopology_t client_topo;
+      client_topo.total = 1;
+      proxy_global_ptr->setClientTopo(client_topo);
+      PaCO::PacoTopology_t serveur_topo;
+      serveur_topo.total = getTotalNode();
+      proxy->setTopo(serveur_topo);
+
+      // We register the CORBA objet into the POA
+      CORBA::Object_ptr proxy_ref = proxy->_this();
+
+      // We send the reference to all the nodes...
+      CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
+      Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
+      component_proxy->send_parallel_proxy_object(proxy_ref);
+
+      // Adding proxy into the map
+      (*_proxy_map)[Salome_file_name] = proxy;
+    }
+    else {
+      this->wait_parallel_object_proxy();
+    }
+
+    proxy_ior = this->get_parallel_proxy_object();
+    (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
+
+    // We register each node of the parallel Salome_file object
+    // into the proxy.
+    for (int i = 0; i < getTotalNode(); i++) {
+      if (i ==  getMyRank()) {
+       Parallel_Salome_file_i * servant = 
+         new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
+       PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
+       
+       // We initialize the object with the context of the Parallel component
+       PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+//     compo_global_ptr->init_context(servant_global_ptr);
+       servant_global_ptr->init_context(compo_global_ptr);
+       
+       // We register the CORBA objet into the POA
+       servant->POA_PaCO::InterfaceParallel::_this();
+
+       // Register the servant
+       servant->deploy(getMyRank());
+
+       // Adding servant to the map
+       (*_map)[Salome_file_name] = servant;
+      }
+
+      PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+      compo_global_ptr->my_com->paco_barrier();
+    }
+
+    // Parallel_Salome_file is created and deployed
+    delete _proxy;
+    _proxy = NULL;
+  }
+  pthread_mutex_unlock(deploy_mutex);
+  proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
+  CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
+  return Engines::Salome_file::_narrow(proxy_ref);
+}
+
+Engines::Salome_file_ptr 
+Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
+                                                    const char* Salome_file_name) 
+{
+  // Try to find the service, if it doesn't exist, we add it.
+  _Service_file_map_it = _Output_Service_file_map.find(service_name);
+  if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
+    _t_Salome_file_map * _map = new _t_Salome_file_map();
+    _Output_Service_file_map[service_name] = _map;
+    _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
+    _Proxy_Output_Service_file_map[service_name] = _proxy_map;
+    _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
+    _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
+  }
+  _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
+  _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
+  _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
+  
+  pthread_mutex_lock(deploy_mutex);
+  std::string proxy_ior;
+
+  // Try to find the Salome_file ...
+  _Salome_file_map_it = _map->find(Salome_file_name);
+  if (_Salome_file_map_it ==  _map->end()) {
+
+    // We create a new PaCO++ object.
+    // He has the same configuration than
+    // his component
+
+    // Firstly, we have to create the proxy object
+    // of the Salome_file and transmit his
+    // reference to the other nodes.
+    if (getMyRank() == 0) {
+      Engines::Parallel_Salome_file_proxy_impl * proxy = 
+       new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
+      PaCO_operation * proxy_global_ptr =  proxy->getContext("global_paco_context");
+      // We initialize the object with the context of the Parallel component
+      PaCO_operation * compo_global_ptr =  getContext("global_paco_context");
+      //compo_global_ptr->init_context(proxy_global_ptr);
+      proxy_global_ptr->init_context(compo_global_ptr);
+
+      paco_fabrique_manager* pfm = paco_getFabriqueManager();
+      pfm->register_com("dummy", new paco_dummy_fabrique());
+      proxy_global_ptr->setComFab(NULL);
+      proxy_global_ptr->setLibCom("dummy",NULL);
+
+      proxy_global_ptr->setTypeClient(true);
+      PaCO::PacoTopology_t client_topo;
+      client_topo.total = 1;
+      proxy_global_ptr->setClientTopo(client_topo);
+      PaCO::PacoTopology_t serveur_topo;
+      serveur_topo.total = getTotalNode();
+      proxy->setTopo(serveur_topo);
+
+      // We register the CORBA objet into the POA
+      CORBA::Object_ptr proxy_ref = proxy->_this();
+
+      // We send the reference to all the nodes...
+      CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
+      Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
+      component_proxy->send_parallel_proxy_object(proxy_ref);
+
+      // Adding proxy into the map
+      (*_proxy_map)[Salome_file_name] = proxy;
+    }
+    else {
+      this->wait_parallel_object_proxy();
+    }
+
+    proxy_ior = this->get_parallel_proxy_object();
+    (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
+
+    // We register each node of the parallel Salome_file object
+    // into the proxy.
+    for (int i = 0; i < getTotalNode(); i++) {
+      if (i ==  getMyRank()) {
+       Parallel_Salome_file_i * servant = 
+         new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
+       PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
+       
+       // We initialize the object with the context of the Parallel component
+       PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+//     compo_global_ptr->init_context(servant_global_ptr);
+       servant_global_ptr->init_context(compo_global_ptr);
+       
+       // We register the CORBA objet into the POA
+       servant->POA_PaCO::InterfaceParallel::_this();
+
+       // Register the servant
+       servant->deploy(getMyRank());
+
+       // Adding servant to the map
+       (*_map)[Salome_file_name] = servant;
+      }
+
+      PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+      compo_global_ptr->my_com->paco_barrier();
+    }
+
+    // Parallel_Salome_file is created and deployed
+    delete _proxy;
+    _proxy = NULL;
+  }
+  pthread_mutex_unlock(deploy_mutex);
+  proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
+  CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
+  return Engines::Salome_file::_narrow(proxy_ref);
+}
+
+Engines::Salome_file_ptr 
+Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
+                                                   const char* Salome_file_name) 
+{
+  // Try to find the service, if it doesn't exist, we throw an exception.
+  _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "service doesn't have salome files";
+    throw SALOME::SALOME_Exception(es);
+  }
+  _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
+
+  // Try to find the Salome_file ...
+  _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
+  if (_Proxy_Salome_file_map_it ==  _map->end()) {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "service doesn't have this Salome_file";
+    throw SALOME::SALOME_Exception(es);
+  }
+
+  // Client get the proxy object
+  Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
+  return Sfile->_this();
+}
+
+Engines::Salome_file_ptr 
+Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
+                                                    const char* Salome_file_name) 
+{
+  // Try to find the service, if it doesn't exist, we throw an exception.
+  _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "service doesn't have salome files";
+    throw SALOME::SALOME_Exception(es);
+  }
+  _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
+
+  // Try to find the Salome_file ...
+  _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
+  if (_Proxy_Salome_file_map_it ==  _map->end()) {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "service doesn't have this Salome_file";
+    throw SALOME::SALOME_Exception(es);
+  }
+
+  // Client get the proxy object
+  Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
+  return Sfile->_this();
+}
+
+
+void 
+Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
+{
+  // Try to find the service, if it doesn't exist, nothing to do.
+  _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
+    _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
+    _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
+    _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
+
+    for(;begin!=end;begin++) {
+      Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
+      std::string file_port_name = begin->first;
+      configureSalome_file(service_name, file_port_name, file);
+      file->recvFiles();
+    }
+  }
+}
+
+void 
+Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
+{
+  // Try to find the service, if it doesn't exist, nothing to do.
+  _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
+    _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
+    _t_Proxy_Salome_file_map::iterator begin = _map->begin();
+    _t_Proxy_Salome_file_map::iterator end = _map->end();
+
+    for(;begin!=end;begin++) {
+      Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
+      std::string file_port_name = begin->first;
+      configureSalome_file(service_name, file_port_name, file);
+      file->recvFiles();
+    }
+  }
+
+}
+
+//=============================================================================
+/*! 
+ *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
+ */ 
+//=============================================================================
+void 
+Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
+  _proxy = _orb->object_to_string(proxy_ref);
+}
+
+//=============================================================================
+/*! 
+ *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
+ */ 
+//=============================================================================
+void 
+Engines_Parallel_Component_i::wait_parallel_object_proxy() {
+  char * proxy = NULL;
+  proxy =  get_parallel_proxy_object();
+  while(proxy == NULL)
+  {
+    sleep(1);
+    proxy = get_parallel_proxy_object();
+  }
+}
+
+//=============================================================================
+/*! 
+ *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
+ */ 
+//=============================================================================
+char * 
+Engines_Parallel_Component_i::get_parallel_proxy_object() {
+  return _proxy;
+}
+
+
+//=============================================================================
+/*! 
+ *  C++ method: used to configure the Salome_file into the runtime.
+ *  \param service_name name of the service that use this Salome_file
+ *  \param file_port_name name of the Salome_file
+ *  \param file Parallel Salome_file C++ object
+ */
+//=============================================================================
+void
+Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
+                                                  std::string file_port_name,
+                                                  Engines::Parallel_Salome_file_proxy_impl * file) 
+{
+  // By default this method does nothing
+}
+