Salome HOME
Fixed problem with dump study and small optimization:
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 // Copyright (C) 2007-2016  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 //  File   : SALOME_ParallelComponent_i.cxx
25 //  Author : Andr� RIBES, EDF
26 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
27 //
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
30
31 #include "OpUtil.hxx"
32 #include <stdio.h>
33 #ifndef WIN32
34 #include <dlfcn.h>
35 #endif
36 #include <cstdlib>
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
39
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <sys/resource.h>
43 #include <unistd.h>
44 #else
45 #include <sys/timeb.h>
46 int SIGUSR11 = 1000;
47 #endif
48
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
51
52
53 extern bool _Sleeping ;
54 static Engines_Parallel_Component_i * theEngines_Component ;
55
56 bool Engines_Parallel_Component_i::_isMultiStudy = false;
57 bool Engines_Parallel_Component_i::_isMultiInstance = false;
58
59 //=============================================================================
60 /*! 
61  *  Standard Constructor for generic Component, used in derived class
62  *  Connection to Registry and Notification
63  *  \param orb Object Request broker given by Container
64  *  \parap poa Portable Object Adapter from Container (normally root_poa)
65  *  \param contId container CORBA id inside the server
66  *  \param instanceName unique instance name for this object (see Container_i)
67  *  \param interfaceName component class name
68  *  \param notif use of notification
69  */
70 //=============================================================================
71
72 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
73                                          PortableServer::POA_ptr poa, 
74                                          PortableServer::ObjectId * contId, 
75                                          const char *instanceName,
76                                          const char *interfaceName,
77                                          bool notif,
78                                          bool regist) :
79   InterfaceParallel_impl(orb,ior,rank), 
80   Engines::EngineComponent_serv(orb,ior,rank),
81   Engines::EngineComponent_base_serv(orb,ior,rank),
82   Engines::Parallel_Component_serv(orb,ior,rank),
83   Engines::Parallel_Component_base_serv(orb,ior,rank),
84   _instanceName(instanceName),
85   _interfaceName(interfaceName),
86   _id(NULL),
87   _myConnexionToRegistry(0),
88   _ThreadId(0) ,
89   _ThreadCpuUsed(0) ,
90   _Executed(false) ,
91   _graphName("") ,
92   _nodeName(""),
93   _destroyed(false),
94   _CanceledThread(false)
95 {
96   MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
97   //SCRUTE(pd_refCount);
98   _orb = CORBA::ORB::_duplicate(orb);
99   _poa = PortableServer::POA::_duplicate(poa);
100   _contId = contId ;
101   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
102
103   if (regist)
104   {
105     CORBA::String_var the_ior = _orb->object_to_string(o);
106     _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
107                                                    _instanceName.c_str());
108   }
109   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
110
111   deploy_mutex = new pthread_mutex_t();
112   pthread_mutex_init(deploy_mutex, NULL);
113   _proxy = NULL;
114  //SCRUTE(pd_refCount);
115 }
116
117 //=============================================================================
118 /*! 
119  *  Destructor: call Container for decrement of instances count.
120  *  When instances count falls to 0, the container tries to remove the
121  *  component library (dlclose)
122  */
123 //=============================================================================
124
125 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
126 {
127   MESSAGE("Parallel Component destructor");
128   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
129   if(_myConnexionToRegistry)delete _myConnexionToRegistry;
130   if(_notifSupplier)delete _notifSupplier;
131   if (_id)
132     delete(_id);
133
134   pthread_mutex_destroy(deploy_mutex);
135   delete deploy_mutex;
136   if (_proxy)
137     delete _proxy;
138 }
139
140 //=============================================================================
141 /*! 
142  *  CORBA method: return name of the instance, unique in this Container
143  */
144 //=============================================================================
145
146 char* Engines_Parallel_Component_i::instanceName()
147 {
148    return CORBA::string_dup(_instanceName.c_str()) ;
149 }
150
151 //=============================================================================
152 /*! 
153  *  CORBA method: return name of the component class
154  */
155 //=============================================================================
156
157 char* Engines_Parallel_Component_i::interfaceName()
158 {
159   return CORBA::string_dup(_interfaceName.c_str()) ;
160 }
161
162 //=============================================================================
163 /*! 
164  *  CORBA method: Test if instance is alive and responds
165  */
166 //=============================================================================
167
168 void Engines_Parallel_Component_i::ping()
169 {
170 #ifndef WIN32
171   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
172           << pthread_self());
173 #else
174   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
175           << pthread_self().p );
176 #endif
177 }
178
179 //=============================================================================
180 /*! 
181  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
182  *  respond any more to CORBA calls), the connection to Regsitry is removed
183  *  (Registry informed of deactivation), internal server reference counter on
184  *  the derived servant class is decremented, to allow destruction of the class
185  *  (delete) by POA, when there are no more references.
186  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
187  */
188 //=============================================================================
189
190 void Engines_Parallel_Component_i::destroy()
191 {
192   MESSAGE("Engines_Parallel_Component_i::destroy()");
193   MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
194   if (!_destroyed)
195   {
196     _remove_ref();
197     _destroyed = true;
198   }
199 }
200
201 //=============================================================================
202 /*! 
203  *  CORBA method: return CORBA reference of the Container
204  *
205  */
206 //=============================================================================
207
208 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
209 {
210   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
211   CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
212   return Engines::Container::_narrow(o);
213 }
214
215 //=============================================================================
216 /*! 
217  *  CORBA method: 
218  *  Gives a sequence of (key=string,value=any) to the component. 
219  *  Base class component stores the sequence in a map.
220  *  The map is cleared before.
221  *  This map is for use by derived classes. 
222  *  \param dico sequence of (key=string,value=any)
223  */
224 //=============================================================================
225
226 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
227 {
228   _fieldsDict.clear();
229   for (CORBA::ULong i=0; i<dico.length(); i++)
230     {
231       std::string cle(dico[i].key);
232       _fieldsDict[cle] = dico[i].value;
233     }
234 }
235
236 //=============================================================================
237 /*! 
238  *  CORBA method: 
239  *  returns a previously stored map (key=string,value=any) as a sequence.
240  *  (see setProperties)
241  */
242 //=============================================================================
243
244 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
245 {
246   Engines::FieldsDict_var copie = new Engines::FieldsDict;
247   copie->length(_fieldsDict.size());
248   std::map<std::string,CORBA::Any>::iterator it;
249   CORBA::ULong i = 0;
250   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
251     {
252       std::string cle((*it).first);
253       copie[i].key = CORBA::string_dup(cle.c_str());
254       copie[i].value = _fieldsDict[cle];
255     }
256   return copie._retn();
257 }
258
259 //=============================================================================
260 /*! 
261  *  CORBA method: used by Supervision to give names to this instance
262  */
263 //=============================================================================
264
265 void Engines_Parallel_Component_i::Names( const char * graphName ,
266                                  const char * nodeName )
267 {
268   _graphName = graphName;
269   _nodeName = nodeName;
270   MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '" 
271                                                    << _nodeName << "' )");
272 }
273
274 //=============================================================================
275 /*! 
276  *  CORBA method: used in Supervision
277  */
278 //=============================================================================
279
280 bool Engines_Parallel_Component_i::Kill_impl() 
281 {
282 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
283 //          << " pid " << getpid() << " instanceName "
284 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
285 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
286 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
287 //          << dec ) ;
288
289   bool RetVal = false ;
290 #ifndef WIN32
291   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
292     {
293       RetVal = Killer( _ThreadId , SIGUSR2 ) ;
294       _ThreadId = (pthread_t ) -1 ;
295     }
296
297 #else
298   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
299     {
300       RetVal = Killer( *_ThreadId , 0 ) ;
301       _ThreadId = (pthread_t* ) 0 ;
302     }
303
304 #endif
305   return RetVal ;
306 }
307
308 //=============================================================================
309 /*! 
310  *  CORBA method: used in Supervision
311  */
312 //=============================================================================
313
314 bool Engines_Parallel_Component_i::Stop_impl()
315 {
316 #ifndef WIN32
317   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
318           << " pid " << getpid() << " instanceName "
319           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
320           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
321           << dec << " _ThreadId " << _ThreadId );
322 #else
323   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
324           << " pid " << _getpid() << " instanceName "
325           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
326           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
327           << dec << " _ThreadId " << _ThreadId );
328 #endif
329   
330
331   bool RetVal = false ;
332 #ifndef WIN32
333   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
334     {
335       RetVal = Killer( _ThreadId , 0 ) ;
336       _ThreadId = (pthread_t ) -1 ;
337     }
338 #else
339   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
340     {
341       RetVal = Killer( *_ThreadId , 0 ) ;
342       _ThreadId = (pthread_t* ) 0 ;
343     }
344 #endif
345   return RetVal ;
346 }
347
348 //=============================================================================
349 /*! 
350  *  CORBA method: used in Supervision
351  */
352 //=============================================================================
353
354 bool Engines_Parallel_Component_i::Suspend_impl()
355 {
356 #ifndef WIN32
357   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
358           << " pid " << getpid() << " instanceName "
359           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
360           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
361           << dec << " _ThreadId " << _ThreadId );
362 #else
363   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
364           << " pid " << _getpid() << " instanceName "
365           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
366           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
367           << dec << " _ThreadId " << _ThreadId );
368 #endif
369
370   bool RetVal = false ;
371 #ifndef WIN32
372   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
373 #else
374   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
375 #endif
376     {
377       if ( _Sleeping )
378         {
379           return false ;
380         }
381     else 
382       {
383 #ifndef WIN32
384         RetVal = Killer( _ThreadId ,SIGINT ) ;
385 #else
386         RetVal = Killer( *_ThreadId ,SIGINT ) ;
387 #endif
388         //if ( RetVal ) _Sleeping = true;
389
390       }
391     }
392   return RetVal ;
393 }
394
395 //=============================================================================
396 /*! 
397  *  CORBA method: used in Supervision
398  */
399 //=============================================================================
400
401 bool Engines_Parallel_Component_i::Resume_impl()
402 {
403 #ifndef WIN32
404   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
405           << " pid " << getpid() << " instanceName "
406           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
407           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
408           << dec << " _ThreadId " << _ThreadId );
409 #else
410   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
411           << " pid " << _getpid() << " instanceName "
412           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
413           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
414           << dec << " _ThreadId " << _ThreadId );
415 #endif
416   bool RetVal = false ;
417 #ifndef WIN32
418   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
419 #else
420   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
421 #endif
422     {
423     if ( _Sleeping ) 
424       {
425         _Sleeping = false ;
426         RetVal = true ;
427       }
428     else
429       {
430         RetVal = false ;
431       }
432     }
433   return RetVal ;
434 }
435
436 //=============================================================================
437 /*! 
438  *  CORBA method: 
439  */
440 //=============================================================================
441
442 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
443 {
444   long cpu = 0 ;
445   if ( _ThreadId || _Executed )
446     {
447     if ( _ThreadId > 0 )
448       {
449 #ifndef WIN32
450       if ( pthread_self() != _ThreadId )
451 #else
452       if ( pthread_self().p != _ThreadId->p )
453 #endif
454         {
455         if ( _Sleeping )
456           {
457           }
458         else
459           {
460             // Get Cpu in the appropriate thread with that object !...
461             theEngines_Component = this ;
462 #ifndef WIN32
463             Killer( _ThreadId ,SIGUSR1 ) ;
464 #else
465             Killer( *_ThreadId ,SIGUSR11 ) ;
466 #endif
467           }
468         cpu = _ThreadCpuUsed ;
469         }
470       else
471         {
472           _ThreadCpuUsed = CpuUsed() ;
473           cpu = _ThreadCpuUsed ;
474           // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
475           //      << _serviceName << " " << cpu << std::endl ;
476       }
477     }
478     else 
479       {
480         cpu = _ThreadCpuUsed ;
481         // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
482         //      << _serviceName << " " << cpu<< std::endl ;
483       }
484     }
485   else
486     {
487       // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
488       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
489     }
490   return cpu ;
491 }
492
493
494 //=============================================================================
495 /*! 
496  *  C++ method: return Container Servant
497  */
498 //=============================================================================
499
500 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
501 {
502   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
503 }
504
505 //=============================================================================
506 /*! 
507  *  C++ method: return CORBA instance id, the id is set in derived class
508  *  constructor, when instance is activated.
509  */
510 //=============================================================================
511
512 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
513 {
514 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
515   return _id ;
516 }
517
518 //=============================================================================
519 /*! 
520  *  C++ method: used by derived classes for supervision
521  */
522 //=============================================================================
523
524 void Engines_Parallel_Component_i::beginService(const char *serviceName)
525 {
526 #ifndef WIN32
527   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
528           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
529 #else
530   MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
531           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
532 #endif
533 #ifndef WIN32
534   _ThreadId = pthread_self() ;
535 #else
536   _ThreadId = new pthread_t;
537   _ThreadId->p = pthread_self().p ;
538   _ThreadId->x = pthread_self().x ;
539 #endif
540   _StartUsed = 0 ;
541   _StartUsed = CpuUsed_impl() ;
542   _ThreadCpuUsed = 0 ;
543   _Executed = true ;
544   _serviceName = serviceName ;
545   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
546     {
547       perror("pthread_setcanceltype ") ;
548       exit(0) ;
549     }
550   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
551     {
552       perror("pthread_setcancelstate ") ;
553       exit(0) ;
554     }
555 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
556 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
557 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
558
559   // --- for supervisor : all strings given with setProperties
560   //     are set in environment
561   bool overwrite = true;
562   std::map<std::string,CORBA::Any>::iterator it;
563   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
564     {
565       std::string cle((*it).first);
566       if ((*it).second.type()->kind() == CORBA::tk_string)
567         {
568           const char* value;
569           (*it).second >>= value;
570           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
571 #if defined __GNUC__
572           //int ret = setenv(cle.c_str(), value, overwrite);
573           setenv(cle.c_str(), value, overwrite);
574 #else
575           //CCRT porting : setenv not defined in stdlib.h
576           std::string s(cle);
577           s+='=';
578           s+=value;
579           // char* cast because 1st arg of linux putenv function
580           // is not a const char* !
581           //int ret=putenv((char *)s.c_str());
582           putenv((char *)s.c_str());
583           //End of CCRT porting
584 #endif
585           MESSAGE("--- setenv: "<<cle<<" = "<< value);
586         }
587     }
588 }
589
590 //=============================================================================
591 /*! 
592  *  C++ method: used by derived classes for supervision
593  */
594 //=============================================================================
595
596 void Engines_Parallel_Component_i::endService(const char *serviceName)
597 {
598   if ( !_CanceledThread )
599     _ThreadCpuUsed = CpuUsed_impl() ;
600
601 #ifndef WIN32
602   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
603           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
604           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
605 #else
606   MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
607           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
608     << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
609 #endif
610   _ThreadId = 0 ;
611 }
612
613 //=============================================================================
614 /*! 
615  *  C++ method: -- CHECK IF USED --
616  */
617 //=============================================================================
618
619 char* Engines_Parallel_Component_i::graphName()
620 {
621   return CORBA::string_dup( _graphName.c_str() ) ;
622 }
623
624 //=============================================================================
625 /*! 
626  *  C++ method: -- CHECK IF USED --
627  */
628 //=============================================================================
629
630 char* Engines_Parallel_Component_i::nodeName()
631 {
632   return CORBA::string_dup( _nodeName.c_str() ) ;
633 }
634
635 //=============================================================================
636 /*! 
637  *  C++ method: used in Supervision (see kill_impl)
638  */
639 //=============================================================================
640
641 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
642 {
643 #ifndef WIN32
644   if ( ThreadId )
645 #else
646   if ( ThreadId.p )
647 #endif
648     {
649       if ( signum == 0 )
650         {
651           if ( pthread_cancel( ThreadId ) )
652             {
653               perror("Killer pthread_cancel error") ;
654               return false ;
655             }
656           else
657             {
658 #ifndef WIN32
659               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
660                       << " pthread_canceled") ;
661 #else
662         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
663                       << " pthread_canceled") ;
664 #endif
665             }
666         }
667       else
668         {
669           if ( pthread_kill( ThreadId , signum ) == -1 )
670             {
671               perror("Killer pthread_kill error") ;
672               return false ;
673             }
674           else 
675             {
676 #ifndef WIN32
677         MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
678                       << " pthread_killed(" << signum << ")") ;
679 #else
680         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
681                       << " pthread_killed(" << signum << ")") ;
682 #endif
683             }
684         }
685     }
686   return true ;
687 }
688
689 //=============================================================================
690 /*! 
691  *  C++ method:
692  */ 
693 //=============================================================================
694
695 void SetCpuUsed()
696 {
697   if (theEngines_Component)
698     theEngines_Component->SetCurCpu();
699 }
700
701 //=============================================================================
702 /*! 
703  *  C++ method:
704  */
705 //=============================================================================
706
707 void Engines_Parallel_Component_i::SetCurCpu()
708 {
709   _ThreadCpuUsed =  CpuUsed() ;
710   //  MESSAGE(pthread_self() << 
711   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
712 }
713
714 //=============================================================================
715 /*! 
716  *  C++ method:
717  */
718 //=============================================================================
719
720 long Engines_Parallel_Component_i::CpuUsed()
721 {
722   long cpu = 0 ;
723 #ifndef WIN32
724   struct rusage usage ;
725   if ( _ThreadId || _Executed )
726     {
727       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
728         {
729           perror("Engines_Parallel_Component_i::CpuUsed") ;
730           return 0 ;
731         }
732       cpu = usage.ru_utime.tv_sec - _StartUsed ;
733       // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
734       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
735       //      << " = " << cpu << std::endl ;
736     }
737   else
738     {
739       // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
740       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
741       //      << _StartUsed << std::endl ;
742     }
743 #else
744         // NOT implementet yet
745 #endif
746
747
748   return cpu ;
749 }
750
751 void CallCancelThread()
752 {
753   if ( theEngines_Component )
754     theEngines_Component->CancelThread() ;
755 }
756
757 //=============================================================================
758 /*!
759  *  C++ method:
760  */
761 //=============================================================================
762
763 void Engines_Parallel_Component_i::CancelThread()
764 {
765   _CanceledThread = true;
766 }
767
768 //=============================================================================
769 /*! 
770  *  C++ method: Send message to event channel
771  */
772 //=============================================================================
773
774 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
775                                       const char *message)
776 {
777     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
778 }
779
780 //=============================================================================
781 /*! 
782  *  C++ method: return standard library name built on component name
783  */
784 //=============================================================================
785
786 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
787 {
788 #ifndef WIN32
789   std::string ret="lib";
790   ret+=componentName;
791   ret+="Engine.so";
792 #else
793   std::string ret=componentName;
794   ret+="Engine.dll";
795 #endif 
796   return ret;
797 }
798
799 //=============================================================================
800 /*! 
801  *  C++ method: DumpPython default implementation
802  */
803 //=============================================================================
804
805 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Boolean isPublished,
806                                                            CORBA::Boolean isMultiFile,
807                                                            CORBA::Boolean& isValidScript)
808 {
809   const char* aScript = isMultiFile ? "def RebuildData(): pass" : "";
810   char* aBuffer = new char[strlen(aScript)+1];
811   strcpy(aBuffer, aScript);
812   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
813   int aBufferSize = strlen(aBuffer)+1;
814   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
815   isValidScript = true;
816   return aStreamFile._retn(); 
817 }
818
819
820 Engines::Salome_file_ptr 
821 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
822                                                     const char* Salome_file_name) 
823 {
824   // Try to find the service, if it doesn't exist, we add it.
825   _Service_file_map_it = _Input_Service_file_map.find(service_name);
826   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
827     _t_Salome_file_map * _map = new _t_Salome_file_map();
828     _Input_Service_file_map[service_name] = _map;
829     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
830     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
831     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
832     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
833   }
834   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
835   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
836   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
837   
838   pthread_mutex_lock(deploy_mutex);
839   std::string proxy_ior;
840
841   // Try to find the Salome_file ...
842   _Salome_file_map_it = _map->find(Salome_file_name);
843   if (_Salome_file_map_it ==  _map->end()) {
844
845     // We create a new PaCO++ object.
846     // He has the same configuration than
847     // his component
848
849     // Firstly, we have to create the proxy object
850     // of the Salome_file and transmit his
851     // reference to the other nodes.
852     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
853     if (getMyRank() == 0) {
854       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
855                                                            new paco_omni_fabrique());
856       proxy->copyGlobalContext(this); 
857       PaCO::PacoTopology_t serveur_topo;
858       serveur_topo.total = getTotalNode();
859       proxy->setTopology(serveur_topo);
860
861       // We register the CORBA objet into the POA
862       CORBA::Object_ptr proxy_ref = proxy->_this();
863
864       // We send the reference to all the nodes...
865       Engines::Parallel_Component_var component_proxy = 
866         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
867       component_proxy->send_parallel_proxy_object(proxy_ref);
868
869       // Adding proxy into the map
870       (*_proxy_map)[Salome_file_name] = proxy;
871     }
872     else {
873       this->wait_parallel_object_proxy();
874     }
875
876     proxy_ior = this->get_parallel_proxy_object();
877     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
878
879     // We register each node of the parallel Salome_file object
880     // into the proxy.
881     for (int i = 0; i < getTotalNode(); i++) {
882       if (i ==  getMyRank()) {
883         Parallel_Salome_file_i * servant = 
884           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
885                                      proxy_ior.c_str(),
886                                      i);
887         servant->copyGlobalContext(this); 
888         
889         // We register the CORBA objet into the POA
890         servant->POA_PaCO::InterfaceParallel::_this();
891
892         // Register the servant
893         servant->deploy();
894
895         // Adding servant to the map
896         (*_map)[Salome_file_name] = servant;
897       }
898
899       _my_com->paco_barrier();
900       // start parallel object
901       if (getMyRank() == 0) {
902         proxy->start();
903         _my_com->paco_barrier();
904       }
905       else
906         _my_com->paco_barrier();
907     }
908     // Parallel_Salome_file is created and deployed
909     delete _proxy;
910     _proxy = NULL;
911   }
912
913   pthread_mutex_unlock(deploy_mutex);
914   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
915   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
916   return Engines::Salome_file::_narrow(proxy_ref);
917 }
918
919 Engines::Salome_file_ptr 
920 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
921                                                      const char* Salome_file_name) 
922 {
923   // Try to find the service, if it doesn't exist, we add it.
924   _Service_file_map_it = _Output_Service_file_map.find(service_name);
925   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
926     _t_Salome_file_map * _map = new _t_Salome_file_map();
927     _Output_Service_file_map[service_name] = _map;
928     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
929     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
930     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
931     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
932   }
933   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
934   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
935   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
936   
937   pthread_mutex_lock(deploy_mutex);
938   std::string proxy_ior;
939
940   // Try to find the Salome_file ...
941   _Salome_file_map_it = _map->find(Salome_file_name);
942   Engines::Parallel_Salome_file_proxy_impl * proxy;
943   if (_Salome_file_map_it ==  _map->end()) {
944
945     // We create a new PaCO++ object.
946     // He has the same configuration than
947     // his component
948
949     // Firstly, we have to create the proxy object
950     // of the Salome_file and transmit his
951     // reference to the other nodes.
952     if (getMyRank() == 0) {
953         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
954                                                              new paco_omni_fabrique());
955       proxy->copyGlobalContext(this); 
956       PaCO::PacoTopology_t serveur_topo;
957       serveur_topo.total = getTotalNode();
958       proxy->setTopology(serveur_topo);
959
960       // We register the CORBA objet into the POA
961       CORBA::Object_ptr proxy_ref = proxy->_this();
962
963       // We send the reference to all the nodes...
964       Engines::Parallel_Component_var component_proxy = 
965         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
966       component_proxy->send_parallel_proxy_object(proxy_ref);
967
968       // Adding proxy into the map
969       (*_proxy_map)[Salome_file_name] = proxy;
970     }
971     else {
972       this->wait_parallel_object_proxy();
973     }
974
975     proxy_ior = this->get_parallel_proxy_object();
976     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
977
978     // We register each node of the parallel Salome_file object
979     // into the proxy.
980     for (int i = 0; i < getTotalNode(); i++) {
981       if (i ==  getMyRank()) {
982         Parallel_Salome_file_i * servant = 
983           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
984                                      proxy_ior.c_str(),
985                                      i);
986         servant->copyGlobalContext(this); 
987         
988         // We register the CORBA objet into the POA
989         servant->POA_PaCO::InterfaceParallel::_this();
990
991         // Register the servant
992         servant->deploy();
993
994         // Adding servant to the map
995         (*_map)[Salome_file_name] = servant;
996       }
997
998       _my_com->paco_barrier();
999       // start parallel object
1000       if (getMyRank() == 0) {
1001         proxy->start();
1002         _my_com->paco_barrier();
1003       }
1004       else
1005         _my_com->paco_barrier();
1006     }
1007
1008     // Parallel_Salome_file is created and deployed
1009     delete _proxy;
1010     _proxy = NULL;
1011   }
1012   pthread_mutex_unlock(deploy_mutex);
1013   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
1014   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1015   return Engines::Salome_file::_narrow(proxy_ref);
1016 }
1017
1018 Engines::Salome_file_ptr 
1019 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
1020                                                     const char* Salome_file_name) 
1021 {
1022   // Try to find the service, if it doesn't exist, we throw an exception.
1023   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1024   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
1025     SALOME::ExceptionStruct es;
1026     es.type = SALOME::INTERNAL_ERROR;
1027     es.text = "service doesn't have salome files";
1028     throw SALOME::SALOME_Exception(es);
1029   }
1030   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1031
1032   // Try to find the Salome_file ...
1033   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1034   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1035     SALOME::ExceptionStruct es;
1036     es.type = SALOME::INTERNAL_ERROR;
1037     es.text = "service doesn't have this Salome_file";
1038     throw SALOME::SALOME_Exception(es);
1039   }
1040
1041   // Client get the proxy object
1042   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1043   return Sfile->_this();
1044 }
1045
1046 Engines::Salome_file_ptr 
1047 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
1048                                                      const char* Salome_file_name) 
1049 {
1050   // Try to find the service, if it doesn't exist, we throw an exception.
1051   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1052   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1053     SALOME::ExceptionStruct es;
1054     es.type = SALOME::INTERNAL_ERROR;
1055     es.text = "service doesn't have salome files";
1056     throw SALOME::SALOME_Exception(es);
1057   }
1058   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1059
1060   // Try to find the Salome_file ...
1061   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1062   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1063     SALOME::ExceptionStruct es;
1064     es.type = SALOME::INTERNAL_ERROR;
1065     es.text = "service doesn't have this Salome_file";
1066     throw SALOME::SALOME_Exception(es);
1067   }
1068
1069   // Client get the proxy object
1070   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1071   return Sfile->_this();
1072 }
1073
1074
1075 void 
1076 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
1077 {
1078   // Try to find the service, if it doesn't exist, nothing to do.
1079   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1080   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1081     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1082     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1083     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1084
1085     for(;begin!=end;begin++) {
1086       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1087       std::string file_port_name = begin->first;
1088       configureSalome_file(service_name, file_port_name, file);
1089       file->recvFiles();
1090     }
1091   }
1092 }
1093
1094 void 
1095 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
1096 {
1097   // Try to find the service, if it doesn't exist, nothing to do.
1098   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1099   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1100     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1101     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1102     _t_Proxy_Salome_file_map::iterator end = _map->end();
1103
1104     for(;begin!=end;begin++) {
1105       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1106       std::string file_port_name = begin->first;
1107       configureSalome_file(service_name, file_port_name, file);
1108       file->recvFiles();
1109     }
1110   }
1111
1112 }
1113
1114 //=============================================================================
1115 /*! 
1116  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1117  */ 
1118 //=============================================================================
1119 void 
1120 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1121   _proxy = _orb->object_to_string(proxy_ref);
1122 }
1123
1124 //=============================================================================
1125 /*! 
1126  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1127  */ 
1128 //=============================================================================
1129 void 
1130 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1131   char * proxy = NULL;
1132   proxy =  get_parallel_proxy_object();
1133   while(proxy == NULL)
1134   {
1135     sleep(1);
1136     proxy = get_parallel_proxy_object();
1137   }
1138 }
1139
1140 //=============================================================================
1141 /*! 
1142  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1143  */ 
1144 //=============================================================================
1145 char * 
1146 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1147   return _proxy;
1148 }
1149
1150
1151 //=============================================================================
1152 /*! 
1153  *  C++ method: used to configure the Salome_file into the runtime.
1154  *  \param service_name name of the service that use this Salome_file
1155  *  \param file_port_name name of the Salome_file
1156  *  \param file Parallel Salome_file C++ object
1157  */
1158 //=============================================================================
1159 void
1160 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1161                                                    std::string file_port_name,
1162                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
1163 {
1164   // By default this method does nothing
1165 }