This interface defines the operations needed to add a paco++ port
into a parallel DSC component.
*/
- interface Parallel_DSC : Engines::Superv_Component {
+ interface Parallel_DSC : Engines::Superv_Component, Engines::Parallel_Component {
/*!
This operation gives the proxy node of a paco++ port to all the nodes.
SALOME_ComponentPaCO_Engines_Container_client.cc \
SALOME_ComponentPaCO_Engines_Component_server.cc \
SALOME_ComponentPaCO_Engines_Component_client.cc \
+ SALOME_ComponentPaCO_Engines_Parallel_Component_server.cc \
+ SALOME_ComponentPaCO_Engines_Parallel_Component_client.cc \
SALOME_ComponentPaCO_Engines_fileTransfer_server.cc \
SALOME_ComponentPaCO_Engines_fileTransfer_client.cc \
SALOME_ComponentPaCO_Engines_Salome_file_server.cc \
SALOME_ComponentPaCO_Engines_Container_client.h \
SALOME_ComponentPaCO_Engines_Component_server.h \
SALOME_ComponentPaCO_Engines_Component_client.h \
+ SALOME_ComponentPaCO_Engines_Parallel_Component_server.h \
+ SALOME_ComponentPaCO_Engines_Parallel_Component_client.h \
SALOME_ComponentPaCO_Engines_fileTransfer_server.h \
SALOME_ComponentPaCO_Engines_fileTransfer_client.h \
SALOME_ComponentPaCO_Engines_Salome_file_server.h \
in string Salome_file_name) raises(SALOME::SALOME_Exception);
} ;
+ interface Parallel_Component : Engines::Component {
+ void send_parallel_proxy_object(in Object proxy_ref);
+ };
+
/*!
A block of binary data used for file transfer. The maximum size of the
block is defined on server side.
*/
void removeFiles();
- /*!
- Delete a file managed by the Salome_file. File is removed too.
-
- \param file_name the name of the file.
-
- \exception raised if the file doesn't exist.
- */
- void deleteFile(in string file_name) raises (SALOME::SALOME_Exception);
-
- /*!
- Delete all the files managed by the Salome_file. Files are removed too.
- */
- void deleteFiles();
-
/**************/
/*!
Return the state of the Salome_file.
*/
Engines::SfState getSalome_fileState();
+
+
+ /*!
+ Set the container where files are.
+
+ \param container container CORBA's reference.
+ */
+ void setContainer(in Engines::Container container);
};
/*! \brief Interface of fileRef.
interface Parallel_Salome_file : Engines::Salome_file {
/*!
- Add a Local file to the Salome_file.
+ Set a number of node for the file. Default is the node 0.
- \param file_name name of the file with the path.
- \param node the number of the node where the file is.
+ \param file_name name of the file.
+ \param node_nbr node number where the file is.
- \exception raised if the file is already added into the Salome_file.
+ \exception raised if the file doesn't exist.
*/
- void setParallelLocalFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception);
+ void setFileNode(in string file_name, in long node_nbr) raises (SALOME::SALOME_Exception);
/*!
- Add a Distributed file to the Salome_file.
+ Get the number of the node that actually managed the file.
- \param comp_file_name name of the file with the path.
+ \param file_name name of managed file.
- \exception raised if the file is already added into the Salome_file.
- */
- void setParallelDistributedFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception);
+ \return node number of the file
+
+ \exception raised if the file doesn't exist.
+ */
+ long getFileNode(in string file_name) raises (SALOME::SALOME_Exception);
+
+ /*!
+ This method update the state of file for the Parallel_Salome_file.
+
+ \param new_file the new state of file.
+ */
+ Engines::Container updateFile(in Engines::file new_file);
/*!
- This method update the state of the Parallel_Salome_file. Thus, each node
- of the Parallel_Salome_file has the same state.
+ This method is used by the parallel implementation of recvFiles.
- \param new_state the new state of the Parallel_Salome_file.
+ \exception raised if the file cannot be ok.
*/
- void updateState(in Engines::SfState new_state);
+ void recvFiles_node() raises (SALOME::SALOME_Exception);
+
};
-} ;
+};
#endif
<Interface>
<Name>Component</Name>
<Method>
- <Name>ping</Name>
+ <Name>setInputServiceSalome_file</Name>
<Type>distributed</Type>
</Method>
</Interface>
<Interface>
- <Name>Parallel_Salome_file</Name>
+ <Name>Parallel_Component</Name>
<Method>
- <Name>setParallelLocalFile</Name>
+ <Name>send_parallel_proxy_object</Name>
<Type>distributed</Type>
</Method>
+ </Interface>
+ <Interface>
+ <Name>Salome_file</Name>
<Method>
- <Name>setParallelDistributedFile</Name>
+ <Name>recvFiles</Name>
<Type>distributed</Type>
</Method>
</Interface>
+ <Interface>
+ <Name>Parallel_Salome_file</Name>
+ </Interface>
</Module>
</GridCCM_Interface_description>
_Salome_file_map_it = _map->find(Salome_file_name);
if (_Salome_file_map_it == _map->end()) {
Salome_file_i * Sfile = new Salome_file_i();
+ Engines::Container_ptr container = this->GetContainerRef();
+ Sfile->setContainer(Engines::Container::_duplicate(container));
(*_map)[Salome_file_name] = Sfile;
}
else {
file->recvFiles();
}
}
-
}
Engines::Salome_file_ptr
_Salome_file_map_it = _map->find(Salome_file_name);
if (_Salome_file_map_it == _map->end()) {
Salome_file_i * Sfile = new Salome_file_i();
+ Engines::Container_ptr container = this->GetContainerRef();
+ Sfile->setContainer(Engines::Container::_duplicate(container));
(*_map)[Salome_file_name] = Sfile;
}
else {
if (CORBA::is_nil(_Salome_file_map[origName]))
{
Salome_file_i* aSalome_file = new Salome_file_i();
+ aSalome_file->setContainer(Engines::Container::_duplicate(this->_this()));
try
{
aSalome_file->setLocalFile(origFileName);
_state.hdf5_file_name = CORBA::string_dup("");
_state.number_of_files = 0;
_state.files_ok = true;
+ _container = Engines::Container::_nil();
}
//=============================================================================
infos.type = CORBA::string_dup(type.c_str());
infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
infos.status = CORBA::string_dup(status.c_str());
+ // Infos for parallel extensions...
+ infos.node = 0;
+ infos.container = Engines::Container::_duplicate(_container);
_fileManaged[file_name] = infos;
infos.type = CORBA::string_dup(type.c_str());
infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
infos.status = CORBA::string_dup(status.c_str());
+ // Infos for parallel extensions...
+ infos.node = 0;
+ infos.container = Engines::Container::_duplicate(_container);
_fileManaged[file_name] = infos;
infos.type = CORBA::string_dup(type.c_str());
infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
infos.status = CORBA::string_dup(status.c_str());
+ // Infos for parallel extensions...
+ infos.node = 0;
+ infos.container = Engines::Container::_duplicate(_container);
_fileManaged[file_name] = infos;
std::string comp_file_name(_fileManaged[file_name].path.in());
comp_file_name.append(_fileManaged[file_name].file_name.in());
+ // Test if the process can write on disk
if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
{
INFOS("file " << comp_file_name << " cannot be open for writing");
MESSAGE("Salome_file_i::removeFile : NOT YET IMPLEMENTED");
}
-//=============================================================================
-/*!
- * CORBA method
- * \see Engines::Salome_file::deleteFile
- */
-//=============================================================================
-void
-Salome_file_i::deleteFile(const char* file_name) {
- MESSAGE("Salome_file_i::deleteFile : NOT YET IMPLEMENTED");
-}
-
//=============================================================================
/*!
* CORBA method
//=============================================================================
/*!
* CORBA method
- * \see Engines::Salome_file::deleteFiles
- */
-//=============================================================================
-void
-Salome_file_i::deleteFiles() {
- MESSAGE("Salome_file_i::deleteFiles : NOT YET IMPLEMENTED");
-}
-
-//=============================================================================
-/*!
- * CORBA method
- * \see Engines::Salome_file::recvFiles
+ * \see Engines::Salome_file::getFilesInfos
*/
//=============================================================================
Engines::files*
return aBlock;
}
+void
+Salome_file_i::setContainer(Engines::Container_ptr container)
+{
+ _container = Engines::Container::_duplicate(container);
+
+ // Update All the files
+ _t_fileManaged::iterator begin = _fileManaged.begin();
+ _t_fileManaged::iterator end = _fileManaged.end();
+ for(;begin!=end;begin++) {
+ begin->second.container = Engines::Container::_duplicate(container);
+ }
+}
+
// Removing or deleting files
virtual void removeFile(const char* file_name);
- virtual void deleteFile(const char* file_name);
virtual void removeFiles();
- virtual void deleteFiles();
-
// Informations methods:
+ virtual void setContainer(Engines::Container_ptr container);
virtual Engines::files* getFilesInfos();
virtual Engines::file* getFileInfos(const char* file_name);
virtual Engines::SfState* getSalome_fileState();
_t_fileManaged _fileManaged;
_t_fileDistributedSource _fileDistributedSource;
Engines::SfState _state;
+ Engines::Container_ptr _container;
};
#endif
Engines::DSC_serv(orb, ior),
Engines::Superv_Component_serv(orb, ior),
Engines::Component_serv(orb, ior),
+ Engines::Parallel_Component_serv(orb, ior),
InterfaceParallel_impl(orb,ior)
{
}
#
# header files
salomeinclude_HEADERS = SALOME_ParallelComponent_i.hxx \
- SALOME_ParallelContainer_i.hxx
+ SALOME_ParallelContainer_i.hxx \
+ Parallel_Salome_file_i.hxx
#
# ===============================================================
libSalomeParallelContainer_la_SOURCES = SALOME_ParallelComponent_i.cxx \
SALOME_ParallelContainer_i.cxx \
- $(top_srcdir)/src/Container/Salome_file_i.cxx
+ $(top_srcdir)/src/Container/Salome_file_i.cxx \
+ Parallel_Salome_file_i.cxx
libSalomeParallelContainer_la_CXXFLAGS = $(COMMON_CPPFLAGS)
--- /dev/null
+// Copyright (C) 2007 OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+//
+//
+// File : Parallel_Salome_file_i.cxx
+// Author : André RIBES, EDF
+// Module : SALOME
+// $Header:
+
+#include "Parallel_Salome_file_i.hxx"
+#include "utilities.h"
+
+Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior) :
+ InterfaceParallel_impl(orb,ior),
+ Engines::Salome_file_serv(orb,ior),
+ Engines::fileTransfer_serv(orb,ior),
+ Engines::Parallel_Salome_file_serv(orb,ior)
+{
+ CORBA::Object_ptr obj = _orb->string_to_object(ior);
+ proxy = Engines::Parallel_Salome_file::_narrow(obj);
+ parallel_file = NULL;
+}
+
+Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
+
+void
+Parallel_Salome_file_i::load(const char* hdf5_file) {
+ MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
+ throw SALOME::SALOME_Exception(es);
+}
+
+void
+Parallel_Salome_file_i::save(const char* hdf5_file) {
+ MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
+ throw SALOME::SALOME_Exception(es);
+}
+
+void
+Parallel_Salome_file_i::save_all(const char* hdf5_file) {
+ MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
+ throw SALOME::SALOME_Exception(es);
+}
+
+void
+Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
+ // only one file managed case
+ Salome_file_i::connect(source_Salome_file);
+
+ // Test if the file is managed in an another node
+ // If yes, node is updated
+ _t_fileManaged::iterator it = _fileManaged.begin();
+ std::string file_name = it->first;
+ if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
+ if (parallel_file == NULL)
+ parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+ parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
+ }
+}
+
+void
+Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
+ Engines::Salome_file_ptr source_Salome_file) {
+ Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
+
+ // Test if the file is managed in an another node
+ // If yes, node is updated
+ std::string fname(file_name);
+ if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
+ if (parallel_file == NULL)
+ parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+ parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
+ }
+}
+
+void
+Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
+ const char * source_file_name) {
+ Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
+ // Test if the file is managed in an another node
+ // If yes, node is updated
+ std::string fname(file_name);
+ if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
+ if (parallel_file == NULL)
+ parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+ parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
+ }
+}
+
+void
+Parallel_Salome_file_i::recvFiles() {
+ if (parallel_file == NULL)
+ parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+
+ std::string files_not_ok("");
+ int total = getTotalNode();
+ for (int i =0; i<total; i++) {
+ try {
+ parallel_file->recvFiles_node(i);
+ }
+ catch (SALOME::SALOME_Exception & ex) {
+ files_not_ok = files_not_ok + std::string(ex.details.text.in());
+ }
+ }
+
+ if (files_not_ok != "")
+ {
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ std::string text = "files not ready : " + files_not_ok;
+ es.text = CORBA::string_dup(text.c_str());
+ throw SALOME::SALOME_Exception(es);
+ }
+ else
+ {
+ // We change the state of the Salome_file
+ _state.files_ok = true;
+ }
+}
+
+void
+Parallel_Salome_file_i::recvFiles_node() {
+
+ std::string files_not_ok("");
+ _t_fileManaged::iterator begin = _fileManaged.begin();
+ _t_fileManaged::iterator end = _fileManaged.end();
+ for(;begin!=end;begin++)
+ {
+ bool result = true;
+ Engines::file file_infos = begin->second;
+ if (file_infos.node == getMyRank()) {
+ // Test if the file is local or distributed
+ if (std::string(file_infos.type.in()) == "local")
+ {
+ if (std::string(file_infos.status.in()) == "not_ok")
+ result = checkLocalFile(file_infos.file_name.in());
+ }
+ else
+ {
+ if (std::string(file_infos.status.in()) == "not_ok") {
+ // 2 cases :
+ // Source file is a Salome_file
+ // Source file is a Parallel_Salome_file
+ PaCO::ParallelKernel_var interface_manager =
+ PaCO::ParallelKernel::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
+ if (CORBA::is_nil(interface_manager))
+ result = getDistributedFile(file_infos.file_name.in());
+ else
+ result = getParallelDistributedFile(file_infos.file_name.in());
+ }
+ }
+ // if the result is false
+ // we add this file to files_not_ok
+ if (!result)
+ {
+ files_not_ok.append(" ");
+ files_not_ok.append(file_infos.file_name.in());
+ }
+ }
+ }
+ if (files_not_ok != "")
+ {
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ std::string text = files_not_ok;
+ es.text = CORBA::string_dup(text.c_str());
+ throw SALOME::SALOME_Exception(es);
+ }
+}
+
+bool
+Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
+
+ bool result = true;
+ const char * source_file_name = _fileManaged[file_name].source_file_name.in();
+ int fileId;
+ FILE* fp;
+ std::string comp_file_name(_fileManaged[file_name].path.in());
+ comp_file_name.append(_fileManaged[file_name].file_name.in());
+
+ // Test if the process can write on disk
+ if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
+ {
+ INFOS("file " << comp_file_name << " cannot be open for writing");
+ _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+ result = false;
+ return result;
+ }
+
+ Engines::PaCO_Parallel_Salome_file * parallel_source_file =
+ Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
+
+ int node = parallel_source_file->getFileNode(source_file_name);
+
+ try
+ {
+ fileId = parallel_source_file->open(source_file_name, node);
+ }
+ catch (...)
+ {
+ _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+ fclose(fp);
+ result = false;
+ return result;
+ }
+
+ if (fileId > 0)
+ {
+ Engines::fileBlock* aBlock;
+ int toFollow = 1;
+ int ctr=0;
+ MESSAGE("begin of transfer of " << comp_file_name);
+ while (toFollow)
+ {
+ ctr++;
+ aBlock = parallel_source_file->getBlock(fileId, node);
+ toFollow = aBlock->length();
+ CORBA::Octet *buf = aBlock->get_buffer();
+ int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
+ ASSERT(nbWri == toFollow);
+ }
+ fclose(fp);
+ MESSAGE("end of transfer of " << comp_file_name);
+ parallel_source_file->close(fileId, node);
+ }
+ else
+ {
+ INFOS("open reference file for copy impossible");
+ result = false;
+ fclose(fp);
+ _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+ return result;
+ }
+
+ _fileManaged[file_name].status = CORBA::string_dup("ok");
+ return result;
+}
+
+void
+Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
+ _container = Engines::Container::_duplicate(container);
+
+ // Update All the files managed by the node
+ _t_fileManaged::iterator begin = _fileManaged.begin();
+ _t_fileManaged::iterator end = _fileManaged.end();
+ for(;begin!=end;begin++) {
+ begin->second.container = Engines::Container::_duplicate(container);
+ }
+}
+
+void
+Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
+
+ // Test if this file is managed
+ std::string fname(file_name);
+ _t_fileManaged::iterator it = _fileManaged.find(fname);
+ if (it == _fileManaged.end())
+ {
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ es.text = "file is not managed";
+ throw SALOME::SALOME_Exception(es);
+ }
+
+ // Update file infos into this node (node 0)
+ // and into the node that actually managed it
+ _fileManaged[fname].node = node;
+
+ if (node > 0) {
+ if (parallel_file == NULL)
+ parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+
+ Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
+
+ // Update file infos with the new reference of the container
+ _fileManaged[fname].container = Engines::Container::_duplicate(cont);
+ }
+}
+
+Engines::Container_ptr
+Parallel_Salome_file_i::updateFile(const Engines::file& file) {
+ // Copy file
+ Engines::file new_file_infos(file);
+
+ // Adding it to node list
+ new_file_infos.container = Engines::Container::_duplicate(_container);
+ std::string fname(new_file_infos.file_name.in());
+ _fileManaged[fname] = new_file_infos;
+
+ // Return the new reference of the container associated to the file
+ return Engines::Container::_duplicate(_container);
+}
+
+CORBA::Long
+Parallel_Salome_file_i::getFileNode(const char* file_name) {
+
+ // Test if this file is managed
+ std::string fname(file_name);
+ _t_fileManaged::iterator it = _fileManaged.find(fname);
+ if (it == _fileManaged.end())
+ {
+ SALOME::ExceptionStruct es;
+ es.type = SALOME::INTERNAL_ERROR;
+ es.text = "file is not managed";
+ throw SALOME::SALOME_Exception(es);
+ }
+
+ return _fileManaged[fname].node;
+}
--- /dev/null
+// Copyright (C) 2007 OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+//
+//
+// File : Parallel_Salome_file_i.hxx
+// Author : André RIBES, EDF
+// Module : SALOME
+// $Header:
+
+#ifndef _PARALLEL_SALOME_FILE_I_HXX_
+#define _PARALLEL_SALOME_FILE_I_HXX_
+
+#include <SALOMEconfig.h>
+#include <SALOME_Container.hxx>
+#include <map>
+#include <cstdio>
+
+#include "SALOME_ComponentPaCO_Engines_Parallel_Salome_file_server.h"
+#include "Salome_file_i.hxx"
+
+class CONTAINER_EXPORT Parallel_Salome_file_i:
+ public virtual Salome_file_i,
+ public virtual Engines::Parallel_Salome_file_serv
+{
+ public:
+ Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior);
+ virtual ~Parallel_Salome_file_i();
+
+ virtual void setFileNode(const char* file_name, CORBA::Long node);
+ virtual CORBA::Long getFileNode(const char* file_name);
+ virtual Engines::Container_ptr updateFile(const Engines::file& file);
+
+ // New implementation for these methods
+ // For the parallel cases
+ virtual void load(const char* hdf5_file);
+ virtual void save(const char* hdf5_file);
+ virtual void save_all(const char* hdf5_file);
+ virtual void connect(Engines::Salome_file_ptr source_Salome_file);
+ virtual void connectDistributedFile(const char * file_name,
+ Engines::Salome_file_ptr source_Salome_file);
+ virtual void setDistributedSourceFile(const char* file_name,
+ const char * source_file_name);
+ virtual void recvFiles();
+ virtual void recvFiles_node();
+ virtual void setContainer(Engines::Container_ptr container);
+ //virtual void removeFile(const char* file_name);
+ //virtual void removeFiles();
+
+ // Local C++ methods
+ virtual bool getParallelDistributedFile(std::string file_name);
+
+ private :
+ Engines::Parallel_Salome_file_var proxy;
+ Engines::PaCO_Parallel_Salome_file * parallel_file;
+};
+
+#endif
+
//=============================================================================
Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) :
- InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior)
+ InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior)
{
//ASSERT(0);
INFOS("Default Constructor...");
const char *instanceName,
const char *interfaceName,
bool notif) :
- InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior),
+ InterfaceParallel_impl(orb,ior),
+ Engines::Component_serv(orb,ior),
+ Engines::Parallel_Component_serv(orb,ior),
_instanceName(instanceName),
_interfaceName(interfaceName),
_myConnexionToRegistry(0),
_instanceName.c_str());
_notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
- //SCRUTE(pd_refCount);
+
+ deploy_mutex = new pthread_mutex_t();
+ pthread_mutex_init(deploy_mutex, NULL);
+ _proxy = NULL;
+ //SCRUTE(pd_refCount);
}
//=============================================================================
{
MESSAGE("Component destructor");
Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
+ pthread_mutex_destroy(deploy_mutex);
+ delete deploy_mutex;
+
+ if (_proxy)
+ delete _proxy;
}
//=============================================================================
(*it).second >>= value;
// ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
#if defined __GNUC__
- int ret = setenv(cle.c_str(), value, overwrite);
+ //int ret = setenv(cle.c_str(), value, overwrite);
+ setenv(cle.c_str(), value, overwrite);
#else
//CCRT porting : setenv not defined in stdlib.h
std::string s(cle);
s+=value;
// char* cast because 1st arg of linux putenv function
// is not a const char* !
- int ret=putenv((char *)s.c_str());
+ //int ret=putenv((char *)s.c_str());
+ putenv((char *)s.c_str());
//End of CCRT porting
#endif
MESSAGE("--- setenv: "<<cle<<" = "<< value);
return aStreamFile._retn();
}
+// This is a parallel method
Engines::Salome_file_ptr
Engines_Parallel_Component_i::getInputServiceSalome_file(const char* service_name,
const char* Salome_file_name)
{
// Try to find the service, if it doesn't exist, we throw an exception.
- _Service_file_map_it = _Input_Service_file_map.find(service_name);
- if (_Service_file_map_it == _Input_Service_file_map.end()) {
+ _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+ if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
SALOME::ExceptionStruct es;
es.type = SALOME::INTERNAL_ERROR;
es.text = "service doesn't have salome files";
throw SALOME::SALOME_Exception(es);
}
- _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
+ _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
// Try to find the Salome_file ...
- _Salome_file_map_it = _map->find(Salome_file_name);
- if (_Salome_file_map_it == _map->end()) {
+ _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
+ if (_Proxy_Salome_file_map_it == _map->end()) {
SALOME::ExceptionStruct es;
es.type = SALOME::INTERNAL_ERROR;
es.text = "service doesn't have this Salome_file";
throw SALOME::SALOME_Exception(es);
}
- Salome_file_i * Sfile = (*_map)[Salome_file_name];
+ // Client get the proxy object
+ Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
return Sfile->_this();
}
if (_Service_file_map_it == _Input_Service_file_map.end()) {
_t_Salome_file_map * _map = new _t_Salome_file_map();
_Input_Service_file_map[service_name] = _map;
+ _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
+ _Proxy_Input_Service_file_map[service_name] = _proxy_map;
}
_t_Salome_file_map * _map = _Input_Service_file_map[service_name];
+ _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
+ char * proxy_ior;
+
// Try to find the Salome_file ...
_Salome_file_map_it = _map->find(Salome_file_name);
if (_Salome_file_map_it == _map->end()) {
- Salome_file_i * Sfile = new Salome_file_i();
- (*_map)[Salome_file_name] = Sfile;
+
+ // We create a new PaCO++ object.
+ // He has the same configuration than
+ // his component
+
+ pthread_mutex_lock(deploy_mutex);
+ // Firstly, we have to create the proxy object
+ // of the Salome_file and transmit his
+ // reference to the other nodes.
+ if (getMyRank() == 0) {
+ Engines::Parallel_Salome_file_proxy_impl * proxy =
+ new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
+ PaCO_operation * proxy_global_ptr = proxy->getContext("global_paco_context");
+ proxy_global_ptr->setTypeClient(true);
+ PaCO::PacoTopology_t client_topo;
+ client_topo.total = 1;
+ proxy_global_ptr->setClientTopo(client_topo);
+ PaCO::PacoTopology_t serveur_topo;
+ serveur_topo.total = getTotalNode();
+ proxy->setTopo(serveur_topo);
+
+ // We initialize the object with the context of the Parallel component
+ PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
+ compo_global_ptr->init_context(proxy_global_ptr);
+
+ // We register the CORBA objet into the POA
+ CORBA::Object_ptr proxy_ref = proxy->_this();
+
+ // We send the reference to all the nodes...
+ CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
+ Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
+ component_proxy->send_parallel_proxy_object(proxy_ref);
+
+ // Adding proxy into the map
+ (*_proxy_map)[Salome_file_name] = proxy;
+ }
+ else {
+ this->wait_parallel_object_proxy();
+ }
+
+ proxy_ior = this->get_parallel_proxy_object();
+
+ // We register each node of the parallel Salome_file object
+ // into the proxy.
+ for (int i = 0; i < getTotalNode(); i++) {
+ if (i == getMyRank()) {
+ Parallel_Salome_file_i * servant =
+ new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior);
+ PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
+
+ // We initialize the object with the context of the Parallel component
+ PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
+ compo_global_ptr->init_context(servant_global_ptr);
+
+ // We register the CORBA objet into the POA
+ servant->POA_PaCO::InterfaceParallel::_this();
+
+ // Register the servant
+ servant->deploy(getMyRank());
+
+ // Adding servant to the map
+ (*_map)[Salome_file_name] = servant;
+ }
+
+ PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
+ compo_global_ptr->my_com->paco_barrier();
+ }
+ // Parallel_Salome_file is created and deployed
+ delete _proxy;
+ _proxy = NULL;
+ pthread_mutex_unlock(deploy_mutex);
}
else {
// Salome_file_name already added into the service
es.text = "Salome_file_name already added";
throw SALOME::SALOME_Exception(es);
}
- Salome_file_i * Sfile = (*_map)[Salome_file_name];
- return Sfile->_this();
+ CORBA::Object_ptr obj = _orb->string_to_object(proxy_ior);
+ return Engines::Salome_file::_narrow(obj);
}
void
Engines_Parallel_Component_i::checkInputServiceFiles(const char* service_name)
{
// Try to find the service, if it doesn't exist, nothing to do.
- _Service_file_map_it = _Input_Service_file_map.find(service_name);
- if (_Service_file_map_it != _Input_Service_file_map.end()) {
- _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
- _t_Salome_file_map::iterator begin = _map->begin();
- _t_Salome_file_map::iterator end = _map->end();
+ _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+ if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
+ _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
+ _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
+ _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
for(;begin!=end;begin++) {
- Salome_file_i * file = begin->second;
+ Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
file->recvFiles();
}
}
-
}
Engines::Salome_file_ptr
// Try to find the Salome_file ...
_Salome_file_map_it = _map->find(Salome_file_name);
if (_Salome_file_map_it == _map->end()) {
- Salome_file_i * Sfile = new Salome_file_i();
- (*_map)[Salome_file_name] = Sfile;
+// Salome_file_i * Sfile = new Salome_file_i();
+// (*_map)[Salome_file_name] = Sfile;
+//
+//
+//
+// TODO
+//
}
else {
// Salome_file_name already added into the service
}
+void
+Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
+ _proxy = _orb->object_to_string(proxy_ref);
+}
+
+void
+Engines_Parallel_Component_i::wait_parallel_object_proxy() {
+ char * proxy = NULL;
+ proxy = get_parallel_proxy_object();
+ while(proxy == NULL)
+ {
+ sleep(1);
+ proxy = get_parallel_proxy_object();
+ }
+}
+
+char *
+Engines_Parallel_Component_i::get_parallel_proxy_object() {
+ return _proxy;
+}
#include <map>
#include <SALOMEconfig.h>
-#include "SALOME_ComponentPaCO_Engines_Component_server.h"
+#include "SALOME_ComponentPaCO_Engines_Parallel_Component_server.h"
#include "NOTIFICATION.hxx"
#include "RegistryConnexion.hxx"
-#include "Salome_file_i.hxx"
+#include "Parallel_Salome_file_i.hxx"
class Engines_Parallel_Container_i;
#endif
class CONTAINER_EXPORT Engines_Parallel_Component_i:
- public virtual Engines::Component_serv,
+ public virtual Engines::Parallel_Component_serv,
public virtual PortableServer::RefCountServantBase
{
public:
virtual Engines::Salome_file_ptr setOutputFileToService(const char* service_name,
const char* Salome_file_name);
+ void send_parallel_proxy_object(CORBA::Object_ptr proxy_ref);
// --- local C++ methods
PortableServer::ObjectId * getId();
void SetCurCpu() ;
long CpuUsed() ;
+ void wait_parallel_object_proxy();
+ char * get_parallel_proxy_object();
+
protected:
int _studyId; // -1: not initialised; 0: multiStudy; >0: study
static bool _isMultiStudy;
NOTIFICATION_Supplier* _notifSupplier;
std::map<std::string,CORBA::Any>_fieldsDict;
- // Map Salome_file_name to Salome_file*
- typedef std::map<std::string, Salome_file_i*> _t_Salome_file_map;
+ // Map Salome_file_name to Parallel_Salome_file*
+ typedef std::map<std::string, Parallel_Salome_file_i*> _t_Salome_file_map;
+ typedef std::map<std::string, Engines::Parallel_Salome_file_proxy_impl*> _t_Proxy_Salome_file_map;
+
// Map Service_name to _Salome_file_map
typedef std::map<std::string, Engines_Parallel_Component_i::_t_Salome_file_map*> _t_Service_file_map;
+ typedef std::map<std::string, Engines_Parallel_Component_i::_t_Proxy_Salome_file_map*> _t_Proxy_Service_file_map;
_t_Service_file_map _Input_Service_file_map;
_t_Service_file_map _Output_Service_file_map;
_t_Service_file_map::iterator _Service_file_map_it;
_t_Salome_file_map::iterator _Salome_file_map_it;
+ _t_Proxy_Service_file_map _Proxy_Input_Service_file_map;
+ _t_Proxy_Service_file_map _Proxy_Output_Service_file_map;
+ _t_Proxy_Service_file_map::iterator _Proxy_Service_file_map_it;
+ _t_Proxy_Salome_file_map::iterator _Proxy_Salome_file_map_it;
+
std::string _serviceName ;
std::string _graphName ;
std::string _nodeName ;
+ pthread_mutex_t * deploy_mutex;
+ char * _proxy;
+
private:
#ifndef WNT
pthread_t _ThreadId ;
// See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
//
// File : SALOME_ParallelContainer_i.cxx
-// Author : André RIBES, EDF
+// Author : Andr� RIBES, EDF
// Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
#include <SALOMEconfig.h>