]> SALOME platform Git repositories - modules/superv.git/blobdiff - src/GraphExecutor/DataFlowExecutor_OutNode.cxx
Salome HOME
MPV: Merge V1_2d
[modules/superv.git] / src / GraphExecutor / DataFlowExecutor_OutNode.cxx
index 1d8705d451bf3d7dcbc61eb3c5b5eb7a90fb7128..d9e0fb886d5df5366f1df7a47b30a77f79f0390d 100644 (file)
@@ -440,6 +440,17 @@ bool GraphExecutor::OutNode::Run( const bool AndSuspend ) {
         PyInitialized( true ) ;
       }
       anInNode->InitialState( this ) ;
+      if ( anInNode->IsOneOfInLineNodes() ) {
+        anInNode->InitPythonFunctions( false ) ;
+      }
+    }
+// One more time because inline nodes may share one definition of the same function
+    for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
+      GraphExecutor::InNode * anInNode = (GraphExecutor::InNode *) GraphNodes( i )->GetInNode() ;
+      if ( anInNode->IsOneOfInLineNodes() ) {
+        if ( anInNode->InitPythonFunctions( true ) ) {
+       }
+      }
     }
 
     cdebug << "Execution starting GraphExecutor::Action_DataOk_RunService Node "
@@ -454,6 +465,7 @@ bool GraphExecutor::OutNode::Run( const bool AndSuspend ) {
       if ( anInNode->State() != SUPERV::DataReadyState ) {
         cdebug << "GraphExecutor::OutNode::Run inconsistency State of Node "
              << anInNode->Name() << " : " << anInNode->State() << endl ;
+        cdebug_out << "GraphExecutor::OutNode::Run State ERROR" << endl ;
         return false ;
       }
 //      PushEvent( anInNode , GraphExecutor::ReadyEvent ,
@@ -466,6 +478,7 @@ bool GraphExecutor::OutNode::Run( const bool AndSuspend ) {
         if ( !anInNode->SendEvent( GraphExecutor::SomeDataReadyEvent ) ) {
           cdebug << "InNode::SendEvent( SomeDataReadyEvent ) ERROR Node "
                  << anInNode->Name() << endl ;
+          cdebug_out << "GraphExecutor::OutNode::Run SendEvent ERROR" << endl ;
           return false ;
         }
         anInNode->SuspendedWait() ;
@@ -473,6 +486,7 @@ bool GraphExecutor::OutNode::Run( const bool AndSuspend ) {
       else if ( !anInNode->SendEvent( GraphExecutor::ExecuteEvent ) ) {
         cdebug << "InNode::SendEvent( ExecuteEvent ) ERROR Node "
                << anInNode->Name() << endl ;
+        cdebug_out << "GraphExecutor::OutNode::Run SendEvent ERROR" << endl ;
         return false ;
       }
       else {
@@ -523,7 +537,7 @@ void GraphExecutor::OutNode::CheckAllDone() {
   SUPERV::AutomatonState OutNodeState = SUPERV::SuccessedState ;
   SUPERV::AutomatonState InNodeState ;
   bool AllDone = true ;
-  if ( !_Done ) {
+  if ( !Done() ) {
     for ( j = 0 ; j < QueueNodesSize() ; j++ ) {
       InNodeState = ( (GraphExecutor::InNode * ) QueueNodes( j )->GetInNode() )->State() ;
       cdebug << j << ". "
@@ -534,7 +548,9 @@ void GraphExecutor::OutNode::CheckAllDone() {
            InNodeState != SUPERV::DataWaitingState ) {
         AllDone = false ;
       }
-      if ( InNodeState != SUPERV::SuccessedState ) {
+      if ( InNodeState != SUPERV::SuccessedState &&
+           InNodeState != SUPERV::DataWaitingState &&
+           InNodeState != SUPERV::DataReadyState ) {
         OutNodeState = InNodeState ;
       }
     }
@@ -549,40 +565,62 @@ void GraphExecutor::OutNode::CheckAllDone() {
       }
     }
     if ( AllDone ) {
-      State( OutNodeState ) ;
-      _Done = true ;
-      _JustStarted = false ;
+      int alivenodes = 0 ;
       for ( j = 0 ; j < GraphNodesSize()  ; j++ ) {
         GraphExecutor::InNode * aNode ;
         aNode = (GraphExecutor::InNode * ) GraphNodes( j )->GetInNode() ;
         SUPERV::GraphState aState = AutomatonGraphState( aNode->State() ) ;
         cdebug << "GraphExecutor::OutNode::CheckAllDone " << aNode->Name() << " "
-               << theAutomaton->StateName( aNode->State() ) << " :" << endl ;
+               << theAutomaton->StateName( aNode->State() ) << " CreateNewThread " << aNode->CreateNewThread()
+               << endl ;
         if ( aState == SUPERV::ErrorState ||
              aState == SUPERV::SuspendErroredState ||
              aState == SUPERV::KillState ||
              aState == SUPERV::StopState ) {
           OutNodeState = aNode->State() ;
+          State( OutNodeState ) ;
+       }
+        else if ( aState == SUPERV::ReadyState ||
+                  aState == SUPERV::SuspendReadyState ||
+                  aState == SUPERV::RunningState ||
+                  aState == SUPERV::SuspendDoneState ||
+                  aState == SUPERV::SuspendErroredState ||
+                  aState == SUPERV::ReRunState ||
+                  aState == SUPERV::ReStartState ||
+                  aState == SUPERV::SuspendState ) {
+          alivenodes += 1 ;
        }
         aNode->SuspendedAction() ;
         aNode->DoneAction() ;
       }
 // PushEvent AFTER State and _Done ! ...
+      if ( alivenodes == 0 ) {
+        State( OutNodeState ) ;
+        Done( true ) ;
+        _JustStarted = false ;
+      }
       PushEvent( NULL , GraphExecutor::EndExecuteEvent ,
                  OutNodeState ) ;
 //      Py_Finalize() ;
 //      PyInitialized( false ) ;
     }
   }
-  cdebug_out << "GraphExecutor::OutNode::CheckAllDone " << _Done
-             << " GraphAutomatonState " << theAutomaton->StateName( AutomatonState() )
-             << " State " << State() << " Threads " << _Threads << " SuspendedThreads "
-             << _SuspendedThreads << endl ;
-  if ( _Done ) {
+  if ( IsDone() ) {
     MESSAGE("================================================================================") ;
-    MESSAGE( Name() << " IS DONE" ) ;
+    MESSAGE( Name() << " IS DONE : " <<  theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+             << EventQSize() ) ;
     MESSAGE("================================================================================") ;
-  }
+    cdebug << "================================================================================" << endl ;
+    cdebug << Name() << " IS DONE : " <<  theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+             << EventQSize() << endl  ;
+    cdebug << "================================================================================" << endl ;
+    //cout << Name() << " IS DONE : " <<  theAutomaton->StateName( AutomatonState() ) << " EventQSize "
+    //     << EventQSize() << endl  ;
+  }
+  cdebug_out << "GraphExecutor::OutNode::CheckAllDone " << IsDone()
+             << " GraphAutomatonState " << theAutomaton->StateName( AutomatonState() )
+             << " State " << State() << " Threads " << _Threads << " SuspendedThreads "
+             << _SuspendedThreads << " EventQSize " << EventQSize() << endl ;
 }
 
 void GraphExecutor::OutNode::PThreadLock( pthread_mutex_t * aMutex , char * errmsg ) {
@@ -617,6 +655,7 @@ void GraphExecutor::OutNode::PyThreadLock() {
     perror( "GraphExecutor::OutNode::PyThreadLock" ) ;
     exit( 0 ) ;
   }
+  theAutomaton->PyLock() ;
 //  cout << " GraphExecutor::OutNode::PyThreadLocked " << pthread_self() << endl ;
 }
 
@@ -626,6 +665,7 @@ void GraphExecutor::OutNode::PyThreadUnLock() {
     perror( "GraphExecutor::OutNode::PyThreadUnLock" ) ;
     exit( 0 ) ;
   }
+  theAutomaton->PyUnLock() ;
 //  cout << " GraphExecutor::OutNode::PyThreadUnLocked " << pthread_self() << endl ;
 }
 
@@ -643,6 +683,7 @@ void GraphExecutor::OutNode::NewThread() {
     exit( 0 ) ;
   }
 }
+
 void GraphExecutor::OutNode::ExitThread() {
   if ( pthread_mutex_lock( &_MutexWait ) ) {
     perror("pthread_mutex_lock _ExitThread") ;
@@ -662,6 +703,9 @@ void GraphExecutor::OutNode::ExitThread() {
   }
   if ( _Threads == 0 && _SuspendedThreads == 0 ) {
     CheckAllDone() ;
+    if ( IsDone() ) {
+      theAutomaton->Executed() ;
+    }
   }
 }
 void GraphExecutor::OutNode::JoinedWait() {
@@ -679,6 +723,7 @@ void GraphExecutor::OutNode::JoinedWait() {
     exit( 0 ) ;
   }
 }
+
 void GraphExecutor::OutNode::SuspendThread() {
   if ( pthread_mutex_lock( &_MutexWait ) ) {
     perror("pthread_mutex_lock _SuspendThread") ;
@@ -1008,6 +1053,10 @@ SUPERV::GraphState GraphExecutor::OutNode::AutomatonGraphState(SUPERV::Automaton
   return aGraphState ;
 }
 
+int GraphExecutor::OutNode::GetListSize() {
+  return _EventNodes.size();
+}
+
 bool GraphExecutor::OutNode::PushEvent( GraphExecutor::InNode * aNode ,
                                         GraphExecutor::NodeEvent anEvent ,
                                         SUPERV::AutomatonState aState ) {
@@ -1040,6 +1089,7 @@ bool GraphExecutor::OutNode::PushEvent( GraphExecutor::InNode * aNode ,
   }
 //  cdebug_out << "PushEvent Threads " << Threads() << " SuspendedThreads "
 //             << SuspendedThreads() << endl ;
+#if 0
   if ( _EventNodes.size() > 101 ) {
     while ( _EventNodes.size() > 31 ) {
       _EventNodes.pop_front() ;
@@ -1047,6 +1097,7 @@ bool GraphExecutor::OutNode::PushEvent( GraphExecutor::InNode * aNode ,
       _States.pop_front() ;
     }
   }
+#endif
   if ( pthread_mutex_unlock( &_MutexWait ) ) {
     perror("PushEvent pthread_mutex_unlock ") ;
     exit( 0 ) ;
@@ -1064,6 +1115,9 @@ bool GraphExecutor::OutNode::Event( char ** aNodeName ,
                                     bool WithWait ) {
   int ThreadsNumber ;
   int SuspendedThreadsNumber ;
+  if ( _EventNodes.size() > 0 ) {
+    cdebug_in << "GraphExecutor::OutNode::Event " << _EventNodes.size() << " in queue" << endl ;
+  }
   if ( pthread_mutex_lock( &_MutexWait ) ) {
     perror("EventLoop pthread_mutex_lock ") ;
     exit( 0 ) ;
@@ -1078,8 +1132,8 @@ bool GraphExecutor::OutNode::Event( char ** aNodeName ,
   SUPERV::AutomatonState theState = SUPERV::UnKnownState ;
   anEvent = SUPERV::UndefinedEvent ;
   aState = SUPERV::UndefinedState ;
-  if ( ( IsDone() || IsKilled() || IsStopped() ) && _EventNodes.size() == 0 ) {
-//    cdebug << "EventLoop IsDone()/IsKilled()/IsStopped() && _EventNodes.size() == 0" << endl ;
+  if ( ( Done() || IsKilled() || IsStopped() ) && _EventNodes.size() == 0 ) {
+//    cdebug << "EventLoop Done()/IsKilled()/IsStopped() && _EventNodes.size() == 0" << endl ;
     RetVal = false ;
   }
   else if ( !WithWait && _EventNodes.size() == 0 ) {
@@ -1121,6 +1175,9 @@ bool GraphExecutor::OutNode::Event( char ** aNodeName ,
     perror("EventLoop pthread_mutex_lock ") ;
     exit( 0 ) ;
   }
+  if ( _EventNodes.size() > 0 ) {
+    cdebug_out << "GraphExecutor::OutNode::Event " << _EventNodes.size() << " in queue" << endl ;
+  }
   return RetVal ;
 }
 
@@ -1270,8 +1327,37 @@ bool GraphExecutor::OutNode::EventWait( char ** aNodeName ,
   }
   return RetVal ;
 }
+long GraphExecutor::OutNode::EventQSize() {
+  return _EventNodes.size() ;
+}
+
+void GraphExecutor::OutNode::EventList() {
+  if ( pthread_mutex_lock( &_MutexWait ) ) {
+    perror("EventList pthread_mutex_lock ") ;
+    exit( 0 ) ;
+  }
+  list< char * >::iterator itEventNodes = _EventNodes.begin() ;
+  list< GraphExecutor::NodeEvent >::iterator itEvents = _Events.begin() ;
+  list< SUPERV::AutomatonState >::iterator itStates = _States.begin() ;
+  while ( itEventNodes != _EventNodes.end() ) {
+    cdebug << pthread_self() << "EVENTSTACK "
+           << *itEventNodes << " " << *itEvents << " "
+           << theAutomaton->StateName( *itStates )
+           << " Threads " << Threads() << " SuspendedThreads " << SuspendedThreads() << endl ;
+    itEventNodes++ ;
+    itEvents++ ;
+    itStates++ ;
+  }
+  if ( pthread_mutex_unlock( &_MutexWait ) ) {
+    perror("EventList pthread_mutex_lock ") ;
+    exit( 0 ) ;
+  }
+}
 
 void GraphExecutor::OutNode::State(SUPERV::AutomatonState aState ) {
+//  cdebug << "GraphExecutor::OutNode::State " << Name() << " "
+//         << theAutomaton->StateName( AutomatonGraphState( _State ) ) << " ---> "
+//         << theAutomaton->StateName( AutomatonGraphState( aState ) ) << endl ;
   _State = aState ;
 }
 
@@ -1299,13 +1385,11 @@ SUPERV::GraphState GraphExecutor::OutNode::State( const char * NodeName ) {
   return AutomatonGraphState( aret ) ;
 }
 
-SUPERV::GraphState GraphExecutor::OutNode::State(
-                                    const char * NodeName ,
-                                    const char * ServiceParameterName )  {
+SUPERV::GraphState GraphExecutor::OutNode::State( const char * NodeName ,
+                                                  const char * ServiceParameterName )  {
 //  cdebug_in << "GraphExecutor::OutNode::State " << NodeName << " "
 //            << ServiceParameterName<< endl;
-  SUPERV::GraphState aret =
-                    PortState( NodeName , ServiceParameterName ) ;
+  SUPERV::GraphState aret = PortState( NodeName , ServiceParameterName ) ;
 //  cdebug_out << "GraphExecutor::OutNode::State" << endl ;
   return aret ;
 }
@@ -1359,25 +1443,25 @@ void GraphExecutor::OutNode::ControlClear( const char * NodeName ) {
 bool GraphExecutor::OutNode::IsWaiting() {
 //  cdebug_in << "GraphExecutor::OutNode::IsWaiting" << endl;
 //  cdebug_out << "GraphExecutor::OutNode::IsWaiting" << endl ;
-  return !_Done ;
+  return !IsDone() ;
 }
 
 bool GraphExecutor::OutNode::IsReady() {
 //  cdebug_in << "GraphExecutor::OutNode::IsReady" << endl;
 //  cdebug_out << "GraphExecutor::OutNode::IsReady" << endl ;
-  return !_Done ;
+  return !IsDone() ;
 }
 
 bool GraphExecutor::OutNode::IsRunning() {
 //  cdebug_in << "GraphExecutor::OutNode::IsRunning" << endl;
 //  cdebug_out << "GraphExecutor::OutNode::IsRunning" << endl ;
-  return !_Done ;
+  return !IsDone() ;
 }
 
 bool GraphExecutor::OutNode::IsDone() {
 //  cdebug_in << "GraphExecutor::OutNode::IsDone" << endl;
 //  cdebug_out << "GraphExecutor::OutNode::IsDone" << endl ;
-  return ( _Done || IsKilled() || IsStopped() ) ;
+  return ( Done() || IsKilled() || IsStopped() ) ;
 }
 
 bool GraphExecutor::OutNode::IsSuspended() {
@@ -1510,44 +1594,84 @@ bool GraphExecutor::OutNode::ContainerKill() {
 }
 
 bool GraphExecutor::OutNode::Suspend() {
-  bool RetVal = false ;
+  int RetVal = 0 ;
   cdebug_in << "GraphExecutor::OutNode::Suspend" << endl;
-  _ControlState = SUPERV::ToSuspendState ;
+//  _ControlState = SUPERV::ToSuspendState ;
   int i ;
   for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
     GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
     bool sts = aNode->Suspend() ;
     if ( sts && aNode->IsSuspended() ) {
-      cdebug << aNode->Name() << " Suspend" << endl ;
+      RetVal += 1 ;
+      cdebug << aNode->Name() << " Suspended" << endl ;
     }
     else if ( aNode->IsWaiting() || aNode->IsDone() ) {
       cdebug << aNode->Name() << " not Suspended : "
              << theAutomaton->StateName( aNode->State() ) << endl ;
     }
     else {
+      RetVal += 1 ;
       cdebug << aNode->Name() << " cannot be Suspended : "
              << theAutomaton->StateName( aNode->State() ) << endl ;
-      RetVal = false ;
     }
   }
-  State( SUPERV::SuspendedState ) ;
-  cdebug_out << "GraphExecutor::OutNode::Suspend" << endl ;
   if ( RetVal ) {
+    State( SUPERV::SuspendedState ) ;
     MESSAGE("================================================================================") ;
     MESSAGE( Name() << " IS SUSPENDED" ) ;
     MESSAGE("================================================================================") ;
   }
+  else {
+    MESSAGE("================================================================================") ;
+    MESSAGE( Name() << " IS NOT SUSPENDED" ) ;
+    MESSAGE("================================================================================") ;
+  }
+  cdebug_out << "GraphExecutor::OutNode::Suspend" << theAutomaton->StateName( State() ) << endl ;
   return RetVal ;
 }
+
 bool GraphExecutor::OutNode::Resume() {
-  bool RetVal = false ;
+  int RetVal = 0 ;
   cdebug_in << "GraphExecutor::OutNode::Resume" << endl;
-  cdebug_out << "GraphExecutor::OutNode::Resume" << endl ;
+  if ( IsSuspended() ) {
+    State( SUPERV::ExecutingState ) ;
+    int i ;
+    for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
+      GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
+      aNode->ControlState( SUPERV::VoidState ) ;
+      if ( aNode->IsSuspended() ) {
+        cdebug << aNode->Name() << "->Resume " << theAutomaton->StateName( aNode->State() )
+               << endl ;
+        if ( aNode->Resume() ) {
+          cdebug << aNode->Name() << " Resumed " << theAutomaton->StateName( aNode->State() )
+                 << endl ;
+          RetVal += 1 ;
+       }
+        else {
+          cdebug << aNode->Name() << " Resume failed"
+                 << theAutomaton->StateName( aNode->State() ) << endl ;
+        }
+      }
+      else {
+        cdebug << aNode->Name() << " " << theAutomaton->StateName( aNode->State() )
+               << endl ;
+      }
+    }
+  }
+  else {
+    cdebug << Name() << " not suspended " << theAutomaton->StateName( State() ) << endl ;
+  }
   if ( RetVal ) {
     MESSAGE("================================================================================") ;
     MESSAGE( Name() << " IS RESUMED" ) ;
     MESSAGE("================================================================================") ;
   }
+  else {
+    MESSAGE("================================================================================") ;
+    MESSAGE( Name() << " IS NOT RESUMED" ) ;
+    MESSAGE("================================================================================") ;
+  }
+  cdebug_out << "GraphExecutor::OutNode::Resume" << theAutomaton->StateName( State() ) << " " << RetVal << endl ;
   return RetVal ;
 }
 
@@ -1576,25 +1700,33 @@ bool GraphExecutor::OutNode::Kill() {
     for ( i = 0 ; i < GraphNodesSize() ; i++ ) {
       GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) GraphNodes( i )->GetInNode() ;
       if ( !aNode->IsKilled() && !aNode->IsWaiting() && !aNode->IsDone() ) {
+        cdebug << aNode->Name() << " not killed : "
+               << theAutomaton->StateName( aNode->State() ) << " " << aNode->Name() << "->"
+               << "KilledAction()" << endl ;
         aNode->KilledAction() ;
       }
     }
     RetVal = true ;
   }
   State( SUPERV::KilledState ) ;
-  cdebug_out << "GraphExecutor::OutNode::Kill" << endl ;
   if ( RetVal ) {
     MESSAGE("================================================================================") ;
     MESSAGE( Name() << " IS KILLED" ) ;
     MESSAGE("================================================================================") ;
   }
+  else {
+    MESSAGE("================================================================================") ;
+    MESSAGE( Name() << " IS NOT KILLED" ) ;
+    MESSAGE("================================================================================") ;
+  }
+  cdebug_out << "GraphExecutor::OutNode::Kill" << endl ;
   return RetVal ;
 }
 
 bool GraphExecutor::OutNode::Stop() {
   bool RetVal = false ;
   cdebug_in << "GraphExecutor::OutNode::Stop" << endl;
-  Kill() ;
+  RetVal = Kill() ;
   cdebug_out << "GraphExecutor::OutNode::Stop" << endl ;
   if ( RetVal ) {
     MESSAGE("================================================================================") ;
@@ -1693,7 +1825,7 @@ bool GraphExecutor::OutNode::SuspendedWait() {
     exit( 0 ) ;
   }
   aret = IsSuspended() ;
-  while ( !aret && !_Done ) {
+  while ( !aret && !IsDone() ) {
     pthread_cond_wait( &_EventWait , &_MutexWait );
     aret = IsSuspended() ;
   }