Salome HOME
Merge branch 'abn/mpi_imp'
[modules/med.git] / src / ParaMEDMEMComponent / ParaMEDMEMComponent_i.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "ParaMEDMEMComponent_i.hxx"
21 #include "utilities.h"
22 #include "Utils_SALOME_Exception.hxx"
23 using namespace std;
24 using namespace ParaMEDMEM;
25
// Result record handed back by the th_* thread entry points of this file
// through the pthread return value: tells the joining thread whether the
// forwarded call raised an exception and, if so, carries its message.
struct except_st
{
  bool exception;   // true when the forwarded call failed
  std::string msg;  // error text, only meaningful when exception is true

  // A freshly created record always reports "no exception" so that the flag
  // is never read uninitialized.
  except_st() : exception(false) {}
};
31
// File-scope, statically initialized mutexes (one copy per process, shared by
// every ParaMEDMEMComponent_i instance):
//   m1 is locked by initializeCoupling(),
//   m2 is locked by terminateCoupling().
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
34
35 ParaMEDMEMComponent_i::ParaMEDMEMComponent_i()
36 {
37   _interface = new CommInterface;
38 }
39
40 ParaMEDMEMComponent_i::ParaMEDMEMComponent_i(CORBA::ORB_ptr orb,
41                                              PortableServer::POA_ptr poa, 
42                                              PortableServer::ObjectId * contId, 
43                                              const char *instanceName,
44                                              const char *interfaceName,
45                                              bool regist)
46   : Engines_Component_i(orb,poa,contId,instanceName,interfaceName,false,regist)
47 {
48   _interface = new CommInterface;
49 }
50
51 ParaMEDMEMComponent_i::~ParaMEDMEMComponent_i()
52 {
53   MESSAGE("* [" << _numproc << "] ParaMEDMEMComponent destructor");
54   delete _interface;
55   pthread_mutex_destroy (&m1);
56   pthread_mutex_destroy (&m2);
57 }
58
59 void ParaMEDMEMComponent_i::initializeCoupling(const char * coupling, const char * ior) throw(SALOME::SALOME_Exception)
60 {
61   int gsize, grank;
62   except_st *est;
63   void *ret_th;
64   pthread_t *th;
65   ostringstream msg;
66   
67   pthread_mutex_lock(&m1);
68   if(_numproc == 0)
69     {
70       th = new pthread_t[_nbproc];
71       for(int ip=1;ip<_nbproc;ip++)
72         {
73           thread_st *st = new thread_st;
74           st->ip = ip;
75           st->tior = _tior;
76           st->coupling = coupling;
77           st->ior = ior;
78           pthread_create(&(th[ip]),NULL,th_initializecoupling,(void*)st);
79         }
80     }
81
82   try{
83     string service = coupling;
84     if( service.size() == 0 )
85       throw SALOME_Exception("You have to give a service name !");
86     
87     if( _gcom.find(service) != _gcom.end() )
88       {
89         msg << "service " << service << " already exists !";
90         throw SALOME_Exception(msg.str().c_str());
91       }
92
93     // Connection to distributed parallel component
94 #ifdef HAVE_MPI2
95     remoteMPI2Connect(coupling);
96 #else
97     throw SALOME_Exception("You have to use a MPI2 compliant mpi implementation !");
98 #endif
99
100     MPI_Comm_size( _gcom[coupling], &gsize );
101     MPI_Comm_rank( _gcom[coupling], &grank );
102     MESSAGE("[" << grank << "] new communicator of " << gsize << " processes");
103
104     // Creation of processors group for ParaMEDMEM
105     // source is always the lower processor numbers
106     // target is always the upper processor numbers
107     if(_numproc==grank)
108       {
109         _source[coupling] = new MPIProcessorGroup(*_interface,0,_nbproc-1,_gcom[coupling]);
110         _target[coupling] = new MPIProcessorGroup(*_interface,_nbproc,gsize-1,_gcom[coupling]);
111         _commgroup[coupling] = _source[coupling];
112       }
113     else
114       {
115         _source[coupling] = new MPIProcessorGroup(*_interface,0,gsize-_nbproc-1,_gcom[coupling]);
116         _target[coupling] = new MPIProcessorGroup(*_interface,gsize-_nbproc,gsize-1,_gcom[coupling]);
117         _commgroup[coupling] = _target[coupling];
118       }
119     _connectto [coupling] = ior;
120     _dec[coupling] = NULL;
121     _dec_options[coupling] = NULL;
122     
123   }
124   catch(const std::exception &ex)
125     {
126       MESSAGE(ex.what());
127       THROW_SALOME_CORBA_EXCEPTION(ex.what(),SALOME::INTERNAL_ERROR);
128     }
129
130   pthread_mutex_unlock(&m1);
131   if(_numproc == 0)
132     {
133       for(int ip=1;ip<_nbproc;ip++)
134         {
135           pthread_join(th[ip],&ret_th);
136           est = (except_st*)ret_th;
137           if(est->exception)
138             {
139               msg << "[" << ip << "] " << est->msg;
140               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
141             }
142           delete est;
143         }
144       delete[] th;
145     }
146 }
147
148 void ParaMEDMEMComponent_i::terminateCoupling(const char * coupling) throw(SALOME::SALOME_Exception)
149 {
150   except_st *est;
151   void *ret_th;
152   pthread_t *th;
153   ostringstream msg;
154
155   pthread_mutex_lock(&m2);
156   if(_numproc == 0)
157     {
158       th = new pthread_t[_nbproc];
159       for(int ip=1;ip<_nbproc;ip++)
160         {
161           thread_st *st = new thread_st;
162           st->ip = ip;
163           st->tior = _tior;
164           st->coupling = coupling;
165           pthread_create(&(th[ip]),NULL,th_terminatecoupling,(void*)st);
166         }
167     }
168
169   try{
170     string service = coupling;
171     if( service.size() == 0 )
172       throw SALOME_Exception("You have to give a service name !");
173
174     if( _gcom.find(service) == _gcom.end() )
175       {
176         msg << "service " << service << " doesn't exist !";
177         throw SALOME_Exception(msg.str().c_str());
178       }
179
180     // Disconnection to distributed parallel component
181 #ifdef HAVE_MPI2
182     remoteMPI2Disconnect(coupling);
183 #else
184     throw SALOME_Exception("You have to use a MPI2 compliant mpi implementation !");
185 #endif
186
187     /* Processors groups and DEC destruction */
188     delete _source[coupling];
189     _source.erase(coupling);
190     delete _target[coupling];
191     _target.erase(coupling);
192     delete _dec[coupling];
193     _dec.erase(coupling);
194     _commgroup.erase(coupling);
195     if(_dec_options[coupling])
196       {
197         delete _dec_options[coupling];
198         _dec_options.erase(coupling);
199       }
200     _connectto.erase(coupling);
201   }
202   catch(const std::exception &ex)
203     {
204       MESSAGE(ex.what());
205       THROW_SALOME_CORBA_EXCEPTION(ex.what(),SALOME::INTERNAL_ERROR);
206     }
207   pthread_mutex_unlock(&m2);
208   if(_numproc == 0)
209     {
210       for(int ip=1;ip<_nbproc;ip++)
211         {
212           pthread_join(th[ip],&ret_th);
213           est = (except_st*)ret_th;
214           if(est->exception)
215             {
216               ostringstream msg;
217               msg << "[" << ip << "] " << est->msg;
218               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
219             }
220           delete est;
221         }
222       delete[] th;
223     }
224 }
225
226 void ParaMEDMEMComponent_i::setInterpolationOptions(const char * coupling,
227                                                     CORBA::Long print_level,
228                                                     const char * intersection_type,
229                                                     CORBA::Double precision,
230                                                     CORBA::Double median_plane,
231                                                     CORBA::Boolean do_rotate,
232                                                     CORBA::Double bounding_box_adjustment,
233                                                     CORBA::Double bounding_box_adjustment_abs,
234                                                     CORBA::Double max_distance_for_3Dsurf_intersect,
235                                                     CORBA::Long orientation,
236                                                     CORBA::Boolean measure_abs,
237                                                     const char * splitting_policy) throw(SALOME::SALOME_Exception)
238 {
239   except_st *est;
240   void *ret_th;
241   pthread_t *th;
242   ostringstream msg;
243
244   if(_numproc == 0)
245     {
246       th = new pthread_t[_nbproc];
247       for(int ip=1;ip<_nbproc;ip++)
248         {
249           thread_st *st = new thread_st;
250           st->ip = ip;
251           st->tior = _tior;
252           st->coupling = coupling;
253           st->print_level = print_level;
254           st->intersection_type = intersection_type;
255           st->precision = precision;
256           st->median_plane = median_plane;
257           st->do_rotate = do_rotate;
258           st->bounding_box_adjustment = bounding_box_adjustment;
259           st->bounding_box_adjustment_abs = bounding_box_adjustment_abs;
260           st->max_distance_for_3Dsurf_intersect = max_distance_for_3Dsurf_intersect;
261           st->orientation = orientation;
262           st->measure_abs = measure_abs;
263           st->splitting_policy = splitting_policy;
264           pthread_create(&(th[ip]),NULL,th_setinterpolationoptions,(void*)st);
265         }
266     }
267
268   if(!_dec_options[coupling])
269     _dec_options[coupling] = new INTERP_KERNEL::InterpolationOptions;
270
271   bool ret = _dec_options[coupling]->setInterpolationOptions(print_level,
272                                                              intersection_type,
273                                                              precision,
274                                                              median_plane,
275                                                              do_rotate,
276                                                              bounding_box_adjustment,
277                                                              bounding_box_adjustment_abs,
278                                                              max_distance_for_3Dsurf_intersect,
279                                                              orientation,
280                                                              measure_abs,
281                                                              splitting_policy );
282
283   if(!ret)
284     {
285       MESSAGE("Error on setting interpolation options");
286       THROW_SALOME_CORBA_EXCEPTION("Error on setting interpolation options",SALOME::INTERNAL_ERROR);
287     }
288   
289   if(_numproc == 0)
290     {
291       for(int ip=1;ip<_nbproc;ip++)
292         {
293           pthread_join(th[ip],&ret_th);
294           est = (except_st*)ret_th;
295           if(est->exception)
296             {
297               msg << "[" << ip << "] " << est->msg;
298               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
299             }
300           delete est;
301         }
302       delete[] th;
303     }
304 }
305
306 void ParaMEDMEMComponent_i::_setInputField(SALOME_MED::MPIMEDCouplingFieldDoubleCorbaInterface_ptr fieldptr, MEDCouplingFieldDouble *field)
307 {
308   int grank;
309   except_st *est;
310   void *ret_th;
311   pthread_t th;
312   ostringstream msg;
313   string coupling;
314   
315   std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, fieldptr->getRef());
316   if(it != _connectto.end())
317     coupling = (*it).first.c_str();
318   else
319     throw SALOME_Exception("Reference of remote component doesn't find in connectto map !");
320
321   if(_numproc == 0)
322     {
323       thread_st *st = new thread_st;
324       st->fieldptr = fieldptr;
325       st->coupling = coupling;
326       pthread_create(&th,NULL,th_getdata,(void*)st);
327     }
328
329   if( coupling.size() == 0 )
330     throw SALOME_Exception("You have to give a service name !");
331
332   if( _gcom.find(coupling) == _gcom.end() )
333     {
334       msg << "service " << coupling << " doesn't exist !";
335       throw SALOME_Exception(msg.str().c_str());
336     }
337
338   if(!_dec[coupling])
339     {
340
341       MPI_Comm_rank( _gcom[coupling], &grank );
342
343       // Creating the intersection Data Exchange Channel
344       // Processors which received the field are always the second argument of InterpKernelDEC object
345       if(_numproc==grank)
346         _dec[coupling] = new InterpKernelDEC(*_target[coupling], *_source[coupling]);
347       else
348         _dec[coupling] = new InterpKernelDEC(*_source[coupling], *_target[coupling]);
349
350       if(_dec_options[coupling])
351         _dec[coupling]->copyOptions(*(_dec_options[coupling]));
352       
353       //Attaching the field to the DEC
354       _dec[coupling]->attachLocalField(field);
355
356       // computing the interpolation matrix
357       _dec[coupling]->synchronize();
358
359     }
360   else
361     //Attaching the field to the DEC
362     _dec[coupling]->attachLocalField(field);
363   
364   //Receiving data
365   _dec[coupling]->recvData();
366
367   if(_numproc == 0)
368     {
369       pthread_join(th,&ret_th);
370       est = (except_st*)ret_th;
371       if(est->exception)
372         throw SALOME_Exception(est->msg.c_str());
373       delete est;
374     }
375
376 }
377
378 void ParaMEDMEMComponent_i::_getOutputField(const char * coupling, MEDCouplingFieldDouble *field)
379 {
380   int grank;
381   string service = coupling;
382   ostringstream msg;
383
384   if( service.size() == 0 )
385     throw SALOME_Exception("You have to give a service name !");
386
387   if( _gcom.find(service) == _gcom.end() )
388     {
389       msg << "service " << service << " doesn't exist !";
390       throw SALOME_Exception(msg.str().c_str());
391     }
392
393   if(!_dec[coupling])
394     {
395
396       MPI_Comm_rank( _gcom[coupling], &grank );
397
398       // Creating the intersection Data Exchange Channel
399       // Processors which sent the field are always the first argument of InterpKernelDEC object
400       if(_numproc==grank)
401         _dec[coupling] = new InterpKernelDEC(*_source[coupling], *_target[coupling]);
402       else
403         _dec[coupling] = new InterpKernelDEC(*_target[coupling], *_source[coupling]);
404   
405       if(_dec_options[coupling])
406         _dec[coupling]->copyOptions(*(_dec_options[coupling]));
407       
408       //Attaching the field to the DEC
409       _dec[coupling]->attachLocalField(field);
410     
411       // computing the interpolation matrix
412       _dec[coupling]->synchronize();
413     }
414   else
415     //Attaching the field to the DEC
416     _dec[coupling]->attachLocalField(field);
417
418   //Sending data
419   _dec[coupling]->sendData();
420 }
421
422 void ParaMEDMEMComponent_i::_initializeCoupling(SALOME_MED::MPIMEDCouplingFieldDoubleCorbaInterface_ptr fieldptr)
423 {
424   except_st *est;
425   void *ret_th;
426   pthread_t *th;
427   //this string specifies the coupling
428   string coupling;
429   //getting IOR string of the remote object
430   string rcompo = fieldptr->getRef();
431   if(_numproc == 0){
432     //getting IOR string of the local object
433     CORBA::Object_var my_ref = _poa->servant_to_reference (_thisObj);
434     string lcompo = _orb->object_to_string(my_ref);
435     //the component does not communicate with itself, a connection is required
436     if( rcompo.find(lcompo) == std::string::npos ){
437       th = new pthread_t[1];
438       //finding the IOR of the remote object in the map
439       std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, rcompo);
440       //if it is not found : connecting two objects : this is the first (and the only) connection between these objects
441       if (it == _connectto.end()){
442         //generating the coupling string : concatenation of two IOR strings
443         coupling = lcompo + rcompo;
444
445         //initializing the coupling on the remote object in a thread
446         thread_st *st = new thread_st;
447         CORBA::Object_var obj = _orb->string_to_object (rcompo.c_str());
448         SALOME_MED::ParaMEDMEMComponent_var compo = SALOME_MED::ParaMEDMEMComponent::_narrow(obj);
449         st->compo = compo._retn();
450         st->coupling = coupling;
451         st->ior = lcompo;
452         
453         pthread_create(&(th[0]),NULL,th_initializecouplingdist,(void*)st);
454         
455         //initializing the coupling on the local object
456         initializeCoupling (coupling.c_str(), rcompo.c_str());
457         pthread_join (th[0], &ret_th); 
458         est = (except_st*)ret_th;
459         if(est->exception)
460           THROW_SALOME_CORBA_EXCEPTION(est->msg.c_str(),SALOME::INTERNAL_ERROR);
461         delete est;
462         delete[] th;
463       }
464     }
465   }
466 }
467
468 std::map<std::string,std::string>::const_iterator ParaMEDMEMComponent_i::mapSearchByValue(std::map<std::string,std::string> & search_map, std::string search_val)
469 {
470   std::map<std::string,std::string>::const_iterator iRet = search_map.end();
471   for (std::map<std::string,std::string>::const_iterator iTer = search_map.begin(); iTer != search_map.end(); iTer ++)
472     {
473       if( iTer->second.find(search_val) != std::string::npos )
474         {
475           iRet = iTer;
476           break;
477         }
478     }
479   return iRet;
480 }
481
482 bool ParaMEDMEMComponent_i::amICoupledWithThisComponent(const char* cref)
483 {
484   std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, cref);
485   if(it != _connectto.end())
486     return true;
487   else
488     return false;
489 }
490
491 void *th_setinterpolationoptions(void *s)
492 {
493   ostringstream msg;
494   thread_st *st = (thread_st*)s;
495   except_st *est = new except_st;
496   est->exception = false;
497   try
498     {
499       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
500       compo->setInterpolationOptions(st->coupling.c_str(),
501                                      st->print_level,
502                                      st->intersection_type,
503                                      st->precision,
504                                      st->median_plane,
505                                      st->do_rotate,
506                                      st->bounding_box_adjustment,
507                                      st->bounding_box_adjustment_abs,
508                                      st->max_distance_for_3Dsurf_intersect,
509                                      st->orientation,
510                                      st->measure_abs,
511                                      st->splitting_policy);
512     }
513   catch(const SALOME::SALOME_Exception &ex)
514     {
515       est->exception = true;
516       est->msg = ex.details.text;
517     }
518   catch(const CORBA::Exception &ex)
519     {
520       est->exception = true;
521       msg << "CORBA::Exception: " << ex;
522       est->msg = msg.str();
523     }
524   delete st;
525   return((void*)est);
526 }
527
528 void *th_initializecoupling(void *s)
529 {
530   ostringstream msg;
531   thread_st *st = (thread_st*)s;
532   except_st *est = new except_st;
533   est->exception = false;
534
535   try
536     {
537       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
538       compo->initializeCoupling(st->coupling.c_str(),st->ior.c_str());
539     }
540   catch(const SALOME::SALOME_Exception &ex)
541     {
542       est->exception = true;
543       est->msg = ex.details.text;
544     }
545   catch(const CORBA::Exception &ex)
546     {
547       est->exception = true;
548       msg << "CORBA::Exception: " << ex;
549       est->msg = msg.str();
550     }
551   delete st;
552   return((void*)est);
553 }
554
555 void *th_terminatecoupling(void *s)
556 {
557   ostringstream msg;
558   thread_st *st = (thread_st*)s;
559   except_st *est = new except_st;
560   est->exception = false;
561
562   try
563     {
564       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
565       compo->terminateCoupling(st->coupling.c_str());
566     }
567   catch(const SALOME::SALOME_Exception &ex)
568     {
569       est->exception = true;
570       est->msg = ex.details.text;
571     }
572   catch(const CORBA::Exception &ex)
573     {
574       est->exception = true;
575       msg << "CORBA::Exception: " << ex;
576       est->msg = msg.str();
577     }
578   delete st;
579   return((void*)est);
580 }
581
582 void *th_getdata(void *s)
583 {
584   ostringstream msg;
585   thread_st *st = (thread_st*)s;
586   except_st *est = new except_st;
587   est->exception = false;
588
589   try
590     {
591       st->fieldptr->getDataByMPI(st->coupling.c_str());
592     }
593   catch(const SALOME::SALOME_Exception &ex)
594     {
595       est->exception = true;
596       est->msg = ex.details.text;
597     }
598   catch(const CORBA::Exception &ex)
599     {
600       est->exception = true;
601       msg << "CORBA::Exception: " << ex;
602       est->msg = msg.str();
603     }
604   delete st;
605   return((void*)est);
606 }
607
608 void *th_initializecouplingdist(void *s)
609 {
610   ostringstream msg;
611   thread_st *st = (thread_st*)s;
612   except_st *est = new except_st;
613   est->exception = false;
614
615   try
616     {
617       st->compo->initializeCoupling(st->coupling.c_str(), st->ior.c_str());
618     }
619   catch(const SALOME::SALOME_Exception &ex)
620     {
621       est->exception = true;
622       est->msg = ex.details.text;
623     }
624   catch(const CORBA::Exception &ex)
625     {
626       est->exception = true;
627       msg << "CORBA::Exception: " << ex;
628       est->msg = msg.str();
629     }
630   delete st;
631   return((void*)est);
632 }
633
634 void *th_terminatecouplingdist(void *s)
635 {
636   ostringstream msg;
637   thread_st *st = (thread_st*)s;
638   except_st *est = new except_st;
639   est->exception = false;
640
641   try
642     {
643       st->compo->terminateCoupling(st->coupling.c_str());
644     }
645   catch(const SALOME::SALOME_Exception &ex)
646     {
647       est->exception = true;
648       est->msg = ex.details.text;
649     }
650   catch(const CORBA::Exception &ex)
651     {
652       est->exception = true;
653       msg << "CORBA::Exception: " << ex;
654       est->msg = msg.str();
655     }
656   delete st;
657   return((void*)est);
658 }