1 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
3 // Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
22 // File : SALOME_ParallelComponent_i.cxx
23 // Author : André RIBES, EDF
24 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
26 #include "SALOME_ParallelComponent_i.hxx"
27 #include "SALOME_ParallelContainer_i.hxx"
35 #include "utilities.h"
39 #include <sys/resource.h>
42 #include <sys/timeb.h>
// Flag defined in another translation unit; in the visible code it is only
// mentioned in a commented-out line of Suspend_impl.
49 extern bool _Sleeping ;
// File-local pointer to a component instance. CpuUsed_impl() stores `this` here
// before signalling the service thread; the stored instance's SetCurCpu() is then
// called through it.
50 static Engines_Parallel_Component_i * theEngines_Component ;
// Definitions (with initial values) of the class-level static flags.
52 bool Engines_Parallel_Component_i::_isMultiStudy = true;
53 bool Engines_Parallel_Component_i::_isMultiInstance = false;
55 //=============================================================================
57 * Default constructor, not for use
59 //=============================================================================
// Two-argument constructor: forwards orb and ior to the InterfaceParallel_impl and
// Engines::Component_serv bases and emits a trace. The file's own header comment
// marks this constructor as "not for use".
61 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) :
62 InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior)
65 INFOS("Default Constructor...");
68 //=============================================================================
70 * Standard Constructor for generic Component, used in derived class
71 * Connection to Registry and Notification
72 * \param orb Object Request broker given by Container
73 * \param poa Portable Object Adapter from Container (normally root_poa)
74 * \param contId container CORBA id inside the server
75 * \param instanceName unique instance name for this object (see Container_i)
76 * \param interfaceName component class name
77 * \param notif use of notification
79 //=============================================================================
// Constructor used by derived component classes.
// Forwards orb/ior to both bases, stores the instance and interface names, then
//  - keeps duplicated references to the ORB and the POA,
//  - resolves the container object from contId and stringifies its reference,
//  - creates the RegistryConnexion and the NOTIFICATION_Supplier.
// NOTE(review): `notif` is used below but its declaration is not visible in this
// excerpt; part of the parameter list / member-initializer list appears to be missing.
81 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior,
82 PortableServer::POA_ptr poa,
83 PortableServer::ObjectId * contId,
84 const char *instanceName,
85 const char *interfaceName,
87 InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior),
88 _instanceName(instanceName),
89 _interfaceName(interfaceName),
90 _myConnexionToRegistry(0),
98 MESSAGE("Component constructor with instanceName "<< _instanceName);
99 //SCRUTE(pd_refCount);
// Keep our own (duplicated) references to the ORB and the POA.
100 _orb = CORBA::ORB::_duplicate(orb);
101 _poa = PortableServer::POA::_duplicate(poa);
// Object reference of the container identified by contId, then its stringified form.
103 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
104 const CORBA::String_var the_ior = _orb->object_to_string(o);
// Registry connection built from the container IOR, "theSession" and the instance name.
105 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
106 _instanceName.c_str());
// Raw owning pointer; destroy() deletes it.
108 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
109 //SCRUTE(pd_refCount);
112 //=============================================================================
114 * Destructor: call Container for decrement of instances count.
115 * When instances count falls to 0, the container tries to remove the
116 * component library (dlclose)
118 //=============================================================================
120 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
122 MESSAGE("Component destructor");
123 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
126 //=============================================================================
128 * CORBA method: return name of the instance, unique in this Container
130 //=============================================================================
132 char* Engines_Parallel_Component_i::instanceName()
134 return CORBA::string_dup(_instanceName.c_str()) ;
137 //=============================================================================
139 * CORBA method: return name of the component class
141 //=============================================================================
143 char* Engines_Parallel_Component_i::interfaceName()
145 return CORBA::string_dup(_interfaceName.c_str()) ;
148 //=============================================================================
150 * CORBA method: Get study Id
151 * \return -1: not initialised (Internal Error)
152 * 0: multistudy component instance
153 * >0: study id associated to this instance
155 //=============================================================================
// CORBA accessor for the study id (see the value meanings listed in the header
// comment above).
// NOTE(review): the function body is not visible in this excerpt.
157 CORBA::Long Engines_Parallel_Component_i::getStudyId()
162 //=============================================================================
164 * CORBA method: Test if instance is alive and responds
166 //=============================================================================
168 void Engines_Parallel_Component_i::ping()
170 // MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
173 //=============================================================================
175 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
176 * respond any more to CORBA calls), the connection to Registry is removed
177 * (Registry informed of deactivation), internal server reference counter on
178 * the derived servant class is decremented, to allow destruction of the class
179 * (delete) by POA, when there are no more references.
180 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
182 //=============================================================================
// Tears the instance down: deletes the notification supplier and the registry
// connection, deactivates the CORBA object in the POA, releases the POA reference
// and drops one servant reference on _thisObj.
// NOTE(review): some original lines are not visible in this excerpt (gaps in the
// numbering), so this block is documented only, not rewritten.
184 void Engines_Parallel_Component_i::destroy()
186 MESSAGE("Engines_Parallel_Component_i::destroy()");
187 //SCRUTE(pd_refCount);
// NOTE(review): no reset of _notifSupplier is visible after this delete, unlike
// _myConnexionToRegistry below; sendMessage() dereferences it - confirm it is nulled.
189 delete _notifSupplier;
192 delete _myConnexionToRegistry;
193 _myConnexionToRegistry = 0 ;
// From here on the object no longer answers CORBA calls (per the header comment).
194 _poa->deactivate_object(*_id) ;
// Releases the POA reference duplicated in the constructor.
195 CORBA::release(_poa) ;
197 //SCRUTE(pd_refCount);
198 _thisObj->_remove_ref();
199 //SCRUTE(pd_refCount);
200 MESSAGE("Engines_Parallel_Component_i::destroyed") ;
203 //=============================================================================
205 * CORBA method: return CORBA reference of the Container
208 //=============================================================================
210 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
212 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
213 CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
214 return Engines::Container::_narrow(o);
217 //=============================================================================
220 * Gives a sequence of (key=string,value=any) to the component.
221 * Base class component stores the sequence in a map.
222 * The map is cleared before.
223 * This map is for use by derived classes.
224 * \param dico sequence of (key=string,value=any)
226 //=============================================================================
// Copies every (key, value) entry of dico into the _fieldsDict map.
// NOTE(review): the header comment says the map is cleared first; that statement
// is not visible in this excerpt - confirm it is present in the real file.
228 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
231 for (CORBA::ULong i=0; i<dico.length(); i++)
// The key is converted to std::string for use as the map key; an existing entry
// with the same key is overwritten.
233 std::string cle(dico[i].key);
234 _fieldsDict[cle] = dico[i].value;
238 //=============================================================================
241 * returns a previously stored map (key=string,value=any) as a sequence.
242 * (see setProperties)
244 //=============================================================================
246 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
248 Engines::FieldsDict_var copie = new Engines::FieldsDict;
249 copie->length(_fieldsDict.size());
250 map<std::string,CORBA::Any>::iterator it;
252 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
254 std::string cle((*it).first);
255 copie[i].key = CORBA::string_dup(cle.c_str());
256 copie[i].value = _fieldsDict[cle];
258 return copie._retn();
261 //=============================================================================
263 * CORBA method: used by Supervision to give names to this instance
265 //=============================================================================
267 void Engines_Parallel_Component_i::Names( const char * graphName ,
268 const char * nodeName )
270 _graphName = graphName ;
271 _nodeName = nodeName ;
272 INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
273 << _nodeName << "' )");
276 //=============================================================================
278 * CORBA method: used in Supervision
280 //=============================================================================
// Asks Killer() (with signum 0) to act on the thread recorded in _ThreadId, but
// only when the caller is a different thread, then resets _ThreadId.
// NOTE(review): two variants of the same logic are visible (pthread_t held by value
// vs. a pthread_t* with a `.p` member); presumably they sit in platform-specific
// preprocessor branches that are not visible in this excerpt, as is the return.
282 bool Engines_Parallel_Component_i::Kill_impl()
284 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
285 // << " pid " << getpid() << " instanceName "
286 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
287 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
288 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
291 bool RetVal = false ;
// Variant with _ThreadId stored as a pthread_t value.
293 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
295 RetVal = Killer( _ThreadId , 0 ) ;
296 _ThreadId = (pthread_t ) -1 ;
// Variant with _ThreadId stored as a pointer.
// NOTE(review): beginService() allocates this pointer with `new pthread_t`; it is
// zeroed here with no delete visible - possible leak, confirm.
300 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
302 RetVal = Killer( *_ThreadId , 0 ) ;
303 _ThreadId = (pthread_t* ) 0 ;
310 //=============================================================================
312 * CORBA method: used in Supervision
314 //=============================================================================
// Traces the call, then performs the same steps as the visible part of Kill_impl:
// Killer() with signum 0 on the recorded thread when called from another thread,
// followed by a reset of _ThreadId.
// NOTE(review): the two variants below presumably belong to platform-specific
// preprocessor branches not visible in this excerpt; the return is not visible either.
316 bool Engines_Parallel_Component_i::Stop_impl()
318 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
319 << " pid " << getpid() << " instanceName "
320 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
321 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
322 << dec << " _ThreadId " << _ThreadId );
325 bool RetVal = false ;
// Variant with _ThreadId stored as a pthread_t value.
327 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
329 RetVal = Killer( _ThreadId , 0 ) ;
330 _ThreadId = (pthread_t ) -1 ;
// Variant with _ThreadId stored as a pointer (zeroed without a visible delete).
333 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
335 RetVal = Killer( *_ThreadId , 0 ) ;
336 _ThreadId = (pthread_t* ) 0 ;
342 //=============================================================================
344 * CORBA method: used in Supervision
346 //=============================================================================
// Traces the call and, when invoked from a thread other than the recorded one,
// sends SIGINT to that thread through Killer().
// NOTE(review): several lines between the thread checks and the Killer() calls are
// not visible in this excerpt (including any other conditions and the return), so
// the exact control flow cannot be confirmed from here.
348 bool Engines_Parallel_Component_i::Suspend_impl()
350 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
351 << " pid " << getpid() << " instanceName "
352 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
353 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
354 << dec << " _ThreadId " << _ThreadId );
356 bool RetVal = false ;
// Same test in two forms: pthread_t by value, then pointer with a `.p` member.
358 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
360 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
// Same signal delivery in the two forms.
370 RetVal = Killer( _ThreadId ,SIGINT ) ;
372 RetVal = Killer( *_ThreadId ,SIGINT ) ;
374 //if ( RetVal ) _Sleeping = true;
381 //=============================================================================
383 * CORBA method: used in Supervision
385 //=============================================================================
// Traces the call and tests whether the caller is a thread other than the one
// recorded in _ThreadId.
// NOTE(review): everything after the two thread checks (the action taken and the
// return) is not visible in this excerpt.
387 bool Engines_Parallel_Component_i::Resume_impl()
389 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
390 << " pid " << getpid() << " instanceName "
391 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
392 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
393 << dec << " _ThreadId " << _ThreadId );
394 bool RetVal = false ;
// Same test in two forms: pthread_t by value, then pointer with a `.p` member.
396 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
398 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
414 //=============================================================================
418 //=============================================================================
// Reports CPU usage for the current service as a CORBA::Long.
// When called from a thread other than the recorded one, it publishes `this` in
// theEngines_Component and signals the service thread, then reads _ThreadCpuUsed;
// otherwise it calls CpuUsed() directly.
// NOTE(review): the declaration of `cpu`, the else-branches and the return are not
// visible in this excerpt, so the exact branch structure cannot be confirmed here.
420 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
423 if ( _ThreadId || _Executed )
// Same test in two forms: pthread_t by value, then pointer with a `.p` member.
428 if ( pthread_self() != _ThreadId )
430 if ( pthread_self().p != _ThreadId->p )
438 // Get Cpu in the appropriate thread with that object !...
439 theEngines_Component = this ;
441 Killer( _ThreadId ,SIGUSR1 ) ;
// NOTE(review): SIGUSR11 here vs. SIGUSR1 above - confirm this identifier exists
// on the platform this branch targets; it looks like a typo.
443 Killer( *_ThreadId ,SIGUSR11 ) ;
// NOTE(review): read right after signalling; no synchronisation with the signalled
// thread is visible here - confirm the value is up to date.
446 cpu = _ThreadCpuUsed ;
// Caller is the service thread itself: measure directly.
450 _ThreadCpuUsed = CpuUsed() ;
451 cpu = _ThreadCpuUsed ;
452 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
453 // << _serviceName << " " << cpu << endl ;
458 cpu = _ThreadCpuUsed ;
459 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
460 // << _serviceName << " " << cpu<< endl ;
465 // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
466 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
472 //=============================================================================
474 * C++ method: return Container Servant
476 //=============================================================================
478 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
480 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
483 //=============================================================================
485 * C++ method: set study Id
486 * \param studyId 0 if instance is not associated to a study,
487 * >0 otherwise (== study id)
488 * \return true if the set of study Id is OK
489 * must be set once by Container, at instance creation,
490 * and cannot be changed after.
492 //=============================================================================
// Sets the study id of this instance; studyId must be >= 0 (asserted).
// `ret` starts false and becomes true when _studyId already equals studyId.
// NOTE(review): the branch taken when _studyId < 0 and the return statement are
// not visible in this excerpt.
494 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
496 ASSERT( studyId >= 0);
497 CORBA::Boolean ret = false;
498 if (_studyId < 0) // --- not yet initialized
// Asking again for the id already stored counts as success.
504 if ( _studyId == studyId) ret = true;
508 //=============================================================================
510 * C++ method: return CORBA instance id, the id is set in derived class
511 * constructor, when instance is activated.
513 //=============================================================================
// Accessor for the CORBA object id of this instance (per the header comment, the
// id is set in the derived class constructor).
// NOTE(review): the return statement is not visible in this excerpt.
515 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
517 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
521 //=============================================================================
523 * C++ method: used by derived classes for supervision
525 //=============================================================================
// Called at the start of a service: records the calling thread in _ThreadId, takes
// the CPU baseline, stores the service name, enables asynchronous thread
// cancellation, and exports every string-typed property of _fieldsDict to the
// process environment.
// NOTE(review): preprocessor lines and several short statements (e.g. the
// declarations of `value` and `s`) are not visible in this excerpt.
527 void Engines_Parallel_Component_i::beginService(const char *serviceName)
529 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
530 << endl << "Component instance : " << _instanceName << endl << endl);
// Variant storing the thread id by value.
532 _ThreadId = pthread_self() ;
// Variant storing a heap-allocated copy of the thread id.
// NOTE(review): no matching delete is visible in this excerpt.
534 _ThreadId = new pthread_t;
535 _ThreadId->p = pthread_self().p ;
536 _ThreadId->x = pthread_self().x ;
539 _StartUsed = CpuUsed_impl() ;
542 _serviceName = serviceName ;
// Make this thread cancellable at any point, so pthread_cancel() from Killer()
// can take effect; failures are only reported with perror().
543 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
545 perror("pthread_setcanceltype ") ;
548 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
550 perror("pthread_setcancelstate ") ;
553 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
554 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
555 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
557 // --- for supervisor : all strings given with setProperties
558 // are set in environment
559 bool overwrite = true;
560 map<std::string,CORBA::Any>::iterator it;
561 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
563 std::string cle((*it).first);
// Only properties whose Any holds a string are exported.
564 if ((*it).second.type()->kind() == CORBA::tk_string)
567 (*it).second >>= value;
568 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
// NOTE(review): `ret` is not checked in the visible lines.
570 int ret = setenv(cle.c_str(), value, overwrite);
572 //CCRT porting : setenv not defined in stdlib.h
576 // char* cast because 1st arg of linux putenv function
577 // is not a const char* !
// NOTE(review): putenv() keeps the pointer it is given instead of copying the
// string; if `s` is a local std::string the environment entry dangles once it
// goes out of scope - confirm the lifetime of `s` (declaration not visible).
578 int ret=putenv((char *)s.c_str());
579 //End of CCRT porting
581 MESSAGE("--- setenv: "<<cle<<" = "<< value);
586 //=============================================================================
588 * C++ method: used by derived classes for supervision
590 //=============================================================================
// Called at the end of a service: stores the CPU figure returned by CpuUsed_impl()
// in _ThreadCpuUsed and traces it together with _StartUsed.
// NOTE(review): original lines after the trace are not visible in this excerpt.
592 void Engines_Parallel_Component_i::endService(const char *serviceName)
594 _ThreadCpuUsed = CpuUsed_impl() ;
595 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
596 << endl << " Component instance : " << _instanceName << " StartUsed "
597 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
601 //=============================================================================
603 * C++ method: -- CHECK IF USED --
605 //=============================================================================
607 char* Engines_Parallel_Component_i::graphName()
609 return CORBA::string_dup( _graphName.c_str() ) ;
612 //=============================================================================
614 * C++ method: -- CHECK IF USED --
616 //=============================================================================
618 char* Engines_Parallel_Component_i::nodeName()
620 return CORBA::string_dup( _nodeName.c_str() ) ;
623 //=============================================================================
625 * C++ method: used in Supervision (see kill_impl)
627 //=============================================================================
// Acts on the given thread in one of two ways: pthread_cancel(), or
// pthread_kill() with signum. Failures are reported with perror(), successes
// are traced.
// NOTE(review): the condition choosing between the two paths and the return
// statements are not visible in this excerpt; callers pass signum 0 from
// Kill_impl/Stop_impl and a real signal from Suspend_impl/CpuUsed_impl.
629 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
// Cancellation path.
639 if ( pthread_cancel( ThreadId ) )
641 perror("Killer pthread_cancel error") ;
646 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
647 << " pthread_canceled") ;
// Signal path.
// NOTE(review): pthread_kill() returns an error number on failure, not -1, and
// does not set errno; this test and the perror() below may never report a
// failure - confirm.
652 if ( pthread_kill( ThreadId , signum ) == -1 )
654 perror("Killer pthread_kill error") ;
659 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
660 << " pthread_killed(" << signum << ")") ;
667 //=============================================================================
671 //=============================================================================
// NOTE(review): the header of the function enclosing this statement is not visible
// in this excerpt. The statement calls SetCurCpu() on the instance that
// CpuUsed_impl() stored in theEngines_Component.
675 theEngines_Component->SetCurCpu() ;
678 //=============================================================================
682 //=============================================================================
684 void Engines_Parallel_Component_i::SetCurCpu()
686 _ThreadCpuUsed = CpuUsed() ;
687 // MESSAGE(pthread_self() <<
688 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
691 //=============================================================================
695 //=============================================================================
// Computes the user CPU time, in whole seconds, consumed since _StartUsed, using
// getrusage(); only attempted when a thread is recorded or _Executed is set.
// NOTE(review): the declaration of `cpu`, the else-branches and the return are not
// visible in this excerpt.
697 long Engines_Parallel_Component_i::CpuUsed()
701 struct rusage usage ;
702 if ( _ThreadId || _Executed )
// NOTE(review): RUSAGE_SELF reports usage for the whole calling process, not for
// a single thread - confirm this is the intended figure.
704 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
706 perror("Engines_Parallel_Component_i::CpuUsed") ;
// Sub-second precision (tv_usec) is not used.
709 cpu = usage.ru_utime.tv_sec - _StartUsed ;
710 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
711 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
712 // << " = " << cpu << endl ;
716 // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
717 // << _ThreadId << " " << _serviceName<< " _StartUsed "
718 // << _StartUsed << endl ;
721 // NOT implemented yet
728 //=============================================================================
730 * C++ method: Send message to event channel
732 //=============================================================================
734 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
737 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
740 //=============================================================================
742 * C++ method: return standard library name built on component name
744 //=============================================================================
// Builds a library name from componentName (per the header comment above).
// NOTE(review): the function body is not visible in this excerpt, so the naming
// scheme cannot be documented from here.
746 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
754 //=============================================================================
756 * C++ method: DumpPython default implementation
758 //=============================================================================
760 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
761 CORBA::Boolean isPublished,
762 CORBA::Boolean& isValidScript)
764 char* aScript = "def RebuildData(theStudy): pass";
765 char* aBuffer = new char[strlen(aScript)+1];
766 strcpy(aBuffer, aScript);
767 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
768 int aBufferSize = strlen(aBuffer)+1;
769 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
770 isValidScript = true;
771 return aStreamFile._retn();