]> SALOME platform Git repositories - modules/yacs.git/commitdiff
Salome HOME
Implementation of keep going mode of executor for ForEachLoops.
authorAnthony Geay <anthony.geay@edf.fr>
Wed, 8 Apr 2015 10:03:28 +0000 (12:03 +0200)
committerAnthony Geay <anthony.geay@edf.fr>
Wed, 8 Apr 2015 10:03:28 +0000 (12:03 +0200)
src/engine/DynParaLoop.cxx
src/engine/DynParaLoop.hxx
src/engine/ForEachLoop.cxx
src/engine/ForEachLoop.hxx

index 2b3f76d4d45605470dffd607a872c174b9a2baf8..0564dabd0fdd3048a03458a0e80169287949be22 100644 (file)
@@ -44,7 +44,7 @@ const char DynParaLoop::NAME_OF_NUMBER_OF_BRANCHES[]="nbBranches";
 DynParaLoop::DynParaLoop(const std::string& name, TypeCode *typeOfDataSplitted)
   : ComposedNode(name),_node(0),_initNode(0),_finalizeNode(0),_nbOfEltConsumed(0),
     _nbOfBranches(NAME_OF_NUMBER_OF_BRANCHES,this,Runtime::_tc_int),
-    _splittedPort(NAME_OF_SPLITTED_SEQ_OUT,this,typeOfDataSplitted)
+    _splittedPort(NAME_OF_SPLITTED_SEQ_OUT,this,typeOfDataSplitted),_initializingCounter(0),_unfinishedCounter(0),_failedCounter(0)
 {
 }
 
@@ -58,7 +58,7 @@ DynParaLoop::~DynParaLoop()
 DynParaLoop::DynParaLoop(const DynParaLoop& other, ComposedNode *father, bool editionOnly)
   : ComposedNode(other,father), _nbOfBranches(other._nbOfBranches,this),
     _splittedPort(other._splittedPort,this), _node(0), _initNode(0), _finalizeNode(0),
-    _nbOfEltConsumed(0)
+    _nbOfEltConsumed(0),_initializingCounter(0),_unfinishedCounter(0),_failedCounter(0)
 {
   if(other._node)
     _node=other._node->clone(this,editionOnly);
@@ -117,6 +117,7 @@ void DynParaLoop::init(bool start)
   _nbOfBranches.exInit(start);
   _splittedPort.exInit();
   _nbOfEltConsumed=0;
+  _failedCounter=0;
 }
 
 Node *DynParaLoop::edSetInitNode(Node *node)
index e5e476c050320c3cc4bf3f94453c5e87146cf694..3a29a872d0ef9a3f140d054c92212383adb112f7 100644 (file)
@@ -56,6 +56,7 @@ namespace YACS
       std::vector<Node *> _execFinalizeNodes;
       int _initializingCounter;
       int _unfinishedCounter;
+      int _failedCounter;
     protected:
       static const char NAME_OF_SPLITTED_SEQ_OUT[];
       static const char OLD_NAME_OF_SPLITTED_SEQ_OUT[];
