1 // SUPERV GraphExecutor : contains classes that permit execution of graphs and particularly the execution automaton
3 // Copyright (C) 2003 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
24 // File : DataFlowExecutor_InNode.cxx
25 // Author : Jean Rahuel, CEA
38 #include <SALOMEconfig.h>
39 #include CORBA_CLIENT_HEADER(SALOME_Component)
40 //#include "SALOME_NamingService.hxx"
41 #include "SALOME_LifeCycleCORBA.hxx"
43 #include "DataFlowBase_FactoryNode.hxx"
44 #include "DataFlowBase_GOTONode.hxx"
45 #include "DataFlowBase_LoopNode.hxx"
46 #include "DataFlowBase_EndOfLoopNode.hxx"
47 #include "DataFlowBase_SwitchNode.hxx"
48 #include "DataFlowBase_EndOfSwitchNode.hxx"
50 #include "DataFlowExecutor_OutNode.hxx"
// Common initialisation shared by the InNode constructors (each of them calls
// InitInNode). Resets the control and automaton states, clears the restart-node
// pointer and the Python run-method pointer, initialises the two mutexes and the
// condition variables, and stores the automaton pointer.
// NOTE(review): *_Orb is set to CORBA::ORB::_nil() and the ORB argument is not
// used in the lines visible here - confirm this is intentional.
52 static void InitInNode( int &_RewindStack ,
53 SUPERV::ControlState &_ControlState ,
54 SUPERV::AutomatonState &_currentState ,
55 GraphExecutor::InNode ** _aReStartNode ,
56 bool & _PyFuncRunned ,
57 PyObject ** _MyPyRunMethod ,
58 pthread_mutex_t &_MutexDataWait ,
60 pthread_mutex_t &_MutexWait ,
61 pthread_cond_t &_ReadyWait ,
62 pthread_cond_t &_RunningWait ,
63 pthread_cond_t &_DoneWait ,
64 pthread_cond_t &_SuspendedWait ,
65 pthread_cond_t &_SuspendWait ,
67 pthread_cond_t &_ResumeWait ,
69 pthread_cond_t &_KillWait ,
71 pthread_cond_t &_StopWait ,
72 GraphExecutor::FiniteStateMachine ** _Automaton ,
73 GraphExecutor::FiniteStateMachine * theAutomaton ,
74 CORBA::ORB_ptr * _Orb ,
75 CORBA::ORB_ptr ORB ) {
// Start from a neutral state: no pending control request, unknown automaton state.
77 _ControlState = SUPERV::VoidState ;
78 _currentState = SUPERV::UnKnownState ;
79 *_aReStartNode = NULL ;
80 _PyFuncRunned = false ;
81 *_MyPyRunMethod = NULL ;
// NOTE(review): the return value of pthread_mutex_init is not checked for the
// two mutexes, unlike the pthread_cond_init calls that follow.
82 pthread_mutex_init( &_MutexDataWait , NULL ) ;
84 pthread_mutex_init( &_MutexWait , NULL ) ;
// pthread_cond_init returns non-zero on failure; each failure is reported with perror.
85 if ( pthread_cond_init( &_ReadyWait , NULL ) ) {
86 perror("pthread_cond_init( &_ReadyWait , NULL )") ;
89 if ( pthread_cond_init( &_RunningWait , NULL ) ) {
90 perror("pthread_cond_init( &_RunningWait , NULL )") ;
93 if ( pthread_cond_init( &_DoneWait , NULL ) ) {
94 perror("pthread_cond_init( &_DoneWait , NULL )") ;
97 if ( pthread_cond_init( &_SuspendedWait , NULL ) ) {
98 perror("pthread_cond_init( &_SuspendedWait , NULL )") ;
101 if ( pthread_cond_init( &_SuspendWait , NULL ) ) {
102 perror("pthread_cond_init( &_SuspendWait , NULL )") ;
// The suspend/resume synchronisation flags start cleared.
105 _SuspendSync = false ;
106 if ( pthread_cond_init( &_ResumeWait , NULL ) ) {
107 perror("pthread_cond_init( &_ResumeWait , NULL )") ;
110 _ResumeSync = false ;
111 if ( pthread_cond_init( &_KillWait , NULL ) ) {
112 perror("pthread_cond_init( &_KillWait , NULL )") ;
116 if ( pthread_cond_init( &_StopWait , NULL ) ) {
117 perror("pthread_cond_init( &_StopWait , NULL )") ;
// Publish the automaton to the caller's member and reset its ORB reference to nil.
120 *_Automaton = theAutomaton ;
121 *_Orb = CORBA::ORB::_nil();
// File-scope finite state machine instance, allocated with new when this
// translation unit is initialised. InitialState() uses it to obtain state names.
// NOTE(review): no matching delete is visible in this file section.
124 GraphExecutor::FiniteStateMachine * theAutomaton = new
125 GraphExecutor::FiniteStateMachine() ;
// Default constructor: performs the common member initialisation through
// InitInNode, passing a nil ORB reference as the last argument.
127 //GraphExecutor::InNode::InNode() :
128 // GraphBase::FactoryNode() {
129 GraphExecutor::InNode::InNode() {
130 InitInNode( _RewindStack ,
153 CORBA::ORB::_nil() ) ;
// Builds an executor node of the kind given by akind.
// After the common initialisation (InitInNode, SetDebug) the constructor
// allocates the GraphBase node class matching akind and makes the more general
// typed pointers (_ComputingNode, _InLineNode, _GOTONode) refer to that same
// object. Finally it hands this executor node to the GraphBase node through
// _ComputingNode->InNode( this ).
// aFuncName / aPythonFunction supply the Python function names and bodies for
// the in-line node kinds: index 0 for all of them, indexes 1 and 2 additionally
// for LoopNode.
// NOTE(review): no bounds check on aFuncName / aPythonFunction is visible here.
156 GraphExecutor::InNode::InNode(CORBA::ORB_ptr ORB,
157 SALOME_NamingService* ptrNamingService ,
158 const SALOME_ModuleCatalog::Service& aService ,
159 const char * ComponentName ,
160 const char * NodeInterfaceName ,
161 const char * NodeName ,
162 const SUPERV::KindOfNode akind ,
163 GraphBase::ListOfFuncName aFuncName ,
164 GraphBase::ListOfPythonFunctions aPythonFunction ,
165 const SUPERV::SDate NodeFirstCreation ,
166 const SUPERV::SDate NodeLastModification ,
167 const char * NodeEditorRelease ,
168 const char * NodeAuthor ,
169 const char * NodeComputer ,
170 const char * NodeComment ,
171 const bool GeneratedName ,
174 int * Graph_prof_debug,
175 ofstream * Graph_fdebug) {
176 // ostream * Graph_fdebug = NULL ) :
177 // GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
178 // ComponentName , NodeInterfaceName ,
179 // NodeName , akind ,
180 // NodeFirstCreation , NodeLastModification ,
181 // NodeEditorRelease , NodeAuthor ,
182 // NodeComputer , NodeComment , GeneratedName ,
184 // Graph_prof_debug , Graph_fdebug ) {
185 InitInNode( _RewindStack ,
209 SetDebug( ORB , Graph_prof_debug , Graph_fdebug ) ;
// Typed node pointers are cleared before the kind-specific allocation below.
211 _ComputingNode = NULL ;
212 _FactoryNode = NULL ;
216 _EndOfLoopNode = NULL ;
218 _EndOfSwitchNode = NULL ;
// Plain computing node: only _ComputingNode is set.
220 case SUPERV::ComputingNode : {
221 cdebug << "GraphExecutor::InNode::InNode SUPERV::ComputingNode : " << NodeName ;
222 _ComputingNode = new GraphBase::ComputingNode( ORB , ptrNamingService ,
226 NodeLastModification ,
227 NodeEditorRelease , NodeAuthor ,
228 NodeComment , GeneratedName ,
230 Graph_prof_debug , Graph_fdebug ) ;
// Factory node: also reachable through the generic _ComputingNode pointer.
233 case SUPERV::FactoryNode : {
234 cdebug << "GraphExecutor::InNode::InNode SUPERV::FactoryNode : " << NodeName ;
235 _FactoryNode = new GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
236 ComponentName , NodeInterfaceName ,
239 NodeLastModification ,
240 NodeEditorRelease , NodeAuthor ,
241 NodeComputer , NodeComment ,
242 GeneratedName , NodeX , NodeY ,
243 Graph_prof_debug , Graph_fdebug ) ;
244 _ComputingNode = (GraphBase::ComputingNode *) _FactoryNode ;
// In-line (Python) node built from the first function name / function body.
247 case SUPERV::InLineNode : {
248 cdebug << "GraphExecutor::InNode::InNode SUPERV::InLineNode : " << NodeName ;
249 _InLineNode = new GraphBase::InLineNode( ORB , ptrNamingService ,
250 aFuncName[0].c_str() , *aPythonFunction[0] ,
252 NodeFirstCreation , NodeLastModification ,
253 NodeEditorRelease , NodeAuthor ,
254 NodeComment , GeneratedName ,
256 Graph_prof_debug , Graph_fdebug ) ;
257 _ComputingNode = (GraphBase::ComputingNode *) _InLineNode ;
// GOTO node: the same object is also exposed as _ComputingNode and _InLineNode.
// NOTE(review): _InLineNode is obtained by a C-style cast from the
// ComputingNode pointer; presumably GOTONode derives from InLineNode - confirm.
260 case SUPERV::GOTONode : {
261 cdebug << "GraphEditor::InNode::InNode SUPERV::GOTONode : " << NodeName ;
262 _GOTONode = new GraphBase::GOTONode( ORB , ptrNamingService ,
263 aFuncName[0].c_str() , *aPythonFunction[0] ,
265 NodeFirstCreation , NodeLastModification ,
266 NodeEditorRelease , NodeAuthor ,
267 NodeComment , GeneratedName ,
269 Graph_prof_debug , Graph_fdebug ) ;
270 _ComputingNode = (GraphBase::ComputingNode *) _GOTONode ;
271 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// Loop node: takes three function name / body pairs (indexes 0, 1 and 2).
274 case SUPERV::LoopNode : {
275 cdebug << "GraphExecutor::InNode::InNode SUPERV::LoopNode : " << NodeName ;
276 _LoopNode = new GraphBase::LoopNode( ORB , ptrNamingService ,
277 aFuncName[0].c_str() , *aPythonFunction[0] ,
278 aFuncName[1].c_str() , *aPythonFunction[1] ,
279 aFuncName[2].c_str() , *aPythonFunction[2] ,
281 NodeFirstCreation , NodeLastModification ,
282 NodeEditorRelease , NodeAuthor ,
283 NodeComment , GeneratedName ,
285 Graph_prof_debug , Graph_fdebug ) ;
286 _ComputingNode = (GraphBase::ComputingNode *) _LoopNode ;
287 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
288 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// End-of-loop node: aliased as computing, GOTO and in-line node.
291 case SUPERV::EndLoopNode : {
292 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfLoopNode : " << NodeName ;
293 _EndOfLoopNode = new GraphBase::EndOfLoopNode(
294 ORB , ptrNamingService ,
295 aFuncName[0].c_str() , *aPythonFunction[0] ,
297 NodeFirstCreation , NodeLastModification ,
298 NodeEditorRelease , NodeAuthor ,
299 NodeComment , GeneratedName ,
301 Graph_prof_debug , Graph_fdebug ) ;
302 _ComputingNode = (GraphBase::ComputingNode *) _EndOfLoopNode ;
303 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
304 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// Switch node: aliased as computing, GOTO and in-line node.
307 case SUPERV::SwitchNode : {
308 cdebug << "GraphExecutor::InNode::InNode SUPERV::SwitchNode : " << NodeName ;
309 _SwitchNode = new GraphBase::SwitchNode( ORB , ptrNamingService ,
310 aFuncName[0].c_str() , *aPythonFunction[0] ,
312 NodeFirstCreation , NodeLastModification ,
313 NodeEditorRelease , NodeAuthor ,
314 NodeComment , GeneratedName ,
316 Graph_prof_debug , Graph_fdebug ) ;
317 _ComputingNode = (GraphBase::ComputingNode *) _SwitchNode ;
318 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
319 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// End-of-switch node: aliased as computing, GOTO and in-line node.
322 case SUPERV::EndSwitchNode : {
323 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfSwitchNode : " << NodeName ;
324 _EndOfSwitchNode = new GraphBase::EndOfSwitchNode(
325 ORB , ptrNamingService ,
326 aFuncName[0].c_str() , *aPythonFunction[0] ,
328 NodeFirstCreation , NodeLastModification ,
329 NodeEditorRelease , NodeAuthor ,
330 NodeComment , GeneratedName ,
332 Graph_prof_debug , Graph_fdebug ) ;
333 _ComputingNode = (GraphBase::ComputingNode *) _EndOfSwitchNode ;
334 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
335 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// Unsupported kinds: only an error trace is written.
// NOTE(review): as far as the visible lines show, _ComputingNode is still NULL
// for these two kinds, so the unconditional _ComputingNode->InNode( this )
// below would dereference a null pointer - confirm.
338 case SUPERV::DataFlowNode : {
339 cdebug << "GraphEditor::InNode::InNode SUPERV::DataFlowNode ERROR : " << NodeName ;
341 case SUPERV::UnknownNode : {
342 cdebug << "GraphEditor::InNode::InNode SUPERV::UnknownNode ERROR : " << NodeName ;
345 cdebug << "GraphExecutor::InNode::InNode " << (void *) this
346 << " _ComputingNode " << (void *) _ComputingNode ;
// Pass this executor node to the GraphBase node (presumably a back-link).
347 _ComputingNode->InNode( this ) ;
// Destructor.
// NOTE(review): confirm whether the pthread objects set up by InitInNode and
// the GraphBase node allocated by the constructor need to be released here.
350 GraphExecutor::InNode::~InNode() {
// Acquires _MutexDataWait. A non-zero result from pthread_mutex_lock is
// reported through perror.
353 void GraphExecutor::InNode::LockDataWait() {
354 if ( pthread_mutex_lock( &_MutexDataWait ) ) {
355 perror("Ready pthread_mutex_lock ") ;
// Releases _MutexDataWait. A non-zero result from pthread_mutex_unlock is
// reported through perror.
360 void GraphExecutor::InNode::UnLockDataWait() {
362 if ( pthread_mutex_unlock( &_MutexDataWait ) ) {
363 perror("Ready pthread_mutex_unlock ") ;
// Returns the component reference associated with this node.
// For factory nodes it is the one held by _FactoryNode. Otherwise the value
// attached to the out port feeding input port 0 is read as a CORBA object and
// narrowed to Engines::Component. A nil reference is the fallback result.
368 Engines::Component_var GraphExecutor::InNode::Component() const {
369 if ( IsFactoryNode() ) {
370 return _FactoryNode->Component() ;
// NOTE(review): GetOutPort() and Value() are dereferenced without a visible
// null check.
373 CORBA::Any const * anAnyComponent = GetChangeNodeInPort( 0 )->GetOutPort()->Value() ; // this
374 CORBA::Object_ptr obj ;
// Extract the object reference from the Any, then narrow it.
376 *anAnyComponent >>= obj ;
377 return Engines::Component::_narrow( obj ) ;
380 cdebug << "GraphExecutor::InNode::Component Component catch" << endl ;
383 return Engines::Component::_nil() ;
// Returns the container of the factory node, or a nil reference when this node
// is not a factory node.
386 Engines::Container_var GraphExecutor::InNode::Container() const {
387 if ( IsFactoryNode() ) {
388 return _FactoryNode->Container() ;
390 return Engines::Container::_nil() ;
// Checks the component of a factory node: RetVal reflects whether the
// component reference is non-nil, and ping() is invoked on it unless the node
// is in SuspendedExecutingState.
394 bool GraphExecutor::InNode::Ping() {
395 // cdebug_in << "GraphExecutor::InNode::Ping" << endl;
397 if ( IsFactoryNode() ) {
398 RetVal = !CORBA::is_nil( _FactoryNode->Component() ) ;
// The remote ping is skipped while the node is suspended during execution.
400 if ( State() != SUPERV::SuspendedExecutingState ) {
401 _FactoryNode->Component()->ping() ;
408 // cdebug_out << "GraphExecutor::InNode::Ping" << endl ;
// Records aThread as this node's thread through ThreadNo() and notifies the
// owning OutNode that a thread has started.
412 void GraphExecutor::InNode::NewThread( pthread_t aThread ) {
413 ThreadNo ( aThread ) ;
415 _OutNode->NewThread() ;
// Notifies the owning OutNode that this node's thread is exiting.
417 void GraphExecutor::InNode::ExitThread() {
419 _OutNode->ExitThread() ;
// Requests suspension of this node and traces the resulting RetVal.
// Depending on the current state it sets the control state to ToSuspendState
// (waiting, ready or running nodes) or back to VoidState. For a running factory
// node with a non-nil component, the request is forwarded to the component
// through Suspend_impl(). A node that is already done cannot be suspended.
422 bool GraphExecutor::InNode::Suspend() {
423 cdebug_in << "GraphExecutor::InNode::Suspend " << Name() << " " << ThreadNo()
427 ControlState( SUPERV::VoidState ) ;
// Not started yet: just remember that a suspension is wanted.
430 else if ( IsWaiting() || IsReady() ) {
431 ControlState( SUPERV::ToSuspendState ) ;
// Running: remember the request and ask the component to suspend.
434 else if ( IsRunning() ) {
435 ControlState( SUPERV::ToSuspendState ) ;
436 if ( IsFactoryNode() ) {
437 if ( !CORBA::is_nil( Component() ) ) {
438 RetVal = Component()->Suspend_impl() ;
441 SendEvent( GraphExecutor::SuspendEvent ) ;
443 else if ( IsDone() ) {
444 ControlState( SUPERV::VoidState ) ;
445 RetVal = false ; // Too late ...
448 cdebug << "component Suspended and !IsDone and !IsRunning !"
454 cdebug << "Suspend cannot Suspend component !" << endl ;
459 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
464 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
468 cdebug_out << "GraphExecutor::InNode::Suspend " << RetVal << endl ;
// For a factory node, asks the container hosting the component to kill itself
// through Kill_impl().
// NOTE(review): the reference returned by Container() is used without a
// visible nil check.
472 bool GraphExecutor::InNode::ContainerKill() {
473 cdebug_in << "GraphExecutor::InNode::ContainerKill " << Name() << " "
474 << ThreadNo() << endl;
476 if ( IsFactoryNode() ) {
478 Container()->Kill_impl() ;
480 cdebug_out << "GraphExecutor::InNode::ContainerKill" << endl ;
// Requests that this node be killed.
// The first test covers a kill that is already pending (ToKillState) or a node
// that is already done. Otherwise the control state is set to ToKillState and,
// for a factory node with a non-nil component, Kill_impl() is called on the
// component; a KillEvent is sent to the automaton on success, and also for a
// suspended node or when the component reference is nil.
484 bool GraphExecutor::InNode::Kill() {
485 cdebug_in << "GraphExecutor::InNode::Kill " << Name() << " " << ThreadNo() << " "
486 << Automaton()->StateName( State() ) << endl;
488 if ( ControlState() == SUPERV::ToKillState || IsDone() ) {
492 ControlState( SUPERV::ToKillState ) ;
494 ControlState( SUPERV::VoidState ) ;
499 if ( IsFactoryNode() ) {
500 if ( !CORBA::is_nil( Component() ) ) {
501 RetVal = Component()->Kill_impl() ;
504 SendEvent( GraphExecutor::KillEvent ) ;
506 else if ( IsDone() ) {
507 ControlState( SUPERV::VoidState ) ;
508 RetVal = false ; // Too late ...
511 cdebug << "component Killed and !IsDone and !IsRunning !"
517 cdebug << "Kill cannot Kill component !" << endl ;
// No component to call: drive the automaton directly and report whether the
// node ended up in a killed state.
522 cdebug << "Kill with nilComponent while RunningState !" << endl ;
523 SendEvent( GraphExecutor::KillEvent ) ;
524 RetVal = IsKilled() ;
527 else if ( IsSuspended() ) {
528 SendEvent( GraphExecutor::KillEvent ) ;
531 else if ( IsWaiting() ) {
535 cdebug << "Kill and !IsDone and !IsRunning and !IsWaiting ?"
541 cdebug_out << "GraphExecutor::InNode::Kill" << endl ;
// Variant of Kill() that uses the ToKillDoneState control state.
// For a factory node with a non-nil component it calls Kill_impl() on the
// component.
// NOTE(review): this function sends SuspendEvent and its trace messages speak
// of "Suspend" although it performs a kill - this looks copied from Suspend();
// confirm the intended event.
545 bool GraphExecutor::InNode::KillDone() {
546 cdebug_in << "GraphExecutor::InNode::KillDone " << Name() << " " << ThreadNo()
549 if ( ControlState() == SUPERV::ToKillDoneState || IsDone() ) {
553 ControlState( SUPERV::ToKillDoneState ) ;
555 ControlState( SUPERV::VoidState ) ;
560 if ( IsFactoryNode() ) {
561 if ( !CORBA::is_nil( Component() ) ) {
562 RetVal = Component()->Kill_impl() ;
565 SendEvent( GraphExecutor::SuspendEvent ) ;
567 else if ( IsDone() ) {
568 ControlState( SUPERV::VoidState ) ;
569 RetVal = false ; // Too late ...
572 cdebug << "component Suspended and !IsDone and !IsRunning !"
578 cdebug << "Suspend cannot Suspend component !" << endl ;
583 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
587 else if ( IsWaiting() ) {
591 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
597 cdebug_out << "GraphExecutor::InNode::KillDone" << endl ;
// Requests that this node be stopped, using the ToStopState control state.
// For a factory node with a non-nil component it calls Stop_impl() on the
// component.
// NOTE(review): this function sends SuspendEvent and its trace messages speak
// of "Suspend" although it performs a stop - this looks copied from Suspend();
// confirm the intended event.
601 bool GraphExecutor::InNode::Stop() {
602 cdebug_in << "GraphExecutor::InNode::Stop " << Name() << " " << ThreadNo()
605 if ( ControlState() == SUPERV::ToStopState || IsDone() ) {
609 ControlState( SUPERV::ToStopState ) ;
611 ControlState( SUPERV::VoidState ) ;
616 if ( IsFactoryNode() ) {
617 if ( !CORBA::is_nil( Component() ) ) {
618 RetVal = Component()->Stop_impl() ;
621 SendEvent( GraphExecutor::SuspendEvent ) ;
623 else if ( IsDone() ) {
624 ControlState( SUPERV::VoidState ) ;
625 RetVal = false ; // Too late ...
628 cdebug << "component Suspended and !IsDone and !IsRunning !"
634 cdebug << "Suspend cannot Suspend component !" << endl ;
639 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
643 else if ( IsWaiting() ) {
647 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
653 cdebug_out << "GraphExecutor::InNode::Stop" << endl ;
// Handles a "suspend when done" request through the ToSuspendDoneState control
// state. The first test covers a request that is already pending or a node
// that is already done.
657 bool GraphExecutor::InNode::SuspendDone() {
658 cdebug_in << "GraphExecutor::InNode::SuspendDone " << Name() << " "
659 << ThreadNo() << endl;
661 if ( ControlState() == SUPERV::ToSuspendDoneState || IsDone() ) {
665 ControlState( SUPERV::ToSuspendDoneState ) ;
667 ControlState( SUPERV::VoidState ) ;
674 cdebug_out << "GraphExecutor::InNode::SuspendDone" << endl ;
// Resumes a suspended node and traces the resulting RetVal.
// In the first part, a node in SuspendedReady, SuspendedSuccessed or
// SuspendedErrored state is resumed through ResumeAction( ToResumeEvent ); a
// factory node in SuspendedExecuting state is resumed by calling Resume_impl()
// on its component. The remaining part deals with the pending control state
// (ToSuspendStartState, ToSuspendRunState, ToSuspendDoneState, ToSuspendState)
// and resets it to VoidState in the branches shown.
678 bool GraphExecutor::InNode::Resume() {
679 cdebug_in << pthread_self() << "/" << ThreadNo()
680 << " GraphExecutor::InNode::Resume " << Name() << " "
681 << Automaton()->StateName( State() ) << endl;
682 bool RetVal = false ;
683 if ( IsSuspended() ) {
684 if ( State() == SUPERV::SuspendedReadyState ) {
685 ResumeAction( GraphExecutor::ToResumeEvent ) ;
688 else if ( State() == SUPERV::SuspendedExecutingState ) {
// NOTE(review): unlike Suspend(), no nil check on Component() is visible
// before this remote call.
689 if ( IsFactoryNode() ) {
690 RetVal = Component()->Resume_impl() ;
693 else if ( State() == SUPERV::SuspendedSuccessedState ) {
694 ResumeAction( GraphExecutor::ToResumeEvent ) ;
697 else if ( State() == SUPERV::SuspendedErroredState ) {
698 ResumeAction( GraphExecutor::ToResumeEvent ) ;
702 cdebug << "GraphExecutor::InNode::Resume Not SuspendedReady/Executing/Successed/ErroredState "
703 << Automaton()->StateName( State() ) << endl ;
708 cdebug << "GraphExecutor::InNode::Resume Not Suspended State "
709 << Automaton()->StateName( State() ) << endl ;
// Handling of the pending control request starts here.
712 if ( ControlState() == SUPERV::ToSuspendStartState ) {
713 ControlState( SUPERV::VoidState ) ;
717 if ( ControlState() == SUPERV::ToSuspendRunState ||
718 ( ControlState() == SUPERV::ToSuspendState &&
719 State() == SUPERV::SuspendedReadyState) ) {
720 if ( IsSuspended() ) {
721 if ( State() == SUPERV::SuspendedReadyState ) {
725 else if ( State() == SUPERV::SuspendedExecutingState ) {
727 RetVal = Component()->Resume_impl() ;
730 cdebug << "GraphExecutor::InNode::Resume State "
731 << Automaton()->StateName( State() ) << endl ;
// A plain ToSuspendState request is kept; other requests are cleared.
734 if ( ControlState() != SUPERV::ToSuspendState ) {
735 ControlState( SUPERV::VoidState ) ;
738 else if ( IsRunning() ) {
741 else if ( IsWaiting() ) {
742 ControlState( SUPERV::VoidState ) ;
745 else if ( IsDone() ) {
749 else if ( ControlState() == SUPERV::ToSuspendDoneState ||
750 ( ControlState() == SUPERV::ToSuspendState &&
751 State() == SUPERV::SuspendedSuccessedState) ) {
752 if ( IsSuspended() ) {
753 if ( State() == SUPERV::SuspendedSuccessedState ) {
757 else if ( State() == SUPERV::SuspendedErroredState ) {
762 cdebug << "GraphExecutor::InNode::Resume State " << State() << endl ;
765 if ( ControlState() != SUPERV::ToSuspendState ) {
766 ControlState( SUPERV::VoidState ) ;
769 else if ( IsRunning() ) {
770 ControlState( SUPERV::VoidState ) ;
773 else if ( IsWaiting() ) {
774 ControlState( SUPERV::VoidState ) ;
777 else if ( IsDone() ) {
778 ControlState( SUPERV::VoidState ) ;
783 cdebug_out << "GraphExecutor::InNode::Resume " << RetVal << endl ;
// Restarts execution at the node named AtNodeName, looked up in the owning
// OutNode.
// If this node is waiting and the target node is suspended, the target is
// simply resumed. If this node is suspended, a restart action is triggered on
// the target with ReStartAndSuspendEvent or ReStartEvent (presumably selected
// by AndSuspend - confirm).
// NOTE(review): the results of GetGraphNode() and GetInNode() are used without
// a visible null check, so an unknown node name would be dereferenced.
787 bool GraphExecutor::InNode::ReStart( const char * AtNodeName ,
788 const bool AndSuspend ) {
789 bool RetVal = false ;
790 GraphExecutor::InNode * aRestartNode = (GraphExecutor::InNode *) _OutNode->GetGraphNode( AtNodeName )->GetInNode() ;
791 cdebug_in << pthread_self() << "/" << ThreadNo()
792 << " --> GraphExecutor::InNode::ReStartAt( "
793 << AtNodeName << " , " << AndSuspend << ") " << endl
794 << "thread " << aRestartNode->ThreadNo() << " "
795 << Automaton()->StateName( aRestartNode->State() )
796 << " from " << Name() << " " << Automaton()->StateName( State() )
798 if ( IsWaiting() && aRestartNode->IsSuspended() ) {
799 RetVal = aRestartNode->Resume() ;
801 else if ( IsSuspended() ) {
// strcmp is non-zero when the target is a different node than this one; the
// target is then put into SuspendedSuccessedState before the restart.
802 if ( strcmp( AtNodeName , Name() ) ) {
803 aRestartNode->State( SUPERV::SuspendedSuccessedState ) ;
806 ReStartAction( aRestartNode , GraphExecutor::ReStartAndSuspendEvent ) ;
809 ReStartAction( aRestartNode , GraphExecutor::ReStartEvent ) ;
813 cdebug_out << "<-- GraphExecutor::InNode::ReStartAt" << endl ;
// State predicate: tests whether the current automaton state is DataUndef,
// DataWaiting or SuspendedReady. The other suspended states are deliberately
// commented out of the test.
817 bool GraphExecutor::InNode::IsWaiting() {
819 // cdebug_in << "GraphExecutor::InNode::IsWaiting " << Name() << endl;
820 SUPERV::AutomatonState aState = State() ;
821 if ( aState == SUPERV::DataUndefState ||
822 aState == SUPERV::DataWaitingState ||
823 aState == SUPERV::SuspendedReadyState )
824 // aState == SUPERV::SuspendedExecutingState ||
825 // aState == SUPERV::SuspendedSuccessedState ||
826 // aState == SUPERV::SuspendedErroredState ||
827 // aState == SUPERV::SuspendedState
829 // cdebug_out << "GraphExecutor::InNode::IsWaiting" << endl ;
// State predicate: tests whether the current automaton state is DataUndef,
// DataWaiting, DataReady or ResumedReady. Note that DataUndef and DataWaiting
// are also matched by IsWaiting().
833 bool GraphExecutor::InNode::IsReady() {
835 // cdebug_in << "GraphExecutor::InNode::IsReady " << Name() << endl;
836 SUPERV::AutomatonState aState = State() ;
837 if ( aState == SUPERV::DataUndefState ||
838 aState == SUPERV::DataWaitingState ||
839 aState == SUPERV::DataReadyState ||
840 aState == SUPERV::ResumedReadyState )
842 // cdebug_out << "GraphExecutor::InNode::IsReady" << endl ;
// State predicate: tests whether the current automaton state is Executing or
// ResumedExecuting.
846 bool GraphExecutor::InNode::IsRunning() {
848 // cdebug_in << "GraphExecutor::InNode::IsRunning " << Name() << endl;
849 SUPERV::AutomatonState aState = State() ;
850 if ( aState == SUPERV::ExecutingState ||
851 aState == SUPERV::ResumedExecutingState )
853 // cdebug_out << "GraphExecutor::InNode::IsRunning" << endl ;
// State predicate: tests whether the current automaton state is one of the
// terminal states listed below (killed, stopped, successed, errored and their
// resumed variants). SuspendedSuccessed and SuspendedErrored are included, so
// such a node is matched by both IsDone() and IsSuspended().
// NOTE(review): KilledErroredState and StoppedErroredState, which IsKilled()
// and IsStopped() test, are not in this list - confirm that is intended.
857 bool GraphExecutor::InNode::IsDone() {
859 // cdebug_in << "GraphExecutor::InNode::IsDone " << Name() << endl;
860 SUPERV::AutomatonState aState = State() ;
861 if ( aState == SUPERV::KilledReadyState ||
862 aState == SUPERV::StoppedReadyState ||
863 aState == SUPERV::KilledExecutingState ||
864 aState == SUPERV::StoppedExecutingState ||
865 aState == SUPERV::SuspendedSuccessedState ||
866 aState == SUPERV::SuspendedErroredState ||
867 // aState == SUPERV::SuccessedExecutingState ||
868 // aState == SUPERV::ErroredExecutingState ||
869 aState == SUPERV::SuccessedState ||
870 aState == SUPERV::ErroredState ||
871 aState == SUPERV::ResumedSuccessedState ||
872 aState == SUPERV::ResumedErroredState ||
873 aState == SUPERV::KilledSuccessedState ||
874 aState == SUPERV::StoppedSuccessedState )
876 // cdebug_out << "GraphExecutor::InNode::IsDone" << endl ;
// State predicate: tests whether the current automaton state is one of the
// four suspended states (Ready, Executing, Successed, Errored).
880 bool GraphExecutor::InNode::IsSuspended() {
882 // cdebug_in << "GraphExecutor::InNode::IsSuspended " << Name() << endl;
883 SUPERV::AutomatonState aState = State() ;
884 if ( aState == SUPERV::SuspendedReadyState ||
885 aState == SUPERV::SuspendedExecutingState ||
886 aState == SUPERV::SuspendedSuccessedState ||
887 aState == SUPERV::SuspendedErroredState )
889 // cdebug_out << "GraphExecutor::InNode::IsSuspended" << endl ;
// State predicate: tests whether the current automaton state is one of the
// killed states (Ready, Executing, Successed, Errored or plain Killed).
892 bool GraphExecutor::InNode::IsKilled() {
894 // cdebug_in << "GraphExecutor::InNode::IsKilled " << Name() << endl;
895 SUPERV::AutomatonState aState = State() ;
896 if ( aState == SUPERV::KilledReadyState ||
897 aState == SUPERV::KilledExecutingState ||
898 aState == SUPERV::KilledSuccessedState ||
899 aState == SUPERV::KilledErroredState ||
900 aState == SUPERV::KilledState )
902 // cdebug_out << "GraphExecutor::InNode::IsKilled" << endl ;
// State predicate: tests whether the current automaton state is one of the
// stopped states (Ready, Executing, Successed, Errored or plain Stopped).
905 bool GraphExecutor::InNode::IsStopped() {
907 // cdebug_in << "GraphExecutor::InNode::IsStopped " << Name() << endl;
908 SUPERV::AutomatonState aState = State() ;
909 if ( aState == SUPERV::StoppedReadyState ||
910 aState == SUPERV::StoppedExecutingState ||
911 aState == SUPERV::StoppedSuccessedState ||
912 aState == SUPERV::StoppedErroredState ||
913 aState == SUPERV::StoppedState )
915 // cdebug_out << "GraphExecutor::InNode::IsStopped" << endl ;
// Blocks the calling thread until this node reaches the requested graph state
// (Ready, Running, Done or Suspend).
// The wait is done under _MutexWait on the condition variable that matches the
// requested state. For the Ready, Running and Suspend cases the loop ends as
// soon as the corresponding predicate holds or the node is done. RetVal holds
// the last value of that predicate.
// NOTE(review): the perror text used after pthread_mutex_unlock at the end
// says "pthread_mutex_lock", and the trace for an undefined state names
// GraphExecutor::OutNode - both look like copy/paste slips in runtime strings.
919 bool GraphExecutor::InNode::StateWait( SUPERV::GraphState aState ) {
920 bool RetVal = false ;
921 if ( pthread_mutex_lock( &_MutexWait ) ) {
922 perror("pthread_mutex_lock _Wait") ;
// Wait on _ReadyWait.
926 case SUPERV::ReadyState : {
928 cdebug_in << pthread_self() << " StateWait( Ready ) " << RetVal
929 << " " << Automaton()->StateName( _currentState )
930 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
931 while ( !RetVal && !IsDone() ) {
932 cdebug << pthread_self() << " pthread_cond_wait ReadyWait" << endl ;
933 pthread_cond_wait( &_ReadyWait , &_MutexWait );
935 cdebug << pthread_self() << " pthread_cond_waited ReadyWait "
936 << Automaton()->StateName( _currentState ) << " " << RetVal
939 cdebug_out << pthread_self() << " StateWait( Ready ) " << RetVal
940 << " " << Automaton()->StateName( _currentState )
941 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
// Wait on _RunningWait; the predicate is re-evaluated after every wake-up.
944 case SUPERV::RunningState : {
945 RetVal = IsRunning() ;
946 cdebug_in << pthread_self() << " StateWait( Running ) " << RetVal
947 << " " << Automaton()->StateName( _currentState )
948 << " pthread_cond_wait _RunningWait " << Name() << endl ;
949 while ( !RetVal && !IsDone() ) {
950 cdebug << pthread_self() << " pthread_cond_wait RunningWait" << endl ;
951 pthread_cond_wait( &_RunningWait , &_MutexWait );
952 RetVal = IsRunning() ;
953 cdebug << pthread_self() << " pthread_cond_waited RunningWait "
954 << Automaton()->StateName( _currentState ) << " " << RetVal
957 cdebug_out << pthread_self() << " StateWait( Running ) " << RetVal
958 << " " << Automaton()->StateName( _currentState )
959 << " pthread_cond_wait _RunningWait " << Name() << endl ;
// Wait on _DoneWait.
962 case SUPERV::DoneState : {
964 cdebug_in << pthread_self() << " StateWait( Done ) " << RetVal
965 << " " << Automaton()->StateName( _currentState )
966 << " pthread_cond_wait _DoneWait " << Name() << endl ;
968 cdebug << pthread_self() << " pthread_cond_wait DoneWait" << endl ;
969 pthread_cond_wait( &_DoneWait , &_MutexWait );
971 cdebug << pthread_self() << " pthread_cond_waited DoneWait "
972 << Automaton()->StateName( _currentState ) << " " << RetVal
975 cdebug_out << pthread_self() << " StateWait( Done ) " << RetVal
976 << " " << Automaton()->StateName( _currentState )
977 << " pthread_cond_wait _DoneWait " << Name() << endl ;
// Wait on _SuspendedWait; the predicate is re-evaluated after every wake-up.
980 case SUPERV::SuspendState : {
981 RetVal = IsSuspended() ;
982 cdebug_in << pthread_self() << " StateWait( Suspend ) " << RetVal
983 << " " << Automaton()->StateName( _currentState )
984 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
985 while ( !RetVal && !IsDone() ) {
986 cdebug << pthread_self() << " pthread_cond_wait SuspendedWait" << endl ;
987 pthread_cond_wait( &_SuspendedWait , &_MutexWait );
988 RetVal = IsSuspended() ;
989 cdebug << pthread_self() << " pthread_cond_waited SuspendedWait "
990 << Automaton()->StateName( _currentState ) << " " << RetVal
993 cdebug_out << pthread_self() << " StateWait( Suspend ) " << RetVal
994 << " " << Automaton()->StateName( _currentState )
995 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
999 cdebug << " GraphExecutor::OutNode::StateWait Error Undefined State : "
// Release the mutex taken at the top of the function.
1003 if ( pthread_mutex_unlock( &_MutexWait ) ) {
1004 perror("pthread_mutex_lock _Wait") ;
// Convenience wrapper: waits through StateWait( SUPERV::ReadyState ).
1010 bool GraphExecutor::InNode::ReadyWait() {
1011 // cdebug_in << "GraphExecutor::InNode::ReadyWait " << Name() << endl;
1013 aret = StateWait( SUPERV::ReadyState ) ;
1014 // cdebug_out << "GraphExecutor::InNode::ReadyWait" << endl ;
// Convenience wrapper: waits through StateWait( SUPERV::RunningState ).
1018 bool GraphExecutor::InNode::RunningWait() {
1019 // cdebug_in << "GraphExecutor::InNode::RunningWait " << Name() << endl;
1021 aret = StateWait( SUPERV::RunningState ) ;
// Convenience wrapper: waits through StateWait( SUPERV::DoneState ).
1025 bool GraphExecutor::InNode::DoneWait() {
1026 // cdebug_in << "GraphExecutor::InNode::DoneWait " << Name() << endl;
1028 aret = StateWait( SUPERV::DoneState ) ;
// Convenience wrapper: waits through StateWait( SUPERV::SuspendState ).
1032 bool GraphExecutor::InNode::SuspendedWait() {
1033 // cdebug_in << "GraphExecutor::InNode::SuspendedWait " << Name() << endl;
1035 aret = StateWait( SUPERV::SuspendState ) ;
// Puts the node into its initial execution state inside the graph owned by
// theOutNode.
// Steps visible here:
//  - remember the owning OutNode and reset the control state and sync flags;
//  - reset the output ports to WaitingState / not done (port 0 of a GOTO node
//    is left untouched);
//  - walk the input ports and initialise the state of the out port feeding
//    each of them, then copy that state to the input port;
//  - if the Python functions have not been set up yet, create the Python
//    callables of the in-line node kinds through InitPyDynInvoke;
//  - derive _currentState from Pc and push the matching data-ready event to
//    the OutNode.
// NOTE(review): Pc starts as the number of input ports; presumably it counts
// the ports still waiting for data - the code that decrements it is not
// visible here, confirm.
1039 void GraphExecutor::InNode::InitialState( GraphExecutor::OutNode * theOutNode )
1041 cdebug_in << "GraphExecutor::InNode::InitialState Node " << Name() << endl;
1043 _OutNode = theOutNode ;
1046 _ControlState = SUPERV::VoidState ;
1047 CreateNewThread( false ) ;
1048 CreateNewThreadIf( false ) ;
1049 _SuspendSync = false ;
1050 _ResumeSync = false ;
1051 // ThreadNo( pthread_self() ) ;
// Output ports: everything is waiting, except port 0 of a GOTO node.
1054 for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
1055 if ( i != 0 || !IsGOTONode() ) {
1056 GetChangeNodeOutPort(i)->State( SUPERV::WaitingState ) ;
1057 GetChangeNodeOutPort(i)->Done( false ) ;
// Input ports: examine the out port connected to each of them.
1061 int Pc = GetNodeInPortsSize() ;
1062 for ( i = 0 ; i < GetNodeInPortsSize() ; i++ ) {
1063 const GraphBase::InPort * anInPort = GetNodeInPort(i) ;
1064 GraphBase::OutPort * anOutPort = anInPort->GetOutPort() ;
// Loop port of a head loop node: mark its source as data-connected and ready,
// with the long value 1 as its data.
1065 if ( IsHeadNode() && IsLoopNode() && anInPort->IsLoop() ) {
1066 anOutPort->PortStatus( DataConnected );
1067 anOutPort->State( SUPERV::ReadyState ) ;
1068 anOutPort->Done( true ) ;
1069 CORBA::Any * anAny = new CORBA::Any() ;
1070 *anAny <<= (long ) 1 ;
1071 anOutPort->Value( anAny ) ;
// Connected gate port: waiting for computing/factory nodes, ready for the
// in-line node kinds.
1073 else if ( anInPort->IsGate() && anOutPort ) {
1074 if ( IsComputingNode() || IsFactoryNode() ) {
1075 anOutPort->State( SUPERV::WaitingState ) ;
1076 anOutPort->Done( false ) ;
1078 else if ( IsOneOfInLineNodes() ) {
1079 anOutPort->PortStatus( DataConnected );
1080 anOutPort->State( SUPERV::ReadyState ) ;
1081 anOutPort->Done( true ) ;
1084 // if ( ( anInPort->IsGate() || anInPort->IsBus() ) && anOutPort == NULL ) {
1085 if ( anInPort->IsGate() && anOutPort == NULL ) {
1089 if ( anOutPort->IsDataConnected() ) {
// Port fed by another node waits; port fed by constant data is ready.
1092 if ( anOutPort->IsPortConnected() ) {
1093 anOutPort->State( SUPERV::WaitingState ) ;
1094 anOutPort->Done( false ) ;
1096 else if ( anOutPort->IsDataConnected() ) {
1097 anOutPort->State( SUPERV::ReadyState ) ;
1098 anOutPort->Done( true ) ;
// The input port mirrors the state of the out port that feeds it.
1102 GetChangeNodeInPort(i)->State( anOutPort->State() ) ;
1105 cdebug << "InPort" << i << " : " << anInPort->PortName() << " from OutPort "
1106 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1108 if ( anOutPort->State() == SUPERV::WaitingState ) {
1109 cdebug << "WaitingState" ;
1111 else if ( anOutPort->State() == SUPERV::ReadyState ) {
1112 cdebug << "ReadyState" ;
1117 cdebug << " PortConnected("
1118 << anOutPort->IsPortConnected() << ") DataConnected("
1119 << anOutPort->IsDataConnected() << ")" << endl ;
// One-time creation of the Python callables for the in-line node kinds.
1122 if ( !PyFuncRunned() ) {
// Loop node: run, "more" and "next" methods; each one is only created when
// the previous one was obtained, and Err is set if any of them is missing.
1124 if ( IsLoopNode() ) {
1125 PyObject * PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1126 InLineNode()->PythonFunction() ) ;
1127 InLineNode()->PyRunMethod( PyRunMethod ) ;
1128 PyObject * PyMoreMethod = NULL ;
1129 if ( PyRunMethod ) {
1130 PyMoreMethod = InitPyDynInvoke( LoopNode()->PyMoreName() ,
1131 LoopNode()->MorePythonFunction() ) ;
1132 LoopNode()->PyMoreMethod( PyMoreMethod ) ;
1134 PyObject * PyNextMethod = NULL ;
1135 if ( PyMoreMethod ) {
1136 PyNextMethod = InitPyDynInvoke( LoopNode()->PyNextName() ,
1137 LoopNode()->NextPythonFunction() ) ;
1138 LoopNode()->PyNextMethod( PyNextMethod ) ;
1140 Err = !PyRunMethod || !PyMoreMethod || !PyNextMethod ;
1142 else if ( IsInLineNode() || IsSwitchNode() ) {
1143 PyObject * PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1144 InLineNode()->PythonFunction() ) ;
1145 InLineNode()->PyRunMethod( PyRunMethod ) ;
1146 Err = !PyRunMethod ;
// EndSwitch / GOTO nodes only get a run method when their Python function
// text is not empty.
1148 else if ( ( IsEndSwitchNode() || IsGOTONode() ) &&
1149 (*InLineNode()->PythonFunction()).length() ) {
1150 PyObject * PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1151 InLineNode()->PythonFunction() ) ;
1152 InLineNode()->PyRunMethod( PyRunMethod ) ;
1153 Err = !PyRunMethod ;
// Initial automaton state and event: NoDataReady when Pc equals the number of
// input ports, SomeDataReady when Pc is non-zero, AllDataReady otherwise.
1158 _currentState = Pc > 0 ? SUPERV::DataWaitingState
1159 : SUPERV::DataReadyState ;
1160 if ( Pc == GetNodeInPortsSize() ) {
1161 _OutNode->PushEvent( this , GraphExecutor::NoDataReadyEvent ,
1164 else if ( Pc != 0 ) {
1165 _OutNode->PushEvent( this , GraphExecutor::SomeDataReadyEvent ,
1169 _OutNode->PushEvent( this , GraphExecutor::AllDataReadyEvent ,
1172 cdebug << "CurrentState = " << theAutomaton->StateName( _currentState )
1175 cdebug_out << "GraphExecutor::InNode::InitialState" << endl;
// Obtains the CPU time used by this node.
// For the in-line (Python) node kinds the value comes from PyCpuUsed( tot );
// otherwise, when the component reference is not nil, it comes from the
// component's CpuUsed_impl(). cpu starts at 0.
1178 const long GraphExecutor::InNode::CpuUsed( bool tot ) {
1179 CORBA::Long cpu = 0 ;
1180 // cdebug_in << "GraphExecutor::InNode::CpuUsed( " << tot << " )" << Name() << endl ;
1181 if ( IsOneOfInLineNodes() ) {
1182 // cdebug << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
1183 // cout << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
1184 cpu = PyCpuUsed( tot ) ;
1187 if ( !CORBA::is_nil( Component() ) ) {
1188 // cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
1189 // cout << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
// Remote call; a failure is only traced (see the message below).
1191 cpu = Component()->CpuUsed_impl() ;
1194 cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl() ERROR catched " << endl ;
1199 // cdebug_out << "GraphExecutor::InNode::CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1200 // cout << "CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1204 #include <sys/time.h>
1205 #include <sys/resource.h>
// Reads the resource usage of the current process with
// getrusage( RUSAGE_SELF ) and takes the whole seconds of user CPU time
// (ru_utime.tv_sec); microseconds and system time are not used.
// A getrusage failure is reported through perror.
1208 long GraphExecutor::InNode::PyCpu() {
1209 struct rusage usage ;
1211 if ( getrusage( RUSAGE_SELF , &usage ) == -1 ) {
1212 perror("GraphExecutor::InNode::PyCpu") ;
1215 // return usage.ru_utime.__time_t tv_sec ;
1216 // cdebug << pthread_self() << "PyCpu " << Name() << " " << usage.ru_utime.tv_sec << " "
1217 // << usage.ru_utime.tv_usec << " " << usage.ru_stime.tv_sec << " " << usage.ru_stime.tv_usec
1219 cpu = usage.ru_utime.tv_sec ;
// CPU time attributed to the Python function of this node.
// While _PyTotCpuUsed is still -1 (the value set by SetPyCpuUsed) and the
// caller is the thread recorded in _Pythread, the value is the current PyCpu()
// minus the starting value _PyCpuUsed. Once _PyTotCpuUsed holds a value, that
// stored total is used instead.
// NOTE(review): the role of the tot argument is not visible in these lines.
1223 long GraphExecutor::InNode::PyCpuUsed( bool tot ) {
1225 if ( _PyTotCpuUsed == -1 ) {
1226 if ( _Pythread == pthread_self() ) {
1227 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name()
1228 // << " _PyTotCpuUsed " << _PyTotCpuUsed << " PyCpu() " << PyCpu() << " - " << " _PyCpuUsed "
1229 // << _PyCpuUsed << endl ;
1230 cpu = PyCpu() - _PyCpuUsed ;
1232 _PyTotCpuUsed = cpu ;
1240 cpu = _PyTotCpuUsed ;
1242 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name() << "_PyTotCpuUsed"
1243 // << _PyTotCpuUsed << " CpuUsed : " << cpu << endl ;
// Starts a CPU measurement for the Python function: marks the total as not yet
// known (-1), records the calling thread and stores the current PyCpu() value
// as the starting point used by PyCpuUsed().
1247 void GraphExecutor::InNode::SetPyCpuUsed() {
1248 _PyTotCpuUsed = -1 ;
1250 _Pythread = pthread_self() ;
1251 _PyCpuUsed = PyCpu() ;
1252 // cdebug << pthread_self() << "GraphExecutor::InNode::SetPyCpuUsed " << Name() << " _PyCpuUsed : "
1253 // << _PyCpuUsed << endl ;