]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
fix problems from GUI
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  File   : MPIContainer_i.cxx
23 //  Module : SALOME
24
25 #include <iostream>
26 #include <dlfcn.h>
27 #include <stdio.h>
28 #include "Basics_Utils.hxx"
29 #include "SALOME_Component_i.hxx"
30 #include "MPIContainer_i.hxx"
31 #include "SALOME_NamingService.hxx"
32 #include "Utils_SINGLETON.hxx"
33 #include "OpUtil.hxx"
34 #include "utilities.h"
35 #include <time.h>
36 #include <sys/time.h>
37 #include <pthread.h>  // must be before Python.h !
38
39 #ifdef _XOPEN_SOURCE
40 #undef _XOPEN_SOURCE
41 #endif
42
43 #include <Python.h>
44 #include "Container_init_python.hxx"
45
46 // L'appel au registry SALOME ne se fait que pour le process 0
47 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
48                                                PortableServer::POA_ptr poa,
49                                                char * containerName,
50                                                int argc, char *argv[]) 
51   : Engines_Container_i(orb,poa,containerName,argc,argv,false)
52 {
53
54   _id = _poa->activate_object(this);
55   CORBA::Object_var obj=_poa->id_to_reference(*_id);
56   Engines::Container_var pCont = Engines::Container::_narrow(obj);
57   _remove_ref();
58
59   if(_numproc==0){
60
61     _NS = new SALOME_NamingService();
62     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
63
64     std::string hostname = Kernel_Utils::GetHostname();
65     _containerName = _NS->BuildContainerNameForNS(containerName,hostname.c_str());
66     SCRUTE(_containerName);
67     _NS->Register(pCont, _containerName.c_str());
68
69   }
70
71   // Root recupere les ior des container des autre process
72   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
73   BCastIOR(_orb,pobj,true);
74 }
75
76 Engines_MPIContainer_i::Engines_MPIContainer_i() 
77   : Engines_Container_i()
78 {
79 }
80
81 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
82 {
83   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
84 }
85
86 // Load component
87 void Engines_MPIContainer_i::Shutdown()
88 {
89   int ip;
90   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
91   if( _numproc == 0 ){
92     _NS->Destroy_FullDirectory(_containerName.c_str());
93     _NS->Destroy_Name(_containerName.c_str());
94     for(ip= 1;ip<_nbproc;ip++)
95       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
96   }
97
98   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
99   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
100     {
101       try
102         {
103           itm->second->destroy();
104         }
105       catch(const CORBA::Exception& e)
106         {
107           // ignore this entry and continue
108         }
109       catch(...)
110         {
111           // ignore this entry and continue
112         }
113     }
114
115   _orb->shutdown(0);
116
117 }
118
119 // Load a component library
120 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
121 {
122   reason=CORBA::string_dup("");
123
124   pthread_t *th;
125   if(_numproc == 0){
126     th = new pthread_t[_nbproc];
127     for(int ip=1;ip<_nbproc;ip++){
128       thread_st *st = new thread_st;
129       st->ip = ip;
130       st->tior = _tior;
131       st->compoName = componentName;
132       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
133     }
134   }
135
136   bool ret = Lload_component_Library(componentName);
137
138   if(_numproc == 0){
139     for(int ip=1;ip<_nbproc;ip++)
140       pthread_join(th[ip],NULL);
141     delete th;
142   }
143   return ret;
144 }
145
146 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
147 {
148   std::string aCompName = componentName;
149
150   // --- try dlopen C++ component
151
152   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
153   
154   _numInstanceMutex.lock(); // lock to be alone 
155   // (see decInstanceCnt, finalize_removal))
156   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
157   if (_library_map[impl_name])
158     {
159       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
160       _numInstanceMutex.unlock();
161       return true;
162     }
163   
164   void* handle;
165   handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ;
166   if ( handle )
167     {
168       _library_map[impl_name] = handle;
169       _numInstanceMutex.unlock();
170       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
171       return true;
172     }
173   else
174     {
175       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
176       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
177     }
178   _numInstanceMutex.unlock();
179
180   // --- try import Python component
181
182   INFOS("[" << _numproc << "] try import Python component "<<componentName);
183   if (_isSupervContainer)
184     {
185       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
186       return false;
187     }
188   if (_library_map[aCompName])
189     {
190       return true; // Python Component, already imported
191     }
192   else
193     {
194       Py_ACQUIRE_NEW_THREAD;
195       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
196       PyObject *globals = PyModule_GetDict(mainmod);
197       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
198       PyObject *result = PyObject_CallMethod(pyCont,
199                                              (char*)"import_component",
200                                              (char*)"s",componentName);
201       std::string ret= PyString_AsString(result);
202       SCRUTE(ret);
203       Py_RELEASE_NEW_THREAD;
204   
205       if (ret=="") // import possible: Python component
206         {
207           _library_map[aCompName] = (void *)pyCont; // any non O value OK
208           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
209           return true;
210         }
211     }
212   return false;
213 }
214
215 // Create an instance of component
216 Engines::EngineComponent_ptr
217 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
218                                                        const Engines::FieldsDict& env,
219                                                        CORBA::String_out reason)
220 {
221   reason=CORBA::string_dup("");
222
223   pthread_t *th;
224   if(_numproc == 0){
225     th = new pthread_t[_nbproc];
226     for(int ip=1;ip<_nbproc;ip++){
227       thread_st *st = new thread_st;
228       st->ip = ip;
229       st->tior = _tior;
230       st->compoName = componentName;
231       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
232     }
233   }
234
235   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
236
237   if(_numproc == 0){
238     for(int ip=1;ip<_nbproc;ip++)
239       pthread_join(th[ip],NULL);
240     delete th;
241   }
242
243   return cptr;
244 }
245
246 Engines::EngineComponent_ptr
247 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
248 {
249   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
250   Engines::MPIObject_var pobj;
251
252   std::string aCompName = genericRegisterName;
253   if (_library_map[aCompName]) { // Python component
254     if (_isSupervContainer) {
255       INFOS("Supervision Container does not support Python Component Engines");
256       return Engines::EngineComponent::_nil();
257     }
258     _numInstanceMutex.lock() ; // lock on the instance number
259     _numInstance++ ;
260     int numInstance = _numInstance ;
261     _numInstanceMutex.unlock() ;
262
263     char aNumI[12];
264     sprintf( aNumI , "%d" , numInstance ) ;
265     std::string instanceName = aCompName + "_inst_" + aNumI ;
266     std::string component_registerName =
267       _containerName + "/" + instanceName;
268
269     Py_ACQUIRE_NEW_THREAD;
270     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
271     PyObject *globals = PyModule_GetDict(mainmod);
272     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
273     PyObject *result = PyObject_CallMethod(pyCont,
274                                            (char*)"create_component_instance",
275                                            (char*)"ss",
276                                            aCompName.c_str(),
277                                            instanceName.c_str());
278     const char *ior;
279     const char *error;
280     PyArg_ParseTuple(result,"ss", &ior, &error);
281     std::string iors = ior;
282     SCRUTE(iors);
283     Py_RELEASE_NEW_THREAD;
284   
285     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
286     iobject = Engines::EngineComponent::_narrow( obj ) ;
287     pobj = Engines::MPIObject::_narrow(obj) ;
288     if( _numproc == 0 )
289       _NS->Register(iobject, component_registerName.c_str()) ;
290     // Root recupere les ior des composants des autre process
291     BCastIOR(_orb,pobj,false);
292
293     return iobject._retn();
294   }
295   
296   //--- try C++
297
298   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
299   if (_library_map.count(impl_name) != 0) // C++ component
300     {
301       void* handle = _library_map[impl_name];
302       iobject = createMPIInstance(genericRegisterName,
303                                   handle);
304       return iobject._retn();
305     }
306
307   return Engines::EngineComponent::_nil() ;
308 }
309
310 Engines::EngineComponent_ptr
311 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
312                                           void *handle)
313 {
314   Engines::EngineComponent_var iobject;
315   Engines::MPIObject_var pobj;
316   // --- find the factory
317
318   std::string aGenRegisterName = genericRegisterName;
319   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
320
321   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
322     (CORBA::ORB_ptr,
323      PortableServer::POA_ptr, 
324      PortableServer::ObjectId *, 
325      const char *, 
326      const char *) ;
327
328   dlerror();
329   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
330
331   if ( !MPIComponent_factory )
332     {
333       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
334       SCRUTE( dlerror() );
335       pobj = Engines::MPIObject::_nil();
336       BCastIOR(_orb,pobj,false);
337       return Engines::EngineComponent::_nil();
338     }
339
340   // --- create instance
341
342   iobject = Engines::EngineComponent::_nil() ;
343
344   try
345     {
346       _numInstanceMutex.lock() ; // lock on the instance number
347       _numInstance++ ;
348       int numInstance = _numInstance ;
349       _numInstanceMutex.unlock() ;
350
351       char aNumI[12];
352       sprintf( aNumI , "%d" , numInstance ) ;
353       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
354       std::string component_registerName =
355         _containerName + "/" + instanceName;
356
357       // --- Instanciate required CORBA object
358
359       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
360       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
361
362       // --- get reference from id
363
364       CORBA::Object_var obj = _poa->id_to_reference(*id);
365       iobject = Engines::EngineComponent::_narrow( obj ) ;
366       pobj = Engines::MPIObject::_narrow(obj) ;
367
368       _listInstances_map[instanceName] = iobject;
369       _cntInstances_map[aGenRegisterName] += 1;
370
371       // --- register the engine under the name
372       //     containerName(.dir)/instanceName(.object)
373
374       if( _numproc == 0 ){
375         _NS->Register( iobject , component_registerName.c_str() ) ;
376         MESSAGE( component_registerName.c_str() << " bound" ) ;
377       }
378       // Root recupere les ior des composants des autre process
379       BCastIOR(_orb,pobj,false);
380
381     }
382   catch(const std::exception &ex){
383     INFOS( ex.what() ) ;
384     return Engines::EngineComponent::_nil();
385   }
386   return iobject._retn();
387 }
388
389 // Load component
390 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
391                                                  const char* componentName)
392 {
393   pthread_t *th;
394   if(_numproc == 0){
395     th = new pthread_t[_nbproc];
396     for(int ip=1;ip<_nbproc;ip++){
397       thread_st *st = new thread_st;
398       st->ip = ip;
399       st->tior = _tior;
400       st->nameToRegister = nameToRegister;
401       st->compoName = componentName;
402       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
403     }
404   }
405
406   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
407
408   if(_numproc == 0){
409     for(int ip=1;ip<_nbproc;ip++)
410       pthread_join(th[ip],NULL);
411     delete th;
412   }
413
414   return cptr;
415 }
416
417 // Load component
418 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
419                                    const char* nameToRegister,
420                                    const char* componentName)
421 {
422   Engines::EngineComponent_var iobject;
423   Engines::MPIObject_var pobj;
424   char cproc[4];
425
426   sprintf(cproc,"_%d",_numproc);
427
428   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
429
430   _numInstanceMutex.lock() ; // lock on the instance number
431   _numInstance++ ;
432   char _aNumI[12];
433   sprintf(_aNumI,"%d",_numInstance) ;
434
435   std::string _impl_name = componentName;
436   std::string _nameToRegister = nameToRegister;
437   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
438   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
439
440   std::string absolute_impl_name(_impl_name);
441   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
442   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY);
443   if(!handle){
444     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
445     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
446     return Engines::EngineComponent::_nil() ;
447   }
448
449   std::string factory_name = _nameToRegister + std::string("Engine_factory");
450   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
451
452   dlerror();
453   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
454                                                   PortableServer::POA_ptr,
455                                                   PortableServer::ObjectId *,
456                                                   const char *,
457                                                   const char *) =
458     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
459                                      PortableServer::POA_ptr, 
460                                      PortableServer::ObjectId *, 
461                                      const char *, 
462                                      const char *)) 
463     dlsym(handle, factory_name.c_str());
464
465   char *error ;
466   if ((error = dlerror()) != NULL){
467     // Try to load a sequential component
468     MESSAGE("[" << _numproc << "] Try to load a sequential component");
469     _numInstanceMutex.unlock() ;
470     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
471     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
472   }
473   else{
474     // Instanciation du composant parallele
475     MESSAGE("[" << _numproc << "] Try to load a parallel component");
476     PortableServer::ObjectId * id = (MPIComponent_factory)
477       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
478     // get reference from id
479     CORBA::Object_var o = _poa->id_to_reference(*id);
480     pobj = Engines::MPIObject::_narrow(o) ;
481     iobject = Engines::EngineComponent::_narrow(o) ;
482   }
483
484   if( _numproc == 0 ){
485     // utiliser + tard le registry ici :
486     // register the engine under the name containerName.dir/nameToRegister.object
487     std::string component_registerName = _containerName + "/" + _nameToRegister;
488     _NS->Register(iobject, component_registerName.c_str()) ;
489   }
490
491   _numInstanceMutex.unlock() ;
492
493   // Root recupere les ior des composants des autre process
494   BCastIOR(_orb,pobj,false);
495
496   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
497   return Engines::EngineComponent::_duplicate(iobject);
498
499 }
500
501 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
502 {
503   Engines::MPIObject_ptr pcptr;
504   Engines::MPIObject_ptr spcptr;
505
506   pthread_t *th;
507   if(_numproc == 0){
508     pcptr = (Engines::MPIObject_ptr)component_i;
509     th = new pthread_t[_nbproc];
510     for(int ip=1;ip<_nbproc;ip++){
511       thread_st *st = new thread_st;
512       st->ip = ip;
513       st->tior = _tior;
514       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
515       st->cptr = (Engines::EngineComponent_ptr)spcptr;
516       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
517     }
518   }
519
520   ASSERT(! CORBA::is_nil(component_i));
521   std::string instanceName = component_i->instanceName() ;
522   MESSAGE("[" << _numproc << "] unload component " << instanceName);
523   _numInstanceMutex.lock() ; // lock on the remove on handle_map
524   _listInstances_map.erase(instanceName);
525   _numInstanceMutex.unlock() ;
526   component_i->destroy() ;
527   if(_numproc == 0)
528     _NS->Destroy_Name(instanceName.c_str());
529
530   if(_numproc == 0){
531     for(int ip=1;ip<_nbproc;ip++)
532       pthread_join(th[ip],NULL);
533     delete th;
534   }
535
536 }
537
538 void Engines_MPIContainer_i::finalize_removal()
539 {
540   pthread_t *th;
541   if(_numproc == 0){
542     th = new pthread_t[_nbproc];
543     for(int ip=1;ip<_nbproc;ip++){
544       thread_st *st = new thread_st;
545       st->ip = ip;
546       st->tior = _tior;
547       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
548     }
549   }
550
551   _numInstanceMutex.lock(); // lock to be alone
552   // (see decInstanceCnt, load_component_Library)
553   std::map<std::string, void *>::iterator ith;
554   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
555   {
556     void *handle = (*ith).second;
557     std::string impl_name= (*ith).first;
558     if (handle)
559     {
560       SCRUTE(handle);
561       SCRUTE(impl_name);
562       //        dlclose(handle);                // SALOME unstable after ...
563       //        _library_map.erase(impl_name);
564     }
565   }
566   _toRemove_map.clear();
567   _numInstanceMutex.unlock();
568
569   if(_numproc == 0){
570     for(int ip=1;ip<_nbproc;ip++)
571       pthread_join(th[ip],NULL);
572     delete th;
573   }
574 }
575
576 void *th_loadcomponentlibrary(void *s)
577 {
578   thread_st *st = (thread_st*)s;
579   char* reason;
580   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
581   CORBA::string_free(reason);
582   return NULL;
583 }
584
585 void *th_createcomponentinstance(void *s)
586 {
587   thread_st *st = (thread_st*)s;
588   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
589   return NULL;
590 }
591
592 void *th_loadimpl(void *s)
593 {
594   thread_st *st = (thread_st*)s;
595   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
596   return NULL;
597 }
598
599 void *th_removeimpl(void *s)
600 {
601   thread_st *st = (thread_st*)s;
602   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
603   return NULL;
604 }
605
606 void *th_finalizeremoval(void *s)
607 {
608   thread_st *st = (thread_st*)s;
609   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
610   return NULL;
611 }