1 // Copyright (C) 2006-2015 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
31 #include "YacsTrace.hxx"
33 using namespace YACS::ENGINE;
36 /*! \class YACS::ENGINE::ForEachLoop
37 * \brief Loop node for parametric calculation
42 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
44 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
46 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
48 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
50 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
51 DataPort(name,node,type),Port(node),
56 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
57 Port(other,newHelder),
62 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
64 set<InPort *> ports=_repr->edSetInPort();
65 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
66 (*iter)->getAllRepresentants(repr);
69 InputPort *InterceptorInputPort::clone(Node *newHelder) const
71 return new InterceptorInputPort(*this,newHelder);
74 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
79 bool AnySplitOutputPort::decrRef()
84 void AnySplitOutputPort::incrRef() const
89 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
90 DataPort(name,node,type),Port(node),
91 _repr(0),_intercptr(0),_cnt(1)
95 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
96 DataPort(other,newHelder),
97 Port(other,newHelder),
98 _repr(0),_intercptr(0),_cnt(1)
102 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
104 bool ret=OutputPort::addInPort(inPort);
106 _repr->addInPort(_intercptr);
110 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
113 OutPort::getAllRepresented(represented);
115 _repr->getAllRepresented(represented);
118 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
120 bool ret=OutputPort::removeInPort(inPort,forward);
122 if(_setOfInputPort.empty())
123 _repr->removeInPort(_intercptr,forward);
127 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
130 _intercptr=intercptr;
133 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
135 return new AnySplitOutputPort(*this,newHelder);
138 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
143 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
147 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
149 return new SeqAnyInputPort(*this,newHelder);
152 unsigned SeqAnyInputPort::getNumberOfElements() const
154 const SequenceAny * valCsted=(const SequenceAny *) _value;
155 if (valCsted) return valCsted->size();
159 Any *SeqAnyInputPort::getValueAtRank(int i) const
161 const SequenceAny * valCsted=(const SequenceAny *) _value;
162 AnyPtr ret=(*valCsted)[i];
167 std::string SeqAnyInputPort::dump()
169 stringstream xmldump;
170 int nbElem = getNumberOfElements();
171 xmldump << "<value><array><data>" << endl;
172 for (int i = 0; i < nbElem; i++)
174 Any *val = getValueAtRank(i);
175 switch (val->getType()->kind())
178 xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
181 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
184 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
187 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
190 xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
193 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
197 xmldump << "</data></array></value>" << endl;
198 return xmldump.str();
201 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
202 ForEachLoop *father):ElementaryNode(name),
203 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
204 this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
209 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
210 _dataPortToDispatch(other._dataPortToDispatch,this)
214 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
216 if(name==NAME_OF_SEQUENCE_INPUT)
217 return (InputPort *)&_dataPortToDispatch;
219 return ElementaryNode::getInputPort(name);
222 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
224 return new SplitterNode(*this,(ForEachLoop *)father);
227 unsigned SplitterNode::getNumberOfElements() const
229 return _dataPortToDispatch.getNumberOfElements();
232 void SplitterNode::execute()
234 //Nothing : should never been called elsewhere big problem...
237 void SplitterNode::init(bool start)
239 ElementaryNode::init(start);
240 _dataPortToDispatch.exInit(start);
243 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
245 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
246 ForEachLoop *fatherTyped=(ForEachLoop *)_father;
247 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
248 valueToDispatch->decrRef();
251 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
253 _normalFinish(normalFinish)
255 _state=YACS::TOACTIVATE;
256 _father=_loop->getFather();
259 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
264 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
266 return new FakeNodeForForEachLoop(*this);
269 void FakeNodeForForEachLoop::exForwardFailed()
271 _loop->exForwardFailed();
274 void FakeNodeForForEachLoop::exForwardFinished()
276 _loop->exForwardFinished();
279 void FakeNodeForForEachLoop::execute()
282 throw Exception("");//only to trigger ABORT on Executor
284 _loop->pushAllSequenceValues();
287 void FakeNodeForForEachLoop::aborted()
289 _loop->setState(YACS::ERROR);
292 void FakeNodeForForEachLoop::finished()
294 _loop->setState(YACS::DONE);
297 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
298 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
299 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
303 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
304 _splitterNode(other._splitterNode,this),
305 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
309 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
311 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
312 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
313 temp->addRepr(getOutPort((*iter2)->getName()),interc);
314 interc->setRepr(temp);
315 _outGoingPorts.push_back(temp);
316 _intecptrsForOutGoingPorts.push_back(interc);
320 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
322 return new ForEachLoop(*this,father,editionOnly);
325 ForEachLoop::~ForEachLoop()
328 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
330 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
334 void ForEachLoop::init(bool start)
336 DynParaLoop::init(start);
337 _splitterNode.init(start);
344 void ForEachLoop::exUpdateState()
346 DEBTRACE("ForEachLoop::exUpdateState");
347 if(_state == YACS::DISABLED)
349 if(_inGate.exIsReady())
351 //internal graph update
353 int nbOfBr=_nbOfBranches.getIntValue();
354 int nbOfElts=_splitterNode.getNumberOfElements();
356 DEBTRACE("nbOfElts=" << nbOfElts);
357 DEBTRACE("nbOfBr=" << nbOfBr);
361 prepareSequenceValues(0);
362 delete _nodeForSpecialCases;
363 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
364 setState(YACS::ACTIVATED);
369 delete _nodeForSpecialCases;
370 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
371 setState(YACS::ACTIVATED);
376 _execNodes.resize(nbOfBr);
377 _execIds.resize(nbOfBr);
378 _execOutGoingPorts.resize(nbOfBr);
379 prepareSequenceValues(nbOfElts);
381 _execInitNodes.resize(nbOfBr);
382 _initializingCounter = 0;
384 _execFinalizeNodes.resize(nbOfBr);
386 vector<Node *> origNodes;
387 origNodes.push_back(_initNode);
388 origNodes.push_back(_node);
389 origNodes.push_back(_finalizeNode);
391 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
392 //so catch them to control errors
395 for(i=0;i<nbOfBr;i++)
397 DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
398 _execIds[i]=_execCurrentId;
399 DEBTRACE( "-------------- 2" );
400 vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
402 _execInitNodes[i] = clonedNodes[0];
403 _execNodes[i] = clonedNodes[1];
405 _execFinalizeNodes[i] = clonedNodes[2];
406 DEBTRACE( "-------------- 4" );
407 prepareInputsFromOutOfScope(i);
408 DEBTRACE( "-------------- 5" );
409 createOutputOutOfScopeInterceptors(i);
410 DEBTRACE( "-------------- 6" );
411 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
412 DEBTRACE( "-------------- 7" );
415 catch(YACS::Exception& ex)
417 //ForEachLoop must be put in error and the exception rethrown to notify the caller
418 DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
419 setState(YACS::ERROR);
424 setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
427 for(i=0;i<nbOfBr;i++)
430 _execInitNodes[i]->exUpdateState();
431 _initializingCounter++;
436 _execNodes[i]->exUpdateState();
439 forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
443 void ForEachLoop::exUpdateProgress()
445 // emit notification to all observers registered with the dispatcher on any change of the node's state
446 sendEvent("progress");
449 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
453 if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
454 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
456 if(_nodeForSpecialCases)
458 _nodeForSpecialCases->getReadyTasks(tasks);
461 vector<Node *>::iterator iter;
462 for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
463 (*iter)->getReadyTasks(tasks);
464 for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
465 (*iter)->getReadyTasks(tasks);
466 for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
467 (*iter)->getReadyTasks(tasks);
471 int ForEachLoop::getNumberOfInputPorts() const
473 return DynParaLoop::getNumberOfInputPorts()+1;
476 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
481 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
485 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
487 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
488 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
492 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
494 list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
495 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
499 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
501 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
502 return (InputPort *)&_splitterNode._dataPortToDispatch;
504 return DynParaLoop::getInputPort(name);
507 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
509 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
511 if(name==(*iter)->getName())
512 return (OutputPort *)(*iter);
514 return DynParaLoop::getOutputPort(name);
517 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
519 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
521 if(name==(*iter)->getName())
522 return (OutPort *)(*iter);
524 return DynParaLoop::getOutPort(name);
527 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
529 if(name==NAME_OF_SPLITTERNODE)
530 return (Node *)&_splitterNode;
532 return DynParaLoop::getChildByShortName(name);
535 //! Method used to notify the node that a child node has finished
537 * Update the current state and return the change state
539 * \param node : the child node that has finished
540 * \return the state change
542 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
544 DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
546 switch(getIdentityOfNotifyerNode(node,id))
549 return updateStateForInitNodeOnFinishedEventFrom(node,id);
551 return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
553 return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
557 return YACS::NOEVENT;
560 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
562 _execNodes[id]->exUpdateState();
564 _initializingCounter--;
565 if (_initializingCounter == 0)
566 _initNode->setState(DONE);
567 return YACS::NOEVENT;
571 * \param [in] isNormalFinish - if true
573 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
578 storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
579 if(_execCurrentId==_splitterNode.getNumberOfElements())
580 {//No more elements of _dataPortToDispatch to treat
581 _execIds[id]=NOT_RUNNING_BRANCH_ID;
582 //analyzing if some samples are still on treatment on other branches.
583 bool isFinished(true);
584 for(int i=0;i<_execIds.size() && isFinished;i++)
585 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
590 if(_failedCounter!=0)
592 std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
593 throw YACS::Exception(oss.str());
595 pushAllSequenceValues();
599 _node->setState(YACS::DONE);
601 ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
604 std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
605 std::list<Node *>::iterator iter=aChldn.begin();
606 for(;iter!=aChldn.end();iter++)
607 (*iter)->setState(YACS::DONE);
611 if (_finalizeNode == NULL)
613 // No finalize node, we just finish the loop at the end of exec nodes execution
614 setState(YACS::DONE);
619 // Run the finalize nodes, the loop will be done only when they all finish
620 _unfinishedCounter = 0; // This counter indicates how many branches are not finished
621 for (int i=0 ; i<_execIds.size() ; i++)
623 YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
624 DEBTRACE("Launching finalize node for branch " << i);
625 _execFinalizeNodes[i]->exUpdateState();
626 _unfinishedCounter++;
628 return YACS::NOEVENT;
631 catch(YACS::Exception& ex)
633 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
634 //no way to push results : put following nodes in FAILED state
635 //TODO could be more fine grain : put only concerned nodes in FAILED state
637 setState(YACS::ERROR);
642 else if(_state == YACS::ACTIVATED)
643 {//more elements to do and loop still activated
644 _execIds[id]=_execCurrentId;
646 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
647 node->exUpdateState();
648 forwardExecStateToOriginalBody(node);
652 {//elements to process and loop no more activated
653 DEBTRACE("foreach loop state " << _state);
655 return YACS::NOEVENT;
658 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
660 DEBTRACE("Finalize node finished on branch " << id);
661 _unfinishedCounter--;
664 DEBTRACE(_unfinishedCounter << " finalize nodes still running");
665 if (_unfinishedCounter == 0)
667 _finalizeNode->setState(YACS::DONE);
668 setState(YACS::DONE);
672 return YACS::NOEVENT;
675 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
678 DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
679 if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
680 return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
684 return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
688 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
690 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
691 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
692 if(typeOfPortInstance==OutputPort::NAME)
694 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
696 for(;iter!=_outGoingPorts.end();iter++,i++)
697 if((*iter)->getRepr()==port.first || *iter==port.first)
699 if(iter!=_outGoingPorts.end())
701 if(*iter!=port.first)
704 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
710 TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
711 AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
712 InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
713 intercptor->setRepr(newPort);
715 newPort->addRepr(port.first,intercptor);
716 _outGoingPorts.push_back(newPort);
717 _intecptrsForOutGoingPorts.push_back(intercptor);
722 throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
725 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
727 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
728 if(typeOfPortInstance==OutputPort::NAME)
730 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
731 for(;iter!=_outGoingPorts.end();iter++)
732 if((*iter)->getRepr()==port.first)
734 if(iter==_outGoingPorts.end())
736 string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
737 throw Exception(what);
743 throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
746 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
748 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
749 if(typeOfPortInstance==OutputPort::NAME)
751 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
752 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
753 for(;iter!=_outGoingPorts.end();iter++,iter2++)
754 if((*iter)->getRepr()==portDwn)
756 //ASSERT(portUp==*iter.second)
757 if((*iter)->decrRef())
759 AnySplitOutputPort *p=*iter;
760 _outGoingPorts.erase(iter);
762 InterceptorInputPort *ip=*iter2;
763 _intecptrsForOutGoingPorts.erase(iter2);
769 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
771 string portName, nodeName;
772 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
773 Node *staticChild = getChildByName(nodeName);
774 return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
775 //that a link starting from _initNode goes out of scope of 'this'.
778 void ForEachLoop::cleanDynGraph()
780 DynParaLoop::cleanDynGraph();
781 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
784 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
785 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
787 _execOutGoingPorts.clear();
790 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
792 vector<AnyInputPort *>::iterator iter;
794 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
796 Any *val=(Any *)(*iter)->getValue();
797 _execVals[i]->setEltAtRank(rank,val);
801 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
803 _execVals.resize(_outGoingPorts.size());
804 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
805 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
806 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
809 void ForEachLoop::pushAllSequenceValues()
811 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
813 for(;iter!=_outGoingPorts.end();iter++,i++)
814 (*iter)->put((const void *)_execVals[i]);
817 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
819 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
821 for(;iter!=_outGoingPorts.end();iter++,i++)
823 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
824 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
825 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
826 DEBTRACE( portOut->getName() );
827 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
828 portOut->addInPort(interceptor);
829 _execOutGoingPorts[branchNb].push_back(interceptor);
833 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
834 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
836 if(isInMyDescendance(start->getNode())==_node)
837 throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
840 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
842 list<OutputPort *> ret;
843 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
847 void ForEachLoop::accept(Visitor *visitor)
849 visitor->visitForEachLoop(this);
852 //! Dump the node state to a stream
854 * \param os : the output stream
856 void ForEachLoop::writeDot(std::ostream &os) const
858 os << " subgraph cluster_" << getId() << " {\n" ;
859 //only one node in a loop
863 os << getId() << " -> " << _node->getId() << ";\n";
866 os << getId() << "[fillcolor=\"" ;
867 YACS::StatesForNode state=getEffectiveState();
868 os << getColorState(state);
869 os << "\" label=\"" << "Loop:" ;
870 os << getName() <<"\"];\n";
873 //! Reset the state of the node and its children depending on the parameter level
874 void ForEachLoop::resetState(int level)
877 DynParaLoop::resetState(level);
879 //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object)
883 std::string ForEachLoop::getProgress() const
885 int nbElems = _splitterNode.getNumberOfElements();
886 std::stringstream aProgress;
888 aProgress << _currentIndex << "/" << nbElems;
891 return aProgress.str();
895 * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
896 * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
897 * This method has one input \a execut and 3 outputs.
899 * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
900 * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
901 * All of the items in \a outputs have the same size.
902 * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
903 * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
904 * SequenceAny objects contained in \a outputs.
906 * \sa edGetSeqOfSamplesPort
908 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
910 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
911 if(_execVals.empty())
912 return std::vector<unsigned int>();
913 if(_execOutGoingPorts.empty())
914 return std::vector<unsigned int>();
915 std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
916 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
917 for(std::size_t i=0;i<sz;i++)
919 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
920 nameOfOutputs[i]=ports[i]->getName();
922 return _execVals[0]->getSetItems();