Salome HOME
Merged from BR_V51_RB (removed Batch and added libBatch as an optional prerequisite).
[modules/kernel.git] / src / ParallelContainer / Parallel_Salome_file_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : Parallel_Salome_file_i.cxx
23 //  Author : AndrĂ© RIBES, EDF
24 //  Module : SALOME
25 //  $Header: 
26 //
27 #include "Parallel_Salome_file_i.hxx"
28 #include "utilities.h"
29
30 Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, 
31                                                const char * ior,
32                                                int rank) :
33   InterfaceParallel_impl(orb,ior,rank), 
34   Engines::Salome_file_serv(orb,ior,rank),
35   Engines::Salome_file_base_serv(orb,ior,rank),
36   Engines::fileTransfer_serv(orb,ior,rank),
37   Engines::Parallel_Salome_file_serv(orb,ior,rank),
38   Engines::fileTransfer_base_serv(orb,ior,rank),
39   Engines::Parallel_Salome_file_base_serv(orb,ior,rank)
40 {
41   CORBA::Object_ptr obj = _orb->string_to_object(ior);
42   proxy = Engines::Parallel_Salome_file::_narrow(obj);
43   parallel_file = NULL;
44 }
45
46 Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
47
48 void 
49 Parallel_Salome_file_i::load(const char* hdf5_file) {
50   MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
51   SALOME::ExceptionStruct es;
52   es.type = SALOME::INTERNAL_ERROR;
53   es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
54   throw SALOME::SALOME_Exception(es);
55 }
56
57 void 
58 Parallel_Salome_file_i::save(const char* hdf5_file) {
59   MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
60   SALOME::ExceptionStruct es;
61   es.type = SALOME::INTERNAL_ERROR;
62   es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
63   throw SALOME::SALOME_Exception(es);
64 }
65
66 void 
67 Parallel_Salome_file_i::save_all(const char* hdf5_file) {
68   MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
69   SALOME::ExceptionStruct es;
70   es.type = SALOME::INTERNAL_ERROR;
71   es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
72   throw SALOME::SALOME_Exception(es);
73 }
74
75 void 
76 Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
77   // only one file managed case 
78   Salome_file_i::connect(source_Salome_file);
79
80   // Test if the file is managed in an another node
81   // If yes, node is updated
82   _t_fileManaged::iterator begin = _fileManaged.begin();
83   _t_fileManaged::iterator end = _fileManaged.end();
84   for(;begin!=end;begin++) {
85     std::string file_name = begin->first;
86     if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
87       if (parallel_file == NULL)
88         parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
89       parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
90     }
91   }
92 }
93
94 void 
95 Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
96                                                Engines::Salome_file_ptr source_Salome_file) {
97   Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
98
99   // Test if the file is managed in an another node
100   // If yes, node is updated
101   std::string fname(file_name);
102   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
103     if (parallel_file == NULL)
104       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
105     parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
106   }
107 }
108
109 void 
110 Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
111                                                  const char * source_file_name) {
112   Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
113   // Test if the file is managed in an another node
114   // If yes, node is updated
115   std::string fname(file_name);
116   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
117     if (parallel_file == NULL)
118       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
119     parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
120   }
121 }
122
123 void
124 Parallel_Salome_file_i::recvFiles() {
125   if (parallel_file == NULL)
126     parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
127
128   std::string files_not_ok("");
129   int total = getTotalNode();
130   for (int i =0; i<total; i++) {
131     try {
132      parallel_file->recvFiles_node(i);
133     }
134     catch (SALOME::SALOME_Exception & ex) {
135       files_not_ok = files_not_ok + std::string(ex.details.text.in());
136     }
137   }
138
139   if (files_not_ok != "")
140   {
141     SALOME::ExceptionStruct es;
142     es.type = SALOME::INTERNAL_ERROR;
143     std::string text = "files not ready : " + files_not_ok;
144     es.text = CORBA::string_dup(text.c_str());
145     throw SALOME::SALOME_Exception(es);
146   }
147   else
148   {
149     // We change the state of the Salome_file
150     _state.files_ok = true;
151   }
152 }
153
154 void 
155 Parallel_Salome_file_i::recvFiles_node() {
156
157   std::string files_not_ok("");
158   _t_fileManaged::iterator begin = _fileManaged.begin();
159   _t_fileManaged::iterator end = _fileManaged.end();
160   for(;begin!=end;begin++) 
161   {
162     bool result = true;
163     Engines::file file_infos = begin->second;
164     if (file_infos.node == getMyRank()) {
165       // Test if the file is local or distributed
166       if (std::string(file_infos.type.in()) == "local")
167       {
168         if (std::string(file_infos.status.in()) == "not_ok")
169           result = checkLocalFile(file_infos.file_name.in());
170       }
171       else
172       {
173         if (std::string(file_infos.status.in()) == "not_ok") {
174           // 2 cases :
175           // Source file is a Salome_file
176           // Source file is a Parallel_Salome_file
177           PaCO::InterfaceManager_var interface_manager = 
178             PaCO::InterfaceManager::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
179           if (CORBA::is_nil(interface_manager))
180             result = getDistributedFile(file_infos.file_name.in());
181           else
182             result = getParallelDistributedFile(file_infos.file_name.in());
183         }
184       }
185       // if the result is false
186       // we add this file to files_not_ok
187       if (!result) 
188       {
189         files_not_ok.append(" ");
190         files_not_ok.append(file_infos.file_name.in());
191       }
192     }
193   }
194   if (files_not_ok != "")
195   {
196     SALOME::ExceptionStruct es;
197     es.type = SALOME::INTERNAL_ERROR;
198     std::string text = files_not_ok;
199     es.text = CORBA::string_dup(text.c_str());
200     throw SALOME::SALOME_Exception(es);
201   }
202 }
203
204 bool 
205 Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
206
207   bool result = true;
208   const char * source_file_name = _fileManaged[file_name].source_file_name.in();
209   int fileId;
210   FILE* fp;
211   std::string comp_file_name(_fileManaged[file_name].path.in());
212   comp_file_name.append("/");
213   comp_file_name.append(_fileManaged[file_name].file_name.in());
214
215   // Test if the process can write on disk
216   if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
217   {
218     INFOS("file " << comp_file_name << " cannot be open for writing");
219     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
220     result = false;
221     return result;
222   }
223
224   Engines::PaCO_Parallel_Salome_file * parallel_source_file = 
225     Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
226
227   int node = parallel_source_file->getFileNode(source_file_name);
228
229   try 
230   {
231     fileId = parallel_source_file->open(source_file_name, node);
232   }
233   catch (...) 
234   {
235     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
236     fclose(fp);
237     result = false;
238     return result;
239   }
240
241   if (fileId > 0)
242   {
243     Engines::fileBlock* aBlock;
244     int toFollow = 1;
245     int ctr=0;
246     MESSAGE("begin of transfer of " << comp_file_name);
247     while (toFollow)
248     {
249       ctr++;
250       aBlock = parallel_source_file->getBlock(fileId, node);
251       toFollow = aBlock->length();
252       CORBA::Octet *buf = aBlock->get_buffer();
253       int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
254       delete aBlock;
255       ASSERT(nbWri == toFollow);
256     }
257     fclose(fp);
258     MESSAGE("end of transfer of " << comp_file_name);
259     parallel_source_file->close(fileId, node);
260   }
261   else
262   {
263     INFOS("open reference file for copy impossible");
264     result = false;
265     fclose(fp);
266     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
267     return result;
268   }
269
270   _fileManaged[file_name].status = CORBA::string_dup("ok");
271   return result;
272 }
273
274 void 
275 Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
276   _container = Engines::Container::_duplicate(container);
277
278   // Update All the files managed by the node
279   _t_fileManaged::iterator begin = _fileManaged.begin();
280   _t_fileManaged::iterator end = _fileManaged.end();
281   for(;begin!=end;begin++) {
282     begin->second.container = Engines::Container::_duplicate(container);
283   }
284 }
285
286 void 
287 Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
288   
289   // Test if this file is managed
290   std::string fname(file_name);
291   _t_fileManaged::iterator it = _fileManaged.find(fname);
292   if (it == _fileManaged.end()) 
293   {
294     SALOME::ExceptionStruct es;
295     es.type = SALOME::INTERNAL_ERROR;
296     es.text = "file is not managed";
297     throw SALOME::SALOME_Exception(es);
298   }
299
300   // Update file infos into this node (node 0)
301   // and into the node that actually managed it
302   _fileManaged[fname].node = node;
303
304   if (node > 0) {
305     if (parallel_file == NULL)
306       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
307
308     Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
309     parallel_file->connectDistributedFile(fname.c_str(),
310                                           _fileDistributedSource[fname],
311                                           node);
312
313     // Update file infos with the new reference of the container
314     _fileManaged[fname].container = Engines::Container::_duplicate(cont);
315   }
316 }
317
318 Engines::Container_ptr
319 Parallel_Salome_file_i::updateFile(const Engines::file& file) {
320   // Copy file
321   Engines::file new_file_infos(file);
322
323   // Adding it to node list
324   new_file_infos.container = Engines::Container::_duplicate(_container);
325   std::string fname(new_file_infos.file_name.in());
326   _fileManaged[fname] = new_file_infos;
327
328   // Return the new reference of the container associated to the file
329   return Engines::Container::_duplicate(_container);
330 }
331
332 CORBA::Long 
333 Parallel_Salome_file_i::getFileNode(const char* file_name) {
334   
335   // Test if this file is managed
336   std::string fname(file_name);
337   if (fname == "") {
338     // We enter in the simple case where the user
339     // has not used setDistributedSourceFile.
340     // In this case we try to see if the Salome_file
341     if (_fileManaged.size() == 1) 
342     {
343       // only one file managed 
344       _t_fileManaged::iterator it = _fileManaged.begin();
345       fname = it->first;
346     }
347     else
348     {
349       SALOME::ExceptionStruct es;
350       es.type = SALOME::INTERNAL_ERROR;
351       es.text = "Error : there is more than one file that is managed";
352       throw SALOME::SALOME_Exception(es);
353     }
354   }
355   _t_fileManaged::iterator it = _fileManaged.find(fname);
356   if (it == _fileManaged.end()) 
357   {
358     SALOME::ExceptionStruct es;
359     es.type = SALOME::INTERNAL_ERROR;
360     es.text = "file is not managed";
361     throw SALOME::SALOME_Exception(es);
362   }
363
364   return _fileManaged[fname].node;
365 }