]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
Copyright update: 2016
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : MPIContainer_i.cxx
23 //  Module : SALOME
24
25 #include <iostream>
26 #include <dlfcn.h>
27 #include <stdio.h>
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 #include <time.h>
36 #include <sys/time.h>
37 #include <pthread.h>  // must be before Python.h !
38
39 #ifdef _XOPEN_SOURCE
40 #undef _XOPEN_SOURCE
41 #endif
42
43 #include <Python.h>
44 #include "Container_init_python.hxx"
45
46 // L'appel au registry SALOME ne se fait que pour le process 0
47 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
48                                                PortableServer::POA_ptr poa,
49                                                char * containerName,
50                                                int argc, char *argv[]) 
51   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
52 {
53
54   _id = _poa->activate_object(this);
55   CORBA::Object_var obj=_poa->id_to_reference(*_id);
56   Engines::Container_var pCont = Engines::Container::_narrow(obj);
57   _remove_ref();
58
59   if(_numproc==0){
60
61     _NS = new SALOME_NamingService();
62     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
63
64     std::string hostname = Kernel_Utils::GetHostname();
65     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
66     SCRUTE(_containerName);
67     _NS->Register(pCont, _containerName.c_str());
68
69   }
70
71   // Root recupere les ior des container des autre process
72   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
73   BCastIOR(_orb,pobj,true);
74 }
75
76 Engines_MPIContainer_i::Engines_MPIContainer_i() 
77   : Engines_Container_i()
78 {
79 }
80
81 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
82 {
83   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
84 }
85
86 // Load component
87 void Engines_MPIContainer_i::Shutdown()
88 {
89   int ip;
90   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
91   if( _numproc == 0 ){
92     _NS->Destroy_FullDirectory(_containerName.c_str());
93     _NS->Destroy_Name(_containerName.c_str());
94     for(ip= 1;ip<_nbproc;ip++)
95       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
96   }
97
98   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
99   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100     {
101       try
102         {
103           itm->second->destroy();
104         }
105       catch(const CORBA::Exception& e)
106         {
107           // ignore this entry and continue
108         }
109       catch(...)
110         {
111           // ignore this entry and continue
112         }
113     }
114
115   _orb->shutdown(0);
116
117 }
118
119 // Load a component library
120 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
121 {
122   reason=CORBA::string_dup("");
123
124   pthread_t *th;
125   if(_numproc == 0){
126     th = new pthread_t[_nbproc];
127     for(int ip=1;ip<_nbproc;ip++){
128       thread_st *st = new thread_st;
129       st->ip = ip;
130       st->tior = _tior;
131       st->compoName = componentName;
132       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
133     }
134   }
135
136   bool ret = Lload_component_Library(componentName);
137
138   if(_numproc == 0){
139     for(int ip=1;ip<_nbproc;ip++)
140       pthread_join(th[ip],NULL);
141     delete th;
142   }
143   return ret;
144 }
145
146 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
147 {
148   std::string aCompName = componentName;
149
150   // --- try dlopen C++ component
151
152   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
153   
154   _numInstanceMutex.lock(); // lock to be alone 
155   // (see decInstanceCnt, finalize_removal))
156   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
157   if (_library_map[impl_name])
158     {
159       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
160       _numInstanceMutex.unlock();
161       return true;
162     }
163   
164   void* handle;
165   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
166   if ( handle )
167     {
168       _library_map[impl_name] = handle;
169       _numInstanceMutex.unlock();
170       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
171       return true;
172     }
173   else
174     {
175       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
176       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
177     }
178   _numInstanceMutex.unlock();
179
180   // --- try import Python component
181
182   INFOS("[" << _numproc << "] try import Python component "<<componentName);
183   if (_isSupervContainer)
184     {
185       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
186       return false;
187     }
188   if (_library_map[aCompName])
189     {
190       return true; // Python Component, already imported
191     }
192   else
193     {
194       Py_ACQUIRE_NEW_THREAD;
195       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
196       PyObject *globals = PyModule_GetDict(mainmod);
197       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
198       PyObject *result = PyObject_CallMethod(pyCont,
199                                              (char*)"import_component",
200                                              (char*)"s",componentName);
201       std::string ret= PyString_AsString(result);
202       SCRUTE(ret);
203       Py_RELEASE_NEW_THREAD;
204   
205       if (ret=="") // import possible: Python component
206         {
207           _library_map[aCompName] = (void *)pyCont; // any non O value OK
208           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
209           return true;
210         }
211     }
212   return false;
213 }
214
215 // Create an instance of component
216 Engines::EngineComponent_ptr
217 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
218                                                        CORBA::Long studyId,
219                                                        const Engines::FieldsDict& env,
220                                                        CORBA::String_out reason)
221 {
222   reason=CORBA::string_dup("");
223
224   pthread_t *th;
225   if(_numproc == 0){
226     th = new pthread_t[_nbproc];
227     for(int ip=1;ip<_nbproc;ip++){
228       thread_st *st = new thread_st;
229       st->ip = ip;
230       st->tior = _tior;
231       st->compoName = componentName;
232       st->studyId = studyId;
233       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
234     }
235   }
236
237   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName,studyId);
238
239   if(_numproc == 0){
240     for(int ip=1;ip<_nbproc;ip++)
241       pthread_join(th[ip],NULL);
242     delete th;
243   }
244
245   return cptr;
246 }
247
248 Engines::EngineComponent_ptr
249 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
250 {
251   if (studyId < 0) {
252     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
253     return Engines::EngineComponent::_nil() ;
254   }
255
256   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
257   Engines::MPIObject_var pobj;
258
259   std::string aCompName = genericRegisterName;
260   if (_library_map[aCompName]) { // Python component
261     if (_isSupervContainer) {
262       INFOS("Supervision Container does not support Python Component Engines");
263       return Engines::EngineComponent::_nil();
264     }
265     _numInstanceMutex.lock() ; // lock on the instance number
266     _numInstance++ ;
267     int numInstance = _numInstance ;
268     _numInstanceMutex.unlock() ;
269
270     char aNumI[12];
271     sprintf( aNumI , "%d" , numInstance ) ;
272     std::string instanceName = aCompName + "_inst_" + aNumI ;
273     std::string component_registerName =
274       _containerName + "/" + instanceName;
275
276     Py_ACQUIRE_NEW_THREAD;
277     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
278     PyObject *globals = PyModule_GetDict(mainmod);
279     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
280     PyObject *result = PyObject_CallMethod(pyCont,
281                                            (char*)"create_component_instance",
282                                            (char*)"ssl",
283                                            aCompName.c_str(),
284                                            instanceName.c_str(),
285                                            studyId);
286     const char *ior;
287     const char *error;
288     PyArg_ParseTuple(result,"ss", &ior, &error);
289     std::string iors = ior;
290     SCRUTE(iors);
291     Py_RELEASE_NEW_THREAD;
292   
293     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
294     iobject = Engines::EngineComponent::_narrow( obj ) ;
295     pobj = Engines::MPIObject::_narrow(obj) ;
296     if( _numproc == 0 )
297       _NS->Register(iobject, component_registerName.c_str()) ;
298     // Root recupere les ior des composants des autre process
299     BCastIOR(_orb,pobj,false);
300
301     return iobject._retn();
302   }
303   
304   //--- try C++
305
306   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
307   if (_library_map.count(impl_name) != 0) // C++ component
308     {
309       void* handle = _library_map[impl_name];
310       iobject = createMPIInstance(genericRegisterName,
311                                     handle,
312                                     studyId);
313       return iobject._retn();
314     }
315
316   return Engines::EngineComponent::_nil() ;
317 }
318
319 Engines::EngineComponent_ptr
320 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
321                                           void *handle,
322                                           int studyId)
323 {
324   Engines::EngineComponent_var iobject;
325   Engines::MPIObject_var pobj;
326   // --- find the factory
327
328   std::string aGenRegisterName = genericRegisterName;
329   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
330
331   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
332     (CORBA::ORB_ptr,
333      PortableServer::POA_ptr, 
334      PortableServer::ObjectId *, 
335      const char *, 
336      const char *) ;
337
338   dlerror();
339   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
340
341   if ( !MPIComponent_factory )
342     {
343       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
344       SCRUTE( dlerror() );
345       pobj = Engines::MPIObject::_nil();
346       BCastIOR(_orb,pobj,false);
347       return Engines::EngineComponent::_nil();
348     }
349
350   // --- create instance
351
352   iobject = Engines::EngineComponent::_nil() ;
353
354   try
355     {
356       _numInstanceMutex.lock() ; // lock on the instance number
357       _numInstance++ ;
358       int numInstance = _numInstance ;
359       _numInstanceMutex.unlock() ;
360
361       char aNumI[12];
362       sprintf( aNumI , "%d" , numInstance ) ;
363       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
364       std::string component_registerName =
365         _containerName + "/" + instanceName;
366
367       // --- Instanciate required CORBA object
368
369       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
370       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
371
372       // --- get reference & servant from id
373
374       CORBA::Object_var obj = _poa->id_to_reference(*id);
375       iobject = Engines::EngineComponent::_narrow( obj ) ;
376       pobj = Engines::MPIObject::_narrow(obj) ;
377
378       Engines_Component_i *servant =
379         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
380       ASSERT(servant);
381       //SCRUTE(servant->pd_refCount);
382       servant->_remove_ref(); // compensate previous id_to_reference 
383       //SCRUTE(servant->pd_refCount);
384       _listInstances_map[instanceName] = iobject;
385       _cntInstances_map[aGenRegisterName] += 1;
386       //SCRUTE(servant->pd_refCount);
387 #ifndef _DEBUG_
388       servant->setStudyId(studyId);
389 #else
390       bool ret_studyId = servant->setStudyId(studyId);
391       ASSERT(ret_studyId);
392 #endif
393
394       // --- register the engine under the name
395       //     containerName(.dir)/instanceName(.object)
396
397       if( _numproc == 0 ){
398         _NS->Register( iobject , component_registerName.c_str() ) ;
399         MESSAGE( component_registerName.c_str() << " bound" ) ;
400       }
401       // Root recupere les ior des composants des autre process
402       BCastIOR(_orb,pobj,false);
403
404     }
405   catch(const std::exception &ex){
406     INFOS( ex.what() ) ;
407     return Engines::EngineComponent::_nil();
408   }
409   return iobject._retn();
410 }
411
412 // Load component
413 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
414                                                  const char* componentName)
415 {
416   pthread_t *th;
417   if(_numproc == 0){
418     th = new pthread_t[_nbproc];
419     for(int ip=1;ip<_nbproc;ip++){
420       thread_st *st = new thread_st;
421       st->ip = ip;
422       st->tior = _tior;
423       st->nameToRegister = nameToRegister;
424       st->compoName = componentName;
425       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
426     }
427   }
428
429   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
430
431   if(_numproc == 0){
432     for(int ip=1;ip<_nbproc;ip++)
433       pthread_join(th[ip],NULL);
434     delete th;
435   }
436
437   return cptr;
438 }
439
440 // Load component
441 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
442                                    const char* nameToRegister,
443                                    const char* componentName)
444 {
445   Engines::EngineComponent_var iobject;
446   Engines::MPIObject_var pobj;
447   char cproc[4];
448
449   sprintf(cproc,"_%d",_numproc);
450
451   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
452
453   _numInstanceMutex.lock() ; // lock on the instance number
454   _numInstance++ ;
455   char _aNumI[12];
456   sprintf(_aNumI,"%d",_numInstance) ;
457
458   std::string _impl_name = componentName;
459   std::string _nameToRegister = nameToRegister;
460   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
461   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
462
463   std::string absolute_impl_name(_impl_name);
464   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
465   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
466   if(!handle){
467     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
468     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
469     return Engines::EngineComponent::_nil() ;
470   }
471
472   std::string factory_name = _nameToRegister + std::string("Engine_factory");
473   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
474
475   dlerror();
476   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
477                                                   PortableServer::POA_ptr,
478                                                   PortableServer::ObjectId *,
479                                                   const char *,
480                                                   const char *) =
481     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
482                                      PortableServer::POA_ptr, 
483                                      PortableServer::ObjectId *, 
484                                      const char *, 
485                                      const char *)) 
486     dlsym(handle, factory_name.c_str());
487
488   char *error ;
489   if ((error = dlerror()) != NULL){
490     // Try to load a sequential component
491     MESSAGE("[" << _numproc << "] Try to load a sequential component");
492     _numInstanceMutex.unlock() ;
493     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
494     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
495   }
496   else{
497     // Instanciation du composant parallele
498     MESSAGE("[" << _numproc << "] Try to load a parallel component");
499     PortableServer::ObjectId * id = (MPIComponent_factory)
500       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
501     // get reference from id
502     CORBA::Object_var o = _poa->id_to_reference(*id);
503     pobj = Engines::MPIObject::_narrow(o) ;
504     iobject = Engines::EngineComponent::_narrow(o) ;
505   }
506
507   if( _numproc == 0 ){
508     // utiliser + tard le registry ici :
509     // register the engine under the name containerName.dir/nameToRegister.object
510     std::string component_registerName = _containerName + "/" + _nameToRegister;
511     _NS->Register(iobject, component_registerName.c_str()) ;
512   }
513
514   _numInstanceMutex.unlock() ;
515
516   // Root recupere les ior des composants des autre process
517   BCastIOR(_orb,pobj,false);
518
519   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
520   return Engines::EngineComponent::_duplicate(iobject);
521
522 }
523
524 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
525 {
526   Engines::MPIObject_ptr pcptr;
527   Engines::MPIObject_ptr spcptr;
528
529   pthread_t *th;
530   if(_numproc == 0){
531     pcptr = (Engines::MPIObject_ptr)component_i;
532     th = new pthread_t[_nbproc];
533     for(int ip=1;ip<_nbproc;ip++){
534       thread_st *st = new thread_st;
535       st->ip = ip;
536       st->tior = _tior;
537       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
538       st->cptr = (Engines::EngineComponent_ptr)spcptr;
539       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
540     }
541   }
542
543   ASSERT(! CORBA::is_nil(component_i));
544   std::string instanceName = component_i->instanceName() ;
545   MESSAGE("[" << _numproc << "] unload component " << instanceName);
546   _numInstanceMutex.lock() ; // lock on the remove on handle_map
547   _listInstances_map.erase(instanceName);
548   _numInstanceMutex.unlock() ;
549   component_i->destroy() ;
550   if(_numproc == 0)
551     _NS->Destroy_Name(instanceName.c_str());
552
553   if(_numproc == 0){
554     for(int ip=1;ip<_nbproc;ip++)
555       pthread_join(th[ip],NULL);
556     delete th;
557   }
558
559 }
560
561 void Engines_MPIContainer_i::finalize_removal()
562 {
563   pthread_t *th;
564   if(_numproc == 0){
565     th = new pthread_t[_nbproc];
566     for(int ip=1;ip<_nbproc;ip++){
567       thread_st *st = new thread_st;
568       st->ip = ip;
569       st->tior = _tior;
570       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
571     }
572   }
573
574   _numInstanceMutex.lock(); // lock to be alone
575   // (see decInstanceCnt, load_component_Library)
576   std::map<std::string, void *>::iterator ith;
577   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
578   {
579     void *handle = (*ith).second;
580     std::string impl_name= (*ith).first;
581     if (handle)
582     {
583       SCRUTE(handle);
584       SCRUTE(impl_name);
585       //        dlclose(handle);                // SALOME unstable after ...
586       //        _library_map.erase(impl_name);
587     }
588   }
589   _toRemove_map.clear();
590   _numInstanceMutex.unlock();
591
592   if(_numproc == 0){
593     for(int ip=1;ip<_nbproc;ip++)
594       pthread_join(th[ip],NULL);
595     delete th;
596   }
597 }
598
599 void *th_loadcomponentlibrary(void *s)
600 {
601   thread_st *st = (thread_st*)s;
602   char* reason;
603   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
604   CORBA::string_free(reason);
605   return NULL;
606 }
607
608 void *th_createcomponentinstance(void *s)
609 {
610   thread_st *st = (thread_st*)s;
611   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
612   return NULL;
613 }
614
615 void *th_loadimpl(void *s)
616 {
617   thread_st *st = (thread_st*)s;
618   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
619   return NULL;
620 }
621
622 void *th_removeimpl(void *s)
623 {
624   thread_st *st = (thread_st*)s;
625   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
626   return NULL;
627 }
628
629 void *th_finalizeremoval(void *s)
630 {
631   thread_st *st = (thread_st*)s;
632   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
633   return NULL;
634 }