Salome HOME
Merge V8_4_BR branch.
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : MPIContainer_i.cxx
23 //  Module : SALOME
24
25 #include <iostream>
26 #include <dlfcn.h>
27 #include <stdio.h>
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 #include <time.h>
36 #include <sys/time.h>
37 #include <pthread.h>  // must be before Python.h !
38
39 #ifdef _XOPEN_SOURCE
40 #undef _XOPEN_SOURCE
41 #endif
42
43 #include <Python.h>
44 #include "Container_init_python.hxx"
45
46 // L'appel au registry SALOME ne se fait que pour le process 0
47 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
48                                                PortableServer::POA_ptr poa,
49                                                char * containerName,
50                                                int argc, char *argv[]) 
51   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
52 {
53
54   _id = _poa->activate_object(this);
55   CORBA::Object_var obj=_poa->id_to_reference(*_id);
56   Engines::Container_var pCont = Engines::Container::_narrow(obj);
57   _remove_ref();
58
59   if(_numproc==0){
60
61     _NS = new SALOME_NamingService();
62     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
63
64     std::string hostname = Kernel_Utils::GetHostname();
65     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
66     SCRUTE(_containerName);
67     _NS->Register(pCont, _containerName.c_str());
68
69   }
70
71   // Root recupere les ior des container des autre process
72   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
73   BCastIOR(_orb,pobj,true);
74 }
75
76 Engines_MPIContainer_i::Engines_MPIContainer_i() 
77   : Engines_Container_i()
78 {
79 }
80
81 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
82 {
83   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
84 }
85
86 // Load component
87 void Engines_MPIContainer_i::Shutdown()
88 {
89   int ip;
90   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
91   if( _numproc == 0 ){
92     _NS->Destroy_FullDirectory(_containerName.c_str());
93     _NS->Destroy_Name(_containerName.c_str());
94     for(ip= 1;ip<_nbproc;ip++)
95       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
96   }
97
98   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
99   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100     {
101       try
102         {
103           itm->second->destroy();
104         }
105       catch(const CORBA::Exception& e)
106         {
107           // ignore this entry and continue
108         }
109       catch(...)
110         {
111           // ignore this entry and continue
112         }
113     }
114
115   _orb->shutdown(0);
116
117 }
118
119 // Load a component library
120 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
121 {
122   reason=CORBA::string_dup("");
123
124   pthread_t *th;
125   if(_numproc == 0){
126     th = new pthread_t[_nbproc];
127     for(int ip=1;ip<_nbproc;ip++){
128       thread_st *st = new thread_st;
129       st->ip = ip;
130       st->tior = _tior;
131       st->compoName = componentName;
132       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
133     }
134   }
135
136   bool ret = Lload_component_Library(componentName);
137
138   if(_numproc == 0){
139     for(int ip=1;ip<_nbproc;ip++)
140       pthread_join(th[ip],NULL);
141     delete th;
142   }
143   return ret;
144 }
145
146 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
147 {
148   std::string aCompName = componentName;
149
150   // --- try dlopen C++ component
151
152 #ifdef __APPLE__
153   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.dylib");
154 #else
155   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
156 #endif
157   
158   _numInstanceMutex.lock(); // lock to be alone 
159   // (see decInstanceCnt, finalize_removal))
160   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
161   if (_library_map[impl_name])
162     {
163       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
164       _numInstanceMutex.unlock();
165       return true;
166     }
167   
168   void* handle;
169   handle = dlopen( impl_name.c_str() , RTLD_LAZY | RTLD_GLOBAL ) ;
170   if ( handle )
171     {
172       _library_map[impl_name] = handle;
173       _numInstanceMutex.unlock();
174       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
175       return true;
176     }
177   else
178     {
179       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
180       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
181     }
182   _numInstanceMutex.unlock();
183
184   // --- try import Python component
185
186   INFOS("[" << _numproc << "] try import Python component "<<componentName);
187   if (_isSupervContainer)
188     {
189       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
190       return false;
191     }
192   if (_library_map[aCompName])
193     {
194       return true; // Python Component, already imported
195     }
196   else
197     {
198       Py_ACQUIRE_NEW_THREAD;
199       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
200       PyObject *globals = PyModule_GetDict(mainmod);
201       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
202       PyObject *result = PyObject_CallMethod(pyCont,
203                                              (char*)"import_component",
204                                              (char*)"s",componentName);
205       std::string ret= PyString_AsString(result);
206       SCRUTE(ret);
207       Py_RELEASE_NEW_THREAD;
208   
209       if (ret=="") // import possible: Python component
210         {
211           _library_map[aCompName] = (void *)pyCont; // any non O value OK
212           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
213           return true;
214         }
215     }
216   return false;
217 }
218
219 // Create an instance of component
220 Engines::EngineComponent_ptr
221 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
222                                                        CORBA::Long studyId,
223                                                        const Engines::FieldsDict& env,
224                                                        CORBA::String_out reason)
225 {
226   reason=CORBA::string_dup("");
227
228   pthread_t *th;
229   if(_numproc == 0){
230     th = new pthread_t[_nbproc];
231     for(int ip=1;ip<_nbproc;ip++){
232       thread_st *st = new thread_st;
233       st->ip = ip;
234       st->tior = _tior;
235       st->compoName = componentName;
236       st->studyId = studyId;
237       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
238     }
239   }
240
241   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName,studyId);
242
243   if(_numproc == 0){
244     for(int ip=1;ip<_nbproc;ip++)
245       pthread_join(th[ip],NULL);
246     delete th;
247   }
248
249   return cptr;
250 }
251
252 Engines::EngineComponent_ptr
253 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
254 {
255   if (studyId < 0) {
256     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
257     return Engines::EngineComponent::_nil() ;
258   }
259
260   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
261   Engines::MPIObject_var pobj;
262
263   std::string aCompName = genericRegisterName;
264   if (_library_map[aCompName]) { // Python component
265     if (_isSupervContainer) {
266       INFOS("Supervision Container does not support Python Component Engines");
267       return Engines::EngineComponent::_nil();
268     }
269     _numInstanceMutex.lock() ; // lock on the instance number
270     _numInstance++ ;
271     int numInstance = _numInstance ;
272     _numInstanceMutex.unlock() ;
273
274     char aNumI[12];
275     sprintf( aNumI , "%d" , numInstance ) ;
276     std::string instanceName = aCompName + "_inst_" + aNumI ;
277     std::string component_registerName =
278       _containerName + "/" + instanceName;
279
280     Py_ACQUIRE_NEW_THREAD;
281     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
282     PyObject *globals = PyModule_GetDict(mainmod);
283     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
284     PyObject *result = PyObject_CallMethod(pyCont,
285                                            (char*)"create_component_instance",
286                                            (char*)"ssl",
287                                            aCompName.c_str(),
288                                            instanceName.c_str(),
289                                            studyId);
290     const char *ior;
291     const char *error;
292     PyArg_ParseTuple(result,"ss", &ior, &error);
293     std::string iors = ior;
294     SCRUTE(iors);
295     Py_RELEASE_NEW_THREAD;
296   
297     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
298     iobject = Engines::EngineComponent::_narrow( obj ) ;
299     pobj = Engines::MPIObject::_narrow(obj) ;
300     if( _numproc == 0 )
301       _NS->Register(iobject, component_registerName.c_str()) ;
302     // Root recupere les ior des composants des autre process
303     BCastIOR(_orb,pobj,false);
304
305     return iobject._retn();
306   }
307   
308   //--- try C++
309
310 #ifdef __APPLE__
311   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.dylib");
312 #else
313   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
314 #endif
315   if (_library_map.count(impl_name) != 0) // C++ component
316     {
317       void* handle = _library_map[impl_name];
318       iobject = createMPIInstance(genericRegisterName,
319                                     handle,
320                                     studyId);
321       return iobject._retn();
322     }
323
324   return Engines::EngineComponent::_nil() ;
325 }
326
327 Engines::EngineComponent_ptr
328 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
329                                           void *handle,
330                                           int studyId)
331 {
332   Engines::EngineComponent_var iobject;
333   Engines::MPIObject_var pobj;
334   // --- find the factory
335
336   std::string aGenRegisterName = genericRegisterName;
337   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
338
339   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
340     (CORBA::ORB_ptr,
341      PortableServer::POA_ptr, 
342      PortableServer::ObjectId *, 
343      const char *, 
344      const char *) ;
345
346   dlerror();
347   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
348
349   if ( !MPIComponent_factory )
350     {
351       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
352       SCRUTE( dlerror() );
353       pobj = Engines::MPIObject::_nil();
354       BCastIOR(_orb,pobj,false);
355       return Engines::EngineComponent::_nil();
356     }
357
358   // --- create instance
359
360   iobject = Engines::EngineComponent::_nil() ;
361
362   try
363     {
364       _numInstanceMutex.lock() ; // lock on the instance number
365       _numInstance++ ;
366       int numInstance = _numInstance ;
367       _numInstanceMutex.unlock() ;
368
369       char aNumI[12];
370       sprintf( aNumI , "%d" , numInstance ) ;
371       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
372       std::string component_registerName =
373         _containerName + "/" + instanceName;
374
375       // --- Instantiate required CORBA object
376
377       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
378       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
379
380       // --- get reference & servant from id
381
382       CORBA::Object_var obj = _poa->id_to_reference(*id);
383       iobject = Engines::EngineComponent::_narrow( obj ) ;
384       pobj = Engines::MPIObject::_narrow(obj) ;
385
386       Engines_Component_i *servant =
387         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
388       ASSERT(servant);
389       //SCRUTE(servant->pd_refCount);
390       servant->_remove_ref(); // compensate previous id_to_reference 
391       //SCRUTE(servant->pd_refCount);
392       _listInstances_map[instanceName] = iobject;
393       _cntInstances_map[aGenRegisterName] += 1;
394       //SCRUTE(servant->pd_refCount);
395 #ifndef _DEBUG_
396       servant->setStudyId(studyId);
397 #else
398       bool ret_studyId = servant->setStudyId(studyId);
399       ASSERT(ret_studyId);
400 #endif
401
402       // --- register the engine under the name
403       //     containerName(.dir)/instanceName(.object)
404
405       if( _numproc == 0 ){
406         _NS->Register( iobject , component_registerName.c_str() ) ;
407         MESSAGE( component_registerName.c_str() << " bound" ) ;
408       }
409       // Root recupere les ior des composants des autre process
410       BCastIOR(_orb,pobj,false);
411
412     }
413   catch(const std::exception &ex){
414     INFOS( ex.what() ) ;
415     return Engines::EngineComponent::_nil();
416   }
417   return iobject._retn();
418 }
419
420 // Load component
421 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
422                                                  const char* componentName)
423 {
424   pthread_t *th;
425   if(_numproc == 0){
426     th = new pthread_t[_nbproc];
427     for(int ip=1;ip<_nbproc;ip++){
428       thread_st *st = new thread_st;
429       st->ip = ip;
430       st->tior = _tior;
431       st->nameToRegister = nameToRegister;
432       st->compoName = componentName;
433       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
434     }
435   }
436
437   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
438
439   if(_numproc == 0){
440     for(int ip=1;ip<_nbproc;ip++)
441       pthread_join(th[ip],NULL);
442     delete th;
443   }
444
445   return cptr;
446 }
447
448 // Load component
449 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
450                                    const char* nameToRegister,
451                                    const char* componentName)
452 {
453   Engines::EngineComponent_var iobject;
454   Engines::MPIObject_var pobj;
455   char cproc[4];
456
457   sprintf(cproc,"_%d",_numproc);
458
459   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
460
461   _numInstanceMutex.lock() ; // lock on the instance number
462   _numInstance++ ;
463   char _aNumI[12];
464   sprintf(_aNumI,"%d",_numInstance) ;
465
466   std::string _impl_name = componentName;
467   std::string _nameToRegister = nameToRegister;
468   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
469   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
470
471   std::string absolute_impl_name(_impl_name);
472   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
473   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY | RTLD_GLOBAL);
474   if(!handle){
475     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
476     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
477     return Engines::EngineComponent::_nil() ;
478   }
479
480   std::string factory_name = _nameToRegister + std::string("Engine_factory");
481   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
482
483   dlerror();
484   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
485                                                   PortableServer::POA_ptr,
486                                                   PortableServer::ObjectId *,
487                                                   const char *,
488                                                   const char *) =
489     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
490                                      PortableServer::POA_ptr, 
491                                      PortableServer::ObjectId *, 
492                                      const char *, 
493                                      const char *)) 
494     dlsym(handle, factory_name.c_str());
495
496   char *error ;
497   if ((error = dlerror()) != NULL){
498     // Try to load a sequential component
499     MESSAGE("[" << _numproc << "] Try to load a sequential component");
500     _numInstanceMutex.unlock() ;
501     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
502     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
503   }
504   else{
505     // Instanciation du composant parallele
506     MESSAGE("[" << _numproc << "] Try to load a parallel component");
507     PortableServer::ObjectId * id = (MPIComponent_factory)
508       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
509     // get reference from id
510     CORBA::Object_var o = _poa->id_to_reference(*id);
511     pobj = Engines::MPIObject::_narrow(o) ;
512     iobject = Engines::EngineComponent::_narrow(o) ;
513   }
514
515   if( _numproc == 0 ){
516     // utiliser + tard le registry ici :
517     // register the engine under the name containerName.dir/nameToRegister.object
518     std::string component_registerName = _containerName + "/" + _nameToRegister;
519     _NS->Register(iobject, component_registerName.c_str()) ;
520   }
521
522   _numInstanceMutex.unlock() ;
523
524   // Root recupere les ior des composants des autre process
525   BCastIOR(_orb,pobj,false);
526
527   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
528   return Engines::EngineComponent::_duplicate(iobject);
529
530 }
531
532 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
533 {
534   Engines::MPIObject_ptr pcptr;
535   Engines::MPIObject_ptr spcptr;
536
537   pthread_t *th;
538   if(_numproc == 0){
539     pcptr = (Engines::MPIObject_ptr)component_i;
540     th = new pthread_t[_nbproc];
541     for(int ip=1;ip<_nbproc;ip++){
542       thread_st *st = new thread_st;
543       st->ip = ip;
544       st->tior = _tior;
545       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
546       st->cptr = (Engines::EngineComponent_ptr)spcptr;
547       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
548     }
549   }
550
551   ASSERT(! CORBA::is_nil(component_i));
552   std::string instanceName = component_i->instanceName() ;
553   MESSAGE("[" << _numproc << "] unload component " << instanceName);
554   _numInstanceMutex.lock() ; // lock on the remove on handle_map
555   _listInstances_map.erase(instanceName);
556   _numInstanceMutex.unlock() ;
557   component_i->destroy() ;
558   if(_numproc == 0)
559     _NS->Destroy_Name(instanceName.c_str());
560
561   if(_numproc == 0){
562     for(int ip=1;ip<_nbproc;ip++)
563       pthread_join(th[ip],NULL);
564     delete th;
565   }
566
567 }
568
569 void Engines_MPIContainer_i::finalize_removal()
570 {
571   pthread_t *th;
572   if(_numproc == 0){
573     th = new pthread_t[_nbproc];
574     for(int ip=1;ip<_nbproc;ip++){
575       thread_st *st = new thread_st;
576       st->ip = ip;
577       st->tior = _tior;
578       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
579     }
580   }
581
582   _numInstanceMutex.lock(); // lock to be alone
583   // (see decInstanceCnt, load_component_Library)
584   std::map<std::string, void *>::iterator ith;
585   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
586   {
587     void *handle = (*ith).second;
588     std::string impl_name= (*ith).first;
589     if (handle)
590     {
591       SCRUTE(handle);
592       SCRUTE(impl_name);
593       //        dlclose(handle);                // SALOME unstable after ...
594       //        _library_map.erase(impl_name);
595     }
596   }
597   _toRemove_map.clear();
598   _numInstanceMutex.unlock();
599
600   if(_numproc == 0){
601     for(int ip=1;ip<_nbproc;ip++)
602       pthread_join(th[ip],NULL);
603     delete th;
604   }
605 }
606
607 void *th_loadcomponentlibrary(void *s)
608 {
609   thread_st *st = (thread_st*)s;
610   char* reason;
611   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
612   CORBA::string_free(reason);
613   return NULL;
614 }
615
616 void *th_createcomponentinstance(void *s)
617 {
618   thread_st *st = (thread_st*)s;
619   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
620   return NULL;
621 }
622
623 void *th_loadimpl(void *s)
624 {
625   thread_st *st = (thread_st*)s;
626   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
627   return NULL;
628 }
629
630 void *th_removeimpl(void *s)
631 {
632   thread_st *st = (thread_st*)s;
633   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
634   return NULL;
635 }
636
637 void *th_finalizeremoval(void *s)
638 {
639   thread_st *st = (thread_st*)s;
640   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
641   return NULL;
642 }