Salome HOME
updated copyright message
[modules/kernel.git] / src / ParallelContainer / SALOME_ParallelComponent_i.cxx
1 // Copyright (C) 2007-2023  CEA, EDF, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 //  File   : SALOME_ParallelComponent_i.cxx
25 //  Author : Andr� RIBES, EDF
26 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
27 //
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
30
31 #include "OpUtil.hxx"
32 #include <stdio.h>
33 #ifndef WIN32
34 #include <dlfcn.h>
35 #endif
36 #include <cstdlib>
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
39
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <sys/resource.h>
43 #include <unistd.h>
44 #else
45 #include <sys/timeb.h>
46 int SIGUSR11 = 1000;
47 #endif
48
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
51
52
53 extern bool _Sleeping ;
54 static Engines_Parallel_Component_i * theEngines_Component ;
55
56 bool Engines_Parallel_Component_i::_isMultiInstance = false;
57
58 //=============================================================================
59 /*!
60  *  Standard Constructor for generic Component, used in derived class
61  *  Connection to Registry and Notification
62  *  \param orb Object Request broker given by Container
63  *  \parap poa Portable Object Adapter from Container (normally root_poa)
64  *  \param contId container CORBA id inside the server
65  *  \param instanceName unique instance name for this object (see Container_i)
66  *  \param interfaceName component class name
67  *  \param notif use of notification
68  */
69 //=============================================================================
70
71 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
72                                          PortableServer::POA_ptr poa,
73                                          PortableServer::ObjectId * contId,
74                                          const char *instanceName,
75                                          const char *interfaceName,
76                                          bool notif,
77                                          bool regist) :
78   InterfaceParallel_impl(orb,ior,rank),
79   Engines::EngineComponent_serv(orb,ior,rank),
80   Engines::EngineComponent_base_serv(orb,ior,rank),
81   Engines::Parallel_Component_serv(orb,ior,rank),
82   Engines::Parallel_Component_base_serv(orb,ior,rank),
83   _instanceName(instanceName),
84   _interfaceName(interfaceName),
85   _id(NULL),
86   _myConnexionToRegistry(0),
87   _ThreadId(0) ,
88   _ThreadCpuUsed(0) ,
89   _Executed(false) ,
90   _graphName("") ,
91   _nodeName(""),
92   _destroyed(false),
93   _CanceledThread(false)
94 {
95   MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
96   //SCRUTE(pd_refCount);
97   _orb = CORBA::ORB::_duplicate(orb);
98   _poa = PortableServer::POA::_duplicate(poa);
99   _contId = contId ;
100   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
101
102   if (regist)
103   {
104     CORBA::String_var the_ior = _orb->object_to_string(o);
105     _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
106                                                    _instanceName.c_str());
107   }
108   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
109
110   deploy_mutex = new pthread_mutex_t();
111   pthread_mutex_init(deploy_mutex, NULL);
112   _proxy = NULL;
113  //SCRUTE(pd_refCount);
114 }
115
116 //=============================================================================
117 /*!
118  *  Destructor: call Container for decrement of instances count.
119  *  When instances count falls to 0, the container tries to remove the
120  *  component library (dlclose)
121  */
122 //=============================================================================
123
124 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
125 {
126   MESSAGE("Parallel Component destructor");
127   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
128   if(_myConnexionToRegistry)delete _myConnexionToRegistry;
129   if(_notifSupplier)delete _notifSupplier;
130   if (_id)
131     delete(_id);
132
133   pthread_mutex_destroy(deploy_mutex);
134   delete deploy_mutex;
135   if (_proxy)
136     delete _proxy;
137 }
138
139 //=============================================================================
140 /*!
141  *  CORBA method: return name of the instance, unique in this Container
142  */
143 //=============================================================================
144
145 char* Engines_Parallel_Component_i::instanceName()
146 {
147    return CORBA::string_dup(_instanceName.c_str()) ;
148 }
149
150 //=============================================================================
151 /*!
152  *  CORBA method: return name of the component class
153  */
154 //=============================================================================
155
156 char* Engines_Parallel_Component_i::interfaceName()
157 {
158   return CORBA::string_dup(_interfaceName.c_str()) ;
159 }
160
161 //=============================================================================
162 /*!
163  *  CORBA method: Test if instance is alive and responds
164  */
165 //=============================================================================
166
167 void Engines_Parallel_Component_i::ping()
168 {
169 #ifndef WIN32
170   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
171           << pthread_self());
172 #else
173   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
174           << pthread_self().p );
175 #endif
176 }
177
178 //=============================================================================
179 /*!
180  *  CORBA method: Deactivate this instance. CORBA object is deactivated (do not
181  *  respond any more to CORBA calls), the connection to Regsitry is removed
182  *  (Registry informed of deactivation), internal server reference counter on
183  *  the derived servant class is decremented, to allow destruction of the class
184  *  (delete) by POA, when there are no more references.
185  *  -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
186  */
187 //=============================================================================
188
189 void Engines_Parallel_Component_i::destroy()
190 {
191   MESSAGE("Engines_Parallel_Component_i::destroy()");
192   MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
193   if (!_destroyed)
194   {
195     _remove_ref();
196     _destroyed = true;
197   }
198 }
199
200 //=============================================================================
201 /*!
202  *  CORBA method: return CORBA reference of the Container
203  *
204  */
205 //=============================================================================
206
207 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
208 {
209   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
210   CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
211   return Engines::Container::_narrow(o);
212 }
213
214 //=============================================================================
215 /*!
216  *  CORBA method:
217  *  Gives a sequence of (key=string,value=any) to the component.
218  *  Base class component stores the sequence in a map.
219  *  The map is cleared before.
220  *  This map is for use by derived classes.
221  *  \param dico sequence of (key=string,value=any)
222  */
223 //=============================================================================
224
225 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
226 {
227   _fieldsDict.clear();
228   for (CORBA::ULong i=0; i<dico.length(); i++)
229     {
230       std::string cle(dico[i].key);
231       _fieldsDict[cle] = dico[i].value;
232     }
233 }
234
235 //=============================================================================
236 /*!
237  *  CORBA method:
238  *  returns a previously stored map (key=string,value=any) as a sequence.
239  *  (see setProperties)
240  */
241 //=============================================================================
242
243 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
244 {
245   Engines::FieldsDict_var copie = new Engines::FieldsDict;
246   copie->length(_fieldsDict.size());
247   std::map<std::string,CORBA::Any>::iterator it;
248   CORBA::ULong i = 0;
249   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
250     {
251       std::string cle((*it).first);
252       copie[i].key = CORBA::string_dup(cle.c_str());
253       copie[i].value = _fieldsDict[cle];
254     }
255   return copie._retn();
256 }
257
258 //=============================================================================
259 /*!
260  *  CORBA method: used by Supervision to give names to this instance
261  */
262 //=============================================================================
263
264 void Engines_Parallel_Component_i::Names( const char * graphName ,
265                                  const char * nodeName )
266 {
267   _graphName = graphName;
268   _nodeName = nodeName;
269   MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
270                                                    << _nodeName << "' )");
271 }
272
273 //=============================================================================
274 /*!
275  *  CORBA method: used in Supervision
276  */
277 //=============================================================================
278
279 bool Engines_Parallel_Component_i::Kill_impl()
280 {
281 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
282 //          << " pid " << getpid() << " instanceName "
283 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
284 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
285 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
286 //          << dec ) ;
287
288   bool RetVal = false ;
289 #ifndef WIN32
290   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
291     {
292       RetVal = Killer( _ThreadId , SIGUSR2 ) ;
293       _ThreadId = (pthread_t ) -1 ;
294     }
295
296 #else
297   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
298     {
299       RetVal = Killer( *_ThreadId , 0 ) ;
300       _ThreadId = (pthread_t* ) 0 ;
301     }
302
303 #endif
304   return RetVal ;
305 }
306
307 //=============================================================================
308 /*!
309  *  CORBA method: used in Supervision
310  */
311 //=============================================================================
312
313 bool Engines_Parallel_Component_i::Stop_impl()
314 {
315 #ifndef WIN32
316   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
317           << " pid " << getpid() << " instanceName "
318           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
319           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
320           << dec << " _ThreadId " << _ThreadId );
321 #else
322   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
323           << " pid " << _getpid() << " instanceName "
324           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
325           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
326           << dec << " _ThreadId " << _ThreadId );
327 #endif
328
329
330   bool RetVal = false ;
331 #ifndef WIN32
332   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
333     {
334       RetVal = Killer( _ThreadId , 0 ) ;
335       _ThreadId = (pthread_t ) -1 ;
336     }
337 #else
338   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
339     {
340       RetVal = Killer( *_ThreadId , 0 ) ;
341       _ThreadId = (pthread_t* ) 0 ;
342     }
343 #endif
344   return RetVal ;
345 }
346
347 //=============================================================================
348 /*!
349  *  CORBA method: used in Supervision
350  */
351 //=============================================================================
352
353 bool Engines_Parallel_Component_i::Suspend_impl()
354 {
355 #ifndef WIN32
356   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
357           << " pid " << getpid() << " instanceName "
358           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
359           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
360           << dec << " _ThreadId " << _ThreadId );
361 #else
362   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
363           << " pid " << _getpid() << " instanceName "
364           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
365           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
366           << dec << " _ThreadId " << _ThreadId );
367 #endif
368
369   bool RetVal = false ;
370 #ifndef WIN32
371   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
372 #else
373   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
374 #endif
375     {
376       if ( _Sleeping )
377         {
378           return false ;
379         }
380     else
381       {
382 #ifndef WIN32
383         RetVal = Killer( _ThreadId ,SIGINT ) ;
384 #else
385         RetVal = Killer( *_ThreadId ,SIGINT ) ;
386 #endif
387         //if ( RetVal ) _Sleeping = true;
388
389       }
390     }
391   return RetVal ;
392 }
393
394 //=============================================================================
395 /*!
396  *  CORBA method: used in Supervision
397  */
398 //=============================================================================
399
400 bool Engines_Parallel_Component_i::Resume_impl()
401 {
402 #ifndef WIN32
403   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
404           << " pid " << getpid() << " instanceName "
405           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
406           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
407           << dec << " _ThreadId " << _ThreadId );
408 #else
409   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
410           << " pid " << _getpid() << " instanceName "
411           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
412           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
413           << dec << " _ThreadId " << _ThreadId );
414 #endif
415   bool RetVal = false ;
416 #ifndef WIN32
417   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
418 #else
419   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
420 #endif
421     {
422     if ( _Sleeping )
423       {
424         _Sleeping = false ;
425         RetVal = true ;
426       }
427     else
428       {
429         RetVal = false ;
430       }
431     }
432   return RetVal ;
433 }
434
435 //=============================================================================
436 /*!
437  *  CORBA method:
438  */
439 //=============================================================================
440
441 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
442 {
443   long cpu = 0 ;
444   if ( _ThreadId || _Executed )
445     {
446     if ( _ThreadId > 0 )
447       {
448 #ifndef WIN32
449       if ( pthread_self() != _ThreadId )
450 #else
451       if ( pthread_self().p != _ThreadId->p )
452 #endif
453         {
454         if ( _Sleeping )
455           {
456           }
457         else
458           {
459             // Get Cpu in the appropriate thread with that object !...
460             theEngines_Component = this ;
461 #ifndef WIN32
462             Killer( _ThreadId ,SIGUSR1 ) ;
463 #else
464             Killer( *_ThreadId ,SIGUSR11 ) ;
465 #endif
466           }
467         cpu = _ThreadCpuUsed ;
468         }
469       else
470         {
471           _ThreadCpuUsed = CpuUsed() ;
472           cpu = _ThreadCpuUsed ;
473           // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
474           //      << _serviceName << " " << cpu << std::endl ;
475       }
476     }
477     else
478       {
479         cpu = _ThreadCpuUsed ;
480         // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
481         //      << _serviceName << " " << cpu<< std::endl ;
482       }
483     }
484   else
485     {
486       // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
487       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
488     }
489   return cpu ;
490 }
491
492
493 //=============================================================================
494 /*!
495  *  C++ method: return Container Servant
496  */
497 //=============================================================================
498
499 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
500 {
501   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
502 }
503
504 //=============================================================================
505 /*!
506  *  C++ method: return CORBA instance id, the id is set in derived class
507  *  constructor, when instance is activated.
508  */
509 //=============================================================================
510
511 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
512 {
513 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
514   return _id ;
515 }
516
517 //=============================================================================
518 /*!
519  *  C++ method: used by derived classes for supervision
520  */
521 //=============================================================================
522
523 void Engines_Parallel_Component_i::beginService(const char *serviceName)
524 {
525 #ifndef WIN32
526   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
527           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
528 #else
529   MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
530           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
531 #endif
532 #ifndef WIN32
533   _ThreadId = pthread_self() ;
534 #else
535   _ThreadId = new pthread_t;
536   _ThreadId->p = pthread_self().p ;
537   _ThreadId->x = pthread_self().x ;
538 #endif
539   _StartUsed = 0 ;
540   _StartUsed = CpuUsed_impl() ;
541   _ThreadCpuUsed = 0 ;
542   _Executed = true ;
543   _serviceName = serviceName ;
544   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
545     {
546       perror("pthread_setcanceltype ") ;
547       exit(0) ;
548     }
549   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
550     {
551       perror("pthread_setcancelstate ") ;
552       exit(0) ;
553     }
554 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
555 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
556 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
557
558   // --- for supervisor : all strings given with setProperties
559   //     are set in environment
560   bool overwrite = true;
561   std::map<std::string,CORBA::Any>::iterator it;
562   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
563     {
564       std::string cle((*it).first);
565       if ((*it).second.type()->kind() == CORBA::tk_string)
566         {
567           const char* value;
568           (*it).second >>= value;
569           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
570 #if defined __GNUC__
571           //int ret = setenv(cle.c_str(), value, overwrite);
572           setenv(cle.c_str(), value, overwrite);
573 #else
574           //CCRT porting : setenv not defined in stdlib.h
575           std::string s(cle);
576           s+='=';
577           s+=value;
578           // char* cast because 1st arg of linux putenv function
579           // is not a const char* !
580           //int ret=putenv((char *)s.c_str());
581           putenv((char *)s.c_str());
582           //End of CCRT porting
583 #endif
584           MESSAGE("--- setenv: "<<cle<<" = "<< value);
585         }
586     }
587 }
588
589 //=============================================================================
590 /*!
591  *  C++ method: used by derived classes for supervision
592  */
593 //=============================================================================
594
595 void Engines_Parallel_Component_i::endService(const char *serviceName)
596 {
597   if ( !_CanceledThread )
598     _ThreadCpuUsed = CpuUsed_impl() ;
599
600 #ifndef WIN32
601   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
602           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
603           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
604 #else
605   MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
606           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
607     << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
608 #endif
609   _ThreadId = 0 ;
610 }
611
612 //=============================================================================
613 /*!
614  *  C++ method: -- CHECK IF USED --
615  */
616 //=============================================================================
617
618 char* Engines_Parallel_Component_i::graphName()
619 {
620   return CORBA::string_dup( _graphName.c_str() ) ;
621 }
622
623 //=============================================================================
624 /*!
625  *  C++ method: -- CHECK IF USED --
626  */
627 //=============================================================================
628
629 char* Engines_Parallel_Component_i::nodeName()
630 {
631   return CORBA::string_dup( _nodeName.c_str() ) ;
632 }
633
634 //=============================================================================
635 /*!
636  *  C++ method: used in Supervision (see kill_impl)
637  */
638 //=============================================================================
639
640 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
641 {
642 #ifndef WIN32
643   if ( ThreadId )
644 #else
645   if ( ThreadId.p )
646 #endif
647     {
648       if ( signum == 0 )
649         {
650           if ( pthread_cancel( ThreadId ) )
651             {
652               perror("Killer pthread_cancel error") ;
653               return false ;
654             }
655           else
656             {
657 #ifndef WIN32
658               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
659                       << " pthread_canceled") ;
660 #else
661         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
662                       << " pthread_canceled") ;
663 #endif
664             }
665         }
666       else
667         {
668           if ( pthread_kill( ThreadId , signum ) == -1 )
669             {
670               perror("Killer pthread_kill error") ;
671               return false ;
672             }
673           else
674             {
675 #ifndef WIN32
676         MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
677                       << " pthread_killed(" << signum << ")") ;
678 #else
679         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
680                       << " pthread_killed(" << signum << ")") ;
681 #endif
682             }
683         }
684     }
685   return true ;
686 }
687
688 //=============================================================================
689 /*!
690  *  C++ method:
691  */
692 //=============================================================================
693
694 void SetCpuUsed()
695 {
696   if (theEngines_Component)
697     theEngines_Component->SetCurCpu();
698 }
699
700 //=============================================================================
701 /*!
702  *  C++ method:
703  */
704 //=============================================================================
705
706 void Engines_Parallel_Component_i::SetCurCpu()
707 {
708   _ThreadCpuUsed =  CpuUsed() ;
709   //  MESSAGE(pthread_self() <<
710   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
711 }
712
713 //=============================================================================
714 /*!
715  *  C++ method:
716  */
717 //=============================================================================
718
719 long Engines_Parallel_Component_i::CpuUsed()
720 {
721   long cpu = 0 ;
722 #ifndef WIN32
723   struct rusage usage ;
724   if ( _ThreadId || _Executed )
725     {
726       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
727         {
728           perror("Engines_Parallel_Component_i::CpuUsed") ;
729           return 0 ;
730         }
731       cpu = usage.ru_utime.tv_sec - _StartUsed ;
732       // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
733       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
734       //      << " = " << cpu << std::endl ;
735     }
736   else
737     {
738       // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
739       //      << _ThreadId << " " << _serviceName<< " _StartUsed "
740       //      << _StartUsed << std::endl ;
741     }
742 #else
743         // NOT implementet yet
744 #endif
745
746
747   return cpu ;
748 }
749
750 void CallCancelThread()
751 {
752   if ( theEngines_Component )
753     theEngines_Component->CancelThread() ;
754 }
755
756 //=============================================================================
757 /*!
758  *  C++ method:
759  */
760 //=============================================================================
761
762 void Engines_Parallel_Component_i::CancelThread()
763 {
764   _CanceledThread = true;
765 }
766
767 //=============================================================================
768 /*!
769  *  C++ method: Send message to event channel
770  */
771 //=============================================================================
772
773 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
774                                       const char *message)
775 {
776     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
777 }
778
779 //=============================================================================
780 /*!
781  *  C++ method: return standard library name built on component name
782  */
783 //=============================================================================
784
785 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
786 {
787 #ifndef WIN32
788   std::string ret="lib";
789   ret+=componentName;
790 #ifdef __APPLE__
791   ret+="Engine.dylib";
792 #else
793   ret+="Engine.so";
794 #endif
795 #else
796   std::string ret=componentName;
797   ret+="Engine.dll";
798 #endif
799   return ret;
800 }
801
802 //=============================================================================
803 /*!
804  *  C++ method: DumpPython default implementation
805  */
806 //=============================================================================
807
808 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Boolean isPublished,
809                                                            CORBA::Boolean isMultiFile,
810                                                            CORBA::Boolean& isValidScript)
811 {
812   const char* aScript = isMultiFile ? "def RebuildData(): pass" : "";
813   char* aBuffer = new char[strlen(aScript)+1];
814   strcpy(aBuffer, aScript);
815   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
816   int aBufferSize = strlen(aBuffer)+1;
817   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
818   isValidScript = true;
819   return aStreamFile._retn();
820 }
821
822
823 Engines::Salome_file_ptr
824 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
825                                                     const char* Salome_file_name)
826 {
827   // Try to find the service, if it doesn't exist, we add it.
828   _Service_file_map_it = _Input_Service_file_map.find(service_name);
829   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
830     _t_Salome_file_map * _map = new _t_Salome_file_map();
831     _Input_Service_file_map[service_name] = _map;
832     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
833     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
834     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
835     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
836   }
837   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
838   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
839   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
840
841   pthread_mutex_lock(deploy_mutex);
842   std::string proxy_ior;
843
844   // Try to find the Salome_file ...
845   _Salome_file_map_it = _map->find(Salome_file_name);
846   if (_Salome_file_map_it ==  _map->end()) {
847
848     // We create a new PaCO++ object.
849     // He has the same configuration than
850     // his component
851
852     // Firstly, we have to create the proxy object
853     // of the Salome_file and transmit his
854     // reference to the other nodes.
855     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
856     if (getMyRank() == 0) {
857       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
858                                                            new paco_omni_fabrique());
859       proxy->copyGlobalContext(this);
860       PaCO::PacoTopology_t serveur_topo;
861       serveur_topo.total = getTotalNode();
862       proxy->setTopology(serveur_topo);
863
864       // We register the CORBA object into the POA
865       CORBA::Object_ptr proxy_ref = proxy->_this();
866
867       // We send the reference to all the nodes...
868       Engines::Parallel_Component_var component_proxy =
869         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
870       component_proxy->send_parallel_proxy_object(proxy_ref);
871
872       // Adding proxy into the map
873       (*_proxy_map)[Salome_file_name] = proxy;
874     }
875     else {
876       this->wait_parallel_object_proxy();
877     }
878
879     proxy_ior = this->get_parallel_proxy_object();
880     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
881
882     // We register each node of the parallel Salome_file object
883     // into the proxy.
884     for (int i = 0; i < getTotalNode(); i++) {
885       if (i ==  getMyRank()) {
886         Parallel_Salome_file_i * servant =
887           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
888                                      proxy_ior.c_str(),
889                                      i);
890         servant->copyGlobalContext(this);
891
892         // We register the CORBA object into the POA
893         servant->POA_PaCO::InterfaceParallel::_this();
894
895         // Register the servant
896         servant->deploy();
897
898         // Adding servant to the map
899         (*_map)[Salome_file_name] = servant;
900       }
901
902       _my_com->paco_barrier();
903       // start parallel object
904       if (getMyRank() == 0) {
905         proxy->start();
906         _my_com->paco_barrier();
907       }
908       else
909         _my_com->paco_barrier();
910     }
911     // Parallel_Salome_file is created and deployed
912     delete _proxy;
913     _proxy = NULL;
914   }
915
916   pthread_mutex_unlock(deploy_mutex);
917   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
918   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
919   return Engines::Salome_file::_narrow(proxy_ref);
920 }
921
922 Engines::Salome_file_ptr
923 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
924                                                      const char* Salome_file_name)
925 {
926   // Try to find the service, if it doesn't exist, we add it.
927   _Service_file_map_it = _Output_Service_file_map.find(service_name);
928   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
929     _t_Salome_file_map * _map = new _t_Salome_file_map();
930     _Output_Service_file_map[service_name] = _map;
931     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
932     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
933     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
934     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
935   }
936   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
937   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
938   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
939
940   pthread_mutex_lock(deploy_mutex);
941   std::string proxy_ior;
942
943   // Try to find the Salome_file ...
944   _Salome_file_map_it = _map->find(Salome_file_name);
945   Engines::Parallel_Salome_file_proxy_impl * proxy;
946   if (_Salome_file_map_it ==  _map->end()) {
947
948     // We create a new PaCO++ object.
949     // He has the same configuration than
950     // his component
951
952     // Firstly, we have to create the proxy object
953     // of the Salome_file and transmit his
954     // reference to the other nodes.
955     if (getMyRank() == 0) {
956         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
957                                                              new paco_omni_fabrique());
958       proxy->copyGlobalContext(this);
959       PaCO::PacoTopology_t serveur_topo;
960       serveur_topo.total = getTotalNode();
961       proxy->setTopology(serveur_topo);
962
963       // We register the CORBA object into the POA
964       CORBA::Object_ptr proxy_ref = proxy->_this();
965
966       // We send the reference to all the nodes...
967       Engines::Parallel_Component_var component_proxy =
968         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
969       component_proxy->send_parallel_proxy_object(proxy_ref);
970
971       // Adding proxy into the map
972       (*_proxy_map)[Salome_file_name] = proxy;
973     }
974     else {
975       this->wait_parallel_object_proxy();
976     }
977
978     proxy_ior = this->get_parallel_proxy_object();
979     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
980
981     // We register each node of the parallel Salome_file object
982     // into the proxy.
983     for (int i = 0; i < getTotalNode(); i++) {
984       if (i ==  getMyRank()) {
985         Parallel_Salome_file_i * servant =
986           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
987                                      proxy_ior.c_str(),
988                                      i);
989         servant->copyGlobalContext(this);
990
991         // We register the CORBA object into the POA
992         servant->POA_PaCO::InterfaceParallel::_this();
993
994         // Register the servant
995         servant->deploy();
996
997         // Adding servant to the map
998         (*_map)[Salome_file_name] = servant;
999       }
1000
1001       _my_com->paco_barrier();
1002       // start parallel object
1003       if (getMyRank() == 0) {
1004         proxy->start();
1005         _my_com->paco_barrier();
1006       }
1007       else
1008         _my_com->paco_barrier();
1009     }
1010
1011     // Parallel_Salome_file is created and deployed
1012     delete _proxy;
1013     _proxy = NULL;
1014   }
1015   pthread_mutex_unlock(deploy_mutex);
1016   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
1017   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1018   return Engines::Salome_file::_narrow(proxy_ref);
1019 }
1020
1021 Engines::Salome_file_ptr
1022 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1023                                                     const char* Salome_file_name)
1024 {
1025   // Try to find the service, if it doesn't exist, we throw an exception.
1026   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1027   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
1028     SALOME::ExceptionStruct es;
1029     es.type = SALOME::INTERNAL_ERROR;
1030     es.text = "service doesn't have salome files";
1031     throw SALOME::SALOME_Exception(es);
1032   }
1033   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1034
1035   // Try to find the Salome_file ...
1036   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1037   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1038     SALOME::ExceptionStruct es;
1039     es.type = SALOME::INTERNAL_ERROR;
1040     es.text = "service doesn't have this Salome_file";
1041     throw SALOME::SALOME_Exception(es);
1042   }
1043
1044   // Client get the proxy object
1045   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1046   return Sfile->_this();
1047 }
1048
1049 Engines::Salome_file_ptr
1050 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1051                                                      const char* Salome_file_name)
1052 {
1053   // Try to find the service, if it doesn't exist, we throw an exception.
1054   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1055   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
1056     SALOME::ExceptionStruct es;
1057     es.type = SALOME::INTERNAL_ERROR;
1058     es.text = "service doesn't have salome files";
1059     throw SALOME::SALOME_Exception(es);
1060   }
1061   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1062
1063   // Try to find the Salome_file ...
1064   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1065   if (_Proxy_Salome_file_map_it ==  _map->end()) {
1066     SALOME::ExceptionStruct es;
1067     es.type = SALOME::INTERNAL_ERROR;
1068     es.text = "service doesn't have this Salome_file";
1069     throw SALOME::SALOME_Exception(es);
1070   }
1071
1072   // Client get the proxy object
1073   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1074   return Sfile->_this();
1075 }
1076
1077
1078 void
1079 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1080 {
1081   // Try to find the service, if it doesn't exist, nothing to do.
1082   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1083   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
1084     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1085     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1086     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1087
1088     for(;begin!=end;begin++) {
1089       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1090       std::string file_port_name = begin->first;
1091       configureSalome_file(service_name, file_port_name, file);
1092       file->recvFiles();
1093     }
1094   }
1095 }
1096
1097 void
1098 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1099 {
1100   // Try to find the service, if it doesn't exist, nothing to do.
1101   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1102   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
1103     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1104     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1105     _t_Proxy_Salome_file_map::iterator end = _map->end();
1106
1107     for(;begin!=end;begin++) {
1108       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1109       std::string file_port_name = begin->first;
1110       configureSalome_file(service_name, file_port_name, file);
1111       file->recvFiles();
1112     }
1113   }
1114
1115 }
1116
1117 //=============================================================================
1118 /*!
1119  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1120  */
1121 //=============================================================================
1122 void
1123 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1124   _proxy = _orb->object_to_string(proxy_ref);
1125 }
1126
1127 //=============================================================================
1128 /*!
1129  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1130  */
1131 //=============================================================================
1132 void
1133 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1134   char * proxy = NULL;
1135   proxy =  get_parallel_proxy_object();
1136   while(proxy == NULL)
1137   {
1138     sleep(1);
1139     proxy = get_parallel_proxy_object();
1140   }
1141 }
1142
1143 //=============================================================================
1144 /*!
1145  *  C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1146  */
1147 //=============================================================================
1148 char *
1149 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1150   return _proxy;
1151 }
1152
1153
1154 //=============================================================================
1155 /*!
1156  *  C++ method: used to configure the Salome_file into the runtime.
1157  *  \param service_name name of the service that use this Salome_file
1158  *  \param file_port_name name of the Salome_file
1159  *  \param file Parallel Salome_file C++ object
1160  */
1161 //=============================================================================
1162 void
1163 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1164                                                    std::string file_port_name,
1165                                                    Engines::Parallel_Salome_file_proxy_impl * file)
1166 {
1167   // By default this method does nothing
1168 }