]> SALOME platform Git repositories - modules/kernel.git/commitdiff
Salome HOME
- Adding support of Salome_file into containers and components
authorribes <ribes>
Fri, 21 Sep 2007 09:52:06 +0000 (09:52 +0000)
committerribes <ribes>
Fri, 21 Sep 2007 09:52:06 +0000 (09:52 +0000)
- Adding Parallel_Salome_file
- Adding Parallel_Component
- Need some tests

15 files changed:
idl/DSC_Engines.idl
idl/Makefile.am
idl/SALOME_Component.idl
idl/SALOME_Component.xml
src/Container/Component_i.cxx
src/Container/Container_i.cxx
src/Container/Salome_file_i.cxx
src/Container/Salome_file_i.hxx
src/DSC/ParallelDSC/ParallelDSC_i.cxx
src/ParallelContainer/Makefile.am
src/ParallelContainer/Parallel_Salome_file_i.cxx [new file with mode: 0644]
src/ParallelContainer/Parallel_Salome_file_i.hxx [new file with mode: 0644]
src/ParallelContainer/SALOME_ParallelComponent_i.cxx
src/ParallelContainer/SALOME_ParallelComponent_i.hxx
src/ParallelContainer/SALOME_ParallelContainer_i.cxx

index e27795bc0a4e3836ba2fa2ca0dfad92bec697047..83b558a59c9aec691377e4f3347bcce8eb18a29d 100644 (file)
@@ -370,7 +370,7 @@ module Engines {
     This interface defines the operations needed to add a paco++ port 
     into a parallel DSC component.
   */
-  interface Parallel_DSC : Engines::Superv_Component {
+  interface Parallel_DSC : Engines::Superv_Component, Engines::Parallel_Component {
 
     /*!
       This operation gives the proxy node of a paco++ port to all the nodes.
index bd67a2866a3946070b3c65b7eb564ea2a7acb470..70837010e6438b3dc597fc5875b4df0dc8f7b1f7 100644 (file)
@@ -132,6 +132,8 @@ GEN_PACO = SALOME_ComponentPaCO_Engines_Container_server.cc \
           SALOME_ComponentPaCO_Engines_Container_client.cc \
           SALOME_ComponentPaCO_Engines_Component_server.cc \
           SALOME_ComponentPaCO_Engines_Component_client.cc \
+          SALOME_ComponentPaCO_Engines_Parallel_Component_server.cc \
+          SALOME_ComponentPaCO_Engines_Parallel_Component_client.cc \
           SALOME_ComponentPaCO_Engines_fileTransfer_server.cc \
           SALOME_ComponentPaCO_Engines_fileTransfer_client.cc \
           SALOME_ComponentPaCO_Engines_Salome_file_server.cc \
@@ -151,6 +153,8 @@ INCLUDES_PACO = SALOME_ComponentPaCO_Engines_Container_server.h \
                SALOME_ComponentPaCO_Engines_Container_client.h \
                 SALOME_ComponentPaCO_Engines_Component_server.h \
                SALOME_ComponentPaCO_Engines_Component_client.h \
+                SALOME_ComponentPaCO_Engines_Parallel_Component_server.h \
+               SALOME_ComponentPaCO_Engines_Parallel_Component_client.h \
                SALOME_ComponentPaCO_Engines_fileTransfer_server.h \
                SALOME_ComponentPaCO_Engines_fileTransfer_client.h \
                SALOME_ComponentPaCO_Engines_Salome_file_server.h \
index 9cfef58412f446fe5fc709ffa476b413a46726bf..5ecb18affbebf8d83c4effd67667584a1da7b3c7 100644 (file)
@@ -380,6 +380,10 @@ module Engines
                                                in string Salome_file_name)      raises(SALOME::SALOME_Exception);
   } ;
 
+  interface Parallel_Component : Engines::Component {
+    void send_parallel_proxy_object(in Object proxy_ref);
+  };
+
   /*!
     A block of binary data used for file transfer. The maximum size of the
     block is defined on server side.
@@ -562,20 +566,6 @@ module Engines
     */
     void removeFiles();
 
-    /*!
-      Delete a file managed by the Salome_file. File is removed too.
-
-      \param file_name the name of the file.
-
-      \exception raised if the file doesn't exist.
-    */
-    void deleteFile(in string file_name)                   raises (SALOME::SALOME_Exception);
-
-    /*!
-      Delete all the files managed by the Salome_file. Files are removed too.
-    */
-    void deleteFiles();
-
 /**************/
 
     /*!
@@ -599,6 +589,14 @@ module Engines
       Return the state of the Salome_file.
     */
     Engines::SfState getSalome_fileState();
+
+
+    /*!
+      Set the container where files are.
+
+      \param container container CORBA's reference.
+    */
+    void setContainer(in Engines::Container container);
   };
 
   /*! \brief Interface of fileRef.
@@ -639,32 +637,41 @@ module Engines
   interface Parallel_Salome_file : Engines::Salome_file {
 
     /*!
-      Add a Local file to the Salome_file.
+      Set a number of node for the file. Default is the node 0.
 
-      \param file_name name of the file with the path.
-      \param node the number of the node where the file is.
+      \param file_name name of the file.
+      \param node_nbr node number where the file is.
 
-      \exception raised if the file is already added into the Salome_file.
+      \exception raised if the file doesn't exist.
     */
-    void setParallelLocalFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception);
+    void setFileNode(in string file_name, in long node_nbr) raises (SALOME::SALOME_Exception);
 
     /*!
-      Add a Distributed file to the Salome_file.
+      Get the number of the node that actually managed the file.
 
-      \param comp_file_name name of the file with the path.
+      \param file_name name of managed file.
 
-      \exception raised if the file is already added into the Salome_file.
-    */
-    void setParallelDistributedFile(in string comp_file_name, in long node) raises (SALOME::SALOME_Exception);
+      \return node number of the file
+
+      \exception raised if the file doesn't exist.
+     */
+    long getFileNode(in string file_name) raises (SALOME::SALOME_Exception);
+
+    /*!
+      This method update the state of file for the Parallel_Salome_file. 
+
+      \param new_file the new state of file.
+     */
+    Engines::Container updateFile(in Engines::file new_file);
 
     /*!
-      This method update the state of the Parallel_Salome_file. Thus, each node
-      of the Parallel_Salome_file has the same state.
+      This method is used by the parallel implementation of recvFiles.
 
-      \param new_state the new state of the Parallel_Salome_file.
+      \exception raised if the file cannot be ok.
      */
-    void updateState(in Engines::SfState new_state);
+    void recvFiles_node() raises (SALOME::SALOME_Exception);
+
   };
