Salome HOME
CCAR: import_hook.py was too strict in ensure_list (ImportError raised)
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME MPIContainer : implemenation of container based on MPI libraries
24 //  File   : MPIContainer_i.cxx
25 //  Module : SALOME
26 //
27 #include <iostream>
28 #include <dlfcn.h>
29 #include <stdio.h>
30 #include "Basics_Utils.hxx"
31 #include "SALOME_Component_i.hxx"
32 #include "MPIContainer_i.hxx"
33 #include "SALOME_NamingService.hxx"
34 #include "Utils_SINGLETON.hxx"
35 #include "OpUtil.hxx"
36 #include "utilities.h"
37 #include <time.h>
38 #include <sys/time.h>
39 #include <pthread.h>  // must be before Python.h !
40 #include <Python.h>
41 #include "Container_init_python.hxx"
42
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
45                                                PortableServer::POA_ptr poa,
46                                                char * containerName,
47                                                int argc, char *argv[]) 
48   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
49 {
50
51   _id = _poa->activate_object(this);
52   CORBA::Object_var obj=_poa->id_to_reference(*_id);
53   Engines::Container_var pCont = Engines::Container::_narrow(obj);
54   _remove_ref();
55
56   if(_numproc==0){
57
58     _NS = new SALOME_NamingService();
59     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
60
61     std::string hostname = Kernel_Utils::GetHostname();
62     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
63     SCRUTE(_containerName);
64     _NS->Register(pCont, _containerName.c_str());
65
66   }
67
68   // Root recupere les ior des container des autre process
69   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
70   BCastIOR(_orb,pobj,true);
71 }
72
73 Engines_MPIContainer_i::Engines_MPIContainer_i() 
74   : Engines_Container_i()
75 {
76 }
77
78 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
79 {
80   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
81 }
82
83 // Load component
84 void Engines_MPIContainer_i::Shutdown()
85 {
86   int ip;
87   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
88   if( _numproc == 0 ){
89     _NS->Destroy_FullDirectory(_containerName.c_str());
90     _NS->Destroy_Name(_containerName.c_str());
91     for(ip= 1;ip<_nbproc;ip++)
92       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
93   }
94
95   std::map<std::string, Engines::Component_var>::iterator itm;
96   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
97     {
98       try
99         {
100           itm->second->destroy();
101         }
102       catch(const CORBA::Exception& e)
103         {
104           // ignore this entry and continue
105         }
106       catch(...)
107         {
108           // ignore this entry and continue
109         }
110     }
111
112   _orb->shutdown(0);
113
114 }
115
116 // Load a component library
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
118 {
119   reason=CORBA::string_dup("");
120
121   pthread_t *th;
122   if(_numproc == 0){
123     th = new pthread_t[_nbproc];
124     for(int ip=1;ip<_nbproc;ip++){
125       thread_st *st = new thread_st;
126       st->ip = ip;
127       st->tior = _tior;
128       st->compoName = componentName;
129       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
130     }
131   }
132
133   bool ret = Lload_component_Library(componentName);
134
135   if(_numproc == 0){
136     for(int ip=1;ip<_nbproc;ip++)
137       pthread_join(th[ip],NULL);
138     delete th;
139   }
140   return ret;
141 }
142
143 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
144 {
145   std::string aCompName = componentName;
146
147   // --- try dlopen C++ component
148
149   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
150   
151   _numInstanceMutex.lock(); // lock to be alone 
152   // (see decInstanceCnt, finalize_removal))
153   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
154   if (_library_map[impl_name])
155     {
156       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
157       _numInstanceMutex.unlock();
158       return true;
159     }
160   
161   void* handle;
162   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
163   if ( handle )
164     {
165       _library_map[impl_name] = handle;
166       _numInstanceMutex.unlock();
167       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
168       return true;
169     }
170   else
171     {
172       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
173       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
174     }
175   _numInstanceMutex.unlock();
176
177   // --- try import Python component
178
179   INFOS("[" << _numproc << "] try import Python component "<<componentName);
180   if (_isSupervContainer)
181     {
182       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
183       return false;
184     }
185   if (_library_map[aCompName])
186     {
187       return true; // Python Component, already imported
188     }
189   else
190     {
191       Py_ACQUIRE_NEW_THREAD;
192       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
193       PyObject *globals = PyModule_GetDict(mainmod);
194       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
195       PyObject *result = PyObject_CallMethod(pyCont,
196                                              (char*)"import_component",
197                                              (char*)"s",componentName);
198       std::string ret= PyString_AsString(result);
199       SCRUTE(ret);
200       Py_RELEASE_NEW_THREAD;
201   
202       if (ret=="") // import possible: Python component
203         {
204           _library_map[aCompName] = (void *)pyCont; // any non O value OK
205           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
206           return true;
207         }
208     }
209   return false;
210 }
211
212 // Create an instance of component
213 Engines::Component_ptr
214 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
215                                                        CORBA::Long studyId,
216                                                        const Engines::FieldsDict& env,
217                                                        CORBA::String_out reason)
218 {
219   reason=CORBA::string_dup("");
220
221   pthread_t *th;
222   if(_numproc == 0){
223     th = new pthread_t[_nbproc];
224     for(int ip=1;ip<_nbproc;ip++){
225       thread_st *st = new thread_st;
226       st->ip = ip;
227       st->tior = _tior;
228       st->compoName = componentName;
229       st->studyId = studyId;
230       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
231     }
232   }
233
234   Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
235
236   if(_numproc == 0){
237     for(int ip=1;ip<_nbproc;ip++)
238       pthread_join(th[ip],NULL);
239     delete th;
240   }
241
242   return cptr;
243 }
244
245 Engines::Component_ptr
246 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
247 {
248   if (studyId < 0) {
249     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
250     return Engines::Component::_nil() ;
251   }
252
253   Engines::Component_var iobject = Engines::Component::_nil() ;
254   Engines::MPIObject_var pobj;
255
256   std::string aCompName = genericRegisterName;
257   if (_library_map[aCompName]) { // Python component
258     if (_isSupervContainer) {
259       INFOS("Supervision Container does not support Python Component Engines");
260       return Engines::Component::_nil();
261     }
262     _numInstanceMutex.lock() ; // lock on the instance number
263     _numInstance++ ;
264     int numInstance = _numInstance ;
265     _numInstanceMutex.unlock() ;
266
267     char aNumI[12];
268     sprintf( aNumI , "%d" , numInstance ) ;
269     std::string instanceName = aCompName + "_inst_" + aNumI ;
270     std::string component_registerName =
271       _containerName + "/" + instanceName;
272
273     Py_ACQUIRE_NEW_THREAD;
274     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
275     PyObject *globals = PyModule_GetDict(mainmod);
276     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
277     PyObject *result = PyObject_CallMethod(pyCont,
278                                            (char*)"create_component_instance",
279                                            (char*)"ssl",
280                                            aCompName.c_str(),
281                                            instanceName.c_str(),
282                                            studyId);
283     const char *ior;
284     const char *error;
285     PyArg_ParseTuple(result,"ss", &ior, &error);
286     std::string iors = ior;
287     SCRUTE(iors);
288     Py_RELEASE_NEW_THREAD;
289   
290     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
291     iobject = Engines::Component::_narrow( obj ) ;
292     pobj = Engines::MPIObject::_narrow(obj) ;
293     if( _numproc == 0 )
294       _NS->Register(iobject, component_registerName.c_str()) ;
295     // Root recupere les ior des composants des autre process
296     BCastIOR(_orb,pobj,false);
297
298     return iobject._retn();
299   }
300   
301   //--- try C++
302
303   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
304   if (_library_map.count(impl_name) != 0) // C++ component
305     {
306       void* handle = _library_map[impl_name];
307       iobject = createMPIInstance(genericRegisterName,
308                                     handle,
309                                     studyId);
310       return iobject._retn();
311     }
312
313   return Engines::Component::_nil() ;
314 }
315
316 Engines::Component_ptr
317 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
318                                           void *handle,
319                                           int studyId)
320 {
321   Engines::Component_var iobject;
322   Engines::MPIObject_var pobj;
323   // --- find the factory
324
325   std::string aGenRegisterName = genericRegisterName;
326   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
327
328   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
329     (CORBA::ORB_ptr,
330      PortableServer::POA_ptr, 
331      PortableServer::ObjectId *, 
332      const char *, 
333      const char *) ;
334
335   dlerror();
336   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
337
338   if ( !MPIComponent_factory )
339     {
340       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
341       SCRUTE( dlerror() );
342       pobj = Engines::MPIObject::_nil();
343       BCastIOR(_orb,pobj,false);
344       return Engines::Component::_nil();
345     }
346
347   // --- create instance
348
349   iobject = Engines::Component::_nil() ;
350
351   try
352     {
353       _numInstanceMutex.lock() ; // lock on the instance number
354       _numInstance++ ;
355       int numInstance = _numInstance ;
356       _numInstanceMutex.unlock() ;
357
358       char aNumI[12];
359       sprintf( aNumI , "%d" , numInstance ) ;
360       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
361       std::string component_registerName =
362         _containerName + "/" + instanceName;
363
364       // --- Instanciate required CORBA object
365
366       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
367       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
368
369       // --- get reference & servant from id
370
371       CORBA::Object_var obj = _poa->id_to_reference(*id);
372       iobject = Engines::Component::_narrow( obj ) ;
373       pobj = Engines::MPIObject::_narrow(obj) ;
374
375       Engines_Component_i *servant =
376         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
377       ASSERT(servant);
378       //SCRUTE(servant->pd_refCount);
379       servant->_remove_ref(); // compensate previous id_to_reference 
380       //SCRUTE(servant->pd_refCount);
381       _listInstances_map[instanceName] = iobject;
382       _cntInstances_map[aGenRegisterName] += 1;
383       //SCRUTE(servant->pd_refCount);
384       bool ret_studyId = servant->setStudyId(studyId);
385       ASSERT(ret_studyId);
386
387       // --- register the engine under the name
388       //     containerName(.dir)/instanceName(.object)
389
390       if( _numproc == 0 ){
391         _NS->Register( iobject , component_registerName.c_str() ) ;
392         MESSAGE( component_registerName.c_str() << " bound" ) ;
393       }
394       // Root recupere les ior des composants des autre process
395       BCastIOR(_orb,pobj,false);
396
397     }
398   catch(const std::exception &ex){
399     INFOS( ex.what() ) ;
400     return Engines::Component::_nil();
401   }
402   return iobject._retn();
403 }
404
405 // Load component
406 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
407                                                  const char* componentName)
408 {
409   pthread_t *th;
410   if(_numproc == 0){
411     th = new pthread_t[_nbproc];
412     for(int ip=1;ip<_nbproc;ip++){
413       thread_st *st = new thread_st;
414       st->ip = ip;
415       st->tior = _tior;
416       st->nameToRegister = nameToRegister;
417       st->compoName = componentName;
418       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
419     }
420   }
421
422   Engines::Component_ptr cptr =  Lload_impl(nameToRegister,componentName);
423
424   if(_numproc == 0){
425     for(int ip=1;ip<_nbproc;ip++)
426       pthread_join(th[ip],NULL);
427     delete th;
428   }
429
430   return cptr;
431 }
432
433 // Load component
434 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
435                                    const char* nameToRegister,
436                                    const char* componentName)
437 {
438   Engines::Component_var iobject;
439   Engines::MPIObject_var pobj;
440   char cproc[4];
441
442   sprintf(cproc,"_%d",_numproc);
443
444   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
445
446   _numInstanceMutex.lock() ; // lock on the instance number
447   _numInstance++ ;
448   char _aNumI[12];
449   sprintf(_aNumI,"%d",_numInstance) ;
450
451   std::string _impl_name = componentName;
452   std::string _nameToRegister = nameToRegister;
453   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
454   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
455
456   std::string absolute_impl_name(_impl_name);
457   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
458   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
459   if(!handle){
460     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
461     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
462     return Engines::Component::_nil() ;
463   }
464
465   std::string factory_name = _nameToRegister + std::string("Engine_factory");
466   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
467
468   dlerror();
469   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
470                                                   PortableServer::POA_ptr,
471                                                   PortableServer::ObjectId *,
472                                                   const char *,
473                                                   const char *) =
474     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
475                                      PortableServer::POA_ptr, 
476                                      PortableServer::ObjectId *, 
477                                      const char *, 
478                                      const char *)) 
479     dlsym(handle, factory_name.c_str());
480
481   char *error ;
482   if ((error = dlerror()) != NULL){
483     // Try to load a sequential component
484     MESSAGE("[" << _numproc << "] Try to load a sequential component");
485     _numInstanceMutex.unlock() ;
486     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
487     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
488   }
489   else{
490     // Instanciation du composant parallele
491     MESSAGE("[" << _numproc << "] Try to load a parallel component");
492     PortableServer::ObjectId * id = (MPIComponent_factory)
493       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
494     // get reference from id
495     CORBA::Object_var o = _poa->id_to_reference(*id);
496     pobj = Engines::MPIObject::_narrow(o) ;
497     iobject = Engines::Component::_narrow(o) ;
498   }
499
500   if( _numproc == 0 ){
501     // utiliser + tard le registry ici :
502     // register the engine under the name containerName.dir/nameToRegister.object
503     std::string component_registerName = _containerName + "/" + _nameToRegister;
504     _NS->Register(iobject, component_registerName.c_str()) ;
505   }
506
507   _numInstanceMutex.unlock() ;
508
509   // Root recupere les ior des composants des autre process
510   BCastIOR(_orb,pobj,false);
511
512   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
513   return Engines::Component::_duplicate(iobject);
514
515 }
516
517 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
518 {
519   Engines::MPIObject_ptr pcptr;
520   Engines::MPIObject_ptr spcptr;
521
522   pthread_t *th;
523   if(_numproc == 0){
524     pcptr = (Engines::MPIObject_ptr)component_i;
525     th = new pthread_t[_nbproc];
526     for(int ip=1;ip<_nbproc;ip++){
527       thread_st *st = new thread_st;
528       st->ip = ip;
529       st->tior = _tior;
530       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
531       st->cptr = (Engines::Component_ptr)spcptr;
532       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
533     }
534   }
535
536   ASSERT(! CORBA::is_nil(component_i));
537   std::string instanceName = component_i->instanceName() ;
538   MESSAGE("[" << _numproc << "] unload component " << instanceName);
539   _numInstanceMutex.lock() ; // lock on the remove on handle_map
540   _listInstances_map.erase(instanceName);
541   _numInstanceMutex.unlock() ;
542   component_i->destroy() ;
543   if(_numproc == 0)
544     _NS->Destroy_Name(instanceName.c_str());
545
546   if(_numproc == 0){
547     for(int ip=1;ip<_nbproc;ip++)
548       pthread_join(th[ip],NULL);
549     delete th;
550   }
551
552 }
553
554 void Engines_MPIContainer_i::finalize_removal()
555 {
556   pthread_t *th;
557   if(_numproc == 0){
558     th = new pthread_t[_nbproc];
559     for(int ip=1;ip<_nbproc;ip++){
560       thread_st *st = new thread_st;
561       st->ip = ip;
562       st->tior = _tior;
563       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
564     }
565   }
566
567   _numInstanceMutex.lock(); // lock to be alone
568   // (see decInstanceCnt, load_component_Library)
569   std::map<std::string, void *>::iterator ith;
570   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
571   {
572     void *handle = (*ith).second;
573     std::string impl_name= (*ith).first;
574     if (handle)
575     {
576       SCRUTE(handle);
577       SCRUTE(impl_name);
578       //        dlclose(handle);                // SALOME unstable after ...
579       //        _library_map.erase(impl_name);
580     }
581   }
582   _toRemove_map.clear();
583   _numInstanceMutex.unlock();
584
585   if(_numproc == 0){
586     for(int ip=1;ip<_nbproc;ip++)
587       pthread_join(th[ip],NULL);
588     delete th;
589   }
590 }
591
592 void *th_loadcomponentlibrary(void *s)
593 {
594   thread_st *st = (thread_st*)s;
595   char* reason;
596   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
597   CORBA::string_free(reason);
598   return NULL;
599 }
600
601 void *th_createcomponentinstance(void *s)
602 {
603   thread_st *st = (thread_st*)s;
604   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
605   return NULL;
606 }
607
608 void *th_loadimpl(void *s)
609 {
610   thread_st *st = (thread_st*)s;
611   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
612   return NULL;
613 }
614
615 void *th_removeimpl(void *s)
616 {
617   thread_st *st = (thread_st*)s;
618   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
619   return NULL;
620 }
621
622 void *th_finalizeremoval(void *s)
623 {
624   thread_st *st = (thread_st*)s;
625   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
626   return NULL;
627 }