Salome HOME
PR: merge from branch BR_UnitTests tag mergeto_trunk_17oct05
[modules/yacs.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <Python.h>
37 #include "Container_init_python.hxx"
38 using namespace std;
39
40 // L'appel au registry SALOME ne se fait que pour le process 0
41 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
42                                                CORBA::ORB_ptr orb, 
43                                                PortableServer::POA_ptr poa,
44                                                char * containerName,
45                                                int argc, char *argv[]) 
46   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
47 {
48   MESSAGE("[" << numproc << "] activate object");
49   _id = _poa->activate_object(this);
50 //   this->_add_ref();
51
52   if(numproc==0){
53
54     _NS = new SALOME_NamingService();
55 //     _NS = SINGLETON_<SALOME_NamingService>::Instance() ;
56 //     ASSERT(SINGLETON_<SALOME_NamingService>::IsAlreadyExisting()) ;
57     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
58
59 //     Engines::Container_ptr pCont 
60 //       = Engines::Container::_narrow(POA_Engines::MPIContainer::_this());
61     CORBA::Object_var obj=_poa->id_to_reference(*_id);
62     Engines::Container_var pCont = Engines::Container::_narrow(obj);
63     string hostname = GetHostname();
64     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
65     SCRUTE(_containerName);
66     _NS->Register(pCont, _containerName.c_str());
67   }
68
69   // Root recupere les ior des container des autre process
70   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71   BCastIOR(_orb,pobj,true);
72 }
73
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
75   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
76 {
77 }
78
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 {
81   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
82 }
83
84 // Load component
85 void Engines_MPIContainer_i::Shutdown()
86 {
87   int ip;
88   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
89   if( _numproc == 0 ){
90     _NS->Destroy_FullDirectory(_containerName.c_str());
91     for(ip= 1;ip<_nbproc;ip++)
92       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
93   }
94   _orb->shutdown(0);
95
96 }
97
98 // Load a component library
99 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
100 {
101   if( _numproc == 0 ){
102     // Invocation du chargement du composant dans les autres process
103     for(int ip= 1;ip<_nbproc;ip++)
104       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
105   }
106
107   return Lload_component_Library(componentName);
108 }
109
110 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
111 {
112   Lload_component_Library(componentName);
113 }
114
115 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
116 {
117   string aCompName = componentName;
118
119   // --- try dlopen C++ component
120
121   string impl_name = string ("lib") + aCompName + string("Engine.so");
122   SCRUTE(impl_name);
123   
124   _numInstanceMutex.lock(); // lock to be alone 
125   // (see decInstanceCnt, finalize_removal))
126   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
127   if (_library_map[impl_name])
128     {
129       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
130       _numInstanceMutex.unlock();
131       return true;
132     }
133   
134   void* handle;
135   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
136   if ( handle )
137     {
138       _library_map[impl_name] = handle;
139       _numInstanceMutex.unlock();
140       return true;
141     }
142   else
143     {
144       INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
145       INFOS("[" << _numproc << "] error dlopen: " << dlerror());
146     }
147   _numInstanceMutex.unlock();
148
149   // --- try import Python component
150
151   INFOS("[" << _numproc << "] try import Python component "<<componentName);
152   if (_isSupervContainer)
153     {
154       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
155       return false;
156     }
157   if (_library_map[aCompName])
158     {
159       return true; // Python Component, already imported
160     }
161   else
162     {
163       Py_ACQUIRE_NEW_THREAD;
164       PyObject *mainmod = PyImport_AddModule("__main__");
165       PyObject *globals = PyModule_GetDict(mainmod);
166       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
167       PyObject *result = PyObject_CallMethod(pyCont,
168                                              "import_component",
169                                              "s",componentName);
170       int ret= PyInt_AsLong(result);
171       SCRUTE(ret);
172       Py_RELEASE_NEW_THREAD;
173   
174       if (ret) // import possible: Python component
175         {
176           _library_map[aCompName] = (void *)pyCont; // any non O value OK
177           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
178           return true;
179         }
180     }
181   return false;
182 }
183
184 // Create an instance of component
185 Engines::Component_ptr
186 Engines_MPIContainer_i::create_component_instance( const char* componentName,
187                                                    CORBA::Long studyId)
188 {
189   if( _numproc == 0 ){
190     // Invocation du chargement du composant dans les autres process
191     for(int ip= 1;ip<_nbproc;ip++)
192       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
193   }
194
195   return Lcreate_component_instance(componentName,studyId);
196 }
197
198 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
199                                                           CORBA::Long studyId)
200 {
201   Lcreate_component_instance(componentName,studyId);
202 }
203
204 Engines::Component_ptr
205 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
206 {
207   if (studyId < 0) {
208     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
209     return Engines::Component::_nil() ;
210   }
211
212   Engines::Component_var iobject = Engines::Component::_nil() ;
213   Engines::MPIObject_var pobj;
214
215   string aCompName = genericRegisterName;
216   if (_library_map[aCompName]) { // Python component
217     if (_isSupervContainer) {
218       INFOS("Supervision Container does not support Python Component Engines");
219       return Engines::Component::_nil();
220     }
221     _numInstanceMutex.lock() ; // lock on the instance number
222     _numInstance++ ;
223     int numInstance = _numInstance ;
224     _numInstanceMutex.unlock() ;
225
226     char aNumI[12];
227     sprintf( aNumI , "%d" , numInstance ) ;
228     string instanceName = aCompName + "_inst_" + aNumI ;
229     string component_registerName =
230       _containerName + "/" + instanceName;
231
232     Py_ACQUIRE_NEW_THREAD;
233     PyObject *mainmod = PyImport_AddModule("__main__");
234     PyObject *globals = PyModule_GetDict(mainmod);
235     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
236     PyObject *result = PyObject_CallMethod(pyCont,
237                                            "create_component_instance",
238                                            "ssl",
239                                            aCompName.c_str(),
240                                            instanceName.c_str(),
241                                            studyId);
242     string iors = PyString_AsString(result);
243     SCRUTE(iors);
244     Py_RELEASE_NEW_THREAD;
245   
246     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
247     iobject = Engines::Component::_narrow( obj ) ;
248     pobj = Engines::MPIObject::_narrow(obj) ;
249     if( _numproc == 0 )
250       _NS->Register(iobject, component_registerName.c_str()) ;
251     // Root recupere les ior des composants des autre process
252     BCastIOR(_orb,pobj,false);
253
254     return iobject._retn();
255   }
256   
257   //--- try C++
258
259   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
260   void* handle = _library_map[impl_name];
261   if ( !handle ) {
262     INFOS("shared library " << impl_name <<"must be loaded before instance");
263     return Engines::Component::_nil() ;
264   }
265   else {
266     iobject = createMPIInstance(genericRegisterName,
267                                 handle,
268                                 studyId);
269     return iobject._retn();
270   }
271 }
272
273 Engines::Component_ptr
274 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
275                                           void *handle,
276                                           int studyId)
277 {
278   Engines::Component_var iobject;
279   Engines::MPIObject_var pobj;
280   // --- find the factory
281
282   string aGenRegisterName = genericRegisterName;
283   string factory_name = aGenRegisterName + string("Engine_factory");
284   SCRUTE(factory_name) ;
285
286   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
287     (int,int,
288      CORBA::ORB_ptr,
289      PortableServer::POA_ptr, 
290      PortableServer::ObjectId *, 
291      const char *, 
292      const char *) ;
293
294   MPIFACTORY_FUNCTION MPIComponent_factory
295     = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
296
297   char *error ;
298   if ( (error = dlerror() ) != NULL) {
299     // Try to load a sequential component
300     MESSAGE("[" << _numproc << "] Try to load a sequential component");
301     _numInstanceMutex.unlock() ;
302     iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
303     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
304   }
305
306   // --- create instance
307
308   iobject = Engines::Component::_nil() ;
309
310   try
311     {
312       _numInstanceMutex.lock() ; // lock on the instance number
313       _numInstance++ ;
314       int numInstance = _numInstance ;
315       _numInstanceMutex.unlock() ;
316
317       char aNumI[12];
318       sprintf( aNumI , "%d" , numInstance ) ;
319       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
320       string component_registerName =
321         _containerName + "/" + instanceName;
322
323       // --- Instanciate required CORBA object
324
325       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
326       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
327                                  aGenRegisterName.c_str() ) ;
328
329       // --- get reference & servant from id
330
331       CORBA::Object_var obj = _poa->id_to_reference(*id);
332       iobject = Engines::Component::_narrow( obj ) ;
333       pobj = Engines::MPIObject::_narrow(obj) ;
334
335       Engines_Component_i *servant =
336         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
337       ASSERT(servant);
338       //SCRUTE(servant->pd_refCount);
339       servant->_remove_ref(); // compensate previous id_to_reference 
340       //SCRUTE(servant->pd_refCount);
341       _listInstances_map[instanceName] = iobject;
342       _cntInstances_map[aGenRegisterName] += 1;
343       SCRUTE(aGenRegisterName);
344       SCRUTE(_cntInstances_map[aGenRegisterName]);
345       //SCRUTE(servant->pd_refCount);
346       bool ret_studyId = servant->setStudyId(studyId);
347       ASSERT(ret_studyId);
348
349       // --- register the engine under the name
350       //     containerName(.dir)/instanceName(.object)
351
352       if( _numproc == 0 ){
353         _NS->Register( iobject , component_registerName.c_str() ) ;
354         MESSAGE( component_registerName.c_str() << " bound" ) ;
355       }
356       // Root recupere les ior des composants des autre process
357       BCastIOR(_orb,pobj,false);
358
359     }
360   catch (...)
361     {
362       INFOS( "Container_i::createInstance exception catched" ) ;
363     }
364   return iobject._retn();
365 }
366
367 // Load component
368 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
369                                                  const char* componentName)
370 {
371   if( _numproc == 0 ){
372     // Invocation du chargement du composant dans les autres process
373     for(int ip= 1;ip<_nbproc;ip++)
374       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
375                                                                 componentName);
376   }
377
378   return Lload_impl(nameToRegister,componentName);
379
380 }
381
382 // Load component
383 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
384                                          const char* componentName)
385 {
386   Lload_impl(nameToRegister,componentName);
387 }
388
389 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
390                                    const char* nameToRegister,
391                                    const char* componentName)
392 {
393   Engines::Component_var iobject;
394   Engines::MPIObject_var pobj;
395   char cproc[4];
396
397   sprintf(cproc,"_%d",_numproc);
398
399   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
400
401   _numInstanceMutex.lock() ; // lock on the instance number
402   _numInstance++ ;
403   char _aNumI[12];
404   sprintf(_aNumI,"%d",_numInstance) ;
405
406   string _impl_name = componentName;
407   string _nameToRegister = nameToRegister;
408   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
409   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
410
411   string absolute_impl_name(_impl_name);
412   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
413   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
414   if(!handle){
415     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
416     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
417     return Engines::Component::_nil() ;
418   }
419
420   string factory_name = _nameToRegister + string("Engine_factory");
421   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
422
423   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
424                                                   CORBA::ORB_ptr,
425                                                   PortableServer::POA_ptr,
426                                                   PortableServer::ObjectId *,
427                                                   const char *,
428                                                   const char *) =
429     (PortableServer::ObjectId * (*) (int,int,
430                                      CORBA::ORB_ptr,
431                                      PortableServer::POA_ptr, 
432                                      PortableServer::ObjectId *, 
433                                      const char *, 
434                                      const char *)) 
435     dlsym(handle, factory_name.c_str());
436
437   char *error ;
438   if ((error = dlerror()) != NULL){
439     // Try to load a sequential component
440     MESSAGE("[" << _numproc << "] Try to load a sequential component");
441     _numInstanceMutex.unlock() ;
442     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
443     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
444   }
445   else{
446     // Instanciation du composant parallele
447     MESSAGE("[" << _numproc << "] Try to load a parallel component");
448     PortableServer::ObjectId * id = (MPIComponent_factory)
449       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
450     // get reference from id
451     CORBA::Object_var o = _poa->id_to_reference(*id);
452     pobj = Engines::MPIObject::_narrow(o) ;
453     iobject = Engines::Component::_narrow(o) ;
454   }
455
456   if( _numproc == 0 ){
457     // utiliser + tard le registry ici :
458     // register the engine under the name containerName.dir/nameToRegister.object
459     string component_registerName = _containerName + "/" + _nameToRegister;
460     _NS->Register(iobject, component_registerName.c_str()) ;
461   }
462
463   _numInstanceMutex.unlock() ;
464
465   // Root recupere les ior des composants des autre process
466   BCastIOR(_orb,pobj,false);
467
468   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
469   return Engines::Component::_duplicate(iobject);
470
471 }
472
473 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
474 {
475   int ip;
476   Engines::Component_ptr cptr;
477   Engines::MPIObject_ptr pcptr;
478   Engines::MPIObject_ptr spcptr;
479
480   ASSERT(! CORBA::is_nil(component_i));
481
482   if( _numproc == 0 ){
483     // Invocation de la destruction du composant dans les autres process
484     pcptr = (Engines::MPIObject_ptr)component_i;
485     for(ip= 1;ip<_nbproc;ip++){
486       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
487       cptr = (Engines::Component_ptr)spcptr;
488       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
489     }
490   }
491
492   Lremove_impl(component_i);
493 }
494
495 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
496 {
497   Lremove_impl(component_i);
498 }
499
500 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
501 {
502   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
503
504   ASSERT(! CORBA::is_nil(component_i));
505
506   string instanceName = component_i->instanceName() ;
507   MESSAGE("[" << _numproc << "] unload component " << instanceName);
508   component_i->destroy() ;
509   MESSAGE("[" << _numproc << "] test key handle_map");
510   _numInstanceMutex.lock() ; // lock on the remove on handle_map
511   _numInstanceMutex.unlock() ;
512   MESSAGE("[" << _numproc << "] list handle_map");
513
514   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
515
516 }
517
518 void Engines_MPIContainer_i::finalize_removal()
519 {
520   int ip;
521
522   if( _numproc == 0 ){
523     // Invocation de la destruction du composant dans les autres process
524     for(ip= 1;ip<_nbproc;ip++)
525       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
526   }
527
528   Lfinalize_removal();
529 }
530
531 void Engines_MPIContainer_i::Asfinalize_removal()
532 {
533   Lfinalize_removal();
534 }
535
536 void Engines_MPIContainer_i::Lfinalize_removal()
537 {
538   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
539
540 //   map<string, void *>::iterator im ;
541 //   // lock on the explore remove_map & dlclose
542 //   _numInstanceMutex.lock() ; 
543 //   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
544 //     {
545 //       void * handle = (*im).second ;
546 //       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
547 //       dlclose(handle) ;
548 //     }
549 //   MESSAGE("[" << _numproc << "] remove_map.clear()");
550 //   remove_map.clear() ;  
551 //   _numInstanceMutex.unlock() ;
552
553   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
554 }