-} ;
+};
 
 #endif
index a37b740f7319b756310cb701cca84b27a0731360..189f18638059592a9592a9713a782f335ef57a8b 100644 (file)
  <Interface>
   <Name>Component</Name>
   <Method>
-   <Name>ping</Name>
+   <Name>setInputServiceSalome_file</Name>
    <Type>distributed</Type>
   </Method>
  </Interface>
  <Interface>
-  <Name>Parallel_Salome_file</Name>
+  <Name>Parallel_Component</Name>
   <Method>
-   <Name>setParallelLocalFile</Name>
+   <Name>send_parallel_proxy_object</Name>
    <Type>distributed</Type>
   </Method>
+ </Interface>
+ <Interface>
+  <Name>Salome_file</Name>
   <Method>
-   <Name>setParallelDistributedFile</Name>
+   <Name>recvFiles</Name>
    <Type>distributed</Type>
   </Method>
  </Interface>
+ <Interface>
+  <Name>Parallel_Salome_file</Name>
+ </Interface>
 </Module>
 </GridCCM_Interface_description>
index 630413497a2864140d9d7ddfd5772f7d535adacf..ae2250f05e0b638317c6fda56c2253fc0c05a6f4 100644 (file)
@@ -877,6 +877,8 @@ Engines_Component_i::setInputFileToService(const char* service_name,
   _Salome_file_map_it = _map->find(Salome_file_name);
   if (_Salome_file_map_it ==  _map->end()) {
     Salome_file_i * Sfile = new Salome_file_i();
+    Engines::Container_ptr container = this->GetContainerRef();
+    Sfile->setContainer(Engines::Container::_duplicate(container));
     (*_map)[Salome_file_name] = Sfile;
   }
   else {
@@ -907,7 +909,6 @@ Engines_Component_i::checkInputServiceFiles(const char* service_name)
       file->recvFiles();
     }
   }
-
 }
 
 Engines::Salome_file_ptr 
