Salome HOME
0021165: [CEA] Unit test crashes salome session
[modules/kernel.git] / src / ParallelContainer / Parallel_Salome_file_i.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  File   : Parallel_Salome_file_i.cxx
24 //  Author : AndrĂ© RIBES, EDF
25 //  Module : SALOME
26 //  $Header: 
27 //
28 #include "Parallel_Salome_file_i.hxx"
29 #include "utilities.h"
30
31 Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, 
32                                                const char * ior,
33                                                int rank) :
34   InterfaceParallel_impl(orb,ior,rank), 
35   Engines::Salome_file_serv(orb,ior,rank),
36   Engines::Salome_file_base_serv(orb,ior,rank),
37   Engines::fileTransfer_serv(orb,ior,rank),
38   Engines::Parallel_Salome_file_serv(orb,ior,rank),
39   Engines::fileTransfer_base_serv(orb,ior,rank),
40   Engines::Parallel_Salome_file_base_serv(orb,ior,rank)
41 {
42   CORBA::Object_ptr obj = _orb->string_to_object(ior);
43   proxy = Engines::Parallel_Salome_file::_narrow(obj);
44   parallel_file = NULL;
45 }
46
47 Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
48
49 void 
50 Parallel_Salome_file_i::load(const char* hdf5_file) {
51   MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
52   SALOME::ExceptionStruct es;
53   es.type = SALOME::INTERNAL_ERROR;
54   es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
55   throw SALOME::SALOME_Exception(es);
56 }
57
58 void 
59 Parallel_Salome_file_i::save(const char* hdf5_file) {
60   MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
61   SALOME::ExceptionStruct es;
62   es.type = SALOME::INTERNAL_ERROR;
63   es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
64   throw SALOME::SALOME_Exception(es);
65 }
66
67 void 
68 Parallel_Salome_file_i::save_all(const char* hdf5_file) {
69   MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
70   SALOME::ExceptionStruct es;
71   es.type = SALOME::INTERNAL_ERROR;
72   es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
73   throw SALOME::SALOME_Exception(es);
74 }
75
76 void 
77 Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
78   // only one file managed case 
79   Salome_file_i::connect(source_Salome_file);
80
81   // Test if the file is managed in an another node
82   // If yes, node is updated
83   _t_fileManaged::iterator begin = _fileManaged.begin();
84   _t_fileManaged::iterator end = _fileManaged.end();
85   for(;begin!=end;begin++) {
86     std::string file_name = begin->first;
87     if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
88       if (parallel_file == NULL)
89         parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
90       parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
91     }
92   }
93 }
94
95 void 
96 Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
97                                                Engines::Salome_file_ptr source_Salome_file) {
98   Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
99
100   // Test if the file is managed in an another node
101   // If yes, node is updated
102   std::string fname(file_name);
103   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
104     if (parallel_file == NULL)
105       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
106     parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
107   }
108 }
109
110 void 
111 Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
112                                                  const char * source_file_name) {
113   Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
114   // Test if the file is managed in an another node
115   // If yes, node is updated
116   std::string fname(file_name);
117   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
118     if (parallel_file == NULL)
119       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
120     parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
121   }
122 }
123
124 void
125 Parallel_Salome_file_i::recvFiles() {
126   if (parallel_file == NULL)
127     parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
128
129   std::string files_not_ok("");
130   int total = getTotalNode();
131   for (int i =0; i<total; i++) {
132     try {
133      parallel_file->recvFiles_node(i);
134     }
135     catch (SALOME::SALOME_Exception & ex) {
136       files_not_ok = files_not_ok + std::string(ex.details.text.in());
137     }
138   }
139
140   if (files_not_ok != "")
141   {
142     SALOME::ExceptionStruct es;
143     es.type = SALOME::INTERNAL_ERROR;
144     std::string text = "files not ready : " + files_not_ok;
145     es.text = CORBA::string_dup(text.c_str());
146     throw SALOME::SALOME_Exception(es);
147   }
148   else
149   {
150     // We change the state of the Salome_file
151     _state.files_ok = true;
152   }
153 }
154
155 void 
156 Parallel_Salome_file_i::recvFiles_node() {
157
158   std::string files_not_ok("");
159   _t_fileManaged::iterator begin = _fileManaged.begin();
160   _t_fileManaged::iterator end = _fileManaged.end();
161   for(;begin!=end;begin++) 
162   {
163     bool result = true;
164     Engines::file file_infos = begin->second;
165     if (file_infos.node == getMyRank()) {
166       // Test if the file is local or distributed
167       if (std::string(file_infos.type.in()) == "local")
168       {
169         if (std::string(file_infos.status.in()) == "not_ok")
170           result = checkLocalFile(file_infos.file_name.in());
171       }
172       else
173       {
174         if (std::string(file_infos.status.in()) == "not_ok") {
175           // 2 cases :
176           // Source file is a Salome_file
177           // Source file is a Parallel_Salome_file
178           PaCO::InterfaceManager_var interface_manager = 
179             PaCO::InterfaceManager::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
180           if (CORBA::is_nil(interface_manager))
181             result = getDistributedFile(file_infos.file_name.in());
182           else
183             result = getParallelDistributedFile(file_infos.file_name.in());
184         }
185       }
186       // if the result is false
187       // we add this file to files_not_ok
188       if (!result) 
189       {
190         files_not_ok.append(" ");
191         files_not_ok.append(file_infos.file_name.in());
192       }
193     }
194   }
195   if (files_not_ok != "")
196   {
197     SALOME::ExceptionStruct es;
198     es.type = SALOME::INTERNAL_ERROR;
199     std::string text = files_not_ok;
200     es.text = CORBA::string_dup(text.c_str());
201     throw SALOME::SALOME_Exception(es);
202   }
203 }
204
205 bool 
206 Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
207
208   bool result = true;
209   const char * source_file_name = _fileManaged[file_name].source_file_name.in();
210   int fileId;
211   FILE* fp;
212   std::string comp_file_name(_fileManaged[file_name].path.in());
213   comp_file_name.append("/");
214   comp_file_name.append(_fileManaged[file_name].file_name.in());
215
216   // Test if the process can write on disk
217   if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
218   {
219     INFOS("file " << comp_file_name << " cannot be open for writing");
220     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
221     result = false;
222     return result;
223   }
224
225   Engines::PaCO_Parallel_Salome_file * parallel_source_file = 
226     Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
227
228   int node = parallel_source_file->getFileNode(source_file_name);
229
230   try 
231   {
232     fileId = parallel_source_file->open(source_file_name, node);
233   }
234   catch (...) 
235   {
236     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
237     fclose(fp);
238     result = false;
239     return result;
240   }
241
242   if (fileId > 0)
243   {
244     Engines::fileBlock* aBlock;
245     int toFollow = 1;
246     int ctr=0;
247     MESSAGE("begin of transfer of " << comp_file_name);
248     while (toFollow)
249     {
250       ctr++;
251       aBlock = parallel_source_file->getBlock(fileId, node);
252       toFollow = aBlock->length();
253       CORBA::Octet *buf = aBlock->get_buffer();
254       int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
255       delete aBlock;
256       ASSERT(nbWri == toFollow);
257     }
258     fclose(fp);
259     MESSAGE("end of transfer of " << comp_file_name);
260     parallel_source_file->close(fileId, node);
261   }
262   else
263   {
264     INFOS("open reference file for copy impossible");
265     result = false;
266     fclose(fp);
267     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
268     return result;
269   }
270
271   _fileManaged[file_name].status = CORBA::string_dup("ok");
272   return result;
273 }
274
275 void 
276 Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
277   _container = Engines::Container::_duplicate(container);
278
279   // Update All the files managed by the node
280   _t_fileManaged::iterator begin = _fileManaged.begin();
281   _t_fileManaged::iterator end = _fileManaged.end();
282   for(;begin!=end;begin++) {
283     begin->second.container = Engines::Container::_duplicate(container);
284   }
285 }
286
287 void 
288 Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
289   
290   // Test if this file is managed
291   std::string fname(file_name);
292   _t_fileManaged::iterator it = _fileManaged.find(fname);
293   if (it == _fileManaged.end()) 
294   {
295     SALOME::ExceptionStruct es;
296     es.type = SALOME::INTERNAL_ERROR;
297     es.text = "file is not managed";
298     throw SALOME::SALOME_Exception(es);
299   }
300
301   // Update file infos into this node (node 0)
302   // and into the node that actually managed it
303   _fileManaged[fname].node = node;
304
305   if (node > 0) {
306     if (parallel_file == NULL)
307       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
308
309     Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
310     parallel_file->connectDistributedFile(fname.c_str(),
311                                           _fileDistributedSource[fname],
312                                           node);
313
314     // Update file infos with the new reference of the container
315     _fileManaged[fname].container = Engines::Container::_duplicate(cont);
316   }
317 }
318
319 Engines::Container_ptr
320 Parallel_Salome_file_i::updateFile(const Engines::file& file) {
321   // Copy file
322   Engines::file new_file_infos(file);
323
324   // Adding it to node list
325   new_file_infos.container = Engines::Container::_duplicate(_container);
326   std::string fname(new_file_infos.file_name.in());
327   _fileManaged[fname] = new_file_infos;
328
329   // Return the new reference of the container associated to the file
330   return Engines::Container::_duplicate(_container);
331 }
332
333 CORBA::Long 
334 Parallel_Salome_file_i::getFileNode(const char* file_name) {
335   
336   // Test if this file is managed
337   std::string fname(file_name);
338   if (fname == "") {
339     // We enter in the simple case where the user
340     // has not used setDistributedSourceFile.
341     // In this case we try to see if the Salome_file
342     if (_fileManaged.size() == 1) 
343     {
344       // only one file managed 
345       _t_fileManaged::iterator it = _fileManaged.begin();
346       fname = it->first;
347     }
348     else
349     {
350       SALOME::ExceptionStruct es;
351       es.type = SALOME::INTERNAL_ERROR;
352       es.text = "Error : there is more than one file that is managed";
353       throw SALOME::SALOME_Exception(es);
354     }
355   }
356   _t_fileManaged::iterator it = _fileManaged.find(fname);
357   if (it == _fileManaged.end()) 
358   {
359     SALOME::ExceptionStruct es;
360     es.type = SALOME::INTERNAL_ERROR;
361     es.text = "file is not managed";
362     throw SALOME::SALOME_Exception(es);
363   }
364
365   return _fileManaged[fname].node;
366 }