Salome HOME
First stable version after merging with V3_2_2
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  SALOME MPIContainer : implemenation of container based on MPI libraries
2 //
3 //  Copyright (C) 2003  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //
23 //
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26
27 #include <iostream.h>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <time.h>
37 #include <sys/time.h>
38 #include <pthread.h>  // must be before Python.h !
39 #include <Python.h>
40 #include "Container_init_python.hxx"
41 using namespace std;
42
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
45                                                CORBA::ORB_ptr orb, 
46                                                PortableServer::POA_ptr poa,
47                                                char * containerName,
48                                                int argc, char *argv[]) 
49   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
50 {
51   long id=0;
52   string IdContainerinNS;
53   char idc[3*sizeof(long)];
54
55   MESSAGE("[" << numproc << "] activate object");
56   _id = _poa->activate_object(this);
57
58   if(argc>1)
59     {
60       for(int i=0;i<argc;i++)
61         {
62           if(strcmp(argv[i],"-id")==NULL)
63             {
64               id = atoi(argv[i+1]);
65               continue;
66             }
67         }
68     }
69   SCRUTE(id);
70
71   if(numproc==0){
72
73     _NS = new SALOME_NamingService();
74     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
75
76     CORBA::Object_var obj=_poa->id_to_reference(*_id);
77     Engines::Container_var pCont = Engines::Container::_narrow(obj);
78
79     string hostname = GetHostname();
80     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
81     SCRUTE(_containerName);
82     _NS->Register(pCont, _containerName.c_str());
83
84     // A parallel container registers in Naming Service
85     // on the machine where is process 0. ContainerManager does'nt know the name
86     // of this machine before the launch of the parallel container. So to get
87     // the IOR of the parallel container in Naming Service, ContainerManager
88     // gives a unique Id. The parallel container registers his name under
89     // /ContainerManager/Id directory in NamingService
90
91     IdContainerinNS = "/ContainerManager/id";
92     sprintf(idc,"%ld",id);
93     IdContainerinNS += idc;
94     SCRUTE(IdContainerinNS);
95     _NS->Register(pCont, IdContainerinNS.c_str());
96
97   }
98
99   // Root recupere les ior des container des autre process
100   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
101   BCastIOR(_orb,pobj,true);
102 }
103
104 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
105   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
106 {
107 }
108
109 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
110 {
111   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
112 }
113
114 // Load component
115 void Engines_MPIContainer_i::Shutdown()
116 {
117   int ip;
118   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
119   if( _numproc == 0 ){
120     _NS->Destroy_FullDirectory(_containerName.c_str());
121     for(ip= 1;ip<_nbproc;ip++)
122       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
123   }
124   _orb->shutdown(0);
125
126 }
127
128 // Load a component library
129 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
130 {
131   if( _numproc == 0 ){
132     // Invocation du chargement du composant dans les autres process
133     for(int ip= 1;ip<_nbproc;ip++)
134       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_component_Library(componentName);
135   }
136
137   return Lload_component_Library(componentName);
138 }
139
140 void Engines_MPIContainer_i::Asload_component_Library(const char* componentName)
141 {
142   Lload_component_Library(componentName);
143 }
144
145 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
146 {
147   string aCompName = componentName;
148
149   // --- try dlopen C++ component
150
151   string impl_name = string ("lib") + aCompName + string("Engine.so");
152   SCRUTE(impl_name);
153   
154   _numInstanceMutex.lock(); // lock to be alone 
155   // (see decInstanceCnt, finalize_removal))
156   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
157   if (_library_map[impl_name])
158     {
159       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
160       _numInstanceMutex.unlock();
161       return true;
162     }
163   
164   void* handle;
165   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
166   if ( handle )
167     {
168       _library_map[impl_name] = handle;
169       _numInstanceMutex.unlock();
170       return true;
171     }
172   else
173     {
174       INFOS("[" << _numproc << "] Can't load shared library : " << impl_name);
175       INFOS("[" << _numproc << "] error dlopen: " << dlerror());
176     }
177   _numInstanceMutex.unlock();
178
179   // --- try import Python component
180
181   INFOS("[" << _numproc << "] try import Python component "<<componentName);
182   if (_isSupervContainer)
183     {
184       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
185       return false;
186     }
187   if (_library_map[aCompName])
188     {
189       return true; // Python Component, already imported
190     }
191   else
192     {
193       Py_ACQUIRE_NEW_THREAD;
194       PyObject *mainmod = PyImport_AddModule("__main__");
195       PyObject *globals = PyModule_GetDict(mainmod);
196       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
197       PyObject *result = PyObject_CallMethod(pyCont,
198                                              "import_component",
199                                              "s",componentName);
200       int ret= PyInt_AsLong(result);
201       SCRUTE(ret);
202       Py_RELEASE_NEW_THREAD;
203   
204       if (ret) // import possible: Python component
205         {
206           _library_map[aCompName] = (void *)pyCont; // any non O value OK
207           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
208           return true;
209         }
210     }
211   return false;
212 }
213
214 // Create an instance of component
215 Engines::Component_ptr
216 Engines_MPIContainer_i::create_component_instance( const char* componentName,
217                                                    CORBA::Long studyId)
218 {
219   if( _numproc == 0 ){
220     // Invocation du chargement du composant dans les autres process
221     for(int ip= 1;ip<_nbproc;ip++)
222       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Ascreate_component_instance(componentName,studyId);
223   }
224
225   return Lcreate_component_instance(componentName,studyId);
226 }
227
228 void Engines_MPIContainer_i::Ascreate_component_instance( const char* componentName,
229                                                           CORBA::Long studyId)
230 {
231   Lcreate_component_instance(componentName,studyId);
232 }
233
234 Engines::Component_ptr
235 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
236 {
237   if (studyId < 0) {
238     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
239     return Engines::Component::_nil() ;
240   }
241
242   Engines::Component_var iobject = Engines::Component::_nil() ;
243   Engines::MPIObject_var pobj;
244
245   string aCompName = genericRegisterName;
246   if (_library_map[aCompName]) { // Python component
247     if (_isSupervContainer) {
248       INFOS("Supervision Container does not support Python Component Engines");
249       return Engines::Component::_nil();
250     }
251     _numInstanceMutex.lock() ; // lock on the instance number
252     _numInstance++ ;
253     int numInstance = _numInstance ;
254     _numInstanceMutex.unlock() ;
255
256     char aNumI[12];
257     sprintf( aNumI , "%d" , numInstance ) ;
258     string instanceName = aCompName + "_inst_" + aNumI ;
259     string component_registerName =
260       _containerName + "/" + instanceName;
261
262     Py_ACQUIRE_NEW_THREAD;
263     PyObject *mainmod = PyImport_AddModule("__main__");
264     PyObject *globals = PyModule_GetDict(mainmod);
265     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
266     PyObject *result = PyObject_CallMethod(pyCont,
267                                            "create_component_instance",
268                                            "ssl",
269                                            aCompName.c_str(),
270                                            instanceName.c_str(),
271                                            studyId);
272     string iors = PyString_AsString(result);
273     SCRUTE(iors);
274     Py_RELEASE_NEW_THREAD;
275   
276     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
277     iobject = Engines::Component::_narrow( obj ) ;
278     pobj = Engines::MPIObject::_narrow(obj) ;
279     if( _numproc == 0 )
280       _NS->Register(iobject, component_registerName.c_str()) ;
281     // Root recupere les ior des composants des autre process
282     BCastIOR(_orb,pobj,false);
283
284     return iobject._retn();
285   }
286   
287   //--- try C++
288
289   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
290   void* handle = _library_map[impl_name];
291   if ( !handle ) {
292     INFOS("shared library " << impl_name <<"must be loaded before instance");
293     return Engines::Component::_nil() ;
294   }
295   else {
296     iobject = createMPIInstance(genericRegisterName,
297                                 handle,
298                                 studyId);
299     return iobject._retn();
300   }
301 }
302
303 Engines::Component_ptr
304 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
305                                           void *handle,
306                                           int studyId)
307 {
308   Engines::Component_var iobject;
309   Engines::MPIObject_var pobj;
310   // --- find the factory
311
312   string aGenRegisterName = genericRegisterName;
313   string factory_name = aGenRegisterName + string("Engine_factory");
314   SCRUTE(factory_name) ;
315
316   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
317     (int,int,
318      CORBA::ORB_ptr,
319      PortableServer::POA_ptr, 
320      PortableServer::ObjectId *, 
321      const char *, 
322      const char *) ;
323
324   MPIFACTORY_FUNCTION MPIComponent_factory
325     = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
326
327   char *error ;
328   if ( (error = dlerror() ) != NULL) {
329     // Try to load a sequential component
330     MESSAGE("[" << _numproc << "] Try to load a sequential component");
331     _numInstanceMutex.unlock() ;
332     iobject = Engines_Container_i::createInstance(genericRegisterName,handle,studyId);
333     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
334   }
335
336   // --- create instance
337
338   iobject = Engines::Component::_nil() ;
339
340   try
341     {
342       _numInstanceMutex.lock() ; // lock on the instance number
343       _numInstance++ ;
344       int numInstance = _numInstance ;
345       _numInstanceMutex.unlock() ;
346
347       char aNumI[12];
348       sprintf( aNumI , "%d" , numInstance ) ;
349       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
350       string component_registerName =
351         _containerName + "/" + instanceName;
352
353       // --- Instanciate required CORBA object
354
355       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
356       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
357                                  aGenRegisterName.c_str() ) ;
358
359       // --- get reference & servant from id
360
361       CORBA::Object_var obj = _poa->id_to_reference(*id);
362       iobject = Engines::Component::_narrow( obj ) ;
363       pobj = Engines::MPIObject::_narrow(obj) ;
364
365       Engines_Component_i *servant =
366         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
367       ASSERT(servant);
368       //SCRUTE(servant->pd_refCount);
369       servant->_remove_ref(); // compensate previous id_to_reference 
370       //SCRUTE(servant->pd_refCount);
371       _listInstances_map[instanceName] = iobject;
372       _cntInstances_map[aGenRegisterName] += 1;
373       SCRUTE(aGenRegisterName);
374       SCRUTE(_cntInstances_map[aGenRegisterName]);
375       //SCRUTE(servant->pd_refCount);
376       bool ret_studyId = servant->setStudyId(studyId);
377       ASSERT(ret_studyId);
378
379       // --- register the engine under the name
380       //     containerName(.dir)/instanceName(.object)
381
382       if( _numproc == 0 ){
383         _NS->Register( iobject , component_registerName.c_str() ) ;
384         MESSAGE( component_registerName.c_str() << " bound" ) ;
385       }
386       // Root recupere les ior des composants des autre process
387       BCastIOR(_orb,pobj,false);
388
389     }
390   catch (...)
391     {
392       INFOS( "Container_i::createInstance exception catched" ) ;
393     }
394   return iobject._retn();
395 }
396
397 // Load component
398 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
399                                                  const char* componentName)
400 {
401   if( _numproc == 0 ){
402     // Invocation du chargement du composant dans les autres process
403     for(int ip= 1;ip<_nbproc;ip++)
404       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asload_impl(nameToRegister,
405                                                                 componentName);
406   }
407
408   return Lload_impl(nameToRegister,componentName);
409
410 }
411
412 // Load component
413 void Engines_MPIContainer_i::Asload_impl(const char* nameToRegister,
414                                          const char* componentName)
415 {
416   Lload_impl(nameToRegister,componentName);
417 }
418
419 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
420                                    const char* nameToRegister,
421                                    const char* componentName)
422 {
423   Engines::Component_var iobject;
424   Engines::MPIObject_var pobj;
425   char cproc[4];
426
427   sprintf(cproc,"_%d",_numproc);
428
429   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
430
431   _numInstanceMutex.lock() ; // lock on the instance number
432   _numInstance++ ;
433   char _aNumI[12];
434   sprintf(_aNumI,"%d",_numInstance) ;
435
436   string _impl_name = componentName;
437   string _nameToRegister = nameToRegister;
438   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
439   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
440
441   string absolute_impl_name(_impl_name);
442   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
443   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
444   if(!handle){
445     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
446     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
447     return Engines::Component::_nil() ;
448   }
449
450   string factory_name = _nameToRegister + string("Engine_factory");
451   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
452
453   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
454                                                   CORBA::ORB_ptr,
455                                                   PortableServer::POA_ptr,
456                                                   PortableServer::ObjectId *,
457                                                   const char *,
458                                                   const char *) =
459     (PortableServer::ObjectId * (*) (int,int,
460                                      CORBA::ORB_ptr,
461                                      PortableServer::POA_ptr, 
462                                      PortableServer::ObjectId *, 
463                                      const char *, 
464                                      const char *)) 
465     dlsym(handle, factory_name.c_str());
466
467   char *error ;
468   if ((error = dlerror()) != NULL){
469     // Try to load a sequential component
470     MESSAGE("[" << _numproc << "] Try to load a sequential component");
471     _numInstanceMutex.unlock() ;
472     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
473     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
474   }
475   else{
476     // Instanciation du composant parallele
477     MESSAGE("[" << _numproc << "] Try to load a parallel component");
478     PortableServer::ObjectId * id = (MPIComponent_factory)
479       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
480     // get reference from id
481     CORBA::Object_var o = _poa->id_to_reference(*id);
482     pobj = Engines::MPIObject::_narrow(o) ;
483     iobject = Engines::Component::_narrow(o) ;
484   }
485
486   if( _numproc == 0 ){
487     // utiliser + tard le registry ici :
488     // register the engine under the name containerName.dir/nameToRegister.object
489     string component_registerName = _containerName + "/" + _nameToRegister;
490     _NS->Register(iobject, component_registerName.c_str()) ;
491   }
492
493   _numInstanceMutex.unlock() ;
494
495   // Root recupere les ior des composants des autre process
496   BCastIOR(_orb,pobj,false);
497
498   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
499   return Engines::Component::_duplicate(iobject);
500
501 }
502
503 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
504 {
505   int ip;
506   Engines::Component_ptr cptr;
507   Engines::MPIObject_ptr pcptr;
508   Engines::MPIObject_ptr spcptr;
509
510   ASSERT(! CORBA::is_nil(component_i));
511
512   if( _numproc == 0 ){
513     // Invocation de la destruction du composant dans les autres process
514     pcptr = (Engines::MPIObject_ptr)component_i;
515     for(ip= 1;ip<_nbproc;ip++){
516       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
517       cptr = (Engines::Component_ptr)spcptr;
518       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asremove_impl(cptr);
519     }
520   }
521
522   Lremove_impl(component_i);
523 }
524
525 void Engines_MPIContainer_i::Asremove_impl(Engines::Component_ptr component_i)
526 {
527   Lremove_impl(component_i);
528 }
529
530 void Engines_MPIContainer_i::Lremove_impl(Engines::Component_ptr component_i)
531 {
532   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
533
534   ASSERT(! CORBA::is_nil(component_i));
535
536   string instanceName = component_i->instanceName() ;
537   MESSAGE("[" << _numproc << "] unload component " << instanceName);
538   component_i->destroy() ;
539   MESSAGE("[" << _numproc << "] test key handle_map");
540   _numInstanceMutex.lock() ; // lock on the remove on handle_map
541   _numInstanceMutex.unlock() ;
542   MESSAGE("[" << _numproc << "] list handle_map");
543
544   END_OF("[" << _numproc << "] MPIContainer_i::Lremove_impl");
545
546 }
547
548 void Engines_MPIContainer_i::finalize_removal()
549 {
550   int ip;
551
552   if( _numproc == 0 ){
553     // Invocation de la destruction du composant dans les autres process
554     for(ip= 1;ip<_nbproc;ip++)
555       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Asfinalize_removal();
556   }
557
558   Lfinalize_removal();
559 }
560
561 void Engines_MPIContainer_i::Asfinalize_removal()
562 {
563   Lfinalize_removal();
564 }
565
566 void Engines_MPIContainer_i::Lfinalize_removal()
567 {
568   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
569
570 //   map<string, void *>::iterator im ;
571 //   // lock on the explore remove_map & dlclose
572 //   _numInstanceMutex.lock() ; 
573 //   for (im = remove_map.begin() ; im != remove_map.end() ; im ++)
574 //     {
575 //       void * handle = (*im).second ;
576 //       MESSAGE("[" << _numproc << "] dlclose " << (*im).first);
577 //       dlclose(handle) ;
578 //     }
579 //   MESSAGE("[" << _numproc << "] remove_map.clear()");
580 //   remove_map.clear() ;  
581 //   _numInstanceMutex.unlock() ;
582
583   END_OF("[" << _numproc << "] MPIContainer_i::Lfinalize_removal");
584 }