@@ -953,6 +954,8 @@ Engines_Component_i::setOutputFileToService(const char* service_name,
   _Salome_file_map_it = _map->find(Salome_file_name);
   if (_Salome_file_map_it ==  _map->end()) {
     Salome_file_i * Sfile = new Salome_file_i();
+    Engines::Container_ptr container = this->GetContainerRef();
+    Sfile->setContainer(Engines::Container::_duplicate(container));
     (*_map)[Salome_file_name] = Sfile;
   }
   else {
index 799a396d7f1289ad85e1e8b19db9c5cb2362ef21..e5270c67444b3e13ee53d6d505079d5c02217e22 100644 (file)
@@ -676,6 +676,7 @@ Engines_Container_i::createSalome_file(const char* origFileName)
   if (CORBA::is_nil(_Salome_file_map[origName]))
     {
       Salome_file_i* aSalome_file = new Salome_file_i();
+      aSalome_file->setContainer(Engines::Container::_duplicate(this->_this()));
       try 
       {
        aSalome_file->setLocalFile(origFileName);
index aef2e21bf054a1af9a1b80bca057bc7f06dc114b..184a554c01c1208b2ded4e29f9e7753409453250 100644 (file)
@@ -44,6 +44,7 @@ Salome_file_i::Salome_file_i()
   _state.hdf5_file_name = CORBA::string_dup("");
   _state.number_of_files = 0;
   _state.files_ok = true;
+  _container = Engines::Container::_nil();
 }
 
 //=============================================================================
@@ -204,6 +205,9 @@ Salome_file_i::load(const char* hdf5_file) {
        infos.type = CORBA::string_dup(type.c_str());
        infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
        infos.status = CORBA::string_dup(status.c_str());
+       // Infos for parallel extensions...
+       infos.node = 0;
+       infos.container = Engines::Container::_duplicate(_container);
 
        _fileManaged[file_name] = infos;
 
@@ -514,6 +518,9 @@ Salome_file_i::setLocalFile(const char* comp_file_name)
   infos.type = CORBA::string_dup(type.c_str());
   infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
   infos.status = CORBA::string_dup(status.c_str());
+  // Infos for parallel extensions...
+  infos.node = 0;
+  infos.container = Engines::Container::_duplicate(_container);
 
   _fileManaged[file_name] = infos;
 
@@ -571,6 +578,9 @@ Salome_file_i::setDistributedFile(const char* comp_file_name)
   infos.type = CORBA::string_dup(type.c_str());
   infos.source_file_name = CORBA::string_dup(source_file_name.c_str());
   infos.status = CORBA::string_dup(status.c_str());
+  // Infos for parallel extensions...
+  infos.node = 0;
+  infos.container = Engines::Container::_duplicate(_container);
 
   _fileManaged[file_name] = infos;
 
@@ -762,6 +772,7 @@ Salome_file_i::getDistributedFile(std::string file_name)
   std::string comp_file_name(_fileManaged[file_name].path.in());
   comp_file_name.append(_fileManaged[file_name].file_name.in());
 
+  // Test if the process can write on disk
   if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
   {
     INFOS("file " << comp_file_name << " cannot be open for writing");
@@ -826,17 +837,6 @@ Salome_file_i::removeFile(const char* file_name)
   MESSAGE("Salome_file_i::removeFile : NOT YET IMPLEMENTED");
 }
     
-//=============================================================================
-/*! 
- *  CORBA method
- * \see Engines::Salome_file::deleteFile
- */
-//=============================================================================
-void 
-Salome_file_i::deleteFile(const char* file_name) {
-  MESSAGE("Salome_file_i::deleteFile : NOT YET IMPLEMENTED");
-}
-
 //=============================================================================
 /*! 
  *  CORBA method
@@ -851,18 +851,7 @@ Salome_file_i::removeFiles() {
 //=============================================================================
 /*! 
  *  CORBA method
- * \see Engines::Salome_file::deleteFiles
- */
-//=============================================================================
-void 
-Salome_file_i::deleteFiles() {
-  MESSAGE("Salome_file_i::deleteFiles : NOT YET IMPLEMENTED");
-}
-
-//=============================================================================
-/*! 
- *  CORBA method
- * \see Engines::Salome_file::recvFiles
+ * \see Engines::Salome_file::getFilesInfos
  */
 //=============================================================================
 Engines::files* 
@@ -1022,3 +1011,16 @@ Salome_file_i::getBlock(CORBA::Long fileId)
   return aBlock;
 }
 
+void 
+Salome_file_i::setContainer(Engines::Container_ptr container)
+{
+  _container = Engines::Container::_duplicate(container);
+
+  // Update All the files
+  _t_fileManaged::iterator begin = _fileManaged.begin();
+  _t_fileManaged::iterator end = _fileManaged.end();
+  for(;begin!=end;begin++) {
+    begin->second.container = Engines::Container::_duplicate(container);
+  }
+}
+
index b8432cd0160a1613022098251a29b06095af661e..565cd7f68f1d2c3ae9946dc9b2c0a9d7da003766 100644 (file)
@@ -63,12 +63,10 @@ class CONTAINER_EXPORT Salome_file_i:
 
     // Removing or deleting files
     virtual void removeFile(const char* file_name);
-    virtual void deleteFile(const char* file_name);
     virtual void removeFiles();
-    virtual void deleteFiles();
-
 
     // Informations methods:
+    virtual void setContainer(Engines::Container_ptr container);
     virtual Engines::files* getFilesInfos();
     virtual Engines::file* getFileInfos(const char* file_name);
     virtual Engines::SfState* getSalome_fileState();
@@ -101,6 +99,7 @@ class CONTAINER_EXPORT Salome_file_i:
     _t_fileManaged _fileManaged;
     _t_fileDistributedSource _fileDistributedSource;
     Engines::SfState _state;
+    Engines::Container_ptr _container;
 };
 
 #endif
index 7e19a253a107612594bb0bc9b9900b03aedfdc78..c4f15adb57efc617e330ca5ba512525fda130f59 100644 (file)
@@ -36,6 +36,7 @@ Engines_ParallelDSC_i::Engines_ParallelDSC_i(CORBA::ORB_ptr orb, char * ior,
   Engines::DSC_serv(orb, ior),
   Engines::Superv_Component_serv(orb, ior),
   Engines::Component_serv(orb, ior),
+  Engines::Parallel_Component_serv(orb, ior),
   InterfaceParallel_impl(orb,ior)
 {
 }
index 4fc7ec17310e9f5518b3336336e066a0afd52128..1734d4f10a91ab870ed57f8eff28bd188b76c8cb 100644 (file)
@@ -33,7 +33,8 @@ include $(top_srcdir)/salome_adm/unix/make_common_starter.am
 #
 # header files  
 salomeinclude_HEADERS = SALOME_ParallelComponent_i.hxx \
-                       SALOME_ParallelContainer_i.hxx 
+                       SALOME_ParallelContainer_i.hxx \
+                       Parallel_Salome_file_i.hxx
 
 #
 # ===============================================================
@@ -74,7 +75,8 @@ lib_LTLIBRARIES = libSalomeParallelContainer.la
 
 libSalomeParallelContainer_la_SOURCES  = SALOME_ParallelComponent_i.cxx \
                                         SALOME_ParallelContainer_i.cxx \
-                                        $(top_srcdir)/src/Container/Salome_file_i.cxx
+                                        $(top_srcdir)/src/Container/Salome_file_i.cxx \
+                                        Parallel_Salome_file_i.cxx
 
 libSalomeParallelContainer_la_CXXFLAGS = $(COMMON_CPPFLAGS)
 
diff --git a/src/ParallelContainer/Parallel_Salome_file_i.cxx b/src/ParallelContainer/Parallel_Salome_file_i.cxx
new file mode 100644 (file)
index 0000000..440a0b4
--- /dev/null
@@ -0,0 +1,333 @@
+// Copyright (C) 2007  OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D
+// 
+//  This library is free software; you can redistribute it and/or 
+//  modify it under the terms of the GNU Lesser General Public 
+//  License as published by the Free Software Foundation; either 
+//  version 2.1 of the License. 
+// 
+//  This library is distributed in the hope that it will be useful, 
+//  but WITHOUT ANY WARRANTY; without even the implied warranty of 
+//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
+//  Lesser General Public License for more details. 
+// 
+//  You should have received a copy of the GNU Lesser General Public 
+//  License along with this library; if not, write to the Free Software 
+//  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
+// 
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+//
+//
+//  File   : Parallel_Salome_file_i.cxx
+//  Author : André RIBES, EDF
+//  Module : SALOME
+//  $Header: 
+
+#include "Parallel_Salome_file_i.hxx"
+#include "utilities.h"
+
+Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior) :
+  InterfaceParallel_impl(orb,ior), 
+  Engines::Salome_file_serv(orb,ior),
+  Engines::fileTransfer_serv(orb,ior),
+  Engines::Parallel_Salome_file_serv(orb,ior)
+{
+  CORBA::Object_ptr obj = _orb->string_to_object(ior);
+  proxy = Engines::Parallel_Salome_file::_narrow(obj);
+  parallel_file = NULL;
+}
+
+Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
+
+void 
+Parallel_Salome_file_i::load(const char* hdf5_file) {
+  MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
+  SALOME::ExceptionStruct es;
+  es.type = SALOME::INTERNAL_ERROR;
+  es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
+  throw SALOME::SALOME_Exception(es);
+}
+
+void 
+Parallel_Salome_file_i::save(const char* hdf5_file) {
+  MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
+  SALOME::ExceptionStruct es;
+  es.type = SALOME::INTERNAL_ERROR;
+  es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
+  throw SALOME::SALOME_Exception(es);
+}
+
+void 
+Parallel_Salome_file_i::save_all(const char* hdf5_file) {
+  MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
+  SALOME::ExceptionStruct es;
+  es.type = SALOME::INTERNAL_ERROR;
+  es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
+  throw SALOME::SALOME_Exception(es);
+}
+
+void 
+Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
+  // only one file managed case 
+  Salome_file_i::connect(source_Salome_file);
+
+  // Test if the file is managed in an another node
+  // If yes, node is updated
+  _t_fileManaged::iterator it = _fileManaged.begin();
+  std::string file_name = it->first;
+  if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
+    if (parallel_file == NULL)
+      parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+    parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
+  }
+}
+
+void 
+Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
+                                              Engines::Salome_file_ptr source_Salome_file) {
+  Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
+
+  // Test if the file is managed in an another node
+  // If yes, node is updated
+  std::string fname(file_name);
+  if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
+    if (parallel_file == NULL)
+      parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+    parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
+  }
+}
+
+void 
+Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
+                                                const char * source_file_name) {
+  Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
+  // Test if the file is managed in an another node
+  // If yes, node is updated
+  std::string fname(file_name);
+  if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
+    if (parallel_file == NULL)
+      parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+    parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
+  }
+}
+
+void
+Parallel_Salome_file_i::recvFiles() {
+  if (parallel_file == NULL)
+    parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+
+  std::string files_not_ok("");
+  int total = getTotalNode();
+  for (int i =0; i<total; i++) {
+    try {
+      parallel_file->recvFiles_node(i);
+    }
+    catch (SALOME::SALOME_Exception & ex) {
+      files_not_ok = files_not_ok + std::string(ex.details.text.in());
+    }
+  }
+
+  if (files_not_ok != "")
+  {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    std::string text = "files not ready : " + files_not_ok;
+    es.text = CORBA::string_dup(text.c_str());
+    throw SALOME::SALOME_Exception(es);
+  }
+  else
+  {
+    // We change the state of the Salome_file
+    _state.files_ok = true;
+  }
+}
+
+void 
+Parallel_Salome_file_i::recvFiles_node() {
+
+  std::string files_not_ok("");
+  _t_fileManaged::iterator begin = _fileManaged.begin();
+  _t_fileManaged::iterator end = _fileManaged.end();
+  for(;begin!=end;begin++) 
+  {
+    bool result = true;
+    Engines::file file_infos = begin->second;
+    if (file_infos.node == getMyRank()) {
+      // Test if the file is local or distributed
+      if (std::string(file_infos.type.in()) == "local")
+      {
+       if (std::string(file_infos.status.in()) == "not_ok")
+         result = checkLocalFile(file_infos.file_name.in());
+      }
+      else
+      {
+       if (std::string(file_infos.status.in()) == "not_ok") {
+         // 2 cases :
+         // Source file is a Salome_file
+         // Source file is a Parallel_Salome_file
+         PaCO::ParallelKernel_var interface_manager = 
+           PaCO::ParallelKernel::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
+         if (CORBA::is_nil(interface_manager))
+           result = getDistributedFile(file_infos.file_name.in());
+         else
+           result = getParallelDistributedFile(file_infos.file_name.in());
+       }
+      }
+      // if the result is false
+      // we add this file to files_not_ok
+      if (!result) 
+      {
+       files_not_ok.append(" ");
+       files_not_ok.append(file_infos.file_name.in());
+      }
+    }
+  }
+  if (files_not_ok != "")
+  {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    std::string text = files_not_ok;
+    es.text = CORBA::string_dup(text.c_str());
+    throw SALOME::SALOME_Exception(es);
+  }
+}
+
+bool 
+Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
+
+  bool result = true;
+  const char * source_file_name = _fileManaged[file_name].source_file_name.in();
+  int fileId;
+  FILE* fp;
+  std::string comp_file_name(_fileManaged[file_name].path.in());
+  comp_file_name.append(_fileManaged[file_name].file_name.in());
+
+  // Test if the process can write on disk
+  if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
+  {
+    INFOS("file " << comp_file_name << " cannot be open for writing");
+    _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+    result = false;
+    return result;
+  }
+
+  Engines::PaCO_Parallel_Salome_file * parallel_source_file = 
+    Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
+
+  int node = parallel_source_file->getFileNode(source_file_name);
+
+  try 
+  {
+    fileId = parallel_source_file->open(source_file_name, node);
+  }
+  catch (...) 
+  {
+    _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+    fclose(fp);
+    result = false;
+    return result;
+  }
+
+  if (fileId > 0)
+  {
+    Engines::fileBlock* aBlock;
+    int toFollow = 1;
+    int ctr=0;
+    MESSAGE("begin of transfer of " << comp_file_name);
+    while (toFollow)
+    {
+      ctr++;
+      aBlock = parallel_source_file->getBlock(fileId, node);
+      toFollow = aBlock->length();
+      CORBA::Octet *buf = aBlock->get_buffer();
+      int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
+      ASSERT(nbWri == toFollow);
+    }
+    fclose(fp);
+    MESSAGE("end of transfer of " << comp_file_name);
+    parallel_source_file->close(fileId, node);
+  }
+  else
+  {
+    INFOS("open reference file for copy impossible");
+    result = false;
+    fclose(fp);
+    _fileManaged[file_name].status = CORBA::string_dup("not_ok");
+    return result;
+  }
+
+  _fileManaged[file_name].status = CORBA::string_dup("ok");
+  return result;
+}
+
+void 
+Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
+  _container = Engines::Container::_duplicate(container);
+
+  // Update All the files managed by the node
+  _t_fileManaged::iterator begin = _fileManaged.begin();
+  _t_fileManaged::iterator end = _fileManaged.end();
+  for(;begin!=end;begin++) {
+    begin->second.container = Engines::Container::_duplicate(container);
+  }
+}
+
+void 
+Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
+  
+  // Test if this file is managed
+  std::string fname(file_name);
+  _t_fileManaged::iterator it = _fileManaged.find(fname);
+  if (it == _fileManaged.end()) 
+  {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "file is not managed";
+    throw SALOME::SALOME_Exception(es);
+  }
+
+  // Update file infos into this node (node 0)
+  // and into the node that actually managed it
+  _fileManaged[fname].node = node;
+
+  if (node > 0) {
+    if (parallel_file == NULL)
+      parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
+
+    Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
+
+    // Update file infos with the new reference of the container
+    _fileManaged[fname].container = Engines::Container::_duplicate(cont);
+  }
+}
+
+Engines::Container_ptr
+Parallel_Salome_file_i::updateFile(const Engines::file& file) {
+  // Copy file
+  Engines::file new_file_infos(file);
+
+  // Adding it to node list
+  new_file_infos.container = Engines::Container::_duplicate(_container);
+  std::string fname(new_file_infos.file_name.in());
+  _fileManaged[fname] = new_file_infos;
+
+  // Return the new reference of the container associated to the file
+  return Engines::Container::_duplicate(_container);
+}
+
+CORBA::Long 
+Parallel_Salome_file_i::getFileNode(const char* file_name) {
+  
+  // Test if this file is managed
+  std::string fname(file_name);
+  _t_fileManaged::iterator it = _fileManaged.find(fname);
+  if (it == _fileManaged.end()) 
+  {
+    SALOME::ExceptionStruct es;
+    es.type = SALOME::INTERNAL_ERROR;
+    es.text = "file is not managed";
+    throw SALOME::SALOME_Exception(es);
+  }
+
+  return _fileManaged[fname].node;
+}
diff --git a/src/ParallelContainer/Parallel_Salome_file_i.hxx b/src/ParallelContainer/Parallel_Salome_file_i.hxx
new file mode 100644 (file)
index 0000000..f2847b8
--- /dev/null
@@ -0,0 +1,74 @@
+// Copyright (C) 2007  OPEN CASCADE, CEA/DEN, EDF R&D, PRINCIPIA R&D
+// 
+//  This library is free software; you can redistribute it and/or 
+//  modify it under the terms of the GNU Lesser General Public 
+//  License as published by the Free Software Foundation; either 
+//  version 2.1 of the License. 
+// 
+//  This library is distributed in the hope that it will be useful, 
+//  but WITHOUT ANY WARRANTY; without even the implied warranty of 
+//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
+//  Lesser General Public License for more details. 
+// 
+//  You should have received a copy of the GNU Lesser General Public 
+//  License along with this library; if not, write to the Free Software 
+//  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
+// 
+// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+//
+//
+//
+//  File   : Parallel_Salome_file_i.hxx
+//  Author : André RIBES, EDF
+//  Module : SALOME
+//  $Header: 
+
+#ifndef _PARALLEL_SALOME_FILE_I_HXX_
+#define _PARALLEL_SALOME_FILE_I_HXX_
+
+#include <SALOMEconfig.h>
+#include <SALOME_Container.hxx>
+#include <map>
+#include <cstdio>
+
+#include "SALOME_ComponentPaCO_Engines_Parallel_Salome_file_server.h"
+#include "Salome_file_i.hxx"
+
+class CONTAINER_EXPORT Parallel_Salome_file_i:
+  public virtual Salome_file_i,
+  public virtual Engines::Parallel_Salome_file_serv
+{
+  public:
+    Parallel_Salome_file_i(CORBA::ORB_ptr orb, char * ior);
+    virtual ~Parallel_Salome_file_i();
+
+    virtual void setFileNode(const char* file_name, CORBA::Long node);
+    virtual CORBA::Long getFileNode(const char* file_name);
+    virtual Engines::Container_ptr updateFile(const Engines::file& file);
+
+    // New implementation for these methods
+    // For the parallel cases
+    virtual void load(const char* hdf5_file);
+    virtual void save(const char* hdf5_file);
+    virtual void save_all(const char* hdf5_file);
+    virtual void connect(Engines::Salome_file_ptr source_Salome_file);
+    virtual void connectDistributedFile(const char * file_name,
+                                       Engines::Salome_file_ptr source_Salome_file);
+    virtual void setDistributedSourceFile(const char* file_name,
+                                         const char * source_file_name);
+    virtual void recvFiles();
+    virtual void recvFiles_node();
+    virtual void setContainer(Engines::Container_ptr container);
+    //virtual void removeFile(const char* file_name);
+    //virtual void removeFiles();
+    
+    // Local C++ methods
+    virtual bool getParallelDistributedFile(std::string file_name);
+
+  private :
+    Engines::Parallel_Salome_file_var proxy;
+    Engines::PaCO_Parallel_Salome_file * parallel_file;
+};
+
+#endif
+
index f215c229105c6f997180d2a18a291077c0a6a43d..3a6002cb7f9ef87dcca4a5178b937c044141e00f 100644 (file)
@@ -59,7 +59,7 @@ bool Engines_Parallel_Component_i::_isMultiInstance = false;
 //=============================================================================
 
 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) : 
