Salome HOME
8e8cc6aee039c82c1e4b95072d231312e63245d2
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
23 //  File   : SALOME_ParallelComponent_i.cxx
24 //  Author : AndrĂ© RIBES, EDF
25 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
26 //
27 #include "SALOME_ParallelComponent_i.hxx"
28 #include "SALOME_ParallelContainer_i.hxx"
29
30 #include "OpUtil.hxx"
31 #include <stdio.h>
32 #ifndef WIN32
33 #include <dlfcn.h>
34 #endif
35 #include <cstdlib>
36 #include "utilities.h"
37
38 #ifndef WIN32
39 #include <sys/time.h>
40 #include <sys/resource.h>
41 #include <unistd.h>
42 #else
43 #include <sys/timeb.h>
44 int SIGUSR11 = 1000;
45 #endif
46
47 #include <paco_dummy.h>
48 #include <paco_omni.h>
49
50 using namespace std;
51
52 extern bool _Sleeping ;
53 static Engines_Parallel_Component_i * theEngines_Component ;
54
55 bool Engines_Parallel_Component_i::_isMultiStudy = true;
56 bool Engines_Parallel_Component_i::_isMultiInstance = false;
57
58 //=============================================================================
59 /*! 
60  *  Default constructor, not for use
61  */
62 //=============================================================================
63
64 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank) : 
65   InterfaceParallel_impl(orb,ior,rank), 
66   Engines::Component_serv(orb,ior,rank), 
67   Engines::Parallel_Component_serv(orb,ior,rank)
68 {
69   //ASSERT(0);
70   INFOS("Default Constructor...");
71 }
72
73 //=============================================================================
74 /*! 
75  *  Standard Constructor for generic Component, used in derived class
76  *  Connection to Registry and Notification
77  *  \param orb Object Request broker given by Container
78  *  \parap poa Portable Object Adapter from Container (normally root_poa)
79  *  \param contId container CORBA id inside the server
80  *  \param instanceName unique instance name for this object (see Container_i)
81  *  \param interfaceName component class name
82  *  \param notif use of notification
83  */
84 //=============================================================================
85
86 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
87                                          PortableServer::POA_ptr poa, 
88                                          PortableServer::ObjectId * contId, 
89                                          const char *instanceName,
90                                          const char *interfaceName,
91                                          bool notif) :
92   InterfaceParallel_impl(orb,ior,rank), 
93   Engines::Component_serv(orb,ior,rank),
94   Engines::Parallel_Component_serv(orb,ior,rank),
95   _instanceName(instanceName),
96   _interfaceName(interfaceName),
97   _myConnexionToRegistry(0),
98   _ThreadId(0) ,
99   _ThreadCpuUsed(0) ,
100   _Executed(false) ,
101   _graphName("") ,
102   _nodeName(""),
103  _studyId(-1)
104 {
105   MESSAGE("Component constructor with instanceName "<< _instanceName);
106   //SCRUTE(pd_refCount);
107   _orb = CORBA::ORB::_duplicate(orb);
108   _poa = PortableServer::POA::_duplicate(poa);
109   _contId = contId ;
110   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
111   const CORBA::String_var the_ior = _orb->object_to_string(o);
112   _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
113                                                  _instanceName.c_str());
114
115   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
116
117   deploy_mutex = new pthread_mutex_t();
118   pthread_mutex_init(deploy_mutex, NULL);
119   _proxy = NULL;
120  //SCRUTE(pd_refCount);
121 }
122
123 //=============================================================================
124 /*! 
125  *  Destructor: call Container for decrement of instances count.
126  *  When instances count falls to 0, the container tries to remove the
127  *  component library (dlclose)
128  */
129 //=============================================================================
130
131 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
132 {
133   MESSAGE("Component destructor");
134   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
135   pthread_mutex_destroy(deploy_mutex);
136   delete deploy_mutex;
137
138   if (_proxy)
139     delete _proxy;
140 }
141
142 //=============================================================================
143 /*! 
144  *  CORBA method: return name of the instance, unique in this Container
145  */
146 //=============================================================================
147
148 char* Engines_Parallel_Component_i::instanceName()
149 {
150    return CORBA::string_dup(_instanceName.c_str()) ;
151 }
152
153 //=============================================================================
154 /*! 
155  *  CORBA method: return name of the component class
156  */
157 //=============================================================================
158
159 char* Engines_Parallel_Component_i::interfaceName()
160 {
161   return CORBA::string_dup(_interfaceName.c_str()) ;
162 }
163
164 //=============================================================================
165 /*! 
166  *  CORBA method: Get study Id
167  *  \return -1: not initialised (Internal Error)
168  *           0: multistudy component instance
169  *          >0: study id associated to this instance
170  */
171 //=============================================================================
172
173 CORBA::Long Engines_Parallel_Component_i::getStudyId()
174 {
175   return _studyId;
176 }
177
178 //=============================================================================
179 /*! 
180  *  CORBA method: Test if instance is alive and responds
181  */
182 //=============================================================================
183
184 void Engines_Parallel_Component_i::ping()
185 {
186   //  MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
187 }
188
189 //=============================================================================
190 /*! 
191  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
192  *  respond any more to CORBA calls), the connection to Regsitry is removed
193  *  (Registry informed of deactivation), internal server reference counter on
194  *  the derived servant class is decremented, to allow destruction of the class
195  *  (delete) by POA, when there are no more references.
196  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
197  */
198 //=============================================================================
199
200 void Engines_Parallel_Component_i::destroy()
201 {
202   MESSAGE("Engines_Parallel_Component_i::destroy()");
203   //SCRUTE(pd_refCount);
204
205   delete _notifSupplier;
206   _notifSupplier = 0;
207
208   delete _myConnexionToRegistry;
209   _myConnexionToRegistry = 0 ;
210   _poa->deactivate_object(*_id) ;
211   CORBA::release(_poa) ;
212   delete(_id) ;
213   //SCRUTE(pd_refCount);
214   _thisObj->_remove_ref();
215   //SCRUTE(pd_refCount);
216   MESSAGE("Engines_Parallel_Component_i::destroyed") ;
217 }
218
219 //=============================================================================
220 /*! 
221  *  CORBA method: return CORBA reference of the Container
222  *
223  */
224 //=============================================================================
225
226 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
227 {
228   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
229   CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
230   return Engines::Container::_narrow(o);
231 }
232
233 //=============================================================================
234 /*! 
235  *  CORBA method: 
236  *  Gives a sequence of (key=string,value=any) to the component. 
237  *  Base class component stores the sequence in a map.
238  *  The map is cleared before.
239  *  This map is for use by derived classes. 
240  *  \param dico sequence of (key=string,value=any)
241  */
242 //=============================================================================
243
244 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
245 {
246   _fieldsDict.clear();
247   for (CORBA::ULong i=0; i<dico.length(); i++)
248     {
249       std::string cle(dico[i].key);
250       _fieldsDict[cle] = dico[i].value;
251     }
252 }
253
254 //=============================================================================
255 /*! 
256  *  CORBA method: 
257  *  returns a previously stored map (key=string,value=any) as a sequence.
258  *  (see setProperties)
259  */
260 //=============================================================================
261
262 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
263 {
264   Engines::FieldsDict_var copie = new Engines::FieldsDict;
265   copie->length(_fieldsDict.size());
266   map<std::string,CORBA::Any>::iterator it;
267   CORBA::ULong i = 0;
268   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
269     {
270       std::string cle((*it).first);
271       copie[i].key = CORBA::string_dup(cle.c_str());
272       copie[i].value = _fieldsDict[cle];
273     }
274   return copie._retn();
275 }
276
277 //=============================================================================
278 /*! 
279  *  CORBA method: used by Supervision to give names to this instance
280  */
281 //=============================================================================
282
283 void Engines_Parallel_Component_i::Names( const char * graphName ,
284                                  const char * nodeName )
285 {
286   _graphName = graphName ;
287   _nodeName = nodeName ;
288   INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
289         << _nodeName << "' )");
290 }
291
292 //=============================================================================
293 /*! 
294  *  CORBA method: used in Supervision
295  */
296 //=============================================================================
297
298 bool Engines_Parallel_Component_i::Kill_impl() 
299 {
300 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
301 //          << " pid " << getpid() << " instanceName "
302 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
303 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
304 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
305 //          << dec ) ;
306
307   bool RetVal = false ;
308 #ifndef WIN32
309   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
310     {
311       RetVal = Killer( _ThreadId , 0 ) ;
312       _ThreadId = (pthread_t ) -1 ;
313     }
314
315 #else
316   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
317     {
318       RetVal = Killer( *_ThreadId , 0 ) ;
319       _ThreadId = (pthread_t* ) 0 ;
320     }
321
322 #endif
323   return RetVal ;
324 }
325
326 //=============================================================================
327 /*! 
328  *  CORBA method: used in Supervision
329  */
330 //=============================================================================
331
332 bool Engines_Parallel_Component_i::Stop_impl()
333 {
334   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
335           << " pid " << getpid() << " instanceName "
336           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
337           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
338           << dec << " _ThreadId " << _ThreadId );
339   
340
341   bool RetVal = false ;
342 #ifndef WIN32
343   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
344     {
345       RetVal = Killer( _ThreadId , 0 ) ;
346       _ThreadId = (pthread_t ) -1 ;
347     }
348 #else
349   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
350     {
351       RetVal = Killer( *_ThreadId , 0 ) ;
352       _ThreadId = (pthread_t* ) 0 ;
353     }
354 #endif
355   return RetVal ;
356 }
357
358 //=============================================================================
359 /*! 
360  *  CORBA method: used in Supervision
361  */
362 //=============================================================================
363
364 bool Engines_Parallel_Component_i::Suspend_impl()
365 {
366   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
367           << " pid " << getpid() << " instanceName "
368           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
369           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
370           << dec << " _ThreadId " << _ThreadId );
371
372   bool RetVal = false ;
373 #ifndef WIN32
374   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
375 #else
376   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
377 #endif
378     {
379       if ( _Sleeping )
380         {
381           return false ;
382         }
383     else 
384       {
385 #ifndef WIN32
386         RetVal = Killer( _ThreadId ,SIGINT ) ;
387 #else
388         RetVal = Killer( *_ThreadId ,SIGINT ) ;
389 #endif
390         //if ( RetVal ) _Sleeping = true;
391
392       }
393     }
394   return RetVal ;
395 }
396
397 //=============================================================================
398 /*! 
399  *  CORBA method: used in Supervision
400  */
401 //=============================================================================
402
403 bool Engines_Parallel_Component_i::Resume_impl()
404 {
405   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
406           << " pid " << getpid() << " instanceName "
407           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
408           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
409           << dec << " _ThreadId " << _ThreadId );
410   bool RetVal = false ;
411 #ifndef WIN32
412   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
413 #else
414   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
415 #endif
416     {
417     if ( _Sleeping ) 
418       {
419         _Sleeping = false ;
420         RetVal = true ;
421       }
422     else
423       {
424         RetVal = false ;
425       }
426     }
427   return RetVal ;
428 }
429
430 //=============================================================================
431 /*! 
432  *  CORBA method: 
433  */
434 //=============================================================================
435
436 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
437 {
438   long cpu = 0 ;
439   if ( _ThreadId || _Executed )
440     {
441     if ( _ThreadId > 0 )
442       {
443 #ifndef WIN32
444       if ( pthread_self() != _ThreadId )
445 #else
446       if ( pthread_self().p != _ThreadId->p )
447 #endif
448         {
449         if ( _Sleeping )
450           {
451           }
452         else
453           {
454             // Get Cpu in the appropriate thread with that object !...
455             theEngines_Component = this ;
456 #ifndef WIN32
457             Killer( _ThreadId ,SIGUSR1 ) ;
458 #else
459             Killer( *_ThreadId ,SIGUSR11 ) ;
460 #endif
461           }
462         cpu = _ThreadCpuUsed ;
463         }
464       else
465         {
466           _ThreadCpuUsed = CpuUsed() ;
467           cpu = _ThreadCpuUsed ;
468           // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
469           //      << _serviceName << " " << cpu << endl ;
470       }
471     }
472     else 
473       {
474         cpu = _ThreadCpuUsed ;
475         // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
476         //      << _serviceName << " " << cpu<< endl ;
477       }
478     }
479   else
480     {
481       // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
482       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
483     }
484   return cpu ;
485 }
486
487
488 //=============================================================================
489 /*! 
490  *  C++ method: return Container Servant
491  */
492 //=============================================================================
493
494 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
495 {
496   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
497 }
498
499 //=============================================================================
500 /*! 
501  *  C++ method: set study Id
502  *  \param studyId         0 if instance is not associated to a study, 
503  *                         >0 otherwise (== study id)
504  *  \return true if the set of study Id is OK
505  *  must be set once by Container, at instance creation,
506  *  and cannot be changed after.
507  */
508 //=============================================================================
509
510 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
511 {
512   ASSERT( studyId >= 0);
513   CORBA::Boolean ret = false;
514   if (_studyId < 0) // --- not yet initialized 
515     {
516       _studyId = studyId;
517       ret = true;
518     }
519   else
520     if ( _studyId == studyId) ret = true;
521   return ret;
522 }
523
524 //=============================================================================
525 /*! 
526  *  C++ method: return CORBA instance id, the id is set in derived class
527  *  constructor, when instance is activated.
528  */
529 //=============================================================================
530
531 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
532 {
533 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
534   return _id ;
535 }
536
537 //=============================================================================
538 /*! 
539  *  C++ method: used by derived classes for supervision
540  */
541 //=============================================================================
542
543 void Engines_Parallel_Component_i::beginService(const char *serviceName)
544 {
545   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
546           << endl << "Component instance : " << _instanceName << endl << endl);
547 #ifndef WIN32
548   _ThreadId = pthread_self() ;
549 #else
550   _ThreadId = new pthread_t;
551   _ThreadId->p = pthread_self().p ;
552   _ThreadId->x = pthread_self().x ;
553 #endif
554   _StartUsed = 0 ;
555   _StartUsed = CpuUsed_impl() ;
556   _ThreadCpuUsed = 0 ;
557   _Executed = true ;
558   _serviceName = serviceName ;
559   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
560     {
561       perror("pthread_setcanceltype ") ;
562       exit(0) ;
563     }
564   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
565     {
566       perror("pthread_setcancelstate ") ;
567       exit(0) ;
568     }
569 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
570 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
571 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
572
573   // --- for supervisor : all strings given with setProperties
574   //     are set in environment
575   bool overwrite = true;
576   map<std::string,CORBA::Any>::iterator it;
577   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
578     {
579       std::string cle((*it).first);
580       if ((*it).second.type()->kind() == CORBA::tk_string)
581         {
582           const char* value;
583           (*it).second >>= value;
584           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
585 #if defined __GNUC__
586           //int ret = setenv(cle.c_str(), value, overwrite);
587           setenv(cle.c_str(), value, overwrite);
588 #else
589           //CCRT porting : setenv not defined in stdlib.h
590           std::string s(cle);
591           s+='=';
592           s+=value;
593           // char* cast because 1st arg of linux putenv function
594           // is not a const char* !
595           //int ret=putenv((char *)s.c_str());
596           putenv((char *)s.c_str());
597           //End of CCRT porting
598 #endif
599           MESSAGE("--- setenv: "<<cle<<" = "<< value);
600         }
601     }
602 }
603
604 //=============================================================================
605 /*! 
606  *  C++ method: used by derived classes for supervision
607  */
608 //=============================================================================
609
610 void Engines_Parallel_Component_i::endService(const char *serviceName)
611 {
612   _ThreadCpuUsed = CpuUsed_impl() ;
613   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
614           << endl << " Component instance : " << _instanceName << " StartUsed "
615           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
616   _ThreadId = 0 ;
617 }
618
619 //=============================================================================
620 /*! 
621  *  C++ method: -- CHECK IF USED --
622  */
623 //=============================================================================
624
625 char* Engines_Parallel_Component_i::graphName()
626 {
627   return CORBA::string_dup( _graphName.c_str() ) ;
628 }
629
630 //=============================================================================
631 /*! 
632  *  C++ method: -- CHECK IF USED --
633  */
634 //=============================================================================
635
636 char* Engines_Parallel_Component_i::nodeName()
637 {
638   return CORBA::string_dup( _nodeName.c_str() ) ;
639 }
640
641 //=============================================================================
642 /*! 
643  *  C++ method: used in Supervision (see kill_impl)
644  */
645 //=============================================================================
646
647 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
648 {
649 #ifndef WIN32
650   if ( ThreadId )
651 #else
652   if ( ThreadId.p )
653 #endif
654     {
655       if ( signum == 0 )
656         {
657           if ( pthread_cancel( ThreadId ) )
658             {
659               perror("Killer pthread_cancel error") ;
660               return false ;
661             }
662           else
663             {
664               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
665                       << " pthread_canceled") ;
666             }
667         }
668       else
669         {
670           if ( pthread_kill( ThreadId , signum ) == -1 )
671             {
672               perror("Killer pthread_kill error") ;
673               return false ;
674             }
675           else 
676             {
677               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
678                       << " pthread_killed(" << signum << ")") ;
679             }
680         }
681     }
682   return true ;
683 }
684
685 //=============================================================================
686 /*! 
687  *  C++ method:
688  */ 
689 //=============================================================================
690
691 void SetCpuUsed()
692 {
693   theEngines_Component->SetCurCpu() ;
694 }
695
696 //=============================================================================
697 /*! 
698  *  C++ method:
699  */
700 //=============================================================================
701
702 void Engines_Parallel_Component_i::SetCurCpu()
703 {
704   _ThreadCpuUsed =  CpuUsed() ;
705   //  MESSAGE(pthread_self() << 
706   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
707 }
708
709 //=============================================================================
710 /*! 
711  *  C++ method:
712  */
713 //=============================================================================
714
715 long Engines_Parallel_Component_i::CpuUsed()
716 {
717   long cpu = 0 ;
718 #ifndef WIN32
719   struct rusage usage ;
720   if ( _ThreadId || _Executed )
721     {
722       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
723         {
724           perror("Engines_Parallel_Component_i::CpuUsed") ;
725           return 0 ;
726         }
727       cpu = usage.ru_utime.tv_sec - _StartUsed ;
728       // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
729       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
730       //      << " = " << cpu << endl ;
731     }
732   else
733     {
734       // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
735       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
736       //      << _StartUsed << endl ;
737     }
738 #else
739         // NOT implementet yet
740 #endif
741
742
743   return cpu ;
744 }
745
746 //=============================================================================
747 /*! 
748  *  C++ method: Send message to event channel
749  */
750 //=============================================================================
751
752 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
753                                       const char *message)
754 {
755     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
756 }
757
758 //=============================================================================
759 /*! 
760  *  C++ method: return standard library name built on component name
761  */
762 //=============================================================================
763
764 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
765 {
766   string ret="lib";
767   ret+=componentName;
768   ret+="Engine.so";
769   return ret;
770 }
771
772 //=============================================================================
773 /*! 
774  *  C++ method: DumpPython default implementation
775  */
776 //=============================================================================
777
778 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
779                                                            CORBA::Boolean isPublished, 
780                                                            CORBA::Boolean& isValidScript)
781 {
782   const char* aScript = "def RebuildData(theStudy): pass";
783   char* aBuffer = new char[strlen(aScript)+1];
784   strcpy(aBuffer, aScript);
785   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
786   int aBufferSize = strlen(aBuffer)+1;
787   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
788   isValidScript = true;
789   return aStreamFile._retn(); 
790 }
791
792
793 Engines::Salome_file_ptr 
794 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
795                                                     const char* Salome_file_name) 
796 {
797   // Try to find the service, if it doesn't exist, we add it.
798   _Service_file_map_it = _Input_Service_file_map.find(service_name);
799   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
800     _t_Salome_file_map * _map = new _t_Salome_file_map();
801     _Input_Service_file_map[service_name] = _map;
802     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
803     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
804     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
805     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
806   }
807   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
808   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
809   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
810   
811   pthread_mutex_lock(deploy_mutex);
812   std::string proxy_ior;
813
814   // Try to find the Salome_file ...
815   _Salome_file_map_it = _map->find(Salome_file_name);
816   if (_Salome_file_map_it ==  _map->end()) {
817
818     // We create a new PaCO++ object.
819     // He has the same configuration than
820     // his component
821
822     // Firstly, we have to create the proxy object
823     // of the Salome_file and transmit his
824     // reference to the other nodes.
825     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
826     if (getMyRank() == 0) {
827       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
828                                                            new paco_omni_fabrique());
829       proxy->copyGlobalContext(this); 
830       PaCO::PacoTopology_t serveur_topo;
831       serveur_topo.total = getTotalNode();
832       proxy->setTopology(serveur_topo);
833
834       // We register the CORBA objet into the POA
835       CORBA::Object_ptr proxy_ref = proxy->_this();
836
837       // We send the reference to all the nodes...
838       Engines::Parallel_Component_var component_proxy = 
839         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
840       component_proxy->send_parallel_proxy_object(proxy_ref);
841
842       // Adding proxy into the map
843       (*_proxy_map)[Salome_file_name] = proxy;
844     }
845     else {
846       this->wait_parallel_object_proxy();
847     }
848
849     proxy_ior = this->get_parallel_proxy_object();
850     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
851
852     // We register each node of the parallel Salome_file object
853     // into the proxy.
854     for (int i = 0; i < getTotalNode(); i++) {
855       if (i ==  getMyRank()) {
856         Parallel_Salome_file_i * servant = 
857           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
858                                      proxy_ior.c_str(),
859                                      i);
860         servant->copyGlobalContext(this); 
861         
862         // We register the CORBA objet into the POA
863         servant->POA_PaCO::InterfaceParallel::_this();
864
865         // Register the servant
866         servant->deploy();
867
868         // Adding servant to the map
869         (*_map)[Salome_file_name] = servant;
870       }
871
872       _my_com->paco_barrier();
873       // start parallel object
874       if (getMyRank() == 0) {
875         proxy->start();
876         _my_com->paco_barrier();
877       }
878       else
879         _my_com->paco_barrier();
880     }
881     // Parallel_Salome_file is created and deployed
882     delete _proxy;
883     _proxy = NULL;
884   }
885
886   pthread_mutex_unlock(deploy_mutex);
887   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
888   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
889   return Engines::Salome_file::_narrow(proxy_ref);
890 }
891
892 Engines::Salome_file_ptr 
893 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
894                                                      const char* Salome_file_name) 
895 {
896   // Try to find the service, if it doesn't exist, we add it.
897   _Service_file_map_it = _Output_Service_file_map.find(service_name);
898   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
899     _t_Salome_file_map * _map = new _t_Salome_file_map();
900     _Output_Service_file_map[service_name] = _map;
901     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
902     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
903     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
904     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
905   }
906   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
907   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
908   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
909   
910   pthread_mutex_lock(deploy_mutex);
911   std::string proxy_ior;
912
913   // Try to find the Salome_file ...
914   _Salome_file_map_it = _map->find(Salome_file_name);
915   Engines::Parallel_Salome_file_proxy_impl * proxy;
916   if (_Salome_file_map_it ==  _map->end()) {
917
918     // We create a new PaCO++ object.
919     // He has the same configuration than
920     // his component
921
922     // Firstly, we have to create the proxy object
923     // of the Salome_file and transmit his
924     // reference to the other nodes.
925     if (getMyRank() == 0) {
926         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
927                                                              new paco_omni_fabrique());
928       proxy->copyGlobalContext(this); 
929       PaCO::PacoTopology_t serveur_topo;
930       serveur_topo.total = getTotalNode();
931       proxy->setTopology(serveur_topo);
932
933       // We register the CORBA objet into the POA
934       CORBA::Object_ptr proxy_ref = proxy->_this();
935
936       // We send the reference to all the nodes...
937       Engines::Parallel_Component_var component_proxy = 
938         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
939       component_proxy->send_parallel_proxy_object(proxy_ref);
940
941       // Adding proxy into the map
942       (*_proxy_map)[Salome_file_name] = proxy;
943     }
944     else {
945       this->wait_parallel_object_proxy();
946     }
947
948     proxy_ior = this->get_parallel_proxy_object();
949     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
950
951     // We register each node of the parallel Salome_file object
952     // into the proxy.
953     for (int i = 0; i < getTotalNode(); i++) {
954       if (i ==  getMyRank()) {
955         Parallel_Salome_file_i * servant = 
956           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
957                                      proxy_ior.c_str(),
958                                      i);
959         servant->copyGlobalContext(this); 
960         
961         // We register the CORBA objet into the POA
962         servant->POA_PaCO::InterfaceParallel::_this();
963
964         // Register the servant
965         servant->deploy();
966
967         // Adding servant to the map
968         (*_map)[Salome_file_name] = servant;
969       }
970
971       _my_com->paco_barrier();
972       // start parallel object
973       if (getMyRank() == 0) {
974         proxy->start();
975         _my_com->paco_barrier();
976       }
977       else
978         _my_com->paco_barrier();
979     }
980
981     // Parallel_Salome_file is created and deployed
982     delete _proxy;
983     _proxy = NULL;
984   }
985   pthread_mutex_unlock(deploy_mutex);
986   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
987   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
988   return Engines::Salome_file::_narrow(proxy_ref);
989 }
990
991 Engines::Salome_file_ptr 
992 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
993                                                     const char* Salome_file_name) 
994 {
995   // Try to find the service, if it doesn't exist, we throw an exception.
996   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
997   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
998     SALOME::ExceptionStruct es;
999     es.type = SALOME::INTERNAL_ERROR;
1000     es.text = "service doesn't have salome files";
1001     throw SALOME::SALOME_Exception(es);
1002   }
1003   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1004
1005   // Try to find the Salome_file ...
1006   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1007   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1008     SALOME::ExceptionStruct es;
1009     es.type = SALOME::INTERNAL_ERROR;
1010     es.text = "service doesn't have this Salome_file";
1011     throw SALOME::SALOME_Exception(es);
1012   }
1013
1014   // Client get the proxy object
1015   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1016   return Sfile->_this();
1017 }
1018
1019 Engines::Salome_file_ptr 
1020 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
1021                                                      const char* Salome_file_name) 
1022 {
1023   // Try to find the service, if it doesn't exist, we throw an exception.
1024   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1025   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1026     SALOME::ExceptionStruct es;
1027     es.type = SALOME::INTERNAL_ERROR;
1028     es.text = "service doesn't have salome files";
1029     throw SALOME::SALOME_Exception(es);
1030   }
1031   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1032
1033   // Try to find the Salome_file ...
1034   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1035   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1036     SALOME::ExceptionStruct es;
1037     es.type = SALOME::INTERNAL_ERROR;
1038     es.text = "service doesn't have this Salome_file";
1039     throw SALOME::SALOME_Exception(es);
1040   }
1041
1042   // Client get the proxy object
1043   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1044   return Sfile->_this();
1045 }
1046
1047
1048 void 
1049 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
1050 {
1051   // Try to find the service, if it doesn't exist, nothing to do.
1052   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1053   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1054     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1055     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1056     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1057
1058     for(;begin!=end;begin++) {
1059       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1060       std::string file_port_name = begin->first;
1061       configureSalome_file(service_name, file_port_name, file);
1062       file->recvFiles();
1063     }
1064   }
1065 }
1066
1067 void 
1068 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
1069 {
1070   // Try to find the service, if it doesn't exist, nothing to do.
1071   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1072   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1073     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1074     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1075     _t_Proxy_Salome_file_map::iterator end = _map->end();
1076
1077     for(;begin!=end;begin++) {
1078       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1079       std::string file_port_name = begin->first;
1080       configureSalome_file(service_name, file_port_name, file);
1081       file->recvFiles();
1082     }
1083   }
1084
1085 }
1086
1087 //=============================================================================
1088 /*! 
1089  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1090  */ 
1091 //=============================================================================
1092 void 
1093 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1094   _proxy = _orb->object_to_string(proxy_ref);
1095 }
1096
1097 //=============================================================================
1098 /*! 
1099  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1100  */ 
1101 //=============================================================================
1102 void 
1103 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1104   char * proxy = NULL;
1105   proxy =  get_parallel_proxy_object();
1106   while(proxy == NULL)
1107   {
1108     sleep(1);
1109     proxy = get_parallel_proxy_object();
1110   }
1111 }
1112
1113 //=============================================================================
1114 /*! 
1115  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1116  */ 
1117 //=============================================================================
1118 char * 
1119 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1120   return _proxy;
1121 }
1122
1123
1124 //=============================================================================
1125 /*! 
1126  *  C++ method: used to configure the Salome_file into the runtime.
1127  *  \param service_name name of the service that use this Salome_file
1128  *  \param file_port_name name of the Salome_file
1129  *  \param file Parallel Salome_file C++ object
1130  */
1131 //=============================================================================
1132 void
1133 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1134                                                    std::string file_port_name,
1135                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
1136 {
1137   // By default this method does nothing
1138 }
1139