1 // Copyright (C) 2006-2016 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
30 #include <algorithm> // std::replace_if
33 #include "YacsTrace.hxx"
35 using namespace YACS::ENGINE;
38 /*! \class YACS::ENGINE::ForEachLoop
39 * \brief Loop node for parametric calculation
44 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
46 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
48 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
50 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
52 const char ForEachLoop::INTERCEPTOR_STR[]="_interceptor";
54 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
55 DataPort(name,node,type),Port(node),
60 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
61 Port(other,newHelder),
66 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
68 set<InPort *> ports=_repr->edSetInPort();
69 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
70 (*iter)->getAllRepresentants(repr);
73 InputPort *InterceptorInputPort::clone(Node *newHelder) const
75 return new InterceptorInputPort(*this,newHelder);
78 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
83 bool AnySplitOutputPort::decrRef()
88 void AnySplitOutputPort::incrRef() const
93 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
94 DataPort(name,node,type),Port(node),
95 _repr(0),_intercptr(0),_cnt(1)
99 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
100 DataPort(other,newHelder),
101 Port(other,newHelder),
102 _repr(0),_intercptr(0),_cnt(1)
106 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
108 bool ret=OutputPort::addInPort(inPort);
110 _repr->addInPort(_intercptr);
114 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
117 OutPort::getAllRepresented(represented);
119 _repr->getAllRepresented(represented);
122 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
124 bool ret=OutputPort::removeInPort(inPort,forward);
126 if(_setOfInputPort.empty())
127 _repr->removeInPort(_intercptr,forward);
131 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
134 _intercptr=intercptr;
137 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
139 return new AnySplitOutputPort(*this,newHelder);
142 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
147 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
151 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
153 return new SeqAnyInputPort(*this,newHelder);
156 unsigned SeqAnyInputPort::getNumberOfElements() const
158 const SequenceAny * valCsted=(const SequenceAny *) _value;
159 if (valCsted) return valCsted->size();
163 Any *SeqAnyInputPort::getValueAtRank(int i) const
165 const SequenceAny * valCsted=(const SequenceAny *) _value;
166 AnyPtr ret=(*valCsted)[i];
171 std::string SeqAnyInputPort::dump()
173 stringstream xmldump;
174 int nbElem = getNumberOfElements();
175 xmldump << "<value><array><data>" << endl;
176 for (int i = 0; i < nbElem; i++)
178 Any *val = getValueAtRank(i);
179 switch (val->getType()->kind())
182 xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
185 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
188 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
191 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
194 xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
197 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
201 xmldump << "</data></array></value>" << endl;
202 return xmldump.str();
205 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
206 ForEachLoop *father):ElementaryNode(name),
207 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
208 this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
213 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
214 _dataPortToDispatch(other._dataPortToDispatch,this)
218 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
220 if(name==NAME_OF_SEQUENCE_INPUT)
221 return (InputPort *)&_dataPortToDispatch;
223 return ElementaryNode::getInputPort(name);
226 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
228 return new SplitterNode(*this,(ForEachLoop *)father);
231 unsigned SplitterNode::getNumberOfElements() const
233 return _dataPortToDispatch.getNumberOfElements();
236 void SplitterNode::execute()
238 //Nothing : should never been called elsewhere big problem...
241 void SplitterNode::init(bool start)
243 ElementaryNode::init(start);
244 _dataPortToDispatch.exInit(start);
247 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
249 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
250 ForEachLoop *fatherTyped=(ForEachLoop *)_father;
251 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
252 valueToDispatch->decrRef();
255 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
257 _normalFinish(normalFinish)
259 _state=YACS::TOACTIVATE;
260 _father=_loop->getFather();
263 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
268 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
270 return new FakeNodeForForEachLoop(*this);
273 void FakeNodeForForEachLoop::exForwardFailed()
275 _loop->exForwardFailed();
278 void FakeNodeForForEachLoop::exForwardFinished()
280 _loop->exForwardFinished();
283 void FakeNodeForForEachLoop::execute()
286 throw Exception("");//only to trigger ABORT on Executor
288 _loop->pushAllSequenceValues();
291 void FakeNodeForForEachLoop::aborted()
293 _loop->setState(YACS::ERROR);
296 void FakeNodeForForEachLoop::finished()
298 _loop->setState(YACS::DONE);
301 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
303 std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
305 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
306 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
308 const SequenceAny *elt(*it);
310 if(sz!=(std::size_t)elt->size())
311 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
313 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
315 SequenceAny *elt(*it);
321 ForEachLoopPassedData::ForEachLoopPassedData(const ForEachLoopPassedData& copy)
322 : _passedIds(copy._passedIds),
323 _passedOutputs(copy._passedOutputs),
324 _nameOfOutputs(copy._nameOfOutputs),
325 _flagsIds(copy._flagsIds)
329 ForEachLoopPassedData::~ForEachLoopPassedData()
331 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
333 SequenceAny *elt(*it);
339 void ForEachLoopPassedData::init()
344 void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
347 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
348 std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
349 if(nbOfElts2<sizeExp)
350 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
351 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
354 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
356 _flagsIds.resize(nbOfElts);
357 std::fill(_flagsIds.begin(),_flagsIds.end(),false);
358 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
366 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
367 throw YACS::Exception(oss.str());
372 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
373 throw YACS::Exception(oss.str());
378 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
380 std::size_t sz(_nameOfOutputs.size());
382 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
383 for(std::size_t i=0;i<sz;i++)
385 AnyInputPort *elt(ports[i]);
387 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
388 if(_nameOfOutputs[i]!=elt->getName())
390 std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
391 throw YACS::Exception(oss.str());
397 * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
399 int ForEachLoopPassedData::toAbsId(int localId) const
402 throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
403 int ret(0),curLocId(0);
404 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
408 if(localId==curLocId)
413 throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
417 * Equivalent to toAbsId except that only ON are considered here.
419 int ForEachLoopPassedData::toAbsIdNot(int localId) const
422 throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
423 int ret(0),curLocId(0);
424 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
426 if(*it)//<- diff is here !
428 if(localId==curLocId)
433 throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
436 int ForEachLoopPassedData::getNumberOfElementsToDo() const
438 std::size_t nbAllElts(_flagsIds.size());
439 std::size_t ret(nbAllElts-_passedIds.size());
443 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
445 std::size_t sz(execVals.size());
446 if(_passedOutputs.size()!=sz)
447 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
448 for(std::size_t i=0;i<sz;i++)
450 SequenceAny *elt(_passedOutputs[i]);
451 SequenceAny *eltDestination(execVals[i]);
453 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
454 unsigned int szOfElt(elt->size());
455 for(unsigned int j=0;j<szOfElt;j++)
457 AnyPtr elt1((*elt)[j]);
458 int jAbs(toAbsIdNot(j));
459 eltDestination->setEltAtRank(jAbs,elt1);
464 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
465 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
466 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
470 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
471 _splitterNode(other._splitterNode,this),
472 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
476 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
478 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
479 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
480 temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
481 interc->setRepr(temp);
482 _outGoingPorts.push_back(temp);
483 _intecptrsForOutGoingPorts.push_back(interc);
487 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
489 return new ForEachLoop(*this,father,editionOnly);
492 ForEachLoop::~ForEachLoop()
495 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
497 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
502 void ForEachLoop::init(bool start)
504 DynParaLoop::init(start);
505 _splitterNode.init(start);
514 void ForEachLoop::exUpdateState()
516 DEBTRACE("ForEachLoop::exUpdateState");
517 if(_state == YACS::DISABLED)
519 if(_state == YACS::DONE)
521 if(_inGate.exIsReady())
523 //internal graph update
525 int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
528 _passedData->checkCompatibilyWithNb(nbOfElts);
529 nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
531 int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
533 DEBTRACE("nbOfElts=" << nbOfElts);
534 DEBTRACE("nbOfBr=" << nbOfBr);
538 prepareSequenceValues(0);
539 delete _nodeForSpecialCases;
540 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
541 setState(YACS::ACTIVATED);
546 delete _nodeForSpecialCases;
547 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
548 setState(YACS::ACTIVATED);
551 if(nbOfBr>nbOfEltsToDo)
553 _execNodes.resize(nbOfBr);
554 _execIds.resize(nbOfBr);
555 _execOutGoingPorts.resize(nbOfBr);
556 prepareSequenceValues(nbOfElts);
558 _execInitNodes.resize(nbOfBr);
559 _initializingCounter = 0;
561 _execFinalizeNodes.resize(nbOfBr);
563 vector<Node *> origNodes;
564 origNodes.push_back(_initNode);
565 origNodes.push_back(_node);
566 origNodes.push_back(_finalizeNode);
568 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
569 //so catch them to control errors
572 for(i=0;i<nbOfBr;i++)
574 DEBTRACE( "-------------- 2" );
575 vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
577 _execInitNodes[i] = clonedNodes[0];
578 _execNodes[i] = clonedNodes[1];
580 _execFinalizeNodes[i] = clonedNodes[2];
581 DEBTRACE( "-------------- 4" );
582 prepareInputsFromOutOfScope(i);
583 DEBTRACE( "-------------- 5" );
584 createOutputOutOfScopeInterceptors(i);
585 DEBTRACE( "-------------- 6" );
587 for(i=0;i<nbOfBr;i++)
589 DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
590 _execIds[i]=_execCurrentId;
591 int posInAbs(_execCurrentId);
593 posInAbs=_passedData->toAbsId(_execCurrentId);
594 _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
596 DEBTRACE( "-------------- 7" );
600 _passedData->checkLevel2(_execOutGoingPorts[0]);
601 _passedData->assignAlreadyDone(_execVals);
604 catch(YACS::Exception& ex)
606 //ForEachLoop must be put in error and the exception rethrown to notify the caller
607 DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
608 setState(YACS::ERROR);
613 setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
616 for(i=0;i<nbOfBr;i++)
619 _execInitNodes[i]->exUpdateState();
620 _initializingCounter++;
625 _execNodes[i]->exUpdateState();
628 forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
632 void ForEachLoop::exUpdateProgress()
634 // emit notification to all observers registered with the dispatcher on any change of the node's state
635 sendEvent("progress");
638 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
642 if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
643 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
645 if(_nodeForSpecialCases)
647 _nodeForSpecialCases->getReadyTasks(tasks);
650 vector<Node *>::iterator iter;
651 for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
652 (*iter)->getReadyTasks(tasks);
653 for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
654 (*iter)->getReadyTasks(tasks);
655 for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
656 (*iter)->getReadyTasks(tasks);
660 int ForEachLoop::getNumberOfInputPorts() const
662 return DynParaLoop::getNumberOfInputPorts()+1;
665 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
670 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
674 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
676 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
677 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
681 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
683 list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
684 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
688 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
690 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
691 return (InputPort *)&_splitterNode._dataPortToDispatch;
693 return DynParaLoop::getInputPort(name);
696 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
698 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
700 if(name==(*iter)->getName())
701 return (OutputPort *)(*iter);
703 return DynParaLoop::getOutputPort(name);
706 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
708 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
710 if(name==(*iter)->getName())
711 return (OutPort *)(*iter);
713 return DynParaLoop::getOutPort(name);
716 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
718 if(name==NAME_OF_SPLITTERNODE)
719 return (Node *)&_splitterNode;
721 return DynParaLoop::getChildByShortName(name);
724 //! Method used to notify the node that a child node has finished
726 * Update the current state and return the change state
728 * \param node : the child node that has finished
729 * \return the state change
731 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
733 DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
735 switch(getIdentityOfNotifyerNode(node,id))
738 return updateStateForInitNodeOnFinishedEventFrom(node,id);
740 return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
742 return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
746 return YACS::NOEVENT;
749 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
751 _execNodes[id]->exUpdateState();
753 _initializingCounter--;
755 if (_initializingCounter == 0)
756 _initNode->setState(DONE);
757 return YACS::NOEVENT;
761 * \param [in] isNormalFinish - if true
763 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
769 int globalId(_execIds[id]);
771 globalId=_passedData->toAbsId(globalId);
772 sendEvent2("progress_ok",&globalId);
773 storeOutValsInSeqForOutOfScopeUse(globalId,id);
777 int globalId(_execIds[id]);
779 globalId=_passedData->toAbsId(globalId);
780 sendEvent2("progress_ko",&globalId);
783 if(_execCurrentId==getFinishedId())
784 {//No more elements of _dataPortToDispatch to treat
785 _execIds[id]=NOT_RUNNING_BRANCH_ID;
786 //analyzing if some samples are still on treatment on other branches.
787 bool isFinished(true);
788 for(int i=0;i<_execIds.size() && isFinished;i++)
789 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
794 if(_failedCounter!=0)
795 {// case of keepgoing mode + a failed
796 std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
797 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
798 setState(YACS::FAILED);
801 pushAllSequenceValues();
805 _node->setState(YACS::DONE);
807 ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
810 std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
811 std::list<Node *>::iterator iter=aChldn.begin();
812 for(;iter!=aChldn.end();iter++)
813 (*iter)->setState(YACS::DONE);
817 if (_finalizeNode == NULL)
819 // No finalize node, we just finish the loop at the end of exec nodes execution
820 setState(YACS::DONE);
825 // Run the finalize nodes, the loop will be done only when they all finish
826 _unfinishedCounter = 0; // This counter indicates how many branches are not finished
827 for (int i=0 ; i<_execIds.size() ; i++)
829 YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
830 DEBTRACE("Launching finalize node for branch " << i);
831 _execFinalizeNodes[i]->exUpdateState();
832 _unfinishedCounter++;
834 return YACS::NOEVENT;
837 catch(YACS::Exception& ex)
839 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
840 //no way to push results : put following nodes in FAILED state
841 //TODO could be more fine grain : put only concerned nodes in FAILED state
843 setState(YACS::ERROR);
848 else if(_state == YACS::ACTIVATED)
849 {//more elements to do and loop still activated
850 _execIds[id]=_execCurrentId;
852 int posInAbs(_execCurrentId);
854 posInAbs=_passedData->toAbsId(_execCurrentId);
855 _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
857 node->exUpdateState();
858 forwardExecStateToOriginalBody(node);
862 {//elements to process and loop no more activated
863 DEBTRACE("foreach loop state " << _state);
865 return YACS::NOEVENT;
868 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
870 DEBTRACE("Finalize node finished on branch " << id);
871 _unfinishedCounter--;
874 DEBTRACE(_unfinishedCounter << " finalize nodes still running");
875 if (_unfinishedCounter == 0)
877 _finalizeNode->setState(YACS::DONE);
878 setState(YACS::DONE);
882 return YACS::NOEVENT;
885 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node)
888 DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
889 // TODO: deal with keepgoing
890 // if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
892 return DynParaLoop::updateStateOnFailedEventFrom(node);
896 return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
900 void ForEachLoop::InterceptorizeNameOfPort(std::string& portName)
902 std::replace_if(portName.begin(), portName.end(), std::bind1st(std::equal_to<char>(), '.'), '_');
903 portName += INTERCEPTOR_STR;
906 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
908 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
909 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
910 if(typeOfPortInstance==OutputPort::NAME)
912 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
914 for(;iter!=_outGoingPorts.end();iter++,i++)
915 if((*iter)->getRepr()==port.first || *iter==port.first)
917 if(iter!=_outGoingPorts.end())
919 if(*iter!=port.first)
922 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
928 TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
929 TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
930 // The out going ports belong to the ForEachLoop, whereas
931 // the delegated port belongs to a node child of the ForEachLoop.
932 // The name of the delegated port contains dots (bloc.node.outport),
933 // whereas the name of the out going port shouldn't do.
934 std::string outputPortName(getPortName(port.first));
935 InterceptorizeNameOfPort(outputPortName);
936 AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
937 InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
938 intercptor->setRepr(newPort);
940 newPort->addRepr(port.first,intercptor);
941 _outGoingPorts.push_back(newPort);
942 _intecptrsForOutGoingPorts.push_back(intercptor);
947 throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
950 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
952 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
953 if(typeOfPortInstance==OutputPort::NAME)
955 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
956 for(;iter!=_outGoingPorts.end();iter++)
957 if((*iter)->getRepr()==port.first)
959 if(iter==_outGoingPorts.end())
961 string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
962 throw Exception(what);
968 throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
971 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
973 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
974 if(typeOfPortInstance==OutputPort::NAME)
976 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
977 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
978 for(;iter!=_outGoingPorts.end();iter++,iter2++)
979 if((*iter)->getRepr()==portDwn)
981 //ASSERT(portUp==*iter.second)
982 if((*iter)->decrRef())
984 AnySplitOutputPort *p=*iter;
985 _outGoingPorts.erase(iter);
987 InterceptorInputPort *ip=*iter2;
988 _intecptrsForOutGoingPorts.erase(iter2);
994 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
996 string portName, nodeName;
997 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
998 Node *staticChild = getChildByName(nodeName);
999 return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
1000 //that a link starting from _initNode goes out of scope of 'this'.
1003 void ForEachLoop::cleanDynGraph()
1005 DynParaLoop::cleanDynGraph();
1006 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
1007 (*iter3)->decrRef();
1009 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
1010 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
1012 _execOutGoingPorts.clear();
1015 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
1017 vector<AnyInputPort *>::iterator iter;
1019 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
1021 Any *val=(Any *)(*iter)->getValue();
1022 _execVals[i]->setEltAtRank(rank,val);
1026 int ForEachLoop::getFinishedId()
1029 return _splitterNode.getNumberOfElements();
1031 return _passedData->getNumberOfElementsToDo();
1034 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
1036 _execVals.resize(_outGoingPorts.size());
1037 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1038 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1039 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1042 void ForEachLoop::pushAllSequenceValues()
1044 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1046 for(;iter!=_outGoingPorts.end();iter++,i++)
1047 (*iter)->put((const void *)_execVals[i]);
1050 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
1052 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1054 for(;iter!=_outGoingPorts.end();iter++,i++)
1056 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1057 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1058 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1059 DEBTRACE( portOut->getName() );
1060 TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
1061 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
1062 portOut->addInPort(interceptor);
1063 _execOutGoingPorts[branchNb].push_back(interceptor);
1067 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1068 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
1070 DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
1071 if(end->getNode() == &_splitterNode)
1072 throw Exception("Illegal link within a foreach loop: \
1073 the 'SmplsCollection' port cannot be linked within the scope of the loop.");
1074 if(end == &_nbOfBranches)
1075 throw Exception("Illegal link within a foreach loop: \
1076 the 'nbBranches' port cannot be linked within the scope of the loop.");
1079 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
1081 list<OutputPort *> ret;
1082 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
1086 void ForEachLoop::accept(Visitor *visitor)
1088 visitor->visitForEachLoop(this);
1091 //! Dump the node state to a stream
1093 * \param os : the output stream
1095 void ForEachLoop::writeDot(std::ostream &os) const
1097 os << " subgraph cluster_" << getId() << " {\n" ;
1098 //only one node in a loop
1101 _node->writeDot(os);
1102 os << getId() << " -> " << _node->getId() << ";\n";
1105 os << getId() << "[fillcolor=\"" ;
1106 YACS::StatesForNode state=getEffectiveState();
1107 os << getColorState(state);
1108 os << "\" label=\"" << "Loop:" ;
1109 os << getName() <<"\"];\n";
1112 //! Reset the state of the node and its children depending on the parameter level
1113 void ForEachLoop::resetState(int level)
1116 DynParaLoop::resetState(level);
1118 //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object)
1122 std::string ForEachLoop::getProgress() const
1124 int nbElems(getNbOfElementsToBeProcessed());
1125 std::stringstream aProgress;
1127 aProgress << _currentIndex << "/" << nbElems;
1130 return aProgress.str();
1133 //! Get the progress weight for all elementary nodes
1135 * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
1136 * by the number of elements done and the weight total by the number total of elements
1138 list<ProgressWeight> ForEachLoop::getProgressWeight() const
1140 list<ProgressWeight> ret;
1141 list<Node *> setOfNode=edGetDirectDescendants();
1142 int elemDone=getCurrentIndex();
1143 int elemTotal=getNbOfElementsToBeProcessed();
1144 for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
1146 list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
1147 for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
1149 (*iter).weightDone=((*iter).weightTotal) * elemDone;
1150 (*iter).weightTotal*=elemTotal;
1152 ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
1157 int ForEachLoop::getNbOfElementsToBeProcessed() const
1159 int nbBranches = _nbOfBranches.getIntValue();
1160 return _splitterNode.getNumberOfElements()
1161 + (_initNode ? nbBranches:0)
1162 + (_finalizeNode ? nbBranches:0) ;
1166 * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
1167 * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
1168 * This method has one input \a execut and 3 outputs.
1170 * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
1171 * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
1172 * All of the items in \a outputs have the same size.
1173 * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
1174 * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
1175 * SequenceAny objects contained in \a outputs.
1177 * \sa edGetSeqOfSamplesPort
1179 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1181 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
1182 if(_execVals.empty())
1183 return std::vector<unsigned int>();
1184 if(_execOutGoingPorts.empty())
1185 return std::vector<unsigned int>();
1186 std::size_t sz(_execVals.size());
1188 nameOfOutputs.resize(sz);
1189 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1190 for(std::size_t i=0;i<sz;i++)
1192 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1193 nameOfOutputs[i]=ports[i]->getName();
1195 return _execVals[0]->getSetItems();
1199 * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
1200 * getPassedResults method.
1202 void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1206 _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1209 int ForEachLoop::getFEDeltaBetween(OutPort *start, InPort *end)
1211 Node *ns(start->getNode()),*ne(end->getNode());
1212 ComposedNode *co(getLowestCommonAncestor(ns,ne));
1217 ForEachLoop *isFE(dynamic_cast<ForEachLoop *>(work));
1220 work=work->getFather();
1222 if(dynamic_cast<AnySplitOutputPort *>(start))
1228 * This method is used to obtain the values already processed by the ForEachLoop.
1229 * A new ForEachLoopPassedData object is returned. You have to delete it.
1231 ForEachLoopPassedData* ForEachLoop::getProcessedData()const
1233 std::vector<SequenceAny *> outputs;
1234 std::vector<std::string> nameOfOutputs;
1235 if(_execVals.empty() || _execOutGoingPorts.empty())
1236 return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
1237 std::size_t sz(_execVals.size());
1239 nameOfOutputs.resize(sz);
1240 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1241 for(std::size_t i=0;i<sz;i++)
1243 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1244 nameOfOutputs[i]=ports[i]->getName();
1246 return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
1249 void ForEachLoop::setProcessedData(ForEachLoopPassedData* processedData)
1253 _passedData = processedData;
1257 * \param portName : "interceptorized" name of port.
1259 const YACS::ENGINE::TypeCode* ForEachLoop::getOutputPortType(const std::string& portName)const
1261 const YACS::ENGINE::TypeCode* ret=NULL;
1262 vector<AnySplitOutputPort *>::const_iterator it;
1263 for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
1265 std::string originalPortName(getPortName(*it));
1266 //InterceptorizeNameOfPort(originalPortName);
1267 DEBTRACE("ForEachLoop::getOutputPortType compare " << portName << " == " << originalPortName);
1268 if(originalPortName == portName)
1270 ret = (*it)->edGetType()->contentType();