1 // Copyright (C) 2006-2015 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
32 #include "YacsTrace.hxx"
34 using namespace YACS::ENGINE;
37 /*! \class YACS::ENGINE::ForEachLoop
38 * \brief Loop node for parametric calculation
43 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
45 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
47 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
49 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
51 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
52 DataPort(name,node,type),Port(node),
57 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
58 Port(other,newHelder),
63 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
65 set<InPort *> ports=_repr->edSetInPort();
66 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
67 (*iter)->getAllRepresentants(repr);
70 InputPort *InterceptorInputPort::clone(Node *newHelder) const
72 return new InterceptorInputPort(*this,newHelder);
75 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
80 bool AnySplitOutputPort::decrRef()
85 void AnySplitOutputPort::incrRef() const
90 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
91 DataPort(name,node,type),Port(node),
92 _repr(0),_intercptr(0),_cnt(1)
96 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
97 DataPort(other,newHelder),
98 Port(other,newHelder),
99 _repr(0),_intercptr(0),_cnt(1)
103 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
105 bool ret=OutputPort::addInPort(inPort);
107 _repr->addInPort(_intercptr);
111 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
114 OutPort::getAllRepresented(represented);
116 _repr->getAllRepresented(represented);
119 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
121 bool ret=OutputPort::removeInPort(inPort,forward);
123 if(_setOfInputPort.empty())
124 _repr->removeInPort(_intercptr,forward);
128 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
131 _intercptr=intercptr;
134 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
136 return new AnySplitOutputPort(*this,newHelder);
139 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
144 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
148 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
150 return new SeqAnyInputPort(*this,newHelder);
153 unsigned SeqAnyInputPort::getNumberOfElements() const
155 const SequenceAny * valCsted=(const SequenceAny *) _value;
156 if (valCsted) return valCsted->size();
160 Any *SeqAnyInputPort::getValueAtRank(int i) const
162 const SequenceAny * valCsted=(const SequenceAny *) _value;
163 AnyPtr ret=(*valCsted)[i];
168 std::string SeqAnyInputPort::dump()
170 stringstream xmldump;
171 int nbElem = getNumberOfElements();
172 xmldump << "<value><array><data>" << endl;
173 for (int i = 0; i < nbElem; i++)
175 Any *val = getValueAtRank(i);
176 switch (val->getType()->kind())
179 xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
182 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
185 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
188 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
191 xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
194 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
198 xmldump << "</data></array></value>" << endl;
199 return xmldump.str();
202 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
203 ForEachLoop *father):ElementaryNode(name),
204 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
205 this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
210 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
211 _dataPortToDispatch(other._dataPortToDispatch,this)
215 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
217 if(name==NAME_OF_SEQUENCE_INPUT)
218 return (InputPort *)&_dataPortToDispatch;
220 return ElementaryNode::getInputPort(name);
223 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
225 return new SplitterNode(*this,(ForEachLoop *)father);
228 unsigned SplitterNode::getNumberOfElements() const
230 return _dataPortToDispatch.getNumberOfElements();
233 void SplitterNode::execute()
235 //Nothing : should never been called elsewhere big problem...
238 void SplitterNode::init(bool start)
240 ElementaryNode::init(start);
241 _dataPortToDispatch.exInit(start);
244 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
246 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
247 ForEachLoop *fatherTyped=(ForEachLoop *)_father;
248 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
249 valueToDispatch->decrRef();
252 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
254 _normalFinish(normalFinish)
256 _state=YACS::TOACTIVATE;
257 _father=_loop->getFather();
260 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
265 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
267 return new FakeNodeForForEachLoop(*this);
270 void FakeNodeForForEachLoop::exForwardFailed()
272 _loop->exForwardFailed();
275 void FakeNodeForForEachLoop::exForwardFinished()
277 _loop->exForwardFinished();
280 void FakeNodeForForEachLoop::execute()
283 throw Exception("");//only to trigger ABORT on Executor
285 _loop->pushAllSequenceValues();
288 void FakeNodeForForEachLoop::aborted()
290 _loop->setState(YACS::ERROR);
293 void FakeNodeForForEachLoop::finished()
295 _loop->setState(YACS::DONE);
298 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
300 std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
302 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
303 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
305 const SequenceAny *elt(*it);
307 if(sz!=(std::size_t)elt->size())
308 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
310 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
312 SequenceAny *elt(*it);
318 ForEachLoopPassedData::~ForEachLoopPassedData()
320 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
322 SequenceAny *elt(*it);
328 void ForEachLoopPassedData::init()
333 void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
336 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
337 std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
338 if(nbOfElts2<sizeExp)
339 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
340 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
343 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
345 _flagsIds.resize(nbOfElts);
346 std::fill(_flagsIds.begin(),_flagsIds.end(),false);
347 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
355 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
356 throw YACS::Exception(oss.str());
361 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
362 throw YACS::Exception(oss.str());
367 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
369 std::size_t sz(_nameOfOutputs.size());
371 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
372 for(std::size_t i=0;i<sz;i++)
374 AnyInputPort *elt(ports[i]);
376 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
377 if(_nameOfOutputs[i]!=elt->getName())
379 std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
380 throw YACS::Exception(oss.str());
386 * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
388 int ForEachLoopPassedData::toAbsId(int localId) const
391 throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
392 int ret(0),curLocId(0);
393 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
397 if(localId==curLocId)
402 throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
406 * Equivalent to toAbsId except that only ON are considered here.
408 int ForEachLoopPassedData::toAbsIdNot(int localId) const
411 throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
412 int ret(0),curLocId(0);
413 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
415 if(*it)//<- diff is here !
417 if(localId==curLocId)
422 throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
425 int ForEachLoopPassedData::getNumberOfElementsToDo() const
427 std::size_t nbAllElts(_flagsIds.size());
428 std::size_t ret(nbAllElts-_passedIds.size());
432 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
434 std::size_t sz(execVals.size());
435 if(_passedOutputs.size()!=sz)
436 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
437 for(std::size_t i=0;i<sz;i++)
439 SequenceAny *elt(_passedOutputs[i]);
440 SequenceAny *eltDestination(execVals[i]);
442 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
443 unsigned int szOfElt(elt->size());
444 for(unsigned int j=0;j<szOfElt;j++)
446 AnyPtr elt1((*elt)[j]);
447 int jAbs(toAbsIdNot(j));
448 eltDestination->setEltAtRank(jAbs,elt1);
453 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
454 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
455 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
459 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
460 _splitterNode(other._splitterNode,this),
461 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
465 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
467 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
468 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
469 temp->addRepr(getOutPort((*iter2)->getName()),interc);
470 interc->setRepr(temp);
471 _outGoingPorts.push_back(temp);
472 _intecptrsForOutGoingPorts.push_back(interc);
476 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
478 return new ForEachLoop(*this,father,editionOnly);
481 ForEachLoop::~ForEachLoop()
484 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
486 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
491 void ForEachLoop::init(bool start)
493 DynParaLoop::init(start);
494 _splitterNode.init(start);
503 void ForEachLoop::exUpdateState()
505 DEBTRACE("ForEachLoop::exUpdateState");
506 if(_state == YACS::DISABLED)
508 if(_inGate.exIsReady())
510 //internal graph update
512 int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
515 _passedData->checkCompatibilyWithNb(nbOfElts);
516 nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
518 int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
520 DEBTRACE("nbOfElts=" << nbOfElts);
521 DEBTRACE("nbOfBr=" << nbOfBr);
525 prepareSequenceValues(0);
526 delete _nodeForSpecialCases;
527 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
528 setState(YACS::ACTIVATED);
533 delete _nodeForSpecialCases;
534 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
535 setState(YACS::ACTIVATED);
538 if(nbOfBr>nbOfEltsToDo)
540 _execNodes.resize(nbOfBr);
541 _execIds.resize(nbOfBr);
542 _execOutGoingPorts.resize(nbOfBr);
543 prepareSequenceValues(nbOfElts);
545 _execInitNodes.resize(nbOfBr);
546 _initializingCounter = 0;
548 _execFinalizeNodes.resize(nbOfBr);
550 vector<Node *> origNodes;
551 origNodes.push_back(_initNode);
552 origNodes.push_back(_node);
553 origNodes.push_back(_finalizeNode);
555 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
556 //so catch them to control errors
559 for(i=0;i<nbOfBr;i++)
561 DEBTRACE( "-------------- 2" );
562 vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
564 _execInitNodes[i] = clonedNodes[0];
565 _execNodes[i] = clonedNodes[1];
567 _execFinalizeNodes[i] = clonedNodes[2];
568 DEBTRACE( "-------------- 4" );
569 prepareInputsFromOutOfScope(i);
570 DEBTRACE( "-------------- 5" );
571 createOutputOutOfScopeInterceptors(i);
572 DEBTRACE( "-------------- 6" );
574 for(i=0;i<nbOfBr;i++)
576 DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
577 _execIds[i]=_execCurrentId;
578 int posInAbs(_execCurrentId);
580 posInAbs=_passedData->toAbsId(_execCurrentId);
581 _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
583 DEBTRACE( "-------------- 7" );
587 _passedData->checkLevel2(_execOutGoingPorts[0]);
588 _passedData->assignAlreadyDone(_execVals);
591 catch(YACS::Exception& ex)
593 //ForEachLoop must be put in error and the exception rethrown to notify the caller
594 DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
595 setState(YACS::ERROR);
600 setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
603 for(i=0;i<nbOfBr;i++)
606 _execInitNodes[i]->exUpdateState();
607 _initializingCounter++;
612 _execNodes[i]->exUpdateState();
615 forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
619 void ForEachLoop::exUpdateProgress()
621 // emit notification to all observers registered with the dispatcher on any change of the node's state
622 sendEvent("progress");
625 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
629 if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
630 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
632 if(_nodeForSpecialCases)
634 _nodeForSpecialCases->getReadyTasks(tasks);
637 vector<Node *>::iterator iter;
638 for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
639 (*iter)->getReadyTasks(tasks);
640 for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
641 (*iter)->getReadyTasks(tasks);
642 for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
643 (*iter)->getReadyTasks(tasks);
647 int ForEachLoop::getNumberOfInputPorts() const
649 return DynParaLoop::getNumberOfInputPorts()+1;
652 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
657 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
661 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
663 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
664 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
668 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
670 list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
671 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
675 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
677 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
678 return (InputPort *)&_splitterNode._dataPortToDispatch;
680 return DynParaLoop::getInputPort(name);
683 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
685 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
687 if(name==(*iter)->getName())
688 return (OutputPort *)(*iter);
690 return DynParaLoop::getOutputPort(name);
693 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
695 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
697 if(name==(*iter)->getName())
698 return (OutPort *)(*iter);
700 return DynParaLoop::getOutPort(name);
703 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
705 if(name==NAME_OF_SPLITTERNODE)
706 return (Node *)&_splitterNode;
708 return DynParaLoop::getChildByShortName(name);
711 //! Method used to notify the node that a child node has finished
713 * Update the current state and return the change state
715 * \param node : the child node that has finished
716 * \return the state change
718 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
720 DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
722 switch(getIdentityOfNotifyerNode(node,id))
725 return updateStateForInitNodeOnFinishedEventFrom(node,id);
727 return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
729 return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
733 return YACS::NOEVENT;
736 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
738 _execNodes[id]->exUpdateState();
740 _initializingCounter--;
741 if (_initializingCounter == 0)
742 _initNode->setState(DONE);
743 return YACS::NOEVENT;
747 * \param [in] isNormalFinish - if true
749 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
755 int globalId(_execIds[id]);
757 globalId=_passedData->toAbsId(globalId);
758 storeOutValsInSeqForOutOfScopeUse(globalId,id);
761 if(_execCurrentId==getFinishedId())
762 {//No more elements of _dataPortToDispatch to treat
763 _execIds[id]=NOT_RUNNING_BRANCH_ID;
764 //analyzing if some samples are still on treatment on other branches.
765 bool isFinished(true);
766 for(int i=0;i<_execIds.size() && isFinished;i++)
767 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
772 if(_failedCounter!=0)
773 {// case of keepgoing mode + a failed
774 std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
775 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
776 setState(YACS::FAILED);
779 pushAllSequenceValues();
783 _node->setState(YACS::DONE);
785 ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
788 std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
789 std::list<Node *>::iterator iter=aChldn.begin();
790 for(;iter!=aChldn.end();iter++)
791 (*iter)->setState(YACS::DONE);
795 if (_finalizeNode == NULL)
797 // No finalize node, we just finish the loop at the end of exec nodes execution
798 setState(YACS::DONE);
803 // Run the finalize nodes, the loop will be done only when they all finish
804 _unfinishedCounter = 0; // This counter indicates how many branches are not finished
805 for (int i=0 ; i<_execIds.size() ; i++)
807 YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
808 DEBTRACE("Launching finalize node for branch " << i);
809 _execFinalizeNodes[i]->exUpdateState();
810 _unfinishedCounter++;
812 return YACS::NOEVENT;
815 catch(YACS::Exception& ex)
817 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
818 //no way to push results : put following nodes in FAILED state
819 //TODO could be more fine grain : put only concerned nodes in FAILED state
821 setState(YACS::ERROR);
826 else if(_state == YACS::ACTIVATED)
827 {//more elements to do and loop still activated
828 _execIds[id]=_execCurrentId;
830 int posInAbs(_execCurrentId);
832 posInAbs=_passedData->toAbsId(_execCurrentId);
833 _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
835 node->exUpdateState();
836 forwardExecStateToOriginalBody(node);
840 {//elements to process and loop no more activated
841 DEBTRACE("foreach loop state " << _state);
843 return YACS::NOEVENT;
846 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
848 DEBTRACE("Finalize node finished on branch " << id);
849 _unfinishedCounter--;
852 DEBTRACE(_unfinishedCounter << " finalize nodes still running");
853 if (_unfinishedCounter == 0)
855 _finalizeNode->setState(YACS::DONE);
856 setState(YACS::DONE);
860 return YACS::NOEVENT;
863 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
866 DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
867 if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
868 return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
872 return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
876 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
878 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
879 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
880 if(typeOfPortInstance==OutputPort::NAME)
882 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
884 for(;iter!=_outGoingPorts.end();iter++,i++)
885 if((*iter)->getRepr()==port.first || *iter==port.first)
887 if(iter!=_outGoingPorts.end())
889 if(*iter!=port.first)
892 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
898 TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
899 AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
900 InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
901 intercptor->setRepr(newPort);
903 newPort->addRepr(port.first,intercptor);
904 _outGoingPorts.push_back(newPort);
905 _intecptrsForOutGoingPorts.push_back(intercptor);
910 throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
913 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
915 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
916 if(typeOfPortInstance==OutputPort::NAME)
918 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
919 for(;iter!=_outGoingPorts.end();iter++)
920 if((*iter)->getRepr()==port.first)
922 if(iter==_outGoingPorts.end())
924 string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
925 throw Exception(what);
931 throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
934 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
936 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
937 if(typeOfPortInstance==OutputPort::NAME)
939 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
940 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
941 for(;iter!=_outGoingPorts.end();iter++,iter2++)
942 if((*iter)->getRepr()==portDwn)
944 //ASSERT(portUp==*iter.second)
945 if((*iter)->decrRef())
947 AnySplitOutputPort *p=*iter;
948 _outGoingPorts.erase(iter);
950 InterceptorInputPort *ip=*iter2;
951 _intecptrsForOutGoingPorts.erase(iter2);
957 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
959 string portName, nodeName;
960 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
961 Node *staticChild = getChildByName(nodeName);
962 return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
963 //that a link starting from _initNode goes out of scope of 'this'.
966 void ForEachLoop::cleanDynGraph()
968 DynParaLoop::cleanDynGraph();
969 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
972 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
973 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
975 _execOutGoingPorts.clear();
978 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
980 vector<AnyInputPort *>::iterator iter;
982 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
984 Any *val=(Any *)(*iter)->getValue();
985 _execVals[i]->setEltAtRank(rank,val);
989 int ForEachLoop::getFinishedId()
992 return _splitterNode.getNumberOfElements();
994 return _passedData->getNumberOfElementsToDo();
997 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
999 _execVals.resize(_outGoingPorts.size());
1000 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1001 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1002 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1005 void ForEachLoop::pushAllSequenceValues()
1007 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1009 for(;iter!=_outGoingPorts.end();iter++,i++)
1010 (*iter)->put((const void *)_execVals[i]);
1013 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
1015 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1017 for(;iter!=_outGoingPorts.end();iter++,i++)
1019 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1020 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1021 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1022 DEBTRACE( portOut->getName() );
1023 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
1024 portOut->addInPort(interceptor);
1025 _execOutGoingPorts[branchNb].push_back(interceptor);
1029 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1030 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
1032 if(isInMyDescendance(start->getNode())==_node)
1033 throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
1036 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
1038 list<OutputPort *> ret;
1039 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
1043 void ForEachLoop::accept(Visitor *visitor)
1045 visitor->visitForEachLoop(this);
1048 //! Dump the node state to a stream
1050 * \param os : the output stream
1052 void ForEachLoop::writeDot(std::ostream &os) const
1054 os << " subgraph cluster_" << getId() << " {\n" ;
1055 //only one node in a loop
1058 _node->writeDot(os);
1059 os << getId() << " -> " << _node->getId() << ";\n";
1062 os << getId() << "[fillcolor=\"" ;
1063 YACS::StatesForNode state=getEffectiveState();
1064 os << getColorState(state);
1065 os << "\" label=\"" << "Loop:" ;
1066 os << getName() <<"\"];\n";
1069 //! Reset the state of the node and its children depending on the parameter level
1070 void ForEachLoop::resetState(int level)
1073 DynParaLoop::resetState(level);
1075 //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object)
1079 std::string ForEachLoop::getProgress() const
1081 int nbElems = _splitterNode.getNumberOfElements();
1082 std::stringstream aProgress;
1084 aProgress << _currentIndex << "/" << nbElems;
1087 return aProgress.str();
1091 * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
1092 * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
1093 * This method has one input \a execut and 3 outputs.
1095 * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
1096 * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
1097 * All of the items in \a outputs have the same size.
1098 * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
1099 * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
1100 * SequenceAny objects contained in \a outputs.
1102 * \sa edGetSeqOfSamplesPort
1104 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1106 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
1107 if(_execVals.empty())
1108 return std::vector<unsigned int>();
1109 if(_execOutGoingPorts.empty())
1110 return std::vector<unsigned int>();
1111 std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
1112 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1113 for(std::size_t i=0;i<sz;i++)
1115 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1116 nameOfOutputs[i]=ports[i]->getName();
1118 return _execVals[0]->getSetItems();
1122 * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
1123 * getPassedResults method.
1125 void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1128 _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);