From db917af07cd02e3eaeee9007c1159e342a90162b Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Wed, 8 Apr 2015 12:03:28 +0200 Subject: [PATCH] Implementation of keep going mode of executor for ForEachLoops. --- src/engine/DynParaLoop.cxx | 5 +- src/engine/DynParaLoop.hxx | 1 + src/engine/ForEachLoop.cxx | 212 +++++++++++++++++++++---------------- src/engine/ForEachLoop.hxx | 3 + 4 files changed, 128 insertions(+), 93 deletions(-) diff --git a/src/engine/DynParaLoop.cxx b/src/engine/DynParaLoop.cxx index 2b3f76d4d..0564dabd0 100644 --- a/src/engine/DynParaLoop.cxx +++ b/src/engine/DynParaLoop.cxx @@ -44,7 +44,7 @@ const char DynParaLoop::NAME_OF_NUMBER_OF_BRANCHES[]="nbBranches"; DynParaLoop::DynParaLoop(const std::string& name, TypeCode *typeOfDataSplitted) : ComposedNode(name),_node(0),_initNode(0),_finalizeNode(0),_nbOfEltConsumed(0), _nbOfBranches(NAME_OF_NUMBER_OF_BRANCHES,this,Runtime::_tc_int), - _splittedPort(NAME_OF_SPLITTED_SEQ_OUT,this,typeOfDataSplitted) + _splittedPort(NAME_OF_SPLITTED_SEQ_OUT,this,typeOfDataSplitted),_initializingCounter(0),_unfinishedCounter(0),_failedCounter(0) { } @@ -58,7 +58,7 @@ DynParaLoop::~DynParaLoop() DynParaLoop::DynParaLoop(const DynParaLoop& other, ComposedNode *father, bool editionOnly) : ComposedNode(other,father), _nbOfBranches(other._nbOfBranches,this), _splittedPort(other._splittedPort,this), _node(0), _initNode(0), _finalizeNode(0), - _nbOfEltConsumed(0) + _nbOfEltConsumed(0),_initializingCounter(0),_unfinishedCounter(0),_failedCounter(0) { if(other._node) _node=other._node->clone(this,editionOnly); @@ -117,6 +117,7 @@ void DynParaLoop::init(bool start) _nbOfBranches.exInit(start); _splittedPort.exInit(); _nbOfEltConsumed=0; + _failedCounter=0; } Node *DynParaLoop::edSetInitNode(Node *node) diff --git a/src/engine/DynParaLoop.hxx b/src/engine/DynParaLoop.hxx index e5e476c05..3a29a872d 100644 --- a/src/engine/DynParaLoop.hxx +++ b/src/engine/DynParaLoop.hxx @@ -56,6 +56,7 @@ namespace YACS std::vector _execFinalizeNodes; int _initializingCounter; int _unfinishedCounter; + int _failedCounter; protected: static const char NAME_OF_SPLITTED_SEQ_OUT[]; static const char OLD_NAME_OF_SPLITTED_SEQ_OUT[]; diff --git a/src/engine/ForEachLoop.cxx b/src/engine/ForEachLoop.cxx index e27b6fcb5..431fee9f5 100644 --- a/src/engine/ForEachLoop.cxx +++ b/src/engine/ForEachLoop.cxx @@ -546,113 +546,143 @@ YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node) switch(getIdentityOfNotifyerNode(node,id)) { case INIT_NODE: - _execNodes[id]->exUpdateState(); - _nbOfEltConsumed++; - _initializingCounter--; - if (_initializingCounter == 0) _initNode->setState(DONE); - break; + return updateStateForInitNodeOnFinishedEventFrom(node,id); case WORK_NODE: - _currentIndex++; - exUpdateProgress(); - storeOutValsInSeqForOutOfScopeUse(_execIds[id],id); - if(_execCurrentId==_splitterNode.getNumberOfElements()) - {//No more elements of _dataPortToDispatch to treat - _execIds[id]=NOT_RUNNING_BRANCH_ID; - //analyzing if some samples are still on treatment on other branches. - bool isFinished=true; - for(int i=0;i<_execIds.size() && isFinished;i++) - isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID); - if(isFinished) - { - try + return updateStateForWorkNodeOnFinishedEventFrom(node,id,true); + case FINALIZE_NODE: + return updateStateForFinalizeNodeOnFinishedEventFrom(node,id); + default: + YASSERT(false); + } + return YACS::NOEVENT; +} + +YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id) +{ + _execNodes[id]->exUpdateState(); + _nbOfEltConsumed++; + _initializingCounter--; + if (_initializingCounter == 0) + _initNode->setState(DONE); + return YACS::NOEVENT; +} + +/*! + * \param [in] isNormalFinish - if true + */ +YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish) +{ + _currentIndex++; + exUpdateProgress(); + if(isNormalFinish) + storeOutValsInSeqForOutOfScopeUse(_execIds[id],id); + if(_execCurrentId==_splitterNode.getNumberOfElements()) + {//No more elements of _dataPortToDispatch to treat + _execIds[id]=NOT_RUNNING_BRANCH_ID; + //analyzing if some samples are still on treatment on other branches. + bool isFinished(true); + for(int i=0;i<_execIds.size() && isFinished;i++) + isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID); + if(isFinished) + { + try + { + if(_failedCounter!=0) { - pushAllSequenceValues(); - - if (_node) - { - _node->setState(YACS::DONE); - - ComposedNode* compNode = dynamic_cast(_node); - if (compNode) - { - std::list aChldn = compNode->getAllRecursiveConstituents(); - std::list::iterator iter=aChldn.begin(); - for(;iter!=aChldn.end();iter++) - (*iter)->setState(YACS::DONE); - } - } + std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !"; + throw YACS::Exception(oss.str()); + } + pushAllSequenceValues(); - if (_finalizeNode == NULL) - { - // No finalize node, we just finish the loop at the end of exec nodes execution - setState(YACS::DONE); - return YACS::FINISH; - } - else + if (_node) + { + _node->setState(YACS::DONE); + + ComposedNode* compNode = dynamic_cast(_node); + if (compNode) { - // Run the finalize nodes, the loop will be done only when they all finish - _unfinishedCounter = 0; // This counter indicates how many branches are not finished - for (int i=0 ; i<_execIds.size() ; i++) - { - YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID); - DEBTRACE("Launching finalize node for branch " << i); - _execFinalizeNodes[i]->exUpdateState(); - _unfinishedCounter++; - } - return YACS::NOEVENT; + std::list aChldn = compNode->getAllRecursiveConstituents(); + std::list::iterator iter=aChldn.begin(); + for(;iter!=aChldn.end();iter++) + (*iter)->setState(YACS::DONE); } } - catch(YACS::Exception& ex) + + if (_finalizeNode == NULL) { - DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<init(false); - _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false); - node->exUpdateState(); - forwardExecStateToOriginalBody(node); - _nbOfEltConsumed++; - } - else - {//elements to process and loop no more activated - DEBTRACE("foreach loop state " << _state); - } - break; - case FINALIZE_NODE: - { - DEBTRACE("Finalize node finished on branch " << id); - _unfinishedCounter--; - _currentIndex++; - exUpdateProgress(); - DEBTRACE(_unfinishedCounter << " finalize nodes still running"); - if (_unfinishedCounter == 0) - { - _finalizeNode->setState(YACS::DONE); - setState(YACS::DONE); - return YACS::FINISH; + else + { + // Run the finalize nodes, the loop will be done only when they all finish + _unfinishedCounter = 0; // This counter indicates how many branches are not finished + for (int i=0 ; i<_execIds.size() ; i++) + { + YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID); + DEBTRACE("Launching finalize node for branch " << i); + _execFinalizeNodes[i]->exUpdateState(); + _unfinishedCounter++; + } + return YACS::NOEVENT; + } + } + catch(YACS::Exception& ex) + { + DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<init(false); + _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false); + node->exUpdateState(); + forwardExecStateToOriginalBody(node); + _nbOfEltConsumed++; + } + else + {//elements to process and loop no more activated + DEBTRACE("foreach loop state " << _state); } return YACS::NOEVENT; } +YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id) +{ + DEBTRACE("Finalize node finished on branch " << id); + _unfinishedCounter--; + _currentIndex++; + exUpdateProgress(); + DEBTRACE(_unfinishedCounter << " finalize nodes still running"); + if (_unfinishedCounter == 0) + { + _finalizeNode->setState(YACS::DONE); + setState(YACS::DONE); + return YACS::FINISH; + } + else + return YACS::NOEVENT; +} + YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst) { - return DynParaLoop::updateStateOnFailedEventFrom(node,execInst); + unsigned int id; + DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id)); + if(ton!=WORK_NODE || !execInst->getKeepGoingProperty()) + return DynParaLoop::updateStateOnFailedEventFrom(node,execInst); + else + { + _failedCounter++; + return updateStateForWorkNodeOnFinishedEventFrom(node,id,false); + } } void ForEachLoop::buildDelegateOf(std::pair& port, InPort *finalTarget, const std::list& pointsOfView) diff --git a/src/engine/ForEachLoop.hxx b/src/engine/ForEachLoop.hxx index b20f5850f..02f7c2521 100644 --- a/src/engine/ForEachLoop.hxx +++ b/src/engine/ForEachLoop.hxx @@ -179,6 +179,9 @@ namespace YACS void checkLinkPossibility(OutPort *start, const std::list& pointsOfViewStart, InPort *end, const std::list& pointsOfViewEnd) throw(Exception); YACS::Event updateStateOnFinishedEventFrom(Node *node); + YACS::Event updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id); + YACS::Event updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish); + YACS::Event updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id); YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst); void buildDelegateOf(std::pair& port, InPort *finalTarget, const std::list& pointsOfView); void getDelegateOf(std::pair& port, InPort *finalTarget, const std::list& pointsOfView) throw(Exception); -- 2.39.2