Salome HOME
MPIContyainer_i.cxx: Fix some compilation errors.
[modules/kernel.git] / src / MPIContainer / MPIContainer_i.cxx
1 // Copyright (C) 2007-2023  CEA, EDF, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include <iostream>
24 #include <dlfcn.h>
25 #include <stdio.h>
26 #include "Basics_Utils.hxx"
27 #include "SALOME_Component_i.hxx"
28 #include "MPIContainer_i.hxx"
29 #include "SALOME_NamingService.hxx"
30 #include "Utils_SINGLETON.hxx"
31 #include "OpUtil.hxx"
32 #include "PythonCppUtils.hxx"
33 #include "utilities.h"
34 #include <time.h>
35 #include <sys/time.h>
36 #include <pthread.h>  // must be before Python.h !
37
38 #ifdef _XOPEN_SOURCE
39 #undef _XOPEN_SOURCE
40 #endif
41
42 #include <Python.h>
43 #include "Container_init_python.hxx"
44
45 // L'appel au registry SALOME ne se fait que pour le process 0
46 Engines_MPIContainer_i::Engines_MPIContainer_i(CORBA::ORB_ptr orb,
47                                                PortableServer::POA_ptr poa,
48                                                char * containerName,
49                                                int argc, char *argv[])
50   : Engines_Container_i(orb,poa,containerName,argc,argv,nullptr,false)
51 {
52
53   _id = _poa->activate_object(this);
54   CORBA::Object_var obj=_poa->id_to_reference(*_id);
55   Engines::Container_var pCont = Engines::Container::_narrow(obj);
56   _remove_ref();
57
58   if(_numproc==0){
59
60     _NS = new SALOME_NamingService();
61     _NS->init_orb( CORBA::ORB::_duplicate(_orb) ) ;
62
63     std::string hostname = Kernel_Utils::GetHostname();
64     _containerName = SALOME_NamingService_Abstract::BuildContainerNameForNS(containerName,hostname.c_str());
65     SCRUTE(_containerName);
66     _NS->Register(pCont, _containerName.c_str());
67
68   }
69
70   // Root recupere les ior des container des autre process
71   Engines::MPIObject_var pobj = POA_Engines::MPIContainer::_this();
72   BCastIOR(_orb,pobj,true);
73 }
74
75 Engines_MPIContainer_i::Engines_MPIContainer_i()
76   : Engines_Container_i()
77 {
78 }
79
80 Engines_MPIContainer_i::~Engines_MPIContainer_i(void)
81 {
82   MESSAGE("[" << _numproc << "] Engines_MPIContainer_i::~Engines_MPIContainer_i()");
83 }
84
85 // Load component
86 void Engines_MPIContainer_i::Shutdown()
87 {
88   int ip;
89   MESSAGE("[" << _numproc << "] shutdown of MPI Corba Server");
90   if( _numproc == 0 ){
91     _NS->Destroy_FullDirectory(_containerName.c_str());
92     _NS->Destroy_Name(_containerName.c_str());
93     for(ip= 1;ip<_nbproc;ip++)
94       (Engines::MPIContainer::_narrow((*_tior)[ip]))->Shutdown();
95   }
96
97   std::map<std::string, Engines::EngineComponent_var>::iterator itm;
98   for (itm = _listInstances_map.begin(); itm != _listInstances_map.end(); itm++)
99     {
100       try
101         {
102           itm->second->destroy();
103         }
104       catch(const CORBA::Exception& e)
105         {
106           // ignore this entry and continue
107         }
108       catch(...)
109         {
110           // ignore this entry and continue
111         }
112     }
113
114   _orb->shutdown(0);
115
116 }
117
118 // Load a component library
119 bool Engines_MPIContainer_i::load_component_Library(const char* componentName, CORBA::String_out reason)
120 {
121   reason=CORBA::string_dup("");
122
123   pthread_t *th = 0;
124   if(_numproc == 0){
125     th = new pthread_t[_nbproc];
126     for(int ip=1;ip<_nbproc;ip++){
127       thread_st *st = new thread_st;
128       st->ip = ip;
129       st->tior = _tior;
130       st->compoName = componentName;
131       pthread_create(&(th[ip]),NULL,th_loadcomponentlibrary,(void*)st);
132     }
133   }
134
135   bool ret = Lload_component_Library(componentName);
136
137   if(_numproc == 0){
138     for(int ip=1;ip<_nbproc;ip++)
139       pthread_join(th[ip],NULL);
140     delete th;
141   }
142   return ret;
143 }
144
145 bool Engines_MPIContainer_i::Lload_component_Library(const char* componentName)
146 {
147   std::string aCompName = componentName;
148
149   // --- try dlopen C++ component
150
151 #ifdef __APPLE__
152   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.dylib");
153 #else
154   std::string impl_name = std::string ("lib") + aCompName + std::string("Engine.so");
155 #endif
156
157   _numInstanceMutex.lock(); // lock to be alone
158   // (see decInstanceCnt, finalize_removal))
159   if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name);
160   if (_library_map[impl_name])
161     {
162       MESSAGE("[" << _numproc << "] Library " << impl_name << " already loaded");
163       _numInstanceMutex.unlock();
164       return true;
165     }
166
167   void* handle;
168   handle = dlopen( impl_name.c_str() , RTLD_LAZY | RTLD_GLOBAL ) ;
169   if ( handle )
170     {
171       _library_map[impl_name] = handle;
172       _numInstanceMutex.unlock();
173       MESSAGE("[" << _numproc << "] Library " << impl_name << " loaded");
174       return true;
175     }
176   else
177     {
178       MESSAGE("[" << _numproc << "] Can't load shared library : " << impl_name);
179       MESSAGE("[" << _numproc << "] error dlopen: " << dlerror());
180     }
181   _numInstanceMutex.unlock();
182
183   // --- try import Python component
184
185   INFOS("[" << _numproc << "] try import Python component "<<componentName);
186   if (_isSupervContainer)
187     {
188       INFOS("[" << _numproc << "] Supervision Container does not support Python Component Engines");
189       return false;
190     }
191   if (_library_map[aCompName])
192     {
193       return true; // Python Component, already imported
194     }
195   else
196     {
197       PyObject *pyCont = nullptr;
198       std::string ret;
199       {
200         AutoGIL agil;
201         PyObject *mainmod = PyImport_AddModule((char *)"__main__");
202         PyObject *globals = PyModule_GetDict(mainmod);
203         pyCont = PyDict_GetItemString(globals, "pyCont");
204         PyObject *result = PyObject_CallMethod(pyCont,
205                                               (char*)"import_component",
206                                               (char*)"s",componentName);
207         ret= PyUnicode_AsUTF8(result);
208         SCRUTE(ret);
209       }
210
211       if (ret=="") // import possible: Python component
212         {
213           _library_map[aCompName] = (void *)pyCont; // any non O value OK
214           MESSAGE("[" << _numproc << "] import Python: "<<aCompName<<" OK");
215           return true;
216         }
217     }
218   return false;
219 }
220
221 // Create an instance of component
222 Engines::EngineComponent_ptr
223 Engines_MPIContainer_i::create_component_instance_env( const char* componentName,
224                                                        const Engines::FieldsDict& /*env*/,
225                                                        CORBA::String_out reason)
226 {
227   reason=CORBA::string_dup("");
228
229   pthread_t *th = 0;
230   if(_numproc == 0){
231     th = new pthread_t[_nbproc];
232     for(int ip=1;ip<_nbproc;ip++){
233       thread_st *st = new thread_st;
234       st->ip = ip;
235       st->tior = _tior;
236       st->compoName = componentName;
237       pthread_create(&(th[ip]),NULL,th_createcomponentinstance,(void*)st);
238     }
239   }
240
241   Engines::EngineComponent_ptr cptr = Lcreate_component_instance(componentName);
242
243   if(_numproc == 0){
244     for(int ip=1;ip<_nbproc;ip++)
245       pthread_join(th[ip],NULL);
246     delete th;
247   }
248
249   return cptr;
250 }
251
252 Engines::EngineComponent_ptr
253 Engines_MPIContainer_i::Lcreate_component_instance( const char* genericRegisterName )
254 {
255   Engines::EngineComponent_var iobject = Engines::EngineComponent::_nil() ;
256   Engines::MPIObject_var pobj;
257
258   std::string aCompName = genericRegisterName;
259   if (_library_map[aCompName]) { // Python component
260     if (_isSupervContainer) {
261       INFOS("Supervision Container does not support Python Component Engines");
262       return Engines::EngineComponent::_nil();
263     }
264     _numInstanceMutex.lock() ; // lock on the instance number
265     _numInstance++ ;
266     int numInstance = _numInstance ;
267     _numInstanceMutex.unlock() ;
268
269     char aNumI[12];
270     sprintf( aNumI , "%d" , numInstance ) ;
271     std::string instanceName = aCompName + "_inst_" + aNumI ;
272     std::string component_registerName =
273       _containerName + "/" + instanceName;
274
275     std::string iors;
276     {
277       AutoGIL agil;
278       PyObject *mainmod = PyImport_AddModule((char*)"__main__");
279       PyObject *globals = PyModule_GetDict(mainmod);
280       PyObject *pyCont = PyDict_GetItemString(globals, "pyCont");
281       PyObject *result = PyObject_CallMethod(pyCont,
282                                             (char*)"create_component_instance",
283                                             (char*)"ss",
284                                             aCompName.c_str(),
285                                             instanceName.c_str());
286       const char *ior;
287       const char *error;
288       PyArg_ParseTuple(result,"ss", &ior, &error);
289       iors = ior;
290       SCRUTE(iors);
291     }
292
293     CORBA::Object_var obj = _orb->string_to_object(iors.c_str());
294     iobject = Engines::EngineComponent::_narrow( obj ) ;
295     pobj = Engines::MPIObject::_narrow(obj) ;
296     if( _numproc == 0 )
297       _NS->Register(iobject, component_registerName.c_str()) ;
298     // Root recupere les ior des composants des autre process
299     BCastIOR(_orb,pobj,false);
300
301     return iobject._retn();
302   }
303
304   //--- try C++
305
306 #ifdef __APPLE__
307   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.dylib");
308 #else
309   std::string impl_name = std::string ("lib") + genericRegisterName +std::string("Engine.so");
310 #endif
311   if (_library_map.count(impl_name) != 0) // C++ component
312     {
313       void* handle = _library_map[impl_name];
314       iobject = createMPIInstance(genericRegisterName,
315                                   handle);
316       return iobject._retn();
317     }
318
319   return Engines::EngineComponent::_nil() ;
320 }
321
322 Engines::EngineComponent_ptr
323 Engines_MPIContainer_i::createMPIInstance(std::string genericRegisterName,
324                                           void *handle)
325 {
326   Engines::EngineComponent_var iobject;
327   Engines::MPIObject_var pobj;
328   // --- find the factory
329
330   std::string aGenRegisterName = genericRegisterName;
331   std::string factory_name = aGenRegisterName + std::string("Engine_factory");
332
333   typedef  PortableServer::ObjectId * (*MPIFACTORY_FUNCTION)
334     (CORBA::ORB_ptr,
335      PortableServer::POA_ptr,
336      PortableServer::ObjectId *,
337      const char *,
338      const char *) ;
339
340   dlerror();
341   MPIFACTORY_FUNCTION MPIComponent_factory = (MPIFACTORY_FUNCTION) dlsym(handle, factory_name.c_str());
342
343   if ( !MPIComponent_factory )
344     {
345       INFOS( "[" << _numproc << "] Can't resolve symbol: " + factory_name );
346       SCRUTE( dlerror() );
347       pobj = Engines::MPIObject::_nil();
348       BCastIOR(_orb,pobj,false);
349       return Engines::EngineComponent::_nil();
350     }
351
352   // --- create instance
353
354   iobject = Engines::EngineComponent::_nil() ;
355
356   try
357     {
358       _numInstanceMutex.lock() ; // lock on the instance number
359       _numInstance++ ;
360       int numInstance = _numInstance ;
361       _numInstanceMutex.unlock() ;
362
363       char aNumI[12];
364       sprintf( aNumI , "%d" , numInstance ) ;
365       std::string instanceName = aGenRegisterName + "_inst_" + aNumI ;
366       std::string component_registerName =
367         _containerName + "/" + instanceName;
368
369       // --- Instantiate required CORBA object
370
371       PortableServer::ObjectId *id ; //not owner, do not delete (nore use var)
372       id = (MPIComponent_factory) ( _orb, _poa, _id, instanceName.c_str(), aGenRegisterName.c_str() ) ;
373
374       // --- get reference from id
375
376       CORBA::Object_var obj = _poa->id_to_reference(*id);
377       iobject = Engines::EngineComponent::_narrow( obj ) ;
378       pobj = Engines::MPIObject::_narrow(obj) ;
379
380       _listInstances_map[instanceName] = iobject;
381       _cntInstances_map[aGenRegisterName] += 1;
382
383       // --- register the engine under the name
384       //     containerName(.dir)/instanceName(.object)
385
386       if( _numproc == 0 ){
387         _NS->Register( iobject , component_registerName.c_str() ) ;
388         MESSAGE( component_registerName.c_str() << " bound" ) ;
389       }
390       // Root recupere les ior des composants des autre process
391       BCastIOR(_orb,pobj,false);
392
393     }
394   catch(const std::exception &ex){
395     INFOS( ex.what() ) ;
396     return Engines::EngineComponent::_nil();
397   }
398   return iobject._retn();
399 }
400
401 // Load component
402 Engines::EngineComponent_ptr Engines_MPIContainer_i::load_impl(const char* nameToRegister,
403                                                  const char* componentName)
404 {
405   pthread_t *th = 0;
406   if(_numproc == 0){
407     th = new pthread_t[_nbproc];
408     for(int ip=1;ip<_nbproc;ip++){
409       thread_st *st = new thread_st;
410       st->ip = ip;
411       st->tior = _tior;
412       st->nameToRegister = nameToRegister;
413       st->compoName = componentName;
414       pthread_create(&(th[ip]),NULL,th_loadimpl,(void*)st);
415     }
416   }
417
418   Engines::EngineComponent_ptr cptr =  Lload_impl(nameToRegister,componentName);
419
420   if(_numproc == 0){
421     for(int ip=1;ip<_nbproc;ip++)
422       pthread_join(th[ip],NULL);
423     delete th;
424   }
425
426   return cptr;
427 }
428
429 // Load component
430 Engines::EngineComponent_ptr Engines_MPIContainer_i::Lload_impl(
431                                    const char* nameToRegister,
432                                    const char* componentName)
433 {
434   Engines::EngineComponent_var iobject;
435   Engines::MPIObject_var pobj;
436   char cproc[4];
437
438   sprintf(cproc,"_%d",_numproc);
439
440   BEGIN_OF("[" << _numproc << "] MPIContainer_i::Lload_impl");
441
442   _numInstanceMutex.lock() ; // lock on the instance number
443   _numInstance++ ;
444   char _aNumI[12];
445   sprintf(_aNumI,"%d",_numInstance) ;
446
447   std::string _impl_name = componentName;
448   std::string _nameToRegister = nameToRegister;
449   std::string instanceName = _nameToRegister + "_inst_" + _aNumI + cproc;
450   MESSAGE("[" << _numproc << "] instanceName=" << instanceName);
451
452   std::string absolute_impl_name(_impl_name);
453   MESSAGE("[" << _numproc << "] absolute_impl_name=" << absolute_impl_name);
454   void * handle = dlopen(absolute_impl_name.c_str(), RTLD_LAZY | RTLD_GLOBAL);
455   if(!handle){
456     INFOS("[" << _numproc << "] Can't load shared library : " << absolute_impl_name);
457     INFOS("[" << _numproc << "] error dlopen: " << dlerror());
458     return Engines::EngineComponent::_nil() ;
459   }
460
461   std::string factory_name = _nameToRegister + std::string("Engine_factory");
462   MESSAGE("[" << _numproc << "] factory_name=" << factory_name) ;
463
464   dlerror();
465   PortableServer::ObjectId * (*MPIComponent_factory) (CORBA::ORB_ptr,
466                                                   PortableServer::POA_ptr,
467                                                   PortableServer::ObjectId *,
468                                                   const char *,
469                                                   const char *) =
470     (PortableServer::ObjectId * (*) (CORBA::ORB_ptr,
471                                      PortableServer::POA_ptr,
472                                      PortableServer::ObjectId *,
473                                      const char *,
474                                      const char *))
475     dlsym(handle, factory_name.c_str());
476
477   char *error ;
478   if ((error = dlerror()) != NULL){
479     // Try to load a sequential component
480     MESSAGE("[" << _numproc << "] Try to load a sequential component");
481     _numInstanceMutex.unlock() ;
482     iobject = Engines_Container_i::load_impl(nameToRegister,componentName);
483     if( CORBA::is_nil(iobject) ) return Engines::EngineComponent::_duplicate(iobject);
484   }
485   else{
486     // Instanciation du composant parallele
487     MESSAGE("[" << _numproc << "] Try to load a parallel component");
488     PortableServer::ObjectId * id = (MPIComponent_factory)
489       (_orb, _poa, _id, instanceName.c_str(), _nameToRegister.c_str());
490     // get reference from id
491     CORBA::Object_var o = _poa->id_to_reference(*id);
492     pobj = Engines::MPIObject::_narrow(o) ;
493     iobject = Engines::EngineComponent::_narrow(o) ;
494   }
495
496   if( _numproc == 0 ){
497     // utiliser + tard le registry ici :
498     // register the engine under the name containerName.dir/nameToRegister.object
499     std::string component_registerName = _containerName + "/" + _nameToRegister;
500     _NS->Register(iobject, component_registerName.c_str()) ;
501   }
502
503   _numInstanceMutex.unlock() ;
504
505   // Root recupere les ior des composants des autre process
506   BCastIOR(_orb,pobj,false);
507
508   END_OF("[" <<_numproc << "] MPIContainer_i::Lload_impl");
509   return Engines::EngineComponent::_duplicate(iobject);
510
511 }
512
513 void Engines_MPIContainer_i::remove_impl(Engines::EngineComponent_ptr component_i)
514 {
515   Engines::MPIObject_ptr pcptr;
516   Engines::MPIObject_ptr spcptr;
517
518   pthread_t *th = 0;
519   if(_numproc == 0){
520     pcptr = (Engines::MPIObject_ptr)component_i;
521     th = new pthread_t[_nbproc];
522     for(int ip=1;ip<_nbproc;ip++){
523       thread_st *st = new thread_st;
524       st->ip = ip;
525       st->tior = _tior;
526       spcptr = Engines::MPIObject::_narrow((*(pcptr->tior()))[ip]);
527       st->cptr = (Engines::EngineComponent_ptr)spcptr;
528       pthread_create(&(th[ip]),NULL,th_removeimpl,(void*)st);
529     }
530   }
531
532   ASSERT(! CORBA::is_nil(component_i));
533   std::string instanceName = component_i->instanceName() ;
534   MESSAGE("[" << _numproc << "] unload component " << instanceName);
535   _numInstanceMutex.lock() ; // lock on the remove on handle_map
536   _listInstances_map.erase(instanceName);
537   _numInstanceMutex.unlock() ;
538   component_i->destroy() ;
539   if(_numproc == 0)
540     _NS->Destroy_Name(instanceName.c_str());
541
542   if(_numproc == 0){
543     for(int ip=1;ip<_nbproc;ip++)
544       pthread_join(th[ip],NULL);
545     delete th;
546   }
547
548 }
549
550 void Engines_MPIContainer_i::finalize_removal()
551 {
552   pthread_t *th = 0;
553   if(_numproc == 0){
554     th = new pthread_t[_nbproc];
555     for(int ip=1;ip<_nbproc;ip++){
556       thread_st *st = new thread_st;
557       st->ip = ip;
558       st->tior = _tior;
559       pthread_create(&(th[ip]),NULL,th_finalizeremoval,(void*)st);
560     }
561   }
562
563   _numInstanceMutex.lock(); // lock to be alone
564   // (see decInstanceCnt, load_component_Library)
565   std::map<std::string, void *>::iterator ith;
566   for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++)
567   {
568     void *handle = (*ith).second;
569     std::string impl_name= (*ith).first;
570     if (handle)
571     {
572       SCRUTE(handle);
573       SCRUTE(impl_name);
574       //        dlclose(handle);                // SALOME unstable after ...
575       //        _library_map.erase(impl_name);
576     }
577   }
578   _toRemove_map.clear();
579   _numInstanceMutex.unlock();
580
581   if(_numproc == 0){
582     for(int ip=1;ip<_nbproc;ip++)
583       pthread_join(th[ip],NULL);
584     delete th;
585   }
586 }
587
588 void *th_loadcomponentlibrary(void *s)
589 {
590   thread_st *st = (thread_st*)s;
591   char* reason;
592   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_component_Library(st->compoName.c_str(),reason);
593   CORBA::string_free(reason);
594   return NULL;
595 }
596
597 void *th_createcomponentinstance(void *s)
598 {
599   thread_st *st = (thread_st*)s;
600   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->create_component_instance(st->compoName.c_str());
601   return NULL;
602 }
603
604 void *th_loadimpl(void *s)
605 {
606   thread_st *st = (thread_st*)s;
607   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->load_impl(st->nameToRegister.c_str(),st->compoName.c_str());
608   return NULL;
609 }
610
611 void *th_removeimpl(void *s)
612 {
613   thread_st *st = (thread_st*)s;
614   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->remove_impl(st->cptr);
615   return NULL;
616 }
617
618 void *th_finalizeremoval(void *s)
619 {
620   thread_st *st = (thread_st*)s;
621   (Engines::MPIContainer::_narrow((*(st->tior))[st->ip]))->finalize_removal();
622   return NULL;
623 }