Salome HOME
Merge branch 'omu/Launcher9'
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : MPIContainer_i.cxx
23 //  Module : SALOME
24
25 #include <iostream>
26 #include <dlfcn.h>
27 #include <stdio.h>
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 #include <time.h>
36 #include <sys/time.h>
37 #include <pthread.h>  // must be before Python.h !
38
39 #ifdef _XOPEN_SOURCE
40 #undef _XOPEN_SOURCE
41 #endif
42
43 #include <Python.h>
44 #include "Container_init_python.hxx"
45
46 // L'appel au registry SALOME ne se fait que pour le process 0
47 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
48                                                PortableServer::POA_ptr poa,
49                                                char * containerName,
50                                                int argc, char *argv[]) 
51   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
52 {
53
54   _id = _poa->activate_object(this);
55   CORBA::Object_var obj=_poa->id_to_reference(*_id);
56   Engines::Container_var pCont = Engines::Container::_narrow(obj);
57   _remove_ref();
58
59   if(_numproc==0){
60
61     _NS = new SALOME_NamingService();
62     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
63
64     std::string hostname = Kernel_Utils::GetHostname();
65     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
66     SCRUTE(_containerName);
67     _NS->Register(pCont, _containerName.c_str());
68
69   }
70
71   // Root recupere les ior des container des autre process
72   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
73   BCastIOR(_orb,pobj,true);
74 }
75
76 Engines_MPIContainer_i::Engines_MPIContainer_i() 
77   : Engines_Container_i()
78 {
79 }
80
81 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
82 {
83   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
84 }
85
86 // Load component
87 void Engines_MPIContainer_i::Shutdown()
88 {
89   int ip;
90   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
91   if( _numproc == 0 ){
92     _NS->Destroy_FullDirectory(_containerName.c_str());
93     _NS->Destroy_Name(_containerName.c_str());
94     for(ip= 1;ip<_nbproc;ip++)
95       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
96   }
97
98   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
99   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100     {
101       try
102         {
103           itm->second->destroy();
104         }
105       catch(const CORBA::Exception& e)
106         {
107           // ignore this entry and continue
108         }
109       catch(...)
110         {
111           // ignore this entry and continue
112         }
113     }
114
115   _orb->shutdown(0);
116
117 }
118
119 // Load a component library
120 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
121 {
122   reason=CORBA::string_dup("");
123
124   pthread_t *th;
125   if(_numproc == 0){
126     th = new pthread_t[_nbproc];
127     for(int ip=1;ip<_nbproc;ip++){
128       thread_st *st = new thread_st;
129       st->ip = ip;
130       st->tior = _tior;
131       st->compoName = componentName;
132       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
133     }
134   }
135
136   bool ret = Lload_component_Library(componentName);
137
138   if(_numproc == 0){
139     for(int ip=1;ip<_nbproc;ip++)
140       pthread_join(th[ip],NULL);
141     delete th;
142   }
143   return ret;
144 }
145
146 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
147 {
148   std::string aCompName = componentName;
149
150   // --- try dlopen C++ component
151
152 #ifdef __APPLE__
153   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.dylib");
154 #else
155   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
156 #endif
157   
158   _numInstanceMutex.lock(); // lock to be alone 
159   // (see decInstanceCnt, finalize_removal))
160   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
161   if (_library_map[impl_name])
162     {
163       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
164       _numInstanceMutex.unlock();
165       return true;
166     }
167   
168   void* handle;
169   handle = dlopen( impl_name.c_str() , RTLD_LAZY | RTLD_GLOBAL ) ;
170   if ( handle )
171     {
172       _library_map[impl_name] = handle;
173       _numInstanceMutex.unlock();
174       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
175       return true;
176     }
177   else
178     {
179       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
180       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
181     }
182   _numInstanceMutex.unlock();
183
184   // --- try import Python component
185
186   INFOS("[" << _numproc << "] try import Python component "<<componentName);
187   if (_isSupervContainer)
188     {
189       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
190       return false;
191     }
192   if (_library_map[aCompName])
193     {
194       return true; // Python Component, already imported
195     }
196   else
197     {
198       Py_ACQUIRE_NEW_THREAD;
199       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
200       PyObject *globals = PyModule_GetDict(mainmod);
201       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
202       PyObject *result = PyObject_CallMethod(pyCont,
203                                              (char*)"import_component",
204                                              (char*)"s",componentName);
205       std::string ret= PyUnicode_AsUTF8(result);
206       SCRUTE(ret);
207       Py_RELEASE_NEW_THREAD;
208   
209       if (ret=="") // import possible: Python component
210         {
211           _library_map[aCompName] = (void *)pyCont; // any non O value OK
212           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
213           return true;
214         }
215     }
216   return false;
217 }
218
219 // Create an instance of component
220 Engines::EngineComponent_ptr
221 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
222                                                        const Engines::FieldsDict& env,
223                                                        CORBA::String_out reason)
224 {
225   reason=CORBA::string_dup("");
226
227   pthread_t *th;
228   if(_numproc == 0){
229     th = new pthread_t[_nbproc];
230     for(int ip=1;ip<_nbproc;ip++){
231       thread_st *st = new thread_st;
232       st->ip = ip;
233       st->tior = _tior;
234       st->compoName = componentName;
235       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
236     }
237   }
238
239   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
240
241   if(_numproc == 0){
242     for(int ip=1;ip<_nbproc;ip++)
243       pthread_join(th[ip],NULL);
244     delete th;
245   }
246
247   return cptr;
248 }
249
250 Engines::EngineComponent_ptr
251 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
252 {
253   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
254   Engines::MPIObject_var pobj;
255
256   std::string aCompName = genericRegisterName;
257   if (_library_map[aCompName]) { // Python component
258     if (_isSupervContainer) {
259       INFOS("Supervision Container does not support Python Component Engines");
260       return Engines::EngineComponent::_nil();
261     }
262     _numInstanceMutex.lock() ; // lock on the instance number
263     _numInstance++ ;
264     int numInstance = _numInstance ;
265     _numInstanceMutex.unlock() ;
266
267     char aNumI[12];
268     sprintf( aNumI , "%d" , numInstance ) ;
269     std::string instanceName = aCompName + "_inst_" + aNumI ;
270     std::string component_registerName =
271       _containerName + "/" + instanceName;
272
273     Py_ACQUIRE_NEW_THREAD;
274     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
275     PyObject *globals = PyModule_GetDict(mainmod);
276     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
277     PyObject *result = PyObject_CallMethod(pyCont,
278                                            (char*)"create_component_instance",
279                                            (char*)"ss",
280                                            aCompName.c_str(),
281                                            instanceName.c_str());
282     const char *ior;
283     const char *error;
284     PyArg_ParseTuple(result,"ss", &ior, &error);
285     std::string iors = ior;
286     SCRUTE(iors);
287     Py_RELEASE_NEW_THREAD;
288   
289     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
290     iobject = Engines::EngineComponent::_narrow( obj ) ;
291     pobj = Engines::MPIObject::_narrow(obj) ;
292     if( _numproc == 0 )
293       _NS->Register(iobject, component_registerName.c_str()) ;
294     // Root recupere les ior des composants des autre process
295     BCastIOR(_orb,pobj,false);
296
297     return iobject._retn();
298   }
299   
300   //--- try C++
301
302 #ifdef __APPLE__
303   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.dylib");
304 #else
305   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
306 #endif
307   if (_library_map.count(impl_name) != 0) // C++ component
308     {
309       void* handle = _library_map[impl_name];
310       iobject = createMPIInstance(genericRegisterName,
311                                   handle);
312       return iobject._retn();
313     }
314
315   return Engines::EngineComponent::_nil() ;
316 }
317
318 Engines::EngineComponent_ptr
319 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
320                                           void *handle)
321 {
322   Engines::EngineComponent_var iobject;
323   Engines::MPIObject_var pobj;
324   // --- find the factory
325
326   std::string aGenRegisterName = genericRegisterName;
327   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
328
329   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
330     (CORBA::ORB_ptr,
331      PortableServer::POA_ptr, 
332      PortableServer::ObjectId *, 
333      const char *, 
334      const char *) ;
335
336   dlerror();
337   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
338
339   if ( !MPIComponent_factory )
340     {
341       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
342       SCRUTE( dlerror() );
343       pobj = Engines::MPIObject::_nil();
344       BCastIOR(_orb,pobj,false);
345       return Engines::EngineComponent::_nil();
346     }
347
348   // --- create instance
349
350   iobject = Engines::EngineComponent::_nil() ;
351
352   try
353     {
354       _numInstanceMutex.lock() ; // lock on the instance number
355       _numInstance++ ;
356       int numInstance = _numInstance ;
357       _numInstanceMutex.unlock() ;
358
359       char aNumI[12];
360       sprintf( aNumI , "%d" , numInstance ) ;
361       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
362       std::string component_registerName =
363         _containerName + "/" + instanceName;
364
365       // --- Instantiate required CORBA object
366
367       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
368       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
369
370       // --- get reference from id
371
372       CORBA::Object_var obj = _poa->id_to_reference(*id);
373       iobject = Engines::EngineComponent::_narrow( obj ) ;
374       pobj = Engines::MPIObject::_narrow(obj) ;
375
376       _listInstances_map[instanceName] = iobject;
377       _cntInstances_map[aGenRegisterName] += 1;
378
379       // --- register the engine under the name
380       //     containerName(.dir)/instanceName(.object)
381
382       if( _numproc == 0 ){
383         _NS->Register( iobject , component_registerName.c_str() ) ;
384         MESSAGE( component_registerName.c_str() << " bound" ) ;
385       }
386       // Root recupere les ior des composants des autre process
387       BCastIOR(_orb,pobj,false);
388
389     }
390   catch(const std::exception &ex){
391     INFOS( ex.what() ) ;
392     return Engines::EngineComponent::_nil();
393   }
394   return iobject._retn();
395 }
396
397 // Load component
398 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
399                                                  const char* componentName)
400 {
401   pthread_t *th;
402   if(_numproc == 0){
403     th = new pthread_t[_nbproc];
404     for(int ip=1;ip<_nbproc;ip++){
405       thread_st *st = new thread_st;
406       st->ip = ip;
407       st->tior = _tior;
408       st->nameToRegister = nameToRegister;
409       st->compoName = componentName;
410       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
411     }
412   }
413
414   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
415
416   if(_numproc == 0){
417     for(int ip=1;ip<_nbproc;ip++)
418       pthread_join(th[ip],NULL);
419     delete th;
420   }
421
422   return cptr;
423 }
424
425 // Load component
426 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
427                                    const char* nameToRegister,
428                                    const char* componentName)
429 {
430   Engines::EngineComponent_var iobject;
431   Engines::MPIObject_var pobj;
432   char cproc[4];
433
434   sprintf(cproc,"_%d",_numproc);
435
436   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
437
438   _numInstanceMutex.lock() ; // lock on the instance number
439   _numInstance++ ;
440   char _aNumI[12];
441   sprintf(_aNumI,"%d",_numInstance) ;
442
443   std::string _impl_name = componentName;
444   std::string _nameToRegister = nameToRegister;
445   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
446   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
447
448   std::string absolute_impl_name(_impl_name);
449   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
450   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY | RTLD_GLOBAL);
451   if(!handle){
452     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
453     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
454     return Engines::EngineComponent::_nil() ;
455   }
456
457   std::string factory_name = _nameToRegister + std::string("Engine_factory");
458   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
459
460   dlerror();
461   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
462                                                   PortableServer::POA_ptr,
463                                                   PortableServer::ObjectId *,
464                                                   const char *,
465                                                   const char *) =
466     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
467                                      PortableServer::POA_ptr, 
468                                      PortableServer::ObjectId *, 
469                                      const char *, 
470                                      const char *)) 
471     dlsym(handle, factory_name.c_str());
472
473   char *error ;
474   if ((error = dlerror()) != NULL){
475     // Try to load a sequential component
476     MESSAGE("[" << _numproc << "] Try to load a sequential component");
477     _numInstanceMutex.unlock() ;
478     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
479     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
480   }
481   else{
482     // Instanciation du composant parallele
483     MESSAGE("[" << _numproc << "] Try to load a parallel component");
484     PortableServer::ObjectId * id = (MPIComponent_factory)
485       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
486     // get reference from id
487     CORBA::Object_var o = _poa->id_to_reference(*id);
488     pobj = Engines::MPIObject::_narrow(o) ;
489     iobject = Engines::EngineComponent::_narrow(o) ;
490   }
491
492   if( _numproc == 0 ){
493     // utiliser + tard le registry ici :
494     // register the engine under the name containerName.dir/nameToRegister.object
495     std::string component_registerName = _containerName + "/" + _nameToRegister;
496     _NS->Register(iobject, component_registerName.c_str()) ;
497   }
498
499   _numInstanceMutex.unlock() ;
500
501   // Root recupere les ior des composants des autre process
502   BCastIOR(_orb,pobj,false);
503
504   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
505   return Engines::EngineComponent::_duplicate(iobject);
506
507 }
508
509 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
510 {
511   Engines::MPIObject_ptr pcptr;
512   Engines::MPIObject_ptr spcptr;
513
514   pthread_t *th;
515   if(_numproc == 0){
516     pcptr = (Engines::MPIObject_ptr)component_i;
517     th = new pthread_t[_nbproc];
518     for(int ip=1;ip<_nbproc;ip++){
519       thread_st *st = new thread_st;
520       st->ip = ip;
521       st->tior = _tior;
522       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
523       st->cptr = (Engines::EngineComponent_ptr)spcptr;
524       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
525     }
526   }
527
528   ASSERT(! CORBA::is_nil(component_i));
529   std::string instanceName = component_i->instanceName() ;
530   MESSAGE("[" << _numproc << "] unload component " << instanceName);
531   _numInstanceMutex.lock() ; // lock on the remove on handle_map
532   _listInstances_map.erase(instanceName);
533   _numInstanceMutex.unlock() ;
534   component_i->destroy() ;
535   if(_numproc == 0)
536     _NS->Destroy_Name(instanceName.c_str());
537
538   if(_numproc == 0){
539     for(int ip=1;ip<_nbproc;ip++)
540       pthread_join(th[ip],NULL);
541     delete th;
542   }
543
544 }
545
546 void Engines_MPIContainer_i::finalize_removal()
547 {
548   pthread_t *th;
549   if(_numproc == 0){
550     th = new pthread_t[_nbproc];
551     for(int ip=1;ip<_nbproc;ip++){
552       thread_st *st = new thread_st;
553       st->ip = ip;
554       st->tior = _tior;
555       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
556     }
557   }
558
559   _numInstanceMutex.lock(); // lock to be alone
560   // (see decInstanceCnt, load_component_Library)
561   std::map<std::string, void *>::iterator ith;
562   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
563   {
564     void *handle = (*ith).second;
565     std::string impl_name= (*ith).first;
566     if (handle)
567     {
568       SCRUTE(handle);
569       SCRUTE(impl_name);
570       //        dlclose(handle);                // SALOME unstable after ...
571       //        _library_map.erase(impl_name);
572     }
573   }
574   _toRemove_map.clear();
575   _numInstanceMutex.unlock();
576
577   if(_numproc == 0){
578     for(int ip=1;ip<_nbproc;ip++)
579       pthread_join(th[ip],NULL);
580     delete th;
581   }
582 }
583
584 void *th_loadcomponentlibrary(void *s)
585 {
586   thread_st *st = (thread_st*)s;
587   char* reason;
588   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
589   CORBA::string_free(reason);
590   return NULL;
591 }
592
593 void *th_createcomponentinstance(void *s)
594 {
595   thread_st *st = (thread_st*)s;
596   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
597   return NULL;
598 }
599
600 void *th_loadimpl(void *s)
601 {
602   thread_st *st = (thread_st*)s;
603   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
604   return NULL;
605 }
606
607 void *th_removeimpl(void *s)
608 {
609   thread_st *st = (thread_st*)s;
610   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
611   return NULL;
612 }
613
614 void *th_finalizeremoval(void *s)
615 {
616   thread_st *st = (thread_st*)s;
617   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
618   return NULL;
619 }