Salome HOME
28b53ce6be08da49bba497ad13a7a2184618ad53
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 // Copyright (C) 2007-2014  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 //  File   : SALOME_ParallelComponent_i.cxx
25 //  Author : AndrĂ© RIBES, EDF
26 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
27 //
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
30
31 #include "OpUtil.hxx"
32 #include <stdio.h>
33 #ifndef WIN32
34 #include <dlfcn.h>
35 #endif
36 #include <cstdlib>
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
39
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <sys/resource.h>
43 #include <unistd.h>
44 #else
45 #include <sys/timeb.h>
46 int SIGUSR11 = 1000;
47 #endif
48
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
51
52
53 extern bool _Sleeping ;
54 static Engines_Parallel_Component_i * theEngines_Component ;
55
56 bool Engines_Parallel_Component_i::_isMultiStudy = true;
57 bool Engines_Parallel_Component_i::_isMultiInstance = false;
58
59 //=============================================================================
60 /*! 
61  *  Standard Constructor for generic Component, used in derived class
62  *  Connection to Registry and Notification
63  *  \param orb Object Request broker given by Container
64  *  \parap poa Portable Object Adapter from Container (normally root_poa)
65  *  \param contId container CORBA id inside the server
66  *  \param instanceName unique instance name for this object (see Container_i)
67  *  \param interfaceName component class name
68  *  \param notif use of notification
69  */
70 //=============================================================================
71
72 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
73                                          PortableServer::POA_ptr poa, 
74                                          PortableServer::ObjectId * contId, 
75                                          const char *instanceName,
76                                          const char *interfaceName,
77                                          bool notif,
78                                          bool regist) :
79   InterfaceParallel_impl(orb,ior,rank), 
80   Engines::EngineComponent_serv(orb,ior,rank),
81   Engines::EngineComponent_base_serv(orb,ior,rank),
82   Engines::Parallel_Component_serv(orb,ior,rank),
83   Engines::Parallel_Component_base_serv(orb,ior,rank),
84   _instanceName(instanceName),
85   _interfaceName(interfaceName),
86   _id(NULL),
87   _myConnexionToRegistry(0),
88   _ThreadId(0) ,
89   _ThreadCpuUsed(0) ,
90   _Executed(false) ,
91   _graphName("") ,
92   _nodeName(""),
93   _studyId(-1),
94   _destroyed(false),
95   _CanceledThread(false)
96 {
97   MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
98   //SCRUTE(pd_refCount);
99   _orb = CORBA::ORB::_duplicate(orb);
100   _poa = PortableServer::POA::_duplicate(poa);
101   _contId = contId ;
102   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
103
104   if (regist)
105   {
106     CORBA::String_var the_ior = _orb->object_to_string(o);
107     _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
108                                                    _instanceName.c_str());
109   }
110   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
111
112   deploy_mutex = new pthread_mutex_t();
113   pthread_mutex_init(deploy_mutex, NULL);
114   _proxy = NULL;
115  //SCRUTE(pd_refCount);
116 }
117
118 //=============================================================================
119 /*! 
120  *  Destructor: call Container for decrement of instances count.
121  *  When instances count falls to 0, the container tries to remove the
122  *  component library (dlclose)
123  */
124 //=============================================================================
125
126 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
127 {
128   MESSAGE("Parallel Component destructor");
129   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
130   if(_myConnexionToRegistry)delete _myConnexionToRegistry;
131   if(_notifSupplier)delete _notifSupplier;
132   if (_id)
133     delete(_id);
134
135   pthread_mutex_destroy(deploy_mutex);
136   delete deploy_mutex;
137   if (_proxy)
138     delete _proxy;
139 }
140
141 //=============================================================================
142 /*! 
143  *  CORBA method: return name of the instance, unique in this Container
144  */
145 //=============================================================================
146
147 char* Engines_Parallel_Component_i::instanceName()
148 {
149    return CORBA::string_dup(_instanceName.c_str()) ;
150 }
151
152 //=============================================================================
153 /*! 
154  *  CORBA method: return name of the component class
155  */
156 //=============================================================================
157
158 char* Engines_Parallel_Component_i::interfaceName()
159 {
160   return CORBA::string_dup(_interfaceName.c_str()) ;
161 }
162
163 //=============================================================================
164 /*! 
165  *  CORBA method: Get study Id
166  *  \return -1: not initialised (Internal Error)
167  *           0: multistudy component instance
168  *          >0: study id associated to this instance
169  */
170 //=============================================================================
171
172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
173 {
174   return _studyId;
175 }
176
177 //=============================================================================
178 /*! 
179  *  CORBA method: Test if instance is alive and responds
180  */
181 //=============================================================================
182
183 void Engines_Parallel_Component_i::ping()
184 {
185 #ifndef WIN32
186   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
187           << pthread_self());
188 #else
189   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
190           << pthread_self().p );
191 #endif
192 }
193
194 //=============================================================================
195 /*! 
196  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
197  *  respond any more to CORBA calls), the connection to Regsitry is removed
198  *  (Registry informed of deactivation), internal server reference counter on
199  *  the derived servant class is decremented, to allow destruction of the class
200  *  (delete) by POA, when there are no more references.
201  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
202  */
203 //=============================================================================
204
205 void Engines_Parallel_Component_i::destroy()
206 {
207   MESSAGE("Engines_Parallel_Component_i::destroy()");
208   MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
209   if (!_destroyed)
210   {
211     _remove_ref();
212     _destroyed = true;
213   }
214 }
215
216 //=============================================================================
217 /*! 
218  *  CORBA method: return CORBA reference of the Container
219  *
220  */
221 //=============================================================================
222
223 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
224 {
225   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
226   CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
227   return Engines::Container::_narrow(o);
228 }
229
230 //=============================================================================
231 /*! 
232  *  CORBA method: 
233  *  Gives a sequence of (key=string,value=any) to the component. 
234  *  Base class component stores the sequence in a map.
235  *  The map is cleared before.
236  *  This map is for use by derived classes. 
237  *  \param dico sequence of (key=string,value=any)
238  */
239 //=============================================================================
240
241 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
242 {
243   _fieldsDict.clear();
244   for (CORBA::ULong i=0; i<dico.length(); i++)
245     {
246       std::string cle(dico[i].key);
247       _fieldsDict[cle] = dico[i].value;
248     }
249 }
250
251 //=============================================================================
252 /*! 
253  *  CORBA method: 
254  *  returns a previously stored map (key=string,value=any) as a sequence.
255  *  (see setProperties)
256  */
257 //=============================================================================
258
259 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
260 {
261   Engines::FieldsDict_var copie = new Engines::FieldsDict;
262   copie->length(_fieldsDict.size());
263   std::map<std::string,CORBA::Any>::iterator it;
264   CORBA::ULong i = 0;
265   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
266     {
267       std::string cle((*it).first);
268       copie[i].key = CORBA::string_dup(cle.c_str());
269       copie[i].value = _fieldsDict[cle];
270     }
271   return copie._retn();
272 }
273
274 //=============================================================================
275 /*! 
276  *  CORBA method: used by Supervision to give names to this instance
277  */
278 //=============================================================================
279
280 void Engines_Parallel_Component_i::Names( const char * graphName ,
281                                  const char * nodeName )
282 {
283   _graphName = graphName;
284   _nodeName = nodeName;
285   MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '" 
286                                                    << _nodeName << "' )");
287 }
288
289 //=============================================================================
290 /*! 
291  *  CORBA method: used in Supervision
292  */
293 //=============================================================================
294
295 bool Engines_Parallel_Component_i::Kill_impl() 
296 {
297 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
298 //          << " pid " << getpid() << " instanceName "
299 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
300 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
301 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
302 //          << dec ) ;
303
304   bool RetVal = false ;
305 #ifndef WIN32
306   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
307     {
308       RetVal = Killer( _ThreadId , SIGUSR2 ) ;
309       _ThreadId = (pthread_t ) -1 ;
310     }
311
312 #else
313   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
314     {
315       RetVal = Killer( *_ThreadId , 0 ) ;
316       _ThreadId = (pthread_t* ) 0 ;
317     }
318
319 #endif
320   return RetVal ;
321 }
322
323 //=============================================================================
324 /*! 
325  *  CORBA method: used in Supervision
326  */
327 //=============================================================================
328
329 bool Engines_Parallel_Component_i::Stop_impl()
330 {
331 #ifndef WIN32
332   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
333           << " pid " << getpid() << " instanceName "
334           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
335           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
336           << dec << " _ThreadId " << _ThreadId );
337 #else
338   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
339           << " pid " << _getpid() << " instanceName "
340           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
341           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
342           << dec << " _ThreadId " << _ThreadId );
343 #endif
344   
345
346   bool RetVal = false ;
347 #ifndef WIN32
348   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
349     {
350       RetVal = Killer( _ThreadId , 0 ) ;
351       _ThreadId = (pthread_t ) -1 ;
352     }
353 #else
354   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
355     {
356       RetVal = Killer( *_ThreadId , 0 ) ;
357       _ThreadId = (pthread_t* ) 0 ;
358     }
359 #endif
360   return RetVal ;
361 }
362
363 //=============================================================================
364 /*! 
365  *  CORBA method: used in Supervision
366  */
367 //=============================================================================
368
369 bool Engines_Parallel_Component_i::Suspend_impl()
370 {
371 #ifndef WIN32
372   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
373           << " pid " << getpid() << " instanceName "
374           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
375           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
376           << dec << " _ThreadId " << _ThreadId );
377 #else
378   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
379           << " pid " << _getpid() << " instanceName "
380           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
381           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
382           << dec << " _ThreadId " << _ThreadId );
383 #endif
384
385   bool RetVal = false ;
386 #ifndef WIN32
387   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
388 #else
389   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
390 #endif
391     {
392       if ( _Sleeping )
393         {
394           return false ;
395         }
396     else 
397       {
398 #ifndef WIN32
399         RetVal = Killer( _ThreadId ,SIGINT ) ;
400 #else
401         RetVal = Killer( *_ThreadId ,SIGINT ) ;
402 #endif
403         //if ( RetVal ) _Sleeping = true;
404
405       }
406     }
407   return RetVal ;
408 }
409
410 //=============================================================================
411 /*! 
412  *  CORBA method: used in Supervision
413  */
414 //=============================================================================
415
416 bool Engines_Parallel_Component_i::Resume_impl()
417 {
418 #ifndef WIN32
419   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
420           << " pid " << getpid() << " instanceName "
421           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
422           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
423           << dec << " _ThreadId " << _ThreadId );
424 #else
425   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
426           << " pid " << _getpid() << " instanceName "
427           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
428           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
429           << dec << " _ThreadId " << _ThreadId );
430 #endif
431   bool RetVal = false ;
432 #ifndef WIN32
433   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
434 #else
435   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
436 #endif
437     {
438     if ( _Sleeping ) 
439       {
440         _Sleeping = false ;
441         RetVal = true ;
442       }
443     else
444       {
445         RetVal = false ;
446       }
447     }
448   return RetVal ;
449 }
450
451 //=============================================================================
452 /*! 
453  *  CORBA method: 
454  */
455 //=============================================================================
456
457 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
458 {
459   long cpu = 0 ;
460   if ( _ThreadId || _Executed )
461     {
462     if ( _ThreadId > 0 )
463       {
464 #ifndef WIN32
465       if ( pthread_self() != _ThreadId )
466 #else
467       if ( pthread_self().p != _ThreadId->p )
468 #endif
469         {
470         if ( _Sleeping )
471           {
472           }
473         else
474           {
475             // Get Cpu in the appropriate thread with that object !...
476             theEngines_Component = this ;
477 #ifndef WIN32
478             Killer( _ThreadId ,SIGUSR1 ) ;
479 #else
480             Killer( *_ThreadId ,SIGUSR11 ) ;
481 #endif
482           }
483         cpu = _ThreadCpuUsed ;
484         }
485       else
486         {
487           _ThreadCpuUsed = CpuUsed() ;
488           cpu = _ThreadCpuUsed ;
489           // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
490           //      << _serviceName << " " << cpu << std::endl ;
491       }
492     }
493     else 
494       {
495         cpu = _ThreadCpuUsed ;
496         // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
497         //      << _serviceName << " " << cpu<< std::endl ;
498       }
499     }
500   else
501     {
502       // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
503       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
504     }
505   return cpu ;
506 }
507
508
509 //=============================================================================
510 /*! 
511  *  C++ method: return Container Servant
512  */
513 //=============================================================================
514
515 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
516 {
517   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
518 }
519
520 //=============================================================================
521 /*! 
522  *  C++ method: set study Id
523  *  \param studyId         0 if instance is not associated to a study, 
524  *                         >0 otherwise (== study id)
525  *  \return true if the set of study Id is OK
526  *  must be set once by Container, at instance creation,
527  *  and cannot be changed after.
528  */
529 //=============================================================================
530
531 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
532 {
533   ASSERT( studyId >= 0);
534   CORBA::Boolean ret = false;
535   if (_studyId < 0) // --- not yet initialized 
536     {
537       _studyId = studyId;
538       ret = true;
539     }
540   else
541     if ( _studyId == studyId) ret = true;
542   return ret;
543 }
544
545 //=============================================================================
546 /*! 
547  *  C++ method: return CORBA instance id, the id is set in derived class
548  *  constructor, when instance is activated.
549  */
550 //=============================================================================
551
552 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
553 {
554 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
555   return _id ;
556 }
557
558 //=============================================================================
559 /*! 
560  *  C++ method: used by derived classes for supervision
561  */
562 //=============================================================================
563
564 void Engines_Parallel_Component_i::beginService(const char *serviceName)
565 {
566 #ifndef WIN32
567   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
568           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
569 #else
570   MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
571           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
572 #endif
573 #ifndef WIN32
574   _ThreadId = pthread_self() ;
575 #else
576   _ThreadId = new pthread_t;
577   _ThreadId->p = pthread_self().p ;
578   _ThreadId->x = pthread_self().x ;
579 #endif
580   _StartUsed = 0 ;
581   _StartUsed = CpuUsed_impl() ;
582   _ThreadCpuUsed = 0 ;
583   _Executed = true ;
584   _serviceName = serviceName ;
585   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
586     {
587       perror("pthread_setcanceltype ") ;
588       exit(0) ;
589     }
590   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
591     {
592       perror("pthread_setcancelstate ") ;
593       exit(0) ;
594     }
595 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
596 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
597 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
598
599   // --- for supervisor : all strings given with setProperties
600   //     are set in environment
601   bool overwrite = true;
602   std::map<std::string,CORBA::Any>::iterator it;
603   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
604     {
605       std::string cle((*it).first);
606       if ((*it).second.type()->kind() == CORBA::tk_string)
607         {
608           const char* value;
609           (*it).second >>= value;
610           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
611 #if defined __GNUC__
612           //int ret = setenv(cle.c_str(), value, overwrite);
613           setenv(cle.c_str(), value, overwrite);
614 #else
615           //CCRT porting : setenv not defined in stdlib.h
616           std::string s(cle);
617           s+='=';
618           s+=value;
619           // char* cast because 1st arg of linux putenv function
620           // is not a const char* !
621           //int ret=putenv((char *)s.c_str());
622           putenv((char *)s.c_str());
623           //End of CCRT porting
624 #endif
625           MESSAGE("--- setenv: "<<cle<<" = "<< value);
626         }
627     }
628 }
629
630 //=============================================================================
631 /*! 
632  *  C++ method: used by derived classes for supervision
633  */
634 //=============================================================================
635
636 void Engines_Parallel_Component_i::endService(const char *serviceName)
637 {
638   if ( !_CanceledThread )
639     _ThreadCpuUsed = CpuUsed_impl() ;
640
641 #ifndef WIN32
642   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
643           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
644           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
645 #else
646   MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
647           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
648     << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
649 #endif
650   _ThreadId = 0 ;
651 }
652
653 //=============================================================================
654 /*! 
655  *  C++ method: -- CHECK IF USED --
656  */
657 //=============================================================================
658
659 char* Engines_Parallel_Component_i::graphName()
660 {
661   return CORBA::string_dup( _graphName.c_str() ) ;
662 }
663
664 //=============================================================================
665 /*! 
666  *  C++ method: -- CHECK IF USED --
667  */
668 //=============================================================================
669
670 char* Engines_Parallel_Component_i::nodeName()
671 {
672   return CORBA::string_dup( _nodeName.c_str() ) ;
673 }
674
675 //=============================================================================
676 /*! 
677  *  C++ method: used in Supervision (see kill_impl)
678  */
679 //=============================================================================
680
681 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
682 {
683 #ifndef WIN32
684   if ( ThreadId )
685 #else
686   if ( ThreadId.p )
687 #endif
688     {
689       if ( signum == 0 )
690         {
691           if ( pthread_cancel( ThreadId ) )
692             {
693               perror("Killer pthread_cancel error") ;
694               return false ;
695             }
696           else
697             {
698 #ifndef WIN32
699               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
700                       << " pthread_canceled") ;
701 #else
702         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
703                       << " pthread_canceled") ;
704 #endif
705             }
706         }
707       else
708         {
709           if ( pthread_kill( ThreadId , signum ) == -1 )
710             {
711               perror("Killer pthread_kill error") ;
712               return false ;
713             }
714           else 
715             {
716 #ifndef WIN32
717         MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
718                       << " pthread_killed(" << signum << ")") ;
719 #else
720         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
721                       << " pthread_killed(" << signum << ")") ;
722 #endif
723             }
724         }
725     }
726   return true ;
727 }
728
729 //=============================================================================
730 /*! 
731  *  C++ method:
732  */ 
733 //=============================================================================
734
735 void SetCpuUsed()
736 {
737   if (theEngines_Component)
738     theEngines_Component->SetCurCpu();
739 }
740
741 //=============================================================================
742 /*! 
743  *  C++ method:
744  */
745 //=============================================================================
746
747 void Engines_Parallel_Component_i::SetCurCpu()
748 {
749   _ThreadCpuUsed =  CpuUsed() ;
750   //  MESSAGE(pthread_self() << 
751   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
752 }
753
754 //=============================================================================
755 /*! 
756  *  C++ method:
757  */
758 //=============================================================================
759
760 long Engines_Parallel_Component_i::CpuUsed()
761 {
762   long cpu = 0 ;
763 #ifndef WIN32
764   struct rusage usage ;
765   if ( _ThreadId || _Executed )
766     {
767       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
768         {
769           perror("Engines_Parallel_Component_i::CpuUsed") ;
770           return 0 ;
771         }
772       cpu = usage.ru_utime.tv_sec - _StartUsed ;
773       // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
774       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
775       //      << " = " << cpu << std::endl ;
776     }
777   else
778     {
779       // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
780       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
781       //      << _StartUsed << std::endl ;
782     }
783 #else
784         // NOT implementet yet
785 #endif
786
787
788   return cpu ;
789 }
790
791 void CallCancelThread()
792 {
793   if ( theEngines_Component )
794     theEngines_Component->CancelThread() ;
795 }
796
797 //=============================================================================
798 /*!
799  *  C++ method:
800  */
801 //=============================================================================
802
803 void Engines_Parallel_Component_i::CancelThread()
804 {
805   _CanceledThread = true;
806 }
807
808 //=============================================================================
809 /*! 
810  *  C++ method: Send message to event channel
811  */
812 //=============================================================================
813
814 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
815                                       const char *message)
816 {
817     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
818 }
819
820 //=============================================================================
821 /*! 
822  *  C++ method: return standard library name built on component name
823  */
824 //=============================================================================
825
826 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
827 {
828 #ifndef WIN32
829   std::string ret="lib";
830   ret+=componentName;
831   ret+="Engine.so";
832 #else
833   std::string ret=componentName;
834   ret+="Engine.dll";
835 #endif 
836   return ret;
837 }
838
839 //=============================================================================
840 /*! 
841  *  C++ method: DumpPython default implementation
842  */
843 //=============================================================================
844
845 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
846                                                            CORBA::Boolean isPublished, 
847                                                            CORBA::Boolean isMultiFile,
848                                                            CORBA::Boolean& isValidScript)
849 {
850   const char* aScript = isMultiFile ? "def RebuildData(theStudy): pass" : "";
851   char* aBuffer = new char[strlen(aScript)+1];
852   strcpy(aBuffer, aScript);
853   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
854   int aBufferSize = strlen(aBuffer)+1;
855   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
856   isValidScript = true;
857   return aStreamFile._retn(); 
858 }
859
860
861 Engines::Salome_file_ptr 
862 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
863                                                     const char* Salome_file_name) 
864 {
865   // Try to find the service, if it doesn't exist, we add it.
866   _Service_file_map_it = _Input_Service_file_map.find(service_name);
867   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
868     _t_Salome_file_map * _map = new _t_Salome_file_map();
869     _Input_Service_file_map[service_name] = _map;
870     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
871     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
872     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
873     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
874   }
875   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
876   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
877   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
878   
879   pthread_mutex_lock(deploy_mutex);
880   std::string proxy_ior;
881
882   // Try to find the Salome_file ...
883   _Salome_file_map_it = _map->find(Salome_file_name);
884   if (_Salome_file_map_it ==  _map->end()) {
885
886     // We create a new PaCO++ object.
887     // He has the same configuration than
888     // his component
889
890     // Firstly, we have to create the proxy object
891     // of the Salome_file and transmit his
892     // reference to the other nodes.
893     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
894     if (getMyRank() == 0) {
895       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
896                                                            new paco_omni_fabrique());
897       proxy->copyGlobalContext(this); 
898       PaCO::PacoTopology_t serveur_topo;
899       serveur_topo.total = getTotalNode();
900       proxy->setTopology(serveur_topo);
901
902       // We register the CORBA objet into the POA
903       CORBA::Object_ptr proxy_ref = proxy->_this();
904
905       // We send the reference to all the nodes...
906       Engines::Parallel_Component_var component_proxy = 
907         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
908       component_proxy->send_parallel_proxy_object(proxy_ref);
909
910       // Adding proxy into the map
911       (*_proxy_map)[Salome_file_name] = proxy;
912     }
913     else {
914       this->wait_parallel_object_proxy();
915     }
916
917     proxy_ior = this->get_parallel_proxy_object();
918     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
919
920     // We register each node of the parallel Salome_file object
921     // into the proxy.
922     for (int i = 0; i < getTotalNode(); i++) {
923       if (i ==  getMyRank()) {
924         Parallel_Salome_file_i * servant = 
925           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
926                                      proxy_ior.c_str(),
927                                      i);
928         servant->copyGlobalContext(this); 
929         
930         // We register the CORBA objet into the POA
931         servant->POA_PaCO::InterfaceParallel::_this();
932
933         // Register the servant
934         servant->deploy();
935
936         // Adding servant to the map
937         (*_map)[Salome_file_name] = servant;
938       }
939
940       _my_com->paco_barrier();
941       // start parallel object
942       if (getMyRank() == 0) {
943         proxy->start();
944         _my_com->paco_barrier();
945       }
946       else
947         _my_com->paco_barrier();
948     }
949     // Parallel_Salome_file is created and deployed
950     delete _proxy;
951     _proxy = NULL;
952   }
953
954   pthread_mutex_unlock(deploy_mutex);
955   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
956   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
957   return Engines::Salome_file::_narrow(proxy_ref);
958 }
959
960 Engines::Salome_file_ptr 
961 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
962                                                      const char* Salome_file_name) 
963 {
964   // Try to find the service, if it doesn't exist, we add it.
965   _Service_file_map_it = _Output_Service_file_map.find(service_name);
966   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
967     _t_Salome_file_map * _map = new _t_Salome_file_map();
968     _Output_Service_file_map[service_name] = _map;
969     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
970     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
971     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
972     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
973   }
974   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
975   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
976   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
977   
978   pthread_mutex_lock(deploy_mutex);
979   std::string proxy_ior;
980
981   // Try to find the Salome_file ...
982   _Salome_file_map_it = _map->find(Salome_file_name);
983   Engines::Parallel_Salome_file_proxy_impl * proxy;
984   if (_Salome_file_map_it ==  _map->end()) {
985
986     // We create a new PaCO++ object.
987     // He has the same configuration than
988     // his component
989
990     // Firstly, we have to create the proxy object
991     // of the Salome_file and transmit his
992     // reference to the other nodes.
993     if (getMyRank() == 0) {
994         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
995                                                              new paco_omni_fabrique());
996       proxy->copyGlobalContext(this); 
997       PaCO::PacoTopology_t serveur_topo;
998       serveur_topo.total = getTotalNode();
999       proxy->setTopology(serveur_topo);
1000
1001       // We register the CORBA objet into the POA
1002       CORBA::Object_ptr proxy_ref = proxy->_this();
1003
1004       // We send the reference to all the nodes...
1005       Engines::Parallel_Component_var component_proxy = 
1006         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
1007       component_proxy->send_parallel_proxy_object(proxy_ref);
1008
1009       // Adding proxy into the map
1010       (*_proxy_map)[Salome_file_name] = proxy;
1011     }
1012     else {
1013       this->wait_parallel_object_proxy();
1014     }
1015
1016     proxy_ior = this->get_parallel_proxy_object();
1017     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
1018
1019     // We register each node of the parallel Salome_file object
1020     // into the proxy.
1021     for (int i = 0; i < getTotalNode(); i++) {
1022       if (i ==  getMyRank()) {
1023         Parallel_Salome_file_i * servant = 
1024           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
1025                                      proxy_ior.c_str(),
1026                                      i);
1027         servant->copyGlobalContext(this); 
1028         
1029         // We register the CORBA objet into the POA
1030         servant->POA_PaCO::InterfaceParallel::_this();
1031
1032         // Register the servant
1033         servant->deploy();
1034
1035         // Adding servant to the map
1036         (*_map)[Salome_file_name] = servant;
1037       }
1038
1039       _my_com->paco_barrier();
1040       // start parallel object
1041       if (getMyRank() == 0) {
1042         proxy->start();
1043         _my_com->paco_barrier();
1044       }
1045       else
1046         _my_com->paco_barrier();
1047     }
1048
1049     // Parallel_Salome_file is created and deployed
1050     delete _proxy;
1051     _proxy = NULL;
1052   }
1053   pthread_mutex_unlock(deploy_mutex);
1054   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
1055   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1056   return Engines::Salome_file::_narrow(proxy_ref);
1057 }
1058
1059 Engines::Salome_file_ptr 
1060 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
1061                                                     const char* Salome_file_name) 
1062 {
1063   // Try to find the service, if it doesn't exist, we throw an exception.
1064   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1065   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
1066     SALOME::ExceptionStruct es;
1067     es.type = SALOME::INTERNAL_ERROR;
1068     es.text = "service doesn't have salome files";
1069     throw SALOME::SALOME_Exception(es);
1070   }
1071   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1072
1073   // Try to find the Salome_file ...
1074   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1075   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1076     SALOME::ExceptionStruct es;
1077     es.type = SALOME::INTERNAL_ERROR;
1078     es.text = "service doesn't have this Salome_file";
1079     throw SALOME::SALOME_Exception(es);
1080   }
1081
1082   // Client get the proxy object
1083   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1084   return Sfile->_this();
1085 }
1086
1087 Engines::Salome_file_ptr 
1088 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
1089                                                      const char* Salome_file_name) 
1090 {
1091   // Try to find the service, if it doesn't exist, we throw an exception.
1092   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1093   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1094     SALOME::ExceptionStruct es;
1095     es.type = SALOME::INTERNAL_ERROR;
1096     es.text = "service doesn't have salome files";
1097     throw SALOME::SALOME_Exception(es);
1098   }
1099   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1100
1101   // Try to find the Salome_file ...
1102   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1103   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1104     SALOME::ExceptionStruct es;
1105     es.type = SALOME::INTERNAL_ERROR;
1106     es.text = "service doesn't have this Salome_file";
1107     throw SALOME::SALOME_Exception(es);
1108   }
1109
1110   // Client get the proxy object
1111   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1112   return Sfile->_this();
1113 }
1114
1115
1116 void 
1117 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
1118 {
1119   // Try to find the service, if it doesn't exist, nothing to do.
1120   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1121   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1122     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1123     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1124     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1125
1126     for(;begin!=end;begin++) {
1127       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1128       std::string file_port_name = begin->first;
1129       configureSalome_file(service_name, file_port_name, file);
1130       file->recvFiles();
1131     }
1132   }
1133 }
1134
1135 void 
1136 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
1137 {
1138   // Try to find the service, if it doesn't exist, nothing to do.
1139   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1140   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1141     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1142     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1143     _t_Proxy_Salome_file_map::iterator end = _map->end();
1144
1145     for(;begin!=end;begin++) {
1146       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1147       std::string file_port_name = begin->first;
1148       configureSalome_file(service_name, file_port_name, file);
1149       file->recvFiles();
1150     }
1151   }
1152
1153 }
1154
1155 //=============================================================================
1156 /*! 
1157  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1158  */ 
1159 //=============================================================================
1160 void 
1161 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1162   _proxy = _orb->object_to_string(proxy_ref);
1163 }
1164
1165 //=============================================================================
1166 /*! 
1167  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1168  */ 
1169 //=============================================================================
1170 void 
1171 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1172   char * proxy = NULL;
1173   proxy =  get_parallel_proxy_object();
1174   while(proxy == NULL)
1175   {
1176     sleep(1);
1177     proxy = get_parallel_proxy_object();
1178   }
1179 }
1180
1181 //=============================================================================
1182 /*! 
1183  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1184  */ 
1185 //=============================================================================
1186 char * 
1187 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1188   return _proxy;
1189 }
1190
1191
1192 //=============================================================================
1193 /*! 
1194  *  C++ method: used to configure the Salome_file into the runtime.
1195  *  \param service_name name of the service that use this Salome_file
1196  *  \param file_port_name name of the Salome_file
1197  *  \param file Parallel Salome_file C++ object
1198  */
1199 //=============================================================================
1200 void
1201 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1202                                                    std::string file_port_name,
1203                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
1204 {
1205   // By default this method does nothing
1206 }