-  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior)
+  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior)
 {
   //ASSERT(0);
   INFOS("Default Constructor...");
@@ -84,7 +84,9 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c
                                         const char *instanceName,
                                         const char *interfaceName,
                                          bool notif) :
-  InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior),
+  InterfaceParallel_impl(orb,ior), 
+  Engines::Component_serv(orb,ior),
+  Engines::Parallel_Component_serv(orb,ior),
   _instanceName(instanceName),
   _interfaceName(interfaceName),
   _myConnexionToRegistry(0),
@@ -106,7 +108,11 @@ Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, c
                                                 _instanceName.c_str());
 
   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
-  //SCRUTE(pd_refCount);
+
+  deploy_mutex = new pthread_mutex_t();
+  pthread_mutex_init(deploy_mutex, NULL);
+  _proxy = NULL;
+ //SCRUTE(pd_refCount);
 }
 
 //=============================================================================
@@ -121,6 +127,11 @@ Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
 {
   MESSAGE("Component destructor");
   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
+  pthread_mutex_destroy(deploy_mutex);
+  delete deploy_mutex;
+
+  if (_proxy)
+    delete _proxy;
 }
 
 //=============================================================================
@@ -567,7 +578,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName)
          (*it).second >>= value;
          // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
 #if defined __GNUC__
