1 // SUPERV GraphExecutor : contains classes that permit execution of graphs and particularly the execution automaton
3 // Copyright (C) 2003 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
24 // File : DataFlowExecutor_InNode.cxx
25 // Author : Jean Rahuel, CEA
38 #include <SALOMEconfig.h>
39 #include CORBA_CLIENT_HEADER(SALOME_Component)
40 //#include "SALOME_NamingService.hxx"
41 #include "SALOME_LifeCycleCORBA.hxx"
43 #include "DataFlowBase_FactoryNode.hxx"
44 #include "DataFlowBase_GOTONode.hxx"
45 #include "DataFlowBase_LoopNode.hxx"
46 #include "DataFlowBase_EndOfLoopNode.hxx"
47 #include "DataFlowBase_SwitchNode.hxx"
48 #include "DataFlowBase_EndOfSwitchNode.hxx"
50 #include "DataFlowExecutor_OutNode.hxx"
52 static void InitInNode( int &_RewindStack ,
53 SUPERV::ControlState &_ControlState ,
54 GraphExecutor::AutomatonState &_currentState ,
55 GraphExecutor::InNode ** _aReStartNode ,
56 bool & _PyFuncRunned ,
57 PyObject ** _MyPyRunMethod ,
58 pthread_mutex_t &_MutexDataWait ,
60 pthread_mutex_t &_MutexWait ,
61 pthread_cond_t &_ReadyWait ,
62 pthread_cond_t &_RunningWait ,
63 pthread_cond_t &_DoneWait ,
64 pthread_cond_t &_SuspendedWait ,
65 pthread_cond_t &_SuspendWait ,
67 pthread_cond_t &_ResumeWait ,
69 pthread_cond_t &_KillWait ,
71 pthread_cond_t &_ThreadStartedWait ,
72 bool &_ThreadStartedSync ,
73 pthread_cond_t &_StopWait ,
74 GraphExecutor::FiniteStateMachine ** _Automaton ,
75 GraphExecutor::FiniteStateMachine * theAutomaton ,
76 CORBA::ORB_ptr * _Orb ,
77 CORBA::ORB_ptr ORB ) {
79 _ControlState = SUPERV::VoidState ;
80 _currentState = GraphExecutor::UnKnownState ;
81 *_aReStartNode = NULL ;
82 _PyFuncRunned = false ;
83 *_MyPyRunMethod = NULL ;
84 pthread_mutex_init( &_MutexDataWait , NULL ) ;
86 pthread_mutex_init( &_MutexWait , NULL ) ;
87 if ( pthread_cond_init( &_ReadyWait , NULL ) ) {
88 perror("pthread_cond_init( &_ReadyWait , NULL )") ;
91 if ( pthread_cond_init( &_RunningWait , NULL ) ) {
92 perror("pthread_cond_init( &_RunningWait , NULL )") ;
95 if ( pthread_cond_init( &_DoneWait , NULL ) ) {
96 perror("pthread_cond_init( &_DoneWait , NULL )") ;
99 if ( pthread_cond_init( &_SuspendedWait , NULL ) ) {
100 perror("pthread_cond_init( &_SuspendedWait , NULL )") ;
103 if ( pthread_cond_init( &_SuspendWait , NULL ) ) {
104 perror("pthread_cond_init( &_SuspendWait , NULL )") ;
107 _SuspendSync = false ;
108 if ( pthread_cond_init( &_ResumeWait , NULL ) ) {
109 perror("pthread_cond_init( &_ResumeWait , NULL )") ;
112 _ResumeSync = false ;
113 if ( pthread_cond_init( &_KillWait , NULL ) ) {
114 perror("pthread_cond_init( &_KillWait , NULL )") ;
118 if ( pthread_cond_init( &_ThreadStartedWait , NULL ) ) {
119 perror("pthread_cond_init( &_ThreadStartedWait , NULL )") ;
122 _ThreadStartedSync = false ;
123 if ( pthread_cond_init( &_StopWait , NULL ) ) {
124 perror("pthread_cond_init( &_StopWait , NULL )") ;
127 *_Automaton = theAutomaton ;
128 *_Orb = CORBA::ORB::_nil();
131 GraphExecutor::FiniteStateMachine * theAutomaton = new GraphExecutor::FiniteStateMachine() ;
133 //GraphExecutor::InNode::InNode() :
134 // GraphBase::FactoryNode() {
135 GraphExecutor::InNode::InNode() {
136 InitInNode( _RewindStack ,
161 CORBA::ORB::_nil() ) ;
164 GraphExecutor::InNode::InNode( CORBA::ORB_ptr ORB,
165 SALOME_NamingService* ptrNamingService ,
166 const SALOME_ModuleCatalog::Service& aService ,
167 const char * ComponentName ,
168 const char * NodeInterfaceName ,
169 const char * NodeName ,
170 const SUPERV::KindOfNode akind ,
171 GraphBase::ListOfFuncName aFuncName ,
172 GraphBase::ListOfPythonFunctions aPythonFunction ,
173 const SUPERV::SDate NodeFirstCreation ,
174 const SUPERV::SDate NodeLastModification ,
175 const char * NodeEditorRelease ,
176 const char * NodeAuthor ,
177 const char * NodeComputer ,
178 const char * NodeComment ,
179 const bool GeneratedName ,
182 int * Graph_prof_debug,
183 ofstream * Graph_fdebug) {
184 // ostream * Graph_fdebug = NULL ) :
185 // GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
186 // ComponentName , NodeInterfaceName ,
187 // NodeName , akind ,
188 // NodeFirstCreation , NodeLastModification ,
189 // NodeEditorRelease , NodeAuthor ,
190 // NodeComputer , NodeComment , GeneratedName ,
192 // Graph_prof_debug , Graph_fdebug ) {
193 InitInNode( _RewindStack ,
219 SetDebug( ORB , Graph_prof_debug , Graph_fdebug ) ;
221 _ComputingNode = NULL ;
222 _FactoryNode = NULL ;
226 _EndOfLoopNode = NULL ;
228 _EndOfSwitchNode = NULL ;
230 case SUPERV::ComputingNode : {
231 cdebug << "GraphExecutor::InNode::InNode SUPERV::ComputingNode : " << NodeName ;
232 _ComputingNode = new GraphBase::ComputingNode( ORB , ptrNamingService ,
236 NodeLastModification ,
237 NodeEditorRelease , NodeAuthor ,
238 NodeComment , GeneratedName ,
240 Graph_prof_debug , Graph_fdebug ) ;
243 case SUPERV::FactoryNode : {
244 cdebug << "GraphExecutor::InNode::InNode SUPERV::FactoryNode : " << NodeName ;
245 _FactoryNode = new GraphBase::FactoryNode( ORB , ptrNamingService , aService ,
246 ComponentName , NodeInterfaceName ,
249 NodeLastModification ,
250 NodeEditorRelease , NodeAuthor ,
251 NodeComputer , NodeComment ,
252 GeneratedName , NodeX , NodeY ,
253 Graph_prof_debug , Graph_fdebug ) ;
254 _ComputingNode = (GraphBase::ComputingNode *) _FactoryNode ;
257 case SUPERV::InLineNode : {
258 cdebug << "GraphExecutor::InNode::InNode SUPERV::InLineNode : " << NodeName ;
259 _InLineNode = new GraphBase::InLineNode( ORB , ptrNamingService ,
260 aFuncName[0].c_str() , *aPythonFunction[0] ,
262 NodeFirstCreation , NodeLastModification ,
263 NodeEditorRelease , NodeAuthor ,
264 NodeComment , GeneratedName ,
266 Graph_prof_debug , Graph_fdebug ) ;
267 _ComputingNode = (GraphBase::ComputingNode *) _InLineNode ;
270 case SUPERV::MacroNode : {
271 cdebug << "GraphExecutor::InNode::InNode SUPERV::MacroNode : " << NodeName << endl ;
272 _GraphMacroNode = new GraphBase::Graph( ORB , ptrNamingService ,
273 // aFuncName[0].c_str() , *aPythonFunction[0] ,
275 // NodeFirstCreation , NodeLastModification ,
276 // NodeEditorRelease , NodeAuthor ,
277 // NodeComment , GeneratedName ,
279 Graph_prof_debug , Graph_fdebug ) ;
280 _ComputingNode = (GraphBase::ComputingNode *) _GraphMacroNode ;
281 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
282 _GOTONode = (GraphBase::GOTONode *) _InLineNode ;
283 _GraphMacroNode->Coordinates( NodeX , NodeY ) ;
286 case SUPERV::GOTONode : {
287 cdebug << "GraphEditor::InNode::InNode SUPERV::GOTONode : " << NodeName ;
288 _GOTONode = new GraphBase::GOTONode( ORB , ptrNamingService ,
289 aFuncName[0].c_str() , *aPythonFunction[0] ,
291 NodeFirstCreation , NodeLastModification ,
292 NodeEditorRelease , NodeAuthor ,
293 NodeComment , GeneratedName ,
295 Graph_prof_debug , Graph_fdebug ) ;
296 _ComputingNode = (GraphBase::ComputingNode *) _GOTONode ;
297 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
300 case SUPERV::LoopNode : {
301 cdebug << "GraphExecutor::InNode::InNode SUPERV::LoopNode : " << NodeName ;
302 _LoopNode = new GraphBase::LoopNode( ORB , ptrNamingService ,
303 aFuncName[0].c_str() , *aPythonFunction[0] ,
304 aFuncName[1].c_str() , *aPythonFunction[1] ,
305 aFuncName[2].c_str() , *aPythonFunction[2] ,
307 NodeFirstCreation , NodeLastModification ,
308 NodeEditorRelease , NodeAuthor ,
309 NodeComment , GeneratedName ,
311 Graph_prof_debug , Graph_fdebug ) ;
312 _ComputingNode = (GraphBase::ComputingNode *) _LoopNode ;
313 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
314 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
317 case SUPERV::EndLoopNode : {
318 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfLoopNode : " << NodeName ;
319 _EndOfLoopNode = new GraphBase::EndOfLoopNode(
320 ORB , ptrNamingService ,
321 aFuncName[0].c_str() , *aPythonFunction[0] ,
323 NodeFirstCreation , NodeLastModification ,
324 NodeEditorRelease , NodeAuthor ,
325 NodeComment , GeneratedName ,
327 Graph_prof_debug , Graph_fdebug ) ;
328 _ComputingNode = (GraphBase::ComputingNode *) _EndOfLoopNode ;
329 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
330 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
333 case SUPERV::SwitchNode : {
334 cdebug << "GraphExecutor::InNode::InNode SUPERV::SwitchNode : " << NodeName ;
335 _SwitchNode = new GraphBase::SwitchNode( ORB , ptrNamingService ,
336 aFuncName[0].c_str() , *aPythonFunction[0] ,
338 NodeFirstCreation , NodeLastModification ,
339 NodeEditorRelease , NodeAuthor ,
340 NodeComment , GeneratedName ,
342 Graph_prof_debug , Graph_fdebug ) ;
343 _ComputingNode = (GraphBase::ComputingNode *) _SwitchNode ;
344 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
345 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
348 case SUPERV::EndSwitchNode : {
349 cdebug << "GraphEditor::InNode::InNode SUPERV::EndOfSwitchNode : " << NodeName ;
350 _EndOfSwitchNode = new GraphBase::EndOfSwitchNode(
351 ORB , ptrNamingService ,
352 aFuncName[0].c_str() , *aPythonFunction[0] ,
354 NodeFirstCreation , NodeLastModification ,
355 NodeEditorRelease , NodeAuthor ,
356 NodeComment , GeneratedName ,
358 Graph_prof_debug , Graph_fdebug ) ;
359 _ComputingNode = (GraphBase::ComputingNode *) _EndOfSwitchNode ;
360 _GOTONode = (GraphBase::GOTONode *) _ComputingNode ;
361 _InLineNode = (GraphBase::InLineNode *) _ComputingNode ;
364 case SUPERV::DataFlowGraph : {
365 cdebug << "GraphEditor::InNode::InNode SUPERV::DataFlowGraph ERROR : " << NodeName ;
367 case SUPERV::DataStreamGraph : {
368 cdebug << "GraphEditor::InNode::InNode SUPERV::DataStreamGraph ERROR : " << NodeName ;
370 case SUPERV::UnknownNode : {
371 cdebug << "GraphEditor::InNode::InNode SUPERV::UnknownNode ERROR : " << NodeName ;
374 cdebug << "GraphExecutor::InNode::InNode " << (void *) this
375 << " _ComputingNode " << (void *) _ComputingNode ;
376 _ComputingNode->InNode( this ) ;
379 GraphExecutor::InNode::~InNode() {
382 void GraphExecutor::InNode::LockDataWait() {
383 if ( pthread_mutex_lock( &_MutexDataWait ) ) {
384 perror("Ready pthread_mutex_lock ") ;
389 void GraphExecutor::InNode::UnLockDataWait() {
391 if ( pthread_mutex_unlock( &_MutexDataWait ) ) {
392 perror("Ready pthread_mutex_unlock ") ;
397 Engines::Component_var GraphExecutor::InNode::Component() const {
398 if ( IsFactoryNode() ) {
399 return _FactoryNode->Component() ;
402 CORBA::Any const * anAnyComponent = GetChangeNodeInPort( 0 )->GetOutPort()->Value() ; // this
403 CORBA::Object_ptr obj ;
405 *anAnyComponent >>= obj ;
406 return Engines::Component::_narrow( obj ) ;
409 cdebug << "GraphExecutor::InNode::Component Component catch" << endl ;
412 return Engines::Component::_nil() ;
415 Engines::Container_var GraphExecutor::InNode::Container() const {
416 if ( IsFactoryNode() ) {
417 return _FactoryNode->Container() ;
419 return Engines::Container::_nil() ;
423 bool GraphExecutor::InNode::Ping() {
424 // cdebug_in << "GraphExecutor::InNode::Ping" << endl;
426 if ( IsFactoryNode() ) {
427 RetVal = !CORBA::is_nil( _FactoryNode->Component() ) ;
429 if ( State() != GraphExecutor::SuspendedExecutingState ) {
431 _FactoryNode->Component()->ping() ;
434 cdebug << "InNode::Ping() catched" << endl ;
435 State( GraphExecutor::ErroredState ) ;
436 _OutNode->State( GraphExecutor::ErroredState ) ;
445 // cdebug_out << "GraphExecutor::InNode::Ping" << endl ;
449 void GraphExecutor::InNode::NewThread( pthread_t aThread ) {
450 ThreadNo ( aThread ) ;
452 _OutNode->NewThread() ;
454 void GraphExecutor::InNode::ExitThread() {
456 _OutNode->ExitThread() ;
459 bool GraphExecutor::InNode::Suspend() {
460 cdebug_in << "GraphExecutor::InNode::Suspend " << Name() << " " << ThreadNo()
464 ControlState( SUPERV::VoidState ) ;
465 if ( _OutNode->IsDone() ) {
466 ControlState( SUPERV::VoidState ) ;
470 else if ( IsWaiting() || IsReady() ) {
471 ControlState( SUPERV::ToSuspendState ) ;
474 else if ( IsRunning() ) {
475 ControlState( SUPERV::ToSuspendState ) ;
476 if ( IsFactoryNode() || IsComputingNode() ) {
477 if ( !CORBA::is_nil( Component() ) ) {
479 RetVal = Component()->Suspend_impl() ;
482 cdebug << "InNode::Suspend() catched" << endl ;
483 State( GraphExecutor::ErroredState ) ;
484 _OutNode->State( GraphExecutor::ErroredState ) ;
489 cdebug << pthread_self() << "GraphExecutor::InNode::Suspend_impl " << Name()
490 << " --> thread" << ThreadNo() << " SuspendEvent " << endl;
491 SendEvent( GraphExecutor::SuspendEvent ) ;
492 cdebug << pthread_self() << "GraphExecutor::InNode::Suspended_impl in Container"
493 << Name() << " --> thread" << ThreadNo() << endl;
495 else if ( IsDone() ) {
496 ControlState( SUPERV::VoidState ) ;
497 RetVal = false ; // Too late ...
500 cdebug << "component Suspended and !IsDone and !IsRunning !"
506 cdebug << "Suspend cannot Suspend component !" << endl ;
511 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
516 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
520 cdebug_out << "GraphExecutor::InNode::Suspend " << RetVal << " "
521 << Automaton()->StateName( State() ) << endl ;
525 bool GraphExecutor::InNode::ContainerKill() {
526 cdebug_in << "GraphExecutor::InNode::ContainerKill " << Name() << " "
527 << ThreadNo() << endl;
529 if ( IsFactoryNode() ) {
531 RetVal = Container()->Kill_impl() ;
533 cdebug_out << "GraphExecutor::InNode::ContainerKill" << endl ;
537 bool GraphExecutor::InNode::Kill() {
538 cdebug_in << "GraphExecutor::InNode::Kill " << Name() << " " << ThreadNo() << " "
539 << Automaton()->StateName( State() ) << endl;
542 ControlState( SUPERV::ToKillState ) ; // if loop
543 if ( _OutNode->IsDone() ) {
544 ControlState( SUPERV::VoidState ) ;
549 ControlState( SUPERV::ToKillState ) ;
551 if ( _OutNode->IsDone() ) {
552 ControlState( SUPERV::VoidState ) ;
558 if ( IsFactoryNode() || IsComputingNode() ) {
559 if ( !CORBA::is_nil( Component() ) ) {
561 RetVal = Component()->Kill_impl() ;
564 cdebug << "InNode::Suspend() catched" << endl ;
565 State( GraphExecutor::ErroredState ) ;
566 _OutNode->State( GraphExecutor::ErroredState ) ;
569 cdebug << "Component()->Kill_impl() returns status " << RetVal << endl ;
572 cdebug << pthread_self() << "GraphExecutor::InNode::Kill_impl " << Name()
573 << " --> thread" << ThreadNo() << " SuspendEvent " << endl;
574 SendEvent( GraphExecutor::KillEvent ) ;
575 cdebug << pthread_self() << "GraphExecutor::InNode::Killed_impl in Container"
576 << Name() << " --> thread" << ThreadNo() << endl;
578 else if ( IsDone() ) {
579 ControlState( SUPERV::VoidState ) ;
580 RetVal = false ; // Too late ...
583 cdebug << "component Killed and !IsDone and !IsRunning !"
588 cdebug << "Kill with nilComponent cannot Kill component !" << endl ;
593 else if ( IsSuspended() ) {
594 cdebug << pthread_self() << "GraphExecutor::InNode::Kill " << Name()
595 << " --> thread" << ThreadNo() << " Resume()" << endl;
599 else if ( IsWaiting() ) {
603 cdebug << "Kill and !IsDone and !IsRunning and !IsWaiting ?"
609 cdebug_out << "GraphExecutor::InNode::Kill" << endl ;
613 bool GraphExecutor::InNode::KillDone() {
614 cdebug_in << "GraphExecutor::InNode::KillDone " << Name() << " " << ThreadNo()
617 if ( ControlState() == SUPERV::ToKillDoneState || IsDone() ) {
621 ControlState( SUPERV::ToKillDoneState ) ;
623 if ( _OutNode->IsDone() ) {
624 ControlState( SUPERV::VoidState ) ;
632 else if ( IsWaiting() ) {
636 cdebug << "KillDone and !IsDone and !IsRunning and !IsWaiting ?"
642 cdebug_out << "GraphExecutor::InNode::KillDone" << endl ;
646 bool GraphExecutor::InNode::Stop() {
647 cdebug_in << "GraphExecutor::InNode::Stop " << Name() << " " << ThreadNo()
650 if ( ControlState() == SUPERV::ToStopState || IsDone() ) {
654 ControlState( SUPERV::ToStopState ) ;
656 if ( _OutNode->IsDone() ) {
657 ControlState( SUPERV::VoidState ) ;
663 if ( IsFactoryNode() || IsComputingNode() ) {
664 if ( !CORBA::is_nil( Component() ) ) {
666 RetVal = Component()->Stop_impl() ;
669 cdebug << "InNode::Stop() catched" << endl ;
670 State( GraphExecutor::ErroredState ) ;
671 _OutNode->State( GraphExecutor::ErroredState ) ;
676 SendEvent( GraphExecutor::StopEvent ) ;
678 else if ( IsDone() ) {
679 ControlState( SUPERV::VoidState ) ;
680 RetVal = false ; // Too late ...
683 cdebug << "component Suspended and !IsDone and !IsRunning !"
689 cdebug << "Suspend cannot Suspend component !" << endl ;
694 cdebug << "Suspend with nilComponent while RunningState !" << endl ;
698 else if ( IsWaiting() ) {
702 cdebug << "Suspend and !IsDone and !IsRunning and !IsWaiting ?"
708 cdebug_out << "GraphExecutor::InNode::Stop" << endl ;
712 bool GraphExecutor::InNode::SuspendDone() {
713 cdebug_in << "GraphExecutor::InNode::SuspendDone " << Name() << " "
714 << ThreadNo() << endl;
716 if ( ControlState() == SUPERV::ToSuspendDoneState || IsDone() ) {
720 ControlState( SUPERV::ToSuspendDoneState ) ;
722 if ( _OutNode->IsDone() ) {
723 ControlState( SUPERV::VoidState ) ;
731 cdebug_out << "GraphExecutor::InNode::SuspendDone" << endl ;
735 bool GraphExecutor::InNode::Resume() {
736 cdebug_in << pthread_self() << "/" << ThreadNo()
737 << " GraphExecutor::InNode::Resume " << Name() << " "
738 << Automaton()->StateName( State() ) << endl;
739 bool RetVal = false ;
740 if ( IsSuspended() ) {
741 if ( State() == GraphExecutor::SuspendedReadyState ) {
742 ResumeAction( GraphExecutor::ToResumeEvent ) ;
745 else if ( State() == GraphExecutor::SuspendedExecutingState ) {
746 if ( IsFactoryNode() || IsComputingNode() ) {
748 RetVal = Component()->Resume_impl() ;
751 cdebug << "InNode::Resume() catched" << endl ;
752 State( GraphExecutor::ErroredState ) ;
753 _OutNode->State( GraphExecutor::ErroredState ) ;
758 else if ( State() == GraphExecutor::SuspendedSuccessedState ) {
759 ResumeAction( GraphExecutor::ResumeEvent ) ;
762 else if ( State() == GraphExecutor::SuspendedErroredState ) {
763 ResumeAction( GraphExecutor::ResumeEvent ) ;
767 cdebug << "GraphExecutor::InNode::Resume Not SuspendedReady/Executing/Successed/ErroredState "
768 << Automaton()->StateName( State() ) << endl ;
773 cdebug << "GraphExecutor::InNode::Resume Not Suspended State "
774 << Automaton()->StateName( State() ) << endl ;
777 if ( ControlState() == SUPERV::ToSuspendStartState ) {
778 ControlState( SUPERV::VoidState ) ;
782 if ( ControlState() == SUPERV::ToSuspendRunState ||
783 ( ControlState() == SUPERV::ToSuspendState &&
784 State() == GraphExecutor::SuspendedReadyState) ) {
785 if ( IsSuspended() ) {
786 if ( State() == GraphExecutor::SuspendedReadyState ) {
790 else if ( State() == GraphExecutor::SuspendedExecutingState ) {
792 RetVal = Component()->Resume_impl() ;
795 cdebug << "GraphExecutor::InNode::Resume State "
796 << Automaton()->StateName( State() ) << endl ;
799 if ( ControlState() != SUPERV::ToSuspendState ) {
800 ControlState( SUPERV::VoidState ) ;
803 else if ( IsRunning() ) {
806 else if ( IsWaiting() ) {
807 ControlState( SUPERV::VoidState ) ;
810 else if ( IsDone() ) {
814 else if ( ControlState() == SUPERV::ToSuspendDoneState ||
815 ( ControlState() == SUPERV::ToSuspendState &&
816 State() == GraphExecutor::SuspendedSuccessedState) ) {
817 if ( IsSuspended() ) {
818 if ( State() == GraphExecutor::SuspendedSuccessedState ) {
822 else if ( State() == GraphExecutor::SuspendedErroredState ) {
827 cdebug << "GraphExecutor::InNode::Resume State " << State() << endl ;
830 if ( ControlState() != SUPERV::ToSuspendState ) {
831 ControlState( SUPERV::VoidState ) ;
834 else if ( IsRunning() ) {
835 ControlState( SUPERV::VoidState ) ;
838 else if ( IsWaiting() ) {
839 ControlState( SUPERV::VoidState ) ;
842 else if ( IsDone() ) {
843 ControlState( SUPERV::VoidState ) ;
848 cdebug_out << "GraphExecutor::InNode::Resume " << RetVal << endl ;
852 bool GraphExecutor::InNode::ReStart( const char * AtNodeName ,
853 const bool AndSuspend ) {
854 bool RetVal = false ;
855 GraphExecutor::InNode * aRestartNode = (GraphExecutor::InNode *) _OutNode->Graph()->GetGraphNode( AtNodeName )->GetInNode() ;
856 cdebug_in << pthread_self() << "/" << ThreadNo()
857 << " --> GraphExecutor::InNode::ReStartAt( "
858 << AtNodeName << " , " << AndSuspend << ") " << endl
859 << "thread " << aRestartNode->ThreadNo() << " "
860 << Automaton()->StateName( aRestartNode->State() )
861 << " from " << Name() << " " << Automaton()->StateName( State() )
863 if ( IsWaiting() && aRestartNode->IsSuspended() ) {
864 RetVal = aRestartNode->Resume() ;
866 else if ( IsSuspended() ) {
867 if ( strcmp( AtNodeName , Name() ) ) {
868 aRestartNode->State( GraphExecutor::SuspendedSuccessedState ) ;
871 ReStartAction( aRestartNode , GraphExecutor::ReStartAndSuspendEvent ) ;
874 ReStartAction( aRestartNode , GraphExecutor::ReStartEvent ) ;
878 cdebug_out << "<-- GraphExecutor::InNode::ReStartAt" << endl ;
882 bool GraphExecutor::InNode::IsWaiting() {
884 // cdebug_in << "GraphExecutor::InNode::IsWaiting " << Name() << endl;
885 GraphExecutor::AutomatonState aState = State() ;
886 if ( aState == GraphExecutor::DataUndefState ||
887 aState == GraphExecutor::DataWaitingState ||
888 aState == GraphExecutor::SuspendedReadyState )
889 // aState == GraphExecutor::SuspendedExecutingState ||
890 // aState == GraphExecutor::SuspendedSuccessedState ||
891 // aState == GraphExecutor::SuspendedErroredState ||
892 // aState == GraphExecutor::SuspendedState
894 // cdebug_out << "GraphExecutor::InNode::IsWaiting" << endl ;
898 bool GraphExecutor::InNode::IsReady() {
900 // cdebug_in << "GraphExecutor::InNode::IsReady " << Name() << endl;
901 GraphExecutor::AutomatonState aState = State() ;
902 // if ( aState == GraphExecutor::DataUndefState ||
903 // aState == GraphExecutor::DataWaitingState ||
904 if ( aState == GraphExecutor::DataReadyState ||
905 aState == GraphExecutor::ResumedReadyState )
907 // cdebug_out << "GraphExecutor::InNode::IsReady" << endl ;
911 bool GraphExecutor::InNode::IsRunning() {
913 // cdebug_in << "GraphExecutor::InNode::IsRunning " << Name() << endl;
914 GraphExecutor::AutomatonState aState = State() ;
915 if ( aState == GraphExecutor::ExecutingState ||
916 aState == GraphExecutor::ResumedExecutingState )
918 // cdebug_out << "GraphExecutor::InNode::IsRunning" << endl ;
922 bool GraphExecutor::InNode::IsDone() {
924 // cdebug_in << "GraphExecutor::InNode::IsDone " << Name() << endl;
925 GraphExecutor::AutomatonState aState = State() ;
926 if ( aState == GraphExecutor::KilledReadyState ||
927 aState == GraphExecutor::StoppedReadyState ||
928 aState == GraphExecutor::KilledExecutingState ||
929 aState == GraphExecutor::StoppedExecutingState ||
930 aState == GraphExecutor::SuspendedSuccessedState ||
931 aState == GraphExecutor::SuspendedErroredState ||
932 // aState == GraphExecutor::SuccessedExecutingState ||
933 // aState == GraphExecutor::ErroredExecutingState ||
934 aState == GraphExecutor::SuccessedState ||
935 aState == GraphExecutor::ErroredState ||
936 aState == GraphExecutor::ResumedSuccessedState ||
937 aState == GraphExecutor::ResumedErroredState ||
938 aState == GraphExecutor::KilledSuccessedState ||
939 aState == GraphExecutor::StoppedSuccessedState )
941 // cdebug_out << "GraphExecutor::InNode::IsDone" << endl ;
945 bool GraphExecutor::InNode::IsSuspended() {
947 // cdebug_in << "GraphExecutor::InNode::IsSuspended " << Name() << endl;
948 GraphExecutor::AutomatonState aState = State() ;
949 if ( aState == GraphExecutor::SuspendedReadyState ||
950 aState == GraphExecutor::SuspendedExecutingState ||
951 aState == GraphExecutor::SuspendedSuccessedState ||
952 aState == GraphExecutor::SuspendedErroredState )
954 // cdebug_out << "GraphExecutor::InNode::IsSuspended" << endl ;
957 bool GraphExecutor::InNode::IsKilled() {
959 // cdebug_in << "GraphExecutor::InNode::IsKilled " << Name() << endl;
960 GraphExecutor::AutomatonState aState = State() ;
961 if ( aState == GraphExecutor::KilledReadyState ||
962 aState == GraphExecutor::KilledExecutingState ||
963 aState == GraphExecutor::KilledSuccessedState ||
964 aState == GraphExecutor::KilledErroredState ||
965 aState == GraphExecutor::KilledState )
967 // cdebug_out << "GraphExecutor::InNode::IsKilled" << endl ;
970 bool GraphExecutor::InNode::IsStopped() {
972 // cdebug_in << "GraphExecutor::InNode::IsStopped " << Name() << endl;
973 GraphExecutor::AutomatonState aState = State() ;
974 if ( aState == GraphExecutor::StoppedReadyState ||
975 aState == GraphExecutor::StoppedExecutingState ||
976 aState == GraphExecutor::StoppedSuccessedState ||
977 aState == GraphExecutor::StoppedErroredState ||
978 aState == GraphExecutor::StoppedState )
980 // cdebug_out << "GraphExecutor::InNode::IsStopped" << endl ;
984 bool GraphExecutor::InNode::StateWait( SUPERV::GraphState aState ) {
985 bool RetVal = false ;
986 if ( pthread_mutex_lock( &_MutexWait ) ) {
987 perror("pthread_mutex_lock _Wait") ;
991 case SUPERV::ReadyState : {
993 cdebug_in << pthread_self() << " StateWait( Ready ) " << RetVal
994 << " " << Automaton()->StateName( _currentState )
995 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
996 while ( !RetVal && !IsDone() ) {
997 cdebug << pthread_self() << " pthread_cond_wait ReadyWait" << endl ;
998 pthread_cond_wait( &_ReadyWait , &_MutexWait );
1000 cdebug << pthread_self() << " pthread_cond_waited ReadyWait "
1001 << Automaton()->StateName( _currentState ) << " " << RetVal
1004 cdebug_out << pthread_self() << " StateWait( Ready ) " << RetVal
1005 << " " << Automaton()->StateName( _currentState )
1006 << " pthread_cond_wait _ReadyWait " << Name() << endl ;
1009 case SUPERV::RunningState : {
1010 RetVal = IsRunning() ;
1011 cdebug_in << pthread_self() << " StateWait( Running ) " << RetVal
1012 << " " << Automaton()->StateName( _currentState )
1013 << " pthread_cond_wait _RunningWait " << Name() << endl ;
1014 while ( !RetVal && !IsDone() ) {
1015 cdebug << pthread_self() << " pthread_cond_wait RunningWait" << endl ;
1016 pthread_cond_wait( &_RunningWait , &_MutexWait );
1017 RetVal = IsRunning() ;
1018 cdebug << pthread_self() << " pthread_cond_waited RunningWait "
1019 << Automaton()->StateName( _currentState ) << " " << RetVal
1022 cdebug_out << pthread_self() << " StateWait( Running ) " << RetVal
1023 << " " << Automaton()->StateName( _currentState )
1024 << " pthread_cond_wait _RunningWait " << Name() << endl ;
1027 case SUPERV::DoneState : {
1029 cdebug_in << pthread_self() << " StateWait( Done ) " << RetVal
1030 << " " << Automaton()->StateName( _currentState )
1031 << " pthread_cond_wait _DoneWait " << Name() << endl ;
1033 cdebug << pthread_self() << " pthread_cond_wait DoneWait" << endl ;
1034 pthread_cond_wait( &_DoneWait , &_MutexWait );
1036 cdebug << pthread_self() << " pthread_cond_waited DoneWait "
1037 << Automaton()->StateName( _currentState ) << " " << RetVal
1040 cdebug_out << pthread_self() << " StateWait( Done ) " << RetVal
1041 << " " << Automaton()->StateName( _currentState )
1042 << " pthread_cond_wait _DoneWait " << Name() << endl ;
1045 case SUPERV::SuspendState : {
1046 RetVal = IsSuspended() ;
1047 cdebug_in << pthread_self() << " StateWait( Suspend ) " << RetVal
1048 << " " << Automaton()->StateName( _currentState )
1049 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
1050 while ( !RetVal && !IsDone() ) {
1051 cdebug << pthread_self() << " pthread_cond_wait SuspendedWait" << endl ;
1052 pthread_cond_wait( &_SuspendedWait , &_MutexWait );
1053 RetVal = IsSuspended() ;
1054 cdebug << pthread_self() << " pthread_cond_waited SuspendedWait "
1055 << Automaton()->StateName( _currentState ) << " " << RetVal
1058 cdebug_out << pthread_self() << " StateWait( Suspend ) " << RetVal
1059 << " " << Automaton()->StateName( _currentState )
1060 << " pthread_cond_wait _SuspendedWait " << Name() << endl ;
1064 cdebug << " SUPERV::OutNode::StateWait Error Undefined State : "
1068 if ( pthread_mutex_unlock( &_MutexWait ) ) {
1069 perror("pthread_mutex_lock _Wait") ;
1075 bool GraphExecutor::InNode::ReadyWait() {
1076 // cdebug_in << "GraphExecutor::InNode::ReadyWait " << Name() << endl;
1078 aret = StateWait( SUPERV::ReadyState ) ;
1079 // cdebug_out << "GraphExecutor::InNode::ReadyWait" << endl ;
1083 bool GraphExecutor::InNode::RunningWait() {
1084 // cdebug_in << "GraphExecutor::InNode::RunningWait " << Name() << endl;
1086 aret = StateWait( SUPERV::RunningState ) ;
1090 bool GraphExecutor::InNode::DoneWait() {
1091 // cdebug_in << "GraphExecutor::InNode::DoneWait " << Name() << endl;
1093 aret = StateWait( SUPERV::DoneState ) ;
1097 bool GraphExecutor::InNode::SuspendedWait() {
1098 // cdebug_in << "GraphExecutor::InNode::SuspendedWait " << Name() << endl;
1100 aret = StateWait( SUPERV::SuspendState ) ;
1104 void GraphExecutor::InNode::InitialState( GraphExecutor::OutNode * theOutNode )
1106 cdebug_in << "GraphExecutor::InNode::InitialState Node " << Name() << endl;
1108 _OutNode = theOutNode ;
1111 _ControlState = SUPERV::VoidState ;
1112 CreateNewThread( false ) ;
1113 CreateNewThreadIf( false ) ;
1114 _SuspendSync = false ;
1115 _ResumeSync = false ;
1116 // ThreadNo( pthread_self() ) ;
1119 for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
1120 if ( GetNodeOutPort(i)->IsDataStream() ) {
1121 GetChangeNodeOutPort(i)->State( SUPERV::ReadyState ) ;
1122 GetChangeNodeOutPort(i)->Done( true ) ;
1124 else if ( i != 0 || !IsGOTONode() ) {
1125 GetChangeNodeOutPort(i)->State( SUPERV::WaitingState ) ;
1126 GetChangeNodeOutPort(i)->Done( false ) ;
1130 int Pc = GetNodeInPortsSize() ;
1131 for ( i = 0 ; i < GetNodeInPortsSize() ; i++ ) {
1132 const GraphBase::InPort * anInPort = GetNodeInPort(i) ;
1133 GraphBase::OutPort * anOutPort = anInPort->GetOutPort() ;
1134 if ( IsHeadNode() && IsLoopNode() && anInPort->IsLoop() ) {
1135 anOutPort->PortStatus( DataConnected );
1136 anOutPort->State( SUPERV::ReadyState ) ;
1137 anOutPort->Done( true ) ;
1138 CORBA::Any * anAny = new CORBA::Any() ;
1139 *anAny <<= (long ) 1 ;
1140 anOutPort->Value( anAny ) ;
1142 else if ( anInPort->IsGate() && anOutPort ) {
1143 if ( IsComputingNode() || IsFactoryNode() ) {
1144 anOutPort->State( SUPERV::WaitingState ) ;
1145 anOutPort->Done( false ) ;
1147 else if ( IsOneOfInLineNodes() ) {
1148 anOutPort->PortStatus( DataConnected );
1149 anOutPort->State( SUPERV::ReadyState ) ;
1150 anOutPort->Done( true ) ;
1153 // if ( ( anInPort->IsGate() || anInPort->IsBus() ) && anOutPort == NULL ) {
1154 if ( anInPort->IsGate() && anOutPort == NULL ) {
1157 else if ( anOutPort ) {
1158 if ( anOutPort->IsDataConnected() || anOutPort->IsDataStream() ) {
1161 if ( anOutPort->IsDataConnected() || anOutPort->IsDataStream() ) {
1162 anOutPort->State( SUPERV::ReadyState ) ;
1163 anOutPort->Done( true ) ;
1165 else if ( anOutPort->IsPortConnected() ) {
1166 anOutPort->State( SUPERV::WaitingState ) ;
1167 anOutPort->Done( false ) ;
1171 if ( !anOutPort->IsDataStream() || anInPort->IsDataStream() ) {
1172 cdebug << "InPort" << i << " state change : " << anInPort->PortName() << " from OutPort "
1173 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1174 << " with state " << theAutomaton->StateName( anOutPort->State() ) << endl ;
1175 GetChangeNodeInPort(i)->State( anOutPort->State() ) ;
1177 else if ( anOutPort->IsDataConnected() ) {
1178 cdebug << "InPort" << i << " state change : " << anInPort->PortName() << " from OutPort "
1179 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1180 << " with state ReadyState" << endl ;
1181 GetChangeNodeInPort(i)->State( SUPERV::ReadyState ) ;
1184 cdebug << "InPort" << i << " state NOT changed : " << anInPort->PortName() << " from OutPort "
1185 << anOutPort->PortName() << " " << anOutPort->PortStatus() << " from Node " << anOutPort->NodeName()
1186 << " with state " << anOutPort->State() << endl ;
1190 cdebug << "InPort" << i << " : " << anInPort->PortName() << " from OutPort "
1191 << anOutPort->PortName() << " from Node " << anOutPort->NodeName()
1193 if ( anOutPort->State() == SUPERV::WaitingState ) {
1194 cdebug << "WaitingState" ;
1196 else if ( anOutPort->State() == SUPERV::ReadyState ) {
1197 cdebug << "ReadyState" ;
1202 cdebug << " PortConnected("
1203 << anOutPort->IsPortConnected() << ") DataConnected("
1204 << anOutPort->IsDataConnected() << ")" << endl ;
1208 _currentState = Pc > 0 ? GraphExecutor::DataWaitingState
1209 : GraphExecutor::DataReadyState ;
1210 if ( Pc == GetNodeInPortsSize() ) {
1211 _OutNode->PushEvent( this , GraphExecutor::NoDataReadyEvent ,
1214 else if ( Pc != 0 ) {
1215 _OutNode->PushEvent( this , GraphExecutor::SomeDataReadyEvent ,
1219 _OutNode->PushEvent( this , GraphExecutor::AllDataReadyEvent ,
1223 for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
1224 cdebug << "OutPort" << i << " : " << GetNodeOutPort(i)->PortName() << " "
1225 << theAutomaton->StateName( GetChangeNodeOutPort(i)->State() )
1226 << " " << GetNodeOutPort(i)->Kind() << endl ;
1229 cdebug << "CurrentState = " << theAutomaton->StateName( _currentState )
1232 cdebug_out << "GraphExecutor::InNode::InitialState" << endl;
1235 bool GraphExecutor::InNode::InitPythonFunctions(bool WithErr ) {
1236 cdebug_in << "GraphExecutor::InNode::InitPythonFunctions " << Name() << endl;
1238 if ( !PyFuncRunned() && IsOneOfInLineNodes() ) {
1239 if ( IsLoopNode() ) {
1240 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1241 PyObject * PyMoreMethod = NULL ;
1242 PyObject * PyNextMethod = NULL ;
1243 if ( PyRunMethod ) {
1246 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1247 InLineNode()->PythonFunction() ,
1249 InLineNode()->PyRunMethod( PyRunMethod ) ;
1252 PyMoreMethod = LoopNode()->PyMoreMethod() ;
1253 if ( PyMoreMethod ) {
1256 PyMoreMethod = InitPyDynInvoke( LoopNode()->PyMoreName() ,
1257 LoopNode()->MorePythonFunction() ,
1259 LoopNode()->PyMoreMethod( PyMoreMethod ) ;
1263 PyNextMethod = LoopNode()->PyNextMethod() ;
1264 if ( PyNextMethod ) {
1267 PyNextMethod = InitPyDynInvoke( LoopNode()->PyNextName() ,
1268 LoopNode()->NextPythonFunction() ,
1270 LoopNode()->PyNextMethod( PyNextMethod ) ;
1273 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod(Init) " << PyRunMethod
1274 << " PyMoreMethod " << PyMoreMethod << " PyNextMethod " << PyNextMethod << endl;
1276 else if ( IsInLineNode() || IsSwitchNode() ) {
1277 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1278 if ( PyRunMethod ) {
1281 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1282 InLineNode()->PythonFunction() ,
1284 InLineNode()->PyRunMethod( PyRunMethod ) ;
1286 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod " << PyRunMethod << endl;
1288 else if ( ( IsEndLoopNode() || IsEndSwitchNode() || IsGOTONode() ) &&
1289 (*InLineNode()->PythonFunction()).length() ) {
1290 PyObject * PyRunMethod = InLineNode()->PyRunMethod() ;
1291 if ( PyRunMethod ) {
1294 PyRunMethod = InitPyDynInvoke( InLineNode()->PyFuncName() ,
1295 InLineNode()->PythonFunction() ,
1297 InLineNode()->PyRunMethod( PyRunMethod ) ;
1299 cdebug << "GraphExecutor::InNode::InitPythonFunctions " << Name() << " PyRunMethod " << PyRunMethod << endl;
1302 Err = WithErr && Err ;
1303 cdebug_out << "GraphExecutor::InNode::InitPythonFunctions " << Name() ;
1305 cdebug << " Error " << Err ;
1311 const long GraphExecutor::InNode::CpuUsed( bool tot ) {
1312 CORBA::Long cpu = 0 ;
1313 // cdebug_in << "GraphExecutor::InNode::CpuUsed( " << tot << " )" << Name() << endl ;
1314 if ( IsOneOfInLineNodes() ) {
1315 // cdebug << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
1316 // cout << "CpuUsed " << Name() << " --> PyCpuUsed()" << endl ;
1317 cpu = PyCpuUsed( tot ) ;
1320 if ( !CORBA::is_nil( Component() ) ) {
1321 // cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
1322 // cout << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl()" << endl ;
1324 cpu = Component()->CpuUsed_impl() ;
1327 cdebug << "CpuUsed " << Name() << " --> Component()->CpuUsed_impl() ERROR catched " << endl ;
1328 State( GraphExecutor::ErroredState ) ;
1329 _OutNode->State( GraphExecutor::ErroredState ) ;
1334 // cdebug_out << "GraphExecutor::InNode::CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1335 // cout << "CpuUsed " << Name() << " CpuUsed : " << cpu << endl ;
1339 #include <sys/time.h>
1340 #include <sys/resource.h>
1343 long GraphExecutor::InNode::PyCpu() {
1344 struct rusage usage ;
1346 if ( getrusage( RUSAGE_SELF , &usage ) == -1 ) {
1347 perror("GraphExecutor::InNode::PyCpu") ;
1350 // return usage.ru_utime.__time_t tv_sec ;
1351 // cdebug << pthread_self() << "PyCpu " << Name() << " " << usage.ru_utime.tv_sec << " "
1352 // << usage.ru_utime.tv_usec << " " << usage.ru_stime.tv_sec << " " << usage.ru_stime.tv_usec
1354 cpu = usage.ru_utime.tv_sec ;
1358 long GraphExecutor::InNode::PyCpuUsed( bool tot ) {
1360 if ( _PyTotCpuUsed == -1 ) {
1361 if ( _Pythread == pthread_self() ) {
1362 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name()
1363 // << " _PyTotCpuUsed " << _PyTotCpuUsed << " PyCpu() " << PyCpu() << " - " << " _PyCpuUsed "
1364 // << _PyCpuUsed << endl ;
1365 cpu = PyCpu() - _PyCpuUsed ;
1367 _PyTotCpuUsed = cpu ;
1375 cpu = _PyTotCpuUsed ;
1377 // cdebug << pthread_self() << "GraphExecutor::InNode::PyCpuUsed(" << tot << ") " << Name() << "_PyTotCpuUsed"
1378 // << _PyTotCpuUsed << " CpuUsed : " << cpu << endl ;
1382 void GraphExecutor::InNode::SetPyCpuUsed() {
1383 _PyTotCpuUsed = -1 ;
1385 _Pythread = pthread_self() ;
1386 _PyCpuUsed = PyCpu() ;
1387 // cdebug << pthread_self() << "GraphExecutor::InNode::SetPyCpuUsed " << Name() << " _PyCpuUsed : "
1388 // << _PyCpuUsed << endl ;