Salome HOME
just a little modification
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME MPIContainer : implemenation of container based on MPI libraries
23 //  File   : MPIContainer_i.cxx
24 //  Module : SALOME
25 //
26 #include <iostream>
27 #include <dlfcn.h>
28 #include <stdio.h>
29 #include "Basics_Utils.hxx"
30 #include "SALOME_Component_i.hxx"
31 #include "MPIContainer_i.hxx"
32 #include "SALOME_NamingService.hxx"
33 #include "Utils_SINGLETON.hxx"
34 #include "OpUtil.hxx"
35 #include "utilities.h"
36 #include <time.h>
37 #include <sys/time.h>
38 #include <pthread.h>  // must be before Python.h !
39 #include <Python.h>
40 #include "Container_init_python.hxx"
41 using namespace std;
42
43 // L'appel au registry SALOME ne se fait que pour le process 0
44 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc,
45                                                CORBA::ORB_ptr orb, 
46                                                PortableServer::POA_ptr poa,
47                                                char * containerName,
48                                                int argc, char *argv[]) 
49   : Engines_Container_i(orb,poa,containerName,argc,argv,false), MPIObject_i(nbproc,numproc)
50 {
51
52   _id = _poa->activate_object(this);
53   CORBA::Object_var obj=_poa->id_to_reference(*_id);
54   Engines::Container_var pCont = Engines::Container::_narrow(obj);
55   _remove_ref();
56
57   if(numproc==0){
58
59     _NS = new SALOME_NamingService();
60     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
61
62     string hostname = Kernel_Utils::GetHostname();
63     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
64     SCRUTE(_containerName);
65     _NS->Register(pCont, _containerName.c_str());
66
67   }
68
69   // Root recupere les ior des container des autre process
70   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71   BCastIOR(_orb,pobj,true);
72 }
73
74 Engines_MPIContainer_i::Engines_MPIContainer_i(int nbproc, int numproc) 
75   : Engines_Container_i(), MPIObject_i(nbproc,numproc)
76 {
77 }
78
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 {
81   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
82 }
83
84 // Load component
85 void Engines_MPIContainer_i::Shutdown()
86 {
87   int ip;
88   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
89   if( _numproc == 0 ){
90     _NS->Destroy_FullDirectory(_containerName.c_str());
91     for(ip= 1;ip<_nbproc;ip++)
92       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
93   }
94
95   std::map<std::string, Engines::Component_var>::iterator itm;
96   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
97     {
98       try
99         {
100           itm->second->destroy();
101         }
102       catch(const CORBA::Exception& e)
103         {
104           // ignore this entry and continue
105         }
106       catch(...)
107         {
108           // ignore this entry and continue
109         }
110     }
111
112   _orb->shutdown(0);
113
114 }
115
116 // Load a component library
117 bool Engines_MPIContainer_i::load_component_Library(const char* componentName)
118 {
119   pthread_t *th;
120   if(_numproc == 0){
121     th = new pthread_t[_nbproc];
122     for(int ip=1;ip<_nbproc;ip++){
123       thread_st *st = new thread_st;
124       st->ip = ip;
125       st->tior = _tior;
126       st->compoName = componentName;
127       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
128     }
129   }
130
131   bool ret = Lload_component_Library(componentName);
132
133   if(_numproc == 0){
134     for(int ip=1;ip<_nbproc;ip++)
135       pthread_join(th[ip],NULL);
136     delete th;
137   }
138   return ret;
139 }
140
141 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
142 {
143   string aCompName = componentName;
144
145   // --- try dlopen C++ component
146
147   string impl_name = string ("lib") + aCompName + string("Engine.so");
148   
149   _numInstanceMutex.lock(); // lock to be alone 
150   // (see decInstanceCnt, finalize_removal))
151   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
152   if (_library_map[impl_name])
153     {
154       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
155       _numInstanceMutex.unlock();
156       return true;
157     }
158   
159   void* handle;
160   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
161   if ( handle )
162     {
163       _library_map[impl_name] = handle;
164       _numInstanceMutex.unlock();
165       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
166       return true;
167     }
168   else
169     {
170       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
171       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
172     }
173   _numInstanceMutex.unlock();
174
175   // --- try import Python component
176
177   INFOS("[" << _numproc << "] try import Python component "<<componentName);
178   if (_isSupervContainer)
179     {
180       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
181       return false;
182     }
183   if (_library_map[aCompName])
184     {
185       return true; // Python Component, already imported
186     }
187   else
188     {
189       Py_ACQUIRE_NEW_THREAD;
190       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
191       PyObject *globals = PyModule_GetDict(mainmod);
192       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
193       PyObject *result = PyObject_CallMethod(pyCont,
194                                              (char*)"import_component",
195                                              (char*)"s",componentName);
196       int ret= PyInt_AsLong(result);
197       SCRUTE(ret);
198       Py_RELEASE_NEW_THREAD;
199   
200       if (ret) // import possible: Python component
201         {
202           _library_map[aCompName] = (void *)pyCont; // any non O value OK
203           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
204           return true;
205         }
206     }
207   return false;
208 }
209
210 // Create an instance of component
211 Engines::Component_ptr
212 Engines_MPIContainer_i::create_component_instance( const char* componentName,
213                                                    CORBA::Long studyId)
214 {
215   pthread_t *th;
216   if(_numproc == 0){
217     th = new pthread_t[_nbproc];
218     for(int ip=1;ip<_nbproc;ip++){
219       thread_st *st = new thread_st;
220       st->ip = ip;
221       st->tior = _tior;
222       st->compoName = componentName;
223       st->studyId = studyId;
224       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
225     }
226   }
227
228   Engines::Component_ptr cptr = Lcreate_component_instance(componentName,studyId);
229
230   if(_numproc == 0){
231     for(int ip=1;ip<_nbproc;ip++)
232       pthread_join(th[ip],NULL);
233     delete th;
234   }
235
236   return cptr;
237 }
238
239 Engines::Component_ptr
240 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName, CORBA::Long studyId)
241 {
242   if (studyId < 0) {
243     INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy");
244     return Engines::Component::_nil() ;
245   }
246
247   Engines::Component_var iobject = Engines::Component::_nil() ;
248   Engines::MPIObject_var pobj;
249
250   string aCompName = genericRegisterName;
251   if (_library_map[aCompName]) { // Python component
252     if (_isSupervContainer) {
253       INFOS("Supervision Container does not support Python Component Engines");
254       return Engines::Component::_nil();
255     }
256     _numInstanceMutex.lock() ; // lock on the instance number
257     _numInstance++ ;
258     int numInstance = _numInstance ;
259     _numInstanceMutex.unlock() ;
260
261     char aNumI[12];
262     sprintf( aNumI , "%d" , numInstance ) ;
263     string instanceName = aCompName + "_inst_" + aNumI ;
264     string component_registerName =
265       _containerName + "/" + instanceName;
266
267     Py_ACQUIRE_NEW_THREAD;
268     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
269     PyObject *globals = PyModule_GetDict(mainmod);
270     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
271     PyObject *result = PyObject_CallMethod(pyCont,
272                                            (char*)"create_component_instance",
273                                            (char*)"ssl",
274                                            aCompName.c_str(),
275                                            instanceName.c_str(),
276                                            studyId);
277     string iors = PyString_AsString(result);
278     SCRUTE(iors);
279     Py_RELEASE_NEW_THREAD;
280   
281     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
282     iobject = Engines::Component::_narrow( obj ) ;
283     pobj = Engines::MPIObject::_narrow(obj) ;
284     if( _numproc == 0 )
285       _NS->Register(iobject, component_registerName.c_str()) ;
286     // Root recupere les ior des composants des autre process
287     BCastIOR(_orb,pobj,false);
288
289     return iobject._retn();
290   }
291   
292   //--- try C++
293
294   string impl_name = string ("lib") + genericRegisterName +string("Engine.so");
295   if (_library_map.count(impl_name) != 0) // C++ component
296     {
297       void* handle = _library_map[impl_name];
298       iobject = createMPIInstance(genericRegisterName,
299                                     handle,
300                                     studyId);
301       return iobject._retn();
302     }
303
304   return Engines::Component::_nil() ;
305 }
306
307 Engines::Component_ptr
308 Engines_MPIContainer_i::createMPIInstance(string genericRegisterName,
309                                           void *handle,
310                                           int studyId)
311 {
312   Engines::Component_var iobject;
313   Engines::MPIObject_var pobj;
314   // --- find the factory
315
316   string aGenRegisterName = genericRegisterName;
317   string factory_name = aGenRegisterName + string("Engine_factory");
318
319   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
320     (int,int,
321      CORBA::ORB_ptr,
322      PortableServer::POA_ptr, 
323      PortableServer::ObjectId *, 
324      const char *, 
325      const char *) ;
326
327   dlerror();
328   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
329
330   if ( !MPIComponent_factory )
331     {
332       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
333       SCRUTE( dlerror() );
334       pobj = Engines::MPIObject::_nil();
335       BCastIOR(_orb,pobj,false);
336       return Engines::Component::_nil();
337     }
338
339   // --- create instance
340
341   iobject = Engines::Component::_nil() ;
342
343   try
344     {
345       _numInstanceMutex.lock() ; // lock on the instance number
346       _numInstance++ ;
347       int numInstance = _numInstance ;
348       _numInstanceMutex.unlock() ;
349
350       char aNumI[12];
351       sprintf( aNumI , "%d" , numInstance ) ;
352       string instanceName = aGenRegisterName + "_inst_" + aNumI ;
353       string component_registerName =
354         _containerName + "/" + instanceName;
355
356       // --- Instanciate required CORBA object
357
358       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
359       id = (MPIComponent_factory) ( _nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(),
360                                  aGenRegisterName.c_str() ) ;
361
362       // --- get reference & servant from id
363
364       CORBA::Object_var obj = _poa->id_to_reference(*id);
365       iobject = Engines::Component::_narrow( obj ) ;
366       pobj = Engines::MPIObject::_narrow(obj) ;
367
368       Engines_Component_i *servant =
369         dynamic_cast<Engines_Component_i*>(_poa->reference_to_servant(iobject));
370       ASSERT(servant);
371       //SCRUTE(servant->pd_refCount);
372       servant->_remove_ref(); // compensate previous id_to_reference 
373       //SCRUTE(servant->pd_refCount);
374       _listInstances_map[instanceName] = iobject;
375       _cntInstances_map[aGenRegisterName] += 1;
376       //SCRUTE(servant->pd_refCount);
377       bool ret_studyId = servant->setStudyId(studyId);
378       ASSERT(ret_studyId);
379
380       // --- register the engine under the name
381       //     containerName(.dir)/instanceName(.object)
382
383       if( _numproc == 0 ){
384         _NS->Register( iobject , component_registerName.c_str() ) ;
385         MESSAGE( component_registerName.c_str() << " bound" ) ;
386       }
387       // Root recupere les ior des composants des autre process
388       BCastIOR(_orb,pobj,false);
389
390     }
391   catch(const POException &ex){
392     INFOS( ex.msg << " on process number " << ex.numproc ) ;
393     return Engines::Component::_nil();
394   }
395   catch (...){
396     INFOS( "Container_i::createInstance exception catched" ) ;
397     return Engines::Component::_nil();
398   }
399   return iobject._retn();
400 }
401
402 // Load component
403 Engines::Component_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
404                                                  const char* componentName)
405 {
406   pthread_t *th;
407   if(_numproc == 0){
408     th = new pthread_t[_nbproc];
409     for(int ip=1;ip<_nbproc;ip++){
410       thread_st *st = new thread_st;
411       st->ip = ip;
412       st->tior = _tior;
413       st->nameToRegister = nameToRegister;
414       st->compoName = componentName;
415       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
416     }
417   }
418
419   Engines::Component_ptr cptr =  Lload_impl(nameToRegister,componentName);
420
421   if(_numproc == 0){
422     for(int ip=1;ip<_nbproc;ip++)
423       pthread_join(th[ip],NULL);
424     delete th;
425   }
426
427   return cptr;
428 }
429
430 // Load component
431 Engines::Component_ptr Engines_MPIContainer_i::Lload_impl(
432                                    const char* nameToRegister,
433                                    const char* componentName)
434 {
435   Engines::Component_var iobject;
436   Engines::MPIObject_var pobj;
437   char cproc[4];
438
439   sprintf(cproc,"_%d",_numproc);
440
441   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
442
443   _numInstanceMutex.lock() ; // lock on the instance number
444   _numInstance++ ;
445   char _aNumI[12];
446   sprintf(_aNumI,"%d",_numInstance) ;
447
448   string _impl_name = componentName;
449   string _nameToRegister = nameToRegister;
450   string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
451   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
452
453   string absolute_impl_name(_impl_name);
454   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
455   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
456   if(!handle){
457     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
458     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
459     return Engines::Component::_nil() ;
460   }
461
462   string factory_name = _nameToRegister + string("Engine_factory");
463   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
464
465   dlerror();
466   PortableServer::ObjectId * (*MPIComponent_factory) (int,int,
467                                                   CORBA::ORB_ptr,
468                                                   PortableServer::POA_ptr,
469                                                   PortableServer::ObjectId *,
470                                                   const char *,
471                                                   const char *) =
472     (PortableServer::ObjectId * (*) (int,int,
473                                      CORBA::ORB_ptr,
474                                      PortableServer::POA_ptr, 
475                                      PortableServer::ObjectId *, 
476                                      const char *, 
477                                      const char *)) 
478     dlsym(handle, factory_name.c_str());
479
480   char *error ;
481   if ((error = dlerror()) != NULL){
482     // Try to load a sequential component
483     MESSAGE("[" << _numproc << "] Try to load a sequential component");
484     _numInstanceMutex.unlock() ;
485     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
486     if( CORBA::is_nil(iobject) ) return Engines::Component::_duplicate(iobject);
487   }
488   else{
489     // Instanciation du composant parallele
490     MESSAGE("[" << _numproc << "] Try to load a parallel component");
491     PortableServer::ObjectId * id = (MPIComponent_factory)
492       (_nbproc,_numproc,_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
493     // get reference from id
494     CORBA::Object_var o = _poa->id_to_reference(*id);
495     pobj = Engines::MPIObject::_narrow(o) ;
496     iobject = Engines::Component::_narrow(o) ;
497   }
498
499   if( _numproc == 0 ){
500     // utiliser + tard le registry ici :
501     // register the engine under the name containerName.dir/nameToRegister.object
502     string component_registerName = _containerName + "/" + _nameToRegister;
503     _NS->Register(iobject, component_registerName.c_str()) ;
504   }
505
506   _numInstanceMutex.unlock() ;
507
508   // Root recupere les ior des composants des autre process
509   BCastIOR(_orb,pobj,false);
510
511   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
512   return Engines::Component::_duplicate(iobject);
513
514 }
515
516 void Engines_MPIContainer_i::remove_impl(Engines::Component_ptr component_i)
517 {
518   Engines::MPIObject_ptr pcptr;
519   Engines::MPIObject_ptr spcptr;
520
521   pthread_t *th;
522   if(_numproc == 0){
523     pcptr = (Engines::MPIObject_ptr)component_i;
524     th = new pthread_t[_nbproc];
525     for(int ip=1;ip<_nbproc;ip++){
526       thread_st *st = new thread_st;
527       st->ip = ip;
528       st->tior = _tior;
529       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
530       st->cptr = (Engines::Component_ptr)spcptr;
531       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
532     }
533   }
534
535   ASSERT(! CORBA::is_nil(component_i));
536   string instanceName = component_i->instanceName() ;
537   MESSAGE("[" << _numproc << "] unload component " << instanceName);
538   _numInstanceMutex.lock() ; // lock on the remove on handle_map
539   _listInstances_map.erase(instanceName);
540   _numInstanceMutex.unlock() ;
541   component_i->destroy() ;
542   if(_numproc == 0)
543     _NS->Destroy_Name(instanceName.c_str());
544
545   if(_numproc == 0){
546     for(int ip=1;ip<_nbproc;ip++)
547       pthread_join(th[ip],NULL);
548     delete th;
549   }
550
551 }
552
553 void Engines_MPIContainer_i::finalize_removal()
554 {
555   pthread_t *th;
556   if(_numproc == 0){
557     th = new pthread_t[_nbproc];
558     for(int ip=1;ip<_nbproc;ip++){
559       thread_st *st = new thread_st;
560       st->ip = ip;
561       st->tior = _tior;
562       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
563     }
564   }
565
566   _numInstanceMutex.lock(); // lock to be alone
567   // (see decInstanceCnt, load_component_Library)
568   map<string, void *>::iterator ith;
569   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
570   {
571     void *handle = (*ith).second;
572     string impl_name= (*ith).first;
573     if (handle)
574     {
575       SCRUTE(handle);
576       SCRUTE(impl_name);
577       //        dlclose(handle);                // SALOME unstable after ...
578       //        _library_map.erase(impl_name);
579     }
580   }
581   _toRemove_map.clear();
582   _numInstanceMutex.unlock();
583
584   if(_numproc == 0){
585     for(int ip=1;ip<_nbproc;ip++)
586       pthread_join(th[ip],NULL);
587     delete th;
588   }
589 }
590
591 void *th_loadcomponentlibrary(void *s)
592 {
593   thread_st *st = (thread_st*)s;
594   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str());
595   return NULL;
596 }
597
598 void *th_createcomponentinstance(void *s)
599 {
600   thread_st *st = (thread_st*)s;
601   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str(),st->studyId);
602   return NULL;
603 }
604
605 void *th_loadimpl(void *s)
606 {
607   thread_st *st = (thread_st*)s;
608   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
609   return NULL;
610 }
611
612 void *th_removeimpl(void *s)
613 {
614   thread_st *st = (thread_st*)s;
615   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
616   return NULL;
617 }
618
619 void *th_finalizeremoval(void *s)
620 {
621   thread_st *st = (thread_st*)s;
622   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
623   return NULL;
624 }