]> SALOME platform Git repositories - modules/kernel.git/blob - src/MPIContainer/MPIContainer_i.cxx
Salome HOME
updated copyright message
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2023  CEA, EDF, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include <iostream>
24 #include <dlfcn.h>
25 #include <stdio.h>
26 #include "Basics_Utils.hxx"
27 #include "SALOME_Component_i.hxx"
28 #include "MPIContainer_i.hxx"
29 #include "SALOME_NamingService.hxx"
30 #include "Utils_SINGLETON.hxx"
31 #include "OpUtil.hxx"
32 #include "utilities.h"
33 #include <time.h>
34 #include <sys/time.h>
35 #include <pthread.h>  // must be before Python.h !
36
37 #ifdef _XOPEN_SOURCE
38 #undef _XOPEN_SOURCE
39 #endif
40
41 #include <Python.h>
42 #include "Container_init_python.hxx"
43
44 // L'appel au registry SALOME ne se fait que pour le process 0
45 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb, 
46                                                PortableServer::POA_ptr poa,
47                                                char * containerName,
48                                                int argc, char *argv[]) 
49   : Engines_Container_i(orb,poa,containerName,argc,argv,nullptr,false)
50 {
51
52   _id = _poa->activate_object(this);
53   CORBA::Object_var obj=_poa->id_to_reference(*_id);
54   Engines::Container_var pCont = Engines::Container::_narrow(obj);
55   _remove_ref();
56
57   if(_numproc==0){
58
59     _NS = new SALOME_NamingService();
60     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
61
62     std::string hostname = Kernel_Utils::GetHostname();
63     _containerName = SALOME_NamingService_Abstract::BuildContainerNameForNS(containerName,hostname.c_str());
64     SCRUTE(_containerName);
65     _NS->Register(pCont, _containerName.c_str());
66
67   }
68
69   // Root recupere les ior des container des autre process
70   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
71   BCastIOR(_orb,pobj,true);
72 }
73
74 Engines_MPIContainer_i::Engines_MPIContainer_i() 
75   : Engines_Container_i()
76 {
77 }
78
79 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
80 {
81   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
82 }
83
84 // Load component
85 void Engines_MPIContainer_i::Shutdown()
86 {
87   int ip;
88   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
89   if( _numproc == 0 ){
90     _NS->Destroy_FullDirectory(_containerName.c_str());
91     _NS->Destroy_Name(_containerName.c_str());
92     for(ip= 1;ip<_nbproc;ip++)
93       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
94   }
95
96   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
97   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
98     {
99       try
100         {
101           itm->second->destroy();
102         }
103       catch(const CORBA::Exception& e)
104         {
105           // ignore this entry and continue
106         }
107       catch(...)
108         {
109           // ignore this entry and continue
110         }
111     }
112
113   _orb->shutdown(0);
114
115 }
116
117 // Load a component library
118 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
119 {
120   reason=CORBA::string_dup("");
121
122   pthread_t *th = 0;
123   if(_numproc == 0){
124     th = new pthread_t[_nbproc];
125     for(int ip=1;ip<_nbproc;ip++){
126       thread_st *st = new thread_st;
127       st->ip = ip;
128       st->tior = _tior;
129       st->compoName = componentName;
130       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
131     }
132   }
133
134   bool ret = Lload_component_Library(componentName);
135
136   if(_numproc == 0){
137     for(int ip=1;ip<_nbproc;ip++)
138       pthread_join(th[ip],NULL);
139     delete th;
140   }
141   return ret;
142 }
143
144 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
145 {
146   std::string aCompName = componentName;
147
148   // --- try dlopen C++ component
149
150 #ifdef __APPLE__
151   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.dylib");
152 #else
153   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
154 #endif
155   
156   _numInstanceMutex.lock(); // lock to be alone 
157   // (see decInstanceCnt, finalize_removal))
158   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
159   if (_library_map[impl_name])
160     {
161       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
162       _numInstanceMutex.unlock();
163       return true;
164     }
165   
166   void* handle;
167   handle = dlopen( impl_name.c_str() , RTLD_LAZY | RTLD_GLOBAL ) ;
168   if ( handle )
169     {
170       _library_map[impl_name] = handle;
171       _numInstanceMutex.unlock();
172       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
173       return true;
174     }
175   else
176     {
177       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
178       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
179     }
180   _numInstanceMutex.unlock();
181
182   // --- try import Python component
183
184   INFOS("[" << _numproc << "] try import Python component "<<componentName);
185   if (_isSupervContainer)
186     {
187       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
188       return false;
189     }
190   if (_library_map[aCompName])
191     {
192       return true; // Python Component, already imported
193     }
194   else
195     {
196       Py_ACQUIRE_NEW_THREAD;
197       PyObject *mainmod = PyImport_AddModule((char *)"__main__");
198       PyObject *globals = PyModule_GetDict(mainmod);
199       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
200       PyObject *result = PyObject_CallMethod(pyCont,
201                                              (char*)"import_component",
202                                              (char*)"s",componentName);
203       std::string ret= PyUnicode_AsUTF8(result);
204       SCRUTE(ret);
205       Py_RELEASE_NEW_THREAD;
206   
207       if (ret=="") // import possible: Python component
208         {
209           _library_map[aCompName] = (void *)pyCont; // any non O value OK
210           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
211           return true;
212         }
213     }
214   return false;
215 }
216
217 // Create an instance of component
218 Engines::EngineComponent_ptr
219 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
220                                                        const Engines::FieldsDict& /*env*/,
221                                                        CORBA::String_out reason)
222 {
223   reason=CORBA::string_dup("");
224
225   pthread_t *th = 0;
226   if(_numproc == 0){
227     th = new pthread_t[_nbproc];
228     for(int ip=1;ip<_nbproc;ip++){
229       thread_st *st = new thread_st;
230       st->ip = ip;
231       st->tior = _tior;
232       st->compoName = componentName;
233       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
234     }
235   }
236
237   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
238
239   if(_numproc == 0){
240     for(int ip=1;ip<_nbproc;ip++)
241       pthread_join(th[ip],NULL);
242     delete th;
243   }
244
245   return cptr;
246 }
247
248 Engines::EngineComponent_ptr
249 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
250 {
251   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
252   Engines::MPIObject_var pobj;
253
254   std::string aCompName = genericRegisterName;
255   if (_library_map[aCompName]) { // Python component
256     if (_isSupervContainer) {
257       INFOS("Supervision Container does not support Python Component Engines");
258       return Engines::EngineComponent::_nil();
259     }
260     _numInstanceMutex.lock() ; // lock on the instance number
261     _numInstance++ ;
262     int numInstance = _numInstance ;
263     _numInstanceMutex.unlock() ;
264
265     char aNumI[12];
266     sprintf( aNumI , "%d" , numInstance ) ;
267     std::string instanceName = aCompName + "_inst_" + aNumI ;
268     std::string component_registerName =
269       _containerName + "/" + instanceName;
270
271     Py_ACQUIRE_NEW_THREAD;
272     PyObject *mainmod = PyImport_AddModule((char*)"__main__");
273     PyObject *globals = PyModule_GetDict(mainmod);
274     PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
275     PyObject *result = PyObject_CallMethod(pyCont,
276                                            (char*)"create_component_instance",
277                                            (char*)"ss",
278                                            aCompName.c_str(),
279                                            instanceName.c_str());
280     const char *ior;
281     const char *error;
282     PyArg_ParseTuple(result,"ss", &ior, &error);
283     std::string iors = ior;
284     SCRUTE(iors);
285     Py_RELEASE_NEW_THREAD;
286   
287     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
288     iobject = Engines::EngineComponent::_narrow( obj ) ;
289     pobj = Engines::MPIObject::_narrow(obj) ;
290     if( _numproc == 0 )
291       _NS->Register(iobject, component_registerName.c_str()) ;
292     // Root recupere les ior des composants des autre process
293     BCastIOR(_orb,pobj,false);
294
295     return iobject._retn();
296   }
297   
298   //--- try C++
299
300 #ifdef __APPLE__
301   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.dylib");
302 #else
303   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
304 #endif
305   if (_library_map.count(impl_name) != 0) // C++ component
306     {
307       void* handle = _library_map[impl_name];
308       iobject = createMPIInstance(genericRegisterName,
309                                   handle);
310       return iobject._retn();
311     }
312
313   return Engines::EngineComponent::_nil() ;
314 }
315
316 Engines::EngineComponent_ptr
317 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
318                                           void *handle)
319 {
320   Engines::EngineComponent_var iobject;
321   Engines::MPIObject_var pobj;
322   // --- find the factory
323
324   std::string aGenRegisterName = genericRegisterName;
325   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
326
327   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
328     (CORBA::ORB_ptr,
329      PortableServer::POA_ptr, 
330      PortableServer::ObjectId *, 
331      const char *, 
332      const char *) ;
333
334   dlerror();
335   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
336
337   if ( !MPIComponent_factory )
338     {
339       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
340       SCRUTE( dlerror() );
341       pobj = Engines::MPIObject::_nil();
342       BCastIOR(_orb,pobj,false);
343       return Engines::EngineComponent::_nil();
344     }
345
346   // --- create instance
347
348   iobject = Engines::EngineComponent::_nil() ;
349
350   try
351     {
352       _numInstanceMutex.lock() ; // lock on the instance number
353       _numInstance++ ;
354       int numInstance = _numInstance ;
355       _numInstanceMutex.unlock() ;
356
357       char aNumI[12];
358       sprintf( aNumI , "%d" , numInstance ) ;
359       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
360       std::string component_registerName =
361         _containerName + "/" + instanceName;
362
363       // --- Instantiate required CORBA object
364
365       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
366       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
367
368       // --- get reference from id
369
370       CORBA::Object_var obj = _poa->id_to_reference(*id);
371       iobject = Engines::EngineComponent::_narrow( obj ) ;
372       pobj = Engines::MPIObject::_narrow(obj) ;
373
374       _listInstances_map[instanceName] = iobject;
375       _cntInstances_map[aGenRegisterName] += 1;
376
377       // --- register the engine under the name
378       //     containerName(.dir)/instanceName(.object)
379
380       if( _numproc == 0 ){
381         _NS->Register( iobject , component_registerName.c_str() ) ;
382         MESSAGE( component_registerName.c_str() << " bound" ) ;
383       }
384       // Root recupere les ior des composants des autre process
385       BCastIOR(_orb,pobj,false);
386
387     }
388   catch(const std::exception &ex){
389     INFOS( ex.what() ) ;
390     return Engines::EngineComponent::_nil();
391   }
392   return iobject._retn();
393 }
394
395 // Load component
396 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
397                                                  const char* componentName)
398 {
399   pthread_t *th = 0;
400   if(_numproc == 0){
401     th = new pthread_t[_nbproc];
402     for(int ip=1;ip<_nbproc;ip++){
403       thread_st *st = new thread_st;
404       st->ip = ip;
405       st->tior = _tior;
406       st->nameToRegister = nameToRegister;
407       st->compoName = componentName;
408       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
409     }
410   }
411
412   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
413
414   if(_numproc == 0){
415     for(int ip=1;ip<_nbproc;ip++)
416       pthread_join(th[ip],NULL);
417     delete th;
418   }
419
420   return cptr;
421 }
422
423 // Load component
424 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
425                                    const char* nameToRegister,
426                                    const char* componentName)
427 {
428   Engines::EngineComponent_var iobject;
429   Engines::MPIObject_var pobj;
430   char cproc[4];
431
432   sprintf(cproc,"_%d",_numproc);
433
434   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
435
436   _numInstanceMutex.lock() ; // lock on the instance number
437   _numInstance++ ;
438   char _aNumI[12];
439   sprintf(_aNumI,"%d",_numInstance) ;
440
441   std::string _impl_name = componentName;
442   std::string _nameToRegister = nameToRegister;
443   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
444   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
445
446   std::string absolute_impl_name(_impl_name);
447   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
448   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY | RTLD_GLOBAL);
449   if(!handle){
450     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
451     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
452     return Engines::EngineComponent::_nil() ;
453   }
454
455   std::string factory_name = _nameToRegister + std::string("Engine_factory");
456   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
457
458   dlerror();
459   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
460                                                   PortableServer::POA_ptr,
461                                                   PortableServer::ObjectId *,
462                                                   const char *,
463                                                   const char *) =
464     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
465                                      PortableServer::POA_ptr, 
466                                      PortableServer::ObjectId *, 
467                                      const char *, 
468                                      const char *)) 
469     dlsym(handle, factory_name.c_str());
470
471   char *error ;
472   if ((error = dlerror()) != NULL){
473     // Try to load a sequential component
474     MESSAGE("[" << _numproc << "] Try to load a sequential component");
475     _numInstanceMutex.unlock() ;
476     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
477     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
478   }
479   else{
480     // Instanciation du composant parallele
481     MESSAGE("[" << _numproc << "] Try to load a parallel component");
482     PortableServer::ObjectId * id = (MPIComponent_factory)
483       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
484     // get reference from id
485     CORBA::Object_var o = _poa->id_to_reference(*id);
486     pobj = Engines::MPIObject::_narrow(o) ;
487     iobject = Engines::EngineComponent::_narrow(o) ;
488   }
489
490   if( _numproc == 0 ){
491     // utiliser + tard le registry ici :
492     // register the engine under the name containerName.dir/nameToRegister.object
493     std::string component_registerName = _containerName + "/" + _nameToRegister;
494     _NS->Register(iobject, component_registerName.c_str()) ;
495   }
496
497   _numInstanceMutex.unlock() ;
498
499   // Root recupere les ior des composants des autre process
500   BCastIOR(_orb,pobj,false);
501
502   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
503   return Engines::EngineComponent::_duplicate(iobject);
504
505 }
506
507 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
508 {
509   Engines::MPIObject_ptr pcptr;
510   Engines::MPIObject_ptr spcptr;
511
512   pthread_t *th = 0;
513   if(_numproc == 0){
514     pcptr = (Engines::MPIObject_ptr)component_i;
515     th = new pthread_t[_nbproc];
516     for(int ip=1;ip<_nbproc;ip++){
517       thread_st *st = new thread_st;
518       st->ip = ip;
519       st->tior = _tior;
520       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
521       st->cptr = (Engines::EngineComponent_ptr)spcptr;
522       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
523     }
524   }
525
526   ASSERT(! CORBA::is_nil(component_i));
527   std::string instanceName = component_i->instanceName() ;
528   MESSAGE("[" << _numproc << "] unload component " << instanceName);
529   _numInstanceMutex.lock() ; // lock on the remove on handle_map
530   _listInstances_map.erase(instanceName);
531   _numInstanceMutex.unlock() ;
532   component_i->destroy() ;
533   if(_numproc == 0)
534     _NS->Destroy_Name(instanceName.c_str());
535
536   if(_numproc == 0){
537     for(int ip=1;ip<_nbproc;ip++)
538       pthread_join(th[ip],NULL);
539     delete th;
540   }
541
542 }
543
544 void Engines_MPIContainer_i::finalize_removal()
545 {
546   pthread_t *th = 0;
547   if(_numproc == 0){
548     th = new pthread_t[_nbproc];
549     for(int ip=1;ip<_nbproc;ip++){
550       thread_st *st = new thread_st;
551       st->ip = ip;
552       st->tior = _tior;
553       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
554     }
555   }
556
557   _numInstanceMutex.lock(); // lock to be alone
558   // (see decInstanceCnt, load_component_Library)
559   std::map<std::string, void *>::iterator ith;
560   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
561   {
562     void *handle = (*ith).second;
563     std::string impl_name= (*ith).first;
564     if (handle)
565     {
566       SCRUTE(handle);
567       SCRUTE(impl_name);
568       //        dlclose(handle);                // SALOME unstable after ...
569       //        _library_map.erase(impl_name);
570     }
571   }
572   _toRemove_map.clear();
573   _numInstanceMutex.unlock();
574
575   if(_numproc == 0){
576     for(int ip=1;ip<_nbproc;ip++)
577       pthread_join(th[ip],NULL);
578     delete th;
579   }
580 }
581
582 void *th_loadcomponentlibrary(void *s)
583 {
584   thread_st *st = (thread_st*)s;
585   char* reason;
586   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
587   CORBA::string_free(reason);
588   return NULL;
589 }
590
591 void *th_createcomponentinstance(void *s)
592 {
593   thread_st *st = (thread_st*)s;
594   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
595   return NULL;
596 }
597
598 void *th_loadimpl(void *s)
599 {
600   thread_st *st = (thread_st*)s;
601   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
602   return NULL;
603 }
604
605 void *th_removeimpl(void *s)
606 {
607   thread_st *st = (thread_st*)s;
608   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
609   return NULL;
610 }
611
612 void *th_finalizeremoval(void *s)
613 {
614   thread_st *st = (thread_st*)s;
615   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
616   return NULL;
617 }