]> SALOME platform Git repositories - modules/superv.git/blobdiff - src/GraphExecutor/DataFlowExecutor_InNodeThreads.cxx
Salome HOME
DCQ : Merge with Ecole_Ete_a6.
[modules/superv.git] / src / GraphExecutor / DataFlowExecutor_InNodeThreads.cxx
index ab2e17f93aae5ba94da278cc678877baa49e9002..b8a9eda24f26eb39c9800c8d1ac4de55d7b49285 100644 (file)
@@ -39,14 +39,18 @@ using namespace std;
 
 #include <SALOMEconfig.h>
 #include CORBA_CLIENT_HEADER(SALOME_Component)
-//#include "SALOME_NamingService.hxx"
 #include "SALOME_LifeCycleCORBA.hxx"
 
-//#include "DataFlowExecutor_InNode.hxx"
+//#include "StreamGraph_Impl.hxx"
 
 #include "DataFlowExecutor_OutNode.hxx"
 
-//static char *containerName = "FactoryServer" ;
+
+static void UpperCase(std::string& rstr)
+{
+       std::transform(rstr.begin(), rstr.end(), rstr.begin(), towupper);
+}
+
 
 int GraphExecutor::InNode::SendEvent( const GraphExecutor::NodeEvent anEvent ) {  
 
@@ -487,10 +491,11 @@ void GraphExecutor::InNode::ThreadStartAction() {
 }
 
 int GraphExecutor::InNode::executeAction() {
-  int oldRewindStack = ( _RewindStack > 101 ) ;
+  int oldRewindStack = ( _RewindStack > MAXSTACKTHREADSIZE ) ;
   if ( !CreateNewThread() && oldRewindStack ) {
     cdebug << pthread_self() << "/" << ThreadNo()
-           << " executeAction start Thread _RewindStack " << _RewindStack << " > 101 CreateNewThread "
+           << " executeAction start Thread _RewindStack " << _RewindStack << " > "
+           << MAXSTACKTHREADSIZE << " CreateNewThread "
            << CreateNewThread() << " " << Automaton()->ActionName( _NextAction ) << "(" << Name() << ")"
            << endl;
     CreateNewThread( true ) ;
@@ -754,9 +759,9 @@ int GraphExecutor::InNode::VoidAction() {
 
 
 int GraphExecutor::InNode::DataWaiting_SomeDataReadyAction() {
-//  cdebug << pthread_self() << "/" << ThreadNo()
-//         << " --> DataWaiting_SomeDataReadyAction from " << DataFromNode()
-//         << " to " << Name() << endl;
+  cdebug << pthread_self() << "/" << ThreadNo()
+         << " --> DataWaiting_SomeDataReadyAction from " << DataFromNode()
+         << " to " << Name() << endl;
   unsigned int k;
   int InReady = 0 ;
   int res = 1;
@@ -778,6 +783,7 @@ int GraphExecutor::InNode::DataWaiting_SomeDataReadyAction() {
   for ( k = 0 ; k < (unsigned int ) GetNodeInPortsSize() ; k++ ) {
     GraphBase::InPort * anInPort = GetChangeNodeInPort(k) ;
     GraphBase::OutPort * anOutPort = anInPort->GetOutPort() ;
+    cdebug << pthread_self() << "/" << ThreadNo() << " " << Name() << " InPort " << anInPort->PortName() << endl ;
     if ( anInPort->IsGate() && anOutPort == NULL ) {
       InReady += 1 ;
       anInPort->State( SUPERV::ReadyState ) ;
@@ -1071,9 +1077,14 @@ int GraphExecutor::InNode::DataReady_ExecuteAction() {
     }
   }
   else if ( CORBA::is_nil( Component() ) ) {
-    Err = !_OutNode->StartComponent( ThreadNo() , Computer() ,
-                                     my_strdup( ComponentName() ) ,
-                                     myContainer , myObjComponent ) ;
+    ostringstream astr ;
+    astr << "Graph " << _OutNode->Graph()->Name() << " Node " << Name()
+         << " : load of component " << ComponentName() << " in container "
+         << Computer() ;
+//    _OutNode->Graph()->ObjImpl()->sendMessage( NOTIF_STEP, astr.str().c_str() ) ;
+    Err = !_OutNode->Graph()->StartComponent( ThreadNo() , Computer() ,
+                                              my_strdup( ComponentName() ) ,
+                                              myContainer , myObjComponent ) ;
     ObjInterface( false ) ;
     SetContainer( myContainer ) ;
     SetComponent( myObjComponent ) ;
@@ -1083,7 +1094,13 @@ int GraphExecutor::InNode::DataReady_ExecuteAction() {
     myObjComponent = Component() ;
 //    cdebug << ThreadNo() << "Component known : NO StartComponent & Ping"
 //           << endl ;
-    myObjComponent->ping() ;
+    try {
+      myObjComponent->ping() ;
+    }
+    catch( ... ) {
+      cdebug << "ping() catched" << endl ;
+      Err = true ;
+    }
   }
 
   int nOutParams = GetNodeOutPortsSize()  ;
@@ -1098,6 +1115,9 @@ int GraphExecutor::InNode::DataReady_ExecuteAction() {
     }
     else {
       if ( !Err ) {
+        ostringstream astr ;
+        astr << "Graph " << _OutNode->Graph()->Name() << " Run of Node " << Name() ;
+//        _OutNode->Graph()->ObjImpl()->sendMessage( NOTIF_STEP, astr.str().c_str() ) ;
         cdebug << ThreadNo() << " Run( '" << ServiceName() << "'" ;
         for ( i = 0 ; i < (int ) ServiceInParameter().length() ; i++ ) {
           cdebug << " , " << InParametersList[ i ].Name << "[kind"
@@ -1341,26 +1361,60 @@ int GraphExecutor::InNode::DataReady_ExecuteAction() {
         else {
           try {
             try {
-              cdebug << "DynInvoke -> Names " << _OutNode->Name() << " " << Name() << endl ;
+              cdebug << "DynInvoke -> Names " << _OutNode->Graph()->Name() << " " << Name() << endl ;
               DynInvoke( myObjComponent, "Names" ,
-                         _OutNode->Name() , Name() ) ;
+                         _OutNode->Graph()->Name() , Name() ) ;
             }
             catch( ... ) {
               cdebug << "DynInvoke Names catched ERROR" << endl ;
            }
-            cdebug << ServiceInParameter().length() << " input parameters and "
-                   << ServiceOutParameter().length() << " output parameters" << endl ;
-            if ( IsComputingNode() ) {
+// for DataStreamNodes : call of SetProperties ===> environment variables in the component/container
+            if ( ComputingNode()->HasDataStream() ) {
+              try {
+                cdebug << "DynInvoke -> SetProperties " << _OutNode->Graph()->Name() << " " << Name() << endl ;
+               Engines::FieldsDict_var dict = new Engines::FieldsDict;
+               dict->length( 4 );
+               dict[ 0 ].key = CORBA::string_dup( "CAL_MACHINE");
+               // myContainer->getHostName() ne renvoit pas le nom complet (avec domaine).
+               //              dict[ 0 ].value <<= myContainer->getHostName() ;
+               char FullyQualifiedDomainName[256]="";
+               gethostname(FullyQualifiedDomainName,255);
+               dict[ 0 ].value <<=  FullyQualifiedDomainName ;
+               dict[ 1 ].key = CORBA::string_dup( "CAL_REPERTOIRE");
+               dict[ 1 ].value <<= "/tmp" ;
+               dict[ 2 ].key = CORBA::string_dup( "CAL_COUPLAGE");
+                stringstream ofst1 ;
+                ofst1 << ComputingNode()->SubStreamGraph() ;
+               string cpl = string( "/tmp/" ) + string( _OutNode->Graph()->Name() ) + string( "_" ) + 
+                            ofst1.str() + string( ".cpl" );
+               dict[ 2 ].value <<= cpl.c_str() ;
+               dict[ 3 ].key = CORBA::string_dup( "SALOME_INSTANCE_NAME");
+               string uname = Name();
+               UpperCase( uname);
+               dict[ 3 ].value <<= uname.c_str() ;
+
+                myObjComponent->setProperties( dict ) ;
+              }
+              catch( ... ) {
+                cdebug << "DynInvoke setProperties catched ERROR" << endl ;
+                Err = true ;
+             }
+           }
+            if ( !Err && IsComputingNode() ) {
               cdebug << ThreadNo() << " !ObjInterface " << Name()
                      << " IsComputingNode DynInvoke"  << endl ;
+              cdebug << ServiceInParameter().length()-1 << " input parameters and "
+                     << ServiceOutParameter().length() << " output parameters" << endl ;
               DynInvoke( myObjComponent,
                          ServiceName() ,
                          &InParametersList[1] , ServiceInParameter().length()-1 ,
                          &OutParametersList[0] , ServiceOutParameter().length() ) ;
            }
-            else if ( IsFactoryNode() ) {
+            else if ( !Err &&IsFactoryNode() ) {
               cdebug << ThreadNo() << " !ObjInterface " << Name()
                      << " IsFactoryNode DynInvoke"  << endl ;
+              cdebug << ServiceInParameter().length() << " input parameters and "
+                     << ServiceOutParameter().length() << " output parameters" << endl ;
               DynInvoke( myObjComponent,
                          ServiceName() ,
                          &InParametersList[0] , ServiceInParameter().length() ,
@@ -1383,6 +1437,10 @@ int GraphExecutor::InNode::DataReady_ExecuteAction() {
 //    sleep( 1 ) ;
 //  }
 
+  ostringstream astr ;
+  astr << "Graph " << _OutNode->Graph()->Name() << " Node " << Name() << " is done : "
+       << Automaton()->StateName( State() ) ;
+//  _OutNode->Graph()->ObjImpl()->sendMessage( NOTIF_STEP, astr.str().c_str() ) ;
   if ( Err ) {
     if ( ControlState() == SUPERV::ToKillState ||
          ControlState() == SUPERV::ToKillDoneState ||
@@ -1584,15 +1642,25 @@ void GraphExecutor::InNode::SetWaitingStates(GraphExecutor::InNode * EndNode ) {
 //               << " --> GraphExecutor::InNodeThreads::SetWaitingStates " << Name() << endl;
         docdebug = true ;
       }
-      anInPort->State( SUPERV::WaitingState ) ;
+      if ( !anInPort->IsDataStream() ) {
+        anInPort->State( SUPERV::WaitingState ) ;
+      }
     }
   }
   for ( i = 0 ; i < GetNodeOutPortsSize() ; i++ ) {
-    for ( j = 0 ; j < GetChangeNodeOutPort( i )->InPortsSize() ; j++ ) {
-      GraphBase::OutPort * anOutPort = GetChangeNodeOutPort( i ) ;
+    GraphBase::OutPort * anOutPort = GetChangeNodeOutPort( i ) ;
+    for ( j = 0 ; j < anOutPort->InPortsSize() ; j++ ) {
       if ( !( IsGOTONode() && anOutPort->IsGate() ) &&
-           !( IsEndLoopNode() && ( anOutPort->IsGate() || anOutPort->IsLoop() ) ) ) {
-        GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) _OutNode->GetChangeGraphNode( anOutPort->ChangeInPorts( j )->NodeName() )->GetInNode() ;
+           !( IsEndLoopNode() && ( anOutPort->IsGate() ||
+              anOutPort->IsLoop() ) ) &&
+           !anOutPort->IsDataStream() &&
+           !anOutPort->ChangeInPorts( j )->IsDataStream() ) {
+//        cdebug << ThreadNo()
+//               << " GraphExecutor::InNodeThreads::SetWaitingStates "
+//               << Name() << "( " << anOutPort->PortName() << " ) --> InPort "
+//               << anOutPort->ChangeInPorts( j )->PortName() << " from Node "
+//               << anOutPort->ChangeInPorts( j )->NodeName() << endl;
+        GraphExecutor::InNode * aNode = (GraphExecutor::InNode * ) _OutNode->Graph()->GetChangeGraphNode( anOutPort->ChangeInPorts( j )->NodeName() )->GetInNode() ;
         if ( aNode != EndNode ) {
           aNode->SetWaitingStates( EndNode ) ;
        }
@@ -1616,6 +1684,8 @@ int GraphExecutor::InNode::Successed_SuccessAction() {
 
   if ( IsGOTONode() ||
        ( IsEndLoopNode() && GetNodeInLoop()->GetOutPort()->BoolValue() ) ) {
+    cdebug << ThreadNo() << " Successed_SuccessAction " << Name()
+           << " SetWaitingStates " << endl ;
     const GraphBase::OutPort * aGateOutPort ;
     if ( IsGOTONode() ) {
       aGateOutPort = GetNodeOutGate() ;
@@ -1625,7 +1695,7 @@ int GraphExecutor::InNode::Successed_SuccessAction() {
     }
     for ( i = 0 ; i < aGateOutPort->InPortsSize() ; i++ ) {
       const GraphBase::InPort * anInPort = aGateOutPort->InPorts( i ) ;
-      GraphExecutor::InNode * aLabelNode = (GraphExecutor::InNode *) _OutNode->GetChangeGraphNode( anInPort->NodeName() )->GetInNode() ;
+      GraphExecutor::InNode * aLabelNode = (GraphExecutor::InNode *) _OutNode->Graph()->GetChangeGraphNode( anInPort->NodeName() )->GetInNode() ;
 //      cdebug << ThreadNo() << " Successed_SuccessAction " << Name() << " will Loop to HeadNode "
 //             << aLabelNode->Name() << " from port " << anInPort->PortName() << endl ;
       aLabelNode->SetWaitingStates( this ) ;
@@ -1743,7 +1813,9 @@ int GraphExecutor::InNode::Successed_SuccessAction() {
 //    cdebug << endl;
     for ( i = 0 ; i < LinkedNodesSize() ; i++ ) {
       bool IgnoreForEndLoop = false ;
-      toNode = (GraphExecutor::InNode *) LinkedNodes( i )->GetInNode() ;
+      GraphBase::ComputingNode * aComputingNode ;
+      aComputingNode = (GraphBase::ComputingNode * ) LinkedNodes( i ) ;
+      toNode = (GraphExecutor::InNode *) aComputingNode->GetInNode() ;
 //      cdebug << ThreadNo() << " Successed_SuccessAction of " << Name()
 //             << " [" << i << "] " << LinkedNodes( i )->Name() << endl ;
       if ( toNode && !toNode->IsDataFlowNode() ) {
@@ -2123,7 +2195,9 @@ void GraphExecutor::InNode::InParametersSet( bool & Err ,
         *anAny <<= (long ) 0 ;
         theOutPort->Value( anAny ) ;
       }
-      anInPort->State( SUPERV::WaitingState ) ;
+      if ( !anInPort->IsDataStream() ) {
+        anInPort->State( SUPERV::WaitingState ) ;
+      }
       D.Name = CORBA::string_dup( anInPort->GetServicesParameter().Parametername ) ;
       cdebug << ThreadNo() << " ArgIn" << i << " " << anInPort->Kind() ;
       cdebug << "      " << D.Name << " " << anInPort->GetServicesParameter().Parametertype << " : " ;
@@ -2578,43 +2652,45 @@ bool GraphExecutor::InNode::OutParametersSet( bool Err ,
        }
         }
         OutParametersList[i] = D ;
-        if ( anOutPort->IsGate() ) {
-          aGateOutPort = anOutPort ;
-          cdebug << " Gate " ;
-          long l = 1;
-          OutParametersList[i].Value <<= l;
-          anOutPort->Value( OutParametersList[i].Value );
-        }
-        else if ( anOutPort->IsLoop() ) {
-          cdebug << " Loop " ;
-          anOutPort->Value( OutParametersList[i].Value );
+        if ( !anOutPort->IsDataStream() ) {
+          if ( anOutPort->IsGate() ) {
+            aGateOutPort = anOutPort ;
+            cdebug << " Gate " ;
+            long l = 1;
+            OutParametersList[i].Value <<= l;
+            anOutPort->Value( OutParametersList[i].Value );
+          }
+          else if ( anOutPort->IsLoop() ) {
+            cdebug << " Loop " ;
+            anOutPort->Value( OutParametersList[i].Value );
 // InLoop Port of EndLoopNode is ready :
-          anOutPort->ChangeInPorts(0)->State( SUPERV::ReadyState ) ;
-        }
-        else if ( anOutPort->IsSwitch() ) {
-          cdebug << " Switch " ;
-          anOutPort->Value( OutParametersList[i].Value );
-          if ( anOutPort->InPortsSize() && anOutPort->ChangeInPorts( 0 )->IsGate() ) {
-            if ( OrSwitch && anOutPort->BoolValue() ) {
-              cdebug << "GraphExecutor::InNodeThreads::OutParameters more than one switch is true WARNING"
-                     << endl ;
-           }
-            else {
-              OrSwitch = OrSwitch | anOutPort->BoolValue() ;
-           }
-         }
-          cdebug << "OrSwitch " << OrSwitch ;
-        }
-        else {
-          cdebug << " Param " ;
-          anOutPort->Value( OutParametersList[i].Value );
-        }
-        anOutPort->State( NewState ) ;
-        anOutPort->Done( true ) ;
+            anOutPort->ChangeInPorts(0)->State( SUPERV::ReadyState ) ;
+          }
+          else if ( anOutPort->IsSwitch() ) {
+            cdebug << " Switch " ;
+            anOutPort->Value( OutParametersList[i].Value );
+            if ( anOutPort->InPortsSize() && anOutPort->ChangeInPorts( 0 )->IsGate() ) {
+              if ( OrSwitch && anOutPort->BoolValue() ) {
+                cdebug << "GraphExecutor::InNodeThreads::OutParameters more than one switch is true WARNING"
+                       << endl ;
+             }
+              else {
+                OrSwitch = OrSwitch | anOutPort->BoolValue() ;
+             }
+           }
+            cdebug << "OrSwitch " << OrSwitch ;
+          }
+          else {
+            cdebug << " Param " ;
+            anOutPort->Value( OutParametersList[i].Value );
+          }
+          anOutPort->State( NewState ) ;
+          anOutPort->Done( true ) ;
+       }
         int j ;
         for ( j = 0 ; j < anOutPort->InPortsSize() ; j++ ) {
           bool fromGOTO = false ;
-          GraphBase::OutPort * aGOTOPort = _OutNode->GetChangeGraphNode( anOutPort->ChangeInPorts( j )->NodeName() )->GetChangeNodeInGate()->GetOutPort() ;
+          GraphBase::OutPort * aGOTOPort = _OutNode->Graph()->GetChangeGraphNode( anOutPort->ChangeInPorts( j )->NodeName() )->GetChangeNodeInGate()->GetOutPort() ;
           if ( aGOTOPort ) {
             fromGOTO = aGOTOPort->IsGOTO() ;
          }