Salome HOME
fab45e7032dccba174f5007b1d9c3a63ec02805c
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
2 //
3 //  Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS 
5 // 
6 //  This library is free software; you can redistribute it and/or 
7 //  modify it under the terms of the GNU Lesser General Public 
8 //  License as published by the Free Software Foundation; either 
9 //  version 2.1 of the License. 
10 // 
11 //  This library is distributed in the hope that it will be useful, 
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of 
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
14 //  Lesser General Public License for more details. 
15 // 
16 //  You should have received a copy of the GNU Lesser General Public 
17 //  License along with this library; if not, write to the Free Software 
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA 
19 // 
20 //  See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org 
21 //
22 //  File   : SALOME_ParallelComponent_i.cxx
23 //  Author : AndrĂ© RIBES, EDF
24 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
25
26 #include "SALOME_ParallelComponent_i.hxx"
27 #include "SALOME_ParallelContainer_i.hxx"
28
29 #include "OpUtil.hxx"
30 #include <stdio.h>
31 #ifndef WNT
32 #include <dlfcn.h>
33 #endif
34 #include <cstdlib>
35 #include "utilities.h"
36
37 #ifndef WNT
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <unistd.h>
41 #else
42 #include <sys/timeb.h>
43 int SIGUSR11 = 1000;
44 #endif
45
46 #include <paco_dummy.h>
47 #include <paco_omni.h>
48
49 using namespace std;
50
51 extern bool _Sleeping ;
52 static Engines_Parallel_Component_i * theEngines_Component ;
53
54 bool Engines_Parallel_Component_i::_isMultiStudy = true;
55 bool Engines_Parallel_Component_i::_isMultiInstance = false;
56
57 //=============================================================================
58 /*! 
59  *  Default constructor, not for use
60  */
61 //=============================================================================
62
63 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank) : 
64   InterfaceParallel_impl(orb,ior,rank), 
65   Engines::Component_serv(orb,ior,rank), 
66   Engines::Parallel_Component_serv(orb,ior,rank)
67 {
68   //ASSERT(0);
69   INFOS("Default Constructor...");
70 }
71
72 //=============================================================================
73 /*! 
74  *  Standard Constructor for generic Component, used in derived class
75  *  Connection to Registry and Notification
76  *  \param orb Object Request broker given by Container
77  *  \parap poa Portable Object Adapter from Container (normally root_poa)
78  *  \param contId container CORBA id inside the server
79  *  \param instanceName unique instance name for this object (see Container_i)
80  *  \param interfaceName component class name
81  *  \param notif use of notification
82  */
83 //=============================================================================
84
85 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
86                                          PortableServer::POA_ptr poa, 
87                                          PortableServer::ObjectId * contId, 
88                                          const char *instanceName,
89                                          const char *interfaceName,
90                                          bool notif) :
91   InterfaceParallel_impl(orb,ior,rank), 
92   Engines::Component_serv(orb,ior,rank),
93   Engines::Parallel_Component_serv(orb,ior,rank),
94   _instanceName(instanceName),
95   _interfaceName(interfaceName),
96   _myConnexionToRegistry(0),
97   _ThreadId(0) ,
98   _ThreadCpuUsed(0) ,
99   _Executed(false) ,
100   _graphName("") ,
101   _nodeName(""),
102  _studyId(-1)
103 {
104   MESSAGE("Component constructor with instanceName "<< _instanceName);
105   //SCRUTE(pd_refCount);
106   _orb = CORBA::ORB::_duplicate(orb);
107   _poa = PortableServer::POA::_duplicate(poa);
108   _contId = contId ;
109   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
110   const CORBA::String_var the_ior = _orb->object_to_string(o);
111   _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
112                                                  _instanceName.c_str());
113
114   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
115
116   deploy_mutex = new pthread_mutex_t();
117   pthread_mutex_init(deploy_mutex, NULL);
118   _proxy = NULL;
119  //SCRUTE(pd_refCount);
120 }
121
122 //=============================================================================
123 /*! 
124  *  Destructor: call Container for decrement of instances count.
125  *  When instances count falls to 0, the container tries to remove the
126  *  component library (dlclose)
127  */
128 //=============================================================================
129
130 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
131 {
132   MESSAGE("Component destructor");
133   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
134   pthread_mutex_destroy(deploy_mutex);
135   delete deploy_mutex;
136
137   if (_proxy)
138     delete _proxy;
139 }
140
141 //=============================================================================
142 /*! 
143  *  CORBA method: return name of the instance, unique in this Container
144  */
145 //=============================================================================
146
147 char* Engines_Parallel_Component_i::instanceName()
148 {
149    return CORBA::string_dup(_instanceName.c_str()) ;
150 }
151
152 //=============================================================================
153 /*! 
154  *  CORBA method: return name of the component class
155  */
156 //=============================================================================
157
158 char* Engines_Parallel_Component_i::interfaceName()
159 {
160   return CORBA::string_dup(_interfaceName.c_str()) ;
161 }
162
163 //=============================================================================
164 /*! 
165  *  CORBA method: Get study Id
166  *  \return -1: not initialised (Internal Error)
167  *           0: multistudy component instance
168  *          >0: study id associated to this instance
169  */
170 //=============================================================================
171
172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
173 {
174   return _studyId;
175 }
176
177 //=============================================================================
178 /*! 
179  *  CORBA method: Test if instance is alive and responds
180  */
181 //=============================================================================
182
183 void Engines_Parallel_Component_i::ping()
184 {
185   //  MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
186 }
187
188 //=============================================================================
189 /*! 
190  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
191  *  respond any more to CORBA calls), the connection to Regsitry is removed
192  *  (Registry informed of deactivation), internal server reference counter on
193  *  the derived servant class is decremented, to allow destruction of the class
194  *  (delete) by POA, when there are no more references.
195  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
196  */
197 //=============================================================================
198
199 void Engines_Parallel_Component_i::destroy()
200 {
201   MESSAGE("Engines_Parallel_Component_i::destroy()");
202   //SCRUTE(pd_refCount);
203
204   delete _notifSupplier;
205   _notifSupplier = 0;
206
207   delete _myConnexionToRegistry;
208   _myConnexionToRegistry = 0 ;
209   _poa->deactivate_object(*_id) ;
210   CORBA::release(_poa) ;
211   delete(_id) ;
212   //SCRUTE(pd_refCount);
213   _thisObj->_remove_ref();
214   //SCRUTE(pd_refCount);
215   MESSAGE("Engines_Parallel_Component_i::destroyed") ;
216 }
217
218 //=============================================================================
219 /*! 
220  *  CORBA method: return CORBA reference of the Container
221  *
222  */
223 //=============================================================================
224
225 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
226 {
227   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
228   CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
229   return Engines::Container::_narrow(o);
230 }
231
232 //=============================================================================
233 /*! 
234  *  CORBA method: 
235  *  Gives a sequence of (key=string,value=any) to the component. 
236  *  Base class component stores the sequence in a map.
237  *  The map is cleared before.
238  *  This map is for use by derived classes. 
239  *  \param dico sequence of (key=string,value=any)
240  */
241 //=============================================================================
242
243 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
244 {
245   _fieldsDict.clear();
246   for (CORBA::ULong i=0; i<dico.length(); i++)
247     {
248       std::string cle(dico[i].key);
249       _fieldsDict[cle] = dico[i].value;
250     }
251 }
252
253 //=============================================================================
254 /*! 
255  *  CORBA method: 
256  *  returns a previously stored map (key=string,value=any) as a sequence.
257  *  (see setProperties)
258  */
259 //=============================================================================
260
261 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
262 {
263   Engines::FieldsDict_var copie = new Engines::FieldsDict;
264   copie->length(_fieldsDict.size());
265   map<std::string,CORBA::Any>::iterator it;
266   CORBA::ULong i = 0;
267   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
268     {
269       std::string cle((*it).first);
270       copie[i].key = CORBA::string_dup(cle.c_str());
271       copie[i].value = _fieldsDict[cle];
272     }
273   return copie._retn();
274 }
275
276 //=============================================================================
277 /*! 
278  *  CORBA method: used by Supervision to give names to this instance
279  */
280 //=============================================================================
281
282 void Engines_Parallel_Component_i::Names( const char * graphName ,
283                                  const char * nodeName )
284 {
285   _graphName = graphName ;
286   _nodeName = nodeName ;
287   INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
288         << _nodeName << "' )");
289 }
290
291 //=============================================================================
292 /*! 
293  *  CORBA method: used in Supervision
294  */
295 //=============================================================================
296
297 bool Engines_Parallel_Component_i::Kill_impl() 
298 {
299 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
300 //          << " pid " << getpid() << " instanceName "
301 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
302 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
303 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
304 //          << dec ) ;
305
306   bool RetVal = false ;
307 #ifndef WNT
308   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
309     {
310       RetVal = Killer( _ThreadId , 0 ) ;
311       _ThreadId = (pthread_t ) -1 ;
312     }
313
314 #else
315   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
316     {
317       RetVal = Killer( *_ThreadId , 0 ) ;
318       _ThreadId = (pthread_t* ) 0 ;
319     }
320
321 #endif
322   return RetVal ;
323 }
324
325 //=============================================================================
326 /*! 
327  *  CORBA method: used in Supervision
328  */
329 //=============================================================================
330
331 bool Engines_Parallel_Component_i::Stop_impl()
332 {
333   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
334           << " pid " << getpid() << " instanceName "
335           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
336           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
337           << dec << " _ThreadId " << _ThreadId );
338   
339
340   bool RetVal = false ;
341 #ifndef WNT
342   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
343     {
344       RetVal = Killer( _ThreadId , 0 ) ;
345       _ThreadId = (pthread_t ) -1 ;
346     }
347 #else
348   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
349     {
350       RetVal = Killer( *_ThreadId , 0 ) ;
351       _ThreadId = (pthread_t* ) 0 ;
352     }
353 #endif
354   return RetVal ;
355 }
356
357 //=============================================================================
358 /*! 
359  *  CORBA method: used in Supervision
360  */
361 //=============================================================================
362
363 bool Engines_Parallel_Component_i::Suspend_impl()
364 {
365   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
366           << " pid " << getpid() << " instanceName "
367           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
368           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
369           << dec << " _ThreadId " << _ThreadId );
370
371   bool RetVal = false ;
372 #ifndef WNT
373   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
374 #else
375   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
376 #endif
377     {
378       if ( _Sleeping )
379         {
380           return false ;
381         }
382     else 
383       {
384 #ifndef WNT
385         RetVal = Killer( _ThreadId ,SIGINT ) ;
386 #else
387         RetVal = Killer( *_ThreadId ,SIGINT ) ;
388 #endif
389         //if ( RetVal ) _Sleeping = true;
390
391       }
392     }
393   return RetVal ;
394 }
395
396 //=============================================================================
397 /*! 
398  *  CORBA method: used in Supervision
399  */
400 //=============================================================================
401
402 bool Engines_Parallel_Component_i::Resume_impl()
403 {
404   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
405           << " pid " << getpid() << " instanceName "
406           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
407           << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
408           << dec << " _ThreadId " << _ThreadId );
409   bool RetVal = false ;
410 #ifndef WNT
411   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
412 #else
413   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
414 #endif
415     {
416     if ( _Sleeping ) 
417       {
418         _Sleeping = false ;
419         RetVal = true ;
420       }
421     else
422       {
423         RetVal = false ;
424       }
425     }
426   return RetVal ;
427 }
428
429 //=============================================================================
430 /*! 
431  *  CORBA method: 
432  */
433 //=============================================================================
434
435 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
436 {
437   long cpu = 0 ;
438   if ( _ThreadId || _Executed )
439     {
440     if ( _ThreadId > 0 )
441       {
442 #ifndef WNT
443       if ( pthread_self() != _ThreadId )
444 #else
445       if ( pthread_self().p != _ThreadId->p )
446 #endif
447         {
448         if ( _Sleeping )
449           {
450           }
451         else
452           {
453             // Get Cpu in the appropriate thread with that object !...
454             theEngines_Component = this ;
455 #ifndef WNT
456             Killer( _ThreadId ,SIGUSR1 ) ;
457 #else
458             Killer( *_ThreadId ,SIGUSR11 ) ;
459 #endif
460           }
461         cpu = _ThreadCpuUsed ;
462         }
463       else
464         {
465           _ThreadCpuUsed = CpuUsed() ;
466           cpu = _ThreadCpuUsed ;
467           // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
468           //      << _serviceName << " " << cpu << endl ;
469       }
470     }
471     else 
472       {
473         cpu = _ThreadCpuUsed ;
474         // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
475         //      << _serviceName << " " << cpu<< endl ;
476       }
477     }
478   else
479     {
480       // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
481       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
482     }
483   return cpu ;
484 }
485
486
487 //=============================================================================
488 /*! 
489  *  C++ method: return Container Servant
490  */
491 //=============================================================================
492
493 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
494 {
495   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
496 }
497
498 //=============================================================================
499 /*! 
500  *  C++ method: set study Id
501  *  \param studyId         0 if instance is not associated to a study, 
502  *                         >0 otherwise (== study id)
503  *  \return true if the set of study Id is OK
504  *  must be set once by Container, at instance creation,
505  *  and cannot be changed after.
506  */
507 //=============================================================================
508
509 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
510 {
511   ASSERT( studyId >= 0);
512   CORBA::Boolean ret = false;
513   if (_studyId < 0) // --- not yet initialized 
514     {
515       _studyId = studyId;
516       ret = true;
517     }
518   else
519     if ( _studyId == studyId) ret = true;
520   return ret;
521 }
522
523 //=============================================================================
524 /*! 
525  *  C++ method: return CORBA instance id, the id is set in derived class
526  *  constructor, when instance is activated.
527  */
528 //=============================================================================
529
530 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
531 {
532 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
533   return _id ;
534 }
535
536 //=============================================================================
537 /*! 
538  *  C++ method: used by derived classes for supervision
539  */
540 //=============================================================================
541
542 void Engines_Parallel_Component_i::beginService(const char *serviceName)
543 {
544   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
545           << endl << "Component instance : " << _instanceName << endl << endl);
546 #ifndef WNT
547   _ThreadId = pthread_self() ;
548 #else
549   _ThreadId = new pthread_t;
550   _ThreadId->p = pthread_self().p ;
551   _ThreadId->x = pthread_self().x ;
552 #endif
553   _StartUsed = 0 ;
554   _StartUsed = CpuUsed_impl() ;
555   _ThreadCpuUsed = 0 ;
556   _Executed = true ;
557   _serviceName = serviceName ;
558   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
559     {
560       perror("pthread_setcanceltype ") ;
561       exit(0) ;
562     }
563   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
564     {
565       perror("pthread_setcancelstate ") ;
566       exit(0) ;
567     }
568 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
569 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
570 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
571
572   // --- for supervisor : all strings given with setProperties
573   //     are set in environment
574   bool overwrite = true;
575   map<std::string,CORBA::Any>::iterator it;
576   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
577     {
578       std::string cle((*it).first);
579       if ((*it).second.type()->kind() == CORBA::tk_string)
580         {
581           const char* value;
582           (*it).second >>= value;
583           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
584 #if defined __GNUC__
585           //int ret = setenv(cle.c_str(), value, overwrite);
586           setenv(cle.c_str(), value, overwrite);
587 #else
588           //CCRT porting : setenv not defined in stdlib.h
589           std::string s(cle);
590           s+='=';
591           s+=value;
592           // char* cast because 1st arg of linux putenv function
593           // is not a const char* !
594           //int ret=putenv((char *)s.c_str());
595           putenv((char *)s.c_str());
596           //End of CCRT porting
597 #endif
598           MESSAGE("--- setenv: "<<cle<<" = "<< value);
599         }
600     }
601 }
602
603 //=============================================================================
604 /*! 
605  *  C++ method: used by derived classes for supervision
606  */
607 //=============================================================================
608
609 void Engines_Parallel_Component_i::endService(const char *serviceName)
610 {
611   _ThreadCpuUsed = CpuUsed_impl() ;
612   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
613           << endl << " Component instance : " << _instanceName << " StartUsed "
614           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
615   _ThreadId = 0 ;
616 }
617
618 //=============================================================================
619 /*! 
620  *  C++ method: -- CHECK IF USED --
621  */
622 //=============================================================================
623
624 char* Engines_Parallel_Component_i::graphName()
625 {
626   return CORBA::string_dup( _graphName.c_str() ) ;
627 }
628
629 //=============================================================================
630 /*! 
631  *  C++ method: -- CHECK IF USED --
632  */
633 //=============================================================================
634
635 char* Engines_Parallel_Component_i::nodeName()
636 {
637   return CORBA::string_dup( _nodeName.c_str() ) ;
638 }
639
640 //=============================================================================
641 /*! 
642  *  C++ method: used in Supervision (see kill_impl)
643  */
644 //=============================================================================
645
646 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
647 {
648 #ifndef WNT
649   if ( ThreadId )
650 #else
651   if ( ThreadId.p )
652 #endif
653     {
654       if ( signum == 0 )
655         {
656           if ( pthread_cancel( ThreadId ) )
657             {
658               perror("Killer pthread_cancel error") ;
659               return false ;
660             }
661           else
662             {
663               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
664                       << " pthread_canceled") ;
665             }
666         }
667       else
668         {
669           if ( pthread_kill( ThreadId , signum ) == -1 )
670             {
671               perror("Killer pthread_kill error") ;
672               return false ;
673             }
674           else 
675             {
676               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
677                       << " pthread_killed(" << signum << ")") ;
678             }
679         }
680     }
681   return true ;
682 }
683
684 //=============================================================================
685 /*! 
686  *  C++ method:
687  */ 
688 //=============================================================================
689
690 void SetCpuUsed()
691 {
692   theEngines_Component->SetCurCpu() ;
693 }
694
695 //=============================================================================
696 /*! 
697  *  C++ method:
698  */
699 //=============================================================================
700
701 void Engines_Parallel_Component_i::SetCurCpu()
702 {
703   _ThreadCpuUsed =  CpuUsed() ;
704   //  MESSAGE(pthread_self() << 
705   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
706 }
707
708 //=============================================================================
709 /*! 
710  *  C++ method:
711  */
712 //=============================================================================
713
714 long Engines_Parallel_Component_i::CpuUsed()
715 {
716   long cpu = 0 ;
717 #ifndef WNT
718   struct rusage usage ;
719   if ( _ThreadId || _Executed )
720     {
721       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
722         {
723           perror("Engines_Parallel_Component_i::CpuUsed") ;
724           return 0 ;
725         }
726       cpu = usage.ru_utime.tv_sec - _StartUsed ;
727       // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
728       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
729       //      << " = " << cpu << endl ;
730     }
731   else
732     {
733       // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
734       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
735       //      << _StartUsed << endl ;
736     }
737 #else
738         // NOT implementet yet
739 #endif
740
741
742   return cpu ;
743 }
744
745 //=============================================================================
746 /*! 
747  *  C++ method: Send message to event channel
748  */
749 //=============================================================================
750
751 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
752                                       const char *message)
753 {
754     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
755 }
756
757 //=============================================================================
758 /*! 
759  *  C++ method: return standard library name built on component name
760  */
761 //=============================================================================
762
763 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
764 {
765   string ret="lib";
766   ret+=componentName;
767   ret+="Engine.so";
768   return ret;
769 }
770
771 //=============================================================================
772 /*! 
773  *  C++ method: DumpPython default implementation
774  */
775 //=============================================================================
776
777 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
778                                                            CORBA::Boolean isPublished, 
779                                                            CORBA::Boolean& isValidScript)
780 {
781   const char* aScript = "def RebuildData(theStudy): pass";
782   char* aBuffer = new char[strlen(aScript)+1];
783   strcpy(aBuffer, aScript);
784   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
785   int aBufferSize = strlen(aBuffer)+1;
786   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
787   isValidScript = true;
788   return aStreamFile._retn(); 
789 }
790
791
792 Engines::Salome_file_ptr 
793 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
794                                                     const char* Salome_file_name) 
795 {
796   // Try to find the service, if it doesn't exist, we add it.
797   _Service_file_map_it = _Input_Service_file_map.find(service_name);
798   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
799     _t_Salome_file_map * _map = new _t_Salome_file_map();
800     _Input_Service_file_map[service_name] = _map;
801     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
802     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
803     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
804     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
805   }
806   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
807   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
808   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
809   
810   pthread_mutex_lock(deploy_mutex);
811   std::string proxy_ior;
812
813   // Try to find the Salome_file ...
814   _Salome_file_map_it = _map->find(Salome_file_name);
815   if (_Salome_file_map_it ==  _map->end()) {
816
817     // We create a new PaCO++ object.
818     // He has the same configuration than
819     // his component
820
821     // Firstly, we have to create the proxy object
822     // of the Salome_file and transmit his
823     // reference to the other nodes.
824     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
825     if (getMyRank() == 0) {
826       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
827                                                            new paco_omni_fabrique());
828       proxy->copyGlobalContext(this); 
829       PaCO::PacoTopology_t serveur_topo;
830       serveur_topo.total = getTotalNode();
831       proxy->setTopology(serveur_topo);
832
833       // We register the CORBA objet into the POA
834       CORBA::Object_ptr proxy_ref = proxy->_this();
835
836       // We send the reference to all the nodes...
837       Engines::Parallel_Component_var component_proxy = 
838         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
839       component_proxy->send_parallel_proxy_object(proxy_ref);
840
841       // Adding proxy into the map
842       (*_proxy_map)[Salome_file_name] = proxy;
843     }
844     else {
845       this->wait_parallel_object_proxy();
846     }
847
848     proxy_ior = this->get_parallel_proxy_object();
849     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
850
851     // We register each node of the parallel Salome_file object
852     // into the proxy.
853     for (int i = 0; i < getTotalNode(); i++) {
854       if (i ==  getMyRank()) {
855         Parallel_Salome_file_i * servant = 
856           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
857                                      proxy_ior.c_str(),
858                                      i);
859         servant->copyGlobalContext(this); 
860         
861         // We register the CORBA objet into the POA
862         servant->POA_PaCO::InterfaceParallel::_this();
863
864         // Register the servant
865         servant->deploy();
866
867         // Adding servant to the map
868         (*_map)[Salome_file_name] = servant;
869       }
870
871       _my_com->paco_barrier();
872       // start parallel object
873       if (getMyRank() == 0) {
874         proxy->start();
875         _my_com->paco_barrier();
876       }
877       else
878         _my_com->paco_barrier();
879     }
880     // Parallel_Salome_file is created and deployed
881     delete _proxy;
882     _proxy = NULL;
883   }
884
885   pthread_mutex_unlock(deploy_mutex);
886   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
887   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
888   return Engines::Salome_file::_narrow(proxy_ref);
889 }
890
891 Engines::Salome_file_ptr 
892 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
893                                                      const char* Salome_file_name) 
894 {
895   // Try to find the service, if it doesn't exist, we add it.
896   _Service_file_map_it = _Output_Service_file_map.find(service_name);
897   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
898     _t_Salome_file_map * _map = new _t_Salome_file_map();
899     _Output_Service_file_map[service_name] = _map;
900     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
901     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
902     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
903     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
904   }
905   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
906   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
907   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
908   
909   pthread_mutex_lock(deploy_mutex);
910   std::string proxy_ior;
911
912   // Try to find the Salome_file ...
913   _Salome_file_map_it = _map->find(Salome_file_name);
914   Engines::Parallel_Salome_file_proxy_impl * proxy;
915   if (_Salome_file_map_it ==  _map->end()) {
916
917     // We create a new PaCO++ object.
918     // He has the same configuration than
919     // his component
920
921     // Firstly, we have to create the proxy object
922     // of the Salome_file and transmit his
923     // reference to the other nodes.
924     if (getMyRank() == 0) {
925         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
926                                                              new paco_omni_fabrique());
927       proxy->copyGlobalContext(this); 
928       PaCO::PacoTopology_t serveur_topo;
929       serveur_topo.total = getTotalNode();
930       proxy->setTopology(serveur_topo);
931
932       // We register the CORBA objet into the POA
933       CORBA::Object_ptr proxy_ref = proxy->_this();
934
935       // We send the reference to all the nodes...
936       Engines::Parallel_Component_var component_proxy = 
937         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
938       component_proxy->send_parallel_proxy_object(proxy_ref);
939
940       // Adding proxy into the map
941       (*_proxy_map)[Salome_file_name] = proxy;
942     }
943     else {
944       this->wait_parallel_object_proxy();
945     }
946
947     proxy_ior = this->get_parallel_proxy_object();
948     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
949
950     // We register each node of the parallel Salome_file object
951     // into the proxy.
952     for (int i = 0; i < getTotalNode(); i++) {
953       if (i ==  getMyRank()) {
954         Parallel_Salome_file_i * servant = 
955           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
956                                      proxy_ior.c_str(),
957                                      i);
958         servant->copyGlobalContext(this); 
959         
960         // We register the CORBA objet into the POA
961         servant->POA_PaCO::InterfaceParallel::_this();
962
963         // Register the servant
964         servant->deploy();
965
966         // Adding servant to the map
967         (*_map)[Salome_file_name] = servant;
968       }
969
970       _my_com->paco_barrier();
971       // start parallel object
972       if (getMyRank() == 0) {
973         proxy->start();
974         _my_com->paco_barrier();
975       }
976       else
977         _my_com->paco_barrier();
978     }
979
980     // Parallel_Salome_file is created and deployed
981     delete _proxy;
982     _proxy = NULL;
983   }
984   pthread_mutex_unlock(deploy_mutex);
985   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
986   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
987   return Engines::Salome_file::_narrow(proxy_ref);
988 }
989
990 Engines::Salome_file_ptr 
991 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
992                                                     const char* Salome_file_name) 
993 {
994   // Try to find the service, if it doesn't exist, we throw an exception.
995   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
996   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
997     SALOME::ExceptionStruct es;
998     es.type = SALOME::INTERNAL_ERROR;
999     es.text = "service doesn't have salome files";
1000     throw SALOME::SALOME_Exception(es);
1001   }
1002   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1003
1004   // Try to find the Salome_file ...
1005   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1006   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1007     SALOME::ExceptionStruct es;
1008     es.type = SALOME::INTERNAL_ERROR;
1009     es.text = "service doesn't have this Salome_file";
1010     throw SALOME::SALOME_Exception(es);
1011   }
1012
1013   // Client get the proxy object
1014   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1015   return Sfile->_this();
1016 }
1017
1018 Engines::Salome_file_ptr 
1019 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
1020                                                      const char* Salome_file_name) 
1021 {
1022   // Try to find the service, if it doesn't exist, we throw an exception.
1023   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1024   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1025     SALOME::ExceptionStruct es;
1026     es.type = SALOME::INTERNAL_ERROR;
1027     es.text = "service doesn't have salome files";
1028     throw SALOME::SALOME_Exception(es);
1029   }
1030   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1031
1032   // Try to find the Salome_file ...
1033   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1034   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1035     SALOME::ExceptionStruct es;
1036     es.type = SALOME::INTERNAL_ERROR;
1037     es.text = "service doesn't have this Salome_file";
1038     throw SALOME::SALOME_Exception(es);
1039   }
1040
1041   // Client get the proxy object
1042   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1043   return Sfile->_this();
1044 }
1045
1046
1047 void 
1048 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
1049 {
1050   // Try to find the service, if it doesn't exist, nothing to do.
1051   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1052   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1053     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1054     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1055     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1056
1057     for(;begin!=end;begin++) {
1058       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1059       std::string file_port_name = begin->first;
1060       configureSalome_file(service_name, file_port_name, file);
1061       file->recvFiles();
1062     }
1063   }
1064 }
1065
1066 void 
1067 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
1068 {
1069   // Try to find the service, if it doesn't exist, nothing to do.
1070   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1071   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1072     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1073     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1074     _t_Proxy_Salome_file_map::iterator end = _map->end();
1075
1076     for(;begin!=end;begin++) {
1077       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1078       std::string file_port_name = begin->first;
1079       configureSalome_file(service_name, file_port_name, file);
1080       file->recvFiles();
1081     }
1082   }
1083
1084 }
1085
1086 //=============================================================================
1087 /*! 
1088  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1089  */ 
1090 //=============================================================================
1091 void 
1092 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1093   _proxy = _orb->object_to_string(proxy_ref);
1094 }
1095
1096 //=============================================================================
1097 /*! 
1098  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1099  */ 
1100 //=============================================================================
1101 void 
1102 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1103   char * proxy = NULL;
1104   proxy =  get_parallel_proxy_object();
1105   while(proxy == NULL)
1106   {
1107     sleep(1);
1108     proxy = get_parallel_proxy_object();
1109   }
1110 }
1111
1112 //=============================================================================
1113 /*! 
1114  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1115  */ 
1116 //=============================================================================
1117 char * 
1118 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1119   return _proxy;
1120 }
1121
1122
1123 //=============================================================================
1124 /*! 
1125  *  C++ method: used to configure the Salome_file into the runtime.
1126  *  \param service_name name of the service that use this Salome_file
1127  *  \param file_port_name name of the Salome_file
1128  *  \param file Parallel Salome_file C++ object
1129  */
1130 //=============================================================================
1131 void
1132 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1133                                                    std::string file_port_name,
1134                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
1135 {
1136   // By default this method does nothing
1137 }
1138