]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
Merge from V5_1_main 14/05/2010
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME MPIContainer : implemenation of container based on MPI libraries
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26 //
27 #include <iostream>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "Basics_Utils.hxx"
31 #include "SALOME_Component_i.hxx"
32 #include "MPIContainer_i.hxx"
33 #include "SALOME_NamingService.hxx"
34 #include "Utils_SINGLETON.hxx"
35 #include "OpUtil.hxx"
36 #include "utilities.h"
37 #include <time.h>
38 #include <sys/time.h>
39 #include <pthread.h>  // must be before Python.h !
40 #include <Python.h>
41 #include "Container_init_python.hxx"
42
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
45                                                CORBA::ORB_ptr orb, 
46                                                PortableServer::POA_ptr poa,
47                                                char * containerName,
48                                                int argc, char *argv[]) 
49   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
50 {
51
52   _id = _poa->activate_object(this);
53   CORBA::Object_var obj=_poa->id_to_reference(*_id);
54   Engines::Container_var pCont = Engines::Container::_narrow(obj);
55   _remove_ref();
56
57   if(numproc==0){
58
59     _NS = new SALOME_NamingService();
60     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
61
62     std::string hostname = Kernel_Utils::GetHostname();
63     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
64     SCRUTE(_containerName);
65     _NS->Register(pCont, _containerName.c_str());
66
67   }
68
69   // Root recupere les ior des container des autre process
70   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71   BCastIOR(_orb,pobj,true);
72 }
73
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
75   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
76 {
77 }
78
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 {
81   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
82 }
83
84 // Load component
85 void Engines_MPIContainer_i::Shutdown()
86 {
87   int ip;
88   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
89   if( _numproc == 0 ){
90     _NS->Destroy_FullDirectory(_containerName.c_str());
91     for(ip= 1;ip<_nbproc;ip++)
92       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
93   }
94
95   std::map<std::string, Engines::Component_var>::iterator itm;
96   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
97     {
98       try
99         {
100           itm->second->destroy();
101         }
102       catch(const CORBA::Exception& e)
103         {
104           // ignore this entry and continue
105         }
106       catch(...)
107         {
108           // ignore this entry and continue
109         }
110     }
111
112   _orb->shutdown(0);
113
114 }
115
116 // Load a component library
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
118 {
119   reason=CORBA::string_dup("");
120
121   pthread_t *th;
122   if(_numproc == 0){
123     th = new pthread_t[_nbproc];
124     for(int ip=1;ip<_nbproc;ip++){
125       thread_st *st = new thread_st;
126       st->ip = ip;
127       st->tior = _tior;
128       st->compoName = componentName;
129       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
130     }
131   }
132
133   bool ret = Lload_component_Library(componentName);
134
135   if(_numproc == 0){
136     for(int ip=1;ip<_nbproc;ip++)
137       pthread_join(th[ip],NULL);
138     delete th;
139   }
140   return ret;
141 }
142
143 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
144 {
145   std::string aCompName = componentName;
146
147   // --- try dlopen C++ component
148
149   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
150   
151   _numInstanceMutex.lock(); // lock to be alone 
152   // (see decInstanceCnt, finalize_removal))
153   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
154   if (_library_map[impl_name])
155     {
156       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
157       _numInstanceMutex.unlock();
158       return true;
159     }
160   
161   void* handle;
162   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
163   if ( handle )
164     {
165       _library_map[impl_name] = handle;
166       _numInstanceMutex.unlock();
167       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
168       return true;
169     }
170   else
171     {
172       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
173       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
174     }
175   _numInstanceMutex.unlock();
176
177   // --- try import Python component
178
179   INFOS("[" << _numproc << "] try import Python component "<<componentName);
180   if (_isSupervContainer)
181     {
182       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
183       return false;
184     }
185   if (_library_map[aCompName])
186     {
187       return true; // Python Component, already imported
188     }
189   else
190     {
191       Py_ACQUIRE_NEW_THREAD;
192       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
193       PyObject *globals = PyModule_GetDict(mainmod);
194       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
195       PyObject *result = PyObject_CallMethod(pyCont,
196                                              (char*)"import_component",
197                                              (char*)"s",componentName);
198       std::string ret= PyString_AsString(result);
199       SCRUTE(ret);
200       Py_RELEASE_NEW_THREAD;
201   
202       if (ret=="") // import possible: Python component
203         {
204           _library_map[aCompName] = (void *)pyCont; // any non O value OK
205           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
206           return true;
207         }
208     }
209   return false;
210 }
211
212 // Create an instance of component
213 Engines::Component_ptr
214 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
215                                                        CORBA::Long studyId,
216                                                        const Engines::FieldsDict& env,
217                                                        CORBA::String_out reason)
218 {
219   reason=CORBA::string_dup("");
220
221   pthread_t *th;
222   if(_numproc == 0){
223     th = new pthread_t[_nbproc];
224     for(int ip=1;ip<_nbproc;ip++){
225       thread_st *st = new thread_st;
226       st->ip = ip;
227       st->tior = _tior;
228       st->compoName = componentName;
229       st->studyId = studyId;
230       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
231     }
232   }
233
234   Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
235
236   if(_numproc == 0){
237     for(int ip=1;ip<_nbproc;ip++)
238       pthread_join(th[ip],NULL);
239     delete th;
240   }
241
242   return cptr;
243 }
244
245 Engines::Component_ptr
246 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
247 {
248   if (studyId < 0) {
249     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
250     return Engines::Component::_nil() ;
251   }
252
253   Engines::Component_var iobject = Engines::Component::_nil() ;
254   Engines::MPIObject_var pobj;
255
256   std::string aCompName = genericRegisterName;
257   if (_library_map[aCompName]) { // Python component
258     if (_isSupervContainer) {
259       INFOS("Supervision Container does not support Python Component Engines");
260       return Engines::Component::_nil();
261     }
262     _numInstanceMutex.lock() ; // lock on the instance number
263     _numInstance++ ;
264     int numInstance = _numInstance ;
265     _numInstanceMutex.unlock() ;
266
267     char aNumI[12];
268     sprintf( aNumI , "%d" , numInstance ) ;
269     std::string instanceName = aCompName + "_inst_" + aNumI ;
270     std::string component_registerName =
271       _containerName + "/" + instanceName;
272
273     Py_ACQUIRE_NEW_THREAD;
274     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
275     PyObject *globals = PyModule_GetDict(mainmod);
276     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
277     PyObject *result = PyObject_CallMethod(pyCont,
278                                            (char*)"create_component_instance",
279                                            (char*)"ssl",
280                                            aCompName.c_str(),
281                                            instanceName.c_str(),
282                                            studyId);
283     const char *ior;
284     const char *error;
285     PyArg_ParseTuple(result,"ss", &ior, &error);
286     std::string iors = ior;
287     SCRUTE(iors);
288     Py_RELEASE_NEW_THREAD;
289   
290     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
291     iobject = Engines::Component::_narrow( obj ) ;
292     pobj = Engines::MPIObject::_narrow(obj) ;
293     if( _numproc == 0 )
294       _NS->Register(iobject, component_registerName.c_str()) ;
295     // Root recupere les ior des composants des autre process
296     BCastIOR(_orb,pobj,false);
297
298     return iobject._retn();
299   }
300   
301   //--- try C++
302
303   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
304   if (_library_map.count(impl_name) != 0) // C++ component
305     {
306       void* handle = _library_map[impl_name];
307       iobject = createMPIInstance(genericRegisterName,
308                                     handle,
309                                     studyId);
310       return iobject._retn();
311     }
312
313   return Engines::Component::_nil() ;
314 }
315
316 Engines::Component_ptr
317 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
318                                           void *handle,
319                                           int studyId)
320 {
321   Engines::Component_var iobject;
322   Engines::MPIObject_var pobj;
323   // --- find the factory
324
325   std::string aGenRegisterName = genericRegisterName;
326   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
327
328   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
329     (int,int,
330      CORBA::ORB_ptr,
331      PortableServer::POA_ptr, 
332      PortableServer::ObjectId *, 
333      const char *, 
334      const char *) ;
335
336   dlerror();
337   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
338
339   if ( !MPIComponent_factory )
340     {
341       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
342       SCRUTE( dlerror() );
343       pobj = Engines::MPIObject::_nil();
344       BCastIOR(_orb,pobj,false);
345       return Engines::Component::_nil();
346     }
347
348   // --- create instance
349
350   iobject = Engines::Component::_nil() ;
351
352   try
353     {
354       _numInstanceMutex.lock() ; // lock on the instance number
355       _numInstance++ ;
356       int numInstance = _numInstance ;
357       _numInstanceMutex.unlock() ;
358
359       char aNumI[12];
360       sprintf( aNumI , "%d" , numInstance ) ;
361       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
362       std::string component_registerName =
363         _containerName + "/" + instanceName;
364
365       // --- Instanciate required CORBA object
366
367       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
368       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
369                                  aGenRegisterName.c_str() ) ;
370
371       // --- get reference & servant from id
372
373       CORBA::Object_var obj = _poa->id_to_reference(*id);
374       iobject = Engines::Component::_narrow( obj ) ;
375       pobj = Engines::MPIObject::_narrow(obj) ;
376
377       Engines_Component_i *servant =
378         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
379       ASSERT(servant);
380       //SCRUTE(servant->pd_refCount);
381       servant->_remove_ref(); // compensate previous id_to_reference 
382       //SCRUTE(servant->pd_refCount);
383       _listInstances_map[instanceName] = iobject;
384       _cntInstances_map[aGenRegisterName] += 1;
385       //SCRUTE(servant->pd_refCount);
386       bool ret_studyId = servant->setStudyId(studyId);
387       ASSERT(ret_studyId);
388
389       // --- register the engine under the name
390       //     containerName(.dir)/instanceName(.object)
391
392       if( _numproc == 0 ){
393         _NS->Register( iobject , component_registerName.c_str() ) ;
394         MESSAGE( component_registerName.c_str() << " bound" ) ;
395       }
396       // Root recupere les ior des composants des autre process
397       BCastIOR(_orb,pobj,false);
398
399     }
400   catch(const std::exception &ex){
401     INFOS( ex.what() ) ;
402     return Engines::Component::_nil();
403   }
404   return iobject._retn();
405 }
406
407 // Load component
408 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
409                                                  const char* componentName)
410 {
411   pthread_t *th;
412   if(_numproc == 0){
413     th = new pthread_t[_nbproc];
414     for(int ip=1;ip<_nbproc;ip++){
415       thread_st *st = new thread_st;
416       st->ip = ip;
417       st->tior = _tior;
418       st->nameToRegister = nameToRegister;
419       st->compoName = componentName;
420       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
421     }
422   }
423
424   Engines::Component_ptr cptr =  Lload_impl(nameToRegister,componentName);
425
426   if(_numproc == 0){
427     for(int ip=1;ip<_nbproc;ip++)
428       pthread_join(th[ip],NULL);
429     delete th;
430   }
431
432   return cptr;
433 }
434
435 // Load component
436 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
437                                    const char* nameToRegister,
438                                    const char* componentName)
439 {
440   Engines::Component_var iobject;
441   Engines::MPIObject_var pobj;
442   char cproc[4];
443
444   sprintf(cproc,"_%d",_numproc);
445
446   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
447
448   _numInstanceMutex.lock() ; // lock on the instance number
449   _numInstance++ ;
450   char _aNumI[12];
451   sprintf(_aNumI,"%d",_numInstance) ;
452
453   std::string _impl_name = componentName;
454   std::string _nameToRegister = nameToRegister;
455   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
456   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
457
458   std::string absolute_impl_name(_impl_name);
459   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
460   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
461   if(!handle){
462     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
463     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
464     return Engines::Component::_nil() ;
465   }
466
467   std::string factory_name = _nameToRegister + std::string("Engine_factory");
468   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
469
470   dlerror();
471   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
472                                                   CORBA::ORB_ptr,
473                                                   PortableServer::POA_ptr,
474                                                   PortableServer::ObjectId *,
475                                                   const char *,
476                                                   const char *) =
477     (PortableServer::ObjectId * (*) (int,int,
478                                      CORBA::ORB_ptr,
479                                      PortableServer::POA_ptr, 
480                                      PortableServer::ObjectId *, 
481                                      const char *, 
482                                      const char *)) 
483     dlsym(handle, factory_name.c_str());
484
485   char *error ;
486   if ((error = dlerror()) != NULL){
487     // Try to load a sequential component
488     MESSAGE("[" << _numproc << "] Try to load a sequential component");
489     _numInstanceMutex.unlock() ;
490     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
491     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
492   }
493   else{
494     // Instanciation du composant parallele
495     MESSAGE("[" << _numproc << "] Try to load a parallel component");
496     PortableServer::ObjectId * id = (MPIComponent_factory)
497       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
498     // get reference from id
499     CORBA::Object_var o = _poa->id_to_reference(*id);
500     pobj = Engines::MPIObject::_narrow(o) ;
501     iobject = Engines::Component::_narrow(o) ;
502   }
503
504   if( _numproc == 0 ){
505     // utiliser + tard le registry ici :
506     // register the engine under the name containerName.dir/nameToRegister.object
507     std::string component_registerName = _containerName + "/" + _nameToRegister;
508     _NS->Register(iobject, component_registerName.c_str()) ;
509   }
510
511   _numInstanceMutex.unlock() ;
512
513   // Root recupere les ior des composants des autre process
514   BCastIOR(_orb,pobj,false);
515
516   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
517   return Engines::Component::_duplicate(iobject);
518
519 }
520
521 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
522 {
523   Engines::MPIObject_ptr pcptr;
524   Engines::MPIObject_ptr spcptr;
525
526   pthread_t *th;
527   if(_numproc == 0){
528     pcptr = (Engines::MPIObject_ptr)component_i;
529     th = new pthread_t[_nbproc];
530     for(int ip=1;ip<_nbproc;ip++){
531       thread_st *st = new thread_st;
532       st->ip = ip;
533       st->tior = _tior;
534       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
535       st->cptr = (Engines::Component_ptr)spcptr;
536       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
537     }
538   }
539
540   ASSERT(! CORBA::is_nil(component_i));
541   std::string instanceName = component_i->instanceName() ;
542   MESSAGE("[" << _numproc << "] unload component " << instanceName);
543   _numInstanceMutex.lock() ; // lock on the remove on handle_map
544   _listInstances_map.erase(instanceName);
545   _numInstanceMutex.unlock() ;
546   component_i->destroy() ;
547   if(_numproc == 0)
548     _NS->Destroy_Name(instanceName.c_str());
549
550   if(_numproc == 0){
551     for(int ip=1;ip<_nbproc;ip++)
552       pthread_join(th[ip],NULL);
553     delete th;
554   }
555
556 }
557
558 void Engines_MPIContainer_i::finalize_removal()
559 {
560   pthread_t *th;
561   if(_numproc == 0){
562     th = new pthread_t[_nbproc];
563     for(int ip=1;ip<_nbproc;ip++){
564       thread_st *st = new thread_st;
565       st->ip = ip;
566       st->tior = _tior;
567       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
568     }
569   }
570
571   _numInstanceMutex.lock(); // lock to be alone
572   // (see decInstanceCnt, load_component_Library)
573   std::map<std::string, void *>::iterator ith;
574   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
575   {
576     void *handle = (*ith).second;
577     std::string impl_name= (*ith).first;
578     if (handle)
579     {
580       SCRUTE(handle);
581       SCRUTE(impl_name);
582       //        dlclose(handle);                // SALOME unstable after ...
583       //        _library_map.erase(impl_name);
584     }
585   }
586   _toRemove_map.clear();
587   _numInstanceMutex.unlock();
588
589   if(_numproc == 0){
590     for(int ip=1;ip<_nbproc;ip++)
591       pthread_join(th[ip],NULL);
592     delete th;
593   }
594 }
595
596 void *th_loadcomponentlibrary(void *s)
597 {
598   thread_st *st = (thread_st*)s;
599   char* reason;
600   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
601   CORBA::string_free(reason);
602   return NULL;
603 }
604
605 void *th_createcomponentinstance(void *s)
606 {
607   thread_st *st = (thread_st*)s;
608   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
609   return NULL;
610 }
611
612 void *th_loadimpl(void *s)
613 {
614   thread_st *st = (thread_st*)s;
615   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
616   return NULL;
617 }
618
619 void *th_removeimpl(void *s)
620 {
621   thread_st *st = (thread_st*)s;
622   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
623   return NULL;
624 }
625
626 void *th_finalizeremoval(void *s)
627 {
628   thread_st *st = (thread_st*)s;
629   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
630   return NULL;
631 }