PyInitialized( true ) ;
}
anInNode->InitialState( this ) ;
+ if ( anInNode->IsOneOfInLineNodes() ) {
+ anInNode->InitPythonFunctions( false ) ;
+ }
+ }
+// One more time because inline nodes may share one definition of the same function
+ for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
+ GraphExecutor::InNode * anInNode = (GraphExecutor::InNode *) GraphNodes( i )->GetInNode() ;
+ if ( anInNode->IsOneOfInLineNodes() ) {
+ if ( anInNode->InitPythonFunctions( true ) ) {
+ }
+ }
}
cdebug << "Execution starting GraphExecutor::Action_DataOk_RunService Node "
if ( anInNode->State() != SUPERV::DataReadyState ) {
cdebug << "GraphExecutor::OutNode::Run inconsistency State of Node "
<< anInNode->Name() << " : " << anInNode->State() << endl ;
+ cdebug_out << "GraphExecutor::OutNode::Run State ERROR" << endl ;
return false ;
}
// PushEvent( anInNode , GraphExecutor::ReadyEvent ,
if ( !anInNode->SendEvent( GraphExecutor::SomeDataReadyEvent ) ) {
cdebug << "InNode::SendEvent( SomeDataReadyEvent ) ERROR Node "
<< anInNode->Name() << endl ;
+ cdebug_out << "GraphExecutor::OutNode::Run SendEvent ERROR" << endl ;
return false ;
}
anInNode->SuspendedWait() ;
else if ( !anInNode->SendEvent( GraphExecutor::ExecuteEvent ) ) {
cdebug << "InNode::SendEvent( ExecuteEvent ) ERROR Node "
<< anInNode->Name() << endl ;
+ cdebug_out << "GraphExecutor::OutNode::Run SendEvent ERROR" << endl ;
return false ;
}
else {
SUPERV::AutomatonState OutNodeState = SUPERV::SuccessedState ;
SUPERV::AutomatonState InNodeState ;
bool AllDone = true ;
- if ( !_Done ) {
+ if ( !Done() ) {
for ( j = 0 ; j < QueueNodesSize() ; j++ ) {
InNodeState = ( (GraphExecutor::InNode * ) QueueNodes( j )->GetInNode() )->State() ;
cdebug << j << ". "
InNodeState != SUPERV::DataWaitingState ) {
AllDone = false ;
}
- if ( InNodeState != SUPERV::SuccessedState ) {
+ if ( InNodeState != SUPERV::SuccessedState &&
+ InNodeState != SUPERV::DataWaitingState &&
+ InNodeState != SUPERV::DataReadyState ) {
OutNodeState = InNodeState ;
}
}
}
}
if ( AllDone ) {
- State( OutNodeState ) ;
- _Done = true ;
- _JustStarted = false ;
+ int alivenodes = 0 ;
for ( j = 0 ; j < GraphNodesSize() ; j++ ) {
GraphExecutor::InNode * aNode ;
aNode = (GraphExecutor::InNode * ) GraphNodes( j )->GetInNode() ;
SUPERV::GraphState aState = AutomatonGraphState( aNode->State() ) ;
cdebug << "GraphExecutor::OutNode::CheckAllDone " << aNode->Name() << " "
- << theAutomaton->StateName( aNode->State() ) << " :" << endl ;
+ << theAutomaton->StateName( aNode->State() ) << " CreateNewThread " << aNode->CreateNewThread()
+ << endl ;
if ( aState == SUPERV::ErrorState ||
aState == SUPERV::SuspendErroredState ||
aState == SUPERV::KillState ||
aState == SUPERV::StopState ) {
OutNodeState = aNode->State() ;
+ State( OutNodeState ) ;
+ }
+ else if ( aState == SUPERV::ReadyState ||
+ aState == SUPERV::SuspendReadyState ||
+ aState == SUPERV::RunningState ||
+ aState == SUPERV::SuspendDoneState ||
+ aState == SUPERV::SuspendErroredState ||
+ aState == SUPERV::ReRunState ||
+ aState == SUPERV::ReStartState ||
+ aState == SUPERV::SuspendState ) {
+ alivenodes += 1 ;
}
aNode->SuspendedAction() ;
aNode->DoneAction() ;
}
// PushEvent AFTER State and _Done ! ...
+ if ( alivenodes == 0 ) {
+ State( OutNodeState ) ;
+ Done( true ) ;
+ _JustStarted = false ;
+ }
PushEvent( NULL , GraphExecutor::EndExecuteEvent ,
OutNodeState ) ;
// Py_Finalize() ;
// PyInitialized( false ) ;
}
}
- cdebug_out << "GraphExecutor::OutNode::CheckAllDone " << _Done
- << " GraphAutomatonState " << theAutomaton->StateName( AutomatonState() )
- << " State " << State() << " Threads " << _Threads << " SuspendedThreads "
- << _SuspendedThreads << endl ;
- if ( _Done ) {
+ if ( IsDone() ) {
MESSAGE("================================================================================") ;
- MESSAGE( Name() << " IS DONE" ) ;
+ MESSAGE( Name() << " IS DONE : " << theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+ << EventQSize() ) ;
MESSAGE("================================================================================") ;
- }
+ cdebug << "================================================================================" << endl ;
+ cdebug << Name() << " IS DONE : " << theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+ << EventQSize() << endl ;
+ cdebug << "================================================================================" << endl ;
+ //cout << Name() << " IS DONE : " << theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+ // << EventQSize() << endl ;
+ }
+ cdebug_out << "GraphExecutor::OutNode::CheckAllDone " << IsDone()
+ << " GraphAutomatonState " << theAutomaton->StateName( AutomatonState() )
+ << " State " << State() << " Threads " << _Threads << " SuspendedThreads "
+ << _SuspendedThreads << " EventQSize " << EventQSize() << endl ;
}
void GraphExecutor::OutNode::PThreadLock( pthread_mutex_t * aMutex , char * errmsg ) {
perror( "GraphExecutor::OutNode::PyThreadLock" ) ;
exit( 0 ) ;
}
+ theAutomaton->PyLock() ;
// cout << " GraphExecutor::OutNode::PyThreadLocked " << pthread_self() << endl ;
}
perror( "GraphExecutor::OutNode::PyThreadUnLock" ) ;
exit( 0 ) ;
}
+ theAutomaton->PyUnLock() ;
// cout << " GraphExecutor::OutNode::PyThreadUnLocked " << pthread_self() << endl ;
}
exit( 0 ) ;
}
}
+
void GraphExecutor::OutNode::ExitThread() {
if ( pthread_mutex_lock( &_MutexWait ) ) {
perror("pthread_mutex_lock _ExitThread") ;
}
if ( _Threads == 0 && _SuspendedThreads == 0 ) {
CheckAllDone() ;
+ if ( IsDone() ) {
+ theAutomaton->Executed() ;
+ }
}
}
void GraphExecutor::OutNode::JoinedWait() {
exit( 0 ) ;
}
}
+
void GraphExecutor::OutNode::SuspendThread() {
if ( pthread_mutex_lock( &_MutexWait ) ) {
perror("pthread_mutex_lock _SuspendThread") ;
return aGraphState ;
}
+int GraphExecutor::OutNode::GetListSize() {
+ return _EventNodes.size();
+}
+
bool GraphExecutor::OutNode::PushEvent( GraphExecutor::InNode * aNode ,
GraphExecutor::NodeEvent anEvent ,
SUPERV::AutomatonState aState ) {
}
// cdebug_out << "PushEvent Threads " << Threads() << " SuspendedThreads "
// << SuspendedThreads() << endl ;
+#if 0
if ( _EventNodes.size() > 101 ) {
while ( _EventNodes.size() > 31 ) {
_EventNodes.pop_front() ;
_States.pop_front() ;
}
}
+#endif
if ( pthread_mutex_unlock( &_MutexWait ) ) {
perror("PushEvent pthread_mutex_unlock ") ;
exit( 0 ) ;
bool WithWait ) {
int ThreadsNumber ;
int SuspendedThreadsNumber ;
+ if ( _EventNodes.size() > 0 ) {
+ cdebug_in << "GraphExecutor::OutNode::Event " << _EventNodes.size() << " in queue" << endl ;
+ }
if ( pthread_mutex_lock( &_MutexWait ) ) {
perror("EventLoop pthread_mutex_lock ") ;
exit( 0 ) ;
SUPERV::AutomatonState theState = SUPERV::UnKnownState ;
anEvent = SUPERV::UndefinedEvent ;
aState = SUPERV::UndefinedState ;
- if ( ( IsDone() || IsKilled() || IsStopped() ) && _EventNodes.size() == 0 ) {
-// cdebug << "EventLoop IsDone()/IsKilled()/IsStopped() && _EventNodes.size() == 0" << endl ;
+ if ( ( Done() || IsKilled() || IsStopped() ) && _EventNodes.size() == 0 ) {
+// cdebug << "EventLoop Done()/IsKilled()/IsStopped() && _EventNodes.size() == 0" << endl ;
RetVal = false ;
}
else if ( !WithWait && _EventNodes.size() == 0 ) {
perror("EventLoop pthread_mutex_lock ") ;
exit( 0 ) ;
}
+ if ( _EventNodes.size() > 0 ) {
+ cdebug_out << "GraphExecutor::OutNode::Event " << _EventNodes.size() << " in queue" << endl ;
+ }
return RetVal ;
}
}
return RetVal ;
}
+long GraphExecutor::OutNode::EventQSize() {
+ return _EventNodes.size() ;
+}
+
+void GraphExecutor::OutNode::EventList() {
+ if ( pthread_mutex_lock( &_MutexWait ) ) {
+ perror("EventList pthread_mutex_lock ") ;
+ exit( 0 ) ;
+ }
+ list< char * >::iterator itEventNodes = _EventNodes.begin() ;
+ list< GraphExecutor::NodeEvent >::iterator itEvents = _Events.begin() ;
+ list< SUPERV::AutomatonState >::iterator itStates = _States.begin() ;
+ while ( itEventNodes != _EventNodes.end() ) {
+ cdebug << pthread_self() << "EVENTSTACK "
+ << *itEventNodes << " " << *itEvents << " "
+ << theAutomaton->StateName( *itStates )
+ << " Threads " << Threads() << " SuspendedThreads " << SuspendedThreads() << endl ;
+ itEventNodes++ ;
+ itEvents++ ;
+ itStates++ ;
+ }
+ if ( pthread_mutex_unlock( &_MutexWait ) ) {
+ perror("EventList pthread_mutex_lock ") ;
+ exit( 0 ) ;
+ }
+}
void GraphExecutor::OutNode::State(SUPERV::AutomatonState aState ) {
+// cdebug << "GraphExecutor::OutNode::State " << Name() << " "
+// << theAutomaton->StateName( AutomatonGraphState( _State ) ) << " ---> "
+// << theAutomaton->StateName( AutomatonGraphState( aState ) ) << endl ;
_State = aState ;
}
return AutomatonGraphState( aret ) ;
}
-SUPERV::GraphState GraphExecutor::OutNode::State(
- const char * NodeName ,
- const char * ServiceParameterName ) {
+SUPERV::GraphState GraphExecutor::OutNode::State( const char * NodeName ,
+ const char * ServiceParameterName ) {
// cdebug_in << "GraphExecutor::OutNode::State " << NodeName << " "
// << ServiceParameterName<< endl;
- SUPERV::GraphState aret =
- PortState( NodeName , ServiceParameterName ) ;
+ SUPERV::GraphState aret = PortState( NodeName , ServiceParameterName ) ;
// cdebug_out << "GraphExecutor::OutNode::State" << endl ;
return aret ;
}
bool GraphExecutor::OutNode::IsWaiting() {
// cdebug_in << "GraphExecutor::OutNode::IsWaiting" << endl;
// cdebug_out << "GraphExecutor::OutNode::IsWaiting" << endl ;
- return !_Done ;
+ return !IsDone() ;
}
bool GraphExecutor::OutNode::IsReady() {
// cdebug_in << "GraphExecutor::OutNode::IsReady" << endl;
// cdebug_out << "GraphExecutor::OutNode::IsReady" << endl ;
- return !_Done ;
+ return !IsDone() ;
}
bool GraphExecutor::OutNode::IsRunning() {
// cdebug_in << "GraphExecutor::OutNode::IsRunning" << endl;
// cdebug_out << "GraphExecutor::OutNode::IsRunning" << endl ;
- return !_Done ;
+ return !IsDone() ;
}
bool GraphExecutor::OutNode::IsDone() {
// cdebug_in << "GraphExecutor::OutNode::IsDone" << endl;
// cdebug_out << "GraphExecutor::OutNode::IsDone" << endl ;
- return ( _Done || IsKilled() || IsStopped() ) ;
+ return ( Done() || IsKilled() || IsStopped() ) ;
}
bool GraphExecutor::OutNode::IsSuspended() {
}
bool GraphExecutor::OutNode::Suspend() {
- bool RetVal = false ;
+ int RetVal = 0 ;
cdebug_in << "GraphExecutor::OutNode::Suspend" << endl;
- _ControlState = SUPERV::ToSuspendState ;
+// _ControlState = SUPERV::ToSuspendState ;
int i ;
for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
bool sts = aNode->Suspend() ;
if ( sts && aNode->IsSuspended() ) {
- cdebug << aNode->Name() << " Suspend" << endl ;
+ RetVal += 1 ;
+ cdebug << aNode->Name() << " Suspended" << endl ;
}
else if ( aNode->IsWaiting() || aNode->IsDone() ) {
cdebug << aNode->Name() << " not Suspended : "
<< theAutomaton->StateName( aNode->State() ) << endl ;
}
else {
+ RetVal += 1 ;
cdebug << aNode->Name() << " cannot be Suspended : "
<< theAutomaton->StateName( aNode->State() ) << endl ;
- RetVal = false ;
}
}
- State( SUPERV::SuspendedState ) ;
- cdebug_out << "GraphExecutor::OutNode::Suspend" << endl ;
if ( RetVal ) {
+ State( SUPERV::SuspendedState ) ;
MESSAGE("================================================================================") ;
MESSAGE( Name() << " IS SUSPENDED" ) ;
MESSAGE("================================================================================") ;
}
+ else {
+ MESSAGE("================================================================================") ;
+ MESSAGE( Name() << " IS NOT SUSPENDED" ) ;
+ MESSAGE("================================================================================") ;
+ }
+ cdebug_out << "GraphExecutor::OutNode::Suspend" << theAutomaton->StateName( State() ) << endl ;
return RetVal ;
}
+
bool GraphExecutor::OutNode::Resume() {
- bool RetVal = false ;
+ int RetVal = 0 ;
cdebug_in << "GraphExecutor::OutNode::Resume" << endl;
- cdebug_out << "GraphExecutor::OutNode::Resume" << endl ;
+ if ( IsSuspended() ) {
+ State( SUPERV::ExecutingState ) ;
+ int i ;
+ for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
+ GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
+ aNode->ControlState( SUPERV::VoidState ) ;
+ if ( aNode->IsSuspended() ) {
+ cdebug << aNode->Name() << "->Resume " << theAutomaton->StateName( aNode->State() )
+ << endl ;
+ if ( aNode->Resume() ) {
+ cdebug << aNode->Name() << " Resumed " << theAutomaton->StateName( aNode->State() )
+ << endl ;
+ RetVal += 1 ;
+ }
+ else {
+ cdebug << aNode->Name() << " Resume failed"
+ << theAutomaton->StateName( aNode->State() ) << endl ;
+ }
+ }
+ else {
+ cdebug << aNode->Name() << " " << theAutomaton->StateName( aNode->State() )
+ << endl ;
+ }
+ }
+ }
+ else {
+ cdebug << Name() << " not suspended " << theAutomaton->StateName( State() ) << endl ;
+ }
if ( RetVal ) {
MESSAGE("================================================================================") ;
MESSAGE( Name() << " IS RESUMED" ) ;
MESSAGE("================================================================================") ;
}
+ else {
+ MESSAGE("================================================================================") ;
+ MESSAGE( Name() << " IS NOT RESUMED" ) ;
+ MESSAGE("================================================================================") ;
+ }
+ cdebug_out << "GraphExecutor::OutNode::Resume" << theAutomaton->StateName( State() ) << " " << RetVal << endl ;
return RetVal ;
}
for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
if ( !aNode->IsKilled() && !aNode->IsWaiting() && !aNode->IsDone() ) {
+ cdebug << aNode->Name() << " not killed : "
+ << theAutomaton->StateName( aNode->State() ) << " " << aNode->Name() << "->"
+ << "KilledAction()" << endl ;
aNode->KilledAction() ;
}
}
RetVal = true ;
}
State( SUPERV::KilledState ) ;
- cdebug_out << "GraphExecutor::OutNode::Kill" << endl ;
if ( RetVal ) {
MESSAGE("================================================================================") ;
MESSAGE( Name() << " IS KILLED" ) ;
MESSAGE("================================================================================") ;
}
+ else {
+ MESSAGE("================================================================================") ;
+ MESSAGE( Name() << " IS NOT KILLED" ) ;
+ MESSAGE("================================================================================") ;
+ }
+ cdebug_out << "GraphExecutor::OutNode::Kill" << endl ;
return RetVal ;
}
bool GraphExecutor::OutNode::Stop() {
bool RetVal = false ;
cdebug_in << "GraphExecutor::OutNode::Stop" << endl;
- Kill() ;
+ RetVal = Kill() ;
cdebug_out << "GraphExecutor::OutNode::Stop" << endl ;
if ( RetVal ) {
MESSAGE("================================================================================") ;
exit( 0 ) ;
}
aret = IsSuspended() ;
- while ( !aret && !_Done ) {
+ while ( !aret && !IsDone() ) {
pthread_cond_wait( &_EventWait , &_MutexWait );
aret = IsSuspended() ;
}