-         int ret = setenv(cle.c_str(), value, overwrite);
+         //int ret = setenv(cle.c_str(), value, overwrite);
+         setenv(cle.c_str(), value, overwrite);
 #else
          //CCRT porting : setenv not defined in stdlib.h
          std::string s(cle);
@@ -575,7 +587,8 @@ void Engines_Parallel_Component_i::beginService(const char *serviceName)
          s+=value;
          // char* cast because 1st arg of linux putenv function
          // is not a const char* !
-         int ret=putenv((char *)s.c_str());
+         //int ret=putenv((char *)s.c_str());
+         putenv((char *)s.c_str());
          //End of CCRT porting
 #endif
          MESSAGE("--- setenv: "<<cle<<" = "<< value);
@@ -771,30 +784,32 @@ Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr the
   return aStreamFile._retn(); 
 }
 
+// This is a parallel method
 Engines::Salome_file_ptr 
 Engines_Parallel_Component_i::getInputServiceSalome_file(const char* service_name, 
                                                const char* Salome_file_name) 
 {
   // Try to find the service, if it doesn't exist, we throw an exception.
-  _Service_file_map_it = _Input_Service_file_map.find(service_name);
-  if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
+  _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
     SALOME::ExceptionStruct es;
     es.type = SALOME::INTERNAL_ERROR;
     es.text = "service doesn't have salome files";
     throw SALOME::SALOME_Exception(es);
   }
