Salome HOME
Fix configuration / compilation problems in KERNEL with libBatch (caused by wrong...
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME MPIContainer : implemenation of container based on MPI libraries
23 //  File   : MPIContainer_i.cxx
24 //  Module : SALOME
25
26 #include <iostream>
27 #include <dlfcn.h>
28 #include <stdio.h>
29 #include "Basics_Utils.hxx"
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <time.h>
37 #include <sys/time.h>
38 #include <pthread.h>  // must be before Python.h !
39 #include <Python.h>
40 #include "Container_init_python.hxx"
41
42 // L'appel au registry SALOME ne se fait que pour le process 0
43 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
44                                                PortableServer::POA_ptr poa,
45                                                char * containerName,
46                                                int argc, char *argv[]) 
47   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
48 {
49
50   _id = _poa->activate_object(this);
51   CORBA::Object_var obj=_poa->id_to_reference(*_id);
52   Engines::Container_var pCont = Engines::Container::_narrow(obj);
53   _remove_ref();
54
55   if(_numproc==0){
56
57     _NS = new SALOME_NamingService();
58     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
59
60     std::string hostname = Kernel_Utils::GetHostname();
61     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
62     SCRUTE(_containerName);
63     _NS->Register(pCont, _containerName.c_str());
64
65   }
66
67   // Root recupere les ior des container des autre process
68   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
69   BCastIOR(_orb,pobj,true);
70 }
71
72 Engines_MPIContainer_i::Engines_MPIContainer_i() 
73   : Engines_Container_i()
74 {
75 }
76
77 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
78 {
79   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
80 }
81
82 // Load component
83 void Engines_MPIContainer_i::Shutdown()
84 {
85   int ip;
86   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
87   if( _numproc == 0 ){
88     _NS->Destroy_FullDirectory(_containerName.c_str());
89     _NS->Destroy_Name(_containerName.c_str());
90     for(ip= 1;ip<_nbproc;ip++)
91       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
92   }
93
94   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
95   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
96     {
97       try
98         {
99           itm->second->destroy();
100         }
101       catch(const CORBA::Exception& e)
102         {
103           // ignore this entry and continue
104         }
105       catch(...)
106         {
107           // ignore this entry and continue
108         }
109     }
110
111   _orb->shutdown(0);
112
113 }
114
115 // Load a component library
116 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
117 {
118   reason=CORBA::string_dup("");
119
120   pthread_t *th;
121   if(_numproc == 0){
122     th = new pthread_t[_nbproc];
123     for(int ip=1;ip<_nbproc;ip++){
124       thread_st *st = new thread_st;
125       st->ip = ip;
126       st->tior = _tior;
127       st->compoName = componentName;
128       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
129     }
130   }
131
132   bool ret = Lload_component_Library(componentName);
133
134   if(_numproc == 0){
135     for(int ip=1;ip<_nbproc;ip++)
136       pthread_join(th[ip],NULL);
137     delete th;
138   }
139   return ret;
140 }
141
142 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
143 {
144   std::string aCompName = componentName;
145
146   // --- try dlopen C++ component
147
148   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
149   
150   _numInstanceMutex.lock(); // lock to be alone 
151   // (see decInstanceCnt, finalize_removal))
152   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
153   if (_library_map[impl_name])
154     {
155       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
156       _numInstanceMutex.unlock();
157       return true;
158     }
159   
160   void* handle;
161   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
162   if ( handle )
163     {
164       _library_map[impl_name] = handle;
165       _numInstanceMutex.unlock();
166       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
167       return true;
168     }
169   else
170     {
171       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
172       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
173     }
174   _numInstanceMutex.unlock();
175
176   // --- try import Python component
177
178   INFOS("[" << _numproc << "] try import Python component "<<componentName);
179   if (_isSupervContainer)
180     {
181       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
182       return false;
183     }
184   if (_library_map[aCompName])
185     {
186       return true; // Python Component, already imported
187     }
188   else
189     {
190       Py_ACQUIRE_NEW_THREAD;
191       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
192       PyObject *globals = PyModule_GetDict(mainmod);
193       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
194       PyObject *result = PyObject_CallMethod(pyCont,
195                                              (char*)"import_component",
196                                              (char*)"s",componentName);
197       std::string ret= PyString_AsString(result);
198       SCRUTE(ret);
199       Py_RELEASE_NEW_THREAD;
200   
201       if (ret=="") // import possible: Python component
202         {
203           _library_map[aCompName] = (void *)pyCont; // any non O value OK
204           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
205           return true;
206         }
207     }
208   return false;
209 }
210
211 // Create an instance of component
212 Engines::EngineComponent_ptr
213 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
214                                                        CORBA::Long studyId,
215                                                        const Engines::FieldsDict& env,
216                                                        CORBA::String_out reason)
217 {
218   reason=CORBA::string_dup("");
219
220   pthread_t *th;
221   if(_numproc == 0){
222     th = new pthread_t[_nbproc];
223     for(int ip=1;ip<_nbproc;ip++){
224       thread_st *st = new thread_st;
225       st->ip = ip;
226       st->tior = _tior;
227       st->compoName = componentName;
228       st->studyId = studyId;
229       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
230     }
231   }
232
233   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName,studyId);
234
235   if(_numproc == 0){
236     for(int ip=1;ip<_nbproc;ip++)
237       pthread_join(th[ip],NULL);
238     delete th;
239   }
240
241   return cptr;
242 }
243
244 Engines::EngineComponent_ptr
245 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
246 {
247   if (studyId < 0) {
248     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
249     return Engines::EngineComponent::_nil() ;
250   }
251
252   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
253   Engines::MPIObject_var pobj;
254
255   std::string aCompName = genericRegisterName;
256   if (_library_map[aCompName]) { // Python component
257     if (_isSupervContainer) {
258       INFOS("Supervision Container does not support Python Component Engines");
259       return Engines::EngineComponent::_nil();
260     }
261     _numInstanceMutex.lock() ; // lock on the instance number
262     _numInstance++ ;
263     int numInstance = _numInstance ;
264     _numInstanceMutex.unlock() ;
265
266     char aNumI[12];
267     sprintf( aNumI , "%d" , numInstance ) ;
268     std::string instanceName = aCompName + "_inst_" + aNumI ;
269     std::string component_registerName =
270       _containerName + "/" + instanceName;
271
272     Py_ACQUIRE_NEW_THREAD;
273     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
274     PyObject *globals = PyModule_GetDict(mainmod);
275     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
276     PyObject *result = PyObject_CallMethod(pyCont,
277                                            (char*)"create_component_instance",
278                                            (char*)"ssl",
279                                            aCompName.c_str(),
280                                            instanceName.c_str(),
281                                            studyId);
282     const char *ior;
283     const char *error;
284     PyArg_ParseTuple(result,"ss", &ior, &error);
285     std::string iors = ior;
286     SCRUTE(iors);
287     Py_RELEASE_NEW_THREAD;
288   
289     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
290     iobject = Engines::EngineComponent::_narrow( obj ) ;
291     pobj = Engines::MPIObject::_narrow(obj) ;
292     if( _numproc == 0 )
293       _NS->Register(iobject, component_registerName.c_str()) ;
294     // Root recupere les ior des composants des autre process
295     BCastIOR(_orb,pobj,false);
296
297     return iobject._retn();
298   }
299   
300   //--- try C++
301
302   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
303   if (_library_map.count(impl_name) != 0) // C++ component
304     {
305       void* handle = _library_map[impl_name];
306       iobject = createMPIInstance(genericRegisterName,
307                                     handle,
308                                     studyId);
309       return iobject._retn();
310     }
311
312   return Engines::EngineComponent::_nil() ;
313 }
314
315 Engines::EngineComponent_ptr
316 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
317                                           void *handle,
318                                           int studyId)
319 {
320   Engines::EngineComponent_var iobject;
321   Engines::MPIObject_var pobj;
322   // --- find the factory
323
324   std::string aGenRegisterName = genericRegisterName;
325   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
326
327   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
328     (CORBA::ORB_ptr,
329      PortableServer::POA_ptr, 
330      PortableServer::ObjectId *, 
331      const char *, 
332      const char *) ;
333
334   dlerror();
335   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
336
337   if ( !MPIComponent_factory )
338     {
339       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
340       SCRUTE( dlerror() );
341       pobj = Engines::MPIObject::_nil();
342       BCastIOR(_orb,pobj,false);
343       return Engines::EngineComponent::_nil();
344     }
345
346   // --- create instance
347
348   iobject = Engines::EngineComponent::_nil() ;
349
350   try
351     {
352       _numInstanceMutex.lock() ; // lock on the instance number
353       _numInstance++ ;
354       int numInstance = _numInstance ;
355       _numInstanceMutex.unlock() ;
356
357       char aNumI[12];
358       sprintf( aNumI , "%d" , numInstance ) ;
359       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
360       std::string component_registerName =
361         _containerName + "/" + instanceName;
362
363       // --- Instanciate required CORBA object
364
365       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
366       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
367
368       // --- get reference & servant from id
369
370       CORBA::Object_var obj = _poa->id_to_reference(*id);
371       iobject = Engines::EngineComponent::_narrow( obj ) ;
372       pobj = Engines::MPIObject::_narrow(obj) ;
373
374       Engines_Component_i *servant =
375         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
376       ASSERT(servant);
377       //SCRUTE(servant->pd_refCount);
378       servant->_remove_ref(); // compensate previous id_to_reference 
379       //SCRUTE(servant->pd_refCount);
380       _listInstances_map[instanceName] = iobject;
381       _cntInstances_map[aGenRegisterName] += 1;
382       //SCRUTE(servant->pd_refCount);
383 #ifndef _DEBUG_
384       servant->setStudyId(studyId);
385 #else
386       bool ret_studyId = servant->setStudyId(studyId);
387       ASSERT(ret_studyId);
388 #endif
389
390       // --- register the engine under the name
391       //     containerName(.dir)/instanceName(.object)
392
393       if( _numproc == 0 ){
394         _NS->Register( iobject , component_registerName.c_str() ) ;
395         MESSAGE( component_registerName.c_str() << " bound" ) ;
396       }
397       // Root recupere les ior des composants des autre process
398       BCastIOR(_orb,pobj,false);
399
400     }
401   catch(const std::exception &ex){
402     INFOS( ex.what() ) ;
403     return Engines::EngineComponent::_nil();
404   }
405   return iobject._retn();
406 }
407
408 // Load component
409 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
410                                                  const char* componentName)
411 {
412   pthread_t *th;
413   if(_numproc == 0){
414     th = new pthread_t[_nbproc];
415     for(int ip=1;ip<_nbproc;ip++){
416       thread_st *st = new thread_st;
417       st->ip = ip;
418       st->tior = _tior;
419       st->nameToRegister = nameToRegister;
420       st->compoName = componentName;
421       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
422     }
423   }
424
425   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
426
427   if(_numproc == 0){
428     for(int ip=1;ip<_nbproc;ip++)
429       pthread_join(th[ip],NULL);
430     delete th;
431   }
432
433   return cptr;
434 }
435
436 // Load component
437 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
438                                    const char* nameToRegister,
439                                    const char* componentName)
440 {
441   Engines::EngineComponent_var iobject;
442   Engines::MPIObject_var pobj;
443   char cproc[4];
444
445   sprintf(cproc,"_%d",_numproc);
446
447   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
448
449   _numInstanceMutex.lock() ; // lock on the instance number
450   _numInstance++ ;
451   char _aNumI[12];
452   sprintf(_aNumI,"%d",_numInstance) ;
453
454   std::string _impl_name = componentName;
455   std::string _nameToRegister = nameToRegister;
456   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
457   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
458
459   std::string absolute_impl_name(_impl_name);
460   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
461   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
462   if(!handle){
463     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
464     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
465     return Engines::EngineComponent::_nil() ;
466   }
467
468   std::string factory_name = _nameToRegister + std::string("Engine_factory");
469   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
470
471   dlerror();
472   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
473                                                   PortableServer::POA_ptr,
474                                                   PortableServer::ObjectId *,
475                                                   const char *,
476                                                   const char *) =
477     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
478                                      PortableServer::POA_ptr, 
479                                      PortableServer::ObjectId *, 
480                                      const char *, 
481                                      const char *)) 
482     dlsym(handle, factory_name.c_str());
483
484   char *error ;
485   if ((error = dlerror()) != NULL){
486     // Try to load a sequential component
487     MESSAGE("[" << _numproc << "] Try to load a sequential component");
488     _numInstanceMutex.unlock() ;
489     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
490     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
491   }
492   else{
493     // Instanciation du composant parallele
494     MESSAGE("[" << _numproc << "] Try to load a parallel component");
495     PortableServer::ObjectId * id = (MPIComponent_factory)
496       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
497     // get reference from id
498     CORBA::Object_var o = _poa->id_to_reference(*id);
499     pobj = Engines::MPIObject::_narrow(o) ;
500     iobject = Engines::EngineComponent::_narrow(o) ;
501   }
502
503   if( _numproc == 0 ){
504     // utiliser + tard le registry ici :
505     // register the engine under the name containerName.dir/nameToRegister.object
506     std::string component_registerName = _containerName + "/" + _nameToRegister;
507     _NS->Register(iobject, component_registerName.c_str()) ;
508   }
509
510   _numInstanceMutex.unlock() ;
511
512   // Root recupere les ior des composants des autre process
513   BCastIOR(_orb,pobj,false);
514
515   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
516   return Engines::EngineComponent::_duplicate(iobject);
517
518 }
519
520 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
521 {
522   Engines::MPIObject_ptr pcptr;
523   Engines::MPIObject_ptr spcptr;
524
525   pthread_t *th;
526   if(_numproc == 0){
527     pcptr = (Engines::MPIObject_ptr)component_i;
528     th = new pthread_t[_nbproc];
529     for(int ip=1;ip<_nbproc;ip++){
530       thread_st *st = new thread_st;
531       st->ip = ip;
532       st->tior = _tior;
533       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
534       st->cptr = (Engines::EngineComponent_ptr)spcptr;
535       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
536     }
537   }
538
539   ASSERT(! CORBA::is_nil(component_i));
540   std::string instanceName = component_i->instanceName() ;
541   MESSAGE("[" << _numproc << "] unload component " << instanceName);
542   _numInstanceMutex.lock() ; // lock on the remove on handle_map
543   _listInstances_map.erase(instanceName);
544   _numInstanceMutex.unlock() ;
545   component_i->destroy() ;
546   if(_numproc == 0)
547     _NS->Destroy_Name(instanceName.c_str());
548
549   if(_numproc == 0){
550     for(int ip=1;ip<_nbproc;ip++)
551       pthread_join(th[ip],NULL);
552     delete th;
553   }
554
555 }
556
557 void Engines_MPIContainer_i::finalize_removal()
558 {
559   pthread_t *th;
560   if(_numproc == 0){
561     th = new pthread_t[_nbproc];
562     for(int ip=1;ip<_nbproc;ip++){
563       thread_st *st = new thread_st;
564       st->ip = ip;
565       st->tior = _tior;
566       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
567     }
568   }
569
570   _numInstanceMutex.lock(); // lock to be alone
571   // (see decInstanceCnt, load_component_Library)
572   std::map<std::string, void *>::iterator ith;
573   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
574   {
575     void *handle = (*ith).second;
576     std::string impl_name= (*ith).first;
577     if (handle)
578     {
579       SCRUTE(handle);
580       SCRUTE(impl_name);
581       //        dlclose(handle);                // SALOME unstable after ...
582       //        _library_map.erase(impl_name);
583     }
584   }
585   _toRemove_map.clear();
586   _numInstanceMutex.unlock();
587
588   if(_numproc == 0){
589     for(int ip=1;ip<_nbproc;ip++)
590       pthread_join(th[ip],NULL);
591     delete th;
592   }
593 }
594
595 void *th_loadcomponentlibrary(void *s)
596 {
597   thread_st *st = (thread_st*)s;
598   char* reason;
599   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
600   CORBA::string_free(reason);
601   return NULL;
602 }
603
604 void *th_createcomponentinstance(void *s)
605 {
606   thread_st *st = (thread_st*)s;
607   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
608   return NULL;
609 }
610
611 void *th_loadimpl(void *s)
612 {
613   thread_st *st = (thread_st*)s;
614   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
615   return NULL;
616 }
617
618 void *th_removeimpl(void *s)
619 {
620   thread_st *st = (thread_st*)s;
621   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
622   return NULL;
623 }
624
625 void *th_finalizeremoval(void *s)
626 {
627   thread_st *st = (thread_st*)s;
628   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
629   return NULL;
630 }