From 014b91845d89bd7344cccfa6dd3b28e7b76f4dcc Mon Sep 17 00:00:00 2001 From: rahuel Date: Fri, 10 Jun 2005 07:56:09 +0000 Subject: [PATCH] PAL8624 --- .../DataFlowExecutor_DataFlow.cxx | 101 ++++++++++++------ src/GraphExecutor/DataFlowExecutor_InNode.cxx | 95 +++++++++++----- src/GraphExecutor/DataFlowExecutor_InNode.hxx | 36 +++++-- 3 files changed, 165 insertions(+), 67 deletions(-) diff --git a/src/GraphExecutor/DataFlowExecutor_DataFlow.cxx b/src/GraphExecutor/DataFlowExecutor_DataFlow.cxx index 801e9f1..2eb3d0d 100644 --- a/src/GraphExecutor/DataFlowExecutor_DataFlow.cxx +++ b/src/GraphExecutor/DataFlowExecutor_DataFlow.cxx @@ -118,17 +118,19 @@ bool GraphExecutor::DataFlow::ContainerKill( const char *aNodeName ) { bool GraphExecutor::DataFlow::InputOfAny( const char * ToServiceParameterName , const CORBA::Any & aValue , bool SomeDataReady ) { - cdebug_in <<"GraphExecutor::DataFlow::InputOfAny" << endl ; + cdebug_in <<"GraphExecutor::DataFlow::InputOfAny( " << ToServiceParameterName + << " SomeDataReady " << SomeDataReady << " )" << endl ; bool RetVal = false ; - cdebug << pthread_self() << "GraphExecutor::DataFlow::::InputOfAny " << Graph()->Name() << "(" - << ToServiceParameterName << ")" << endl ; + cdebug << pthread_self() << "GraphExecutor::DataFlow::InputOfAny " << Name() << "( " + << ToServiceParameterName << " , aValue , SomeDataReady " << SomeDataReady << " ) " + << endl ; if ( Graph()->GraphMacroLevel() ) { GraphBase::OutPort * anOutPort ; anOutPort = Graph()->GetChangeInDataNodePort( ToServiceParameterName ) ; - cdebug << pthread_self() << "GraphExecutor::DataFlow::InputOfAny " << Graph()->Name() + cdebug << pthread_self() << "GraphExecutor::DataFlow::InputOfAny " << Name() << " " << State() << " " << ToServiceParameterName << " " << anOutPort->PortState() << " Done : " << anOutPort->PortDone() << endl ; - RetVal = AddInputData( Graph()->Name() , ToServiceParameterName , aValue ) ; + RetVal = AddInputData( Name() , ToServiceParameterName , aValue ) ; anOutPort->PortState( SUPERV::ReadyState ) ; // There is only one port : anOutPort->ChangeInPorts( 0 )->PortState( SUPERV::ReadyState ) ; @@ -136,30 +138,68 @@ bool GraphExecutor::DataFlow::InputOfAny( const char * ToServiceParameterName , // There is only one inport of a Node in an ReversedOutport of a graph : GraphExecutor::InNode * anInNode = (GraphExecutor::InNode * ) Graph()->GetChangeGraphNode( anOutPort->InPorts( 0 )->NodeName() )->GetInNode() ; cdebug << pthread_self() << "GraphExecutor::DataFlow::InputOfAny : " << anInNode->Name() - << "->SendSomeDataReady( " << Graph()->Name() << " ) for Port " << anOutPort->InPorts( 0 )->PortName() - << endl ; + << "->SendSomeDataReady( " << Name() << " ) for Port " + << anOutPort->InPorts( 0 )->PortName() << " SomeDataReady " << SomeDataReady << endl ; if ( SomeDataReady ) { - MESSAGE( "GraphExecutor::InputOfAny " << Graph()->Name() << " SendSomeDataReady --> " << anInNode->Name() + MESSAGE( "GraphExecutor::InputOfAny " << Name() << " SendSomeDataReady --> " << anInNode->Name() << " " << anInNode->State() << " " << anOutPort->InPorts( 0 )->PortName() ) ; - int sts = anInNode->SendSomeDataReady( Graph()->Name() ) ; + int sts = anInNode->SendSomeDataReady( Name() ) ; cdebug << "GraphExecutor::DataFlow::InputOfAny " << anInNode->Name() - << "->SendSomeDataReady( " << Graph()->Name() << " ) sts " << sts << endl ; - if ( sts && anInNode->IsReady() ) { - cdebug << pthread_self() << "/" << anInNode->ThreadNo() << "GraphExecutor::DataFlow::InputOfAny : " + << "->SendSomeDataReady( " << Name() << " ) sts " << sts << " State " + << anInNode->State() << " IsReady " << anInNode->IsReady() + << " SomeDataReady " << SomeDataReady << endl ; +//JR 15.04.2005 Debug PAL8624 RetroConception : + if ( sts && anInNode->HasAllDataReady() ) { + cdebug << pthread_self() << "/" << anInNode->ThreadNo() + << "GraphExecutor::DataFlow::InputOfAny : " << anInNode->Name() << "->SendEvent( GraphExecutor::ExecuteEvent ) " - << " " << anInNode->Name() << "->IsReady() " << anInNode->IsReady() << " State " - << anInNode->State() << endl ; - if ( anInNode->IsLockedDataWait() ) { - cdebug << pthread_self() << "/" << anInNode->ThreadNo() << "GraphExecutor::DataFlow::InputOfAny : " - << anInNode->Name() << " IsLockedDataWait() ==> UnLockDataWait" << endl ; - anInNode->UnLockDataWait() ; - } + << " " << anInNode->Name() << "->HasAllDataReady() " << anInNode->HasAllDataReady() + << " State " << anInNode->State() << endl ; anInNode->ThreadNo( 0 ) ; anInNode->CreateNewThread( true ) ; - anInNode->SendEvent( GraphExecutor::ExecuteEvent ) ; -// State( GraphExecutor::ExecutingState ) ; + if ( !anInNode->SendEvent( GraphExecutor::AllDataReadyEvent ) ) { // ==> Ready to execute +//JR 06.05.2005 Debug PAL8624 RetroConception : +#if 0 + if ( res && anInNode->IsReady() ) { + cdebug << pthread_self() << "/" << anInNode->ThreadNo() << "GraphExecutor::DataFlow::InputOfAny : " + << anInNode->Name() << "->SendEvent( GraphExecutor::ExecuteEvent ) " + << " " << anInNode->Name() << "->IsReady() " << anInNode->IsReady() << " State " + << anInNode->State() << endl ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// if ( anInNode->IsLockedDataWait() ) { +// cdebug << pthread_self() << "/" << anInNode->ThreadNo() << "GraphExecutor::DataFlow::InputOfAny : " +// << anInNode->Name() << " IsLockedDataWait() ==> UnLockDataWait" << endl ; +// anInNode->UnLockDataWait() ; +// } + anInNode->ThreadNo( 0 ) ; + anInNode->CreateNewThread( true ) ; + anInNode->SendEvent( GraphExecutor::ExecuteEvent ) ; +// State( GraphExecutor::ExecutingState ) ; + } + else { + RetVal = false ; +#endif + cdebug << pthread_self() << "/" << anInNode->ThreadNo() + << "GraphExecutor::DataFlow::InputOfAny : NotAllDataReady ERROR : " + << anInNode->Name() << "->SendEvent( GraphExecutor::ExecuteEvent ) " + << " " << anInNode->Name() << "->IsReady() " << anInNode->IsReady() << " State " + << anInNode->State() << endl ; + } + } + else { + cdebug << pthread_self() << "/" << anInNode->ThreadNo() + << "GraphExecutor::DataFlow::InputOfAny : NotAllDataReady : " + << anInNode->Name() << "->SendEvent( GraphExecutor::ExecuteEvent ) " + << " " << anInNode->Name() << "->IsReady() " << anInNode->IsReady() << " State " + << anInNode->State() << endl ; } } + else { + cdebug << "GraphExecutor::DataFlow::InputOfAny " << anInNode->Name() + << "->SendSomeDataReady( " << Name() << " ) State " + << anInNode->State() << " IsReady " << anInNode->IsReady() + << " SomeDataReady " << SomeDataReady << endl ; + } } else { cdebug << pthread_self() << "GraphExecutor::DataFlow::InputOfAny GraphMacroLevel " @@ -176,7 +216,7 @@ bool GraphExecutor::DataFlow::OutputOfAny( const char * aNodeName , const CORBA::Any & aValue ) { cdebug_in << pthread_self() << "/" << ThreadNo() << "GraphExecutor::DataFlow::OutputOfAny( " << aNodeName << " , " << ToServiceParameterName - << " , aValue ) from " << Graph()->Name() << endl ; + << " , aValue ) from " << Name() << endl ; bool RetVal = false ; GraphBase::Graph * aMacroNode = (GraphBase::Graph * ) Graph()->GetChangeGraphNode( aNodeName ) ; // GraphExecutor::InNode * anInNode = (GraphExecutor::InNode * ) aMacroGraph->GetInNode() ; @@ -188,25 +228,26 @@ bool GraphExecutor::DataFlow::OutputOfAny( const char * aNodeName , const char * ToNodeName = anOutPort->ChangeInPorts( i )->NodeName() ; const char * ToParameterName = anOutPort->ChangeInPorts( i )->PortName() ; GraphBase::ComputingNode * aComputingNode = Graph()->GetChangeGraphNode( ToNodeName ) ; - if ( strcmp( ToNodeName , Graph()->Name() ) ) { + if ( strcmp( ToNodeName , Name() ) ) { GraphExecutor::InNode * aLinkedNode = (GraphExecutor::InNode * ) aComputingNode->GetInNode() ; cdebug << pthread_self() << "/" << aLinkedNode->ThreadNo() << "GraphExecutor::DataFlow::OutputOfAny to Node " << ToNodeName << "(" << ToParameterName << ") from MacroNode " << aNodeName << endl ; int sts ; - if ( aLinkedNode->IsLockedDataWait() ) { - cdebug << "GraphExecutor::DataFlow::OutputOfAny " << aLinkedNode->Name() - << " IsLockedDataWait --> UnLockDataWait" << endl ; - } +//JR 15.04.2005 Debug PAL8624 RetroConception : +// if ( aLinkedNode->IsLockedDataWait() ) { +// cdebug << "GraphExecutor::DataFlow::OutputOfAny " << aLinkedNode->Name() +// << " IsLockedDataWait --> UnLockDataWait" << endl ; +// } sts = aLinkedNode->SendSomeDataReady( (char * ) aNodeName ) ; cdebug << pthread_self() << "/" << aLinkedNode->ThreadNo() << "GraphExecutor::DataFlow::OutputOfAny " << aLinkedNode->Name() << "->SendSomeDataReady( " << aNodeName << " ) sts " << sts << " " << aLinkedNode->State() << endl ; if ( sts ) { if ( aLinkedNode->State() == GraphExecutor::DataReadyState ) { - aLinkedNode->CreateNewThreadIf( true ) ; - aLinkedNode->UnLockDataWait() ; -// aLinkedNode->DataUndef_AllDataReadyAction() ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// aLinkedNode->CreateNewThreadIf( true ) ; +// aLinkedNode->UnLockDataWait() ; aLinkedNode->SendEvent( GraphExecutor::ExecuteEvent ) ; } } diff --git a/src/GraphExecutor/DataFlowExecutor_InNode.cxx b/src/GraphExecutor/DataFlowExecutor_InNode.cxx index c504a20..70aa2eb 100644 --- a/src/GraphExecutor/DataFlowExecutor_InNode.cxx +++ b/src/GraphExecutor/DataFlowExecutor_InNode.cxx @@ -57,8 +57,9 @@ static void InitInNode( int &_RewindStack , GraphExecutor::InNode ** _aReStartNode , bool & _PyFuncRunned , PyObject ** _MyPyRunMethod , - pthread_mutex_t &_MutexDataWait , - bool &_DataWait , +// pthread_mutex_t &_MutexDataWait , +// bool &_DataWait , + pthread_mutex_t &_MutexDataReady , pthread_mutex_t &_MutexWait , pthread_cond_t &_ReadyWait , pthread_cond_t &_RunningWait , @@ -84,8 +85,9 @@ static void InitInNode( int &_RewindStack , *_aReStartNode = NULL ; _PyFuncRunned = false ; *_MyPyRunMethod = NULL ; - pthread_mutex_init( &_MutexDataWait , NULL ) ; - _DataWait = false ; +// pthread_mutex_init( &_MutexDataWait , NULL ) ; +// _DataWait = false ; + pthread_mutex_init( &_MutexDataReady , NULL ) ; pthread_mutex_init( &_MutexWait , NULL ) ; if ( pthread_cond_init( &_ReadyWait , NULL ) ) { perror("pthread_cond_init( &_ReadyWait , NULL )") ; @@ -143,8 +145,9 @@ GraphExecutor::InNode::InNode() { &_aReStartNode , _PyFuncRunned , &_MyPyRunMethod , - _MutexDataWait , - _DataWait , +// _MutexDataWait , +// _DataWait , + _MutexDataReady , _MutexWait , _ReadyWait , _RunningWait , @@ -201,8 +204,9 @@ GraphExecutor::InNode::InNode( CORBA::ORB_ptr ORB, &_aReStartNode , _PyFuncRunned , &_MyPyRunMethod , - _MutexDataWait , - _DataWait , +// _MutexDataWait , +// _DataWait , + _MutexDataReady , _MutexWait , _ReadyWait , _RunningWait , @@ -385,6 +389,8 @@ GraphExecutor::InNode::InNode( CORBA::ORB_ptr ORB, GraphExecutor::InNode::~InNode() { } +//JR 15.04.2005 Debug PAL8624 RetroConception : +#if 0 void GraphExecutor::InNode::LockDataWait() { // cdebug_in << "GraphExecutor::InNode::LockDataWait " << endl ; if ( pthread_mutex_lock( &_MutexDataWait ) ) { @@ -403,6 +409,33 @@ void GraphExecutor::InNode::UnLockDataWait() { } // cdebug_out << "GraphExecutor::InNode::UnLockDataWait " << endl ; } +#endif + +//JR 15.04.2005 Debug PAL8624 RetroConception : +void GraphExecutor::InNode::LockDataReady() { +// cdebug_in << pthread_self() << "/" << ThreadNo() +// << "GraphExecutor::InNode::LockDataReady : " << Name() << " _MutexDataReadyLocked " +// << _MutexDataReadyLocked << " HasAllDataReady " << HasAllDataReady() << endl ; + if ( pthread_mutex_lock( &_MutexDataReady ) ) { + perror("MutexDataReady pthread_mutex_lock ") ; + exit( 0 ) ; + } + _MutexDataReadyLocked = true ; +// cdebug_out << pthread_self() << "/" << ThreadNo() +// << "GraphExecutor::InNode::LockDataReady : " << Name() << endl ; +} +void GraphExecutor::InNode::UnLockDataReady() { +// cdebug_in << pthread_self() << "/" << ThreadNo() +// << "GraphExecutor::InNode::UnLockDataReady : " << Name() << " _MutexDataReadyLocked " +// << _MutexDataReadyLocked << " HasAllDataReady " << HasAllDataReady() << endl ; + if ( pthread_mutex_unlock( &_MutexDataReady ) ) { + perror("MutexDataReady pthread_mutex_unlock ") ; + exit( 0 ) ; + } + _MutexDataReadyLocked = false ; +// cdebug_out << pthread_self() << "/" << ThreadNo() +// << "GraphExecutor::InNode::UnLockDataReady : " << Name() << endl ; +} Engines::Component_var GraphExecutor::InNode::Component() const { if ( IsFactoryNode() ) { @@ -1317,7 +1350,9 @@ void GraphExecutor::InNode::InitialState() int i; _ControlState = SUPERV::VoidState ; CreateNewThread( false ) ; - CreateNewThreadIf( false ) ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// CreateNewThreadIf( false ) ; + HasAllDataReady( false ) ; _SuspendSync = false ; _ResumeSync = false ; _InitLoop = false ; @@ -1342,7 +1377,7 @@ void GraphExecutor::InNode::InitialState() } } - int Pc = GetNodeInPortsSize() ; + int InPortsCount = GetNodeInPortsSize() ; for ( i = 0 ; i < GetNodeInPortsSize() ; i++ ) { const GraphBase::InPort * anInPort = GetNodeInPort(i) ; GraphBase::OutPort * anOutPort = anInPort->GetOutPort() ; @@ -1354,7 +1389,7 @@ void GraphExecutor::InNode::InitialState() CORBA::Any anAny = CORBA::Any() ; //JR 21.02.2005 Debug Memory leak : *anAny <<= (long ) 1 ; anAny <<= (long ) 1 ; - anOutPort->Value( anAny ) ; + anOutPort->SetValue( anAny ) ; _InitLoop = true ; cdebug << "InPort" << i << " " << anInPort->PortName() << " " << anInPort->PortStatus() << " OutPort " << anOutPort->PortStatus() << theAutomaton->StateName( anOutPort->PortState() ) @@ -1379,36 +1414,41 @@ void GraphExecutor::InNode::InitialState() << " Gate HeadNode" << endl ; } else { - cdebug << Name() << " IsHeadNode " << IsHeadNode() << " InPort" << i << " " << anInPort->PortName() - << " " << anInPort->PortStatus() << endl ; + cdebug << Name() << " IsHeadNode " << IsHeadNode() << " InPort" << i << " " + << anInPort->PortName() << " " << anInPort->PortStatus() << endl ; } if ( anInPort->IsGate() && anOutPort == NULL ) { - Pc-- ; - cdebug << "InPort" << i << " " << anInPort->PortName() << " Not connected Pc " << Pc << endl ; + InPortsCount-- ; + cdebug << "InPort" << i << " " << anInPort->PortName() << " Not connected InPortsCount " + << InPortsCount << endl ; } else if ( anOutPort ) { if ( anOutPort->IsDataConnected() || anOutPort->IsDataStream() ) { - Pc-- ; + InPortsCount-- ; anOutPort->PortState( SUPERV::ReadyState ) ; anOutPort->PortDone( true ) ; cdebug << "InPort" << i << " " << anInPort->PortName() << " " << anInPort->PortStatus() - << " " << theAutomaton->StateName( anOutPort->PortState() ) << " Pc " << Pc << endl ; + << " " << theAutomaton->StateName( anOutPort->PortState() ) << " InPortsCount " + << InPortsCount << endl ; } else if ( anOutPort->IsPortConnected() ) { anOutPort->PortState( SUPERV::WaitingState ) ; anOutPort->PortDone( false ) ; - cdebug << "InPort" << i << " " << anInPort->PortName() << " " << " " << anInPort->PortStatus() - << " " << theAutomaton->StateName( anOutPort->PortState() ) << " Pc " << Pc << endl ; + cdebug << "InPort" << i << " " << anInPort->PortName() << " " << " " + << anInPort->PortStatus() + << " " << theAutomaton->StateName( anOutPort->PortState() ) << " InPortsCount " + << InPortsCount << endl ; } else { cdebug << "InPort" << i << " " << anInPort->PortName() << " " << anInPort->PortStatus() << " OutPort " << anOutPort->NodeName() << " " << anOutPort->PortName() << " " - << theAutomaton->StateName( anOutPort->PortState() ) << " Pc " << Pc << endl ; + << theAutomaton->StateName( anOutPort->PortState() ) << " InPortsCount " + << InPortsCount << endl ; } } else { cdebug << "InPort" << i << " " << anInPort->PortName() << " " << " " << anInPort->PortStatus() - << " no corresponding OutPort Pc " << Pc << endl ; + << " no corresponding OutPort InPortsCount " << InPortsCount << endl ; } if ( anOutPort ) { if ( !anOutPort->IsDataStream() || anInPort->IsDataStream() ) { @@ -1447,13 +1487,13 @@ void GraphExecutor::InNode::InitialState() } } - _currentState = Pc > 0 ? GraphExecutor::DataWaitingState + _currentState = InPortsCount > 0 ? GraphExecutor::DataWaitingState : GraphExecutor::DataReadyState ; - if ( Pc == GetNodeInPortsSize() ) { + if ( InPortsCount == GetNodeInPortsSize() ) { _OutNode->PushEvent( this , GraphExecutor::NoDataReadyEvent , _currentState ) ; } - else if ( Pc != 0 ) { + else if ( InPortsCount != 0 ) { _OutNode->PushEvent( this , GraphExecutor::SomeDataReadyEvent , _currentState ) ; } @@ -1468,10 +1508,9 @@ void GraphExecutor::InNode::InitialState() << " " << GetNodeOutPort(i)->Kind() << endl ; } - cdebug << "CurrentState = " << theAutomaton->StateName( _currentState ) - << endl; - - cdebug_out << "GraphExecutor::InNode::InitialState" << endl; + cdebug_out << "GraphExecutor::InNode::InitialState Node " << Name() << " CurrentState = " + << theAutomaton->StateName( _currentState ) << " HasAllDataReady " + << HasAllDataReady() << endl; } bool GraphExecutor::InNode::InitPythonFunctions(bool WithErr ) { diff --git a/src/GraphExecutor/DataFlowExecutor_InNode.hxx b/src/GraphExecutor/DataFlowExecutor_InNode.hxx index c8bf760..3986ceb 100644 --- a/src/GraphExecutor/DataFlowExecutor_InNode.hxx +++ b/src/GraphExecutor/DataFlowExecutor_InNode.hxx @@ -80,7 +80,8 @@ namespace GraphExecutor { PyObject * _MyPyRunMethod ; bool _createNewThread ; - bool _createNewThreadIf ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// bool _createNewThreadIf ; int _RewindStack ; GraphExecutor::AutomatonState _OldState ; GraphExecutor::AutomatonState _currentState ; @@ -88,11 +89,17 @@ namespace GraphExecutor { SUPERV::ControlState _ControlState ; GraphExecutor::AutomatonState _NextState ; GraphExecutor::StateEventAction _NextAction ; + + pthread_mutex_t _MutexDataReady ; + bool _MutexDataReadyLocked ; + bool _HasAllDataReady ; + bool _PyFuncRunned ; bool _Loading ; - pthread_mutex_t _MutexDataWait ; - bool _DataWait ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// pthread_mutex_t _MutexDataWait ; +// bool _DataWait ; pthread_mutex_t _MutexWait ; @@ -328,9 +335,19 @@ namespace GraphExecutor { bool & Err ) ; void RemovePyDynInvoke( char * PyFuncName ) ; - void LockDataWait() ; - void UnLockDataWait() ; - bool IsLockedDataWait() { return _DataWait ; } ; +//JR 15.04.2005 Debug PAL8624 RetroConception : +// void LockDataWait() ; +// void UnLockDataWait() ; +// bool IsLockedDataWait() { return _DataWait ; } ; + void LockDataReady() ; + void UnLockDataReady() ; + void HasAllDataReady( bool hasAllDataReady ) { +// cdebug << "Executor::InNode::HasAllDataReady( " << hasAllDataReady +// << " ) " << Name() << " previous _HasAllDataReady " << _HasAllDataReady +// << endl ; + _HasAllDataReady = hasAllDataReady ; } ; + bool HasAllDataReady() const { + return _HasAllDataReady ; } bool Ping() ; bool ContainerKill() ; @@ -345,11 +362,12 @@ namespace GraphExecutor { void CreateNewThread( bool k_create ) { // cdebug << Name() << " CreateNewThread " << k_create << endl ; _createNewThread = k_create ; } ; - void CreateNewThreadIf( bool k_create ) { +//JR 15.04.2005 Debug PAL8624 RetroConception : +// void CreateNewThreadIf( bool k_create ) { // cdebug << Name() << " CreateNewThreadIf( " << k_create << " )" << endl ; - _createNewThreadIf = k_create ; } ; +// _createNewThreadIf = k_create ; } ; bool CreateNewThread() { return _createNewThread ; } ; - bool CreateNewThreadIf() { return _createNewThreadIf ; } ; +// bool CreateNewThreadIf() { return _createNewThreadIf ; } ; void NewThread( pthread_t aThread ) ; void ExitThread() ; void RewindStack( int aRewindStack ) { _RewindStack = aRewindStack ; } ; -- 2.39.2