index e27b6fcb5040e14afab8673129e69c5f5c1f8262..431fee9f51eb8cc0b49d6bb6964a395135e8b9f4 100644 (file)
@@ -546,113 +546,143 @@ YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
   switch(getIdentityOfNotifyerNode(node,id))
     {
     case INIT_NODE:
-      _execNodes[id]->exUpdateState();
-      _nbOfEltConsumed++;
-      _initializingCounter--;
-      if (_initializingCounter == 0) _initNode->setState(DONE);
-      break;
+      return updateStateForInitNodeOnFinishedEventFrom(node,id);
     case WORK_NODE:
-      _currentIndex++;
-      exUpdateProgress();
-      storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
-      if(_execCurrentId==_splitterNode.getNumberOfElements())
-        {//No more elements of _dataPortToDispatch to treat
-          _execIds[id]=NOT_RUNNING_BRANCH_ID;
-          //analyzing if some samples are still on treatment on other branches.
-          bool isFinished=true;
-          for(int i=0;i<_execIds.size() && isFinished;i++)
-            isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
-          if(isFinished)
-            {
-              try 
+      return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
+    case FINALIZE_NODE:
+      return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
+    default:
+      YASSERT(false);
+    }
+  return YACS::NOEVENT;
+}
+
+YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+  _execNodes[id]->exUpdateState();
+  _nbOfEltConsumed++;
+  _initializingCounter--;
+  if (_initializingCounter == 0)
+    _initNode->setState(DONE);
+  return YACS::NOEVENT;
+}
+
+/*!
+ * \param [in] isNormalFinish - if true
+ */
+YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
+{
+  _currentIndex++;
+  exUpdateProgress();
+  if(isNormalFinish)
+    storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
+  if(_execCurrentId==_splitterNode.getNumberOfElements())
+    {//No more elements of _dataPortToDispatch to treat
+      _execIds[id]=NOT_RUNNING_BRANCH_ID;
+      //analyzing if some samples are still on treatment on other branches.
+      bool isFinished(true);
+      for(int i=0;i<_execIds.size() && isFinished;i++)
+        isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
+      if(isFinished)
+        {
+          try
+          {
+              if(_failedCounter!=0)
                 {
-                  pushAllSequenceValues();
-  
-                  if (_node)
-                    {
-                      _node->setState(YACS::DONE);
-     
-                      ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
-                      if (compNode)
-                        {
-                          std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
-                          std::list<Node *>::iterator iter=aChldn.begin();
-                          for(;iter!=aChldn.end();iter++)
-                            (*iter)->setState(YACS::DONE);
-                        }
-                    }
+                  std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
+                  throw YACS::Exception(oss.str());
+                }
+              pushAllSequenceValues();
 
-                  if (_finalizeNode == NULL)
-                    {
-                      // No finalize node, we just finish the loop at the end of exec nodes execution
-                      setState(YACS::DONE);
-                      return YACS::FINISH;
-                    }
-                  else
+              if (_node)
+                {
+                  _node->setState(YACS::DONE);
+
+                  ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
+                  if (compNode)
                     {
-                      // Run the finalize nodes, the loop will be done only when they all finish
-                      _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
-                      for (int i=0 ; i<_execIds.size() ; i++)
-                        {
-                          YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
-                          DEBTRACE("Launching finalize node for branch " << i);
-                          _execFinalizeNodes[i]->exUpdateState();
-                          _unfinishedCounter++;
-                        }
-                      return YACS::NOEVENT;
+                      std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
+                      std::list<Node *>::iterator iter=aChldn.begin();
+                      for(;iter!=aChldn.end();iter++)
+                        (*iter)->setState(YACS::DONE);
                     }
                 }
-              catch(YACS::Exception& ex)
+
+              if (_finalizeNode == NULL)
                 {
-                  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
-                  //no way to push results : put following nodes in FAILED state
-                  //TODO could be more fine grain : put only concerned nodes in FAILED state
-                  exForwardFailed();
-                  setState(YACS::ERROR);
-                  return YACS::ABORT;
+                  // No finalize node, we just finish the loop at the end of exec nodes execution
+                  setState(YACS::DONE);
+                  return YACS::FINISH;
                 }
-            }
-        }
-      else if(_state == YACS::ACTIVATED)
-        {//more elements to do and loop still activated
-          _execIds[id]=_execCurrentId;
-          node->init(false);
-          _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
-          node->exUpdateState();
-          forwardExecStateToOriginalBody(node);
-          _nbOfEltConsumed++;
-        }
-      else
-        {//elements to process and loop no more activated
-          DEBTRACE("foreach loop state " << _state);
-        }
-      break;
-    case FINALIZE_NODE:
-    {
-      DEBTRACE("Finalize node finished on branch " << id);
-      _unfinishedCounter--;
-      _currentIndex++;
-      exUpdateProgress();
-      DEBTRACE(_unfinishedCounter << " finalize nodes still running");
-      if (_unfinishedCounter == 0)
-        {
-          _finalizeNode->setState(YACS::DONE);
-          setState(YACS::DONE);
-          return YACS::FINISH;
+              else
+                {
+                  // Run the finalize nodes, the loop will be done only when they all finish
+                  _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
+                  for (int i=0 ; i<_execIds.size() ; i++)
+                    {
+                      YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
+                      DEBTRACE("Launching finalize node for branch " << i);
+                      _execFinalizeNodes[i]->exUpdateState();
+                      _unfinishedCounter++;
+                    }
+                  return YACS::NOEVENT;
+                }
+          }
+          catch(YACS::Exception& ex)
+          {
+              DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
+              //no way to push results : put following nodes in FAILED state
+              //TODO could be more fine grain : put only concerned nodes in FAILED state
+              exForwardFailed();
+              setState(YACS::ERROR);
+              return YACS::ABORT;
+          }
         }
-      else
-        return YACS::NOEVENT;
-      break;
     }
-    default:
-      YASSERT(false);
+  else if(_state == YACS::ACTIVATED)
+    {//more elements to do and loop still activated
+      _execIds[id]=_execCurrentId;
+      node->init(false);
+      _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
+      node->exUpdateState();
+      forwardExecStateToOriginalBody(node);
+      _nbOfEltConsumed++;
+    }
+  else
+    {//elements to process and loop no more activated
+      DEBTRACE("foreach loop state " << _state);
     }
   return YACS::NOEVENT;
 }
 
+YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+  DEBTRACE("Finalize node finished on branch " << id);
+  _unfinishedCounter--;
+  _currentIndex++;
+  exUpdateProgress();
+  DEBTRACE(_unfinishedCounter << " finalize nodes still running");
+  if (_unfinishedCounter == 0)
+    {
+      _finalizeNode->setState(YACS::DONE);
+      setState(YACS::DONE);
+      return YACS::FINISH;
+    }
+  else
+    return YACS::NOEVENT;
+}
+
 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
 {
-  return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
+  unsigned int id;
+  DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
+  if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
+    return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
+  else
+    {
+      _failedCounter++;
+      return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
+    }
 }
 
 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
index b20f5850f6f49626f6c2549bd997ee29aefcd856..02f7c25215abf16a4f95b80bacf6e9bfec802583 100644 (file)
@@ -179,6 +179,9 @@ namespace YACS
       void checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
                                 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(Exception);
       YACS::Event updateStateOnFinishedEventFrom(Node *node);
+      YACS::Event updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id);
+      YACS::Event updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish);
+      YACS::Event updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id);
       YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst);
       void buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView);
       void getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(Exception);