1 // SUPERV GraphExecutor : contains classes that permit execution of graphs and particularly the execution automaton
3 // Copyright (C) 2003 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
24 // File : DataFlowExecutor_InNode.cxx
25 // Author : Jean Rahuel, CEA
38 #include <SALOMEconfig.h>
39 #include CORBA_CLIENT_HEADER(SALOME_Component)
40 //#include "SALOME_NamingService.hxx"
41 #include "SALOME_LifeCycleCORBA.hxx"
43 #include "DataFlowBase_FactoryNode.hxx"
44 #include "DataFlowBase_GOTONode.hxx"
45 #include "DataFlowBase_LoopNode.hxx"
46 #include "DataFlowBase_EndOfLoopNode.hxx"
47 #include "DataFlowBase_SwitchNode.hxx"
48 #include "DataFlowBase_EndOfSwitchNode.hxx"
50 #include "DataFlowExecutor_OutNode.hxx"
// File-local helper that puts the state and synchronisation members of an
// InNode into their initial values; both InNode constructors in this file
// call it. It resets the control and automaton states, clears the restart-node
// and Python run-method pointers, initialises the two mutexes and the
// condition variables received by reference, and stores the automaton and ORB
// pointers through the supplied out-pointers. A failing pthread_cond_init()
// is reported with perror().
// NOTE(review): this listing looks incomplete -- _SuspendSync and _ResumeSync
// are assigned below although no matching parameter is visible, and the
// perror() branches are not closed here. Confirm against the full file.
52 static void InitInNode( int &_RewindStack ,
53 SUPERV::ControlState &_ControlState ,
54 SUPERV::AutomatonState &_currentState ,
55 GraphExecutor::InNode ** _aReStartNode ,
56 bool & _PyFuncRunned ,
57 PyObject ** _MyPyRunMethod ,
58 pthread_mutex_t &_MutexDataWait ,
60 pthread_mutex_t &_MutexWait ,
61 pthread_cond_t &_ReadyWait ,
62 pthread_cond_t &_RunningWait ,
63 pthread_cond_t &_DoneWait ,
64 pthread_cond_t &_SuspendedWait ,
65 pthread_cond_t &_SuspendWait ,
67 pthread_cond_t &_ResumeWait ,
69 pthread_cond_t &_KillWait ,
71 pthread_cond_t &_ThreadStartedWait ,
72 bool &_ThreadStartedSync ,
73 pthread_cond_t &_StopWait ,
74 GraphExecutor::FiniteStateMachine ** _Automaton ,
75 GraphExecutor::FiniteStateMachine * theAutomaton ,
76 CORBA::ORB_ptr * _Orb ,
77 CORBA::ORB_ptr ORB ) {
// No pending control request, automaton state not yet known.
79 _ControlState = SUPERV::VoidState ;
80 _currentState = SUPERV::UnKnownState ;
81 *_aReStartNode = NULL ;
82 _PyFuncRunned = false ;
83 *_MyPyRunMethod = NULL ;
// Both mutexes use default attributes; the return value of
// pthread_mutex_init() is not checked in the visible code.
84 pthread_mutex_init( &_MutexDataWait , NULL ) ;
86 pthread_mutex_init( &_MutexWait , NULL ) ;
// One condition variable per waitable event, all with default attributes.
87 if ( pthread_cond_init( &_ReadyWait , NULL ) ) {
88 perror("pthread_cond_init( &_ReadyWait , NULL )") ;
91 if ( pthread_cond_init( &_RunningWait , NULL ) ) {
92 perror("pthread_cond_init( &_RunningWait , NULL )") ;
95 if ( pthread_cond_init( &_DoneWait , NULL ) ) {
96 perror("pthread_cond_init( &_DoneWait , NULL )") ;
99 if ( pthread_cond_init( &_SuspendedWait , NULL ) ) {
100 perror("pthread_cond_init( &_SuspendedWait , NULL )") ;
103 if ( pthread_cond_init( &_SuspendWait , NULL ) ) {
104 perror("pthread_cond_init( &_SuspendWait , NULL )") ;
107 _SuspendSync = false ;
108 if ( pthread_cond_init( &_ResumeWait , NULL ) ) {
109 perror("pthread_cond_init( &_ResumeWait , NULL )") ;
112 _ResumeSync = false ;
113 if ( pthread_cond_init( &_KillWait , NULL ) ) {
114 perror("pthread_cond_init( &_KillWait , NULL )") ;
118 if ( pthread_cond_init( &_ThreadStartedWait , NULL ) ) {
119 perror("pthread_cond_init( &_ThreadStartedWait , NULL )") ;
122 _ThreadStartedSync = false ;
123 if ( pthread_cond_init( &_StopWait , NULL ) ) {
124 perror("pthread_cond_init( &_StopWait , NULL )") ;
// Here theAutomaton is the parameter of this function, not the file-scope
// variable of the same name.
127 *_Automaton = theAutomaton ;
// NOTE(review): the ORB argument is not used in the visible lines; the stored
// ORB reference is always nil after this call -- confirm this is intended.
128 *_Orb = CORBA::ORB::_nil();
// FiniteStateMachine instance allocated once at file scope and never deleted
// in the visible code. InitialState() in this file calls
// theAutomaton->StateName(), presumably referring to this object.
131 GraphExecutor::FiniteStateMachine * theAutomaton = new GraphExecutor::FiniteStateMachine() ;
133 //GraphExecutor::InNode::InNode() :
134 // GraphBase::FactoryNode() {
// Default constructor: delegates member initialisation to InitInNode() and
// passes a nil ORB as the final argument.
// NOTE(review): the intermediate arguments of the call are not visible in
// this listing.
135 GraphExecutor::InNode::InNode() {
136 InitInNode( _RewindStack ,
161 CORBA::ORB::_nil() ) ;
// Full constructor: initialises the synchronisation members through
// InitInNode(), configures debug output, then allocates the GraphBase node
// object that matches akind and keeps typed pointers to it.
//
// For every kind that creates a node, _ComputingNode is set to the new object
// cast to GraphBase::ComputingNode*; the GOTO/Loop/EndLoop/Switch/EndSwitch
// kinds additionally alias it through _InLineNode (and _GOTONode where
// assigned below). Finally the created node is given a back-pointer to this
// InNode via _ComputingNode->InNode( this ).
//
// aFuncName / aPythonFunction are indexed at [0] for the in-line kinds and at
// [0], [1], [2] for SUPERV::LoopNode; no size check is visible here.
// NOTE(review): several lines of this constructor (the switch header, some
// parameters such as NodeX/NodeY, break statements, closing braces) are not
// visible in this listing.
164 GraphExecutor::InNode::InNode( CORBA::ORB_ptr ORB,
165 SALOME_NamingService* ptrNamingService ,
166 const SALOME_ModuleCatalog::Service& aService ,
167 const char * ComponentName ,
168 const char * NodeInterfaceName ,
169 const char * NodeName ,
170 const SUPERV::KindOfNode akind ,
171 GraphBase::ListOfFuncName aFuncName ,
172 GraphBase::ListOfPythonFunctions aPythonFunction ,
173 const SUPERV::SDate NodeFirstCreation ,
174 const SUPERV::SDate NodeLastModification ,
175 const char * NodeEditorRelease ,
176 const char * NodeAuthor ,
177 const char * NodeComputer ,
178 const char * NodeComment ,
179 const bool GeneratedName ,
182 int * Graph_prof_debug,
183 ofstream * Graph_fdebug) {
// The commented-out lines below are the remains of an earlier version in
// which InNode derived from GraphBase::FactoryNode.
184 // ostream * Graph_fdebug = NULL ) :
185 // GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
186 // ComponentName , NodeInterfaceName ,
187 // NodeName , akind ,
188 // NodeFirstCreation , NodeLastModification ,
189 // NodeEditorRelease , NodeAuthor ,
190 // NodeComputer , NodeComment , GeneratedName ,
192 // Graph_prof_debug , Graph_fdebug ) {
193 InitInNode( _RewindStack ,
219 SetDebug( ORB , Graph_prof_debug , Graph_fdebug ) ;
// Typed node pointers start out NULL; only the ones matching akind are set.
221 _ComputingNode = NULL ;
222 _FactoryNode = NULL ;
226 _EndOfLoopNode = NULL ;
228 _EndOfSwitchNode = NULL ;
// Plain computing node.
230 case SUPERV::ComputingNode : {
231 cdebug << "GraphExecutor::InNode::InNode SUPERV::ComputingNode : " << NodeName ;
232 _ComputingNode = new GraphBase::ComputingNode( ORB , ptrNamingService ,
236 NodeLastModification ,
237 NodeEditorRelease , NodeAuthor ,
238 NodeComment , GeneratedName ,
240 Graph_prof_debug , Graph_fdebug ) ;
// Factory node: the only kind that receives ComponentName, NodeInterfaceName
// and NodeComputer.
243 case SUPERV::FactoryNode : {
244 cdebug << "GraphExecutor::InNode::InNode SUPERV::FactoryNode : " << NodeName ;
245 _FactoryNode = new GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
246 ComponentName , NodeInterfaceName ,
249 NodeLastModification ,
250 NodeEditorRelease , NodeAuthor ,
251 NodeComputer , NodeComment ,
252 GeneratedName , NodeX , NodeY ,
253 Graph_prof_debug , Graph_fdebug ) ;
254 _ComputingNode = (GraphBase::ComputingNode *) _FactoryNode ;
// In-line (Python function) node built from the first function name/body.
257 case SUPERV::InLineNode : {
258 cdebug << "GraphExecutor::InNode::InNode SUPERV::InLineNode : " << NodeName ;
259 _InLineNode = new GraphBase::InLineNode( ORB , ptrNamingService ,
260 aFuncName[0].c_str() , *aPythonFunction[0] ,
262 NodeFirstCreation , NodeLastModification ,
263 NodeEditorRelease , NodeAuthor ,
264 NodeComment , GeneratedName ,
266 Graph_prof_debug , Graph_fdebug ) ;
267 _ComputingNode = (GraphBase::ComputingNode *) _InLineNode ;
// NOTE(review): this and several later debug strings say "GraphEditor"
// although this is GraphExecutor::InNode -- probably copy/paste.
270 case SUPERV::GOTONode : {
271 cdebug << "GraphEditor::InNode::InNode SUPERV::GOTONode : " << NodeName ;
272 _GOTONode = new GraphBase::GOTONode( ORB , ptrNamingService ,
273 aFuncName[0].c_str() , *aPythonFunction[0] ,
275 NodeFirstCreation , NodeLastModification ,
276 NodeEditorRelease , NodeAuthor ,
277 NodeComment , GeneratedName ,
279 Graph_prof_debug , Graph_fdebug ) ;
280 _ComputingNode = (GraphBase::ComputingNode *) _GOTONode ;
281 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// Loop node: takes three function name/body pairs (indices 0, 1 and 2).
284 case SUPERV::LoopNode : {
285 cdebug << "GraphExecutor::InNode::InNode SUPERV::LoopNode : " << NodeName ;
286 _LoopNode = new GraphBase::LoopNode( ORB , ptrNamingService ,
287 aFuncName[0].c_str() , *aPythonFunction[0] ,
288 aFuncName[1].c_str() , *aPythonFunction[1] ,
289 aFuncName[2].c_str() , *aPythonFunction[2] ,
291 NodeFirstCreation , NodeLastModification ,
292 NodeEditorRelease , NodeAuthor ,
293 NodeComment , GeneratedName ,
295 Graph_prof_debug , Graph_fdebug ) ;
296 _ComputingNode = (GraphBase::ComputingNode *) _LoopNode ;
297 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
298 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
301 case SUPERV::EndLoopNode : {
302 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfLoopNode : " << NodeName ;
303 _EndOfLoopNode = new GraphBase::EndOfLoopNode(
304 ORB , ptrNamingService ,
305 aFuncName[0].c_str() , *aPythonFunction[0] ,
307 NodeFirstCreation , NodeLastModification ,
308 NodeEditorRelease , NodeAuthor ,
309 NodeComment , GeneratedName ,
311 Graph_prof_debug , Graph_fdebug ) ;
312 _ComputingNode = (GraphBase::ComputingNode *) _EndOfLoopNode ;
313 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
314 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
317 case SUPERV::SwitchNode : {
318 cdebug << "GraphExecutor::InNode::InNode SUPERV::SwitchNode : " << NodeName ;
319 _SwitchNode = new GraphBase::SwitchNode( ORB , ptrNamingService ,
320 aFuncName[0].c_str() , *aPythonFunction[0] ,
322 NodeFirstCreation , NodeLastModification ,
323 NodeEditorRelease , NodeAuthor ,
324 NodeComment , GeneratedName ,
326 Graph_prof_debug , Graph_fdebug ) ;
327 _ComputingNode = (GraphBase::ComputingNode *) _SwitchNode ;
328 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
329 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
332 case SUPERV::EndSwitchNode : {
333 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfSwitchNode : " << NodeName ;
334 _EndOfSwitchNode = new GraphBase::EndOfSwitchNode(
335 ORB , ptrNamingService ,
336 aFuncName[0].c_str() , *aPythonFunction[0] ,
338 NodeFirstCreation , NodeLastModification ,
339 NodeEditorRelease , NodeAuthor ,
340 NodeComment , GeneratedName ,
342 Graph_prof_debug , Graph_fdebug ) ;
343 _ComputingNode = (GraphBase::ComputingNode *) _EndOfSwitchNode ;
344 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
345 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
// The three kinds below are treated as errors: only a message is logged and
// no node object is created in the visible code.
348 case SUPERV::DataFlowGraph : {
349 cdebug << "GraphEditor::InNode::InNode SUPERV::DataFlowGraph ERROR : " << NodeName ;
351 case SUPERV::DataStreamGraph : {
352 cdebug << "GraphEditor::InNode::InNode SUPERV::DataStreamGraph ERROR : " << NodeName ;
354 case SUPERV::UnknownNode : {
355 cdebug << "GraphEditor::InNode::InNode SUPERV::UnknownNode ERROR : " << NodeName ;
358 cdebug << "GraphExecutor::InNode::InNode " << (void *) this
359 << " _ComputingNode " << (void *) _ComputingNode ;
// NOTE(review): for the error kinds above _ComputingNode appears to still be
// NULL at this point, which would make this call a null dereference --
// confirm whether a guard exists in the lines not shown.
360 _ComputingNode->InNode( this ) ;
// Destructor; no body statements are visible in this listing.
// NOTE(review): check that the pthread mutexes/condition variables set up by
// InitInNode() and the node allocated with new in the constructor are
// released somewhere.
363 GraphExecutor::InNode::~InNode() {
// Locks _MutexDataWait; a non-zero return from pthread_mutex_lock() is
// reported with perror().
// NOTE(review): the rest of the body is not visible in this listing.
366 void GraphExecutor::InNode::LockDataWait() {
367 if ( pthread_mutex_lock( &_MutexDataWait ) ) {
368 perror("Ready pthread_mutex_lock ") ;
// Unlocks _MutexDataWait; a non-zero return from pthread_mutex_unlock() is
// reported with perror().
// NOTE(review): the rest of the body is not visible in this listing.
373 void GraphExecutor::InNode::UnLockDataWait() {
375 if ( pthread_mutex_unlock( &_MutexDataWait ) ) {
376 perror("Ready pthread_mutex_unlock ") ;
// Returns the CORBA component associated with this node.
// A factory node returns the component held by _FactoryNode. Otherwise the
// visible code reads the CORBA::Any carried by the OutPort linked to InPort 0,
// extracts an object reference from it and narrows it to Engines::Component.
// The final return yields a nil reference.
// NOTE(review): the else/try/catch lines are not visible in this listing; the
// "catch" debug message suggests the extraction is wrapped in a try block.
// No null check on GetOutPort() or Value() is visible.
381 Engines::Component_var GraphExecutor::InNode::Component() const {
382 if ( IsFactoryNode() ) {
383 return _FactoryNode->Component() ;
386 CORBA::Any const * anAnyComponent = GetChangeNodeInPort( 0 )->GetOutPort()->Value() ; // this
387 CORBA::Object_ptr obj ;
389 *anAnyComponent >>= obj ;
390 return Engines::Component::_narrow( obj ) ;
393 cdebug << "GraphExecutor::InNode::Component Component catch" << endl ;
396 return Engines::Component::_nil() ;
// Returns the container held by _FactoryNode when this is a factory node,
// and a nil Engines::Container reference otherwise.
399 Engines::Container_var GraphExecutor::InNode::Container() const {
400 if ( IsFactoryNode() ) {
401 return _FactoryNode->Container() ;
403 return Engines::Container::_nil() ;
// Checks that the component of a factory node can be reached.
// For a factory node RetVal is set from whether the component reference is
// non-nil; unless the node is in SuspendedExecutingState the component's
// ping() is then invoked. The "catched" branch marks both this node and the
// owning OutNode as ErroredState.
// NOTE(review): the declaration of RetVal, the try/catch lines, the handling
// of non-factory nodes and the return statement are not visible here.
407 bool GraphExecutor::InNode::Ping() {
408 // cdebug_in << "GraphExecutor::InNode::Ping" << endl;
410 if ( IsFactoryNode() ) {
411 RetVal = !CORBA::is_nil( _FactoryNode->Component() ) ;
413 if ( State() != SUPERV::SuspendedExecutingState ) {
415 _FactoryNode->Component()->ping() ;
418 cdebug << "InNode::Ping() catched" << endl ;
419 State( SUPERV::ErroredState ) ;
420 _OutNode->State( SUPERV::ErroredState ) ;
429 // cdebug_out << "GraphExecutor::InNode::Ping" << endl ;
// Records aThread as this node's thread number and notifies the owning
// OutNode through _OutNode->NewThread().
433 void GraphExecutor::InNode::NewThread( pthread_t aThread ) {
434 ThreadNo ( aThread ) ;
436 _OutNode->NewThread() ;
// Notifies the owning OutNode, through _OutNode->ExitThread(), that this
// node's thread is ending.
// NOTE(review): other statements of the body may be missing from this listing.
438 void GraphExecutor::InNode::ExitThread() {
440 _OutNode->ExitThread() ;
// Requests suspension of this node and reports the outcome in RetVal.
// Visible behaviour by node state:
//  - waiting or ready : the control state becomes ToSuspendState;
//  - running          : the control state becomes ToSuspendState and, for a
//                       factory/computing node with a non-nil component,
//                       Component()->Suspend_impl() is called and a
//                       SuspendEvent is sent to the automaton;
//  - done             : the control state is reset to VoidState and RetVal is
//                       false ("too late").
// In the branch guarded by _OutNode->IsDone() the control state is reset to
// VoidState. The "catched" branch marks this node and the OutNode as
// ErroredState.
// NOTE(review): many lines (RetVal declaration, try/catch, else branches,
// return) are not visible in this listing, so the exact nesting of the
// branches cannot be confirmed from here.
443 bool GraphExecutor::InNode::Suspend() {
444 cdebug_in << "GraphExecutor::InNode::Suspend " << Name() << " " << ThreadNo()
448 ControlState( SUPERV::VoidState ) ;
449 if ( _OutNode->IsDone() ) {
450 ControlState( SUPERV::VoidState ) ;
454 else if ( IsWaiting() || IsReady() ) {
455 ControlState( SUPERV::ToSuspendState ) ;
458 else if ( IsRunning() ) {
459 ControlState( SUPERV::ToSuspendState ) ;
460 if ( IsFactoryNode() || IsComputingNode() ) {
461 if ( !CORBA::is_nil( Component() ) ) {
// Remote call on the component.
463 RetVal = Component()->Suspend_impl() ;
466 cdebug << "InNode::Suspend() catched" << endl ;
467 State( SUPERV::ErroredState ) ;
468 _OutNode->State( SUPERV::ErroredState ) ;
473 cdebug << pthread_self() << "GraphExecutor::InNode::Suspend_impl " << Name()
474 << " --> thread" << ThreadNo() << " SuspendEvent " << endl;
475 SendEvent( GraphExecutor::SuspendEvent ) ;
476 cdebug << pthread_self() << "GraphExecutor::InNode::Suspended_impl in Container"
477 << Name() << " --> thread" << ThreadNo() << endl;
479 else if ( IsDone() ) {
480 ControlState( SUPERV::VoidState ) ;
481 RetVal = false ; // Too late ...
484 cdebug << "component Suspended and !IsDone and !IsRunning !"
490 cdebug << "Suspend cannot Suspend component !" << endl ;
495 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
500 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
504 cdebug_out << "GraphExecutor::InNode::Suspend " << RetVal << " "
505 << Automaton()->StateName( State() ) << endl ;
// Asks the container of a factory node to kill itself via
// Container()->Kill_impl() and stores the result in RetVal.
// NOTE(review): no nil check on Container() is visible before the call, and
// the RetVal declaration and return statement are not shown in this listing.
509 bool GraphExecutor::InNode::ContainerKill() {
510 cdebug_in << "GraphExecutor::InNode::ContainerKill " << Name() << " "
511 << ThreadNo() << endl;
513 if ( IsFactoryNode() ) {
515 RetVal = Container()->Kill_impl() ;
517 cdebug_out << "GraphExecutor::InNode::ContainerKill" << endl ;
// Requests that this node be killed.
// The control state is set to ToKillState, and reset to VoidState in the
// branches guarded by _OutNode->IsDone(). For a factory/computing node with a
// non-nil component, Component()->Kill_impl() is called and a KillEvent is
// sent to the automaton. If the node is already done the control state is
// reset and RetVal is false ("too late"). Suspended and waiting nodes have
// their own branches whose bodies are not visible here.
// NOTE(review): many lines (conditions, try/catch, else branches, return) are
// missing from this listing, so the exact nesting cannot be confirmed.
521 bool GraphExecutor::InNode::Kill() {
522 cdebug_in << "GraphExecutor::InNode::Kill " << Name() << " " << ThreadNo() << " "
523 << Automaton()->StateName( State() ) << endl;
526 ControlState( SUPERV::ToKillState ) ; // if loop
527 if ( _OutNode->IsDone() ) {
528 ControlState( SUPERV::VoidState ) ;
533 ControlState( SUPERV::ToKillState ) ;
535 if ( _OutNode->IsDone() ) {
536 ControlState( SUPERV::VoidState ) ;
542 if ( IsFactoryNode() || IsComputingNode() ) {
543 if ( !CORBA::is_nil( Component() ) ) {
// Remote call on the component.
545 RetVal = Component()->Kill_impl() ;
// NOTE(review): the message below says "Suspend" although this is Kill();
// looks like copy/paste.
548 cdebug << "InNode::Suspend() catched" << endl ;
549 State( SUPERV::ErroredState ) ;
550 _OutNode->State( SUPERV::ErroredState ) ;
553 cdebug << "Component()->Kill_impl() returns status " << RetVal << endl ;
// NOTE(review): the trace text says "SuspendEvent" but the event sent just
// after it is KillEvent.
556 cdebug << pthread_self() << "GraphExecutor::InNode::Kill_impl " << Name()
557 << " --> thread" << ThreadNo() << " SuspendEvent " << endl;
558 SendEvent( GraphExecutor::KillEvent ) ;
559 cdebug << pthread_self() << "GraphExecutor::InNode::Killed_impl in Container"
560 << Name() << " --> thread" << ThreadNo() << endl;
562 else if ( IsDone() ) {
563 ControlState( SUPERV::VoidState ) ;
564 RetVal = false ; // Too late ...
567 cdebug << "component Killed and !IsDone and !IsRunning !"
572 cdebug << "Kill with nilComponent cannot Kill component !" << endl ;
577 else if ( IsSuspended() ) {
578 cdebug << pthread_self() << "GraphExecutor::InNode::Kill " << Name()
579 << " --> thread" << ThreadNo() << " Resume()" << endl;
583 else if ( IsWaiting() ) {
587 cdebug << "Kill and !IsDone and !IsRunning and !IsWaiting ?"
593 cdebug_out << "GraphExecutor::InNode::Kill" << endl ;
// Requests a kill that takes effect once the node is done.
// The first test detects that the request is already pending
// (ToKillDoneState) or that the node is already done; otherwise the control
// state becomes ToKillDoneState, and is reset to VoidState in the branch
// guarded by _OutNode->IsDone().
// NOTE(review): the branch bodies, the RetVal handling and the return
// statement are not visible in this listing.
597 bool GraphExecutor::InNode::KillDone() {
598 cdebug_in << "GraphExecutor::InNode::KillDone " << Name() << " " << ThreadNo()
601 if ( ControlState() == SUPERV::ToKillDoneState || IsDone() ) {
605 ControlState( SUPERV::ToKillDoneState ) ;
607 if ( _OutNode->IsDone() ) {
608 ControlState( SUPERV::VoidState ) ;
616 else if ( IsWaiting() ) {
620 cdebug << "KillDone and !IsDone and !IsRunning and !IsWaiting ?"
626 cdebug_out << "GraphExecutor::InNode::KillDone" << endl ;
// Requests that this node be stopped.
// The first test detects that a stop is already pending (ToStopState) or that
// the node is already done; otherwise the control state becomes ToStopState,
// and is reset to VoidState in the branch guarded by _OutNode->IsDone(). For a
// factory/computing node with a non-nil component, Component()->Stop_impl()
// is called and a StopEvent is sent to the automaton. The "catched" branch
// marks this node and the OutNode as ErroredState.
// NOTE(review): several debug strings below say "Suspend"/"Suspended"
// although this is Stop() -- probably copy/paste. Many lines (try/catch, else
// branches, return) are not visible in this listing.
630 bool GraphExecutor::InNode::Stop() {
631 cdebug_in << "GraphExecutor::InNode::Stop " << Name() << " " << ThreadNo()
634 if ( ControlState() == SUPERV::ToStopState || IsDone() ) {
638 ControlState( SUPERV::ToStopState ) ;
640 if ( _OutNode->IsDone() ) {
641 ControlState( SUPERV::VoidState ) ;
647 if ( IsFactoryNode() || IsComputingNode() ) {
648 if ( !CORBA::is_nil( Component() ) ) {
// Remote call on the component.
650 RetVal = Component()->Stop_impl() ;
653 cdebug << "InNode::Stop() catched" << endl ;
654 State( SUPERV::ErroredState ) ;
655 _OutNode->State( SUPERV::ErroredState ) ;
660 SendEvent( GraphExecutor::StopEvent ) ;
662 else if ( IsDone() ) {
663 ControlState( SUPERV::VoidState ) ;
664 RetVal = false ; // Too late ...
667 cdebug << "component Suspended and !IsDone and !IsRunning !"
673 cdebug << "Suspend cannot Suspend component !" << endl ;
678 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
682 else if ( IsWaiting() ) {
686 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
692 cdebug_out << "GraphExecutor::InNode::Stop" << endl ;
// Requests a suspension that takes effect once the node is done.
// The first test detects that the request is already pending
// (ToSuspendDoneState) or that the node is already done; otherwise the
// control state becomes ToSuspendDoneState, and is reset to VoidState in the
// branch guarded by _OutNode->IsDone().
// NOTE(review): branch bodies, RetVal handling and the return statement are
// not visible in this listing.
696 bool GraphExecutor::InNode::SuspendDone() {
697 cdebug_in << "GraphExecutor::InNode::SuspendDone " << Name() << " "
698 << ThreadNo() << endl;
700 if ( ControlState() == SUPERV::ToSuspendDoneState || IsDone() ) {
704 ControlState( SUPERV::ToSuspendDoneState ) ;
706 if ( _OutNode->IsDone() ) {
707 ControlState( SUPERV::VoidState ) ;
715 cdebug_out << "GraphExecutor::InNode::SuspendDone" << endl ;
// Resumes a suspended node; RetVal starts as false.
// First part, when IsSuspended():
//  - SuspendedReadyState      : ResumeAction( ToResumeEvent );
//  - SuspendedExecutingState  : for factory/computing nodes the component's
//                               Resume_impl() is called;
//  - SuspendedSuccessedState / SuspendedErroredState :
//                               ResumeAction( ResumeEvent ).
// Any other state is only logged. The remainder of the function dispatches on
// the pending control state (ToSuspendStartState, ToSuspendRunState,
// ToSuspendDoneState, ToSuspendState) and mostly resets it to VoidState.
// NOTE(review): unlike Suspend()/Kill()/Stop(), no CORBA::is_nil() check on
// Component() is visible before the Resume_impl() calls. Many lines (else
// branches, try/catch, RetVal assignments, return) are missing from this
// listing; how the second part relates to the first cannot be determined
// from here.
719 bool GraphExecutor::InNode::Resume() {
720 cdebug_in << pthread_self() << "/" << ThreadNo()
721 << " GraphExecutor::InNode::Resume " << Name() << " "
722 << Automaton()->StateName( State() ) << endl;
723 bool RetVal = false ;
724 if ( IsSuspended() ) {
725 if ( State() == SUPERV::SuspendedReadyState ) {
726 ResumeAction( GraphExecutor::ToResumeEvent ) ;
729 else if ( State() == SUPERV::SuspendedExecutingState ) {
730 if ( IsFactoryNode() || IsComputingNode() ) {
// Remote call on the component.
732 RetVal = Component()->Resume_impl() ;
735 cdebug << "InNode::Resume() catched" << endl ;
736 State( SUPERV::ErroredState ) ;
737 _OutNode->State( SUPERV::ErroredState ) ;
742 else if ( State() == SUPERV::SuspendedSuccessedState ) {
743 ResumeAction( GraphExecutor::ResumeEvent ) ;
746 else if ( State() == SUPERV::SuspendedErroredState ) {
747 ResumeAction( GraphExecutor::ResumeEvent ) ;
751 cdebug << "GraphExecutor::InNode::Resume Not SuspendedReady/Executing/Successed/ErroredState "
752 << Automaton()->StateName( State() ) << endl ;
757 cdebug << "GraphExecutor::InNode::Resume Not Suspended State "
758 << Automaton()->StateName( State() ) << endl ;
// Dispatch on the pending control request.
761 if ( ControlState() == SUPERV::ToSuspendStartState ) {
762 ControlState( SUPERV::VoidState ) ;
766 if ( ControlState() == SUPERV::ToSuspendRunState ||
767 ( ControlState() == SUPERV::ToSuspendState &&
768 State() == SUPERV::SuspendedReadyState) ) {
769 if ( IsSuspended() ) {
770 if ( State() == SUPERV::SuspendedReadyState ) {
774 else if ( State() == SUPERV::SuspendedExecutingState ) {
776 RetVal = Component()->Resume_impl() ;
779 cdebug << "GraphExecutor::InNode::Resume State "
780 << Automaton()->StateName( State() ) << endl ;
// A pending ToSuspendState request is kept; any other is cleared.
783 if ( ControlState() != SUPERV::ToSuspendState ) {
784 ControlState( SUPERV::VoidState ) ;
787 else if ( IsRunning() ) {
790 else if ( IsWaiting() ) {
791 ControlState( SUPERV::VoidState ) ;
794 else if ( IsDone() ) {
798 else if ( ControlState() == SUPERV::ToSuspendDoneState ||
799 ( ControlState() == SUPERV::ToSuspendState &&
800 State() == SUPERV::SuspendedSuccessedState) ) {
801 if ( IsSuspended() ) {
802 if ( State() == SUPERV::SuspendedSuccessedState ) {
806 else if ( State() == SUPERV::SuspendedErroredState ) {
// Unlike the other traces, this one prints the raw State() value rather than
// its name.
811 cdebug << "GraphExecutor::InNode::Resume State " << State() << endl ;
814 if ( ControlState() != SUPERV::ToSuspendState ) {
815 ControlState( SUPERV::VoidState ) ;
818 else if ( IsRunning() ) {
819 ControlState( SUPERV::VoidState ) ;
822 else if ( IsWaiting() ) {
823 ControlState( SUPERV::VoidState ) ;
826 else if ( IsDone() ) {
827 ControlState( SUPERV::VoidState ) ;
832 cdebug_out << "GraphExecutor::InNode::Resume " << RetVal << endl ;
// Restarts execution at the node named AtNodeName.
// The target node is looked up in the graph owned by _OutNode. If this node
// is waiting and the target is suspended, the target is simply resumed. If
// this node is suspended and AtNodeName differs from this node's name
// (strcmp() non-zero), the target's state is forced to
// SuspendedSuccessedState. ReStartAction() is then invoked with
// ReStartAndSuspendEvent or ReStartEvent; presumably AndSuspend selects
// between the two, but the selecting condition is not visible here.
// NOTE(review): the result of GetGraphNode( AtNodeName ) is dereferenced
// without a visible null check, so an unknown node name would crash.
// Several lines (else branches, return) are missing from this listing.
836 bool GraphExecutor::InNode::ReStart( const char * AtNodeName ,
837 const bool AndSuspend ) {
838 bool RetVal = false ;
839 GraphExecutor::InNode * aRestartNode = (GraphExecutor::InNode *) _OutNode->Graph()->GetGraphNode( AtNodeName )->GetInNode() ;
840 cdebug_in << pthread_self() << "/" << ThreadNo()
841 << " --> GraphExecutor::InNode::ReStartAt( "
842 << AtNodeName << " , " << AndSuspend << ") " << endl
843 << "thread " << aRestartNode->ThreadNo() << " "
844 << Automaton()->StateName( aRestartNode->State() )
845 << " from " << Name() << " " << Automaton()->StateName( State() )
847 if ( IsWaiting() && aRestartNode->IsSuspended() ) {
848 RetVal = aRestartNode->Resume() ;
850 else if ( IsSuspended() ) {
851 if ( strcmp( AtNodeName , Name() ) ) {
852 aRestartNode->State( SUPERV::SuspendedSuccessedState ) ;
855 ReStartAction( aRestartNode , GraphExecutor::ReStartAndSuspendEvent ) ;
858 ReStartAction( aRestartNode , GraphExecutor::ReStartEvent ) ;
862 cdebug_out << "<-- GraphExecutor::InNode::ReStartAt" << endl ;
// State predicate: tests whether the current automaton state is
// DataUndefState, DataWaitingState or SuspendedReadyState. The other
// Suspended* states are deliberately commented out of the test.
// NOTE(review): the result variable and return statement are not visible in
// this listing; presumably true is returned when the test succeeds.
866 bool GraphExecutor::InNode::IsWaiting() {
868 // cdebug_in << "GraphExecutor::InNode::IsWaiting " << Name() << endl;
869 SUPERV::AutomatonState aState = State() ;
870 if ( aState == SUPERV::DataUndefState ||
871 aState == SUPERV::DataWaitingState ||
872 aState == SUPERV::SuspendedReadyState )
873 // aState == SUPERV::SuspendedExecutingState ||
874 // aState == SUPERV::SuspendedSuccessedState ||
875 // aState == SUPERV::SuspendedErroredState ||
876 // aState == SUPERV::SuspendedState
878 // cdebug_out << "GraphExecutor::InNode::IsWaiting" << endl ;
// State predicate: tests whether the current automaton state is
// DataUndefState, DataWaitingState, DataReadyState or ResumedReadyState.
// NOTE(review): DataUndefState and DataWaitingState are also accepted by
// IsWaiting(), so the two predicates overlap -- confirm this is intended.
// The result variable and return statement are not visible in this listing.
882 bool GraphExecutor::InNode::IsReady() {
884 // cdebug_in << "GraphExecutor::InNode::IsReady " << Name() << endl;
885 SUPERV::AutomatonState aState = State() ;
886 if ( aState == SUPERV::DataUndefState ||
887 aState == SUPERV::DataWaitingState ||
888 aState == SUPERV::DataReadyState ||
889 aState == SUPERV::ResumedReadyState )
891 // cdebug_out << "GraphExecutor::InNode::IsReady" << endl ;
// State predicate: tests whether the current automaton state is
// ExecutingState or ResumedExecutingState.
// NOTE(review): the result variable and return statement are not visible in
// this listing.
895 bool GraphExecutor::InNode::IsRunning() {
897 // cdebug_in << "GraphExecutor::InNode::IsRunning " << Name() << endl;
898 SUPERV::AutomatonState aState = State() ;
899 if ( aState == SUPERV::ExecutingState ||
900 aState == SUPERV::ResumedExecutingState )
902 // cdebug_out << "GraphExecutor::InNode::IsRunning" << endl ;
// State predicate: tests whether the current automaton state is one of the
// terminal states listed below (killed, stopped, succeeded, errored, and the
// suspended/resumed variants of succeeded/errored).
// NOTE(review): KilledErroredState, StoppedErroredState, KilledState and
// StoppedState are tested by IsKilled()/IsStopped() but are not part of this
// list -- confirm whether that omission is intended. The result variable and
// return statement are not visible in this listing.
906 bool GraphExecutor::InNode::IsDone() {
908 // cdebug_in << "GraphExecutor::InNode::IsDone " << Name() << endl;
909 SUPERV::AutomatonState aState = State() ;
910 if ( aState == SUPERV::KilledReadyState ||
911 aState == SUPERV::StoppedReadyState ||
912 aState == SUPERV::KilledExecutingState ||
913 aState == SUPERV::StoppedExecutingState ||
914 aState == SUPERV::SuspendedSuccessedState ||
915 aState == SUPERV::SuspendedErroredState ||
916 // aState == SUPERV::SuccessedExecutingState ||
917 // aState == SUPERV::ErroredExecutingState ||
918 aState == SUPERV::SuccessedState ||
919 aState == SUPERV::ErroredState ||
920 aState == SUPERV::ResumedSuccessedState ||
921 aState == SUPERV::ResumedErroredState ||
922 aState == SUPERV::KilledSuccessedState ||
923 aState == SUPERV::StoppedSuccessedState )
925 // cdebug_out << "GraphExecutor::InNode::IsDone" << endl ;
// State predicate: tests whether the current automaton state is one of
// SuspendedReadyState, SuspendedExecutingState, SuspendedSuccessedState or
// SuspendedErroredState.
// NOTE(review): the result variable and return statement are not visible in
// this listing.
929 bool GraphExecutor::InNode::IsSuspended() {
931 // cdebug_in << "GraphExecutor::InNode::IsSuspended " << Name() << endl;
932 SUPERV::AutomatonState aState = State() ;
933 if ( aState == SUPERV::SuspendedReadyState ||
934 aState == SUPERV::SuspendedExecutingState ||
935 aState == SUPERV::SuspendedSuccessedState ||
936 aState == SUPERV::SuspendedErroredState )
938 // cdebug_out << "GraphExecutor::InNode::IsSuspended" << endl ;
// State predicate: tests whether the current automaton state is one of the
// Killed* states (Ready, Executing, Successed, Errored) or KilledState.
// NOTE(review): the result variable and return statement are not visible in
// this listing.
941 bool GraphExecutor::InNode::IsKilled() {
943 // cdebug_in << "GraphExecutor::InNode::IsKilled " << Name() << endl;
944 SUPERV::AutomatonState aState = State() ;
945 if ( aState == SUPERV::KilledReadyState ||
946 aState == SUPERV::KilledExecutingState ||
947 aState == SUPERV::KilledSuccessedState ||
948 aState == SUPERV::KilledErroredState ||
949 aState == SUPERV::KilledState )
951 // cdebug_out << "GraphExecutor::InNode::IsKilled" << endl ;
// State predicate: tests whether the current automaton state is one of the
// Stopped* states (Ready, Executing, Successed, Errored) or StoppedState.
// NOTE(review): the result variable and return statement are not visible in
// this listing.
954 bool GraphExecutor::InNode::IsStopped() {
956 // cdebug_in << "GraphExecutor::InNode::IsStopped " << Name() << endl;
957 SUPERV::AutomatonState aState = State() ;
958 if ( aState == SUPERV::StoppedReadyState ||
959 aState == SUPERV::StoppedExecutingState ||
960 aState == SUPERV::StoppedSuccessedState ||
961 aState == SUPERV::StoppedErroredState ||
962 aState == SUPERV::StoppedState )
964 // cdebug_out << "GraphExecutor::InNode::IsStopped" << endl ;
// Blocks the calling thread until this node reaches the requested graph state
// or is done.
// _MutexWait is locked around the wait. For the requested state the matching
// predicate is evaluated into RetVal (IsRunning() and IsSuspended() are
// visible for the Running and Suspend cases), and while it is false and the
// node is not done the thread waits on the corresponding condition variable
// (_ReadyWait, _RunningWait, _DoneWait or _SuspendedWait). The predicate is
// re-evaluated after each wake-up in the cases where that line is visible.
// Any other aState value is reported in the debug log as undefined.
// NOTE(review): the switch header, the RetVal assignments of the Ready and
// Done cases, the loop header of the Done case, break statements and the
// return are not visible in this listing.
968 bool GraphExecutor::InNode::StateWait( SUPERV::GraphState aState ) {
969 bool RetVal = false ;
970 if ( pthread_mutex_lock( &_MutexWait ) ) {
971 perror("pthread_mutex_lock _Wait") ;
975 case SUPERV::ReadyState : {
977 cdebug_in << pthread_self() << " StateWait( Ready ) " << RetVal
978 << " " << Automaton()->StateName( _currentState )
979 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
980 while ( !RetVal && !IsDone() ) {
981 cdebug << pthread_self() << " pthread_cond_wait ReadyWait" << endl ;
982 pthread_cond_wait( &_ReadyWait , &_MutexWait );
984 cdebug << pthread_self() << " pthread_cond_waited ReadyWait "
985 << Automaton()->StateName( _currentState ) << " " << RetVal
988 cdebug_out << pthread_self() << " StateWait( Ready ) " << RetVal
989 << " " << Automaton()->StateName( _currentState )
990 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
993 case SUPERV::RunningState : {
994 RetVal = IsRunning() ;
995 cdebug_in << pthread_self() << " StateWait( Running ) " << RetVal
996 << " " << Automaton()->StateName( _currentState )
997 << " pthread_cond_wait _RunningWait " << Name() << endl ;
998 while ( !RetVal && !IsDone() ) {
999 cdebug << pthread_self() << " pthread_cond_wait RunningWait" << endl ;
1000 pthread_cond_wait( &_RunningWait , &_MutexWait );
1001 RetVal = IsRunning() ;
1002 cdebug << pthread_self() << " pthread_cond_waited RunningWait "
1003 << Automaton()->StateName( _currentState ) << " " << RetVal
1006 cdebug_out << pthread_self() << " StateWait( Running ) " << RetVal
1007 << " " << Automaton()->StateName( _currentState )
1008 << " pthread_cond_wait _RunningWait " << Name() << endl ;
1011 case SUPERV::DoneState : {
1013 cdebug_in << pthread_self() << " StateWait( Done ) " << RetVal
1014 << " " << Automaton()->StateName( _currentState )
1015 << " pthread_cond_wait _DoneWait " << Name() << endl ;
1017 cdebug << pthread_self() << " pthread_cond_wait DoneWait" << endl ;
1018 pthread_cond_wait( &_DoneWait , &_MutexWait );
1020 cdebug << pthread_self() << " pthread_cond_waited DoneWait "
1021 << Automaton()->StateName( _currentState ) << " " << RetVal
1024 cdebug_out << pthread_self() << " StateWait( Done ) " << RetVal
1025 << " " << Automaton()->StateName( _currentState )
1026 << " pthread_cond_wait _DoneWait " << Name() << endl ;
1029 case SUPERV::SuspendState : {
1030 RetVal = IsSuspended() ;
1031 cdebug_in << pthread_self() << " StateWait( Suspend ) " << RetVal
1032 << " " << Automaton()->StateName( _currentState )
1033 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
1034 while ( !RetVal && !IsDone() ) {
1035 cdebug << pthread_self() << " pthread_cond_wait SuspendedWait" << endl ;
1036 pthread_cond_wait( &_SuspendedWait , &_MutexWait );
1037 RetVal = IsSuspended() ;
1038 cdebug << pthread_self() << " pthread_cond_waited SuspendedWait "
1039 << Automaton()->StateName( _currentState ) << " " << RetVal
1042 cdebug_out << pthread_self() << " StateWait( Suspend ) " << RetVal
1043 << " " << Automaton()->StateName( _currentState )
1044 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
// NOTE(review): the message names GraphExecutor::OutNode although this is
// InNode::StateWait -- probably copy/paste.
1048 cdebug << " GraphExecutor::OutNode::StateWait Error Undefined State : "
// NOTE(review): the perror() text below says "pthread_mutex_lock" although
// the failing call is pthread_mutex_unlock.
1052 if ( pthread_mutex_unlock( &_MutexWait ) ) {
1053 perror("pthread_mutex_lock _Wait") ;
// Convenience wrapper: waits for SUPERV::ReadyState via StateWait() and
// stores its result in aret.
// NOTE(review): the declaration and return of aret are not visible here.
1059 bool GraphExecutor::InNode::ReadyWait() {
1060 // cdebug_in << "GraphExecutor::InNode::ReadyWait " << Name() << endl;
1062 aret = StateWait( SUPERV::ReadyState ) ;
1063 // cdebug_out << "GraphExecutor::InNode::ReadyWait" << endl ;
// Convenience wrapper: waits for SUPERV::RunningState via StateWait() and
// stores its result in aret.
// NOTE(review): the declaration and return of aret are not visible here.
1067 bool GraphExecutor::InNode::RunningWait() {
1068 // cdebug_in << "GraphExecutor::InNode::RunningWait " << Name() << endl;
1070 aret = StateWait( SUPERV::RunningState ) ;
// Convenience wrapper: waits for SUPERV::DoneState via StateWait() and
// stores its result in aret.
// NOTE(review): the declaration and return of aret are not visible here.
1074 bool GraphExecutor::InNode::DoneWait() {
1075 // cdebug_in << "GraphExecutor::InNode::DoneWait " << Name() << endl;
1077 aret = StateWait( SUPERV::DoneState ) ;
// Convenience wrapper: waits for SUPERV::SuspendState via StateWait() and
// stores its result in aret.
// NOTE(review): the declaration and return of aret are not visible here.
1081 bool GraphExecutor::InNode::SuspendedWait() {
1082 // cdebug_in << "GraphExecutor::InNode::SuspendedWait " << Name() << endl;
1084 aret = StateWait( SUPERV::SuspendState ) ;
// Puts this node and its ports into their start-of-execution state and tells
// the owning OutNode how much input data is already available.
// Steps visible below:
//  1. remember theOutNode, clear the control state and the thread-creation
//     and suspend/resume sync flags;
//  2. mark every DataStream out-port Ready/Done, and every other out-port
//     Waiting/not-Done (except port 0 of a GOTO node);
//  3. for each in-port, set the state of the linked OutPort according to the
//     port and node kind, then copy a state onto the in-port;
//  4. derive the node's automaton state from the counter Pc and push
//     NoDataReadyEvent, SomeDataReadyEvent or AllDataReadyEvent to _OutNode.
// NOTE(review): Pc starts at GetNodeInPortsSize(); the statements that
// decrement it, the declaration of i, several else lines and closing braces
// are not visible in this listing, so the nesting shown here is incomplete.
1088 void GraphExecutor::InNode::InitialState( GraphExecutor::OutNode * theOutNode )
1090 cdebug_in << "GraphExecutor::InNode::InitialState Node " << Name() << endl;
1092 _OutNode = theOutNode ;
1095 _ControlState = SUPERV::VoidState ;
1096 CreateNewThread( false ) ;
1097 CreateNewThreadIf( false ) ;
1098 _SuspendSync = false ;
1099 _ResumeSync = false ;
1100 // ThreadNo( pthread_self() ) ;
// Output ports: streams are immediately ready, the others wait for a result.
1103 for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
1104 if ( GetNodeOutPort(i)->IsDataStream() ) {
1105 GetChangeNodeOutPort(i)->State( SUPERV::ReadyState ) ;
1106 GetChangeNodeOutPort(i)->Done( true ) ;
1108 else if ( i != 0 || !IsGOTONode() ) {
1109 GetChangeNodeOutPort(i)->State( SUPERV::WaitingState ) ;
1110 GetChangeNodeOutPort(i)->Done( false ) ;
// Input ports: Pc is used below to choose the initial state and event.
1114 int Pc = GetNodeInPortsSize() ;
1115 for ( i = 0 ; i < GetNodeInPortsSize() ; i++ ) {
1116 const GraphBase::InPort * anInPort = GetNodeInPort(i) ;
1117 GraphBase::OutPort * anOutPort = anInPort->GetOutPort() ;
// Loop port of a head loop node: the linked OutPort is marked data-connected,
// ready and done, and is given a freshly allocated Any holding the long
// value 1.
// NOTE(review): anOutPort is dereferenced here without a visible null check.
1118 if ( IsHeadNode() && IsLoopNode() && anInPort->IsLoop() ) {
1119 anOutPort->PortStatus( DataConnected );
1120 anOutPort->State( SUPERV::ReadyState ) ;
1121 anOutPort->Done( true ) ;
1122 CORBA::Any * anAny = new CORBA::Any() ;
1123 *anAny <<= (long ) 1 ;
1124 anOutPort->Value( anAny ) ;
// Connected gate port: waits for computing/factory nodes, is ready at once
// for the in-line node kinds.
1126 else if ( anInPort->IsGate() && anOutPort ) {
1127 if ( IsComputingNode() || IsFactoryNode() ) {
1128 anOutPort->State( SUPERV::WaitingState ) ;
1129 anOutPort->Done( false ) ;
1131 else if ( IsOneOfInLineNodes() ) {
1132 anOutPort->PortStatus( DataConnected );
1133 anOutPort->State( SUPERV::ReadyState ) ;
1134 anOutPort->Done( true ) ;
1137 // if ( ( anInPort->IsGate() || anInPort->IsBus() ) && anOutPort == NULL ) {
1138 if ( anInPort->IsGate() && anOutPort == NULL ) {
1141 else if ( anOutPort ) {
1142 if ( anOutPort->IsDataConnected() || anOutPort->IsDataStream() ) {
// Constant data and streams are ready from the start; a port fed by another
// node waits for that node.
1145 if ( anOutPort->IsDataConnected() || anOutPort->IsDataStream() ) {
1146 anOutPort->State( SUPERV::ReadyState ) ;
1147 anOutPort->Done( true ) ;
1149 else if ( anOutPort->IsPortConnected() ) {
1150 anOutPort->State( SUPERV::WaitingState ) ;
1151 anOutPort->Done( false ) ;
// Copy the state onto the in-port: either the linked OutPort's state or, in
// the second visible branch, ReadyState unconditionally.
1155 if ( !anOutPort->IsDataStream() || anInPort->IsDataStream() ) {
1156 cdebug << "InPort" << i << " state change : " << anInPort->PortName() << " from OutPort "
1157 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1158 << " with state " << theAutomaton->StateName( anOutPort->State() ) << endl ;
1159 GetChangeNodeInPort(i)->State( anOutPort->State() ) ;
1162 cdebug << "InPort" << i << " state change : " << anInPort->PortName() << " from OutPort "
1163 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1164 << " with state ReadyState" << endl ;
1165 GetChangeNodeInPort(i)->State( SUPERV::ReadyState ) ;
1169 cdebug << "InPort" << i << " : " << anInPort->PortName() << " from OutPort "
1170 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1172 if ( anOutPort->State() == SUPERV::WaitingState ) {
1173 cdebug << "WaitingState" ;
1175 else if ( anOutPort->State() == SUPERV::ReadyState ) {
1176 cdebug << "ReadyState" ;
1181 cdebug << " PortConnected("
1182 << anOutPort->IsPortConnected() << ") DataConnected("
1183 << anOutPort->IsDataConnected() << ")" << endl ;
// Pc > 0 gives DataWaitingState, otherwise DataReadyState. The event pushed
// to the OutNode depends on whether Pc still equals the in-port count, is
// some other non-zero value, or is zero.
1187 _currentState = Pc > 0 ? SUPERV::DataWaitingState
1188 : SUPERV::DataReadyState ;
1189 if ( Pc == GetNodeInPortsSize() ) {
1190 _OutNode->PushEvent( this , GraphExecutor::NoDataReadyEvent ,
1193 else if ( Pc != 0 ) {
1194 _OutNode->PushEvent( this , GraphExecutor::SomeDataReadyEvent ,
1198 _OutNode->PushEvent( this , GraphExecutor::AllDataReadyEvent ,
// Trace the resulting state of every output port.
1202 for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
1203 cdebug << "OutPort" << i << " : " << GetNodeOutPort(i)->PortName() << " "
1204 << theAutomaton->StateName( GetChangeNodeOutPort(i)->State() )
1205 << " " << GetNodeOutPort(i)->Kind() << endl ;
1208 cdebug << "CurrentState = " << theAutomaton->StateName( _currentState )
1211 cdebug_out << "GraphExecutor::InNode::InitialState" << endl;
// Obtains the Python callables needed by an in-line node : the run method for
// every in-line kind and, for loop nodes, the "more" and "next" methods too.
// Each callable is first read from the node ( PyRunMethod() , PyMoreMethod() ,
// PyNextMethod() ) ; InitPyDynInvoke is given the function name and its Python
// source and the resulting pointer is stored back on the node.
// WithErr : when false the error flag is cleared before it is traced
//           ( Err = WithErr && Err ).
// NOTE(review): presumably InitPyDynInvoke is only called when the cached
// pointer is NULL and it also fills Err, from which the result is derived
// - TODO confirm against the complete function body.
1214 bool GraphExecutor::InNode::InitPythonFunctions(bool WithErr ) {
1215 cdebug_in << "GraphExecutor::InNode::InitPythonFunctions " << Name() << endl;
// Only in-line nodes whose Python functions have not been run yet are handled
1217 if ( !PyFuncRunned() && IsOneOfInLineNodes() ) {
// Loop nodes own three Python functions : Init ( run ) , More and Next
1218 if ( IsLoopNode() ) {
1219 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1220 PyObject * PyMoreMethod = NULL ;
1221 PyObject * PyNextMethod = NULL ;
1222 if ( PyRunMethod ) {
1225 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1226 InLineNode()->PythonFunction() ,
1228 InLineNode()->PyRunMethod( PyRunMethod ) ;
// Same scheme for the More function of the loop
1231 PyMoreMethod = LoopNode()->PyMoreMethod() ;
1232 if ( PyMoreMethod ) {
1235 PyMoreMethod = InitPyDynInvoke( LoopNode()->PyMoreName() ,
1236 LoopNode()->MorePythonFunction() ,
1238 LoopNode()->PyMoreMethod( PyMoreMethod ) ;
// Same scheme for the Next function of the loop
1242 PyNextMethod = LoopNode()->PyNextMethod() ;
1243 if ( PyNextMethod ) {
1246 PyNextMethod = InitPyDynInvoke( LoopNode()->PyNextName() ,
1247 LoopNode()->NextPythonFunction() ,
1249 LoopNode()->PyNextMethod( PyNextMethod ) ;
1252 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod(Init) " << PyRunMethod
1253 << " PyMoreMethod " << PyMoreMethod << " PyNextMethod " << PyNextMethod << endl;
// InLine and Switch nodes have a single Python function
1255 else if ( IsInLineNode() || IsSwitchNode() ) {
1256 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1257 if ( PyRunMethod ) {
1260 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1261 InLineNode()->PythonFunction() ,
1263 InLineNode()->PyRunMethod( PyRunMethod ) ;
1265 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod " << PyRunMethod << endl;
// EndLoop , EndSwitch and GOTO nodes are handled only when their Python
// function text is not empty ( length() != 0 )
1267 else if ( ( IsEndLoopNode() || IsEndSwitchNode() || IsGOTONode() ) &&
1268 (*InLineNode()->PythonFunction()).length() ) {
1269 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1270 if ( PyRunMethod ) {
1273 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1274 InLineNode()->PythonFunction() ,
1276 InLineNode()->PyRunMethod( PyRunMethod ) ;
1278 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod " << PyRunMethod << endl;
// The error is kept only if the caller asked for it with WithErr
1281 Err = WithErr && Err ;
1282 cdebug_out << "GraphExecutor::InNode::InitPythonFunctions " << Name() ;
1284 cdebug << " Error " << Err ;
// Computes the cpu consumed by this node into a CORBA::Long initialised to 0.
// tot : simply forwarded to PyCpuUsed( tot ) for in-line nodes.
// - in-line nodes ( IsOneOfInLineNodes() ) : the value comes from PyCpuUsed ;
// - other nodes : the value is asked to the component with CpuUsed_impl() ,
//   provided that Component() is not a nil reference.
// When the error trace below is reached , this node and the owning _OutNode
// are both switched to SUPERV::ErroredState.
// NOTE(review): presumably the error lines sit in a catch handler around the
// CORBA call and the function ends by returning cpu - TODO confirm.
1290 const long GraphExecutor::InNode::CpuUsed( bool tot ) {
1291 CORBA::Long cpu = 0 ;
1292 // cdebug_in << "GraphExecutor::InNode::CpuUsed( " << tot << " )" << Name() << endl ;
1293 if ( IsOneOfInLineNodes() ) {
1294 // cdebug << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
1295 // cout << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
// In-line ( Python ) nodes measure their cpu locally
1296 cpu = PyCpuUsed( tot ) ;
// A nil component reference leaves cpu unchanged by the call below
1299 if ( !CORBA::is_nil( Component() ) ) {
1300 // cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
1301 // cout << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
1303 cpu = Component()->CpuUsed_impl() ;
// Failure path : trace it and flag the node and its graph as errored
1306 cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl() ERROR catched " << endl ;
1307 State( SUPERV::ErroredState ) ;
1308 _OutNode->State( SUPERV::ErroredState ) ;
1313 // cdebug_out << "GraphExecutor::InNode::CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1314 // cout << "CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1318 #include <sys/time.h>
1319 #include <sys/resource.h>
// Reads the resource usage of the current process with
// getrusage( RUSAGE_SELF ) and keeps the user cpu time of that record.
// Only ru_utime.tv_sec is assigned to cpu : the value is in whole seconds ;
// the microseconds part and the system time ( ru_stime ) are not used by the
// assignment below.
// A failing getrusage ( -1 ) is reported on stderr through perror.
// NOTE(review): presumably 0 is returned after perror and cpu otherwise
// - TODO confirm.
1322 long GraphExecutor::InNode::PyCpu() {
1323 struct rusage usage ;
1325 if ( getrusage( RUSAGE_SELF , &usage ) == -1 ) {
1326 perror("GraphExecutor::InNode::PyCpu") ;
1329 // return usage.ru_utime.__time_t tv_sec ;
1330 // cdebug << pthread_self() << "PyCpu " << Name() << " " << usage.ru_utime.tv_sec << " "
1331 // << usage.ru_utime.tv_usec << " " << usage.ru_stime.tv_sec << " " << usage.ru_stime.tv_usec
// User cpu time , seconds only
1333 cpu = usage.ru_utime.tv_sec ;
// Cpu consumed by the Python function of this node.
// _PyTotCpuUsed == -1 is the value written by SetPyCpuUsed() : in that state ,
// and when the caller is the thread recorded in _Pythread , the cpu is the
// difference between the current PyCpu() and the _PyCpuUsed baseline.
// _PyTotCpuUsed may then be overwritten with that difference ; in the other
// visible branch the stored _PyTotCpuUsed is used as the result.
// NOTE(review): presumably the store into _PyTotCpuUsed is guarded by tot and
// "cpu = _PyTotCpuUsed" is the else of the -1 test - TODO confirm.
// NOTE(review): pthread_t values are compared with == ; POSIX provides
// pthread_equal() for that purpose - consider using it.
1337 long GraphExecutor::InNode::PyCpuUsed( bool tot ) {
1339 if ( _PyTotCpuUsed == -1 ) {
1340 if ( _Pythread == pthread_self() ) {
1341 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name()
1342 // << " _PyTotCpuUsed " << _PyTotCpuUsed << " PyCpu() " << PyCpu() << " - " << " _PyCpuUsed "
1343 // << _PyCpuUsed << endl ;
// Cpu spent since the baseline taken by SetPyCpuUsed()
1344 cpu = PyCpu() - _PyCpuUsed ;
// Memorize the value : later calls will no longer see -1
1346 _PyTotCpuUsed = cpu ;
// A total was already memorized : reuse it
1354 cpu = _PyTotCpuUsed ;
1356 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name() << "_PyTotCpuUsed"
1357 // << _PyTotCpuUsed << " CpuUsed : " << cpu << endl ;
// Starts a new cpu measurement for the Python function of this node :
// - _PyTotCpuUsed is reset to -1 , the value tested by PyCpuUsed() ;
// - the calling thread is recorded in _Pythread ;
// - the current PyCpu() value becomes the _PyCpuUsed baseline.
1361 void GraphExecutor::InNode::SetPyCpuUsed() {
1362 _PyTotCpuUsed = -1 ;
// PyCpuUsed() compares this thread id with its own caller
1364 _Pythread = pthread_self() ;
1365 _PyCpuUsed = PyCpu() ;
1366 // cdebug << pthread_self() << "GraphExecutor::InNode::SetPyCpuUsed " << Name() << " _PyCpuUsed : "
1367 // << _PyCpuUsed << endl ;