Salome HOME
PR: merge from branch BR_UT_V310a3 tag mergeto_trunk_05dec05
[modules/yacs.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <Python.h>
37 #include "Container_init_python.hxx"
38 using namespace std;
39
40 // L'appel au registry SALOME ne se fait que pour le process 0
41 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
42                                                CORBA::ORB_ptr orb, 
43                                                PortableServer::POA_ptr poa,
44                                                char * containerName,
45                                                int argc, char *argv[]) 
46   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
47 {
48   long id=0;
49   string IdContainerinNS;
50   char idc[3*sizeof(long)];
51
52   MESSAGE("[" << numproc << "] activate object");
53   _id = _poa->activate_object(this);
54
55   if(argc>1)
56     {
57       for(int i=0;i<argc;i++)
58         {
59           if(strcmp(argv[i],"-id")==NULL)
60             {
61               id = atoi(argv[i+1]);
62               continue;
63             }
64         }
65     }
66   SCRUTE(id);
67
68   if(numproc==0){
69
70     _NS = new SALOME_NamingService();
71     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
72
73     CORBA::Object_var obj=_poa->id_to_reference(*_id);
74     Engines::Container_var pCont = Engines::Container::_narrow(obj);
75
76     string hostname = GetHostname();
77     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
78     SCRUTE(_containerName);
79     _NS->Register(pCont, _containerName.c_str());
80
81     // A parallel container registers in Naming Service
82     // on the machine where is process 0. ContainerManager does'nt know the name
83     // of this machine before the launch of the parallel container. So to get
84     // the IOR of the parallel container in Naming Service, ContainerManager
85     // gives a unique Id. The parallel container registers his name under
86     // /ContainerManager/Id directory in NamingService
87
88     IdContainerinNS = "/ContainerManager/id";
89     sprintf(idc,"%ld",id);
90     IdContainerinNS += idc;
91     SCRUTE(IdContainerinNS);
92     _NS->Register(pCont, IdContainerinNS.c_str());
93
94   }
95
96   // Root recupere les ior des container des autre process
97   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
98   BCastIOR(_orb,pobj,true);
99 }
100
101 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
102   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
103 {
104 }
105
106 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
107 {
108   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
109 }
110
111 // Load component
112 void Engines_MPIContainer_i::Shutdown()
113 {
114   int ip;
115   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
116   if( _numproc == 0 ){
117     _NS->Destroy_FullDirectory(_containerName.c_str());
118     for(ip= 1;ip<_nbproc;ip++)
119       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
120   }
121   _orb->shutdown(0);
122
123 }
124
125 // Load a component library
126 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
127 {
128   if( _numproc == 0 ){
129     // Invocation du chargement du composant dans les autres process
130     for(int ip= 1;ip<_nbproc;ip++)
131       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
132   }
133
134   return Lload_component_Library(componentName);
135 }
136
137 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
138 {
139   Lload_component_Library(componentName);
140 }
141
142 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
143 {
144   string aCompName = componentName;
145
146   // --- try dlopen C++ component
147
148   string impl_name = string ("lib") + aCompName + string("Engine.so");
149   SCRUTE(impl_name);
150   
151   _numInstanceMutex.lock(); // lock to be alone 
152   // (see decInstanceCnt, finalize_removal))
153   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
154   if (_library_map[impl_name])
155     {
156       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
157       _numInstanceMutex.unlock();
158       return true;
159     }
160   
161   void* handle;
162   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
163   if ( handle )
164     {
165       _library_map[impl_name] = handle;
166       _numInstanceMutex.unlock();
167       return true;
168     }
169   else
170     {
171       INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
172       INFOS("[" << _numproc << "] error dlopen: " << dlerror());
173     }
174   _numInstanceMutex.unlock();
175
176   // --- try import Python component
177
178   INFOS("[" << _numproc << "] try import Python component "<<componentName);
179   if (_isSupervContainer)
180     {
181       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
182       return false;
183     }
184   if (_library_map[aCompName])
185     {
186       return true; // Python Component, already imported
187     }
188   else
189     {
190       Py_ACQUIRE_NEW_THREAD;
191       PyObject *mainmod = PyImport_AddModule("__main__");
192       PyObject *globals = PyModule_GetDict(mainmod);
193       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
194       PyObject *result = PyObject_CallMethod(pyCont,
195                                              "import_component",
196                                              "s",componentName);
197       int ret= PyInt_AsLong(result);
198       SCRUTE(ret);
199       Py_RELEASE_NEW_THREAD;
200   
201       if (ret) // import possible: Python component
202         {
203           _library_map[aCompName] = (void *)pyCont; // any non O value OK
204           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
205           return true;
206         }
207     }
208   return false;
209 }
210
211 // Create an instance of component
212 Engines::Component_ptr
213 Engines_MPIContainer_i::create_component_instance( const char* componentName,
214                                                    CORBA::Long studyId)
215 {
216   if( _numproc == 0 ){
217     // Invocation du chargement du composant dans les autres process
218     for(int ip= 1;ip<_nbproc;ip++)
219       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
220   }
221
222   return Lcreate_component_instance(componentName,studyId);
223 }
224
225 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
226                                                           CORBA::Long studyId)
227 {
228   Lcreate_component_instance(componentName,studyId);
229 }
230
231 Engines::Component_ptr
232 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
233 {
234   if (studyId < 0) {
235     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
236     return Engines::Component::_nil() ;
237   }
238
239   Engines::Component_var iobject = Engines::Component::_nil() ;
240   Engines::MPIObject_var pobj;
241
242   string aCompName = genericRegisterName;
243   if (_library_map[aCompName]) { // Python component
244     if (_isSupervContainer) {
245       INFOS("Supervision Container does not support Python Component Engines");
246       return Engines::Component::_nil();
247     }
248     _numInstanceMutex.lock() ; // lock on the instance number
249     _numInstance++ ;
250     int numInstance = _numInstance ;
251     _numInstanceMutex.unlock() ;
252
253     char aNumI[12];
254     sprintf( aNumI , "%d" , numInstance ) ;
255     string instanceName = aCompName + "_inst_" + aNumI ;
256     string component_registerName =
257       _containerName + "/" + instanceName;
258
259     Py_ACQUIRE_NEW_THREAD;
260     PyObject *mainmod = PyImport_AddModule("__main__");
261     PyObject *globals = PyModule_GetDict(mainmod);
262     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
263     PyObject *result = PyObject_CallMethod(pyCont,
264                                            "create_component_instance",
265                                            "ssl",
266                                            aCompName.c_str(),
267                                            instanceName.c_str(),
268                                            studyId);
269     string iors = PyString_AsString(result);
270     SCRUTE(iors);
271     Py_RELEASE_NEW_THREAD;
272   
273     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
274     iobject = Engines::Component::_narrow( obj ) ;
275     pobj = Engines::MPIObject::_narrow(obj) ;
276     if( _numproc == 0 )
277       _NS->Register(iobject, component_registerName.c_str()) ;
278     // Root recupere les ior des composants des autre process
279     BCastIOR(_orb,pobj,false);
280
281     return iobject._retn();
282   }
283   
284   //--- try C++
285
286   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
287   void* handle = _library_map[impl_name];
288   if ( !handle ) {
289     INFOS("shared library " << impl_name <<"must be loaded before instance");
290     return Engines::Component::_nil() ;
291   }
292   else {
293     iobject = createMPIInstance(genericRegisterName,
294                                 handle,
295                                 studyId);
296     return iobject._retn();
297   }
298 }
299
300 Engines::Component_ptr
301 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
302                                           void *handle,
303                                           int studyId)
304 {
305   Engines::Component_var iobject;
306   Engines::MPIObject_var pobj;
307   // --- find the factory
308
309   string aGenRegisterName = genericRegisterName;
310   string factory_name = aGenRegisterName + string("Engine_factory");
311   SCRUTE(factory_name) ;
312
313   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
314     (int,int,
315      CORBA::ORB_ptr,
316      PortableServer::POA_ptr, 
317      PortableServer::ObjectId *, 
318      const char *, 
319      const char *) ;
320
321   MPIFACTORY_FUNCTION MPIComponent_factory
322     = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
323
324   char *error ;
325   if ( (error = dlerror() ) != NULL) {
326     // Try to load a sequential component
327     MESSAGE("[" << _numproc << "] Try to load a sequential component");
328     _numInstanceMutex.unlock() ;
329     iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
330     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
331   }
332
333   // --- create instance
334
335   iobject = Engines::Component::_nil() ;
336
337   try
338     {
339       _numInstanceMutex.lock() ; // lock on the instance number
340       _numInstance++ ;
341       int numInstance = _numInstance ;
342       _numInstanceMutex.unlock() ;
343
344       char aNumI[12];
345       sprintf( aNumI , "%d" , numInstance ) ;
346       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
347       string component_registerName =
348         _containerName + "/" + instanceName;
349
350       // --- Instanciate required CORBA object
351
352       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
353       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
354                                  aGenRegisterName.c_str() ) ;
355
356       // --- get reference & servant from id
357
358       CORBA::Object_var obj = _poa->id_to_reference(*id);
359       iobject = Engines::Component::_narrow( obj ) ;
360       pobj = Engines::MPIObject::_narrow(obj) ;
361
362       Engines_Component_i *servant =
363         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
364       ASSERT(servant);
365       //SCRUTE(servant->pd_refCount);
366       servant->_remove_ref(); // compensate previous id_to_reference 
367       //SCRUTE(servant->pd_refCount);
368       _listInstances_map[instanceName] = iobject;
369       _cntInstances_map[aGenRegisterName] += 1;
370       SCRUTE(aGenRegisterName);
371       SCRUTE(_cntInstances_map[aGenRegisterName]);
372       //SCRUTE(servant->pd_refCount);
373       bool ret_studyId = servant->setStudyId(studyId);
374       ASSERT(ret_studyId);
375
376       // --- register the engine under the name
377       //     containerName(.dir)/instanceName(.object)
378
379       if( _numproc == 0 ){
380         _NS->Register( iobject , component_registerName.c_str() ) ;
381         MESSAGE( component_registerName.c_str() << " bound" ) ;
382       }
383       // Root recupere les ior des composants des autre process
384       BCastIOR(_orb,pobj,false);
385
386     }
387   catch (...)
388     {
389       INFOS( "Container_i::createInstance exception catched" ) ;
390     }
391   return iobject._retn();
392 }
393
394 // Load component
395 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
396                                                  const char* componentName)
397 {
398   if( _numproc == 0 ){
399     // Invocation du chargement du composant dans les autres process
400     for(int ip= 1;ip<_nbproc;ip++)
401       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
402                                                                 componentName);
403   }
404
405   return Lload_impl(nameToRegister,componentName);
406
407 }
408
409 // Load component
410 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
411                                          const char* componentName)
412 {
413   Lload_impl(nameToRegister,componentName);
414 }
415
416 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
417                                    const char* nameToRegister,
418                                    const char* componentName)
419 {
420   Engines::Component_var iobject;
421   Engines::MPIObject_var pobj;
422   char cproc[4];
423
424   sprintf(cproc,"_%d",_numproc);
425
426   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
427
428   _numInstanceMutex.lock() ; // lock on the instance number
429   _numInstance++ ;
430   char _aNumI[12];
431   sprintf(_aNumI,"%d",_numInstance) ;
432
433   string _impl_name = componentName;
434   string _nameToRegister = nameToRegister;
435   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
436   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
437
438   string absolute_impl_name(_impl_name);
439   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
440   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
441   if(!handle){
442     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
443     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
444     return Engines::Component::_nil() ;
445   }
446
447   string factory_name = _nameToRegister + string("Engine_factory");
448   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
449
450   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
451                                                   CORBA::ORB_ptr,
452                                                   PortableServer::POA_ptr,
453                                                   PortableServer::ObjectId *,
454                                                   const char *,
455                                                   const char *) =
456     (PortableServer::ObjectId * (*) (int,int,
457                                      CORBA::ORB_ptr,
458                                      PortableServer::POA_ptr, 
459                                      PortableServer::ObjectId *, 
460                                      const char *, 
461                                      const char *)) 
462     dlsym(handle, factory_name.c_str());
463
464   char *error ;
465   if ((error = dlerror()) != NULL){
466     // Try to load a sequential component
467     MESSAGE("[" << _numproc << "] Try to load a sequential component");
468     _numInstanceMutex.unlock() ;
469     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
470     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
471   }
472   else{
473     // Instanciation du composant parallele
474     MESSAGE("[" << _numproc << "] Try to load a parallel component");
475     PortableServer::ObjectId * id = (MPIComponent_factory)
476       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
477     // get reference from id
478     CORBA::Object_var o = _poa->id_to_reference(*id);
479     pobj = Engines::MPIObject::_narrow(o) ;
480     iobject = Engines::Component::_narrow(o) ;
481   }
482
483   if( _numproc == 0 ){
484     // utiliser + tard le registry ici :
485     // register the engine under the name containerName.dir/nameToRegister.object
486     string component_registerName = _containerName + "/" + _nameToRegister;
487     _NS->Register(iobject, component_registerName.c_str()) ;
488   }
489
490   _numInstanceMutex.unlock() ;
491
492   // Root recupere les ior des composants des autre process
493   BCastIOR(_orb,pobj,false);
494
495   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
496   return Engines::Component::_duplicate(iobject);
497
498 }
499
500 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
501 {
502   int ip;
503   Engines::Component_ptr cptr;
504   Engines::MPIObject_ptr pcptr;
505   Engines::MPIObject_ptr spcptr;
506
507   ASSERT(! CORBA::is_nil(component_i));
508
509   if( _numproc == 0 ){
510     // Invocation de la destruction du composant dans les autres process
511     pcptr = (Engines::MPIObject_ptr)component_i;
512     for(ip= 1;ip<_nbproc;ip++){
513       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
514       cptr = (Engines::Component_ptr)spcptr;
515       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
516     }
517   }
518
519   Lremove_impl(component_i);
520 }
521
522 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
523 {
524   Lremove_impl(component_i);
525 }
526
527 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
528 {
529   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
530
531   ASSERT(! CORBA::is_nil(component_i));
532
533   string instanceName = component_i->instanceName() ;
534   MESSAGE("[" << _numproc << "] unload component " << instanceName);
535   component_i->destroy() ;
536   MESSAGE("[" << _numproc << "] test key handle_map");
537   _numInstanceMutex.lock() ; // lock on the remove on handle_map
538   _numInstanceMutex.unlock() ;
539   MESSAGE("[" << _numproc << "] list handle_map");
540
541   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
542
543 }
544
545 void Engines_MPIContainer_i::finalize_removal()
546 {
547   int ip;
548
549   if( _numproc == 0 ){
550     // Invocation de la destruction du composant dans les autres process
551     for(ip= 1;ip<_nbproc;ip++)
552       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
553   }
554
555   Lfinalize_removal();
556 }
557
558 void Engines_MPIContainer_i::Asfinalize_removal()
559 {
560   Lfinalize_removal();
561 }
562
563 void Engines_MPIContainer_i::Lfinalize_removal()
564 {
565   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
566
567 //   map<string, void *>::iterator im ;
568 //   // lock on the explore remove_map & dlclose
569 //   _numInstanceMutex.lock() ; 
570 //   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
571 //     {
572 //       void * handle = (*im).second ;
573 //       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
574 //       dlclose(handle) ;
575 //     }
576 //   MESSAGE("[" << _numproc << "] remove_map.clear()");
577 //   remove_map.clear() ;  
578 //   _numInstanceMutex.unlock() ;
579
580   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
581 }