Salome HOME
merge from BR_V51_AR 7 may 09
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelContainerProxy_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME_ParallelContainerProxy : implementation of container and engine for Parallel Kernel
23 //  File   : SALOME_ParallelContainerProxy_i.cxx
24 //  Author : André RIBES, EDF
25
26 #include "SALOME_ParallelContainerProxy_i.hxx"
27
28 Container_proxy_impl_final::Container_proxy_impl_final(CORBA::ORB_ptr orb, 
29                                                        paco_fabrique_thread * fab_thread, 
30                                                        PortableServer::POA_ptr poa,
31                                                        std::string containerName,
32                                                        bool is_a_return_proxy) :
33   Engines::PACO_Container_proxy_impl(orb, fab_thread, is_a_return_proxy),
34   Engines::Container_proxy_impl(orb, fab_thread, is_a_return_proxy),
35   InterfaceManager_impl(orb, fab_thread, is_a_return_proxy)
36 {
37   _numInstance = 0;
38   _hostname = Kernel_Utils::GetHostname();
39   _containerName = _NS->BuildContainerNameForNS(containerName.c_str(), _hostname.c_str());
40   _poa = PortableServer::POA::_duplicate(poa);
41
42   _fab_thread = fab_thread;
43
44   // Add CORBA object to the poa
45   _id = _poa->activate_object(this);
46   this->_remove_ref();
47
48   // Init SALOME Naming Service
49   _NS = new SALOME_NamingService();
50   _NS->init_orb(_orb);
51
52   // Init Python container part
53   CORBA::Object_var container_node = _poa->id_to_reference(*_id);
54   CORBA::String_var sior =  _orb->object_to_string(container_node);
55   std::string myCommand="pyCont = SALOME_Container.SALOME_Container_i('";
56   myCommand += _containerName + "','";
57   myCommand += sior;
58   myCommand += "')\n";
59   Py_ACQUIRE_NEW_THREAD;
60   PyRun_SimpleString("import SALOME_Container\n");
61   PyRun_SimpleString((char*)myCommand.c_str());
62   Py_RELEASE_NEW_THREAD;
63 }
64
65 Container_proxy_impl_final:: ~Container_proxy_impl_final() {
66   if (_id)
67     delete _id;
68   if (_NS)
69     delete _NS;
70
71   // _fab_thread not deleted beacause fab_thread is managed
72   // by paco_fabrique_manager
73 }
74
75 void
76 Container_proxy_impl_final::Shutdown()
77 {
78   // We Start by destroying all the parallel object
79   std::list<Container_proxy_impl_final::proxy_object>::iterator itm;
80   for (itm = _par_obj_inst_list.begin(); itm != _par_obj_inst_list.end(); itm++)
81   {
82     try
83     {
84       ((*itm).proxy_corba_ref)->destroy();
85     }
86     catch(const CORBA::Exception& e)
87     {
88       // ignore this entry and continue
89     }
90     catch(...)
91     {
92       // ignore this entry and continue
93     }
94
95     // Destroy proxy object... parallel object nodes are
96     // destroyed into the Shutdown of each container nodes
97     _poa->deactivate_object(*((*itm).proxy_id));
98     if ((*itm).proxy_id)
99       delete (*itm).proxy_id;
100     if ((*itm).proxy_regist)
101       delete (*itm).proxy_regist;
102   }
103
104   // We call shutdown in each node
105   for (CORBA::ULong i = 0; i < _infos.nodes.length(); i++)
106   {
107     MESSAGE("Shutdown work node : " << i);
108     CORBA::Object_var object = _orb->string_to_object(_infos.nodes[i]);
109     Engines::Container_var node = Engines::Container::_narrow(object);
110     if (!CORBA::is_nil(node))
111     {
112       try 
113       {
114         node->Shutdown();
115         MESSAGE("Shutdown done node : " << i);
116       }
117       catch (...)
118       {
119         INFOS("Exception catch during Shutdown of node : " << i);
120       }
121     }
122     else
123     {
124       INFOS("Cannot shutdown node " << i << " ref is nil !");
125     }
126   }
127
128   INFOS("Shutdown Parallel Proxy");
129   _NS->Destroy_FullDirectory(_containerName.c_str());
130   _NS->Destroy_Name(_containerName.c_str());
131   if(!CORBA::is_nil(_orb))
132     _orb->shutdown(0);
133 }
134
135 // On intercepte cette méthode pour pouvoir ensuite
136 // déterminer si on doit créer une instance sequentielle
137 // ou parallèle d'un composant dans la méthode create_component_instance
138 CORBA::Boolean 
139 Container_proxy_impl_final::load_component_Library(const char* componentName)
140 {
141   MESSAGE("Begin of load_component_Library on proxy : " << componentName)
142   std::string aCompName = componentName;
143
144   CORBA::Boolean ret = true;
145   if (_libtype_map.count(aCompName) == 0)
146   {
147     _numInstanceMutex.lock(); // lock to be alone
148
149     // Default lib is seq
150     _libtype_map[aCompName] = "seq";
151
152     // --- try dlopen C++ component
153     // If is not a C++ or failed then is maybe 
154     // a seq component...
155
156     MESSAGE("Try to load C++ component");
157 #ifndef WIN32
158     std::string impl_name = string ("lib") + aCompName + string("Engine.so");
159 #else
160     std::string impl_name = aCompName + string("Engine.dll");
161 #endif
162     void* handle;
163 #ifndef WIN32
164     handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
165 #else
166     handle = dlopen( impl_name.c_str() , 0 ) ;
167 #endif
168     if ( handle )
169     {
170       _library_map[impl_name] = handle;
171       MESSAGE("Library " << impl_name << " loaded");
172
173       //Test if lib could contain a parallel component
174
175       std::string paco_test_fct_signature = aCompName + std::string("_isAPACO_Component");
176       INFOS("SIG is : " << paco_test_fct_signature);
177       PACO_TEST_FUNCTION paco_test_fct = NULL;
178 #ifndef WIN32
179       paco_test_fct = (PACO_TEST_FUNCTION)dlsym(handle, paco_test_fct_signature.c_str());
180 #else
181       paco_test_fct = (PACO_TEST_FUNCTION)GetProcAddress((HINSTANCE)handle, paco_test_fct_signature.c_str());
182 #endif
183       if (paco_test_fct)
184       {
185         // PaCO Component found
186         MESSAGE("PACO LIB FOUND");
187         _libtype_map[aCompName] = "par";
188       }
189       else
190       {
191         MESSAGE("SEQ LIB FOUND");
192 #ifndef WIN32
193         MESSAGE("dlerror() result is : " << dlerror());
194 #endif
195       }
196     }
197     else
198     {
199       MESSAGE("Error in importing Cpp component : " << impl_name);
200 #ifndef WIN32
201       MESSAGE("dlerror() result is : " << dlerror());
202 #endif
203
204       MESSAGE("Try to import Python component "<<componentName);
205       Py_ACQUIRE_NEW_THREAD;
206       PyObject *mainmod = PyImport_AddModule("__main__");
207       PyObject *globals = PyModule_GetDict(mainmod);
208       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
209       PyObject *result = PyObject_CallMethod(pyCont,
210                                              (char*)"import_component",
211                                              (char*)"s",componentName);
212       int ret_p= PyInt_AsLong(result);
213       Py_XDECREF(result);
214       Py_RELEASE_NEW_THREAD;
215
216       if (ret_p) // import possible: Python component
217       {
218         MESSAGE("import Python: " << aCompName <<" OK");
219       }
220       else
221       {
222         MESSAGE("Error in importing Python component : " << aCompName);
223         ret = false;
224       }
225     }
226     _numInstanceMutex.unlock();
227   }
228
229   // Call load_component_Library in each node
230   if (ret)
231   {
232     for (CORBA::ULong i = 0; i < _infos.nodes.length(); i++)
233     {
234       MESSAGE("Call load_component_Library work node : " << i);
235       CORBA::Object_var object = _orb->string_to_object(_infos.nodes[i]);
236       Engines::Container_var node = Engines::Container::_narrow(object);
237       if (!CORBA::is_nil(node))
238       {
239         try 
240         {
241           node->load_component_Library(componentName);
242           MESSAGE("Call load_component_Library done node : " << i);
243         }
244         catch (...)
245         {
246           INFOS("Exception catch during load_component_Library of node : " << i);
247           ret = false;
248         }
249       }
250       else
251       {
252         INFOS("Cannot call load_component_Library node " << i << " ref is nil !");
253         ret = false;
254       }
255     }
256   }
257
258   // If ret is false -> lib is not loaded !
259   if (!ret)
260   {
261     INFOS("Cannot call load_component_Library " << aCompName);
262     _libtype_map.erase(aCompName);
263   }
264   return ret;
265 }
266
267 // Il y a deux cas :
268 // Composant sequentiel -> on le créer sur le noeud 0 (on pourrait faire une répartition de charge)
269 // Composant parallèle -> création du proxy ici puis appel de la création de chaque objet participant
270 // au composant parallèle
271 Engines::Component_ptr 
272 Container_proxy_impl_final::create_component_instance(const char* componentName, ::CORBA::Long studyId)
273 {
274   std::string aCompName = componentName;
275   if (_libtype_map.count(aCompName) == 0)
276   {
277     // Component is not loaded !
278     INFOS("Proxy: component is not loaded ! : " << aCompName);
279     return Engines::Component::_nil();
280   }
281
282   // If it is a sequential component
283   if (_libtype_map[aCompName] == "seq")
284   {
285     _numInstanceMutex.lock(); // lock on the instance number
286     _numInstance++;
287     _numInstanceMutex.unlock();
288     Engines::PACO_Container_proxy_impl::updateInstanceNumber();
289     return Engines::Container_proxy_impl::create_component_instance(componentName, studyId);
290   }
291
292   // Parallel Component !
293   Engines::Component_var component_proxy = Engines::Component::_nil();
294
295   // On commence par créer le proxy
296 #ifndef WIN32
297   std::string impl_name = string ("lib") + aCompName + string("Engine.so");
298 #else
299   std::string impl_name = aCompName + string("Engine.dll");
300 #endif
301   void* handle = _library_map[impl_name];
302   std::string factory_name = aCompName + std::string("EngineProxy_factory");
303
304   MESSAGE("Creating component proxy : " << factory_name);
305   FACTORY_FUNCTION component_proxy_factory = (FACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
306
307   if (!component_proxy_factory)
308   {
309     INFOS("Can't resolve symbol: " + factory_name);
310 #ifndef WIN32
311     INFOS("dlerror() result is : " << dlerror());
312 #endif
313     return Engines::Component::_nil() ;
314   }
315   try {
316     _numInstanceMutex.lock() ; // lock on the instance number
317     _numInstance++ ;
318     int numInstance = _numInstance ;
319     _numInstanceMutex.unlock() ;
320
321     char aNumI[12];
322     sprintf( aNumI , "%d" , numInstance ) ;
323     string instanceName = aCompName + "_inst_" + aNumI ;
324     string component_registerName = _containerName + "/" + instanceName;
325
326     // --- Instanciate required CORBA object
327     Container_proxy_impl_final::proxy_object * proxy = new Container_proxy_impl_final::proxy_object();
328     
329     proxy->proxy_id = (component_proxy_factory) (_orb, 
330                                                  _fab_thread,
331                                                  _poa, 
332                                                  _id,
333                                                  &(proxy->proxy_regist),
334                                                  instanceName.c_str(), 
335                                                  _parallel_object_topology.total);
336
337     // --- get reference & servant from id
338     CORBA::Object_var obj = _poa->id_to_reference(*(proxy->proxy_id));
339     component_proxy = Engines::Component::_narrow(obj);
340     proxy->proxy_corba_ref = component_proxy;
341
342     if (!CORBA::is_nil(component_proxy))
343     {
344       _cntInstances_map[impl_name] += 1;
345       _par_obj_inst_list.push_back(*proxy);
346       delete proxy;
347
348       // --- register the engine under the name
349       //     containerName(.dir)/instanceName(.object)
350       _NS->Register(component_proxy , component_registerName.c_str()) ;
351       MESSAGE(component_registerName.c_str() << " bound" ) ;
352     }
353     else
354     {
355       INFOS("The factory returns a nil object !");
356       return Engines::Component::_nil();
357     }
358       
359   }
360   catch (...)
361   {
362     INFOS( "Exception catched in Proxy creation" );
363     return Engines::Component::_nil();
364   }
365
366   // Create on each node a work node
367   for (CORBA::ULong i = 0; i < _infos.nodes.length(); i++)
368   {
369     MESSAGE("Call create_paco_component_node_instance on work node : " << i);
370     CORBA::Object_var object = _orb->string_to_object(_infos.nodes[i]);
371     Engines::PACO_Container_var node = Engines::PACO_Container::_narrow(object);
372     if (!CORBA::is_nil(node))
373     {
374       try 
375       {
376         node->create_paco_component_node_instance(componentName, _containerName.c_str(), studyId);
377         MESSAGE("Call create_paco_component_node_instance done on node : " << i);
378       }
379       catch (SALOME::SALOME_Exception & ex)
380       {
381         INFOS("SALOME_EXCEPTION : " << ex.details.text);
382         return Engines::Component::_nil();
383       }
384       catch (...)
385       {
386         INFOS("Unknown Exception catch during create_paco_component_node_instance on node : " << i);
387         return Engines::Component::_nil();
388       }
389     }
390     else
391     {
392       INFOS("Cannot call create_paco_component_node_instance on node " << i << " ref is nil !");
393       return Engines::Component::_nil();
394     }
395   }
396
397   // Start Parallel object
398   PaCO::InterfaceManager_var paco_proxy = PaCO::InterfaceManager::_narrow(component_proxy);
399   paco_proxy->start();
400
401   return component_proxy;
402 }