From 08ad6b9fcffa790af405bc243263171586fab846 Mon Sep 17 00:00:00 2001 From: ribes Date: Fri, 21 Sep 2007 09:52:06 +0000 Subject: [PATCH] - Adding support of Salome_file into containers and components - Adding Parallel_Salome_file - Adding Parallel_Component - Need some tests --- idl/DSC_Engines.idl | 2 +- idl/Makefile.am | 4 + idl/SALOME_Component.idl | 65 ++-- idl/SALOME_Component.xml | 14 +- src/Container/Component_i.cxx | 5 +- src/Container/Container_i.cxx | 1 + src/Container/Salome_file_i.cxx | 48 +-- src/Container/Salome_file_i.hxx | 5 +- src/DSC/ParallelDSC/ParallelDSC_i.cxx | 1 + src/ParallelContainer/Makefile.am | 6 +- .../Parallel_Salome_file_i.cxx | 333 ++++++++++++++++++ .../Parallel_Salome_file_i.hxx | 74 ++++ .../SALOME_ParallelComponent_i.cxx | 161 +++++++-- .../SALOME_ParallelComponent_i.hxx | 25 +- .../SALOME_ParallelContainer_i.cxx | 2 +- 15 files changed, 653 insertions(+), 93 deletions(-) create mode 100644 src/ParallelContainer/Parallel_Salome_file_i.cxx create mode 100644 src/ParallelContainer/Parallel_Salome_file_i.hxx diff --git a/idl/DSC_Engines.idl b/idl/DSC_Engines.idl index e27795bc0..83b558a59 100644 --- a/idl/DSC_Engines.idl +++ b/idl/DSC_Engines.idl @@ -370,7 +370,7 @@ module Engines { This interface defines the operations needed to add a paco++ port into a parallel DSC component. */ - interface Parallel_DSC : Engines::Superv_Component { + interface Parallel_DSC : Engines::Superv_Component, Engines::Parallel_Component { /*! This operation gives the proxy node of a paco++ port to all the nodes. diff --git a/idl/Makefile.am b/idl/Makefile.am index bd67a2866..70837010e 100644 --- a/idl/Makefile.am +++ b/idl/Makefile.am @@ -132,6 +132,8 @@ GEN_PACO = SALOME_ComponentPaCO_Engines_Container_server.cc \ SALOME_ComponentPaCO_Engines_Container_client.cc \ SALOME_ComponentPaCO_Engines_Component_server.cc \ SALOME_ComponentPaCO_Engines_Component_client.cc \ + SALOME_ComponentPaCO_Engines_Parallel_Component_server.cc \ + SALOME_ComponentPaCO_Engines_Parallel_Component_client.cc \ SALOME_ComponentPaCO_Engines_fileTransfer_server.cc \ SALOME_ComponentPaCO_Engines_fileTransfer_client.cc \ SALOME_ComponentPaCO_Engines_Salome_file_server.cc \ @@ -151,6 +153,8 @@ INCLUDES_PACO = SALOME_ComponentPaCO_Engines_Container_server.h \ SALOME_ComponentPaCO_Engines_Container_client.h \ SALOME_ComponentPaCO_Engines_Component_server.h \ SALOME_ComponentPaCO_Engines_Component_client.h \ + SALOME_ComponentPaCO_Engines_Parallel_Component_server.h \ + SALOME_ComponentPaCO_Engines_Parallel_Component_client.h \ SALOME_ComponentPaCO_Engines_fileTransfer_server.h \ SALOME_ComponentPaCO_Engines_fileTransfer_client.h \ SALOME_ComponentPaCO_Engines_Salome_file_server.h \ diff --git a/idl/SALOME_Component.idl b/idl/SALOME_Component.idl index 9cfef5841..5ecb18aff 100644 --- a/idl/SALOME_Component.idl +++ b/idl/SALOME_Component.idl @@ -380,6 +380,10 @@ module Engines in string Salome_file_name) raises(SALOME::SALOME_Exception); } ; + interface Parallel_Component : Engines::Component { + void send_parallel_proxy_object(in Object proxy_ref); + }; + /*! A block of binary data used for file transfer. The maximum size of the block is defined on server side. @@ -562,20 +566,6 @@ module Engines */ void removeFiles(); - /*! - Delete a file managed by the Salome_file. File is removed too. - - \param file_name the name of the file. - - \exception raised if the file doesn't exist. - */ - void deleteFile(in string file_name) raises (SALOME::SALOME_Exception); - - /*! - Delete all the files managed by the Salome_file. Files are removed too. - */ - void deleteFiles(); - /**************/ /*! @@ -599,6 +589,14 @@ module Engines Return the state of the Salome_file. */ Engines::SfState getSalome_fileState(); + + + /*! + Set the container where files are. + + \param container container CORBA's reference. + */ + void setContainer(in Engines::Container container); }; /*! \brief Interface of fileRef. @@ -639,32 +637,41 @@ module Engines interface Parallel_Salome_file : Engines::Salome_file { /*! - Add a Local file to the Salome_file. + Set a number of node for the file. Default is the node 0. - \param file_name name of the file with the path. - \param node the number of the node where the file is. + \param file_name name of the file. + \param node_nbr node number where the file is. - \exception raised if the file is already added into the Salome_file. + \exception raised if the file doesn't exist. */ - void setParallelLocalFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception); + void setFileNode(in string file_name, in long node_nbr) raises (SALOME::SALOME_Exception); /*! - Add a Distributed file to the Salome_file. + Get the number of the node that actually managed the file. - \param comp_file_name name of the file with the path. + \param file_name name of managed file. - \exception raised if the file is already added into the Salome_file. - */ - void setParallelDistributedFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception); + \return node number of the file + + \exception raised if the file doesn't exist. + */ + long getFileNode(in string file_name) raises (SALOME::SALOME_Exception); + + /*! + This method update the state of file for the Parallel_Salome_file. + + \param new_file the new state of file. + */ + Engines::Container updateFile(in Engines::file new_file); /*! - This method update the state of the Parallel_Salome_file. Thus, each node - of the Parallel_Salome_file has the same state. + This method is used by the parallel implementation of recvFiles. - \param new_state the new state of the Parallel_Salome_file. + \exception raised if the file cannot be ok. */ - void updateState(in Engines::SfState new_state); + void recvFiles_node() raises (SALOME::SALOME_Exception); + }; -} ; +}; #endif diff --git a/idl/SALOME_Component.xml b/idl/SALOME_Component.xml index a37b740f7..189f18638 100644 --- a/idl/SALOME_Component.xml +++ b/idl/SALOME_Component.xml @@ -47,20 +47,26 @@ Component - ping + setInputServiceSalome_file distributed - Parallel_Salome_file + Parallel_Component - setParallelLocalFile + send_parallel_proxy_object distributed + + + Salome_file - setParallelDistributedFile + recvFiles distributed + + Parallel_Salome_file + diff --git a/src/Container/Component_i.cxx b/src/Container/Component_i.cxx index 630413497..ae2250f05 100644 --- a/src/Container/Component_i.cxx +++ b/src/Container/Component_i.cxx @@ -877,6 +877,8 @@ Engines_Component_i::setInputFileToService(const char* service_name, _Salome_file_map_it = _map->find(Salome_file_name); if (_Salome_file_map_it == _map->end()) { Salome_file_i * Sfile = new Salome_file_i(); + Engines::Container_ptr container = this->GetContainerRef(); + Sfile->setContainer(Engines::Container::_duplicate(container)); (*_map)[Salome_file_name] = Sfile; } else { @@ -907,7 +909,6 @@ Engines_Component_i::checkInputServiceFiles(const char* service_name) file->recvFiles(); } } - } Engines::Salome_file_ptr @@ -953,6 +954,8 @@ Engines_Component_i::setOutputFileToService(const char* service_name, _Salome_file_map_it = _map->find(Salome_file_name); if (_Salome_file_map_it == _map->end()) { Salome_file_i * Sfile = new Salome_file_i(); + Engines::Container_ptr container = this->GetContainerRef(); + Sfile->setContainer(Engines::Container::_duplicate(container)); (*_map)[Salome_file_name] = Sfile; } else { diff --git a/src/Container/Container_i.cxx b/src/Container/Container_i.cxx index 799a396d7..e5270c674 100644 --- a/src/Container/Container_i.cxx +++ b/src/Container/Container_i.cxx @@ -676,6 +676,7 @@ Engines_Container_i::createSalome_file(const char* origFileName) if (CORBA::is_nil(_Salome_file_map[origName])) { Salome_file_i* aSalome_file = new Salome_file_i(); + aSalome_file->setContainer(Engines::Container::_duplicate(this->_this())); try { aSalome_file->setLocalFile(origFileName); diff --git a/src/Container/Salome_file_i.cxx b/src/Container/Salome_file_i.cxx index aef2e21bf..184a554c0 100644 --- a/src/Container/Salome_file_i.cxx +++ b/src/Container/Salome_file_i.cxx @@ -44,6 +44,7 @@ Salome_file_i::Salome_file_i() _state.hdf5_file_name = CORBA::string_dup(""); _state.number_of_files = 0; _state.files_ok = true; + _container = Engines::Container::_nil(); } //============================================================================= @@ -204,6 +205,9 @@ Salome_file_i::load(const char* hdf5_file) { infos.type = CORBA::string_dup(type.c_str()); infos.source_file_name = CORBA::string_dup(source_file_name.c_str()); infos.status = CORBA::string_dup(status.c_str()); + // Infos for parallel extensions... + infos.node = 0; + infos.container = Engines::Container::_duplicate(_container); _fileManaged[file_name] = infos; @@ -514,6 +518,9 @@ Salome_file_i::setLocalFile(const char* comp_file_name) infos.type = CORBA::string_dup(type.c_str()); infos.source_file_name = CORBA::string_dup(source_file_name.c_str()); infos.status = CORBA::string_dup(status.c_str()); + // Infos for parallel extensions... + infos.node = 0; + infos.container = Engines::Container::_duplicate(_container); _fileManaged[file_name] = infos; @@ -571,6 +578,9 @@ Salome_file_i::setDistributedFile(const char* comp_file_name) infos.type = CORBA::string_dup(type.c_str()); infos.source_file_name = CORBA::string_dup(source_file_name.c_str()); infos.status = CORBA::string_dup(status.c_str()); + // Infos for parallel extensions... + infos.node = 0; + infos.container = Engines::Container::_duplicate(_container); _fileManaged[file_name] = infos; @@ -762,6 +772,7 @@ Salome_file_i::getDistributedFile(std::string file_name) std::string comp_file_name(_fileManaged[file_name].path.in()); comp_file_name.append(_fileManaged[file_name].file_name.in()); + // Test if the process can write on disk if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL) { INFOS("file " << comp_file_name << " cannot be open for writing"); @@ -826,17 +837,6 @@ Salome_file_i::removeFile(const char* file_name) MESSAGE("Salome_file_i::removeFile : NOT YET IMPLEMENTED"); } -//============================================================================= -/*! - * CORBA method - * \see Engines::Salome_file::deleteFile - */ -//============================================================================= -void -Salome_file_i::deleteFile(const char* file_name) { - MESSAGE("Salome_file_i::deleteFile : NOT YET IMPLEMENTED"); -} - //============================================================================= /*! * CORBA method @@ -851,18 +851,7 @@ Salome_file_i::removeFiles() { //============================================================================= /*! * CORBA method - * \see Engines::Salome_file::deleteFiles - */ -//============================================================================= -void -Salome_file_i::deleteFiles() { - MESSAGE("Salome_file_i::deleteFiles : NOT YET IMPLEMENTED"); -} - -//============================================================================= -/*! - * CORBA method - * \see Engines::Salome_file::recvFiles + * \see Engines::Salome_file::getFilesInfos */ //============================================================================= Engines::files* @@ -1022,3 +1011,16 @@ Salome_file_i::getBlock(CORBA::Long fileId) return aBlock; } +void +Salome_file_i::setContainer(Engines::Container_ptr container) +{ + _container = Engines::Container::_duplicate(container); + + // Update All the files + _t_fileManaged::iterator begin = _fileManaged.begin(); + _t_fileManaged::iterator end = _fileManaged.end(); + for(;begin!=end;begin++) { + begin->second.container = Engines::Container::_duplicate(container); + } +} + diff --git a/src/Container/Salome_file_i.hxx b/src/Container/Salome_file_i.hxx index b8432cd01..565cd7f68 100644 --- a/src/Container/Salome_file_i.hxx +++ b/src/Container/Salome_file_i.hxx @@ -63,12 +63,10 @@ class CONTAINER_EXPORT Salome_file_i: // Removing or deleting files virtual void removeFile(const char* file_name); - virtual void deleteFile(const char* file_name); virtual void removeFiles(); - virtual void deleteFiles(); - // Informations methods: + virtual void setContainer(Engines::Container_ptr container); virtual Engines::files* getFilesInfos(); virtual Engines::file* getFileInfos(const char* file_name); virtual Engines::SfState* getSalome_fileState(); @@ -101,6 +99,7 @@ class CONTAINER_EXPORT Salome_file_i: _t_fileManaged _fileManaged; _t_fileDistributedSource _fileDistributedSource; Engines::SfState _state; + Engines::Container_ptr _container; }; #endif diff --git a/src/DSC/ParallelDSC/ParallelDSC_i.cxx b/src/DSC/ParallelDSC/ParallelDSC_i.cxx index 7e19a253a..c4f15adb5 100644 --- a/src/DSC/ParallelDSC/ParallelDSC_i.cxx +++ b/src/DSC/ParallelDSC/ParallelDSC_i.cxx @@ -36,6 +36,7 @@ Engines_ParallelDSC_i::Engines_ParallelDSC_i(CORBA::ORB_ptr orb, char * ior, Engines::DSC_serv(orb, ior), Engines::Superv_Component_serv(orb, ior), Engines::Component_serv(orb, ior), + Engines::Parallel_Component_serv(orb, ior), InterfaceParallel_impl(orb,ior) { } diff --git a/src/ParallelContainer/Makefile.am b/src/ParallelContainer/Makefile.am index 4fc7ec173..1734d4f10 100644 --- a/src/ParallelContainer/Makefile.am +++ b/src/ParallelContainer/Makefile.am @@ -33,7 +33,8 @@ include $(top_srcdir)/salome_adm/unix/make_common_starter.am # # header files salomeinclude_HEADERS = SALOME_ParallelComponent_i.hxx \ - SALOME_ParallelContainer_i.hxx + SALOME_ParallelContainer_i.hxx \ + Parallel_Salome_file_i.hxx # # =============================================================== @@ -74,7 +75,8 @@ lib_LTLIBRARIES = libSalomeParallelContainer.la libSalomeParallelContainer_la_SOURCES = SALOME_ParallelComponent_i.cxx \ SALOME_ParallelContainer_i.cxx \ - $(top_srcdir)/src/Container/Salome_file_i.cxx + $(top_srcdir)/src/Container/Salome_file_i.cxx \ + Parallel_Salome_file_i.cxx libSalomeParallelContainer_la_CXXFLAGS = $(COMMON_CPPFLAGS) diff --git a/src/ParallelContainer/Parallel_Salome_file_i.cxx b/src/ParallelContainer/Parallel_Salome_file_i.cxx new file mode 100644 index 000000000..440a0b455 --- /dev/null +++ b/src/ParallelContainer/Parallel_Salome_file_i.cxx @@ -0,0 +1,333 @@ +// Copyright (C) 2007 OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +// +// +// File : Parallel_Salome_file_i.cxx +// Author : André RIBES, EDF +// Module : SALOME +// $Header: + +#include "Parallel_Salome_file_i.hxx" +#include "utilities.h" + +Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior) : + InterfaceParallel_impl(orb,ior), + Engines::Salome_file_serv(orb,ior), + Engines::fileTransfer_serv(orb,ior), + Engines::Parallel_Salome_file_serv(orb,ior) +{ + CORBA::Object_ptr obj = _orb->string_to_object(ior); + proxy = Engines::Parallel_Salome_file::_narrow(obj); + parallel_file = NULL; +} + +Parallel_Salome_file_i::~Parallel_Salome_file_i() {} + +void +Parallel_Salome_file_i::load(const char* hdf5_file) { + MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED"; + throw SALOME::SALOME_Exception(es); +} + +void +Parallel_Salome_file_i::save(const char* hdf5_file) { + MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED"; + throw SALOME::SALOME_Exception(es); +} + +void +Parallel_Salome_file_i::save_all(const char* hdf5_file) { + MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED"); + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED"; + throw SALOME::SALOME_Exception(es); +} + +void +Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) { + // only one file managed case + Salome_file_i::connect(source_Salome_file); + + // Test if the file is managed in an another node + // If yes, node is updated + _t_fileManaged::iterator it = _fileManaged.begin(); + std::string file_name = it->first; + if (_fileManaged[file_name].node > 0 && getMyRank() == 0) { + if (parallel_file == NULL) + parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb); + parallel_file->connect(source_Salome_file, _fileManaged[file_name].node); + } +} + +void +Parallel_Salome_file_i::connectDistributedFile(const char * file_name, + Engines::Salome_file_ptr source_Salome_file) { + Salome_file_i::connectDistributedFile(file_name, source_Salome_file); + + // Test if the file is managed in an another node + // If yes, node is updated + std::string fname(file_name); + if (_fileManaged[fname].node > 0 && getMyRank() == 0) { + if (parallel_file == NULL) + parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb); + parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node); + } +} + +void +Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name, + const char * source_file_name) { + Salome_file_i::setDistributedSourceFile(file_name, source_file_name); + // Test if the file is managed in an another node + // If yes, node is updated + std::string fname(file_name); + if (_fileManaged[fname].node > 0 && getMyRank() == 0) { + if (parallel_file == NULL) + parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb); + parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node); + } +} + +void +Parallel_Salome_file_i::recvFiles() { + if (parallel_file == NULL) + parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb); + + std::string files_not_ok(""); + int total = getTotalNode(); + for (int i =0; irecvFiles_node(i); + } + catch (SALOME::SALOME_Exception & ex) { + files_not_ok = files_not_ok + std::string(ex.details.text.in()); + } + } + + if (files_not_ok != "") + { + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + std::string text = "files not ready : " + files_not_ok; + es.text = CORBA::string_dup(text.c_str()); + throw SALOME::SALOME_Exception(es); + } + else + { + // We change the state of the Salome_file + _state.files_ok = true; + } +} + +void +Parallel_Salome_file_i::recvFiles_node() { + + std::string files_not_ok(""); + _t_fileManaged::iterator begin = _fileManaged.begin(); + _t_fileManaged::iterator end = _fileManaged.end(); + for(;begin!=end;begin++) + { + bool result = true; + Engines::file file_infos = begin->second; + if (file_infos.node == getMyRank()) { + // Test if the file is local or distributed + if (std::string(file_infos.type.in()) == "local") + { + if (std::string(file_infos.status.in()) == "not_ok") + result = checkLocalFile(file_infos.file_name.in()); + } + else + { + if (std::string(file_infos.status.in()) == "not_ok") { + // 2 cases : + // Source file is a Salome_file + // Source file is a Parallel_Salome_file + PaCO::ParallelKernel_var interface_manager = + PaCO::ParallelKernel::_narrow(_fileDistributedSource[file_infos.file_name.in()]); + if (CORBA::is_nil(interface_manager)) + result = getDistributedFile(file_infos.file_name.in()); + else + result = getParallelDistributedFile(file_infos.file_name.in()); + } + } + // if the result is false + // we add this file to files_not_ok + if (!result) + { + files_not_ok.append(" "); + files_not_ok.append(file_infos.file_name.in()); + } + } + } + if (files_not_ok != "") + { + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + std::string text = files_not_ok; + es.text = CORBA::string_dup(text.c_str()); + throw SALOME::SALOME_Exception(es); + } +} + +bool +Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) { + + bool result = true; + const char * source_file_name = _fileManaged[file_name].source_file_name.in(); + int fileId; + FILE* fp; + std::string comp_file_name(_fileManaged[file_name].path.in()); + comp_file_name.append(_fileManaged[file_name].file_name.in()); + + // Test if the process can write on disk + if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL) + { + INFOS("file " << comp_file_name << " cannot be open for writing"); + _fileManaged[file_name].status = CORBA::string_dup("not_ok"); + result = false; + return result; + } + + Engines::PaCO_Parallel_Salome_file * parallel_source_file = + Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb); + + int node = parallel_source_file->getFileNode(source_file_name); + + try + { + fileId = parallel_source_file->open(source_file_name, node); + } + catch (...) + { + _fileManaged[file_name].status = CORBA::string_dup("not_ok"); + fclose(fp); + result = false; + return result; + } + + if (fileId > 0) + { + Engines::fileBlock* aBlock; + int toFollow = 1; + int ctr=0; + MESSAGE("begin of transfer of " << comp_file_name); + while (toFollow) + { + ctr++; + aBlock = parallel_source_file->getBlock(fileId, node); + toFollow = aBlock->length(); + CORBA::Octet *buf = aBlock->get_buffer(); + int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp); + ASSERT(nbWri == toFollow); + } + fclose(fp); + MESSAGE("end of transfer of " << comp_file_name); + parallel_source_file->close(fileId, node); + } + else + { + INFOS("open reference file for copy impossible"); + result = false; + fclose(fp); + _fileManaged[file_name].status = CORBA::string_dup("not_ok"); + return result; + } + + _fileManaged[file_name].status = CORBA::string_dup("ok"); + return result; +} + +void +Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) { + _container = Engines::Container::_duplicate(container); + + // Update All the files managed by the node + _t_fileManaged::iterator begin = _fileManaged.begin(); + _t_fileManaged::iterator end = _fileManaged.end(); + for(;begin!=end;begin++) { + begin->second.container = Engines::Container::_duplicate(container); + } +} + +void +Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) { + + // Test if this file is managed + std::string fname(file_name); + _t_fileManaged::iterator it = _fileManaged.find(fname); + if (it == _fileManaged.end()) + { + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "file is not managed"; + throw SALOME::SALOME_Exception(es); + } + + // Update file infos into this node (node 0) + // and into the node that actually managed it + _fileManaged[fname].node = node; + + if (node > 0) { + if (parallel_file == NULL) + parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb); + + Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node); + + // Update file infos with the new reference of the container + _fileManaged[fname].container = Engines::Container::_duplicate(cont); + } +} + +Engines::Container_ptr +Parallel_Salome_file_i::updateFile(const Engines::file& file) { + // Copy file + Engines::file new_file_infos(file); + + // Adding it to node list + new_file_infos.container = Engines::Container::_duplicate(_container); + std::string fname(new_file_infos.file_name.in()); + _fileManaged[fname] = new_file_infos; + + // Return the new reference of the container associated to the file + return Engines::Container::_duplicate(_container); +} + +CORBA::Long +Parallel_Salome_file_i::getFileNode(const char* file_name) { + + // Test if this file is managed + std::string fname(file_name); + _t_fileManaged::iterator it = _fileManaged.find(fname); + if (it == _fileManaged.end()) + { + SALOME::ExceptionStruct es; + es.type = SALOME::INTERNAL_ERROR; + es.text = "file is not managed"; + throw SALOME::SALOME_Exception(es); + } + + return _fileManaged[fname].node; +} diff --git a/src/ParallelContainer/Parallel_Salome_file_i.hxx b/src/ParallelContainer/Parallel_Salome_file_i.hxx new file mode 100644 index 000000000..f2847b824 --- /dev/null +++ b/src/ParallelContainer/Parallel_Salome_file_i.hxx @@ -0,0 +1,74 @@ +// Copyright (C) 2007 OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +// +// +// +// File : Parallel_Salome_file_i.hxx +// Author : André RIBES, EDF +// Module : SALOME +// $Header: + +#ifndef _PARALLEL_SALOME_FILE_I_HXX_ +#define _PARALLEL_SALOME_FILE_I_HXX_ + +#include +#include +#include +#include + +#include "SALOME_ComponentPaCO_Engines_Parallel_Salome_file_server.h" +#include "Salome_file_i.hxx" + +class CONTAINER_EXPORT Parallel_Salome_file_i: + public virtual Salome_file_i, + public virtual Engines::Parallel_Salome_file_serv +{ + public: + Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior); + virtual ~Parallel_Salome_file_i(); + + virtual void setFileNode(const char* file_name, CORBA::Long node); + virtual CORBA::Long getFileNode(const char* file_name); + virtual Engines::Container_ptr updateFile(const Engines::file& file); + + // New implementation for these methods + // For the parallel cases + virtual void load(const char* hdf5_file); + virtual void save(const char* hdf5_file); + virtual void save_all(const char* hdf5_file); + virtual void connect(Engines::Salome_file_ptr source_Salome_file); + virtual void connectDistributedFile(const char * file_name, + Engines::Salome_file_ptr source_Salome_file); + virtual void setDistributedSourceFile(const char* file_name, + const char * source_file_name); + virtual void recvFiles(); + virtual void recvFiles_node(); + virtual void setContainer(Engines::Container_ptr container); + //virtual void removeFile(const char* file_name); + //virtual void removeFiles(); + + // Local C++ methods + virtual bool getParallelDistributedFile(std::string file_name); + + private : + Engines::Parallel_Salome_file_var proxy; + Engines::PaCO_Parallel_Salome_file * parallel_file; +}; + +#endif + diff --git a/src/ParallelContainer/SALOME_ParallelComponent_i.cxx b/src/ParallelContainer/SALOME_ParallelComponent_i.cxx index f215c2291..3a6002cb7 100644 --- a/src/ParallelContainer/SALOME_ParallelComponent_i.cxx +++ b/src/ParallelContainer/SALOME_ParallelComponent_i.cxx @@ -59,7 +59,7 @@ bool Engines_Parallel_Component_i::_isMultiInstance = false; //============================================================================= Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) : - InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior) + InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior) { //ASSERT(0); INFOS("Default Constructor..."); @@ -84,7 +84,9 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c const char *instanceName, const char *interfaceName, bool notif) : - InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), + InterfaceParallel_impl(orb,ior), + Engines::Component_serv(orb,ior), + Engines::Parallel_Component_serv(orb,ior), _instanceName(instanceName), _interfaceName(interfaceName), _myConnexionToRegistry(0), @@ -106,7 +108,11 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c _instanceName.c_str()); _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif); - //SCRUTE(pd_refCount); + + deploy_mutex = new pthread_mutex_t(); + pthread_mutex_init(deploy_mutex, NULL); + _proxy = NULL; + //SCRUTE(pd_refCount); } //============================================================================= @@ -121,6 +127,11 @@ Engines_Parallel_Component_i::~Engines_Parallel_Component_i() { MESSAGE("Component destructor"); Engines_Parallel_Container_i::decInstanceCnt(_interfaceName); + pthread_mutex_destroy(deploy_mutex); + delete deploy_mutex; + + if (_proxy) + delete _proxy; } //============================================================================= @@ -567,7 +578,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName) (*it).second >>= value; // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC. #if defined __GNUC__ - int ret = setenv(cle.c_str(), value, overwrite); + //int ret = setenv(cle.c_str(), value, overwrite); + setenv(cle.c_str(), value, overwrite); #else //CCRT porting : setenv not defined in stdlib.h std::string s(cle); @@ -575,7 +587,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName) s+=value; // char* cast because 1st arg of linux putenv function // is not a const char* ! - int ret=putenv((char *)s.c_str()); + //int ret=putenv((char *)s.c_str()); + putenv((char *)s.c_str()); //End of CCRT porting #endif MESSAGE("--- setenv: "<find(Salome_file_name); - if (_Salome_file_map_it == _map->end()) { + _Proxy_Salome_file_map_it = _map->find(Salome_file_name); + if (_Proxy_Salome_file_map_it == _map->end()) { SALOME::ExceptionStruct es; es.type = SALOME::INTERNAL_ERROR; es.text = "service doesn't have this Salome_file"; throw SALOME::SALOME_Exception(es); } - Salome_file_i * Sfile = (*_map)[Salome_file_name]; + // Client get the proxy object + Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name]; return Sfile->_this(); } @@ -807,14 +822,88 @@ Engines_Parallel_Component_i::setInputFileToService(const char* service_name, if (_Service_file_map_it == _Input_Service_file_map.end()) { _t_Salome_file_map * _map = new _t_Salome_file_map(); _Input_Service_file_map[service_name] = _map; + _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map(); + _Proxy_Input_Service_file_map[service_name] = _proxy_map; } _t_Salome_file_map * _map = _Input_Service_file_map[service_name]; + _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name]; + char * proxy_ior; + // Try to find the Salome_file ... _Salome_file_map_it = _map->find(Salome_file_name); if (_Salome_file_map_it == _map->end()) { - Salome_file_i * Sfile = new Salome_file_i(); - (*_map)[Salome_file_name] = Sfile; + + // We create a new PaCO++ object. + // He has the same configuration than + // his component + + pthread_mutex_lock(deploy_mutex); + // Firstly, we have to create the proxy object + // of the Salome_file and transmit his + // reference to the other nodes. + if (getMyRank() == 0) { + Engines::Parallel_Salome_file_proxy_impl * proxy = + new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb)); + PaCO_operation * proxy_global_ptr = proxy->getContext("global_paco_context"); + proxy_global_ptr->setTypeClient(true); + PaCO::PacoTopology_t client_topo; + client_topo.total = 1; + proxy_global_ptr->setClientTopo(client_topo); + PaCO::PacoTopology_t serveur_topo; + serveur_topo.total = getTotalNode(); + proxy->setTopo(serveur_topo); + + // We initialize the object with the context of the Parallel component + PaCO_operation * compo_global_ptr = this->getContext("global_paco_context"); + compo_global_ptr->init_context(proxy_global_ptr); + + // We register the CORBA objet into the POA + CORBA::Object_ptr proxy_ref = proxy->_this(); + + // We send the reference to all the nodes... + CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str()); + Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy); + component_proxy->send_parallel_proxy_object(proxy_ref); + + // Adding proxy into the map + (*_proxy_map)[Salome_file_name] = proxy; + } + else { + this->wait_parallel_object_proxy(); + } + + proxy_ior = this->get_parallel_proxy_object(); + + // We register each node of the parallel Salome_file object + // into the proxy. + for (int i = 0; i < getTotalNode(); i++) { + if (i == getMyRank()) { + Parallel_Salome_file_i * servant = + new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior); + PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context"); + + // We initialize the object with the context of the Parallel component + PaCO_operation * compo_global_ptr = this->getContext("global_paco_context"); + compo_global_ptr->init_context(servant_global_ptr); + + // We register the CORBA objet into the POA + servant->POA_PaCO::InterfaceParallel::_this(); + + // Register the servant + servant->deploy(getMyRank()); + + // Adding servant to the map + (*_map)[Salome_file_name] = servant; + } + + PaCO_operation * compo_global_ptr = this->getContext("global_paco_context"); + compo_global_ptr->my_com->paco_barrier(); + } + // Parallel_Salome_file is created and deployed + delete _proxy; + _proxy = NULL; + pthread_mutex_unlock(deploy_mutex); } else { // Salome_file_name already added into the service @@ -824,27 +913,26 @@ Engines_Parallel_Component_i::setInputFileToService(const char* service_name, es.text = "Salome_file_name already added"; throw SALOME::SALOME_Exception(es); } - Salome_file_i * Sfile = (*_map)[Salome_file_name]; - return Sfile->_this(); + CORBA::Object_ptr obj = _orb->string_to_object(proxy_ior); + return Engines::Salome_file::_narrow(obj); } void Engines_Parallel_Component_i::checkInputServiceFiles(const char* service_name) { // Try to find the service, if it doesn't exist, nothing to do. - _Service_file_map_it = _Input_Service_file_map.find(service_name); - if (_Service_file_map_it != _Input_Service_file_map.end()) { - _t_Salome_file_map * _map = _Input_Service_file_map[service_name]; - _t_Salome_file_map::iterator begin = _map->begin(); - _t_Salome_file_map::iterator end = _map->end(); + _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name); + if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) { + _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name]; + _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin(); + _t_Proxy_Salome_file_map::iterator end = _proxy_map->end(); for(;begin!=end;begin++) { - Salome_file_i * file = begin->second; + Engines::Parallel_Salome_file_proxy_impl * file = begin->second; file->recvFiles(); } } - } Engines::Salome_file_ptr @@ -889,8 +977,13 @@ Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, // Try to find the Salome_file ... _Salome_file_map_it = _map->find(Salome_file_name); if (_Salome_file_map_it == _map->end()) { - Salome_file_i * Sfile = new Salome_file_i(); - (*_map)[Salome_file_name] = Sfile; +// Salome_file_i * Sfile = new Salome_file_i(); +// (*_map)[Salome_file_name] = Sfile; +// +// +// +// TODO +// } else { // Salome_file_name already added into the service @@ -923,3 +1016,23 @@ Engines_Parallel_Component_i::checkOutputServiceFiles(const char* service_name) } +void +Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) { + _proxy = _orb->object_to_string(proxy_ref); +} + +void +Engines_Parallel_Component_i::wait_parallel_object_proxy() { + char * proxy = NULL; + proxy = get_parallel_proxy_object(); + while(proxy == NULL) + { + sleep(1); + proxy = get_parallel_proxy_object(); + } +} + +char * +Engines_Parallel_Component_i::get_parallel_proxy_object() { + return _proxy; +} diff --git a/src/ParallelContainer/SALOME_ParallelComponent_i.hxx b/src/ParallelContainer/SALOME_ParallelComponent_i.hxx index 1025e49ff..5355d5f9f 100644 --- a/src/ParallelContainer/SALOME_ParallelComponent_i.hxx +++ b/src/ParallelContainer/SALOME_ParallelComponent_i.hxx @@ -37,11 +37,11 @@ #include #include -#include "SALOME_ComponentPaCO_Engines_Component_server.h" +#include "SALOME_ComponentPaCO_Engines_Parallel_Component_server.h" #include "NOTIFICATION.hxx" #include "RegistryConnexion.hxx" -#include "Salome_file_i.hxx" +#include "Parallel_Salome_file_i.hxx" class Engines_Parallel_Container_i; @@ -60,7 +60,7 @@ class Engines_Parallel_Container_i; #endif class CONTAINER_EXPORT Engines_Parallel_Component_i: - public virtual Engines::Component_serv, + public virtual Engines::Parallel_Component_serv, public virtual PortableServer::RefCountServantBase { public: @@ -112,6 +112,7 @@ public: virtual Engines::Salome_file_ptr setOutputFileToService(const char* service_name, const char* Salome_file_name); + void send_parallel_proxy_object(CORBA::Object_ptr proxy_ref); // --- local C++ methods PortableServer::ObjectId * getId(); @@ -131,6 +132,9 @@ public: void SetCurCpu() ; long CpuUsed() ; + void wait_parallel_object_proxy(); + char * get_parallel_proxy_object(); + protected: int _studyId; // -1: not initialised; 0: multiStudy; >0: study static bool _isMultiStudy; @@ -148,20 +152,31 @@ protected: NOTIFICATION_Supplier* _notifSupplier; std::map_fieldsDict; - // Map Salome_file_name to Salome_file* - typedef std::map _t_Salome_file_map; + // Map Salome_file_name to Parallel_Salome_file* + typedef std::map _t_Salome_file_map; + typedef std::map _t_Proxy_Salome_file_map; + // Map Service_name to _Salome_file_map typedef std::map _t_Service_file_map; + typedef std::map _t_Proxy_Service_file_map; _t_Service_file_map _Input_Service_file_map; _t_Service_file_map _Output_Service_file_map; _t_Service_file_map::iterator _Service_file_map_it; _t_Salome_file_map::iterator _Salome_file_map_it; + _t_Proxy_Service_file_map _Proxy_Input_Service_file_map; + _t_Proxy_Service_file_map _Proxy_Output_Service_file_map; + _t_Proxy_Service_file_map::iterator _Proxy_Service_file_map_it; + _t_Proxy_Salome_file_map::iterator _Proxy_Salome_file_map_it; + std::string _serviceName ; std::string _graphName ; std::string _nodeName ; + pthread_mutex_t * deploy_mutex; + char * _proxy; + private: #ifndef WNT pthread_t _ThreadId ; diff --git a/src/ParallelContainer/SALOME_ParallelContainer_i.cxx b/src/ParallelContainer/SALOME_ParallelContainer_i.cxx index b2b81b7b5..b1063f369 100644 --- a/src/ParallelContainer/SALOME_ParallelContainer_i.cxx +++ b/src/ParallelContainer/SALOME_ParallelContainer_i.cxx @@ -20,7 +20,7 @@ // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org // // File : SALOME_ParallelContainer_i.cxx -// Author : André RIBES, EDF +// Author : Andr� RIBES, EDF // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA #include -- 2.39.2