-  _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
+  _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
 
   // Try to find the Salome_file ...
-  _Salome_file_map_it = _map->find(Salome_file_name);
-  if (_Salome_file_map_it ==  _map->end()) {
+  _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
+  if (_Proxy_Salome_file_map_it ==  _map->end()) {
     SALOME::ExceptionStruct es;
     es.type = SALOME::INTERNAL_ERROR;
     es.text = "service doesn't have this Salome_file";
     throw SALOME::SALOME_Exception(es);
   }
-  Salome_file_i * Sfile = (*_map)[Salome_file_name];
 
+  // Client get the proxy object
+  Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
   return Sfile->_this();
 }
 
@@ -807,14 +822,88 @@ Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
     _t_Salome_file_map * _map = new _t_Salome_file_map();
     _Input_Service_file_map[service_name] = _map;
+    _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
+    _Proxy_Input_Service_file_map[service_name] = _proxy_map;
   }
   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
+  _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
   
+  char * proxy_ior;
+
   // Try to find the Salome_file ...
   _Salome_file_map_it = _map->find(Salome_file_name);
   if (_Salome_file_map_it ==  _map->end()) {
-    Salome_file_i * Sfile = new Salome_file_i();
-    (*_map)[Salome_file_name] = Sfile;
+
+    // We create a new PaCO++ object.
+    // He has the same configuration than
+    // his component
+
+    pthread_mutex_lock(deploy_mutex);
+    // Firstly, we have to create the proxy object
+    // of the Salome_file and transmit his
+    // reference to the other nodes.
+    if (getMyRank() == 0) {
+      Engines::Parallel_Salome_file_proxy_impl * proxy = 
+       new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
+      PaCO_operation * proxy_global_ptr =  proxy->getContext("global_paco_context");
+      proxy_global_ptr->setTypeClient(true);
+      PaCO::PacoTopology_t client_topo;
+      client_topo.total = 1;
+      proxy_global_ptr->setClientTopo(client_topo);
+      PaCO::PacoTopology_t serveur_topo;
+      serveur_topo.total = getTotalNode();
+      proxy->setTopo(serveur_topo);
+
+      // We initialize the object with the context of the Parallel component
+      PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+      compo_global_ptr->init_context(proxy_global_ptr);
+
+      // We register the CORBA objet into the POA
+      CORBA::Object_ptr proxy_ref = proxy->_this();
+
+      // We send the reference to all the nodes...
+      CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
+      Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
+      component_proxy->send_parallel_proxy_object(proxy_ref);
+
+      // Adding proxy into the map
+      (*_proxy_map)[Salome_file_name] = proxy;
+    }
+    else {
+      this->wait_parallel_object_proxy();
+    }
+
+    proxy_ior = this->get_parallel_proxy_object();
+
+    // We register each node of the parallel Salome_file object
+    // into the proxy.
+    for (int i = 0; i < getTotalNode(); i++) {
+      if (i ==  getMyRank()) {
+       Parallel_Salome_file_i * servant = 
+         new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior);
+       PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
+       
+       // We initialize the object with the context of the Parallel component
+       PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+       compo_global_ptr->init_context(servant_global_ptr);
+       
+       // We register the CORBA objet into the POA
+       servant->POA_PaCO::InterfaceParallel::_this();
+
+       // Register the servant
+       servant->deploy(getMyRank());
+
+       // Adding servant to the map
+       (*_map)[Salome_file_name] = servant;
+      }
+
+      PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
+      compo_global_ptr->my_com->paco_barrier();
+    }
+    // Parallel_Salome_file is created and deployed
+    delete _proxy;
+    _proxy = NULL;
+    pthread_mutex_unlock(deploy_mutex);
   }
   else {
     // Salome_file_name already added into the service
@@ -824,27 +913,26 @@ Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
     es.text = "Salome_file_name already added";
     throw SALOME::SALOME_Exception(es);
   }
