Salome HOME
Synchronize adm files
[modules/kernel.git] / src / ParallelContainer / Parallel_Salome_file_i.cxx
1 // Copyright (C) 2007-2014  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  File   : Parallel_Salome_file_i.cxx
24 //  Author : AndrĂ© RIBES, EDF
25 //  Module : SALOME
26 //  $Header: 
27 //
28 #include "Parallel_Salome_file_i.hxx"
29 #include "utilities.h"
30
31 Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, 
32                                                const char * ior,
33                                                int rank) :
34   InterfaceParallel_impl(orb,ior,rank), 
35   Engines::Salome_file_serv(orb,ior,rank),
36   Engines::Salome_file_base_serv(orb,ior,rank),
37   Engines::fileTransfer_serv(orb,ior,rank),
38   Engines::Parallel_Salome_file_serv(orb,ior,rank),
39   Engines::fileTransfer_base_serv(orb,ior,rank),
40   SALOME::GenericObj_serv(orb,ior,rank),
41   SALOME::GenericObj_base_serv(orb,ior,rank),
42   Engines::Parallel_Salome_file_base_serv(orb,ior,rank)
43 {
44   CORBA::Object_ptr obj = _orb->string_to_object(ior);
45   proxy = Engines::Parallel_Salome_file::_narrow(obj);
46   parallel_file = NULL;
47 }
48
49 Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
50
51 void 
52 Parallel_Salome_file_i::load(const char* hdf5_file) {
53   MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
54   SALOME::ExceptionStruct es;
55   es.type = SALOME::INTERNAL_ERROR;
56   es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
57   throw SALOME::SALOME_Exception(es);
58 }
59
60 void 
61 Parallel_Salome_file_i::save(const char* hdf5_file) {
62   MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
63   SALOME::ExceptionStruct es;
64   es.type = SALOME::INTERNAL_ERROR;
65   es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
66   throw SALOME::SALOME_Exception(es);
67 }
68
69 void 
70 Parallel_Salome_file_i::save_all(const char* hdf5_file) {
71   MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
72   SALOME::ExceptionStruct es;
73   es.type = SALOME::INTERNAL_ERROR;
74   es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
75   throw SALOME::SALOME_Exception(es);
76 }
77
78 void 
79 Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
80   // only one file managed case 
81   Salome_file_i::connect(source_Salome_file);
82
83   // Test if the file is managed in an another node
84   // If yes, node is updated
85   _t_fileManaged::iterator begin = _fileManaged.begin();
86   _t_fileManaged::iterator end = _fileManaged.end();
87   for(;begin!=end;begin++) {
88     std::string file_name = begin->first;
89     if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
90       if (parallel_file == NULL)
91         parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
92       parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
93     }
94   }
95 }
96
97 void 
98 Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
99                                                Engines::Salome_file_ptr source_Salome_file) {
100   Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
101
102   // Test if the file is managed in an another node
103   // If yes, node is updated
104   std::string fname(file_name);
105   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
106     if (parallel_file == NULL)
107       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
108     parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
109   }
110 }
111
112 void 
113 Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
114                                                  const char * source_file_name) {
115   Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
116   // Test if the file is managed in an another node
117   // If yes, node is updated
118   std::string fname(file_name);
119   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
120     if (parallel_file == NULL)
121       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
122     parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
123   }
124 }
125
126 void
127 Parallel_Salome_file_i::recvFiles() {
128   if (parallel_file == NULL)
129     parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
130
131   std::string files_not_ok("");
132   int total = getTotalNode();
133   for (int i =0; i<total; i++) {
134     try {
135      parallel_file->recvFiles_node(i);
136     }
137     catch (SALOME::SALOME_Exception & ex) {
138       files_not_ok = files_not_ok + std::string(ex.details.text.in());
139     }
140   }
141
142   if (files_not_ok != "")
143   {
144     SALOME::ExceptionStruct es;
145     es.type = SALOME::INTERNAL_ERROR;
146     std::string text = "files not ready : " + files_not_ok;
147     es.text = CORBA::string_dup(text.c_str());
148     throw SALOME::SALOME_Exception(es);
149   }
150   else
151   {
152     // We change the state of the Salome_file
153     _state.files_ok = true;
154   }
155 }
156
157 void 
158 Parallel_Salome_file_i::recvFiles_node() {
159
160   std::string files_not_ok("");
161   _t_fileManaged::iterator begin = _fileManaged.begin();
162   _t_fileManaged::iterator end = _fileManaged.end();
163   for(;begin!=end;begin++) 
164   {
165     bool result = true;
166     Engines::file file_infos = begin->second;
167     if (file_infos.node == getMyRank()) {
168       // Test if the file is local or distributed
169       if (std::string(file_infos.type.in()) == "local")
170       {
171         if (std::string(file_infos.status.in()) == "not_ok")
172           result = checkLocalFile(file_infos.file_name.in());
173       }
174       else
175       {
176         if (std::string(file_infos.status.in()) == "not_ok") {
177           // 2 cases :
178           // Source file is a Salome_file
179           // Source file is a Parallel_Salome_file
180           PaCO::InterfaceManager_var interface_manager = 
181             PaCO::InterfaceManager::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
182           if (CORBA::is_nil(interface_manager))
183             result = getDistributedFile(file_infos.file_name.in());
184           else
185             result = getParallelDistributedFile(file_infos.file_name.in());
186         }
187       }
188       // if the result is false
189       // we add this file to files_not_ok
190       if (!result) 
191       {
192         files_not_ok.append(" ");
193         files_not_ok.append(file_infos.file_name.in());
194       }
195     }
196   }
197   if (files_not_ok != "")
198   {
199     SALOME::ExceptionStruct es;
200     es.type = SALOME::INTERNAL_ERROR;
201     std::string text = files_not_ok;
202     es.text = CORBA::string_dup(text.c_str());
203     throw SALOME::SALOME_Exception(es);
204   }
205 }
206
207 bool 
208 Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
209
210   bool result = true;
211   const char * source_file_name = _fileManaged[file_name].source_file_name.in();
212   int fileId;
213   FILE* fp;
214   std::string comp_file_name(_fileManaged[file_name].path.in());
215   comp_file_name.append("/");
216   comp_file_name.append(_fileManaged[file_name].file_name.in());
217
218   // Test if the process can write on disk
219   if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
220   {
221     INFOS("file " << comp_file_name << " cannot be open for writing");
222     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
223     result = false;
224     return result;
225   }
226
227   Engines::PaCO_Parallel_Salome_file * parallel_source_file = 
228     Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
229
230   int node = parallel_source_file->getFileNode(source_file_name);
231
232   try 
233   {
234     fileId = parallel_source_file->open(source_file_name, node);
235   }
236   catch (...) 
237   {
238     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
239     fclose(fp);
240     result = false;
241     return result;
242   }
243
244   if (fileId > 0)
245   {
246     Engines::fileBlock* aBlock;
247     int toFollow = 1;
248     int ctr=0;
249     MESSAGE("begin of transfer of " << comp_file_name);
250     while (toFollow)
251     {
252       ctr++;
253       aBlock = parallel_source_file->getBlock(fileId, node);
254       toFollow = aBlock->length();
255       CORBA::Octet *buf = aBlock->get_buffer();
256       int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
257       delete aBlock;
258       ASSERT(nbWri == toFollow);
259     }
260     fclose(fp);
261     MESSAGE("end of transfer of " << comp_file_name);
262     parallel_source_file->close(fileId, node);
263   }
264   else
265   {
266     INFOS("open reference file for copy impossible");
267     result = false;
268     fclose(fp);
269     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
270     return result;
271   }
272
273   _fileManaged[file_name].status = CORBA::string_dup("ok");
274   return result;
275 }
276
277 void 
278 Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
279   _container = Engines::Container::_duplicate(container);
280
281   // Update All the files managed by the node
282   _t_fileManaged::iterator begin = _fileManaged.begin();
283   _t_fileManaged::iterator end = _fileManaged.end();
284   for(;begin!=end;begin++) {
285     begin->second.container = Engines::Container::_duplicate(container);
286   }
287 }
288
289 void 
290 Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
291   
292   // Test if this file is managed
293   std::string fname(file_name);
294   _t_fileManaged::iterator it = _fileManaged.find(fname);
295   if (it == _fileManaged.end()) 
296   {
297     SALOME::ExceptionStruct es;
298     es.type = SALOME::INTERNAL_ERROR;
299     es.text = "file is not managed";
300     throw SALOME::SALOME_Exception(es);
301   }
302
303   // Update file infos into this node (node 0)
304   // and into the node that actually managed it
305   _fileManaged[fname].node = node;
306
307   if (node > 0) {
308     if (parallel_file == NULL)
309       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
310
311     Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
312     parallel_file->connectDistributedFile(fname.c_str(),
313                                           _fileDistributedSource[fname],
314                                           node);
315
316     // Update file infos with the new reference of the container
317     _fileManaged[fname].container = Engines::Container::_duplicate(cont);
318   }
319 }
320
321 Engines::Container_ptr
322 Parallel_Salome_file_i::updateFile(const Engines::file& file) {
323   // Copy file
324   Engines::file new_file_infos(file);
325
326   // Adding it to node list
327   new_file_infos.container = Engines::Container::_duplicate(_container);
328   std::string fname(new_file_infos.file_name.in());
329   _fileManaged[fname] = new_file_infos;
330
331   // Return the new reference of the container associated to the file
332   return Engines::Container::_duplicate(_container);
333 }
334
335 CORBA::Long 
336 Parallel_Salome_file_i::getFileNode(const char* file_name) {
337   
338   // Test if this file is managed
339   std::string fname(file_name);
340   if (fname == "") {
341     // We enter in the simple case where the user
342     // has not used setDistributedSourceFile.
343     // In this case we try to see if the Salome_file
344     if (_fileManaged.size() == 1) 
345     {
346       // only one file managed 
347       _t_fileManaged::iterator it = _fileManaged.begin();
348       fname = it->first;
349     }
350     else
351     {
352       SALOME::ExceptionStruct es;
353       es.type = SALOME::INTERNAL_ERROR;
354       es.text = "Error : there is more than one file that is managed";
355       throw SALOME::SALOME_Exception(es);
356     }
357   }
358   _t_fileManaged::iterator it = _fileManaged.find(fname);
359   if (it == _fileManaged.end()) 
360   {
361     SALOME::ExceptionStruct es;
362     es.type = SALOME::INTERNAL_ERROR;
363     es.text = "file is not managed";
364     throw SALOME::SALOME_Exception(es);
365   }
366
367   return _fileManaged[fname].node;
368 }