Salome HOME
dfeb178e13579d792ccfb0b10b30e9d79103abae
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <pthread.h>  // must be before Python.h !
37 #include <Python.h>
38 #include "Container_init_python.hxx"
39 using namespace std;
40
41 // L'appel au registry SALOME ne se fait que pour le process 0
42 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
43                                                CORBA::ORB_ptr orb, 
44                                                PortableServer::POA_ptr poa,
45                                                char * containerName,
46                                                int argc, char *argv[]) 
47   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
48 {
49   long id=0;
50   string IdContainerinNS;
51   char idc[3*sizeof(long)];
52
53   MESSAGE("[" << numproc << "] activate object");
54   _id = _poa->activate_object(this);
55
56   if(argc>1)
57     {
58       for(int i=0;i<argc;i++)
59         {
60           if(strcmp(argv[i],"-id")==NULL)
61             {
62               id = atoi(argv[i+1]);
63               continue;
64             }
65         }
66     }
67   SCRUTE(id);
68
69   if(numproc==0){
70
71     _NS = new SALOME_NamingService();
72     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
73
74     CORBA::Object_var obj=_poa->id_to_reference(*_id);
75     Engines::Container_var pCont = Engines::Container::_narrow(obj);
76
77     string hostname = GetHostname();
78     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
79     SCRUTE(_containerName);
80     _NS->Register(pCont, _containerName.c_str());
81
82     // A parallel container registers in Naming Service
83     // on the machine where is process 0. ContainerManager does'nt know the name
84     // of this machine before the launch of the parallel container. So to get
85     // the IOR of the parallel container in Naming Service, ContainerManager
86     // gives a unique Id. The parallel container registers his name under
87     // /ContainerManager/Id directory in NamingService
88
89     IdContainerinNS = "/ContainerManager/id";
90     sprintf(idc,"%ld",id);
91     IdContainerinNS += idc;
92     SCRUTE(IdContainerinNS);
93     _NS->Register(pCont, IdContainerinNS.c_str());
94
95   }
96
97   // Root recupere les ior des container des autre process
98   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
99   BCastIOR(_orb,pobj,true);
100 }
101
102 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
103   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
104 {
105 }
106
107 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
108 {
109   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
110 }
111
112 // Load component
113 void Engines_MPIContainer_i::Shutdown()
114 {
115   int ip;
116   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
117   if( _numproc == 0 ){
118     _NS->Destroy_FullDirectory(_containerName.c_str());
119     for(ip= 1;ip<_nbproc;ip++)
120       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
121   }
122   _orb->shutdown(0);
123
124 }
125
126 // Load a component library
127 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
128 {
129   if( _numproc == 0 ){
130     // Invocation du chargement du composant dans les autres process
131     for(int ip= 1;ip<_nbproc;ip++)
132       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
133   }
134
135   return Lload_component_Library(componentName);
136 }
137
138 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
139 {
140   Lload_component_Library(componentName);
141 }
142
143 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
144 {
145   string aCompName = componentName;
146
147   // --- try dlopen C++ component
148
149   string impl_name = string ("lib") + aCompName + string("Engine.so");
150   SCRUTE(impl_name);
151   
152   _numInstanceMutex.lock(); // lock to be alone 
153   // (see decInstanceCnt, finalize_removal))
154   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
155   if (_library_map[impl_name])
156     {
157       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
158       _numInstanceMutex.unlock();
159       return true;
160     }
161   
162   void* handle;
163   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
164   if ( handle )
165     {
166       _library_map[impl_name] = handle;
167       _numInstanceMutex.unlock();
168       return true;
169     }
170   else
171     {
172       INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
173       INFOS("[" << _numproc << "] error dlopen: " << dlerror());
174     }
175   _numInstanceMutex.unlock();
176
177   // --- try import Python component
178
179   INFOS("[" << _numproc << "] try import Python component "<<componentName);
180   if (_isSupervContainer)
181     {
182       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
183       return false;
184     }
185   if (_library_map[aCompName])
186     {
187       return true; // Python Component, already imported
188     }
189   else
190     {
191       Py_ACQUIRE_NEW_THREAD;
192       PyObject *mainmod = PyImport_AddModule("__main__");
193       PyObject *globals = PyModule_GetDict(mainmod);
194       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
195       PyObject *result = PyObject_CallMethod(pyCont,
196                                              "import_component",
197                                              "s",componentName);
198       int ret= PyInt_AsLong(result);
199       SCRUTE(ret);
200       Py_RELEASE_NEW_THREAD;
201   
202       if (ret) // import possible: Python component
203         {
204           _library_map[aCompName] = (void *)pyCont; // any non O value OK
205           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
206           return true;
207         }
208     }
209   return false;
210 }
211
212 // Create an instance of component
213 Engines::Component_ptr
214 Engines_MPIContainer_i::create_component_instance( const char* componentName,
215                                                    CORBA::Long studyId)
216 {
217   if( _numproc == 0 ){
218     // Invocation du chargement du composant dans les autres process
219     for(int ip= 1;ip<_nbproc;ip++)
220       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
221   }
222
223   return Lcreate_component_instance(componentName,studyId);
224 }
225
226 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
227                                                           CORBA::Long studyId)
228 {
229   Lcreate_component_instance(componentName,studyId);
230 }
231
232 Engines::Component_ptr
233 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
234 {
235   if (studyId < 0) {
236     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
237     return Engines::Component::_nil() ;
238   }
239
240   Engines::Component_var iobject = Engines::Component::_nil() ;
241   Engines::MPIObject_var pobj;
242
243   string aCompName = genericRegisterName;
244   if (_library_map[aCompName]) { // Python component
245     if (_isSupervContainer) {
246       INFOS("Supervision Container does not support Python Component Engines");
247       return Engines::Component::_nil();
248     }
249     _numInstanceMutex.lock() ; // lock on the instance number
250     _numInstance++ ;
251     int numInstance = _numInstance ;
252     _numInstanceMutex.unlock() ;
253
254     char aNumI[12];
255     sprintf( aNumI , "%d" , numInstance ) ;
256     string instanceName = aCompName + "_inst_" + aNumI ;
257     string component_registerName =
258       _containerName + "/" + instanceName;
259
260     Py_ACQUIRE_NEW_THREAD;
261     PyObject *mainmod = PyImport_AddModule("__main__");
262     PyObject *globals = PyModule_GetDict(mainmod);
263     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
264     PyObject *result = PyObject_CallMethod(pyCont,
265                                            "create_component_instance",
266                                            "ssl",
267                                            aCompName.c_str(),
268                                            instanceName.c_str(),
269                                            studyId);
270     string iors = PyString_AsString(result);
271     SCRUTE(iors);
272     Py_RELEASE_NEW_THREAD;
273   
274     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
275     iobject = Engines::Component::_narrow( obj ) ;
276     pobj = Engines::MPIObject::_narrow(obj) ;
277     if( _numproc == 0 )
278       _NS->Register(iobject, component_registerName.c_str()) ;
279     // Root recupere les ior des composants des autre process
280     BCastIOR(_orb,pobj,false);
281
282     return iobject._retn();
283   }
284   
285   //--- try C++
286
287   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
288   void* handle = _library_map[impl_name];
289   if ( !handle ) {
290     INFOS("shared library " << impl_name <<"must be loaded before instance");
291     return Engines::Component::_nil() ;
292   }
293   else {
294     iobject = createMPIInstance(genericRegisterName,
295                                 handle,
296                                 studyId);
297     return iobject._retn();
298   }
299 }
300
301 Engines::Component_ptr
302 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
303                                           void *handle,
304                                           int studyId)
305 {
306   Engines::Component_var iobject;
307   Engines::MPIObject_var pobj;
308   // --- find the factory
309
310   string aGenRegisterName = genericRegisterName;
311   string factory_name = aGenRegisterName + string("Engine_factory");
312   SCRUTE(factory_name) ;
313
314   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
315     (int,int,
316      CORBA::ORB_ptr,
317      PortableServer::POA_ptr, 
318      PortableServer::ObjectId *, 
319      const char *, 
320      const char *) ;
321
322   MPIFACTORY_FUNCTION MPIComponent_factory
323     = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
324
325   char *error ;
326   if ( (error = dlerror() ) != NULL) {
327     // Try to load a sequential component
328     MESSAGE("[" << _numproc << "] Try to load a sequential component");
329     _numInstanceMutex.unlock() ;
330     iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
331     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
332   }
333
334   // --- create instance
335
336   iobject = Engines::Component::_nil() ;
337
338   try
339     {
340       _numInstanceMutex.lock() ; // lock on the instance number
341       _numInstance++ ;
342       int numInstance = _numInstance ;
343       _numInstanceMutex.unlock() ;
344
345       char aNumI[12];
346       sprintf( aNumI , "%d" , numInstance ) ;
347       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
348       string component_registerName =
349         _containerName + "/" + instanceName;
350
351       // --- Instanciate required CORBA object
352
353       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
354       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
355                                  aGenRegisterName.c_str() ) ;
356
357       // --- get reference & servant from id
358
359       CORBA::Object_var obj = _poa->id_to_reference(*id);
360       iobject = Engines::Component::_narrow( obj ) ;
361       pobj = Engines::MPIObject::_narrow(obj) ;
362
363       Engines_Component_i *servant =
364         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
365       ASSERT(servant);
366       //SCRUTE(servant->pd_refCount);
367       servant->_remove_ref(); // compensate previous id_to_reference 
368       //SCRUTE(servant->pd_refCount);
369       _listInstances_map[instanceName] = iobject;
370       _cntInstances_map[aGenRegisterName] += 1;
371       SCRUTE(aGenRegisterName);
372       SCRUTE(_cntInstances_map[aGenRegisterName]);
373       //SCRUTE(servant->pd_refCount);
374       bool ret_studyId = servant->setStudyId(studyId);
375       ASSERT(ret_studyId);
376
377       // --- register the engine under the name
378       //     containerName(.dir)/instanceName(.object)
379
380       if( _numproc == 0 ){
381         _NS->Register( iobject , component_registerName.c_str() ) ;
382         MESSAGE( component_registerName.c_str() << " bound" ) ;
383       }
384       // Root recupere les ior des composants des autre process
385       BCastIOR(_orb,pobj,false);
386
387     }
388   catch (...)
389     {
390       INFOS( "Container_i::createInstance exception catched" ) ;
391     }
392   return iobject._retn();
393 }
394
395 // Load component
396 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
397                                                  const char* componentName)
398 {
399   if( _numproc == 0 ){
400     // Invocation du chargement du composant dans les autres process
401     for(int ip= 1;ip<_nbproc;ip++)
402       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
403                                                                 componentName);
404   }
405
406   return Lload_impl(nameToRegister,componentName);
407
408 }
409
410 // Load component
411 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
412                                          const char* componentName)
413 {
414   Lload_impl(nameToRegister,componentName);
415 }
416
417 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
418                                    const char* nameToRegister,
419                                    const char* componentName)
420 {
421   Engines::Component_var iobject;
422   Engines::MPIObject_var pobj;
423   char cproc[4];
424
425   sprintf(cproc,"_%d",_numproc);
426
427   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
428
429   _numInstanceMutex.lock() ; // lock on the instance number
430   _numInstance++ ;
431   char _aNumI[12];
432   sprintf(_aNumI,"%d",_numInstance) ;
433
434   string _impl_name = componentName;
435   string _nameToRegister = nameToRegister;
436   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
437   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
438
439   string absolute_impl_name(_impl_name);
440   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
441   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
442   if(!handle){
443     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
444     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
445     return Engines::Component::_nil() ;
446   }
447
448   string factory_name = _nameToRegister + string("Engine_factory");
449   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
450
451   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
452                                                   CORBA::ORB_ptr,
453                                                   PortableServer::POA_ptr,
454                                                   PortableServer::ObjectId *,
455                                                   const char *,
456                                                   const char *) =
457     (PortableServer::ObjectId * (*) (int,int,
458                                      CORBA::ORB_ptr,
459                                      PortableServer::POA_ptr, 
460                                      PortableServer::ObjectId *, 
461                                      const char *, 
462                                      const char *)) 
463     dlsym(handle, factory_name.c_str());
464
465   char *error ;
466   if ((error = dlerror()) != NULL){
467     // Try to load a sequential component
468     MESSAGE("[" << _numproc << "] Try to load a sequential component");
469     _numInstanceMutex.unlock() ;
470     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
471     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
472   }
473   else{
474     // Instanciation du composant parallele
475     MESSAGE("[" << _numproc << "] Try to load a parallel component");
476     PortableServer::ObjectId * id = (MPIComponent_factory)
477       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
478     // get reference from id
479     CORBA::Object_var o = _poa->id_to_reference(*id);
480     pobj = Engines::MPIObject::_narrow(o) ;
481     iobject = Engines::Component::_narrow(o) ;
482   }
483
484   if( _numproc == 0 ){
485     // utiliser + tard le registry ici :
486     // register the engine under the name containerName.dir/nameToRegister.object
487     string component_registerName = _containerName + "/" + _nameToRegister;
488     _NS->Register(iobject, component_registerName.c_str()) ;
489   }
490
491   _numInstanceMutex.unlock() ;
492
493   // Root recupere les ior des composants des autre process
494   BCastIOR(_orb,pobj,false);
495
496   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
497   return Engines::Component::_duplicate(iobject);
498
499 }
500
501 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
502 {
503   int ip;
504   Engines::Component_ptr cptr;
505   Engines::MPIObject_ptr pcptr;
506   Engines::MPIObject_ptr spcptr;
507
508   ASSERT(! CORBA::is_nil(component_i));
509
510   if( _numproc == 0 ){
511     // Invocation de la destruction du composant dans les autres process
512     pcptr = (Engines::MPIObject_ptr)component_i;
513     for(ip= 1;ip<_nbproc;ip++){
514       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
515       cptr = (Engines::Component_ptr)spcptr;
516       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
517     }
518   }
519
520   Lremove_impl(component_i);
521 }
522
523 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
524 {
525   Lremove_impl(component_i);
526 }
527
528 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
529 {
530   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
531
532   ASSERT(! CORBA::is_nil(component_i));
533
534   string instanceName = component_i->instanceName() ;
535   MESSAGE("[" << _numproc << "] unload component " << instanceName);
536   component_i->destroy() ;
537   MESSAGE("[" << _numproc << "] test key handle_map");
538   _numInstanceMutex.lock() ; // lock on the remove on handle_map
539   _numInstanceMutex.unlock() ;
540   MESSAGE("[" << _numproc << "] list handle_map");
541
542   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
543
544 }
545
546 void Engines_MPIContainer_i::finalize_removal()
547 {
548   int ip;
549
550   if( _numproc == 0 ){
551     // Invocation de la destruction du composant dans les autres process
552     for(ip= 1;ip<_nbproc;ip++)
553       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
554   }
555
556   Lfinalize_removal();
557 }
558
559 void Engines_MPIContainer_i::Asfinalize_removal()
560 {
561   Lfinalize_removal();
562 }
563
564 void Engines_MPIContainer_i::Lfinalize_removal()
565 {
566   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
567
568 //   map<string, void *>::iterator im ;
569 //   // lock on the explore remove_map & dlclose
570 //   _numInstanceMutex.lock() ; 
571 //   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
572 //     {
573 //       void * handle = (*im).second ;
574 //       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
575 //       dlclose(handle) ;
576 //     }
577 //   MESSAGE("[" << _numproc << "] remove_map.clear()");
578 //   remove_map.clear() ;  
579 //   _numInstanceMutex.unlock() ;
580
581   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
582 }