-  Salome_file_i * Sfile = (*_map)[Salome_file_name];
 
-  return Sfile->_this();
+  CORBA::Object_ptr obj = _orb->string_to_object(proxy_ior);
+  return Engines::Salome_file::_narrow(obj);
 }
 
 void 
 Engines_Parallel_Component_i::checkInputServiceFiles(const char* service_name) 
 {
   // Try to find the service, if it doesn't exist, nothing to do.
-  _Service_file_map_it = _Input_Service_file_map.find(service_name);
-  if (_Service_file_map_it !=  _Input_Service_file_map.end()) {
-    _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
-    _t_Salome_file_map::iterator begin = _map->begin();
-    _t_Salome_file_map::iterator end = _map->end();
+  _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
+  if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
+    _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
+    _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
+    _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
 
     for(;begin!=end;begin++) {
-      Salome_file_i * file = begin->second;
+      Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
       file->recvFiles();
     }
   }
-
 }
 
 Engines::Salome_file_ptr 
@@ -889,8 +977,13 @@ Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
   // Try to find the Salome_file ...
   _Salome_file_map_it = _map->find(Salome_file_name);
   if (_Salome_file_map_it ==  _map->end()) {
-    Salome_file_i * Sfile = new Salome_file_i();
-    (*_map)[Salome_file_name] = Sfile;
+//    Salome_file_i * Sfile = new Salome_file_i();
+//    (*_map)[Salome_file_name] = Sfile;
+//
+//
+//
+//   TODO
+//
   }
   else {
     // Salome_file_name already added into the service
@@ -923,3 +1016,23 @@ Engines_Parallel_Component_i::checkOutputServiceFiles(const char* service_name)
 
 }
 
