]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
Merge from V6_main_20120808 08Aug12
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : MPIContainer_i.cxx
23 //  Module : SALOME
24
25 #include <iostream>
26 #include <dlfcn.h>
27 #include <stdio.h>
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 #include <time.h>
36 #include <sys/time.h>
37 #include <pthread.h>  // must be before Python.h !
38 #include <Python.h>
39 #include "Container_init_python.hxx"
40
41 // L'appel au registry SALOME ne se fait que pour le process 0
42 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
43                                                PortableServer::POA_ptr poa,
44                                                char * containerName,
45                                                int argc, char *argv[]) 
46   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
47 {
48
49   _id = _poa->activate_object(this);
50   CORBA::Object_var obj=_poa->id_to_reference(*_id);
51   Engines::Container_var pCont = Engines::Container::_narrow(obj);
52   _remove_ref();
53
54   if(_numproc==0){
55
56     _NS = new SALOME_NamingService();
57     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
58
59     std::string hostname = Kernel_Utils::GetHostname();
60     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
61     SCRUTE(_containerName);
62     _NS->Register(pCont, _containerName.c_str());
63
64   }
65
66   // Root recupere les ior des container des autre process
67   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
68   BCastIOR(_orb,pobj,true);
69 }
70
71 Engines_MPIContainer_i::Engines_MPIContainer_i() 
72   : Engines_Container_i()
73 {
74 }
75
76 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
77 {
78   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
79 }
80
81 // Load component
82 void Engines_MPIContainer_i::Shutdown()
83 {
84   int ip;
85   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
86   if( _numproc == 0 ){
87     _NS->Destroy_FullDirectory(_containerName.c_str());
88     _NS->Destroy_Name(_containerName.c_str());
89     for(ip= 1;ip<_nbproc;ip++)
90       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
91   }
92
93   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
94   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
95     {
96       try
97         {
98           itm->second->destroy();
99         }
100       catch(const CORBA::Exception& e)
101         {
102           // ignore this entry and continue
103         }
104       catch(...)
105         {
106           // ignore this entry and continue
107         }
108     }
109
110   _orb->shutdown(0);
111
112 }
113
114 // Load a component library
115 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
116 {
117   reason=CORBA::string_dup("");
118
119   pthread_t *th;
120   if(_numproc == 0){
121     th = new pthread_t[_nbproc];
122     for(int ip=1;ip<_nbproc;ip++){
123       thread_st *st = new thread_st;
124       st->ip = ip;
125       st->tior = _tior;
126       st->compoName = componentName;
127       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
128     }
129   }
130
131   bool ret = Lload_component_Library(componentName);
132
133   if(_numproc == 0){
134     for(int ip=1;ip<_nbproc;ip++)
135       pthread_join(th[ip],NULL);
136     delete th;
137   }
138   return ret;
139 }
140
141 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
142 {
143   std::string aCompName = componentName;
144
145   // --- try dlopen C++ component
146
147   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
148   
149   _numInstanceMutex.lock(); // lock to be alone 
150   // (see decInstanceCnt, finalize_removal))
151   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
152   if (_library_map[impl_name])
153     {
154       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
155       _numInstanceMutex.unlock();
156       return true;
157     }
158   
159   void* handle;
160   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
161   if ( handle )
162     {
163       _library_map[impl_name] = handle;
164       _numInstanceMutex.unlock();
165       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
166       return true;
167     }
168   else
169     {
170       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
171       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
172     }
173   _numInstanceMutex.unlock();
174
175   // --- try import Python component
176
177   INFOS("[" << _numproc << "] try import Python component "<<componentName);
178   if (_isSupervContainer)
179     {
180       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
181       return false;
182     }
183   if (_library_map[aCompName])
184     {
185       return true; // Python Component, already imported
186     }
187   else
188     {
189       Py_ACQUIRE_NEW_THREAD;
190       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
191       PyObject *globals = PyModule_GetDict(mainmod);
192       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
193       PyObject *result = PyObject_CallMethod(pyCont,
194                                              (char*)"import_component",
195                                              (char*)"s",componentName);
196       std::string ret= PyString_AsString(result);
197       SCRUTE(ret);
198       Py_RELEASE_NEW_THREAD;
199   
200       if (ret=="") // import possible: Python component
201         {
202           _library_map[aCompName] = (void *)pyCont; // any non O value OK
203           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
204           return true;
205         }
206     }
207   return false;
208 }
209
210 // Create an instance of component
211 Engines::EngineComponent_ptr
212 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
213                                                        CORBA::Long studyId,
214                                                        const Engines::FieldsDict& env,
215                                                        CORBA::String_out reason)
216 {
217   reason=CORBA::string_dup("");
218
219   pthread_t *th;
220   if(_numproc == 0){
221     th = new pthread_t[_nbproc];
222     for(int ip=1;ip<_nbproc;ip++){
223       thread_st *st = new thread_st;
224       st->ip = ip;
225       st->tior = _tior;
226       st->compoName = componentName;
227       st->studyId = studyId;
228       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
229     }
230   }
231
232   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName,studyId);
233
234   if(_numproc == 0){
235     for(int ip=1;ip<_nbproc;ip++)
236       pthread_join(th[ip],NULL);
237     delete th;
238   }
239
240   return cptr;
241 }
242
243 Engines::EngineComponent_ptr
244 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
245 {
246   if (studyId < 0) {
247     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
248     return Engines::EngineComponent::_nil() ;
249   }
250
251   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
252   Engines::MPIObject_var pobj;
253
254   std::string aCompName = genericRegisterName;
255   if (_library_map[aCompName]) { // Python component
256     if (_isSupervContainer) {
257       INFOS("Supervision Container does not support Python Component Engines");
258       return Engines::EngineComponent::_nil();
259     }
260     _numInstanceMutex.lock() ; // lock on the instance number
261     _numInstance++ ;
262     int numInstance = _numInstance ;
263     _numInstanceMutex.unlock() ;
264
265     char aNumI[12];
266     sprintf( aNumI , "%d" , numInstance ) ;
267     std::string instanceName = aCompName + "_inst_" + aNumI ;
268     std::string component_registerName =
269       _containerName + "/" + instanceName;
270
271     Py_ACQUIRE_NEW_THREAD;
272     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
273     PyObject *globals = PyModule_GetDict(mainmod);
274     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
275     PyObject *result = PyObject_CallMethod(pyCont,
276                                            (char*)"create_component_instance",
277                                            (char*)"ssl",
278                                            aCompName.c_str(),
279                                            instanceName.c_str(),
280                                            studyId);
281     const char *ior;
282     const char *error;
283     PyArg_ParseTuple(result,"ss", &ior, &error);
284     std::string iors = ior;
285     SCRUTE(iors);
286     Py_RELEASE_NEW_THREAD;
287   
288     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
289     iobject = Engines::EngineComponent::_narrow( obj ) ;
290     pobj = Engines::MPIObject::_narrow(obj) ;
291     if( _numproc == 0 )
292       _NS->Register(iobject, component_registerName.c_str()) ;
293     // Root recupere les ior des composants des autre process
294     BCastIOR(_orb,pobj,false);
295
296     return iobject._retn();
297   }
298   
299   //--- try C++
300
301   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
302   if (_library_map.count(impl_name) != 0) // C++ component
303     {
304       void* handle = _library_map[impl_name];
305       iobject = createMPIInstance(genericRegisterName,
306                                     handle,
307                                     studyId);
308       return iobject._retn();
309     }
310
311   return Engines::EngineComponent::_nil() ;
312 }
313
314 Engines::EngineComponent_ptr
315 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
316                                           void *handle,
317                                           int studyId)
318 {
319   Engines::EngineComponent_var iobject;
320   Engines::MPIObject_var pobj;
321   // --- find the factory
322
323   std::string aGenRegisterName = genericRegisterName;
324   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
325
326   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
327     (CORBA::ORB_ptr,
328      PortableServer::POA_ptr, 
329      PortableServer::ObjectId *, 
330      const char *, 
331      const char *) ;
332
333   dlerror();
334   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
335
336   if ( !MPIComponent_factory )
337     {
338       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
339       SCRUTE( dlerror() );
340       pobj = Engines::MPIObject::_nil();
341       BCastIOR(_orb,pobj,false);
342       return Engines::EngineComponent::_nil();
343     }
344
345   // --- create instance
346
347   iobject = Engines::EngineComponent::_nil() ;
348
349   try
350     {
351       _numInstanceMutex.lock() ; // lock on the instance number
352       _numInstance++ ;
353       int numInstance = _numInstance ;
354       _numInstanceMutex.unlock() ;
355
356       char aNumI[12];
357       sprintf( aNumI , "%d" , numInstance ) ;
358       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
359       std::string component_registerName =
360         _containerName + "/" + instanceName;
361
362       // --- Instanciate required CORBA object
363
364       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
365       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
366
367       // --- get reference & servant from id
368
369       CORBA::Object_var obj = _poa->id_to_reference(*id);
370       iobject = Engines::EngineComponent::_narrow( obj ) ;
371       pobj = Engines::MPIObject::_narrow(obj) ;
372
373       Engines_Component_i *servant =
374         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
375       ASSERT(servant);
376       //SCRUTE(servant->pd_refCount);
377       servant->_remove_ref(); // compensate previous id_to_reference 
378       //SCRUTE(servant->pd_refCount);
379       _listInstances_map[instanceName] = iobject;
380       _cntInstances_map[aGenRegisterName] += 1;
381       //SCRUTE(servant->pd_refCount);
382 #ifndef _DEBUG_
383       servant->setStudyId(studyId);
384 #else
385       bool ret_studyId = servant->setStudyId(studyId);
386       ASSERT(ret_studyId);
387 #endif
388
389       // --- register the engine under the name
390       //     containerName(.dir)/instanceName(.object)
391
392       if( _numproc == 0 ){
393         _NS->Register( iobject , component_registerName.c_str() ) ;
394         MESSAGE( component_registerName.c_str() << " bound" ) ;
395       }
396       // Root recupere les ior des composants des autre process
397       BCastIOR(_orb,pobj,false);
398
399     }
400   catch(const std::exception &ex){
401     INFOS( ex.what() ) ;
402     return Engines::EngineComponent::_nil();
403   }
404   return iobject._retn();
405 }
406
407 // Load component
408 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
409                                                  const char* componentName)
410 {
411   pthread_t *th;
412   if(_numproc == 0){
413     th = new pthread_t[_nbproc];
414     for(int ip=1;ip<_nbproc;ip++){
415       thread_st *st = new thread_st;
416       st->ip = ip;
417       st->tior = _tior;
418       st->nameToRegister = nameToRegister;
419       st->compoName = componentName;
420       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
421     }
422   }
423
424   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
425
426   if(_numproc == 0){
427     for(int ip=1;ip<_nbproc;ip++)
428       pthread_join(th[ip],NULL);
429     delete th;
430   }
431
432   return cptr;
433 }
434
435 // Load component
436 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
437                                    const char* nameToRegister,
438                                    const char* componentName)
439 {
440   Engines::EngineComponent_var iobject;
441   Engines::MPIObject_var pobj;
442   char cproc[4];
443
444   sprintf(cproc,"_%d",_numproc);
445
446   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
447
448   _numInstanceMutex.lock() ; // lock on the instance number
449   _numInstance++ ;
450   char _aNumI[12];
451   sprintf(_aNumI,"%d",_numInstance) ;
452
453   std::string _impl_name = componentName;
454   std::string _nameToRegister = nameToRegister;
455   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
456   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
457
458   std::string absolute_impl_name(_impl_name);
459   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
460   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
461   if(!handle){
462     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
463     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
464     return Engines::EngineComponent::_nil() ;
465   }
466
467   std::string factory_name = _nameToRegister + std::string("Engine_factory");
468   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
469
470   dlerror();
471   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
472                                                   PortableServer::POA_ptr,
473                                                   PortableServer::ObjectId *,
474                                                   const char *,
475                                                   const char *) =
476     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
477                                      PortableServer::POA_ptr, 
478                                      PortableServer::ObjectId *, 
479                                      const char *, 
480                                      const char *)) 
481     dlsym(handle, factory_name.c_str());
482
483   char *error ;
484   if ((error = dlerror()) != NULL){
485     // Try to load a sequential component
486     MESSAGE("[" << _numproc << "] Try to load a sequential component");
487     _numInstanceMutex.unlock() ;
488     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
489     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
490   }
491   else{
492     // Instanciation du composant parallele
493     MESSAGE("[" << _numproc << "] Try to load a parallel component");
494     PortableServer::ObjectId * id = (MPIComponent_factory)
495       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
496     // get reference from id
497     CORBA::Object_var o = _poa->id_to_reference(*id);
498     pobj = Engines::MPIObject::_narrow(o) ;
499     iobject = Engines::EngineComponent::_narrow(o) ;
500   }
501
502   if( _numproc == 0 ){
503     // utiliser + tard le registry ici :
504     // register the engine under the name containerName.dir/nameToRegister.object
505     std::string component_registerName = _containerName + "/" + _nameToRegister;
506     _NS->Register(iobject, component_registerName.c_str()) ;
507   }
508
509   _numInstanceMutex.unlock() ;
510
511   // Root recupere les ior des composants des autre process
512   BCastIOR(_orb,pobj,false);
513
514   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
515   return Engines::EngineComponent::_duplicate(iobject);
516
517 }
518
519 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
520 {
521   Engines::MPIObject_ptr pcptr;
522   Engines::MPIObject_ptr spcptr;
523
524   pthread_t *th;
525   if(_numproc == 0){
526     pcptr = (Engines::MPIObject_ptr)component_i;
527     th = new pthread_t[_nbproc];
528     for(int ip=1;ip<_nbproc;ip++){
529       thread_st *st = new thread_st;
530       st->ip = ip;
531       st->tior = _tior;
532       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
533       st->cptr = (Engines::EngineComponent_ptr)spcptr;
534       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
535     }
536   }
537
538   ASSERT(! CORBA::is_nil(component_i));
539   std::string instanceName = component_i->instanceName() ;
540   MESSAGE("[" << _numproc << "] unload component " << instanceName);
541   _numInstanceMutex.lock() ; // lock on the remove on handle_map
542   _listInstances_map.erase(instanceName);
543   _numInstanceMutex.unlock() ;
544   component_i->destroy() ;
545   if(_numproc == 0)
546     _NS->Destroy_Name(instanceName.c_str());
547
548   if(_numproc == 0){
549     for(int ip=1;ip<_nbproc;ip++)
550       pthread_join(th[ip],NULL);
551     delete th;
552   }
553
554 }
555
556 void Engines_MPIContainer_i::finalize_removal()
557 {
558   pthread_t *th;
559   if(_numproc == 0){
560     th = new pthread_t[_nbproc];
561     for(int ip=1;ip<_nbproc;ip++){
562       thread_st *st = new thread_st;
563       st->ip = ip;
564       st->tior = _tior;
565       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
566     }
567   }
568
569   _numInstanceMutex.lock(); // lock to be alone
570   // (see decInstanceCnt, load_component_Library)
571   std::map<std::string, void *>::iterator ith;
572   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
573   {
574     void *handle = (*ith).second;
575     std::string impl_name= (*ith).first;
576     if (handle)
577     {
578       SCRUTE(handle);
579       SCRUTE(impl_name);
580       //        dlclose(handle);                // SALOME unstable after ...
581       //        _library_map.erase(impl_name);
582     }
583   }
584   _toRemove_map.clear();
585   _numInstanceMutex.unlock();
586
587   if(_numproc == 0){
588     for(int ip=1;ip<_nbproc;ip++)
589       pthread_join(th[ip],NULL);
590     delete th;
591   }
592 }
593
594 void *th_loadcomponentlibrary(void *s)
595 {
596   thread_st *st = (thread_st*)s;
597   char* reason;
598   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
599   CORBA::string_free(reason);
600   return NULL;
601 }
602
603 void *th_createcomponentinstance(void *s)
604 {
605   thread_st *st = (thread_st*)s;
606   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
607   return NULL;
608 }
609
610 void *th_loadimpl(void *s)
611 {
612   thread_st *st = (thread_st*)s;
613   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
614   return NULL;
615 }
616
617 void *th_removeimpl(void *s)
618 {
619   thread_st *st = (thread_st*)s;
620   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
621   return NULL;
622 }
623
624 void *th_finalizeremoval(void *s)
625 {
626   thread_st *st = (thread_st*)s;
627   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
628   return NULL;
629 }