Salome HOME
Fix compilation pb.
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
2 //
3 //  Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //  File   : SALOME_ParallelComponent_i.cxx
23 //  Author : AndrĂ© RIBES, EDF
24 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
25
26 #include "SALOME_ParallelComponent_i.hxx"
27 #include "SALOME_ParallelContainer_i.hxx"
28
29 #include "OpUtil.hxx"
30 #include <stdio.h>
31 #ifndef WNT
32 #include <dlfcn.h>
33 #endif
34 #include <cstdlib>
35 #include "utilities.h"
36
37 #ifndef WNT
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <unistd.h>
41 #else
42 #include <sys/timeb.h>
43 int SIGUSR11 = 1000;
44 #endif
45
46 #include <paco_dummy.h>
47
48 using namespace std;
49
50 extern bool _Sleeping ;
51 static Engines_Parallel_Component_i * theEngines_Component ;
52
53 bool Engines_Parallel_Component_i::_isMultiStudy = true;
54 bool Engines_Parallel_Component_i::_isMultiInstance = false;
55
56 //=============================================================================
57 /*! 
58  *  Default constructor, not for use
59  */
60 //=============================================================================
61
62 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) : 
63   InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior)
64 {
65   //ASSERT(0);
66   INFOS("Default Constructor...");
67 }
68
69 //=============================================================================
70 /*! 
71  *  Standard Constructor for generic Component, used in derived class
72  *  Connection to Registry and Notification
73  *  \param orb Object Request broker given by Container
74  *  \parap poa Portable Object Adapter from Container (normally root_poa)
75  *  \param contId container CORBA id inside the server
76  *  \param instanceName unique instance name for this object (see Container_i)
77  *  \param interfaceName component class name
78  *  \param notif use of notification
79  */
80 //=============================================================================
81
82 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior,
83                                          PortableServer::POA_ptr poa, 
84                                          PortableServer::ObjectId * contId, 
85                                          const char *instanceName,
86                                          const char *interfaceName,
87                                          bool notif) :
88   InterfaceParallel_impl(orb,ior), 
89   Engines::Component_serv(orb,ior),
90   Engines::Parallel_Component_serv(orb,ior),
91   _instanceName(instanceName),
92   _interfaceName(interfaceName),
93   _myConnexionToRegistry(0),
94   _ThreadId(0) ,
95   _ThreadCpuUsed(0) ,
96   _Executed(false) ,
97   _graphName("") ,
98   _nodeName(""),
99  _studyId(-1)
100 {
101   MESSAGE("Component constructor with instanceName "<< _instanceName);
102   //SCRUTE(pd_refCount);
103   _orb = CORBA::ORB::_duplicate(orb);
104   _poa = PortableServer::POA::_duplicate(poa);
105   _contId = contId ;
106   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
107   const CORBA::String_var the_ior = _orb->object_to_string(o);
108   _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
109                                                  _instanceName.c_str());
110
111   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
112
113   deploy_mutex = new pthread_mutex_t();
114   pthread_mutex_init(deploy_mutex, NULL);
115   _proxy = NULL;
116  //SCRUTE(pd_refCount);
117 }
118
119 //=============================================================================
120 /*! 
121  *  Destructor: call Container for decrement of instances count.
122  *  When instances count falls to 0, the container tries to remove the
123  *  component library (dlclose)
124  */
125 //=============================================================================
126
127 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
128 {
129   MESSAGE("Component destructor");
130   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
131   pthread_mutex_destroy(deploy_mutex);
132   delete deploy_mutex;
133
134   if (_proxy)
135     delete _proxy;
136 }
137
138 //=============================================================================
139 /*! 
140  *  CORBA method: return name of the instance, unique in this Container
141  */
142 //=============================================================================
143
144 char* Engines_Parallel_Component_i::instanceName()
145 {
146    return CORBA::string_dup(_instanceName.c_str()) ;
147 }
148
149 //=============================================================================
150 /*! 
151  *  CORBA method: return name of the component class
152  */
153 //=============================================================================
154
155 char* Engines_Parallel_Component_i::interfaceName()
156 {
157   return CORBA::string_dup(_interfaceName.c_str()) ;
158 }
159
160 //=============================================================================
161 /*! 
162  *  CORBA method: Get study Id
163  *  \return -1: not initialised (Internal Error)
164  *           0: multistudy component instance
165  *          >0: study id associated to this instance
166  */
167 //=============================================================================
168
169 CORBA::Long Engines_Parallel_Component_i::getStudyId()
170 {
171   return _studyId;
172 }
173
174 //=============================================================================
175 /*! 
176  *  CORBA method: Test if instance is alive and responds
177  */
178 //=============================================================================
179
180 void Engines_Parallel_Component_i::ping()
181 {
182   //  MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
183 }
184
185 //=============================================================================
186 /*! 
187  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
188  *  respond any more to CORBA calls), the connection to Regsitry is removed
189  *  (Registry informed of deactivation), internal server reference counter on
190  *  the derived servant class is decremented, to allow destruction of the class
191  *  (delete) by POA, when there are no more references.
192  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
193  */
194 //=============================================================================
195
196 void Engines_Parallel_Component_i::destroy()
197 {
198   MESSAGE("Engines_Parallel_Component_i::destroy()");
199   //SCRUTE(pd_refCount);
200
201   delete _notifSupplier;
202   _notifSupplier = 0;
203
204   delete _myConnexionToRegistry;
205   _myConnexionToRegistry = 0 ;
206   _poa->deactivate_object(*_id) ;
207   CORBA::release(_poa) ;
208   delete(_id) ;
209   //SCRUTE(pd_refCount);
210   _thisObj->_remove_ref();
211   //SCRUTE(pd_refCount);
212   MESSAGE("Engines_Parallel_Component_i::destroyed") ;
213 }
214
215 //=============================================================================
216 /*! 
217  *  CORBA method: return CORBA reference of the Container
218  *
219  */
220 //=============================================================================
221
222 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
223 {
224   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
225   CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
226   return Engines::Container::_narrow(o);
227 }
228
229 //=============================================================================
230 /*! 
231  *  CORBA method: 
232  *  Gives a sequence of (key=string,value=any) to the component. 
233  *  Base class component stores the sequence in a map.
234  *  The map is cleared before.
235  *  This map is for use by derived classes. 
236  *  \param dico sequence of (key=string,value=any)
237  */
238 //=============================================================================
239
240 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
241 {
242   _fieldsDict.clear();
243   for (CORBA::ULong i=0; i<dico.length(); i++)
244     {
245       std::string cle(dico[i].key);
246       _fieldsDict[cle] = dico[i].value;
247     }
248 }
249
250 //=============================================================================
251 /*! 
252  *  CORBA method: 
253  *  returns a previously stored map (key=string,value=any) as a sequence.
254  *  (see setProperties)
255  */
256 //=============================================================================
257
258 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
259 {
260   Engines::FieldsDict_var copie = new Engines::FieldsDict;
261   copie->length(_fieldsDict.size());
262   map<std::string,CORBA::Any>::iterator it;
263   CORBA::ULong i = 0;
264   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
265     {
266       std::string cle((*it).first);
267       copie[i].key = CORBA::string_dup(cle.c_str());
268       copie[i].value = _fieldsDict[cle];
269     }
270   return copie._retn();
271 }
272
273 //=============================================================================
274 /*! 
275  *  CORBA method: used by Supervision to give names to this instance
276  */
277 //=============================================================================
278
279 void Engines_Parallel_Component_i::Names( const char * graphName ,
280                                  const char * nodeName )
281 {
282   _graphName = graphName ;
283   _nodeName = nodeName ;
284   INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
285         << _nodeName << "' )");
286 }
287
288 //=============================================================================
289 /*! 
290  *  CORBA method: used in Supervision
291  */
292 //=============================================================================
293
294 bool Engines_Parallel_Component_i::Kill_impl() 
295 {
296 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
297 //          << " pid " << getpid() << " instanceName "
298 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
299 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
300 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
301 //          << dec ) ;
302
303   bool RetVal = false ;
304 #ifndef WNT
305   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
306     {
307       RetVal = Killer( _ThreadId , 0 ) ;
308       _ThreadId = (pthread_t ) -1 ;
309     }
310
311 #else
312   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
313     {
314       RetVal = Killer( *_ThreadId , 0 ) ;
315       _ThreadId = (pthread_t* ) 0 ;
316     }
317
318 #endif
319   return RetVal ;
320 }
321
322 //=============================================================================
323 /*! 
324  *  CORBA method: used in Supervision
325  */
326 //=============================================================================
327
328 bool Engines_Parallel_Component_i::Stop_impl()
329 {
330   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
331           << " pid " << getpid() << " instanceName "
332           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
333           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
334           << dec << " _ThreadId " << _ThreadId );
335   
336
337   bool RetVal = false ;
338 #ifndef WNT
339   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
340     {
341       RetVal = Killer( _ThreadId , 0 ) ;
342       _ThreadId = (pthread_t ) -1 ;
343     }
344 #else
345   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
346     {
347       RetVal = Killer( *_ThreadId , 0 ) ;
348       _ThreadId = (pthread_t* ) 0 ;
349     }
350 #endif
351   return RetVal ;
352 }
353
354 //=============================================================================
355 /*! 
356  *  CORBA method: used in Supervision
357  */
358 //=============================================================================
359
360 bool Engines_Parallel_Component_i::Suspend_impl()
361 {
362   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
363           << " pid " << getpid() << " instanceName "
364           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
365           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
366           << dec << " _ThreadId " << _ThreadId );
367
368   bool RetVal = false ;
369 #ifndef WNT
370   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
371 #else
372   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
373 #endif
374     {
375       if ( _Sleeping )
376         {
377           return false ;
378         }
379     else 
380       {
381 #ifndef WNT
382         RetVal = Killer( _ThreadId ,SIGINT ) ;
383 #else
384         RetVal = Killer( *_ThreadId ,SIGINT ) ;
385 #endif
386         //if ( RetVal ) _Sleeping = true;
387
388       }
389     }
390   return RetVal ;
391 }
392
393 //=============================================================================
394 /*! 
395  *  CORBA method: used in Supervision
396  */
397 //=============================================================================
398
399 bool Engines_Parallel_Component_i::Resume_impl()
400 {
401   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
402           << " pid " << getpid() << " instanceName "
403           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
404           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
405           << dec << " _ThreadId " << _ThreadId );
406   bool RetVal = false ;
407 #ifndef WNT
408   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
409 #else
410   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
411 #endif
412     {
413     if ( _Sleeping ) 
414       {
415         _Sleeping = false ;
416         RetVal = true ;
417       }
418     else
419       {
420         RetVal = false ;
421       }
422     }
423   return RetVal ;
424 }
425
426 //=============================================================================
427 /*! 
428  *  CORBA method: 
429  */
430 //=============================================================================
431
432 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
433 {
434   long cpu = 0 ;
435   if ( _ThreadId || _Executed )
436     {
437     if ( _ThreadId > 0 )
438       {
439 #ifndef WNT
440       if ( pthread_self() != _ThreadId )
441 #else
442       if ( pthread_self().p != _ThreadId->p )
443 #endif
444         {
445         if ( _Sleeping )
446           {
447           }
448         else
449           {
450             // Get Cpu in the appropriate thread with that object !...
451             theEngines_Component = this ;
452 #ifndef WNT
453             Killer( _ThreadId ,SIGUSR1 ) ;
454 #else
455             Killer( *_ThreadId ,SIGUSR11 ) ;
456 #endif
457           }
458         cpu = _ThreadCpuUsed ;
459         }
460       else
461         {
462           _ThreadCpuUsed = CpuUsed() ;
463           cpu = _ThreadCpuUsed ;
464           // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
465           //      << _serviceName << " " << cpu << endl ;
466       }
467     }
468     else 
469       {
470         cpu = _ThreadCpuUsed ;
471         // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
472         //      << _serviceName << " " << cpu<< endl ;
473       }
474     }
475   else
476     {
477       // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
478       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
479     }
480   return cpu ;
481 }
482
483
484 //=============================================================================
485 /*! 
486  *  C++ method: return Container Servant
487  */
488 //=============================================================================
489
490 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
491 {
492   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
493 }
494
495 //=============================================================================
496 /*! 
497  *  C++ method: set study Id
498  *  \param studyId         0 if instance is not associated to a study, 
499  *                         >0 otherwise (== study id)
500  *  \return true if the set of study Id is OK
501  *  must be set once by Container, at instance creation,
502  *  and cannot be changed after.
503  */
504 //=============================================================================
505
506 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
507 {
508   ASSERT( studyId >= 0);
509   CORBA::Boolean ret = false;
510   if (_studyId < 0) // --- not yet initialized 
511     {
512       _studyId = studyId;
513       ret = true;
514     }
515   else
516     if ( _studyId == studyId) ret = true;
517   return ret;
518 }
519
520 //=============================================================================
521 /*! 
522  *  C++ method: return CORBA instance id, the id is set in derived class
523  *  constructor, when instance is activated.
524  */
525 //=============================================================================
526
527 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
528 {
529 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
530   return _id ;
531 }
532
533 //=============================================================================
534 /*! 
535  *  C++ method: used by derived classes for supervision
536  */
537 //=============================================================================
538
539 void Engines_Parallel_Component_i::beginService(const char *serviceName)
540 {
541   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
542           << endl << "Component instance : " << _instanceName << endl << endl);
543 #ifndef WNT
544   _ThreadId = pthread_self() ;
545 #else
546   _ThreadId = new pthread_t;
547   _ThreadId->p = pthread_self().p ;
548   _ThreadId->x = pthread_self().x ;
549 #endif
550   _StartUsed = 0 ;
551   _StartUsed = CpuUsed_impl() ;
552   _ThreadCpuUsed = 0 ;
553   _Executed = true ;
554   _serviceName = serviceName ;
555   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
556     {
557       perror("pthread_setcanceltype ") ;
558       exit(0) ;
559     }
560   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
561     {
562       perror("pthread_setcancelstate ") ;
563       exit(0) ;
564     }
565 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
566 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
567 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
568
569   // --- for supervisor : all strings given with setProperties
570   //     are set in environment
571   bool overwrite = true;
572   map<std::string,CORBA::Any>::iterator it;
573   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
574     {
575       std::string cle((*it).first);
576       if ((*it).second.type()->kind() == CORBA::tk_string)
577         {
578           const char* value;
579           (*it).second >>= value;
580           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
581 #if defined __GNUC__
582           //int ret = setenv(cle.c_str(), value, overwrite);
583           setenv(cle.c_str(), value, overwrite);
584 #else
585           //CCRT porting : setenv not defined in stdlib.h
586           std::string s(cle);
587           s+='=';
588           s+=value;
589           // char* cast because 1st arg of linux putenv function
590           // is not a const char* !
591           //int ret=putenv((char *)s.c_str());
592           putenv((char *)s.c_str());
593           //End of CCRT porting
594 #endif
595           MESSAGE("--- setenv: "<<cle<<" = "<< value);
596         }
597     }
598 }
599
600 //=============================================================================
601 /*! 
602  *  C++ method: used by derived classes for supervision
603  */
604 //=============================================================================
605
606 void Engines_Parallel_Component_i::endService(const char *serviceName)
607 {
608   _ThreadCpuUsed = CpuUsed_impl() ;
609   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
610           << endl << " Component instance : " << _instanceName << " StartUsed "
611           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
612   _ThreadId = 0 ;
613 }
614
615 //=============================================================================
616 /*! 
617  *  C++ method: -- CHECK IF USED --
618  */
619 //=============================================================================
620
621 char* Engines_Parallel_Component_i::graphName()
622 {
623   return CORBA::string_dup( _graphName.c_str() ) ;
624 }
625
626 //=============================================================================
627 /*! 
628  *  C++ method: -- CHECK IF USED --
629  */
630 //=============================================================================
631
632 char* Engines_Parallel_Component_i::nodeName()
633 {
634   return CORBA::string_dup( _nodeName.c_str() ) ;
635 }
636
637 //=============================================================================
638 /*! 
639  *  C++ method: used in Supervision (see kill_impl)
640  */
641 //=============================================================================
642
643 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
644 {
645 #ifndef WNT
646   if ( ThreadId )
647 #else
648   if ( ThreadId.p )
649 #endif
650     {
651       if ( signum == 0 )
652         {
653           if ( pthread_cancel( ThreadId ) )
654             {
655               perror("Killer pthread_cancel error") ;
656               return false ;
657             }
658           else
659             {
660               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
661                       << " pthread_canceled") ;
662             }
663         }
664       else
665         {
666           if ( pthread_kill( ThreadId , signum ) == -1 )
667             {
668               perror("Killer pthread_kill error") ;
669               return false ;
670             }
671           else 
672             {
673               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
674                       << " pthread_killed(" << signum << ")") ;
675             }
676         }
677     }
678   return true ;
679 }
680
681 //=============================================================================
682 /*! 
683  *  C++ method:
684  */ 
685 //=============================================================================
686
687 void SetCpuUsed()
688 {
689   theEngines_Component->SetCurCpu() ;
690 }
691
692 //=============================================================================
693 /*! 
694  *  C++ method:
695  */
696 //=============================================================================
697
698 void Engines_Parallel_Component_i::SetCurCpu()
699 {
700   _ThreadCpuUsed =  CpuUsed() ;
701   //  MESSAGE(pthread_self() << 
702   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
703 }
704
705 //=============================================================================
706 /*! 
707  *  C++ method:
708  */
709 //=============================================================================
710
711 long Engines_Parallel_Component_i::CpuUsed()
712 {
713   long cpu = 0 ;
714 #ifndef WNT
715   struct rusage usage ;
716   if ( _ThreadId || _Executed )
717     {
718       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
719         {
720           perror("Engines_Parallel_Component_i::CpuUsed") ;
721           return 0 ;
722         }
723       cpu = usage.ru_utime.tv_sec - _StartUsed ;
724       // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
725       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
726       //      << " = " << cpu << endl ;
727     }
728   else
729     {
730       // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
731       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
732       //      << _StartUsed << endl ;
733     }
734 #else
735         // NOT implementet yet
736 #endif
737
738
739   return cpu ;
740 }
741
742 //=============================================================================
743 /*! 
744  *  C++ method: Send message to event channel
745  */
746 //=============================================================================
747
748 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
749                                       const char *message)
750 {
751     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
752 }
753
754 //=============================================================================
755 /*! 
756  *  C++ method: return standard library name built on component name
757  */
758 //=============================================================================
759
760 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
761 {
762   string ret="lib";
763   ret+=componentName;
764   ret+="Engine.so";
765   return ret;
766 }
767
768 //=============================================================================
769 /*! 
770  *  C++ method: DumpPython default implementation
771  */
772 //=============================================================================
773
774 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
775                                                            CORBA::Boolean isPublished, 
776                                                            CORBA::Boolean& isValidScript)
777 {
778   const char* aScript = "def RebuildData(theStudy): pass";
779   char* aBuffer = new char[strlen(aScript)+1];
780   strcpy(aBuffer, aScript);
781   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
782   int aBufferSize = strlen(aBuffer)+1;
783   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
784   isValidScript = true;
785   return aStreamFile._retn(); 
786 }
787
788
789 Engines::Salome_file_ptr 
790 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
791                                                     const char* Salome_file_name) 
792 {
793   // Try to find the service, if it doesn't exist, we add it.
794   _Service_file_map_it = _Input_Service_file_map.find(service_name);
795   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
796     _t_Salome_file_map * _map = new _t_Salome_file_map();
797     _Input_Service_file_map[service_name] = _map;
798     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
799     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
800     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
801     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
802   }
803   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
804   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
805   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
806   
807   pthread_mutex_lock(deploy_mutex);
808   std::string proxy_ior;
809
810   // Try to find the Salome_file ...
811   _Salome_file_map_it = _map->find(Salome_file_name);
812   if (_Salome_file_map_it ==  _map->end()) {
813
814     // We create a new PaCO++ object.
815     // He has the same configuration than
816     // his component
817
818     // Firstly, we have to create the proxy object
819     // of the Salome_file and transmit his
820     // reference to the other nodes.
821     if (getMyRank() == 0) {
822       Engines::Parallel_Salome_file_proxy_impl * proxy = 
823         new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
824       PaCO_operation * proxy_global_ptr =  proxy->getContext("global_paco_context");
825       // We initialize the object with the context of the Parallel component
826       PaCO_operation * compo_global_ptr =  getContext("global_paco_context");
827       //compo_global_ptr->init_context(proxy_global_ptr);
828       proxy_global_ptr->init_context(compo_global_ptr);
829       
830       paco_fabrique_manager* pfm = paco_getFabriqueManager();
831       pfm->register_com("dummy", new paco_dummy_fabrique());
832       proxy_global_ptr->setComFab(NULL);
833       proxy_global_ptr->setLibCom("dummy",NULL);
834       
835       proxy_global_ptr->setTypeClient(true);
836       PaCO::PacoTopology_t client_topo;
837       client_topo.total = 1;
838       proxy_global_ptr->setClientTopo(client_topo);
839       PaCO::PacoTopology_t serveur_topo;
840       serveur_topo.total = getTotalNode();
841       proxy->setTopo(serveur_topo);
842
843       // We register the CORBA objet into the POA
844       CORBA::Object_ptr proxy_ref = proxy->_this();
845
846       // We send the reference to all the nodes...
847       CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
848       Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
849       component_proxy->send_parallel_proxy_object(proxy_ref);
850
851       // Adding proxy into the map
852       (*_proxy_map)[Salome_file_name] = proxy;
853     }
854     else {
855       this->wait_parallel_object_proxy();
856     }
857
858     proxy_ior = this->get_parallel_proxy_object();
859     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
860
861     // We register each node of the parallel Salome_file object
862     // into the proxy.
863     for (int i = 0; i < getTotalNode(); i++) {
864       if (i ==  getMyRank()) {
865         Parallel_Salome_file_i * servant = 
866           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
867         PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
868         
869         // We initialize the object with the context of the Parallel component
870         PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
871 //      compo_global_ptr->init_context(servant_global_ptr);
872         servant_global_ptr->init_context(compo_global_ptr);
873         
874         // We register the CORBA objet into the POA
875         servant->POA_PaCO::InterfaceParallel::_this();
876
877         // Register the servant
878         servant->deploy(getMyRank());
879
880         // Adding servant to the map
881         (*_map)[Salome_file_name] = servant;
882       }
883
884       PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
885       compo_global_ptr->my_com->paco_barrier();
886     }
887
888     // Parallel_Salome_file is created and deployed
889     delete _proxy;
890     _proxy = NULL;
891   }
892   pthread_mutex_unlock(deploy_mutex);
893   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
894   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
895   return Engines::Salome_file::_narrow(proxy_ref);
896 }
897
898 Engines::Salome_file_ptr 
899 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
900                                                      const char* Salome_file_name) 
901 {
902   // Try to find the service, if it doesn't exist, we add it.
903   _Service_file_map_it = _Output_Service_file_map.find(service_name);
904   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
905     _t_Salome_file_map * _map = new _t_Salome_file_map();
906     _Output_Service_file_map[service_name] = _map;
907     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
908     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
909     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
910     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
911   }
912   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
913   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
914   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
915   
916   pthread_mutex_lock(deploy_mutex);
917   std::string proxy_ior;
918
919   // Try to find the Salome_file ...
920   _Salome_file_map_it = _map->find(Salome_file_name);
921   if (_Salome_file_map_it ==  _map->end()) {
922
923     // We create a new PaCO++ object.
924     // He has the same configuration than
925     // his component
926
927     // Firstly, we have to create the proxy object
928     // of the Salome_file and transmit his
929     // reference to the other nodes.
930     if (getMyRank() == 0) {
931       Engines::Parallel_Salome_file_proxy_impl * proxy = 
932         new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
933       PaCO_operation * proxy_global_ptr =  proxy->getContext("global_paco_context");
934       // We initialize the object with the context of the Parallel component
935       PaCO_operation * compo_global_ptr =  getContext("global_paco_context");
936       //compo_global_ptr->init_context(proxy_global_ptr);
937       proxy_global_ptr->init_context(compo_global_ptr);
938
939       paco_fabrique_manager* pfm = paco_getFabriqueManager();
940       pfm->register_com("dummy", new paco_dummy_fabrique());
941       proxy_global_ptr->setComFab(NULL);
942       proxy_global_ptr->setLibCom("dummy",NULL);
943
944       proxy_global_ptr->setTypeClient(true);
945       PaCO::PacoTopology_t client_topo;
946       client_topo.total = 1;
947       proxy_global_ptr->setClientTopo(client_topo);
948       PaCO::PacoTopology_t serveur_topo;
949       serveur_topo.total = getTotalNode();
950       proxy->setTopo(serveur_topo);
951
952       // We register the CORBA objet into the POA
953       CORBA::Object_ptr proxy_ref = proxy->_this();
954
955       // We send the reference to all the nodes...
956       CORBA::Object_ptr comp_proxy = _orb->string_to_object(_ior.c_str());
957       Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
958       component_proxy->send_parallel_proxy_object(proxy_ref);
959
960       // Adding proxy into the map
961       (*_proxy_map)[Salome_file_name] = proxy;
962     }
963     else {
964       this->wait_parallel_object_proxy();
965     }
966
967     proxy_ior = this->get_parallel_proxy_object();
968     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
969
970     // We register each node of the parallel Salome_file object
971     // into the proxy.
972     for (int i = 0; i < getTotalNode(); i++) {
973       if (i ==  getMyRank()) {
974         Parallel_Salome_file_i * servant = 
975           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
976         PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
977         
978         // We initialize the object with the context of the Parallel component
979         PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
980 //      compo_global_ptr->init_context(servant_global_ptr);
981         servant_global_ptr->init_context(compo_global_ptr);
982         
983         // We register the CORBA objet into the POA
984         servant->POA_PaCO::InterfaceParallel::_this();
985
986         // Register the servant
987         servant->deploy(getMyRank());
988
989         // Adding servant to the map
990         (*_map)[Salome_file_name] = servant;
991       }
992
993       PaCO_operation * compo_global_ptr =  this->getContext("global_paco_context");
994       compo_global_ptr->my_com->paco_barrier();
995     }
996
997     // Parallel_Salome_file is created and deployed
998     delete _proxy;
999     _proxy = NULL;
1000   }
1001   pthread_mutex_unlock(deploy_mutex);
1002   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
1003   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1004   return Engines::Salome_file::_narrow(proxy_ref);
1005 }
1006
1007 Engines::Salome_file_ptr 
1008 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
1009                                                     const char* Salome_file_name) 
1010 {
1011   // Try to find the service, if it doesn't exist, we throw an exception.
1012   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1013   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
1014     SALOME::ExceptionStruct es;
1015     es.type = SALOME::INTERNAL_ERROR;
1016     es.text = "service doesn't have salome files";
1017     throw SALOME::SALOME_Exception(es);
1018   }
1019   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1020
1021   // Try to find the Salome_file ...
1022   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1023   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1024     SALOME::ExceptionStruct es;
1025     es.type = SALOME::INTERNAL_ERROR;
1026     es.text = "service doesn't have this Salome_file";
1027     throw SALOME::SALOME_Exception(es);
1028   }
1029
1030   // Client get the proxy object
1031   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1032   return Sfile->_this();
1033 }
1034
1035 Engines::Salome_file_ptr 
1036 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
1037                                                      const char* Salome_file_name) 
1038 {
1039   // Try to find the service, if it doesn't exist, we throw an exception.
1040   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1041   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1042     SALOME::ExceptionStruct es;
1043     es.type = SALOME::INTERNAL_ERROR;
1044     es.text = "service doesn't have salome files";
1045     throw SALOME::SALOME_Exception(es);
1046   }
1047   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1048
1049   // Try to find the Salome_file ...
1050   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1051   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1052     SALOME::ExceptionStruct es;
1053     es.type = SALOME::INTERNAL_ERROR;
1054     es.text = "service doesn't have this Salome_file";
1055     throw SALOME::SALOME_Exception(es);
1056   }
1057
1058   // Client get the proxy object
1059   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1060   return Sfile->_this();
1061 }
1062
1063
1064 void 
1065 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
1066 {
1067   // Try to find the service, if it doesn't exist, nothing to do.
1068   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1069   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1070     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1071     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1072     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1073
1074     for(;begin!=end;begin++) {
1075       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1076       std::string file_port_name = begin->first;
1077       configureSalome_file(service_name, file_port_name, file);
1078       file->recvFiles();
1079     }
1080   }
1081 }
1082
1083 void 
1084 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
1085 {
1086   // Try to find the service, if it doesn't exist, nothing to do.
1087   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1088   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1089     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1090     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1091     _t_Proxy_Salome_file_map::iterator end = _map->end();
1092
1093     for(;begin!=end;begin++) {
1094       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1095       std::string file_port_name = begin->first;
1096       configureSalome_file(service_name, file_port_name, file);
1097       file->recvFiles();
1098     }
1099   }
1100
1101 }
1102
1103 //=============================================================================
1104 /*! 
1105  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1106  */ 
1107 //=============================================================================
1108 void 
1109 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1110   _proxy = _orb->object_to_string(proxy_ref);
1111 }
1112
1113 //=============================================================================
1114 /*! 
1115  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1116  */ 
1117 //=============================================================================
1118 void 
1119 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1120   char * proxy = NULL;
1121   proxy =  get_parallel_proxy_object();
1122   while(proxy == NULL)
1123   {
1124     sleep(1);
1125     proxy = get_parallel_proxy_object();
1126   }
1127 }
1128
1129 //=============================================================================
1130 /*! 
1131  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1132  */ 
1133 //=============================================================================
1134 char * 
1135 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1136   return _proxy;
1137 }
1138
1139
1140 //=============================================================================
1141 /*! 
1142  *  C++ method: used to configure the Salome_file into the runtime.
1143  *  \param service_name name of the service that use this Salome_file
1144  *  \param file_port_name name of the Salome_file
1145  *  \param file Parallel Salome_file C++ object
1146  */
1147 //=============================================================================
1148 void
1149 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1150                                                    std::string file_port_name,
1151                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
1152 {
1153   // By default this method does nothing
1154 }
1155