+void 
+Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
+  _proxy = _orb->object_to_string(proxy_ref);
+}
+
+void 
+Engines_Parallel_Component_i::wait_parallel_object_proxy() {
+  char * proxy = NULL;
+  proxy =  get_parallel_proxy_object();
+  while(proxy == NULL)
+  {
+    sleep(1);
+    proxy = get_parallel_proxy_object();
+  }
+}
+
+char * 
+Engines_Parallel_Component_i::get_parallel_proxy_object() {
+  return _proxy;
+}
index 1025e49ff2f24743de413a369d3389740f8698f1..5355d5f9f1d4bfaf9581e2ee3668a984b47dbee9 100644 (file)
 #include <map>
 #include <SALOMEconfig.h>
 
-#include "SALOME_ComponentPaCO_Engines_Component_server.h"
+#include "SALOME_ComponentPaCO_Engines_Parallel_Component_server.h"
 
 #include "NOTIFICATION.hxx"
 #include "RegistryConnexion.hxx"
-#include "Salome_file_i.hxx"
+#include "Parallel_Salome_file_i.hxx"
 
 class Engines_Parallel_Container_i;
 
@@ -60,7 +60,7 @@ class Engines_Parallel_Container_i;
 #endif
 
 class CONTAINER_EXPORT Engines_Parallel_Component_i: 
-  public virtual Engines::Component_serv,
+  public virtual Engines::Parallel_Component_serv,
   public virtual PortableServer::RefCountServantBase
 {
 public:
@@ -112,6 +112,7 @@ public:
  virtual Engines::Salome_file_ptr setOutputFileToService(const char* service_name, 
                                                         const char* Salome_file_name);
 
+ void send_parallel_proxy_object(CORBA::Object_ptr proxy_ref);
   // --- local C++ methods
 
   PortableServer::ObjectId * getId(); 
@@ -131,6 +132,9 @@ public:
   void SetCurCpu() ;
   long CpuUsed() ;
 
+  void wait_parallel_object_proxy();
+  char * get_parallel_proxy_object();
+
 protected:
   int _studyId; // -1: not initialised; 0: multiStudy; >0: study
   static bool _isMultiStudy;
@@ -148,20 +152,31 @@ protected:
   NOTIFICATION_Supplier* _notifSupplier;
   std::map<std::string,CORBA::Any>_fieldsDict;
 
-  // Map Salome_file_name to Salome_file*
-  typedef std::map<std::string, Salome_file_i*> _t_Salome_file_map;
+  // Map Salome_file_name to Parallel_Salome_file*
+  typedef std::map<std::string, Parallel_Salome_file_i*> _t_Salome_file_map;
+  typedef std::map<std::string, Engines::Parallel_Salome_file_proxy_impl*> _t_Proxy_Salome_file_map;
+
   // Map Service_name to  _Salome_file_map
   typedef std::map<std::string, Engines_Parallel_Component_i::_t_Salome_file_map*> _t_Service_file_map;
+  typedef std::map<std::string, Engines_Parallel_Component_i::_t_Proxy_Salome_file_map*> _t_Proxy_Service_file_map;
   
   _t_Service_file_map _Input_Service_file_map;
   _t_Service_file_map _Output_Service_file_map;
   _t_Service_file_map::iterator _Service_file_map_it;
   _t_Salome_file_map::iterator _Salome_file_map_it;
 
+  _t_Proxy_Service_file_map _Proxy_Input_Service_file_map;
+  _t_Proxy_Service_file_map _Proxy_Output_Service_file_map;
+  _t_Proxy_Service_file_map::iterator _Proxy_Service_file_map_it;
+  _t_Proxy_Salome_file_map::iterator _Proxy_Salome_file_map_it;
+
   std::string _serviceName ;
   std::string _graphName ;
   std::string _nodeName ;
 
+  pthread_mutex_t * deploy_mutex;
+  char * _proxy;
+
 private:
 #ifndef WNT
   pthread_t _ThreadId ;
index b2b81b7b5fd4eddcbf2805bdd8751778c963808b..b1063f3697a3f07ed72d4d01816e59e29a01fde7 100644 (file)
@@ -20,7 +20,7 @@
 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
 //
 //  File   : SALOME_ParallelContainer_i.cxx
-//  Author : André RIBES, EDF
+//  Author : Andr RIBES, EDF
 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA 
 
 #include <SALOMEconfig.h>